solana_core/validator.rs

1//! The `validator` module hosts all the validator microservices.
2
3pub use solana_perf::report_target_features;
4use {
5    crate::{
6        admin_rpc_post_init::{AdminRpcRequestMetadataPostInit, KeyUpdaterType, KeyUpdaters},
7        banking_stage::BankingStage,
8        banking_trace::{self, BankingTracer, TraceError},
9        cluster_info_vote_listener::VoteTracker,
10        completed_data_sets_service::CompletedDataSetsService,
11        consensus::{
12            reconcile_blockstore_roots_with_external_source,
13            tower_storage::{NullTowerStorage, TowerStorage},
14            ExternalRootSource, Tower,
15        },
16        repair::{
17            self,
18            quic_endpoint::{RepairQuicAsyncSenders, RepairQuicSenders, RepairQuicSockets},
19            repair_handler::RepairHandlerType,
20            serve_repair_service::ServeRepairService,
21        },
22        sample_performance_service::SamplePerformanceService,
23        sigverify,
24        snapshot_packager_service::SnapshotPackagerService,
25        stats_reporter_service::StatsReporterService,
26        system_monitor_service::{
27            verify_net_stats_access, SystemMonitorService, SystemMonitorStatsReportConfig,
28        },
29        tpu::{ForwardingClientOption, Tpu, TpuSockets, DEFAULT_TPU_COALESCE},
30        tvu::{Tvu, TvuConfig, TvuSockets},
31    },
32    anyhow::{anyhow, Context, Result},
33    crossbeam_channel::{bounded, unbounded, Receiver},
34    quinn::Endpoint,
35    solana_accounts_db::{
36        accounts_db::{AccountsDbConfig, ACCOUNTS_DB_CONFIG_FOR_TESTING},
37        accounts_update_notifier_interface::AccountsUpdateNotifier,
38        hardened_unpack::{
39            open_genesis_config, OpenGenesisConfigError, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
40        },
41        utils::move_and_async_delete_path_contents,
42    },
43    solana_client::connection_cache::{ConnectionCache, Protocol},
44    solana_clock::Slot,
45    solana_cluster_type::ClusterType,
46    solana_entry::poh::compute_hash_time,
47    solana_epoch_schedule::MAX_LEADER_SCHEDULE_EPOCH_OFFSET,
48    solana_genesis_config::GenesisConfig,
49    solana_geyser_plugin_manager::{
50        geyser_plugin_service::GeyserPluginService, GeyserPluginManagerRequest,
51    },
52    solana_gossip::{
53        cluster_info::{
54            ClusterInfo, DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
55            DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS,
56        },
57        contact_info::ContactInfo,
58        crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
59        gossip_service::GossipService,
60        node::{Node, NodeMultihoming},
61    },
62    solana_hard_forks::HardForks,
63    solana_hash::Hash,
64    solana_keypair::Keypair,
65    solana_ledger::{
66        bank_forks_utils,
67        blockstore::{
68            Blockstore, BlockstoreError, PurgeType, MAX_COMPLETED_SLOTS_IN_CHANNEL,
69            MAX_REPLAY_WAKE_UP_SIGNALS,
70        },
71        blockstore_metric_report_service::BlockstoreMetricReportService,
72        blockstore_options::{BlockstoreOptions, BLOCKSTORE_DIRECTORY_ROCKS_LEVEL},
73        blockstore_processor::{self, TransactionStatusSender},
74        entry_notifier_interface::EntryNotifierArc,
75        entry_notifier_service::{EntryNotifierSender, EntryNotifierService},
76        leader_schedule::FixedSchedule,
77        leader_schedule_cache::LeaderScheduleCache,
78        use_snapshot_archives_at_startup::UseSnapshotArchivesAtStartup,
79    },
80    solana_measure::measure::Measure,
81    solana_metrics::{datapoint_info, metrics::metrics_config_sanity_check},
82    solana_poh::{
83        poh_recorder::PohRecorder,
84        poh_service::{self, PohService},
85        transaction_recorder::TransactionRecorder,
86    },
87    solana_pubkey::Pubkey,
88    solana_rayon_threadlimit::get_thread_count,
89    solana_rpc::{
90        max_slots::MaxSlots,
91        optimistically_confirmed_bank_tracker::{
92            BankNotificationSenderConfig, OptimisticallyConfirmedBank,
93            OptimisticallyConfirmedBankTracker,
94        },
95        rpc::JsonRpcConfig,
96        rpc_completed_slots_service::RpcCompletedSlotsService,
97        rpc_pubsub_service::{PubSubConfig, PubSubService},
98        rpc_service::{ClientOption, JsonRpcService, JsonRpcServiceConfig},
99        rpc_subscriptions::RpcSubscriptions,
100        transaction_notifier_interface::TransactionNotifierArc,
101        transaction_status_service::TransactionStatusService,
102    },
103    solana_runtime::{
104        accounts_background_service::{
105            AbsRequestHandlers, AccountsBackgroundService, DroppedSlotsReceiver,
106            PendingSnapshotPackages, PrunedBanksRequestHandler, SnapshotRequestHandler,
107        },
108        bank::Bank,
109        bank_forks::BankForks,
110        commitment::BlockCommitmentCache,
111        dependency_tracker::DependencyTracker,
112        prioritization_fee_cache::PrioritizationFeeCache,
113        runtime_config::RuntimeConfig,
114        snapshot_archive_info::SnapshotArchiveInfoGetter,
115        snapshot_bank_utils,
116        snapshot_config::SnapshotConfig,
117        snapshot_controller::SnapshotController,
118        snapshot_hash::StartingSnapshotHashes,
119        snapshot_utils::{self, clean_orphaned_account_snapshot_dirs, SnapshotInterval},
120    },
121    solana_send_transaction_service::send_transaction_service::Config as SendTransactionServiceConfig,
122    solana_shred_version::compute_shred_version,
123    solana_signer::Signer,
124    solana_streamer::{quic::QuicServerParams, socket::SocketAddrSpace, streamer::StakedNodes},
125    solana_time_utils::timestamp,
126    solana_tpu_client::tpu_client::{
127        DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC, DEFAULT_VOTE_USE_QUIC,
128    },
129    solana_turbine::{
130        self,
131        broadcast_stage::BroadcastStageType,
132        xdp::{XdpConfig, XdpRetransmitter},
133    },
134    solana_unified_scheduler_pool::DefaultSchedulerPool,
135    solana_validator_exit::Exit,
136    solana_vote_program::vote_state,
137    solana_wen_restart::wen_restart::{wait_for_wen_restart, WenRestartConfig},
138    std::{
139        borrow::Cow,
140        collections::{HashMap, HashSet},
141        net::SocketAddr,
142        num::NonZeroUsize,
143        path::{Path, PathBuf},
144        sync::{
145            atomic::{AtomicBool, AtomicU64, Ordering},
146            Arc, Mutex, RwLock,
147        },
148        thread::{sleep, Builder, JoinHandle},
149        time::{Duration, Instant},
150    },
151    strum::VariantNames,
152    strum_macros::{Display, EnumCount, EnumIter, EnumString, EnumVariantNames, IntoStaticStr},
153    thiserror::Error,
154    tokio::runtime::Runtime as TokioRuntime,
155    tokio_util::sync::CancellationToken,
156};
157
158const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000;
159const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80;
160// Right now, since we reuse the wait-for-supermajority code, the
161// following threshold should always be greater than or equal to
162// WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT.
163const WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT: u64 =
164    WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT;
165
166#[derive(
167    Clone, EnumCount, EnumIter, EnumString, EnumVariantNames, Default, IntoStaticStr, Display,
168)]
169#[strum(serialize_all = "kebab-case")]
170pub enum BlockVerificationMethod {
171    BlockstoreProcessor,
172    #[default]
173    UnifiedScheduler,
174}
175
176impl BlockVerificationMethod {
177    pub const fn cli_names() -> &'static [&'static str] {
178        Self::VARIANTS
179    }
180
181    pub fn cli_message() -> &'static str {
182        "Switch transaction scheduling method for verifying ledger entries"
183    }
184}
185
186#[derive(Clone, EnumString, EnumVariantNames, Default, IntoStaticStr, Display)]
187#[strum(serialize_all = "kebab-case")]
188pub enum BlockProductionMethod {
189    CentralScheduler,
190    #[default]
191    CentralSchedulerGreedy,
192}
193
194impl BlockProductionMethod {
195    pub const fn cli_names() -> &'static [&'static str] {
196        Self::VARIANTS
197    }
198
199    pub fn cli_message() -> &'static str {
200        "Switch transaction scheduling method for producing ledger entries"
201    }
202}
203
204#[derive(Clone, EnumString, EnumVariantNames, Default, IntoStaticStr, Display)]
205#[strum(serialize_all = "kebab-case")]
206pub enum TransactionStructure {
207    Sdk,
208    #[default]
209    View,
210}
211
212impl TransactionStructure {
213    pub const fn cli_names() -> &'static [&'static str] {
214        Self::VARIANTS
215    }
216
217    pub fn cli_message() -> &'static str {
218        "Switch internal transaction structure/representation"
219    }
220}
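
// A minimal sketch added for illustration (not part of the upstream file): the
// strum derives with `serialize_all = "kebab-case"` above mean the CLI-facing
// names of these enums are kebab-case strings that `FromStr`/`Display` round-trip.
// The assertions below only rely on the derive behavior declared above.
#[cfg(test)]
mod cli_enum_name_sketch {
    use {super::*, std::str::FromStr};

    #[test]
    fn kebab_case_round_trip() {
        // `EnumString` parses the kebab-case form back into the variant...
        let method = BlockVerificationMethod::from_str("unified-scheduler").unwrap();
        // ...and `Display` renders it with the same kebab-case spelling.
        assert_eq!(method.to_string(), "unified-scheduler");
        // `cli_names()` exposes the full kebab-case variant list for the CLI.
        assert!(BlockVerificationMethod::cli_names().contains(&"blockstore-processor"));
        assert!(BlockProductionMethod::cli_names().contains(&"central-scheduler-greedy"));
        assert!(TransactionStructure::cli_names().contains(&"view"));
    }
}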
221
222/// Configuration for the block generator invalidator for replay.
223#[derive(Clone, Debug)]
224pub struct GeneratorConfig {
225    pub accounts_path: String,
226    pub starting_keypairs: Arc<Vec<Keypair>>,
227}
228
229pub struct ValidatorConfig {
230    pub halt_at_slot: Option<Slot>,
231    pub expected_genesis_hash: Option<Hash>,
232    pub expected_bank_hash: Option<Hash>,
233    pub expected_shred_version: Option<u16>,
234    pub voting_disabled: bool,
235    pub account_paths: Vec<PathBuf>,
236    pub account_snapshot_paths: Vec<PathBuf>,
237    pub rpc_config: JsonRpcConfig,
238    /// Specifies which plugins to start up with
239    pub on_start_geyser_plugin_config_files: Option<Vec<PathBuf>>,
240    pub geyser_plugin_always_enabled: bool,
241    pub rpc_addrs: Option<(SocketAddr, SocketAddr)>, // (JsonRpc, JsonRpcPubSub)
242    pub pubsub_config: PubSubConfig,
243    pub snapshot_config: SnapshotConfig,
244    pub max_ledger_shreds: Option<u64>,
245    pub blockstore_options: BlockstoreOptions,
246    pub broadcast_stage_type: BroadcastStageType,
247    pub turbine_disabled: Arc<AtomicBool>,
248    pub fixed_leader_schedule: Option<FixedSchedule>,
249    pub wait_for_supermajority: Option<Slot>,
250    pub new_hard_forks: Option<Vec<Slot>>,
251    pub known_validators: Option<HashSet<Pubkey>>, // None = trust all
252    pub repair_validators: Option<HashSet<Pubkey>>, // None = repair from all
253    pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>, // Empty = repair with all
254    pub gossip_validators: Option<HashSet<Pubkey>>, // None = gossip with all
255    pub max_genesis_archive_unpacked_size: u64,
256    /// Run PoH, transaction signature and other transaction verifications during blockstore
257    /// processing.
258    pub run_verification: bool,
259    pub require_tower: bool,
260    pub tower_storage: Arc<dyn TowerStorage>,
261    pub debug_keys: Option<Arc<HashSet<Pubkey>>>,
262    pub contact_debug_interval: u64,
263    pub contact_save_interval: u64,
264    pub send_transaction_service_config: SendTransactionServiceConfig,
265    pub no_poh_speed_test: bool,
266    pub no_os_memory_stats_reporting: bool,
267    pub no_os_network_stats_reporting: bool,
268    pub no_os_cpu_stats_reporting: bool,
269    pub no_os_disk_stats_reporting: bool,
270    pub poh_pinned_cpu_core: usize,
271    pub poh_hashes_per_batch: u64,
272    pub process_ledger_before_services: bool,
273    pub accounts_db_config: Option<AccountsDbConfig>,
274    pub warp_slot: Option<Slot>,
275    pub accounts_db_skip_shrink: bool,
276    pub accounts_db_force_initial_clean: bool,
277    pub tpu_coalesce: Duration,
278    pub staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
279    pub validator_exit: Arc<RwLock<Exit>>,
280    pub validator_exit_backpressure: HashMap<String, Arc<AtomicBool>>,
281    pub no_wait_for_vote_to_start_leader: bool,
282    pub wait_to_vote_slot: Option<Slot>,
283    pub runtime_config: RuntimeConfig,
284    pub banking_trace_dir_byte_limit: banking_trace::DirByteLimit,
285    pub block_verification_method: BlockVerificationMethod,
286    pub block_production_method: BlockProductionMethod,
287    pub block_production_num_workers: NonZeroUsize,
288    pub transaction_struct: TransactionStructure,
289    pub enable_block_production_forwarding: bool,
290    pub generator_config: Option<GeneratorConfig>,
291    pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup,
292    pub wen_restart_proto_path: Option<PathBuf>,
293    pub wen_restart_coordinator: Option<Pubkey>,
294    pub unified_scheduler_handler_threads: Option<usize>,
295    pub ip_echo_server_threads: NonZeroUsize,
296    pub rayon_global_threads: NonZeroUsize,
297    pub replay_forks_threads: NonZeroUsize,
298    pub replay_transactions_threads: NonZeroUsize,
299    pub tvu_shred_sigverify_threads: NonZeroUsize,
300    pub delay_leader_block_for_pending_fork: bool,
301    pub use_tpu_client_next: bool,
302    pub retransmit_xdp: Option<XdpConfig>,
303    pub repair_handler_type: RepairHandlerType,
304}
305
306impl ValidatorConfig {
307    pub fn default_for_test() -> Self {
308        let max_thread_count =
309            NonZeroUsize::new(num_cpus::get()).expect("thread count is non-zero");
310
311        Self {
312            halt_at_slot: None,
313            expected_genesis_hash: None,
314            expected_bank_hash: None,
315            expected_shred_version: None,
316            voting_disabled: false,
317            max_ledger_shreds: None,
318            blockstore_options: BlockstoreOptions::default_for_tests(),
319            account_paths: Vec::new(),
320            account_snapshot_paths: Vec::new(),
321            rpc_config: JsonRpcConfig::default_for_test(),
322            on_start_geyser_plugin_config_files: None,
323            geyser_plugin_always_enabled: false,
324            rpc_addrs: None,
325            pubsub_config: PubSubConfig::default(),
326            snapshot_config: SnapshotConfig::new_load_only(),
327            broadcast_stage_type: BroadcastStageType::Standard,
328            turbine_disabled: Arc::<AtomicBool>::default(),
329            fixed_leader_schedule: None,
330            wait_for_supermajority: None,
331            new_hard_forks: None,
332            known_validators: None,
333            repair_validators: None,
334            repair_whitelist: Arc::new(RwLock::new(HashSet::default())),
335            gossip_validators: None,
336            max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
337            run_verification: true,
338            require_tower: false,
339            tower_storage: Arc::new(NullTowerStorage::default()),
340            debug_keys: None,
341            contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
342            contact_save_interval: DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS,
343            send_transaction_service_config: SendTransactionServiceConfig::default(),
344            no_poh_speed_test: true,
345            no_os_memory_stats_reporting: true,
346            no_os_network_stats_reporting: true,
347            no_os_cpu_stats_reporting: true,
348            no_os_disk_stats_reporting: true,
349            poh_pinned_cpu_core: poh_service::DEFAULT_PINNED_CPU_CORE,
350            poh_hashes_per_batch: poh_service::DEFAULT_HASHES_PER_BATCH,
351            process_ledger_before_services: false,
352            warp_slot: None,
353            accounts_db_skip_shrink: false,
354            accounts_db_force_initial_clean: false,
355            tpu_coalesce: DEFAULT_TPU_COALESCE,
356            staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
357            validator_exit: Arc::new(RwLock::new(Exit::default())),
358            validator_exit_backpressure: HashMap::default(),
359            no_wait_for_vote_to_start_leader: true,
360            accounts_db_config: Some(ACCOUNTS_DB_CONFIG_FOR_TESTING),
361            wait_to_vote_slot: None,
362            runtime_config: RuntimeConfig::default(),
363            banking_trace_dir_byte_limit: 0,
364            block_verification_method: BlockVerificationMethod::default(),
365            block_production_method: BlockProductionMethod::default(),
366            block_production_num_workers: BankingStage::default_num_workers(),
367            transaction_struct: TransactionStructure::default(),
368            // enable forwarding by default for tests
369            enable_block_production_forwarding: true,
370            generator_config: None,
371            use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(),
372            wen_restart_proto_path: None,
373            wen_restart_coordinator: None,
374            unified_scheduler_handler_threads: None,
375            ip_echo_server_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
376            rayon_global_threads: max_thread_count,
377            replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
378            replay_transactions_threads: max_thread_count,
379            tvu_shred_sigverify_threads: NonZeroUsize::new(get_thread_count())
380                .expect("thread count is non-zero"),
381            delay_leader_block_for_pending_fork: false,
382            use_tpu_client_next: true,
383            retransmit_xdp: None,
384            repair_handler_type: RepairHandlerType::default(),
385        }
386    }
387
388    pub fn enable_default_rpc_block_subscribe(&mut self) {
389        let pubsub_config = PubSubConfig {
390            enable_block_subscription: true,
391            ..PubSubConfig::default()
392        };
393        let rpc_config = JsonRpcConfig {
394            enable_rpc_transaction_history: true,
395            ..JsonRpcConfig::default_for_test()
396        };
397
398        self.pubsub_config = pubsub_config;
399        self.rpc_config = rpc_config;
400    }
401}
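
// A hedged usage sketch (added, not upstream code): shows how a test might start
// from `default_for_test()` and opt into block subscriptions plus transaction
// history via `enable_default_rpc_block_subscribe()`. Only fields defined above
// (and the public flags they set) are referenced.
#[cfg(test)]
mod validator_config_sketch {
    use super::*;

    #[test]
    fn enable_block_subscribe_flips_expected_flags() {
        let mut config = ValidatorConfig::default_for_test();
        config.enable_default_rpc_block_subscribe();
        // Block subscriptions need both the pubsub flag and RPC transaction history.
        assert!(config.pubsub_config.enable_block_subscription);
        assert!(config.rpc_config.enable_rpc_transaction_history);
    }
}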
402
403// `ValidatorStartProgress` contains status information that is surfaced to the node operator over
404// the admin RPC channel to help them follow the general progress of node startup without
405// having to watch log messages.
406#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
407pub enum ValidatorStartProgress {
408    Initializing, // Catch all, default state
409    SearchingForRpcService,
410    DownloadingSnapshot {
411        slot: Slot,
412        rpc_addr: SocketAddr,
413    },
414    CleaningBlockStore,
415    CleaningAccounts,
416    LoadingLedger,
417    ProcessingLedger {
418        slot: Slot,
419        max_slot: Slot,
420    },
421    StartingServices,
422    Halted, // Validator halted due to `--dev-halt-at-slot` argument
423    WaitingForSupermajority {
424        slot: Slot,
425        gossip_stake_percent: u64,
426    },
427
428    // `Running` is the terminal state once the validator fully starts and all services are
429    // operational
430    Running,
431}
432
433impl Default for ValidatorStartProgress {
434    fn default() -> Self {
435        Self::Initializing
436    }
437}
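
// A small illustrative sketch (not upstream code): `ValidatorStartProgress` is a
// plain value type, so admin RPC consumers can compare or match on it directly.
// Only the variants defined above are used here.
#[cfg(test)]
mod validator_start_progress_sketch {
    use super::*;

    #[test]
    fn startup_begins_in_initializing() {
        assert_eq!(
            ValidatorStartProgress::default(),
            ValidatorStartProgress::Initializing
        );
        // Variants with payloads can be matched like any other enum.
        let progress = ValidatorStartProgress::ProcessingLedger { slot: 5, max_slot: 10 };
        assert!(matches!(
            progress,
            ValidatorStartProgress::ProcessingLedger { slot, max_slot } if slot <= max_slot
        ));
    }
}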
438
439struct BlockstoreRootScan {
440    thread: Option<JoinHandle<Result<usize, BlockstoreError>>>,
441}
442
443impl BlockstoreRootScan {
444    fn new(config: &ValidatorConfig, blockstore: Arc<Blockstore>, exit: Arc<AtomicBool>) -> Self {
445        let thread = if config.rpc_addrs.is_some()
446            && config.rpc_config.enable_rpc_transaction_history
447            && config.rpc_config.rpc_scan_and_fix_roots
448        {
449            Some(
450                Builder::new()
451                    .name("solBStoreRtScan".to_string())
452                    .spawn(move || blockstore.scan_and_fix_roots(None, None, &exit))
453                    .unwrap(),
454            )
455        } else {
456            None
457        };
458        Self { thread }
459    }
460
461    fn join(self) {
462        if let Some(blockstore_root_scan) = self.thread {
463            if let Err(err) = blockstore_root_scan.join() {
464                warn!("blockstore_root_scan failed to join {err:?}");
465            }
466        }
467    }
468}
469
470#[derive(Default)]
471struct TransactionHistoryServices {
472    transaction_status_sender: Option<TransactionStatusSender>,
473    transaction_status_service: Option<TransactionStatusService>,
474    max_complete_transaction_status_slot: Arc<AtomicU64>,
475}
476
477/// A struct to ease passing validator TPU configurations
478pub struct ValidatorTpuConfig {
479    /// Controls whether to use QUIC for sending regular TPU transactions
480    pub use_quic: bool,
481    /// Controls whether to use QUIC for sending TPU votes
482    pub vote_use_quic: bool,
483    /// Controls the connection cache pool size
484    pub tpu_connection_pool_size: usize,
485    /// Controls whether to enable UDP for TPU transactions.
486    pub tpu_enable_udp: bool,
487    /// QUIC server config for regular TPU
488    pub tpu_quic_server_config: QuicServerParams,
489    /// QUIC server config for TPU forward
490    pub tpu_fwd_quic_server_config: QuicServerParams,
491    /// QUIC server config for Vote
492    pub vote_quic_server_config: QuicServerParams,
493}
494
495impl ValidatorTpuConfig {
496    /// A convenience function to build a ValidatorTpuConfig for testing with good
497    /// defaults.
498    pub fn new_for_tests(tpu_enable_udp: bool) -> Self {
499        let tpu_quic_server_config = QuicServerParams {
500            max_connections_per_ipaddr_per_min: 32,
501            coalesce_channel_size: 100_000, // smaller channel size for faster tests
502            ..Default::default()
503        };
504
505        let tpu_fwd_quic_server_config = QuicServerParams {
506            max_connections_per_ipaddr_per_min: 32,
507            max_unstaked_connections: 0,
508            coalesce_channel_size: 100_000, // smaller channel size for faster tests
509            ..Default::default()
510        };
511
512        // vote and tpu_fwd share the same characteristics -- disallow non-staked connections:
513        let vote_quic_server_config = tpu_fwd_quic_server_config.clone();
514
515        ValidatorTpuConfig {
516            use_quic: DEFAULT_TPU_USE_QUIC,
517            vote_use_quic: DEFAULT_VOTE_USE_QUIC,
518            tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
519            tpu_enable_udp,
520            tpu_quic_server_config,
521            tpu_fwd_quic_server_config,
522            vote_quic_server_config,
523        }
524    }
525}
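
// A hedged sketch (added, not upstream): demonstrates the test defaults built by
// `new_for_tests` -- in particular that the forward and vote QUIC servers reject
// unstaked connections, mirroring the comment above. Only fields constructed in
// this file are referenced.
#[cfg(test)]
mod validator_tpu_config_sketch {
    use super::*;

    #[test]
    fn test_defaults_restrict_forward_and_vote_quic() {
        let config = ValidatorTpuConfig::new_for_tests(/* tpu_enable_udp: */ true);
        assert!(config.tpu_enable_udp);
        // tpu_fwd and vote share the same restriction: no unstaked connections.
        assert_eq!(config.tpu_fwd_quic_server_config.max_unstaked_connections, 0);
        assert_eq!(config.vote_quic_server_config.max_unstaked_connections, 0);
    }
}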
526
527pub struct Validator {
528    validator_exit: Arc<RwLock<Exit>>,
529    json_rpc_service: Option<JsonRpcService>,
530    pubsub_service: Option<PubSubService>,
531    rpc_completed_slots_service: Option<JoinHandle<()>>,
532    optimistically_confirmed_bank_tracker: Option<OptimisticallyConfirmedBankTracker>,
533    transaction_status_service: Option<TransactionStatusService>,
534    entry_notifier_service: Option<EntryNotifierService>,
535    system_monitor_service: Option<SystemMonitorService>,
536    sample_performance_service: Option<SamplePerformanceService>,
537    stats_reporter_service: StatsReporterService,
538    gossip_service: GossipService,
539    serve_repair_service: ServeRepairService,
540    completed_data_sets_service: Option<CompletedDataSetsService>,
541    snapshot_packager_service: Option<SnapshotPackagerService>,
542    poh_recorder: Arc<RwLock<PohRecorder>>,
543    poh_service: PohService,
544    tpu: Tpu,
545    tvu: Tvu,
546    ip_echo_server: Option<solana_net_utils::IpEchoServer>,
547    pub cluster_info: Arc<ClusterInfo>,
548    pub bank_forks: Arc<RwLock<BankForks>>,
549    pub blockstore: Arc<Blockstore>,
550    geyser_plugin_service: Option<GeyserPluginService>,
551    blockstore_metric_report_service: BlockstoreMetricReportService,
552    accounts_background_service: AccountsBackgroundService,
553    turbine_quic_endpoint: Option<Endpoint>,
554    turbine_quic_endpoint_runtime: Option<TokioRuntime>,
555    turbine_quic_endpoint_join_handle: Option<solana_turbine::quic_endpoint::AsyncTryJoinHandle>,
556    repair_quic_endpoints: Option<[Endpoint; 3]>,
557    repair_quic_endpoints_runtime: Option<TokioRuntime>,
558    repair_quic_endpoints_join_handle: Option<repair::quic_endpoint::AsyncTryJoinHandle>,
559    xdp_retransmitter: Option<XdpRetransmitter>,
560    // This runtime is used to run the client owned by SendTransactionService.
561    // We don't wait for its JoinHandle here because ownership and shutdown
562    // are managed elsewhere. This variable is intentionally unused.
563    _tpu_client_next_runtime: Option<TokioRuntime>,
564}
565
566impl Validator {
567    #[allow(clippy::too_many_arguments)]
568    pub fn new(
569        mut node: Node,
570        identity_keypair: Arc<Keypair>,
571        ledger_path: &Path,
572        vote_account: &Pubkey,
573        authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
574        cluster_entrypoints: Vec<ContactInfo>,
575        config: &ValidatorConfig,
576        should_check_duplicate_instance: bool,
577        rpc_to_plugin_manager_receiver: Option<Receiver<GeyserPluginManagerRequest>>,
578        start_progress: Arc<RwLock<ValidatorStartProgress>>,
579        socket_addr_space: SocketAddrSpace,
580        tpu_config: ValidatorTpuConfig,
581        admin_rpc_service_post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
582    ) -> Result<Self> {
583        #[cfg(debug_assertions)]
584        const DEBUG_ASSERTION_STATUS: &str = "enabled";
585        #[cfg(not(debug_assertions))]
586        const DEBUG_ASSERTION_STATUS: &str = "disabled";
587        info!("debug-assertion status: {DEBUG_ASSERTION_STATUS}");
588
589        let ValidatorTpuConfig {
590            use_quic,
591            vote_use_quic,
592            tpu_connection_pool_size,
593            tpu_enable_udp,
594            tpu_quic_server_config,
595            tpu_fwd_quic_server_config,
596            vote_quic_server_config,
597        } = tpu_config;
598
599        let start_time = Instant::now();
600
601        // Initialize the global rayon pool first to ensure the value in config
602        // is honored. Otherwise, some code accessing the global pool could
603        // cause it to get initialized with Rayon's default (not ours)
604        if rayon::ThreadPoolBuilder::new()
605            .thread_name(|i| format!("solRayonGlob{i:02}"))
606            .num_threads(config.rayon_global_threads.get())
607            .build_global()
608            .is_err()
609        {
610            warn!("Rayon global thread pool already initialized");
611        }
612
613        let id = identity_keypair.pubkey();
614        assert_eq!(&id, node.info.pubkey());
615
616        info!("identity pubkey: {id}");
617        info!("vote account pubkey: {vote_account}");
618
619        if !config.no_os_network_stats_reporting {
620            verify_net_stats_access().map_err(|e| {
621                ValidatorError::Other(format!("Failed to access network stats: {e:?}"))
622            })?;
623        }
624
625        let mut bank_notification_senders = Vec::new();
626
627        let exit = Arc::new(AtomicBool::new(false));
628
629        let geyser_plugin_config_files = config
630            .on_start_geyser_plugin_config_files
631            .as_ref()
632            .map(Cow::Borrowed)
633            .or_else(|| {
634                config
635                    .geyser_plugin_always_enabled
636                    .then_some(Cow::Owned(vec![]))
637            });
638        let geyser_plugin_service =
639            if let Some(geyser_plugin_config_files) = geyser_plugin_config_files {
640                let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
641                bank_notification_senders.push(confirmed_bank_sender);
642                let rpc_to_plugin_manager_receiver_and_exit =
643                    rpc_to_plugin_manager_receiver.map(|receiver| (receiver, exit.clone()));
644                Some(
645                    GeyserPluginService::new_with_receiver(
646                        confirmed_bank_receiver,
647                        config.geyser_plugin_always_enabled,
648                        geyser_plugin_config_files.as_ref(),
649                        rpc_to_plugin_manager_receiver_and_exit,
650                    )
651                    .map_err(|err| {
652                        ValidatorError::Other(format!("Failed to load the Geyser plugin: {err:?}"))
653                    })?,
654                )
655            } else {
656                None
657            };
658
659        if config.voting_disabled {
660            warn!("voting disabled");
661            authorized_voter_keypairs.write().unwrap().clear();
662        } else {
663            for authorized_voter_keypair in authorized_voter_keypairs.read().unwrap().iter() {
664                warn!("authorized voter: {}", authorized_voter_keypair.pubkey());
665            }
666        }
667
668        for cluster_entrypoint in &cluster_entrypoints {
669            info!("entrypoint: {cluster_entrypoint:?}");
670        }
671
672        if solana_perf::perf_libs::api().is_some() {
673            info!("Initializing sigverify, this could take a while...");
674        } else {
675            info!("Initializing sigverify...");
676        }
677        sigverify::init();
678        info!("Initializing sigverify done.");
679
680        if !ledger_path.is_dir() {
681            return Err(anyhow!(
682                "ledger directory does not exist or is not accessible: {ledger_path:?}"
683            ));
684        }
685        let genesis_config = load_genesis(config, ledger_path)?;
686        metrics_config_sanity_check(genesis_config.cluster_type)?;
687
688        info!("Cleaning accounts paths..");
689        *start_progress.write().unwrap() = ValidatorStartProgress::CleaningAccounts;
690        let mut timer = Measure::start("clean_accounts_paths");
691        cleanup_accounts_paths(config);
692        timer.stop();
693        info!("Cleaning accounts paths done. {timer}");
694
695        snapshot_utils::purge_incomplete_bank_snapshots(&config.snapshot_config.bank_snapshots_dir);
696        snapshot_utils::purge_old_bank_snapshots_at_startup(
697            &config.snapshot_config.bank_snapshots_dir,
698        );
699
700        info!("Cleaning orphaned account snapshot directories..");
701        let mut timer = Measure::start("clean_orphaned_account_snapshot_dirs");
702        clean_orphaned_account_snapshot_dirs(
703            &config.snapshot_config.bank_snapshots_dir,
704            &config.account_snapshot_paths,
705        )
706        .context("failed to clean orphaned account snapshot directories")?;
707        timer.stop();
708        info!("Cleaning orphaned account snapshot directories done. {timer}");
709
710        // token used to cancel tpu-client-next.
711        let cancel_tpu_client_next = CancellationToken::new();
712        {
713            let exit = exit.clone();
714            config
715                .validator_exit
716                .write()
717                .unwrap()
718                .register_exit(Box::new(move || exit.store(true, Ordering::Relaxed)));
719            let cancel_tpu_client_next = cancel_tpu_client_next.clone();
720            config
721                .validator_exit
722                .write()
723                .unwrap()
724                .register_exit(Box::new(move || cancel_tpu_client_next.cancel()));
725        }
726
727        let (
728            accounts_update_notifier,
729            transaction_notifier,
730            entry_notifier,
731            block_metadata_notifier,
732            slot_status_notifier,
733        ) = if let Some(service) = &geyser_plugin_service {
734            (
735                service.get_accounts_update_notifier(),
736                service.get_transaction_notifier(),
737                service.get_entry_notifier(),
738                service.get_block_metadata_notifier(),
739                service.get_slot_status_notifier(),
740            )
741        } else {
742            (None, None, None, None, None)
743        };
744
745        info!(
746            "Geyser plugin: accounts_update_notifier: {}, transaction_notifier: {}, \
747             entry_notifier: {}",
748            accounts_update_notifier.is_some(),
749            transaction_notifier.is_some(),
750            entry_notifier.is_some()
751        );
752
753        let system_monitor_service = Some(SystemMonitorService::new(
754            exit.clone(),
755            SystemMonitorStatsReportConfig {
756                report_os_memory_stats: !config.no_os_memory_stats_reporting,
757                report_os_network_stats: !config.no_os_network_stats_reporting,
758                report_os_cpu_stats: !config.no_os_cpu_stats_reporting,
759                report_os_disk_stats: !config.no_os_disk_stats_reporting,
760            },
761        ));
762
763        let dependency_tracker = Arc::new(DependencyTracker::default());
764
765        let (
766            bank_forks,
767            blockstore,
768            original_blockstore_root,
769            ledger_signal_receiver,
770            leader_schedule_cache,
771            starting_snapshot_hashes,
772            TransactionHistoryServices {
773                transaction_status_sender,
774                transaction_status_service,
775                max_complete_transaction_status_slot,
776            },
777            blockstore_process_options,
778            blockstore_root_scan,
779            pruned_banks_receiver,
780            entry_notifier_service,
781        ) = load_blockstore(
782            config,
783            ledger_path,
784            &genesis_config,
785            exit.clone(),
786            &start_progress,
787            accounts_update_notifier,
788            transaction_notifier,
789            entry_notifier,
790            config
791                .rpc_addrs
792                .is_some()
793                .then(|| dependency_tracker.clone()),
794        )
795        .map_err(ValidatorError::Other)?;
796
797        if !config.no_poh_speed_test {
798            check_poh_speed(&bank_forks.read().unwrap().root_bank(), None)?;
799        }
800
801        let (root_slot, hard_forks) = {
802            let root_bank = bank_forks.read().unwrap().root_bank();
803            (root_bank.slot(), root_bank.hard_forks())
804        };
805        let shred_version = compute_shred_version(&genesis_config.hash(), Some(&hard_forks));
806        info!("shred version: {shred_version}, hard forks: {hard_forks:?}");
807
808        if let Some(expected_shred_version) = config.expected_shred_version {
809            if expected_shred_version != shred_version {
810                return Err(ValidatorError::ShredVersionMismatch {
811                    actual: shred_version,
812                    expected: expected_shred_version,
813                }
814                .into());
815            }
816        }
817
818        if let Some(start_slot) = should_cleanup_blockstore_incorrect_shred_versions(
819            config,
820            &blockstore,
821            root_slot,
822            &hard_forks,
823        )? {
824            *start_progress.write().unwrap() = ValidatorStartProgress::CleaningBlockStore;
825            cleanup_blockstore_incorrect_shred_versions(
826                &blockstore,
827                config,
828                start_slot,
829                shred_version,
830            )?;
831        } else {
832            info!("Skipping the blockstore check for shreds with incorrect version");
833        }
834
835        node.info.set_shred_version(shred_version);
836        node.info.set_wallclock(timestamp());
837        Self::print_node_info(&node);
838
839        let mut cluster_info = ClusterInfo::new(
840            node.info.clone(),
841            identity_keypair.clone(),
842            socket_addr_space,
843        );
844        cluster_info.set_contact_debug_interval(config.contact_debug_interval);
845        cluster_info.set_entrypoints(cluster_entrypoints);
846        cluster_info.restore_contact_info(ledger_path, config.contact_save_interval);
847        cluster_info.set_bind_ip_addrs(node.bind_ip_addrs.clone());
848        let cluster_info = Arc::new(cluster_info);
849        let node_multihoming = NodeMultihoming::from(&node);
850
851        assert!(is_snapshot_config_valid(&config.snapshot_config));
852
853        let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
854        let snapshot_controller = Arc::new(SnapshotController::new(
855            snapshot_request_sender.clone(),
856            config.snapshot_config.clone(),
857            bank_forks.read().unwrap().root(),
858        ));
859
860        let pending_snapshot_packages = Arc::new(Mutex::new(PendingSnapshotPackages::default()));
861        let snapshot_packager_service = if snapshot_controller
862            .snapshot_config()
863            .should_generate_snapshots()
864        {
865            let exit_backpressure = config
866                .validator_exit_backpressure
867                .get(SnapshotPackagerService::NAME)
868                .cloned();
869            let enable_gossip_push = true;
870            let snapshot_packager_service = SnapshotPackagerService::new(
871                pending_snapshot_packages.clone(),
872                starting_snapshot_hashes,
873                exit.clone(),
874                exit_backpressure,
875                cluster_info.clone(),
876                snapshot_controller.clone(),
877                enable_gossip_push,
878            );
879            Some(snapshot_packager_service)
880        } else {
881            None
882        };
883
884        let snapshot_request_handler = SnapshotRequestHandler {
885            snapshot_controller: snapshot_controller.clone(),
886            snapshot_request_receiver,
887            pending_snapshot_packages,
888        };
889        let pruned_banks_request_handler = PrunedBanksRequestHandler {
890            pruned_banks_receiver,
891        };
892        let accounts_background_service = AccountsBackgroundService::new(
893            bank_forks.clone(),
894            exit.clone(),
895            AbsRequestHandlers {
896                snapshot_request_handler,
897                pruned_banks_request_handler,
898            },
899        );
900        info!(
901            "Using: block-verification-method: {}, block-production-method: {}, \
902             transaction-structure: {}",
903            config.block_verification_method,
904            config.block_production_method,
905            config.transaction_struct
906        );
907
908        let (replay_vote_sender, replay_vote_receiver) = unbounded();
909
910        // block min prioritization fee cache should be readable by RPC, and writable by validator
911        // (by both replay stage and banking stage)
912        let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::default());
913
914        let leader_schedule_cache = Arc::new(leader_schedule_cache);
915        let startup_verification_complete;
916        let (mut poh_recorder, entry_receiver) = {
917            let bank = &bank_forks.read().unwrap().working_bank();
918            startup_verification_complete = Arc::clone(bank.get_startup_verification_complete());
919            PohRecorder::new_with_clear_signal(
920                bank.tick_height(),
921                bank.last_blockhash(),
922                bank.clone(),
923                None,
924                bank.ticks_per_slot(),
925                config.delay_leader_block_for_pending_fork,
926                blockstore.clone(),
927                blockstore.get_new_shred_signal(0),
928                &leader_schedule_cache,
929                &genesis_config.poh_config,
930                exit.clone(),
931            )
932        };
933        if transaction_status_sender.is_some() {
934            poh_recorder.track_transaction_indexes();
935        }
936        let (record_sender, record_receiver) = unbounded();
937        let transaction_recorder =
938            TransactionRecorder::new(record_sender, poh_recorder.is_exited.clone());
939        let poh_recorder = Arc::new(RwLock::new(poh_recorder));
940
941        let (banking_tracer, tracer_thread) =
942            BankingTracer::new((config.banking_trace_dir_byte_limit > 0).then_some((
943                &blockstore.banking_trace_path(),
944                exit.clone(),
945                config.banking_trace_dir_byte_limit,
946            )))?;
947        if banking_tracer.is_enabled() {
948            info!(
949                "Enabled banking trace (dir_byte_limit: {})",
950                config.banking_trace_dir_byte_limit
951            );
952        } else {
953            info!("Disabled banking trace");
954        }
955        let banking_tracer_channels = banking_tracer.create_channels(false);
956
957        match &config.block_verification_method {
958            BlockVerificationMethod::BlockstoreProcessor => {
959                info!("no scheduler pool is installed for block verification...");
960                if let Some(count) = config.unified_scheduler_handler_threads {
961                    warn!(
962                        "--unified-scheduler-handler-threads={count} is ignored because unified \
963                         scheduler isn't enabled"
964                    );
965                }
966            }
967            BlockVerificationMethod::UnifiedScheduler => {
968                let scheduler_pool = DefaultSchedulerPool::new_dyn(
969                    config.unified_scheduler_handler_threads,
970                    config.runtime_config.log_messages_bytes_limit,
971                    transaction_status_sender.clone(),
972                    Some(replay_vote_sender.clone()),
973                    prioritization_fee_cache.clone(),
974                );
975                bank_forks
976                    .write()
977                    .unwrap()
978                    .install_scheduler_pool(scheduler_pool);
979            }
980        }
981
982        let entry_notification_sender = entry_notifier_service
983            .as_ref()
984            .map(|service| service.sender());
985        let mut process_blockstore = ProcessBlockStore::new(
986            &id,
987            vote_account,
988            &start_progress,
989            &blockstore,
990            original_blockstore_root,
991            &bank_forks,
992            &leader_schedule_cache,
993            &blockstore_process_options,
994            transaction_status_sender.as_ref(),
995            entry_notification_sender,
996            blockstore_root_scan,
997            &snapshot_controller,
998            config,
999        );
1000
1001        maybe_warp_slot(
1002            config,
1003            &mut process_blockstore,
1004            ledger_path,
1005            &bank_forks,
1006            &leader_schedule_cache,
1007            &snapshot_controller,
1008        )
1009        .map_err(ValidatorError::Other)?;
1010
1011        if config.process_ledger_before_services {
1012            process_blockstore
1013                .process()
1014                .map_err(ValidatorError::Other)?;
1015        }
1016        *start_progress.write().unwrap() = ValidatorStartProgress::StartingServices;
1017
1018        let sample_performance_service =
1019            if config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history {
1020                Some(SamplePerformanceService::new(
1021                    &bank_forks,
1022                    blockstore.clone(),
1023                    exit.clone(),
1024                ))
1025            } else {
1026                None
1027            };
1028
1029        let mut block_commitment_cache = BlockCommitmentCache::default();
1030        let bank_forks_guard = bank_forks.read().unwrap();
1031        block_commitment_cache.initialize_slots(
1032            bank_forks_guard.working_bank().slot(),
1033            bank_forks_guard.root(),
1034        );
1035        drop(bank_forks_guard);
1036        let block_commitment_cache = Arc::new(RwLock::new(block_commitment_cache));
1037
1038        let optimistically_confirmed_bank =
1039            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
1040
1041        let max_slots = Arc::new(MaxSlots::default());
1042
1043        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
1044
1045        let mut tpu_transactions_forwards_client =
1046            Some(node.sockets.tpu_transaction_forwarding_client);
1047
1048        let connection_cache = match (config.use_tpu_client_next, use_quic) {
1049            (false, true) => Some(Arc::new(ConnectionCache::new_with_client_options(
1050                "connection_cache_tpu_quic",
1051                tpu_connection_pool_size,
1052                Some(
1053                    tpu_transactions_forwards_client
1054                        .take()
1055                        .expect("Socket should exist."),
1056                ),
1057                Some((
1058                    &identity_keypair,
1059                    node.info
1060                        .tpu(Protocol::UDP)
1061                        .ok_or_else(|| {
1062                            ValidatorError::Other(String::from("Invalid UDP address for TPU"))
1063                        })?
1064                        .ip(),
1065                )),
1066                Some((&staked_nodes, &identity_keypair.pubkey())),
1067            ))),
1068            (false, false) => Some(Arc::new(ConnectionCache::with_udp(
1069                "connection_cache_tpu_udp",
1070                tpu_connection_pool_size,
1071            ))),
1072            (true, _) => None,
1073        };
1074
1075        let vote_connection_cache = if vote_use_quic {
1076            let vote_connection_cache = ConnectionCache::new_with_client_options(
1077                "connection_cache_vote_quic",
1078                tpu_connection_pool_size,
1079                Some(node.sockets.quic_vote_client),
1080                Some((
1081                    &identity_keypair,
1082                    node.info
1083                        .tpu_vote(Protocol::QUIC)
1084                        .ok_or_else(|| {
1085                            ValidatorError::Other(String::from("Invalid QUIC address for TPU Vote"))
1086                        })?
1087                        .ip(),
1088                )),
1089                Some((&staked_nodes, &identity_keypair.pubkey())),
1090            );
1091            Arc::new(vote_connection_cache)
1092        } else {
1093            Arc::new(ConnectionCache::with_udp(
1094                "connection_cache_vote_udp",
1095                tpu_connection_pool_size,
1096            ))
1097        };
1098
1099        // test-validator crate may start the validator in a tokio runtime
1100        // context which forces us to use the same runtime because a nested
1101        // runtime will cause panic at drop. Outside test-validator crate, we
1102        // always need a tokio runtime (and the respective handle) to initialize
1103        // the turbine QUIC endpoint.
1104        let current_runtime_handle = tokio::runtime::Handle::try_current();
1105        let tpu_client_next_runtime =
1106            (current_runtime_handle.is_err() && config.use_tpu_client_next).then(|| {
1107                tokio::runtime::Builder::new_multi_thread()
1108                    .enable_all()
1109                    .worker_threads(2)
1110                    .thread_name("solTpuClientRt")
1111                    .build()
1112                    .unwrap()
1113            });
1114
1115        let rpc_override_health_check =
1116            Arc::new(AtomicBool::new(config.rpc_config.disable_health_check));
1117        let (
1118            json_rpc_service,
1119            rpc_subscriptions,
1120            pubsub_service,
1121            completed_data_sets_sender,
1122            completed_data_sets_service,
1123            rpc_completed_slots_service,
1124            optimistically_confirmed_bank_tracker,
1125            bank_notification_sender,
1126        ) = if let Some((rpc_addr, rpc_pubsub_addr)) = config.rpc_addrs {
1127            assert_eq!(
1128                node.info.rpc().map(|addr| socket_addr_space.check(&addr)),
1129                node.info
1130                    .rpc_pubsub()
1131                    .map(|addr| socket_addr_space.check(&addr))
1132            );
1133            let (bank_notification_sender, bank_notification_receiver) = unbounded();
1134            let confirmed_bank_subscribers = if !bank_notification_senders.is_empty() {
1135                Some(Arc::new(RwLock::new(bank_notification_senders)))
1136            } else {
1137                None
1138            };
1139
1140            let client_option = if config.use_tpu_client_next {
1141                let runtime_handle = tpu_client_next_runtime
1142                    .as_ref()
1143                    .map(TokioRuntime::handle)
1144                    .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap());
1145                ClientOption::TpuClientNext(
1146                    Arc::as_ref(&identity_keypair),
1147                    node.sockets.rpc_sts_client,
1148                    runtime_handle.clone(),
1149                    cancel_tpu_client_next.clone(),
1150                )
1151            } else {
1152                let Some(connection_cache) = &connection_cache else {
1153                    panic!("ConnectionCache should exist by construction.");
1154                };
1155                ClientOption::ConnectionCache(connection_cache.clone())
1156            };
1157            let rpc_svc_config = JsonRpcServiceConfig {
1158                rpc_addr,
1159                rpc_config: config.rpc_config.clone(),
1160                snapshot_config: Some(snapshot_controller.snapshot_config().clone()),
1161                bank_forks: bank_forks.clone(),
1162                block_commitment_cache: block_commitment_cache.clone(),
1163                blockstore: blockstore.clone(),
1164                cluster_info: cluster_info.clone(),
1165                poh_recorder: Some(poh_recorder.clone()),
1166                genesis_hash: genesis_config.hash(),
1167                ledger_path: ledger_path.to_path_buf(),
1168                validator_exit: config.validator_exit.clone(),
1169                exit: exit.clone(),
1170                override_health_check: rpc_override_health_check.clone(),
1171                startup_verification_complete,
1172                optimistically_confirmed_bank: optimistically_confirmed_bank.clone(),
1173                send_transaction_service_config: config.send_transaction_service_config.clone(),
1174                max_slots: max_slots.clone(),
1175                leader_schedule_cache: leader_schedule_cache.clone(),
1176                max_complete_transaction_status_slot: max_complete_transaction_status_slot.clone(),
1177                prioritization_fee_cache: prioritization_fee_cache.clone(),
1178                client_option,
1179            };
1180            let json_rpc_service =
1181                JsonRpcService::new_with_config(rpc_svc_config).map_err(ValidatorError::Other)?;
1182            let rpc_subscriptions = Arc::new(RpcSubscriptions::new_with_config(
1183                exit.clone(),
1184                max_complete_transaction_status_slot,
1185                blockstore.clone(),
1186                bank_forks.clone(),
1187                block_commitment_cache.clone(),
1188                optimistically_confirmed_bank.clone(),
1189                &config.pubsub_config,
1190                None,
1191            ));
1192            let pubsub_service = if !config.rpc_config.full_api {
1193                None
1194            } else {
1195                let (trigger, pubsub_service) = PubSubService::new(
1196                    config.pubsub_config.clone(),
1197                    &rpc_subscriptions,
1198                    rpc_pubsub_addr,
1199                );
1200                config
1201                    .validator_exit
1202                    .write()
1203                    .unwrap()
1204                    .register_exit(Box::new(move || trigger.cancel()));
1205
1206                Some(pubsub_service)
1207            };
1208
1209            let (completed_data_sets_sender, completed_data_sets_service) =
1210                if !config.rpc_config.full_api {
1211                    (None, None)
1212                } else {
1213                    let (completed_data_sets_sender, completed_data_sets_receiver) =
1214                        bounded(MAX_COMPLETED_DATA_SETS_IN_CHANNEL);
1215                    let completed_data_sets_service = CompletedDataSetsService::new(
1216                        completed_data_sets_receiver,
1217                        blockstore.clone(),
1218                        rpc_subscriptions.clone(),
1219                        exit.clone(),
1220                        max_slots.clone(),
1221                    );
1222                    (
1223                        Some(completed_data_sets_sender),
1224                        Some(completed_data_sets_service),
1225                    )
1226                };
1227
1228            let rpc_completed_slots_service =
1229                if config.rpc_config.full_api || geyser_plugin_service.is_some() {
1230                    let (completed_slots_sender, completed_slots_receiver) =
1231                        bounded(MAX_COMPLETED_SLOTS_IN_CHANNEL);
1232                    blockstore.add_completed_slots_signal(completed_slots_sender);
1233
1234                    Some(RpcCompletedSlotsService::spawn(
1235                        completed_slots_receiver,
1236                        rpc_subscriptions.clone(),
1237                        slot_status_notifier.clone(),
1238                        exit.clone(),
1239                    ))
1240                } else {
1241                    None
1242                };
1243
1244            let dependency_tracker = transaction_status_sender
1245                .is_some()
1246                .then_some(dependency_tracker);
1247            let optimistically_confirmed_bank_tracker =
1248                Some(OptimisticallyConfirmedBankTracker::new(
1249                    bank_notification_receiver,
1250                    exit.clone(),
1251                    bank_forks.clone(),
1252                    optimistically_confirmed_bank,
1253                    rpc_subscriptions.clone(),
1254                    confirmed_bank_subscribers,
1255                    prioritization_fee_cache.clone(),
1256                    dependency_tracker.clone(),
1257                ));
1258            let bank_notification_sender_config = Some(BankNotificationSenderConfig {
1259                sender: bank_notification_sender,
1260                should_send_parents: geyser_plugin_service.is_some(),
1261                dependency_tracker,
1262            });
1263            (
1264                Some(json_rpc_service),
1265                Some(rpc_subscriptions),
1266                pubsub_service,
1267                completed_data_sets_sender,
1268                completed_data_sets_service,
1269                rpc_completed_slots_service,
1270                optimistically_confirmed_bank_tracker,
1271                bank_notification_sender_config,
1272            )
1273        } else {
1274            (None, None, None, None, None, None, None, None)
1275        };
1276
1277        if config.halt_at_slot.is_some() {
1278            // Simulate a confirmed root to avoid RPC errors with CommitmentConfig::finalized() and
1279            // to ensure RPC endpoints like getConfirmedBlock, which require a confirmed root, work
1280            block_commitment_cache
1281                .write()
1282                .unwrap()
1283                .set_highest_super_majority_root(bank_forks.read().unwrap().root());
1284
1285            // Park with the RPC service running, ready for inspection!
1286            warn!("Validator halted");
1287            *start_progress.write().unwrap() = ValidatorStartProgress::Halted;
1288            std::thread::park();
1289        }
1290        let ip_echo_server = match node.sockets.ip_echo {
1291            None => None,
1292            Some(tcp_listener) => Some(solana_net_utils::ip_echo_server(
1293                tcp_listener,
1294                config.ip_echo_server_threads,
1295                Some(node.info.shred_version()),
1296            )),
1297        };
1298
1299        let (stats_reporter_sender, stats_reporter_receiver) = unbounded();
1300
1301        let stats_reporter_service =
1302            StatsReporterService::new(stats_reporter_receiver, exit.clone());
1303
1304        let gossip_service = GossipService::new(
1305            &cluster_info,
1306            Some(bank_forks.clone()),
1307            node.sockets.gossip.clone(),
1308            config.gossip_validators.clone(),
1309            should_check_duplicate_instance,
1310            Some(stats_reporter_sender.clone()),
1311            exit.clone(),
1312        );
1313        let serve_repair = config.repair_handler_type.create_serve_repair(
1314            blockstore.clone(),
1315            cluster_info.clone(),
1316            bank_forks.read().unwrap().sharable_root_bank(),
1317            config.repair_whitelist.clone(),
1318        );
1319        let (repair_request_quic_sender, repair_request_quic_receiver) = unbounded();
1320        let (repair_response_quic_sender, repair_response_quic_receiver) = unbounded();
1321        let (ancestor_hashes_response_quic_sender, ancestor_hashes_response_quic_receiver) =
1322            unbounded();
1323
1324        let waited_for_supermajority = wait_for_supermajority(
1325            config,
1326            Some(&mut process_blockstore),
1327            &bank_forks,
1328            &cluster_info,
1329            rpc_override_health_check,
1330            &start_progress,
1331        )?;
1332
1333        let blockstore_metric_report_service =
1334            BlockstoreMetricReportService::new(blockstore.clone(), exit.clone());
1335
1336        let wait_for_vote_to_start_leader =
1337            !waited_for_supermajority && !config.no_wait_for_vote_to_start_leader;
1338
1339        let poh_service = PohService::new(
1340            poh_recorder.clone(),
1341            &genesis_config.poh_config,
1342            exit.clone(),
1343            bank_forks.read().unwrap().root_bank().ticks_per_slot(),
1344            config.poh_pinned_cpu_core,
1345            config.poh_hashes_per_batch,
1346            record_receiver,
1347        );
1348        assert_eq!(
1349            blockstore.get_new_shred_signals_len(),
1350            1,
1351            "New shred signal for the TVU should be the same as the clear bank signal."
1352        );
1353
1354        let vote_tracker = Arc::<VoteTracker>::default();
1355
1356        let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded();
1357        let (verified_vote_sender, verified_vote_receiver) = unbounded();
1358        let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
1359        let (duplicate_confirmed_slot_sender, duplicate_confirmed_slots_receiver) = unbounded();
1360
1361        let entry_notification_sender = entry_notifier_service
1362            .as_ref()
1363            .map(|service| service.sender_cloned());
1364
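            // The turbine QUIC endpoint needs a Tokio runtime; build a dedicated one only when
            // we are not already running inside a runtime. On MainnetBeta the QUIC endpoint is
            // not created at all, so no runtime is needed there either.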
1365        let turbine_quic_endpoint_runtime = (current_runtime_handle.is_err()
1366            && genesis_config.cluster_type != ClusterType::MainnetBeta)
1367            .then(|| {
1368                tokio::runtime::Builder::new_multi_thread()
1369                    .enable_all()
1370                    .thread_name("solTurbineQuic")
1371                    .build()
1372                    .unwrap()
1373            });
1374        let (turbine_quic_endpoint_sender, turbine_quic_endpoint_receiver) = unbounded();
1375        let (
1376            turbine_quic_endpoint,
1377            turbine_quic_endpoint_sender,
1378            turbine_quic_endpoint_join_handle,
1379        ) = if genesis_config.cluster_type == ClusterType::MainnetBeta {
1380            let (sender, _receiver) = tokio::sync::mpsc::channel(1);
1381            (None, sender, None)
1382        } else {
1383            solana_turbine::quic_endpoint::new_quic_endpoint(
1384                turbine_quic_endpoint_runtime
1385                    .as_ref()
1386                    .map(TokioRuntime::handle)
1387                    .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()),
1388                &identity_keypair,
1389                node.sockets.tvu_quic,
1390                turbine_quic_endpoint_sender,
1391                bank_forks.clone(),
1392            )
1393            .map(|(endpoint, sender, join_handle)| (Some(endpoint), sender, Some(join_handle)))
1394            .unwrap()
1395        };
1396
1397        // Repair quic endpoint.
1398        let repair_quic_endpoints_runtime = (current_runtime_handle.is_err()
1399            && genesis_config.cluster_type != ClusterType::MainnetBeta)
1400            .then(|| {
1401                tokio::runtime::Builder::new_multi_thread()
1402                    .enable_all()
1403                    .thread_name("solRepairQuic")
1404                    .build()
1405                    .unwrap()
1406            });
1407        let (repair_quic_endpoints, repair_quic_async_senders, repair_quic_endpoints_join_handle) =
1408            if genesis_config.cluster_type == ClusterType::MainnetBeta {
1409                (None, RepairQuicAsyncSenders::new_dummy(), None)
1410            } else {
1411                let repair_quic_sockets = RepairQuicSockets {
1412                    repair_server_quic_socket: node.sockets.serve_repair_quic,
1413                    repair_client_quic_socket: node.sockets.repair_quic,
1414                    ancestor_hashes_quic_socket: node.sockets.ancestor_hashes_requests_quic,
1415                };
1416                let repair_quic_senders = RepairQuicSenders {
1417                    repair_request_quic_sender: repair_request_quic_sender.clone(),
1418                    repair_response_quic_sender,
1419                    ancestor_hashes_response_quic_sender,
1420                };
1421                repair::quic_endpoint::new_quic_endpoints(
1422                    repair_quic_endpoints_runtime
1423                        .as_ref()
1424                        .map(TokioRuntime::handle)
1425                        .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()),
1426                    &identity_keypair,
1427                    repair_quic_sockets,
1428                    repair_quic_senders,
1429                    bank_forks.clone(),
1430                )
1431                .map(|(endpoints, senders, join_handle)| {
1432                    (Some(endpoints), senders, Some(join_handle))
1433                })
1434                .unwrap()
1435            };
1436        let serve_repair_service = ServeRepairService::new(
1437            serve_repair,
1438            // Incoming UDP repair requests are adapted into RemoteRequest
1439            // and also sent through the same channel.
1440            repair_request_quic_sender,
1441            repair_request_quic_receiver,
1442            repair_quic_async_senders.repair_response_quic_sender,
1443            node.sockets.serve_repair,
1444            socket_addr_space,
1445            stats_reporter_sender,
1446            exit.clone(),
1447        );
1448
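            // Wen restart is only entered when a restart proto path is configured and we did not
            // just wait for (and reach) supermajority above.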
1449        let in_wen_restart = config.wen_restart_proto_path.is_some() && !waited_for_supermajority;
1450        let wen_restart_repair_slots = if in_wen_restart {
1451            Some(Arc::new(RwLock::new(Vec::new())))
1452        } else {
1453            None
1454        };
1455        let tower = match process_blockstore.process_to_create_tower() {
1456            Ok(tower) => {
1457                info!("Tower state: {tower:?}");
1458                tower
1459            }
1460            Err(e) => {
1461                warn!("Unable to retrieve tower: {e:?}; creating default tower...");
1462                Tower::default()
1463            }
1464        };
1465        let last_vote = tower.last_vote();
1466
1467        let outstanding_repair_requests =
1468            Arc::<RwLock<repair::repair_service::OutstandingShredRepairs>>::default();
1469        let cluster_slots =
1470            Arc::new(crate::cluster_slots_service::cluster_slots::ClusterSlots::default());
1471
1472        // If RPC is enabled and a ConnectionCache is in use, pass the ConnectionCache so it can be warmed up inside the Tvu.
1473        let connection_cache_for_warmup =
1474            if json_rpc_service.is_some() && connection_cache.is_some() {
1475                connection_cache.as_ref()
1476            } else {
1477                None
1478            };
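            // When XDP retransmission is configured, spawn the retransmitter using the local port
            // of the first retransmit socket as the source port; the resulting sender is handed
            // to the Tvu and Tpu below.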
1479        let (xdp_retransmitter, xdp_sender) =
1480            if let Some(xdp_config) = config.retransmit_xdp.clone() {
1481                let src_port = node.sockets.retransmit_sockets[0]
1482                    .local_addr()
1483                    .expect("failed to get local address")
1484                    .port();
1485                let (rtx, sender) = XdpRetransmitter::new(xdp_config, src_port)
1486                    .expect("failed to create xdp retransmitter");
1487                (Some(rtx), Some(sender))
1488            } else {
1489                (None, None)
1490            };
1491
1492        // Disable all2all (alpenglow) traffic on cluster types where it is not allowed.
1493        let alpenglow_socket = if genesis_config.cluster_type == ClusterType::Testnet
1494            || genesis_config.cluster_type == ClusterType::Development
1495        {
1496            node.sockets.alpenglow
1497        } else {
1498            None
1499        };
1500
1501        let tvu = Tvu::new(
1502            vote_account,
1503            authorized_voter_keypairs,
1504            &bank_forks,
1505            &cluster_info,
1506            TvuSockets {
1507                repair: node.sockets.repair.try_clone().unwrap(),
1508                retransmit: node.sockets.retransmit_sockets,
1509                fetch: node.sockets.tvu,
1510                ancestor_hashes_requests: node.sockets.ancestor_hashes_requests,
1511                alpenglow: alpenglow_socket,
1512            },
1513            blockstore.clone(),
1514            ledger_signal_receiver,
1515            rpc_subscriptions.clone(),
1516            &poh_recorder,
1517            tower,
1518            config.tower_storage.clone(),
1519            &leader_schedule_cache,
1520            exit.clone(),
1521            block_commitment_cache,
1522            config.turbine_disabled.clone(),
1523            transaction_status_sender.clone(),
1524            entry_notification_sender.clone(),
1525            vote_tracker.clone(),
1526            retransmit_slots_sender,
1527            gossip_verified_vote_hash_receiver,
1528            verified_vote_receiver,
1529            replay_vote_sender.clone(),
1530            completed_data_sets_sender,
1531            bank_notification_sender.clone(),
1532            duplicate_confirmed_slots_receiver,
1533            TvuConfig {
1534                max_ledger_shreds: config.max_ledger_shreds,
1535                shred_version: node.info.shred_version(),
1536                repair_validators: config.repair_validators.clone(),
1537                repair_whitelist: config.repair_whitelist.clone(),
1538                wait_for_vote_to_start_leader,
1539                replay_forks_threads: config.replay_forks_threads,
1540                replay_transactions_threads: config.replay_transactions_threads,
1541                shred_sigverify_threads: config.tvu_shred_sigverify_threads,
1542                xdp_sender: xdp_sender.clone(),
1543            },
1544            &max_slots,
1545            block_metadata_notifier,
1546            config.wait_to_vote_slot,
1547            Some(snapshot_controller.clone()),
1548            config.runtime_config.log_messages_bytes_limit,
1549            connection_cache_for_warmup,
1550            &prioritization_fee_cache,
1551            banking_tracer.clone(),
1552            turbine_quic_endpoint_sender.clone(),
1553            turbine_quic_endpoint_receiver,
1554            repair_response_quic_receiver,
1555            repair_quic_async_senders.repair_request_quic_sender,
1556            repair_quic_async_senders.ancestor_hashes_request_quic_sender,
1557            ancestor_hashes_response_quic_receiver,
1558            outstanding_repair_requests.clone(),
1559            cluster_slots.clone(),
1560            wen_restart_repair_slots.clone(),
1561            slot_status_notifier,
1562            vote_connection_cache,
1563        )
1564        .map_err(ValidatorError::Other)?;
1565
1566        if in_wen_restart {
1567            info!("Waiting for wen_restart to finish");
1568            wait_for_wen_restart(WenRestartConfig {
1569                wen_restart_path: config.wen_restart_proto_path.clone().unwrap(),
1570                wen_restart_coordinator: config.wen_restart_coordinator.unwrap(),
1571                last_vote,
1572                blockstore: blockstore.clone(),
1573                cluster_info: cluster_info.clone(),
1574                bank_forks: bank_forks.clone(),
1575                wen_restart_repair_slots: wen_restart_repair_slots.clone(),
1576                wait_for_supermajority_threshold_percent:
1577                    WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT,
1578                snapshot_controller: Some(snapshot_controller.clone()),
1579                abs_status: accounts_background_service.status().clone(),
1580                genesis_config_hash: genesis_config.hash(),
1581                exit: exit.clone(),
1582            })?;
1583            return Err(ValidatorError::WenRestartFinished.into());
1584        }
1585
1586        let key_notifiers = Arc::new(RwLock::new(KeyUpdaters::default()));
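            // Choose the transaction forwarding client: reuse the ConnectionCache when one
            // exists, otherwise fall back to tpu-client-next driven by the dedicated runtime
            // built earlier (or the runtime we are already running in).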
1587        let forwarding_tpu_client = if let Some(connection_cache) = &connection_cache {
1588            ForwardingClientOption::ConnectionCache(connection_cache.clone())
1589        } else {
1590            let runtime_handle = tpu_client_next_runtime
1591                .as_ref()
1592                .map(TokioRuntime::handle)
1593                .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap());
1594            ForwardingClientOption::TpuClientNext((
1595                Arc::as_ref(&identity_keypair),
1596                tpu_transactions_forwards_client
1597                    .take()
1598                    .expect("Socket should exist."),
1599                runtime_handle.clone(),
1600                cancel_tpu_client_next,
1601            ))
1602        };
1603        let tpu = Tpu::new_with_client(
1604            &cluster_info,
1605            &poh_recorder,
1606            transaction_recorder,
1607            entry_receiver,
1608            retransmit_slots_receiver,
1609            TpuSockets {
1610                transactions: node.sockets.tpu,
1611                transaction_forwards: node.sockets.tpu_forwards,
1612                vote: node.sockets.tpu_vote,
1613                broadcast: node.sockets.broadcast,
1614                transactions_quic: node.sockets.tpu_quic,
1615                transactions_forwards_quic: node.sockets.tpu_forwards_quic,
1616                vote_quic: node.sockets.tpu_vote_quic,
1617                vote_forwarding_client: node.sockets.tpu_vote_forwarding_client,
1618                vortexor_receivers: node.sockets.vortexor_receivers,
1619            },
1620            rpc_subscriptions.clone(),
1621            transaction_status_sender,
1622            entry_notification_sender,
1623            blockstore.clone(),
1624            &config.broadcast_stage_type,
1625            xdp_sender,
1626            exit,
1627            node.info.shred_version(),
1628            vote_tracker,
1629            bank_forks.clone(),
1630            verified_vote_sender,
1631            gossip_verified_vote_hash_sender,
1632            replay_vote_receiver,
1633            replay_vote_sender,
1634            bank_notification_sender,
1635            config.tpu_coalesce,
1636            duplicate_confirmed_slot_sender,
1637            forwarding_tpu_client,
1638            turbine_quic_endpoint_sender,
1639            &identity_keypair,
1640            config.runtime_config.log_messages_bytes_limit,
1641            &staked_nodes,
1642            config.staked_nodes_overrides.clone(),
1643            banking_tracer_channels,
1644            tracer_thread,
1645            tpu_enable_udp,
1646            tpu_quic_server_config,
1647            tpu_fwd_quic_server_config,
1648            vote_quic_server_config,
1649            &prioritization_fee_cache,
1650            config.block_production_method.clone(),
1651            config.block_production_num_workers,
1652            config.transaction_struct.clone(),
1653            config.enable_block_production_forwarding,
1654            config.generator_config.clone(),
1655            key_notifiers.clone(),
1656        );
1657
1658        datapoint_info!(
1659            "validator-new",
1660            ("id", id.to_string(), String),
1661            ("version", solana_version::version!(), String),
1662            ("cluster_type", genesis_config.cluster_type as u32, i64),
1663            ("elapsed_ms", start_time.elapsed().as_millis() as i64, i64),
1664            ("waited_for_supermajority", waited_for_supermajority, bool),
1665            ("shred_version", shred_version as i64, i64),
1666        );
1667
1668        *start_progress.write().unwrap() = ValidatorStartProgress::Running;
1669        if config.use_tpu_client_next {
1670            if let Some(json_rpc_service) = &json_rpc_service {
1671                key_notifiers.write().unwrap().add(
1672                    KeyUpdaterType::RpcService,
1673                    json_rpc_service.get_client_key_updater(),
1674                );
1675            }
1676            // Note that we don't need to add ConnectionClient to key_notifiers
1677            // because it is added inside Tpu.
1678        }
1679
1680        *admin_rpc_service_post_init.write().unwrap() = Some(AdminRpcRequestMetadataPostInit {
1681            bank_forks: bank_forks.clone(),
1682            cluster_info: cluster_info.clone(),
1683            vote_account: *vote_account,
1684            repair_whitelist: config.repair_whitelist.clone(),
1685            notifies: key_notifiers,
1686            repair_socket: Arc::new(node.sockets.repair),
1687            outstanding_repair_requests,
1688            cluster_slots,
1689            node: Some(Arc::new(node_multihoming)),
1690        });
1691
1692        Ok(Self {
1693            stats_reporter_service,
1694            gossip_service,
1695            serve_repair_service,
1696            json_rpc_service,
1697            pubsub_service,
1698            rpc_completed_slots_service,
1699            optimistically_confirmed_bank_tracker,
1700            transaction_status_service,
1701            entry_notifier_service,
1702            system_monitor_service,
1703            sample_performance_service,
1704            snapshot_packager_service,
1705            completed_data_sets_service,
1706            tpu,
1707            tvu,
1708            poh_service,
1709            poh_recorder,
1710            ip_echo_server,
1711            validator_exit: config.validator_exit.clone(),
1712            cluster_info,
1713            bank_forks,
1714            blockstore,
1715            geyser_plugin_service,
1716            blockstore_metric_report_service,
1717            accounts_background_service,
1718            turbine_quic_endpoint,
1719            turbine_quic_endpoint_runtime,
1720            turbine_quic_endpoint_join_handle,
1721            repair_quic_endpoints,
1722            repair_quic_endpoints_runtime,
1723            repair_quic_endpoints_join_handle,
1724            xdp_retransmitter,
1725            _tpu_client_next_runtime: tpu_client_next_runtime,
1726        })
1727    }
1728
1729    // Used for notifying many nodes in parallel to exit
1730    pub fn exit(&mut self) {
1731        self.validator_exit.write().unwrap().exit();
1732
1733        // drop all signals in blockstore
1734        self.blockstore.drop_signal();
1735    }
1736
1737    pub fn close(mut self) {
1738        self.exit();
1739        self.join();
1740    }
1741
1742    fn print_node_info(node: &Node) {
1743        info!("{:?}", node.info);
1744        info!(
1745            "local gossip address: {}",
1746            node.sockets.gossip[0].local_addr().unwrap()
1747        );
1748        info!(
1749            "local broadcast address: {}",
1750            node.sockets
1751                .broadcast
1752                .first()
1753                .unwrap()
1754                .local_addr()
1755                .unwrap()
1756        );
1757        info!(
1758            "local repair address: {}",
1759            node.sockets.repair.local_addr().unwrap()
1760        );
1761        info!(
1762            "local retransmit address: {}",
1763            node.sockets.retransmit_sockets[0].local_addr().unwrap()
1764        );
1765    }
1766
1767    pub fn join(self) {
1768        drop(self.bank_forks);
1769        drop(self.cluster_info);
1770
1771        self.poh_service.join().expect("poh_service");
1772        drop(self.poh_recorder);
1773
1774        if let Some(json_rpc_service) = self.json_rpc_service {
1775            json_rpc_service.join().expect("rpc_service");
1776        }
1777
1778        if let Some(pubsub_service) = self.pubsub_service {
1779            pubsub_service.join().expect("pubsub_service");
1780        }
1781
1782        if let Some(rpc_completed_slots_service) = self.rpc_completed_slots_service {
1783            rpc_completed_slots_service
1784                .join()
1785                .expect("rpc_completed_slots_service");
1786        }
1787
1788        if let Some(optimistically_confirmed_bank_tracker) =
1789            self.optimistically_confirmed_bank_tracker
1790        {
1791            optimistically_confirmed_bank_tracker
1792                .join()
1793                .expect("optimistically_confirmed_bank_tracker");
1794        }
1795
1796        if let Some(transaction_status_service) = self.transaction_status_service {
1797            transaction_status_service
1798                .join()
1799                .expect("transaction_status_service");
1800        }
1801
1802        if let Some(system_monitor_service) = self.system_monitor_service {
1803            system_monitor_service
1804                .join()
1805                .expect("system_monitor_service");
1806        }
1807
1808        if let Some(sample_performance_service) = self.sample_performance_service {
1809            sample_performance_service
1810                .join()
1811                .expect("sample_performance_service");
1812        }
1813
1814        if let Some(entry_notifier_service) = self.entry_notifier_service {
1815            entry_notifier_service
1816                .join()
1817                .expect("entry_notifier_service");
1818        }
1819
1820        if let Some(s) = self.snapshot_packager_service {
1821            s.join().expect("snapshot_packager_service");
1822        }
1823
1824        self.gossip_service.join().expect("gossip_service");
1825        self.repair_quic_endpoints
1826            .iter()
1827            .flatten()
1828            .for_each(repair::quic_endpoint::close_quic_endpoint);
1829        self.serve_repair_service
1830            .join()
1831            .expect("serve_repair_service");
1832        if let Some(repair_quic_endpoints_join_handle) = self.repair_quic_endpoints_join_handle {
1833            self.repair_quic_endpoints_runtime
1834                .map(|runtime| runtime.block_on(repair_quic_endpoints_join_handle))
1835                .transpose()
1836                .unwrap();
1837        }
1838        self.stats_reporter_service
1839            .join()
1840            .expect("stats_reporter_service");
1841        self.blockstore_metric_report_service
1842            .join()
1843            .expect("ledger_metric_report_service");
1844        self.accounts_background_service
1845            .join()
1846            .expect("accounts_background_service");
1847        if let Some(turbine_quic_endpoint) = &self.turbine_quic_endpoint {
1848            solana_turbine::quic_endpoint::close_quic_endpoint(turbine_quic_endpoint);
1849        }
1850        if let Some(xdp_retransmitter) = self.xdp_retransmitter {
1851            xdp_retransmitter.join().expect("xdp_retransmitter");
1852        }
1853        self.tpu.join().expect("tpu");
1854        self.tvu.join().expect("tvu");
1855        if let Some(turbine_quic_endpoint_join_handle) = self.turbine_quic_endpoint_join_handle {
1856            self.turbine_quic_endpoint_runtime
1857                .map(|runtime| runtime.block_on(turbine_quic_endpoint_join_handle))
1858                .transpose()
1859                .unwrap();
1860        }
1861        if let Some(completed_data_sets_service) = self.completed_data_sets_service {
1862            completed_data_sets_service
1863                .join()
1864                .expect("completed_data_sets_service");
1865        }
1866        if let Some(ip_echo_server) = self.ip_echo_server {
1867            ip_echo_server.shutdown_background();
1868        }
1869
1870        if let Some(geyser_plugin_service) = self.geyser_plugin_service {
1871            geyser_plugin_service.join().expect("geyser_plugin_service");
1872        }
1873    }
1874}
1875
1876fn active_vote_account_exists_in_bank(bank: &Bank, vote_account: &Pubkey) -> bool {
1877    if let Some(account) = &bank.get_account(vote_account) {
1878        if let Some(vote_state) = vote_state::from(account) {
1879            return !vote_state.votes.is_empty();
1880        }
1881    }
1882    false
1883}
1884
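    /// Measures this machine's hashing throughput and compares it against the rate needed to keep
    /// up with the cluster: `hashes_per_tick * ticks_per_slot` hashes per target slot duration.
    ///
    /// As a rough illustration (assuming mainnet-beta's nominal parameters of 12,500 hashes per
    /// tick, 64 ticks per slot, and a 400 ms slot), that is 800,000 hashes per slot, i.e. a target
    /// of 2,000,000 hashes per second.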
1885fn check_poh_speed(bank: &Bank, maybe_hash_samples: Option<u64>) -> Result<(), ValidatorError> {
1886    let Some(hashes_per_tick) = bank.hashes_per_tick() else {
1887        warn!("Unable to read hashes per tick from Bank, skipping PoH speed check");
1888        return Ok(());
1889    };
1890
1891    let ticks_per_slot = bank.ticks_per_slot();
1892    let hashes_per_slot = hashes_per_tick * ticks_per_slot;
1893    let hash_samples = maybe_hash_samples.unwrap_or(hashes_per_slot);
1894
1895    let hash_time = compute_hash_time(hash_samples);
1896    let my_hashes_per_second = (hash_samples as f64 / hash_time.as_secs_f64()) as u64;
1897
1898    let target_slot_duration = Duration::from_nanos(bank.ns_per_slot as u64);
1899    let target_hashes_per_second =
1900        (hashes_per_slot as f64 / target_slot_duration.as_secs_f64()) as u64;
1901
1902    info!(
1903        "PoH speed check: computed hashes per second {my_hashes_per_second}, target hashes per \
1904         second {target_hashes_per_second}"
1905    );
1906    if my_hashes_per_second < target_hashes_per_second {
1907        return Err(ValidatorError::PohTooSlow {
1908            mine: my_hashes_per_second,
1909            target: target_hashes_per_second,
1910        });
1911    }
1912
1913    Ok(())
1914}
1915
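    /// Returns the restart slot when this boot looks like a cluster restart (hard fork), i.e. when
    /// `--wait-for-supermajority` matches the current root slot; otherwise returns `None`.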
1916fn maybe_cluster_restart_with_hard_fork(config: &ValidatorConfig, root_slot: Slot) -> Option<Slot> {
1917    // detect cluster restart (hard fork) indirectly via wait_for_supermajority...
1918    if let Some(wait_slot_for_supermajority) = config.wait_for_supermajority {
1919        if wait_slot_for_supermajority == root_slot {
1920            return Some(wait_slot_for_supermajority);
1921        }
1922    }
1923
1924    None
1925}
1926
1927fn post_process_restored_tower(
1928    restored_tower: crate::consensus::Result<Tower>,
1929    validator_identity: &Pubkey,
1930    vote_account: &Pubkey,
1931    config: &ValidatorConfig,
1932    bank_forks: &BankForks,
1933) -> Result<Tower, String> {
1934    let mut should_require_tower = config.require_tower;
1935
1936    let restored_tower = restored_tower.and_then(|tower| {
1937        let root_bank = bank_forks.root_bank();
1938        let slot_history = root_bank.get_slot_history();
1939        // make sure the tower isn't corrupted before the following hard fork check
1940        let tower = tower.adjust_lockouts_after_replay(root_bank.slot(), &slot_history);
1941
1942        if let Some(hard_fork_restart_slot) =
1943            maybe_cluster_restart_with_hard_fork(config, root_bank.slot())
1944        {
1945            // Intentionally fail to restore the tower; we're presumably in a new hard fork, so
1946            // past out-of-chain vote state doesn't make sense at all.
1947            // (What if --wait-for-supermajority is specified again when the validator restarts?)
1948            let message =
1949                format!("Hard fork is detected; discarding tower restoration result: {tower:?}");
1950            datapoint_error!("tower_error", ("error", message, String),);
1951            error!("{message}");
1952
1953            // unconditionally relax tower requirement so that we can always restore tower
1954            // from root bank.
1955            should_require_tower = false;
1956            return Err(crate::consensus::TowerError::HardFork(
1957                hard_fork_restart_slot,
1958            ));
1959        }
1960
1961        if let Some(warp_slot) = config.warp_slot {
1962            // unconditionally relax tower requirement so that we can always restore tower
1963            // from root bank after the warp
1964            should_require_tower = false;
1965            return Err(crate::consensus::TowerError::HardFork(warp_slot));
1966        }
1967
1968        tower
1969    });
1970
1971    let restored_tower = match restored_tower {
1972        Ok(tower) => tower,
1973        Err(err) => {
1974            let voting_has_been_active =
1975                active_vote_account_exists_in_bank(&bank_forks.working_bank(), vote_account);
1976            if !err.is_file_missing() {
1977                datapoint_error!(
1978                    "tower_error",
1979                    ("error", format!("Unable to restore tower: {err}"), String),
1980                );
1981            }
1982            if should_require_tower && voting_has_been_active {
1983                return Err(format!(
1984                    "Requested mandatory tower restore failed: {err}. And there is an existing \
1985                     vote_account containing actual votes. Aborting due to possible conflicting \
1986                     duplicate votes"
1987                ));
1988            }
1989            if err.is_file_missing() && !voting_has_been_active {
1990                // Currently, don't protect against spoofed snapshots with no tower at all
1991                info!(
1992                    "Ignoring expected failed tower restore because this is the initial validator \
1993                     start with the vote account..."
1994                );
1995            } else {
1996                error!(
1997                    "Rebuilding a new tower from the latest vote account due to failed tower \
1998                     restore: {err}"
1999                );
2000            }
2001
2002            Tower::new_from_bankforks(bank_forks, validator_identity, vote_account)
2003        }
2004    };
2005
2006    Ok(restored_tower)
2007}
2008
2009fn load_genesis(
2010    config: &ValidatorConfig,
2011    ledger_path: &Path,
2012) -> Result<GenesisConfig, ValidatorError> {
2013    let genesis_config = open_genesis_config(ledger_path, config.max_genesis_archive_unpacked_size)
2014        .map_err(ValidatorError::OpenGenesisConfig)?;
2015
2016    // This needs to be limited, otherwise the state in the VoteAccount data
2017    // grows too large.
2018    let leader_schedule_slot_offset = genesis_config.epoch_schedule.leader_schedule_slot_offset;
2019    let slots_per_epoch = genesis_config.epoch_schedule.slots_per_epoch;
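        // For example, with an epoch schedule where leader_schedule_slot_offset equals
        // slots_per_epoch (the usual default), this rounds up to an offset of exactly one epoch.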
2020    let leader_epoch_offset = leader_schedule_slot_offset.div_ceil(slots_per_epoch);
2021    assert!(leader_epoch_offset <= MAX_LEADER_SCHEDULE_EPOCH_OFFSET);
2022
2023    let genesis_hash = genesis_config.hash();
2024    info!("genesis hash: {genesis_hash}");
2025
2026    if let Some(expected_genesis_hash) = config.expected_genesis_hash {
2027        if genesis_hash != expected_genesis_hash {
2028            return Err(ValidatorError::GenesisHashMismatch(
2029                genesis_hash,
2030                expected_genesis_hash,
2031            ));
2032        }
2033    }
2034
2035    Ok(genesis_config)
2036}
2037
2038#[allow(clippy::type_complexity)]
2039fn load_blockstore(
2040    config: &ValidatorConfig,
2041    ledger_path: &Path,
2042    genesis_config: &GenesisConfig,
2043    exit: Arc<AtomicBool>,
2044    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
2045    accounts_update_notifier: Option<AccountsUpdateNotifier>,
2046    transaction_notifier: Option<TransactionNotifierArc>,
2047    entry_notifier: Option<EntryNotifierArc>,
2048    dependency_tracker: Option<Arc<DependencyTracker>>,
2049) -> Result<
2050    (
2051        Arc<RwLock<BankForks>>,
2052        Arc<Blockstore>,
2053        Slot,
2054        Receiver<bool>,
2055        LeaderScheduleCache,
2056        Option<StartingSnapshotHashes>,
2057        TransactionHistoryServices,
2058        blockstore_processor::ProcessOptions,
2059        BlockstoreRootScan,
2060        DroppedSlotsReceiver,
2061        Option<EntryNotifierService>,
2062    ),
2063    String,
2064> {
2065    info!("loading ledger from {ledger_path:?}...");
2066    *start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger;
2067
2068    let blockstore = Blockstore::open_with_options(ledger_path, config.blockstore_options.clone())
2069        .map_err(|err| format!("Failed to open Blockstore: {err:?}"))?;
2070
2071    let (ledger_signal_sender, ledger_signal_receiver) = bounded(MAX_REPLAY_WAKE_UP_SIGNALS);
2072    blockstore.add_new_shred_signal(ledger_signal_sender);
2073
2074    // The following boot sequence (especially BankForks) could set a root, so stash the
2075    // original blockstore root value away here as soon as possible.
2076    let original_blockstore_root = blockstore.max_root();
2077
2078    let blockstore = Arc::new(blockstore);
2079    let blockstore_root_scan = BlockstoreRootScan::new(config, blockstore.clone(), exit.clone());
2080    let halt_at_slot = config
2081        .halt_at_slot
2082        .or_else(|| blockstore.highest_slot().unwrap_or(None));
2083
2084    let process_options = blockstore_processor::ProcessOptions {
2085        run_verification: config.run_verification,
2086        halt_at_slot,
2087        new_hard_forks: config.new_hard_forks.clone(),
2088        debug_keys: config.debug_keys.clone(),
2089        accounts_db_config: config.accounts_db_config.clone(),
2090        accounts_db_skip_shrink: config.accounts_db_skip_shrink,
2091        accounts_db_force_initial_clean: config.accounts_db_force_initial_clean,
2092        runtime_config: config.runtime_config.clone(),
2093        use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup,
2094        ..blockstore_processor::ProcessOptions::default()
2095    };
2096
2097    let enable_rpc_transaction_history =
2098        config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history;
2099    let is_plugin_transaction_history_required = transaction_notifier.is_some();
2100    let transaction_history_services =
2101        if enable_rpc_transaction_history || is_plugin_transaction_history_required {
2102            initialize_rpc_transaction_history_services(
2103                blockstore.clone(),
2104                exit.clone(),
2105                enable_rpc_transaction_history,
2106                config.rpc_config.enable_extended_tx_metadata_storage,
2107                transaction_notifier,
2108                dependency_tracker,
2109            )
2110        } else {
2111            TransactionHistoryServices::default()
2112        };
2113
2114    let entry_notifier_service = entry_notifier
2115        .map(|entry_notifier| EntryNotifierService::new(entry_notifier, exit.clone()));
2116
2117    let (bank_forks, mut leader_schedule_cache, starting_snapshot_hashes) =
2118        bank_forks_utils::load_bank_forks(
2119            genesis_config,
2120            &blockstore,
2121            config.account_paths.clone(),
2122            &config.snapshot_config,
2123            &process_options,
2124            transaction_history_services
2125                .transaction_status_sender
2126                .as_ref(),
2127            entry_notifier_service
2128                .as_ref()
2129                .map(|service| service.sender()),
2130            accounts_update_notifier,
2131            exit,
2132        )
2133        .map_err(|err| err.to_string())?;
2134
2135    // Before replay starts, set the callbacks in each of the banks in BankForks so that
2136    // all dropped banks come through the `pruned_banks_receiver` channel. This way all bank
2137    // drop behavior can be safely synchronized with any other ongoing accounts activity like
2138    // cache flush, clean, shrink, as long as the same thread performing those activities is
2139    // also processing the dropped banks from the `pruned_banks_receiver` channel.
2140    let pruned_banks_receiver =
2141        AccountsBackgroundService::setup_bank_drop_callback(bank_forks.clone());
2142
2143    leader_schedule_cache.set_fixed_leader_schedule(config.fixed_leader_schedule.clone());
2144
2145    Ok((
2146        bank_forks,
2147        blockstore,
2148        original_blockstore_root,
2149        ledger_signal_receiver,
2150        leader_schedule_cache,
2151        starting_snapshot_hashes,
2152        transaction_history_services,
2153        process_options,
2154        blockstore_root_scan,
2155        pruned_banks_receiver,
2156        entry_notifier_service,
2157    ))
2158}
2159
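    /// Lazily replays the blockstore on top of the loaded bank forks and restores the saved tower.
    /// The heavy work in `process` runs at most once; subsequent calls return immediately.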
2160pub struct ProcessBlockStore<'a> {
2161    id: &'a Pubkey,
2162    vote_account: &'a Pubkey,
2163    start_progress: &'a Arc<RwLock<ValidatorStartProgress>>,
2164    blockstore: &'a Blockstore,
2165    original_blockstore_root: Slot,
2166    bank_forks: &'a Arc<RwLock<BankForks>>,
2167    leader_schedule_cache: &'a LeaderScheduleCache,
2168    process_options: &'a blockstore_processor::ProcessOptions,
2169    transaction_status_sender: Option<&'a TransactionStatusSender>,
2170    entry_notification_sender: Option<&'a EntryNotifierSender>,
2171    blockstore_root_scan: Option<BlockstoreRootScan>,
2172    snapshot_controller: &'a SnapshotController,
2173    config: &'a ValidatorConfig,
2174    tower: Option<Tower>,
2175}
2176
2177impl<'a> ProcessBlockStore<'a> {
2178    #[allow(clippy::too_many_arguments)]
2179    fn new(
2180        id: &'a Pubkey,
2181        vote_account: &'a Pubkey,
2182        start_progress: &'a Arc<RwLock<ValidatorStartProgress>>,
2183        blockstore: &'a Blockstore,
2184        original_blockstore_root: Slot,
2185        bank_forks: &'a Arc<RwLock<BankForks>>,
2186        leader_schedule_cache: &'a LeaderScheduleCache,
2187        process_options: &'a blockstore_processor::ProcessOptions,
2188        transaction_status_sender: Option<&'a TransactionStatusSender>,
2189        entry_notification_sender: Option<&'a EntryNotifierSender>,
2190        blockstore_root_scan: BlockstoreRootScan,
2191        snapshot_controller: &'a SnapshotController,
2192        config: &'a ValidatorConfig,
2193    ) -> Self {
2194        Self {
2195            id,
2196            vote_account,
2197            start_progress,
2198            blockstore,
2199            original_blockstore_root,
2200            bank_forks,
2201            leader_schedule_cache,
2202            process_options,
2203            transaction_status_sender,
2204            entry_notification_sender,
2205            blockstore_root_scan: Some(blockstore_root_scan),
2206            snapshot_controller,
2207            config,
2208            tower: None,
2209        }
2210    }
2211
2212    pub(crate) fn process(&mut self) -> Result<(), String> {
2213        if self.tower.is_none() {
2214            let previous_start_process = *self.start_progress.read().unwrap();
2215            *self.start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger;
2216
2217            let exit = Arc::new(AtomicBool::new(false));
2218            if let Ok(Some(max_slot)) = self.blockstore.highest_slot() {
2219                let bank_forks = self.bank_forks.clone();
2220                let exit = exit.clone();
2221                let start_progress = self.start_progress.clone();
2222
2223                let _ = Builder::new()
2224                    .name("solRptLdgrStat".to_string())
2225                    .spawn(move || {
2226                        while !exit.load(Ordering::Relaxed) {
2227                            let slot = bank_forks.read().unwrap().working_bank().slot();
2228                            *start_progress.write().unwrap() =
2229                                ValidatorStartProgress::ProcessingLedger { slot, max_slot };
2230                            sleep(Duration::from_secs(2));
2231                        }
2232                    })
2233                    .unwrap();
2234            }
2235            blockstore_processor::process_blockstore_from_root(
2236                self.blockstore,
2237                self.bank_forks,
2238                self.leader_schedule_cache,
2239                self.process_options,
2240                self.transaction_status_sender,
2241                self.entry_notification_sender,
2242                Some(self.snapshot_controller),
2243            )
2244            .map_err(|err| {
2245                exit.store(true, Ordering::Relaxed);
2246                format!("Failed to load ledger: {err:?}")
2247            })?;
2248            exit.store(true, Ordering::Relaxed);
2249
2250            if let Some(blockstore_root_scan) = self.blockstore_root_scan.take() {
2251                blockstore_root_scan.join();
2252            }
2253
2254            self.tower = Some({
2255                let restored_tower = Tower::restore(self.config.tower_storage.as_ref(), self.id);
2256                if let Ok(tower) = &restored_tower {
2257                    // reconciliation attempt 1 of 2 with tower
2258                    reconcile_blockstore_roots_with_external_source(
2259                        ExternalRootSource::Tower(tower.root()),
2260                        self.blockstore,
2261                        &mut self.original_blockstore_root,
2262                    )
2263                    .map_err(|err| format!("Failed to reconcile blockstore with tower: {err:?}"))?;
2264                }
2265
2266                post_process_restored_tower(
2267                    restored_tower,
2268                    self.id,
2269                    self.vote_account,
2270                    self.config,
2271                    &self.bank_forks.read().unwrap(),
2272                )?
2273            });
2274
2275            if let Some(hard_fork_restart_slot) = maybe_cluster_restart_with_hard_fork(
2276                self.config,
2277                self.bank_forks.read().unwrap().root(),
2278            ) {
2279                // reconciliation attempt 2 of 2 with hard fork
2280                // this should be #2 because hard fork root > tower root in almost all cases
2281                reconcile_blockstore_roots_with_external_source(
2282                    ExternalRootSource::HardFork(hard_fork_restart_slot),
2283                    self.blockstore,
2284                    &mut self.original_blockstore_root,
2285                )
2286                .map_err(|err| format!("Failed to reconcile blockstore with hard fork: {err:?}"))?;
2287            }
2288
2289            *self.start_progress.write().unwrap() = previous_start_process;
2290        }
2291        Ok(())
2292    }
2293
2294    pub(crate) fn process_to_create_tower(mut self) -> Result<Tower, String> {
2295        self.process()?;
2296        Ok(self.tower.unwrap())
2297    }
2298}
2299
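    /// If `config.warp_slot` is set, roots the current working bank, warps bank forks forward to
    /// the requested slot, writes a full snapshot of the warped root, and then processes the
    /// blockstore so that the tower and bank forks stay in sync.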
2300fn maybe_warp_slot(
2301    config: &ValidatorConfig,
2302    process_blockstore: &mut ProcessBlockStore,
2303    ledger_path: &Path,
2304    bank_forks: &RwLock<BankForks>,
2305    leader_schedule_cache: &LeaderScheduleCache,
2306    snapshot_controller: &SnapshotController,
2307) -> Result<(), String> {
2308    if let Some(warp_slot) = config.warp_slot {
2309        let mut bank_forks = bank_forks.write().unwrap();
2310
2311        let working_bank = bank_forks.working_bank();
2312
2313        if warp_slot <= working_bank.slot() {
2314            return Err(format!(
2315                "warp slot ({}) cannot be less than the working bank slot ({})",
2316                warp_slot,
2317                working_bank.slot()
2318            ));
2319        }
2320        info!("warping to slot {warp_slot}");
2321
2322        let root_bank = bank_forks.root_bank();
2323
2324        // An accounts hash calculation from storages will occur in warp_from_parent() below.  This
2325        // requires that the accounts cache has been flushed, which requires the parent slot to be
2326        // rooted.
2327        root_bank.squash();
2328        root_bank.force_flush_accounts_cache();
2329
2330        bank_forks.insert(Bank::warp_from_parent(
2331            root_bank,
2332            &Pubkey::default(),
2333            warp_slot,
2334        ));
2335        bank_forks
2336            .set_root(warp_slot, Some(snapshot_controller), Some(warp_slot))
2337            .map_err(|err| err.to_string())?;
2338        leader_schedule_cache.set_root(&bank_forks.root_bank());
2339
2340        let full_snapshot_archive_info = match snapshot_bank_utils::bank_to_full_snapshot_archive(
2341            ledger_path,
2342            &bank_forks.root_bank(),
2343            None,
2344            &config.snapshot_config.full_snapshot_archives_dir,
2345            &config.snapshot_config.incremental_snapshot_archives_dir,
2346            config.snapshot_config.archive_format,
2347        ) {
2348            Ok(archive_info) => archive_info,
2349            Err(e) => return Err(format!("Unable to create snapshot: {e}")),
2350        };
2351        info!(
2352            "created snapshot: {}",
2353            full_snapshot_archive_info.path().display()
2354        );
2355
2356        drop(bank_forks);
2357        // Process blockstore after warping bank forks to make sure tower and
2358        // bank forks are in sync.
2359        process_blockstore.process()?;
2360    }
2361    Ok(())
2362}
2363
2364/// Returns the starting slot at which the blockstore should be scanned for
2365/// shreds with an incorrect shred version, or None if the check is unnecessary
2366fn should_cleanup_blockstore_incorrect_shred_versions(
2367    config: &ValidatorConfig,
2368    blockstore: &Blockstore,
2369    root_slot: Slot,
2370    hard_forks: &HardForks,
2371) -> Result<Option<Slot>, BlockstoreError> {
2372    // Perform the check if we are booting as part of a cluster restart at slot root_slot
2373    let maybe_cluster_restart_slot = maybe_cluster_restart_with_hard_fork(config, root_slot);
2374    if maybe_cluster_restart_slot.is_some() {
2375        return Ok(Some(root_slot + 1));
2376    }
2377
2378    // If there are no hard forks, the shred version cannot have changed
2379    let Some(latest_hard_fork) = hard_forks.iter().last().map(|(slot, _)| *slot) else {
2380        return Ok(None);
2381    };
2382
2383    // If the blockstore is empty, there are certainly no shreds with an incorrect version
2384    let Some(blockstore_max_slot) = blockstore.highest_slot()? else {
2385        return Ok(None);
2386    };
2387    let blockstore_min_slot = blockstore.lowest_slot();
2388    info!(
2389        "Blockstore contains data from slot {blockstore_min_slot} to {blockstore_max_slot}, the \
2390         latest hard fork is {latest_hard_fork}"
2391    );
2392
2393    if latest_hard_fork < blockstore_min_slot {
2394        // latest_hard_fork < blockstore_min_slot <= blockstore_max_slot
2395        //
2396        // All slots in the blockstore are newer than the latest hard fork, and only shreds with
2397        // the correct shred version should have been inserted since the latest hard fork
2398        //
2399        // This is the normal case where the last cluster restart & hard fork was a while ago; we
2400        // can skip the check for this case
2401        Ok(None)
2402    } else if latest_hard_fork < blockstore_max_slot {
2403        // blockstore_min_slot < latest_hard_fork < blockstore_max_slot
2404        //
2405        // This could be a case where there was a cluster restart, but this node was not part of
2406        // the supermajority that actually restarted the cluster. Rather, this node likely
2407        // downloaded a new snapshot while retaining the blockstore, including slots beyond the
2408        // chosen restart slot. We need to perform the blockstore check for this case
2409        //
2410        // Note that the downloaded snapshot slot (root_slot) could be greater than the latest hard
2411        // fork slot. Even though this node will only replay slots after root_slot, start the check
2412        // fork slot. Even though this node will only replay slots after root_slot, start the scan
2413        // at latest_hard_fork + 1 to check (and possibly purge) any invalid state.
2414    } else {
2415        // blockstore_min_slot <= blockstore_max_slot <= latest_hard_fork
2416        //
2417        // All slots in the blockstore are older than the latest hard fork. The blockstore check
2418        // would start from latest_hard_fork + 1; skip the check as there are no slots to check
2419        //
2420        // This is kind of an unusual case to hit: maybe a node has been offline for a long time
2421        // and just restarted with a newly downloaded snapshot but its old blockstore.
2422        Ok(None)
2423    }
2424}
2425
2426/// Searches the blockstore for data shreds with a shred version that differs
2427/// from the passed `expected_shred_version`
2428fn scan_blockstore_for_incorrect_shred_version(
2429    blockstore: &Blockstore,
2430    start_slot: Slot,
2431    expected_shred_version: u16,
2432) -> Result<Option<u16>, BlockstoreError> {
2433    const TIMEOUT: Duration = Duration::from_secs(60);
2434    let timer = Instant::now();
2435    // Search for shreds with incompatible version in blockstore
2436    let slot_meta_iterator = blockstore.slot_meta_iterator(start_slot)?;
2437
2438    info!("Searching blockstore for shred with incorrect version from slot {start_slot}");
2439    for (slot, _meta) in slot_meta_iterator {
2440        let shreds = blockstore.get_data_shreds_for_slot(slot, 0)?;
2441        for shred in &shreds {
2442            if shred.version() != expected_shred_version {
2443                return Ok(Some(shred.version()));
2444            }
2445        }
2446        if timer.elapsed() > TIMEOUT {
2447            info!("Didn't find incorrect shreds after 60 seconds, aborting");
2448            break;
2449        }
2450    }
2451    Ok(None)
2452}
2453
2454/// If the blockstore contains any shreds with the incorrect shred version,
2455/// copy them to a backup blockstore and purge them from the actual blockstore.
2456fn cleanup_blockstore_incorrect_shred_versions(
2457    blockstore: &Blockstore,
2458    config: &ValidatorConfig,
2459    start_slot: Slot,
2460    expected_shred_version: u16,
2461) -> Result<(), BlockstoreError> {
2462    let incorrect_shred_version = scan_blockstore_for_incorrect_shred_version(
2463        blockstore,
2464        start_slot,
2465        expected_shred_version,
2466    )?;
2467    let Some(incorrect_shred_version) = incorrect_shred_version else {
2468        info!("Only shreds with the correct version were found in the blockstore");
2469        return Ok(());
2470    };
2471
2472    // .unwrap() safe because getting to this point implies blockstore has slots/shreds
2473    let end_slot = blockstore.highest_slot()?.unwrap();
2474
2475    // Backing up the shreds that will be deleted from primary blockstore is
2476    // not critical, so swallow errors from backup blockstore operations.
2477    let backup_folder = format!(
2478        "{BLOCKSTORE_DIRECTORY_ROCKS_LEVEL}_backup_{incorrect_shred_version}_{start_slot}_{end_slot}"
2479    );
2480    match Blockstore::open_with_options(
2481        &blockstore.ledger_path().join(backup_folder),
2482        config.blockstore_options.clone(),
2483    ) {
2484        Ok(backup_blockstore) => {
2485            info!("Backing up slots from {start_slot} to {end_slot}");
2486            let mut timer = Measure::start("blockstore backup");
2487
2488            const PRINT_INTERVAL: Duration = Duration::from_secs(5);
2489            let mut print_timer = Instant::now();
2490            let mut num_slots_copied = 0;
2491            let slot_meta_iterator = blockstore.slot_meta_iterator(start_slot)?;
2492            for (slot, _meta) in slot_meta_iterator {
2493                let shreds = blockstore.get_data_shreds_for_slot(slot, 0)?;
2494                let shreds = shreds.into_iter().map(Cow::Owned);
2495                let _ = backup_blockstore.insert_cow_shreds(shreds, None, true);
2496                num_slots_copied += 1;
2497
2498                if print_timer.elapsed() > PRINT_INTERVAL {
2499                    info!("Backed up {num_slots_copied} slots thus far");
2500                    print_timer = Instant::now();
2501                }
2502            }
2503
2504            timer.stop();
2505            info!("Backing up slots done. {timer}");
2506        }
2507        Err(err) => {
2508            warn!("Unable to backup shreds with incorrect shred version: {err}");
2509        }
2510    }
2511
2512    info!("Purging slots {start_slot} to {end_slot} from blockstore");
2513    let mut timer = Measure::start("blockstore purge");
2514    blockstore.purge_from_next_slots(start_slot, end_slot);
2515    blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact);
2516    timer.stop();
2517    info!("Purging slots done. {timer}");
2518
2519    Ok(())
2520}
2521
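    /// Wires up the transaction status channel and `TransactionStatusService` used for RPC
    /// transaction history and/or plugin (geyser) transaction notifications, along with the shared
    /// tracker of the highest slot with complete transaction status data.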
2522fn initialize_rpc_transaction_history_services(
2523    blockstore: Arc<Blockstore>,
2524    exit: Arc<AtomicBool>,
2525    enable_rpc_transaction_history: bool,
2526    enable_extended_tx_metadata_storage: bool,
2527    transaction_notifier: Option<TransactionNotifierArc>,
2528    dependency_tracker: Option<Arc<DependencyTracker>>,
2529) -> TransactionHistoryServices {
2530    let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
2531    let (transaction_status_sender, transaction_status_receiver) = unbounded();
2532    let transaction_status_sender = Some(TransactionStatusSender {
2533        sender: transaction_status_sender,
2534        dependency_tracker: dependency_tracker.clone(),
2535    });
2536    let transaction_status_service = Some(TransactionStatusService::new(
2537        transaction_status_receiver,
2538        max_complete_transaction_status_slot.clone(),
2539        enable_rpc_transaction_history,
2540        transaction_notifier,
2541        blockstore.clone(),
2542        enable_extended_tx_metadata_storage,
2543        dependency_tracker,
2544        exit.clone(),
2545    ));
2546
2547    TransactionHistoryServices {
2548        transaction_status_sender,
2549        transaction_status_service,
2550        max_complete_transaction_status_slot,
2551    }
2552}
2553
2554#[derive(Error, Debug)]
2555pub enum ValidatorError {
2556    #[error("bank hash mismatch: actual={0}, expected={1}")]
2557    BankHashMismatch(Hash, Hash),
2558
2559    #[error("blockstore error: {0}")]
2560    Blockstore(#[source] BlockstoreError),
2561
2562    #[error("genesis hash mismatch: actual={0}, expected={1}")]
2563    GenesisHashMismatch(Hash, Hash),
2564
2565    #[error(
2566        "ledger does not have enough data to wait for supermajority: current slot={0}, needed \
2567         slot={1}"
2568    )]
2569    NotEnoughLedgerData(Slot, Slot),
2570
2571    #[error("failed to open genesis: {0}")]
2572    OpenGenesisConfig(#[source] OpenGenesisConfigError),
2573
2574    #[error("{0}")]
2575    Other(String),
2576
2577    #[error(
2578        "PoH hashes/second rate is slower than the cluster target: mine {mine}, cluster {target}"
2579    )]
2580    PohTooSlow { mine: u64, target: u64 },
2581
2582    #[error("shred version mismatch: actual {actual}, expected {expected}")]
2583    ShredVersionMismatch { actual: u16, expected: u16 },
2584
2585    #[error(transparent)]
2586    TraceError(#[from] TraceError),
2587
2588    #[error("Wen Restart finished, please continue with --wait-for-supermajority")]
2589    WenRestartFinished,
2590}
2591
2592// Returns whether the validator waited on other nodes to start. In this case
2593// it should not wait for one of its votes to land before producing blocks,
2594// because if the whole network is waiting, then it will stall.
2595//
2596// Error indicates that a bad hash was encountered or another condition
2597// that is unrecoverable and the validator should exit.
fn wait_for_supermajority(
    config: &ValidatorConfig,
    process_blockstore: Option<&mut ProcessBlockStore>,
    bank_forks: &RwLock<BankForks>,
    cluster_info: &ClusterInfo,
    rpc_override_health_check: Arc<AtomicBool>,
    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
) -> Result<bool, ValidatorError> {
    match config.wait_for_supermajority {
        None => Ok(false),
        Some(wait_for_supermajority_slot) => {
            if let Some(process_blockstore) = process_blockstore {
                process_blockstore
                    .process()
                    .map_err(ValidatorError::Other)?;
            }

            let bank = bank_forks.read().unwrap().working_bank();
            match wait_for_supermajority_slot.cmp(&bank.slot()) {
                std::cmp::Ordering::Less => return Ok(false),
                std::cmp::Ordering::Greater => {
                    return Err(ValidatorError::NotEnoughLedgerData(
                        bank.slot(),
                        wait_for_supermajority_slot,
                    ));
                }
                _ => {}
            }

            if let Some(expected_bank_hash) = config.expected_bank_hash {
                if bank.hash() != expected_bank_hash {
                    return Err(ValidatorError::BankHashMismatch(
                        bank.hash(),
                        expected_bank_hash,
                    ));
                }
            }

            for i in 1.. {
                let logging = i % 10 == 1;
                if logging {
                    info!(
                        "Waiting for {}% of activated stake at slot {} to be in gossip...",
                        WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT,
                        bank.slot()
                    );
                }

                let gossip_stake_percent =
                    get_stake_percent_in_gossip(&bank, cluster_info, logging);

                *start_progress.write().unwrap() =
                    ValidatorStartProgress::WaitingForSupermajority {
                        slot: wait_for_supermajority_slot,
                        gossip_stake_percent,
                    };

                if gossip_stake_percent >= WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT {
                    info!(
                        "Supermajority reached, {gossip_stake_percent}% active stake detected, \
                         starting up now.",
                    );
                    break;
                }
                // The normal RPC health checks don't apply as the node is waiting, so feign health to
                // prevent load balancers from removing the node from their list of candidates during a
                // manual restart.
                rpc_override_health_check.store(true, Ordering::Relaxed);
                sleep(Duration::new(1, 0));
            }
            rpc_override_health_check.store(false, Ordering::Relaxed);
            Ok(true)
        }
    }
}
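
// NOTE: illustrative sketch only (`wait_decision_sketch` is a hypothetical helper, not
// called anywhere in the validator). The slot comparison in `wait_for_supermajority`
// above reduces to this mapping: `Less` means the bank is already past the wait slot so
// no waiting is needed, `Greater` means the ledger has not caught up yet (an error), and
// `Equal` means the node should wait for supermajority stake to appear in gossip.
#[allow(dead_code)]
fn wait_decision_sketch(wait_slot: Slot, bank_slot: Slot) -> Result<bool, ValidatorError> {
    match wait_slot.cmp(&bank_slot) {
        std::cmp::Ordering::Less => Ok(false),
        std::cmp::Ordering::Greater => {
            Err(ValidatorError::NotEnoughLedgerData(bank_slot, wait_slot))
        }
        std::cmp::Ordering::Equal => Ok(true),
    }
}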

// Get the activated stake percentage (based on the provided bank) that is visible in gossip
fn get_stake_percent_in_gossip(bank: &Bank, cluster_info: &ClusterInfo, log: bool) -> u64 {
    let mut online_stake = 0;
    let mut wrong_shred_stake = 0;
    let mut wrong_shred_nodes = vec![];
    let mut offline_stake = 0;
    let mut offline_nodes = vec![];

    let mut total_activated_stake = 0;
    let now = timestamp();
    // Nodes' contact infos are saved to disk and restored on validator startup.
    // Entries for staked nodes do not expire until an epoch later, so it is
    // necessary to filter for recent entries here in order to establish liveness.
    let peers: HashMap<_, _> = cluster_info
        .tvu_peers(ContactInfo::clone)
        .into_iter()
        .filter(|node| {
            let age = now.saturating_sub(node.wallclock());
            // Contact infos are refreshed twice during this period.
            age < CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS
        })
        .map(|node| (*node.pubkey(), node))
        .collect();
    let my_shred_version = cluster_info.my_shred_version();
    let my_id = cluster_info.id();

    for (activated_stake, vote_account) in bank.vote_accounts().values() {
        let activated_stake = *activated_stake;
        total_activated_stake += activated_stake;

        if activated_stake == 0 {
            continue;
        }
        let vote_state_node_pubkey = *vote_account.node_pubkey();

        if let Some(peer) = peers.get(&vote_state_node_pubkey) {
            if peer.shred_version() == my_shred_version {
                trace!(
                    "observed {vote_state_node_pubkey} in gossip, \
                     (activated_stake={activated_stake})"
                );
                online_stake += activated_stake;
            } else {
                wrong_shred_stake += activated_stake;
                wrong_shred_nodes.push((activated_stake, vote_state_node_pubkey));
            }
        } else if vote_state_node_pubkey == my_id {
            online_stake += activated_stake; // This node is online
        } else {
            offline_stake += activated_stake;
            offline_nodes.push((activated_stake, vote_state_node_pubkey));
        }
    }

    let online_stake_percentage = (online_stake as f64 / total_activated_stake as f64) * 100.;
    if log {
        info!("{online_stake_percentage:.3}% of active stake visible in gossip");

        if !wrong_shred_nodes.is_empty() {
            info!(
                "{:.3}% of active stake has the wrong shred version in gossip",
                (wrong_shred_stake as f64 / total_activated_stake as f64) * 100.,
            );
            wrong_shred_nodes.sort_by(|b, a| a.0.cmp(&b.0)); // sort by reverse stake weight
            for (stake, identity) in wrong_shred_nodes {
                info!(
                    "    {:.3}% - {}",
                    (stake as f64 / total_activated_stake as f64) * 100.,
                    identity
                );
            }
        }

        if !offline_nodes.is_empty() {
            info!(
                "{:.3}% of active stake is not visible in gossip",
                (offline_stake as f64 / total_activated_stake as f64) * 100.
            );
            offline_nodes.sort_by(|b, a| a.0.cmp(&b.0)); // sort by reverse stake weight
            for (stake, identity) in offline_nodes {
                info!(
                    "    {:.3}% - {}",
                    (stake as f64 / total_activated_stake as f64) * 100.,
                    identity
                );
            }
        }
    }

    online_stake_percentage as u64
}
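
// NOTE: illustrative sketch only (`stake_percentage_sketch` is a hypothetical helper, not
// used by the validator). Once the gossip liveness and shred-version filtering above has
// bucketed each node's activated stake, the reported figure is just this percentage
// computation, truncated to an integer.
#[allow(dead_code)]
fn stake_percentage_sketch(online_stake: u64, total_activated_stake: u64) -> u64 {
    if total_activated_stake == 0 {
        return 0;
    }
    ((online_stake as f64 / total_activated_stake as f64) * 100.) as u64
}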

fn cleanup_accounts_paths(config: &ValidatorConfig) {
    for account_path in &config.account_paths {
        move_and_async_delete_path_contents(account_path);
    }
    if let Some(shrink_paths) = config
        .accounts_db_config
        .as_ref()
        .and_then(|config| config.shrink_paths.as_ref())
    {
        for shrink_path in shrink_paths {
            move_and_async_delete_path_contents(shrink_path);
        }
    }
}

pub fn is_snapshot_config_valid(snapshot_config: &SnapshotConfig) -> bool {
    // if the snapshot config is configured to *not* take snapshots, then it is valid
    if !snapshot_config.should_generate_snapshots() {
        return true;
    }

    let SnapshotInterval::Slots(full_snapshot_interval_slots) =
        snapshot_config.full_snapshot_archive_interval
    else {
        // if we *are* generating snapshots, then the full snapshot interval cannot be disabled
        return false;
    };

    match snapshot_config.incremental_snapshot_archive_interval {
        SnapshotInterval::Disabled => true,
        SnapshotInterval::Slots(incremental_snapshot_interval_slots) => {
            full_snapshot_interval_slots > incremental_snapshot_interval_slots
        }
    }
}
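
// NOTE: illustrative sketch only (`snapshot_interval_rule_sketch` is a hypothetical
// helper with made-up interval values). A full-snapshot interval of 400 slots with an
// incremental interval of 200 slots satisfies the rule above, since the full interval
// must be strictly larger than the incremental interval for incremental snapshots to be
// useful.
#[allow(dead_code)]
fn snapshot_interval_rule_sketch() {
    use std::num::NonZeroU64;
    let config = SnapshotConfig {
        full_snapshot_archive_interval: SnapshotInterval::Slots(NonZeroU64::new(400).unwrap()),
        incremental_snapshot_archive_interval: SnapshotInterval::Slots(
            NonZeroU64::new(200).unwrap(),
        ),
        ..SnapshotConfig::default()
    };
    assert!(is_snapshot_config_valid(&config));
}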

#[cfg(test)]
mod tests {
    use {
        super::*,
        crossbeam_channel::{bounded, RecvTimeoutError},
        solana_entry::entry,
        solana_genesis_config::create_genesis_config,
        solana_gossip::contact_info::ContactInfo,
        solana_ledger::{
            blockstore, create_new_tmp_ledger, genesis_utils::create_genesis_config_with_leader,
            get_tmp_ledger_path_auto_delete,
        },
        solana_poh_config::PohConfig,
        solana_sha256_hasher::hash,
        solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP,
        std::{fs::remove_dir_all, num::NonZeroU64, thread, time::Duration},
    };

    #[test]
    fn validator_exit() {
        solana_logger::setup();
        let leader_keypair = Keypair::new();
        let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());

        let validator_keypair = Keypair::new();
        let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
        let genesis_config =
            create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1000)
                .genesis_config;
        let (validator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);

        let voting_keypair = Arc::new(Keypair::new());
        let config = ValidatorConfig {
            rpc_addrs: Some((
                validator_node.info.rpc().unwrap(),
                validator_node.info.rpc_pubsub().unwrap(),
            )),
            ..ValidatorConfig::default_for_test()
        };
        let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
        let validator = Validator::new(
            validator_node,
            Arc::new(validator_keypair),
            &validator_ledger_path,
            &voting_keypair.pubkey(),
            Arc::new(RwLock::new(vec![voting_keypair])),
            vec![leader_node.info],
            &config,
            true, // should_check_duplicate_instance
            None, // rpc_to_plugin_manager_receiver
            start_progress.clone(),
            SocketAddrSpace::Unspecified,
            ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
            Arc::new(RwLock::new(None)),
        )
        .expect("assume successful validator start");
        assert_eq!(
            *start_progress.read().unwrap(),
            ValidatorStartProgress::Running
        );
        validator.close();
        remove_dir_all(validator_ledger_path).unwrap();
    }

    #[test]
    fn test_should_cleanup_blockstore_incorrect_shred_versions() {
        solana_logger::setup();

        let ledger_path = get_tmp_ledger_path_auto_delete!();
        let blockstore = Blockstore::open(ledger_path.path()).unwrap();

        let mut validator_config = ValidatorConfig::default_for_test();
        let mut hard_forks = HardForks::default();
        let mut root_slot;

        // Do check from root_slot + 1 if wait_for_supermajority (10) == root_slot (10)
        root_slot = 10;
        validator_config.wait_for_supermajority = Some(root_slot);
        assert_eq!(
            should_cleanup_blockstore_incorrect_shred_versions(
                &validator_config,
                &blockstore,
                root_slot,
                &hard_forks
            )
            .unwrap(),
            Some(root_slot + 1)
        );

        // No check if wait_for_supermajority (10) < root_slot (15) (no hard forks)
        // Arguably operator error to pass a value for wait_for_supermajority in this case
        root_slot = 15;
        assert_eq!(
            should_cleanup_blockstore_incorrect_shred_versions(
                &validator_config,
                &blockstore,
                root_slot,
                &hard_forks
            )
            .unwrap(),
            None,
        );

        // Emulate cluster restart at slot 10
        // No check if wait_for_supermajority (10) < root_slot (15) (empty blockstore)
        hard_forks.register(10);
        assert_eq!(
            should_cleanup_blockstore_incorrect_shred_versions(
                &validator_config,
                &blockstore,
                root_slot,
                &hard_forks
            )
            .unwrap(),
            None,
        );

        // Insert some shreds at newer slots than hard fork
        let entries = entry::create_ticks(1, 0, Hash::default());
        for i in 20..35 {
            let shreds = blockstore::entries_to_test_shreds(
                &entries,
                i,     // slot
                i - 1, // parent_slot
                true,  // is_full_slot
                1,     // version
            );
            blockstore.insert_shreds(shreds, None, true).unwrap();
        }

        // No check as all blockstore data is newer than latest hard fork
        assert_eq!(
            should_cleanup_blockstore_incorrect_shred_versions(
                &validator_config,
                &blockstore,
                root_slot,
                &hard_forks
            )
            .unwrap(),
            None,
        );

        // Emulate cluster restart at slot 25
        // Do check from root_slot + 1 regardless of whether wait_for_supermajority set correctly
        root_slot = 25;
        hard_forks.register(root_slot);
        validator_config.wait_for_supermajority = Some(root_slot);
        assert_eq!(
            should_cleanup_blockstore_incorrect_shred_versions(
                &validator_config,
                &blockstore,
                root_slot,
                &hard_forks
            )
            .unwrap(),
            Some(root_slot + 1),
        );
        validator_config.wait_for_supermajority = None;
        assert_eq!(
            should_cleanup_blockstore_incorrect_shred_versions(
                &validator_config,
                &blockstore,
                root_slot,
                &hard_forks
            )
            .unwrap(),
            Some(root_slot + 1),
        );

        // Do check with advanced root slot, even without wait_for_supermajority set correctly
        // Check starts from latest hard fork + 1
        root_slot = 30;
        let latest_hard_fork = hard_forks.iter().last().unwrap().0;
        assert_eq!(
            should_cleanup_blockstore_incorrect_shred_versions(
                &validator_config,
                &blockstore,
                root_slot,
                &hard_forks
            )
            .unwrap(),
            Some(latest_hard_fork + 1),
        );

        // Purge blockstore up to latest hard fork
        // No check since all blockstore data newer than latest hard fork
        blockstore.purge_slots(0, latest_hard_fork, PurgeType::Exact);
        assert_eq!(
            should_cleanup_blockstore_incorrect_shred_versions(
                &validator_config,
                &blockstore,
                root_slot,
                &hard_forks
            )
            .unwrap(),
            None,
        );
    }

    #[test]
    fn test_cleanup_blockstore_incorrect_shred_versions() {
        solana_logger::setup();

        let validator_config = ValidatorConfig::default_for_test();
        let ledger_path = get_tmp_ledger_path_auto_delete!();
        let blockstore = Blockstore::open(ledger_path.path()).unwrap();

        let entries = entry::create_ticks(1, 0, Hash::default());
        for i in 1..10 {
            let shreds = blockstore::entries_to_test_shreds(
                &entries,
                i,     // slot
                i - 1, // parent_slot
                true,  // is_full_slot
                1,     // version
            );
            blockstore.insert_shreds(shreds, None, true).unwrap();
        }

        // this purges and compacts all slots greater than or equal to 5
        cleanup_blockstore_incorrect_shred_versions(&blockstore, &validator_config, 5, 2).unwrap();
        // assert that slots less than 5 aren't affected
        assert!(blockstore.meta(4).unwrap().unwrap().next_slots.is_empty());
        for i in 5..10 {
            assert!(blockstore
                .get_data_shreds_for_slot(i, 0)
                .unwrap()
                .is_empty());
        }
    }

    #[test]
    fn validator_parallel_exit() {
        let leader_keypair = Keypair::new();
        let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
        let genesis_config =
            create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1000)
                .genesis_config;

        let mut ledger_paths = vec![];
        let mut validators: Vec<Validator> = (0..2)
            .map(|_| {
                let validator_keypair = Keypair::new();
                let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
                let (validator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
                ledger_paths.push(validator_ledger_path.clone());
                let vote_account_keypair = Keypair::new();
                let config = ValidatorConfig {
                    rpc_addrs: Some((
                        validator_node.info.rpc().unwrap(),
                        validator_node.info.rpc_pubsub().unwrap(),
                    )),
                    ..ValidatorConfig::default_for_test()
                };
                Validator::new(
                    validator_node,
                    Arc::new(validator_keypair),
                    &validator_ledger_path,
                    &vote_account_keypair.pubkey(),
                    Arc::new(RwLock::new(vec![Arc::new(vote_account_keypair)])),
                    vec![leader_node.info.clone()],
                    &config,
                    true, // should_check_duplicate_instance.
                    None, // rpc_to_plugin_manager_receiver
                    Arc::new(RwLock::new(ValidatorStartProgress::default())),
                    SocketAddrSpace::Unspecified,
                    ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
                    Arc::new(RwLock::new(None)),
                )
                .expect("assume successful validator start")
            })
            .collect();

        // Each validator can exit in parallel to speed up the otherwise sequential calls to `join`
        validators.iter_mut().for_each(|v| v.exit());

        // Spawn a new thread to wait for the validators to finish joining
        let (sender, receiver) = bounded(0);
        let _ = thread::spawn(move || {
            validators.into_iter().for_each(|validator| {
                validator.join();
            });
            sender.send(()).unwrap();
        });

        let timeout = Duration::from_secs(60);
        if let Err(RecvTimeoutError::Timeout) = receiver.recv_timeout(timeout) {
            panic!("timed out waiting for validators to shut down");
        }

        for path in ledger_paths {
            remove_dir_all(path).unwrap();
        }
    }

    #[test]
    fn test_wait_for_supermajority() {
        solana_logger::setup();
        let node_keypair = Arc::new(Keypair::new());
        let cluster_info = ClusterInfo::new(
            ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
            node_keypair,
            SocketAddrSpace::Unspecified,
        );

        let (genesis_config, _mint_keypair) = create_genesis_config(1);
        let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
        let mut config = ValidatorConfig::default_for_test();
        let rpc_override_health_check = Arc::new(AtomicBool::new(false));
        let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));

        assert!(!wait_for_supermajority(
            &config,
            None,
            &bank_forks,
            &cluster_info,
            rpc_override_health_check.clone(),
            &start_progress,
        )
        .unwrap());

        // bank=0, wait=1, should fail
        config.wait_for_supermajority = Some(1);
        assert!(matches!(
            wait_for_supermajority(
                &config,
                None,
                &bank_forks,
                &cluster_info,
                rpc_override_health_check.clone(),
                &start_progress,
            ),
            Err(ValidatorError::NotEnoughLedgerData(_, _)),
        ));

        // bank=1, wait=0, should pass, bank is past the wait slot
        let bank_forks = BankForks::new_rw_arc(Bank::new_from_parent(
            bank_forks.read().unwrap().root_bank(),
            &Pubkey::default(),
            1,
        ));
        config.wait_for_supermajority = Some(0);
        assert!(!wait_for_supermajority(
            &config,
            None,
            &bank_forks,
            &cluster_info,
            rpc_override_health_check.clone(),
            &start_progress,
        )
        .unwrap());

        // bank=1, wait=1, equal, but bad hash provided
        config.wait_for_supermajority = Some(1);
        config.expected_bank_hash = Some(hash(&[1]));
        assert!(matches!(
            wait_for_supermajority(
                &config,
                None,
                &bank_forks,
                &cluster_info,
                rpc_override_health_check,
                &start_progress,
            ),
            Err(ValidatorError::BankHashMismatch(_, _)),
        ));
    }

    #[test]
    fn test_is_snapshot_config_valid() {
        fn new_snapshot_config(
            full_snapshot_archive_interval_slots: Slot,
            incremental_snapshot_archive_interval_slots: Slot,
        ) -> SnapshotConfig {
            SnapshotConfig {
                full_snapshot_archive_interval: SnapshotInterval::Slots(
                    NonZeroU64::new(full_snapshot_archive_interval_slots).unwrap(),
                ),
                incremental_snapshot_archive_interval: SnapshotInterval::Slots(
                    NonZeroU64::new(incremental_snapshot_archive_interval_slots).unwrap(),
                ),
                ..SnapshotConfig::default()
            }
        }

        // default config must be valid
        assert!(is_snapshot_config_valid(&SnapshotConfig::default()));

        // disabled incremental snapshot must be valid
        assert!(is_snapshot_config_valid(&SnapshotConfig {
            incremental_snapshot_archive_interval: SnapshotInterval::Disabled,
            ..SnapshotConfig::default()
        }));

        // disabled full snapshot must be invalid though (if generating snapshots)
        assert!(!is_snapshot_config_valid(&SnapshotConfig {
            full_snapshot_archive_interval: SnapshotInterval::Disabled,
            ..SnapshotConfig::default()
        }));

        // simple config must be valid
        assert!(is_snapshot_config_valid(&new_snapshot_config(400, 200)));
        assert!(is_snapshot_config_valid(&new_snapshot_config(100, 42)));
        assert!(is_snapshot_config_valid(&new_snapshot_config(444, 200)));
        assert!(is_snapshot_config_valid(&new_snapshot_config(400, 222)));

        // config where full interval is not larger than incremental interval must be invalid
        assert!(!is_snapshot_config_valid(&new_snapshot_config(42, 100)));
        assert!(!is_snapshot_config_valid(&new_snapshot_config(100, 100)));
        assert!(!is_snapshot_config_valid(&new_snapshot_config(100, 200)));

        // config with snapshots disabled (or load-only) must be valid
        assert!(is_snapshot_config_valid(&SnapshotConfig::new_disabled()));
        assert!(is_snapshot_config_valid(&SnapshotConfig::new_load_only()));
        assert!(is_snapshot_config_valid(&SnapshotConfig {
            full_snapshot_archive_interval: SnapshotInterval::Slots(NonZeroU64::new(37).unwrap()),
            incremental_snapshot_archive_interval: SnapshotInterval::Slots(
                NonZeroU64::new(41).unwrap()
            ),
            ..SnapshotConfig::new_load_only()
        }));
        assert!(is_snapshot_config_valid(&SnapshotConfig {
            full_snapshot_archive_interval: SnapshotInterval::Disabled,
            incremental_snapshot_archive_interval: SnapshotInterval::Disabled,
            ..SnapshotConfig::new_load_only()
        }));
    }

    fn target_tick_duration() -> Duration {
        // DEFAULT_MS_PER_SLOT = 400
        // DEFAULT_TICKS_PER_SLOT = 64
        // MS_PER_TICK = 6
        //
        // But, DEFAULT_MS_PER_SLOT / DEFAULT_TICKS_PER_SLOT = 6.25
        //
        // So, convert to microseconds first to avoid the integer rounding error
        let target_tick_duration_us =
            solana_clock::DEFAULT_MS_PER_SLOT * 1000 / solana_clock::DEFAULT_TICKS_PER_SLOT;
        assert_eq!(target_tick_duration_us, 6250);
        Duration::from_micros(target_tick_duration_us)
    }
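
    // NOTE: added illustrative check (hypothetical test name, not part of the original
    // suite): doing the division in milliseconds first would truncate 400 / 64 down to
    // 6 ms per tick, which is exactly the rounding error the microsecond conversion in
    // `target_tick_duration` above avoids.
    #[test]
    fn target_tick_duration_sketch_avoids_ms_truncation() {
        let truncated_ms =
            solana_clock::DEFAULT_MS_PER_SLOT / solana_clock::DEFAULT_TICKS_PER_SLOT;
        assert_eq!(truncated_ms, 6);
        assert_eq!(target_tick_duration(), Duration::from_micros(6_250));
    }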

    #[test]
    fn test_poh_speed() {
        solana_logger::setup();
        let poh_config = PohConfig {
            target_tick_duration: target_tick_duration(),
            // make PoH rate really fast to cause the error condition
            hashes_per_tick: Some(100 * solana_clock::DEFAULT_HASHES_PER_TICK),
            ..PohConfig::default()
        };
        let genesis_config = GenesisConfig {
            poh_config,
            ..GenesisConfig::default()
        };
        let bank = Bank::new_for_tests(&genesis_config);
        assert!(check_poh_speed(&bank, Some(10_000)).is_err());
    }

    #[test]
    fn test_poh_speed_no_hashes_per_tick() {
        solana_logger::setup();
        let poh_config = PohConfig {
            target_tick_duration: target_tick_duration(),
            hashes_per_tick: None,
            ..PohConfig::default()
        };
        let genesis_config = GenesisConfig {
            poh_config,
            ..GenesisConfig::default()
        };
        let bank = Bank::new_for_tests(&genesis_config);
        check_poh_speed(&bank, Some(10_000)).unwrap();
    }
}