solana_core/
validator.rs

//! The `validator` module hosts all the validator microservices.

pub use solana_perf::report_target_features;
use {
    crate::{
        admin_rpc_post_init::{AdminRpcRequestMetadataPostInit, KeyUpdaterType, KeyUpdaters},
        banking_stage::BankingStage,
        banking_trace::{self, BankingTracer, TraceError},
        cluster_info_vote_listener::VoteTracker,
        completed_data_sets_service::CompletedDataSetsService,
        consensus::{
            reconcile_blockstore_roots_with_external_source,
            tower_storage::{NullTowerStorage, TowerStorage},
            ExternalRootSource, Tower,
        },
        repair::{
            self,
            quic_endpoint::{RepairQuicAsyncSenders, RepairQuicSenders, RepairQuicSockets},
            repair_handler::RepairHandlerType,
            serve_repair_service::ServeRepairService,
        },
        sample_performance_service::SamplePerformanceService,
        sigverify,
        snapshot_packager_service::SnapshotPackagerService,
        stats_reporter_service::StatsReporterService,
        system_monitor_service::{
            verify_net_stats_access, SystemMonitorService, SystemMonitorStatsReportConfig,
        },
        tpu::{ForwardingClientOption, Tpu, TpuSockets, DEFAULT_TPU_COALESCE},
        tvu::{Tvu, TvuConfig, TvuSockets},
    },
    anyhow::{anyhow, Context, Result},
    crossbeam_channel::{bounded, unbounded, Receiver},
    quinn::Endpoint,
    serde::{Deserialize, Serialize},
    solana_accounts_db::{
        accounts_db::{AccountsDbConfig, ACCOUNTS_DB_CONFIG_FOR_TESTING},
        accounts_update_notifier_interface::AccountsUpdateNotifier,
        hardened_unpack::{
            open_genesis_config, OpenGenesisConfigError, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
        },
        utils::move_and_async_delete_path_contents,
    },
    solana_client::connection_cache::{ConnectionCache, Protocol},
    solana_clock::Slot,
    solana_cluster_type::ClusterType,
    solana_entry::poh::compute_hash_time,
    solana_epoch_schedule::MAX_LEADER_SCHEDULE_EPOCH_OFFSET,
    solana_genesis_config::GenesisConfig,
    solana_geyser_plugin_manager::{
        geyser_plugin_service::GeyserPluginService, GeyserPluginManagerRequest,
    },
    solana_gossip::{
        cluster_info::{
            ClusterInfo, DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
            DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS,
        },
        contact_info::ContactInfo,
        crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
        gossip_service::GossipService,
        node::{Node, NodeMultihoming},
    },
    solana_hard_forks::HardForks,
    solana_hash::Hash,
    solana_keypair::Keypair,
    solana_ledger::{
        bank_forks_utils,
        blockstore::{
            Blockstore, BlockstoreError, PurgeType, MAX_COMPLETED_SLOTS_IN_CHANNEL,
            MAX_REPLAY_WAKE_UP_SIGNALS,
        },
        blockstore_metric_report_service::BlockstoreMetricReportService,
        blockstore_options::{BlockstoreOptions, BLOCKSTORE_DIRECTORY_ROCKS_LEVEL},
        blockstore_processor::{self, TransactionStatusSender},
        entry_notifier_interface::EntryNotifierArc,
        entry_notifier_service::{EntryNotifierSender, EntryNotifierService},
        leader_schedule::FixedSchedule,
        leader_schedule_cache::LeaderScheduleCache,
        use_snapshot_archives_at_startup::UseSnapshotArchivesAtStartup,
    },
    solana_measure::measure::Measure,
    solana_metrics::{datapoint_info, metrics::metrics_config_sanity_check},
    solana_poh::{
        poh_recorder::PohRecorder,
        poh_service::{self, PohService},
        transaction_recorder::TransactionRecorder,
    },
    solana_pubkey::Pubkey,
    solana_rayon_threadlimit::get_thread_count,
    solana_rpc::{
        max_slots::MaxSlots,
        optimistically_confirmed_bank_tracker::{
            BankNotificationSenderConfig, OptimisticallyConfirmedBank,
            OptimisticallyConfirmedBankTracker,
        },
        rpc::JsonRpcConfig,
        rpc_completed_slots_service::RpcCompletedSlotsService,
        rpc_pubsub_service::{PubSubConfig, PubSubService},
        rpc_service::{ClientOption, JsonRpcService, JsonRpcServiceConfig},
        rpc_subscriptions::RpcSubscriptions,
        transaction_notifier_interface::TransactionNotifierArc,
        transaction_status_service::TransactionStatusService,
    },
    solana_runtime::{
        accounts_background_service::{
            AbsRequestHandlers, AccountsBackgroundService, DroppedSlotsReceiver,
            PendingSnapshotPackages, PrunedBanksRequestHandler, SnapshotRequestHandler,
        },
        bank::Bank,
        bank_forks::BankForks,
        commitment::BlockCommitmentCache,
        dependency_tracker::DependencyTracker,
        prioritization_fee_cache::PrioritizationFeeCache,
        runtime_config::RuntimeConfig,
        snapshot_archive_info::SnapshotArchiveInfoGetter,
        snapshot_bank_utils,
        snapshot_config::SnapshotConfig,
        snapshot_controller::SnapshotController,
        snapshot_hash::StartingSnapshotHashes,
        snapshot_utils::{self, clean_orphaned_account_snapshot_dirs, SnapshotInterval},
    },
    solana_send_transaction_service::send_transaction_service::Config as SendTransactionServiceConfig,
    solana_shred_version::compute_shred_version,
    solana_signer::Signer,
    solana_streamer::{quic::QuicServerParams, socket::SocketAddrSpace, streamer::StakedNodes},
    solana_time_utils::timestamp,
    solana_tpu_client::tpu_client::{
        DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC, DEFAULT_VOTE_USE_QUIC,
    },
    solana_turbine::{
        self,
        broadcast_stage::BroadcastStageType,
        xdp::{master_ip_if_bonded, XdpConfig, XdpRetransmitter},
    },
    solana_unified_scheduler_pool::DefaultSchedulerPool,
    solana_validator_exit::Exit,
    solana_vote_program::vote_state,
    solana_wen_restart::wen_restart::{wait_for_wen_restart, WenRestartConfig},
    std::{
        borrow::Cow,
        collections::{HashMap, HashSet},
        net::{IpAddr, SocketAddr},
        num::NonZeroUsize,
        path::{Path, PathBuf},
        sync::{
            atomic::{AtomicBool, AtomicU64, Ordering},
            Arc, Mutex, RwLock,
        },
        thread::{sleep, Builder, JoinHandle},
        time::{Duration, Instant},
    },
    strum::VariantNames,
    strum_macros::{Display, EnumCount, EnumIter, EnumString, EnumVariantNames, IntoStaticStr},
    thiserror::Error,
    tokio::runtime::Runtime as TokioRuntime,
    tokio_util::sync::CancellationToken,
};

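// Capacity of the bounded channel that feeds the CompletedDataSetsService.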
const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000;
const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80;
// Right now since we reuse the wait for supermajority code, the
// following threshold should always be greater than or equal to
// WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT.
const WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT: u64 =
    WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT;

#[derive(
    Clone, EnumCount, EnumIter, EnumString, EnumVariantNames, Default, IntoStaticStr, Display,
)]
#[strum(serialize_all = "kebab-case")]
pub enum BlockVerificationMethod {
    BlockstoreProcessor,
    #[default]
    UnifiedScheduler,
}

impl BlockVerificationMethod {
    pub const fn cli_names() -> &'static [&'static str] {
        Self::VARIANTS
    }

    pub fn cli_message() -> &'static str {
        "Switch transaction scheduling method for verifying ledger entries"
    }
}

#[derive(Clone, EnumString, EnumVariantNames, Default, IntoStaticStr, Display)]
#[strum(serialize_all = "kebab-case")]
pub enum BlockProductionMethod {
    CentralScheduler,
    #[default]
    CentralSchedulerGreedy,
}

impl BlockProductionMethod {
    pub const fn cli_names() -> &'static [&'static str] {
        Self::VARIANTS
    }

    pub fn cli_message() -> &'static str {
        "Switch transaction scheduling method for producing ledger entries"
    }
}

#[derive(Clone, EnumString, EnumVariantNames, Default, IntoStaticStr, Display)]
#[strum(serialize_all = "kebab-case")]
pub enum TransactionStructure {
    Sdk,
    #[default]
    View,
}

impl TransactionStructure {
    pub const fn cli_names() -> &'static [&'static str] {
        Self::VARIANTS
    }

    pub fn cli_message() -> &'static str {
        "Switch internal transaction structure/representation"
    }
}

/// Configuration for the block generator invalidator for replay.
#[derive(Clone, Debug)]
pub struct GeneratorConfig {
    pub accounts_path: String,
    pub starting_keypairs: Arc<Vec<Keypair>>,
}

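/// Configuration that controls how a [`Validator`] is started and run.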
pub struct ValidatorConfig {
    pub halt_at_slot: Option<Slot>,
    pub expected_genesis_hash: Option<Hash>,
    pub expected_bank_hash: Option<Hash>,
    pub expected_shred_version: Option<u16>,
    pub voting_disabled: bool,
    pub account_paths: Vec<PathBuf>,
    pub account_snapshot_paths: Vec<PathBuf>,
    pub rpc_config: JsonRpcConfig,
    /// Specifies which plugins to start up with
    pub on_start_geyser_plugin_config_files: Option<Vec<PathBuf>>,
    pub geyser_plugin_always_enabled: bool,
    pub rpc_addrs: Option<(SocketAddr, SocketAddr)>, // (JsonRpc, JsonRpcPubSub)
    pub pubsub_config: PubSubConfig,
    pub snapshot_config: SnapshotConfig,
    pub max_ledger_shreds: Option<u64>,
    pub blockstore_options: BlockstoreOptions,
    pub broadcast_stage_type: BroadcastStageType,
    pub turbine_disabled: Arc<AtomicBool>,
    pub fixed_leader_schedule: Option<FixedSchedule>,
    pub wait_for_supermajority: Option<Slot>,
    pub new_hard_forks: Option<Vec<Slot>>,
    pub known_validators: Option<HashSet<Pubkey>>, // None = trust all
    pub repair_validators: Option<HashSet<Pubkey>>, // None = repair from all
    pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>, // Empty = repair with all
    pub gossip_validators: Option<HashSet<Pubkey>>, // None = gossip with all
    pub max_genesis_archive_unpacked_size: u64,
    /// Run PoH, transaction signature and other transaction verifications during blockstore
    /// processing.
    pub run_verification: bool,
    pub require_tower: bool,
    pub tower_storage: Arc<dyn TowerStorage>,
    pub debug_keys: Option<Arc<HashSet<Pubkey>>>,
    pub contact_debug_interval: u64,
    pub contact_save_interval: u64,
    pub send_transaction_service_config: SendTransactionServiceConfig,
    pub no_poh_speed_test: bool,
    pub no_os_memory_stats_reporting: bool,
    pub no_os_network_stats_reporting: bool,
    pub no_os_cpu_stats_reporting: bool,
    pub no_os_disk_stats_reporting: bool,
    pub poh_pinned_cpu_core: usize,
    pub poh_hashes_per_batch: u64,
    pub process_ledger_before_services: bool,
    pub accounts_db_config: Option<AccountsDbConfig>,
    pub warp_slot: Option<Slot>,
    pub accounts_db_skip_shrink: bool,
    pub accounts_db_force_initial_clean: bool,
    pub tpu_coalesce: Duration,
    pub staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
    pub validator_exit: Arc<RwLock<Exit>>,
    pub validator_exit_backpressure: HashMap<String, Arc<AtomicBool>>,
    pub no_wait_for_vote_to_start_leader: bool,
    pub wait_to_vote_slot: Option<Slot>,
    pub runtime_config: RuntimeConfig,
    pub banking_trace_dir_byte_limit: banking_trace::DirByteLimit,
    pub block_verification_method: BlockVerificationMethod,
    pub block_production_method: BlockProductionMethod,
    pub block_production_num_workers: NonZeroUsize,
    pub transaction_struct: TransactionStructure,
    pub enable_block_production_forwarding: bool,
    pub generator_config: Option<GeneratorConfig>,
    pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup,
    pub wen_restart_proto_path: Option<PathBuf>,
    pub wen_restart_coordinator: Option<Pubkey>,
    pub unified_scheduler_handler_threads: Option<usize>,
    pub ip_echo_server_threads: NonZeroUsize,
    pub rayon_global_threads: NonZeroUsize,
    pub replay_forks_threads: NonZeroUsize,
    pub replay_transactions_threads: NonZeroUsize,
    pub tvu_shred_sigverify_threads: NonZeroUsize,
    pub delay_leader_block_for_pending_fork: bool,
    pub use_tpu_client_next: bool,
    pub retransmit_xdp: Option<XdpConfig>,
    pub repair_handler_type: RepairHandlerType,
}

impl ValidatorConfig {
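    /// Build a config with defaults suitable for tests: PoH speed test and OS
    /// stats reporting are disabled, snapshots are load-only, and thread
    /// counts are derived from the local machine.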
    pub fn default_for_test() -> Self {
        let max_thread_count =
            NonZeroUsize::new(num_cpus::get()).expect("thread count is non-zero");

        Self {
            halt_at_slot: None,
            expected_genesis_hash: None,
            expected_bank_hash: None,
            expected_shred_version: None,
            voting_disabled: false,
            max_ledger_shreds: None,
            blockstore_options: BlockstoreOptions::default_for_tests(),
            account_paths: Vec::new(),
            account_snapshot_paths: Vec::new(),
            rpc_config: JsonRpcConfig::default_for_test(),
            on_start_geyser_plugin_config_files: None,
            geyser_plugin_always_enabled: false,
            rpc_addrs: None,
            pubsub_config: PubSubConfig::default(),
            snapshot_config: SnapshotConfig::new_load_only(),
            broadcast_stage_type: BroadcastStageType::Standard,
            turbine_disabled: Arc::<AtomicBool>::default(),
            fixed_leader_schedule: None,
            wait_for_supermajority: None,
            new_hard_forks: None,
            known_validators: None,
            repair_validators: None,
            repair_whitelist: Arc::new(RwLock::new(HashSet::default())),
            gossip_validators: None,
            max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
            run_verification: true,
            require_tower: false,
            tower_storage: Arc::new(NullTowerStorage::default()),
            debug_keys: None,
            contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
            contact_save_interval: DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS,
            send_transaction_service_config: SendTransactionServiceConfig::default(),
            no_poh_speed_test: true,
            no_os_memory_stats_reporting: true,
            no_os_network_stats_reporting: true,
            no_os_cpu_stats_reporting: true,
            no_os_disk_stats_reporting: true,
            poh_pinned_cpu_core: poh_service::DEFAULT_PINNED_CPU_CORE,
            poh_hashes_per_batch: poh_service::DEFAULT_HASHES_PER_BATCH,
            process_ledger_before_services: false,
            warp_slot: None,
            accounts_db_skip_shrink: false,
            accounts_db_force_initial_clean: false,
            tpu_coalesce: DEFAULT_TPU_COALESCE,
            staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
            validator_exit: Arc::new(RwLock::new(Exit::default())),
            validator_exit_backpressure: HashMap::default(),
            no_wait_for_vote_to_start_leader: true,
            accounts_db_config: Some(ACCOUNTS_DB_CONFIG_FOR_TESTING),
            wait_to_vote_slot: None,
            runtime_config: RuntimeConfig::default(),
            banking_trace_dir_byte_limit: 0,
            block_verification_method: BlockVerificationMethod::default(),
            block_production_method: BlockProductionMethod::default(),
            block_production_num_workers: BankingStage::default_num_workers(),
            transaction_struct: TransactionStructure::default(),
            // enable forwarding by default for tests
            enable_block_production_forwarding: true,
            generator_config: None,
            use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(),
            wen_restart_proto_path: None,
            wen_restart_coordinator: None,
            unified_scheduler_handler_threads: None,
            ip_echo_server_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            rayon_global_threads: max_thread_count,
            replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            replay_transactions_threads: max_thread_count,
            tvu_shred_sigverify_threads: NonZeroUsize::new(get_thread_count())
                .expect("thread count is non-zero"),
            delay_leader_block_for_pending_fork: false,
            use_tpu_client_next: true,
            retransmit_xdp: None,
            repair_handler_type: RepairHandlerType::default(),
        }
    }

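    /// Enable the RPC block subscription (and the RPC transaction history it
    /// relies on) for tests that exercise block subscriptions.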
    pub fn enable_default_rpc_block_subscribe(&mut self) {
        let pubsub_config = PubSubConfig {
            enable_block_subscription: true,
            ..PubSubConfig::default()
        };
        let rpc_config = JsonRpcConfig {
            enable_rpc_transaction_history: true,
            ..JsonRpcConfig::default_for_test()
        };

        self.pubsub_config = pubsub_config;
        self.rpc_config = rpc_config;
    }
}

// `ValidatorStartProgress` contains status information that is surfaced to the node operator over
// the admin RPC channel to help them follow the general progress of node startup without
// having to watch log messages.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum ValidatorStartProgress {
    Initializing, // Catch all, default state
    SearchingForRpcService,
    DownloadingSnapshot {
        slot: Slot,
        rpc_addr: SocketAddr,
    },
    CleaningBlockStore,
    CleaningAccounts,
    LoadingLedger,
    ProcessingLedger {
        slot: Slot,
        max_slot: Slot,
    },
    StartingServices,
    Halted, // Validator halted due to `--dev-halt-at-slot` argument
    WaitingForSupermajority {
        slot: Slot,
        gossip_stake_percent: u64,
    },

    // `Running` is the terminal state once the validator fully starts and all services are
    // operational
    Running,
}

impl Default for ValidatorStartProgress {
    fn default() -> Self {
        Self::Initializing
    }
}

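/// Optional background thread that scans the blockstore and fixes roots at
/// startup (spawned only when RPC transaction history and
/// `rpc_scan_and_fix_roots` are enabled).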
struct BlockstoreRootScan {
    thread: Option<JoinHandle<Result<usize, BlockstoreError>>>,
}

impl BlockstoreRootScan {
    fn new(config: &ValidatorConfig, blockstore: Arc<Blockstore>, exit: Arc<AtomicBool>) -> Self {
        let thread = if config.rpc_addrs.is_some()
            && config.rpc_config.enable_rpc_transaction_history
            && config.rpc_config.rpc_scan_and_fix_roots
        {
            Some(
                Builder::new()
                    .name("solBStoreRtScan".to_string())
                    .spawn(move || blockstore.scan_and_fix_roots(None, None, &exit))
                    .unwrap(),
            )
        } else {
            None
        };
        Self { thread }
    }

    fn join(self) {
        if let Some(blockstore_root_scan) = self.thread {
            if let Err(err) = blockstore_root_scan.join() {
                warn!("blockstore_root_scan failed to join {err:?}");
            }
        }
    }
}

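/// Senders and services that record transaction statuses for RPC transaction
/// history.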
#[derive(Default)]
struct TransactionHistoryServices {
    transaction_status_sender: Option<TransactionStatusSender>,
    transaction_status_service: Option<TransactionStatusService>,
    max_complete_transaction_status_slot: Arc<AtomicU64>,
}

/// A struct that eases passing validator TPU configuration.
pub struct ValidatorTpuConfig {
    /// Controls whether to use QUIC for sending regular TPU transactions
    pub use_quic: bool,
    /// Controls whether to use QUIC for sending TPU votes
    pub vote_use_quic: bool,
    /// Controls the connection cache pool size
    pub tpu_connection_pool_size: usize,
    /// Controls whether to enable UDP for TPU transactions.
    pub tpu_enable_udp: bool,
    /// QUIC server config for regular TPU
    pub tpu_quic_server_config: QuicServerParams,
    /// QUIC server config for TPU forward
    pub tpu_fwd_quic_server_config: QuicServerParams,
    /// QUIC server config for Vote
    pub vote_quic_server_config: QuicServerParams,
}

impl ValidatorTpuConfig {
    /// A convenience function to build a `ValidatorTpuConfig` for testing with
    /// good defaults.
    pub fn new_for_tests(tpu_enable_udp: bool) -> Self {
        let tpu_quic_server_config = QuicServerParams {
            max_connections_per_ipaddr_per_min: 32,
            coalesce_channel_size: 100_000, // smaller channel size for faster tests
            ..Default::default()
        };

        let tpu_fwd_quic_server_config = QuicServerParams {
            max_connections_per_ipaddr_per_min: 32,
            max_unstaked_connections: 0,
            coalesce_channel_size: 100_000, // smaller channel size for faster tests
            ..Default::default()
        };

        // vote and tpu_fwd share the same characteristics -- disallow non-staked connections:
        let vote_quic_server_config = tpu_fwd_quic_server_config.clone();

        ValidatorTpuConfig {
            use_quic: DEFAULT_TPU_USE_QUIC,
            vote_use_quic: DEFAULT_VOTE_USE_QUIC,
            tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
            tpu_enable_udp,
            tpu_quic_server_config,
            tpu_fwd_quic_server_config,
            vote_quic_server_config,
        }
    }
}

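/// A running validator node; holds the handles to all of its long-lived
/// services.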
pub struct Validator {
    validator_exit: Arc<RwLock<Exit>>,
    json_rpc_service: Option<JsonRpcService>,
    pubsub_service: Option<PubSubService>,
    rpc_completed_slots_service: Option<JoinHandle<()>>,
    optimistically_confirmed_bank_tracker: Option<OptimisticallyConfirmedBankTracker>,
    transaction_status_service: Option<TransactionStatusService>,
    entry_notifier_service: Option<EntryNotifierService>,
    system_monitor_service: Option<SystemMonitorService>,
    sample_performance_service: Option<SamplePerformanceService>,
    stats_reporter_service: StatsReporterService,
    gossip_service: GossipService,
    serve_repair_service: ServeRepairService,
    completed_data_sets_service: Option<CompletedDataSetsService>,
    snapshot_packager_service: Option<SnapshotPackagerService>,
    poh_recorder: Arc<RwLock<PohRecorder>>,
    poh_service: PohService,
    tpu: Tpu,
    tvu: Tvu,
    ip_echo_server: Option<solana_net_utils::IpEchoServer>,
    pub cluster_info: Arc<ClusterInfo>,
    pub bank_forks: Arc<RwLock<BankForks>>,
    pub blockstore: Arc<Blockstore>,
    geyser_plugin_service: Option<GeyserPluginService>,
    blockstore_metric_report_service: BlockstoreMetricReportService,
    accounts_background_service: AccountsBackgroundService,
    turbine_quic_endpoint: Option<Endpoint>,
    turbine_quic_endpoint_runtime: Option<TokioRuntime>,
    turbine_quic_endpoint_join_handle: Option<solana_turbine::quic_endpoint::AsyncTryJoinHandle>,
    repair_quic_endpoints: Option<[Endpoint; 3]>,
    repair_quic_endpoints_runtime: Option<TokioRuntime>,
    repair_quic_endpoints_join_handle: Option<repair::quic_endpoint::AsyncTryJoinHandle>,
    xdp_retransmitter: Option<XdpRetransmitter>,
    // This runtime is used to run the client owned by SendTransactionService.
    // We don't wait for its JoinHandle here because ownership and shutdown
    // are managed elsewhere. This variable is intentionally unused.
    _tpu_client_next_runtime: Option<TokioRuntime>,
}

impl Validator {
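    /// Start a validator from the given node, identity keypair, ledger path,
    /// and configuration, bringing up gossip, ledger/bank state, RPC, TVU,
    /// and TPU services.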
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        mut node: Node,
        identity_keypair: Arc<Keypair>,
        ledger_path: &Path,
        vote_account: &Pubkey,
        authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
        cluster_entrypoints: Vec<ContactInfo>,
        config: &ValidatorConfig,
        should_check_duplicate_instance: bool,
        rpc_to_plugin_manager_receiver: Option<Receiver<GeyserPluginManagerRequest>>,
        start_progress: Arc<RwLock<ValidatorStartProgress>>,
        socket_addr_space: SocketAddrSpace,
        tpu_config: ValidatorTpuConfig,
        admin_rpc_service_post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
    ) -> Result<Self> {
        #[cfg(debug_assertions)]
        const DEBUG_ASSERTION_STATUS: &str = "enabled";
        #[cfg(not(debug_assertions))]
        const DEBUG_ASSERTION_STATUS: &str = "disabled";
        info!("debug-assertion status: {DEBUG_ASSERTION_STATUS}");

        let ValidatorTpuConfig {
            use_quic,
            vote_use_quic,
            tpu_connection_pool_size,
            tpu_enable_udp,
            tpu_quic_server_config,
            tpu_fwd_quic_server_config,
            vote_quic_server_config,
        } = tpu_config;

        let start_time = Instant::now();

        // Initialize the global rayon pool first to ensure the value in config
        // is honored. Otherwise, some code accessing the global pool could
        // cause it to get initialized with Rayon's default (not ours)
        if rayon::ThreadPoolBuilder::new()
            .thread_name(|i| format!("solRayonGlob{i:02}"))
            .num_threads(config.rayon_global_threads.get())
            .build_global()
            .is_err()
        {
            warn!("Rayon global thread pool already initialized");
        }

        let id = identity_keypair.pubkey();
        assert_eq!(&id, node.info.pubkey());

        info!("identity pubkey: {id}");
        info!("vote account pubkey: {vote_account}");

        if !config.no_os_network_stats_reporting {
            verify_net_stats_access().map_err(|e| {
                ValidatorError::Other(format!("Failed to access network stats: {e:?}"))
            })?;
        }

        let mut bank_notification_senders = Vec::new();

        let exit = Arc::new(AtomicBool::new(false));

        let geyser_plugin_config_files = config
            .on_start_geyser_plugin_config_files
            .as_ref()
            .map(Cow::Borrowed)
            .or_else(|| {
                config
                    .geyser_plugin_always_enabled
                    .then_some(Cow::Owned(vec![]))
            });
        let geyser_plugin_service =
            if let Some(geyser_plugin_config_files) = geyser_plugin_config_files {
                let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
                bank_notification_senders.push(confirmed_bank_sender);
                let rpc_to_plugin_manager_receiver_and_exit =
                    rpc_to_plugin_manager_receiver.map(|receiver| (receiver, exit.clone()));
                Some(
                    GeyserPluginService::new_with_receiver(
                        confirmed_bank_receiver,
                        config.geyser_plugin_always_enabled,
                        geyser_plugin_config_files.as_ref(),
                        rpc_to_plugin_manager_receiver_and_exit,
                    )
                    .map_err(|err| {
                        ValidatorError::Other(format!("Failed to load the Geyser plugin: {err:?}"))
                    })?,
                )
            } else {
                None
            };

        if config.voting_disabled {
            warn!("voting disabled");
            authorized_voter_keypairs.write().unwrap().clear();
        } else {
            for authorized_voter_keypair in authorized_voter_keypairs.read().unwrap().iter() {
                warn!("authorized voter: {}", authorized_voter_keypair.pubkey());
            }
        }

        for cluster_entrypoint in &cluster_entrypoints {
            info!("entrypoint: {cluster_entrypoint:?}");
        }

        if solana_perf::perf_libs::api().is_some() {
            info!("Initializing sigverify, this could take a while...");
        } else {
            info!("Initializing sigverify...");
        }
        sigverify::init();
        info!("Initializing sigverify done.");

        if !ledger_path.is_dir() {
            return Err(anyhow!(
                "ledger directory does not exist or is not accessible: {ledger_path:?}"
            ));
        }
        let genesis_config = load_genesis(config, ledger_path)?;
        metrics_config_sanity_check(genesis_config.cluster_type)?;

        info!("Cleaning accounts paths..");
        *start_progress.write().unwrap() = ValidatorStartProgress::CleaningAccounts;
        let mut timer = Measure::start("clean_accounts_paths");
        cleanup_accounts_paths(config);
        timer.stop();
        info!("Cleaning accounts paths done. {timer}");

        snapshot_utils::purge_incomplete_bank_snapshots(&config.snapshot_config.bank_snapshots_dir);
        snapshot_utils::purge_old_bank_snapshots_at_startup(
            &config.snapshot_config.bank_snapshots_dir,
        );

        info!("Cleaning orphaned account snapshot directories..");
        let mut timer = Measure::start("clean_orphaned_account_snapshot_dirs");
        clean_orphaned_account_snapshot_dirs(
            &config.snapshot_config.bank_snapshots_dir,
            &config.account_snapshot_paths,
        )
        .context("failed to clean orphaned account snapshot directories")?;
        timer.stop();
        info!("Cleaning orphaned account snapshot directories done. {timer}");

        // token used to cancel tpu-client-next.
        let cancel_tpu_client_next = CancellationToken::new();
        {
            let exit = exit.clone();
            config
                .validator_exit
                .write()
                .unwrap()
                .register_exit(Box::new(move || exit.store(true, Ordering::Relaxed)));
            let cancel_tpu_client_next = cancel_tpu_client_next.clone();
            config
                .validator_exit
                .write()
                .unwrap()
                .register_exit(Box::new(move || cancel_tpu_client_next.cancel()));
        }

        let (
            accounts_update_notifier,
            transaction_notifier,
            entry_notifier,
            block_metadata_notifier,
            slot_status_notifier,
        ) = if let Some(service) = &geyser_plugin_service {
            (
                service.get_accounts_update_notifier(),
                service.get_transaction_notifier(),
                service.get_entry_notifier(),
                service.get_block_metadata_notifier(),
                service.get_slot_status_notifier(),
            )
        } else {
            (None, None, None, None, None)
        };

        info!(
            "Geyser plugin: accounts_update_notifier: {}, transaction_notifier: {}, \
             entry_notifier: {}",
            accounts_update_notifier.is_some(),
            transaction_notifier.is_some(),
            entry_notifier.is_some()
        );

        let system_monitor_service = Some(SystemMonitorService::new(
            exit.clone(),
            SystemMonitorStatsReportConfig {
                report_os_memory_stats: !config.no_os_memory_stats_reporting,
                report_os_network_stats: !config.no_os_network_stats_reporting,
                report_os_cpu_stats: !config.no_os_cpu_stats_reporting,
                report_os_disk_stats: !config.no_os_disk_stats_reporting,
            },
        ));

        let dependency_tracker = Arc::new(DependencyTracker::default());

        let (
            bank_forks,
            blockstore,
            original_blockstore_root,
            ledger_signal_receiver,
            leader_schedule_cache,
            starting_snapshot_hashes,
            TransactionHistoryServices {
                transaction_status_sender,
                transaction_status_service,
                max_complete_transaction_status_slot,
            },
            blockstore_process_options,
            blockstore_root_scan,
            pruned_banks_receiver,
            entry_notifier_service,
        ) = load_blockstore(
            config,
            ledger_path,
            &genesis_config,
            exit.clone(),
            &start_progress,
            accounts_update_notifier,
            transaction_notifier,
            entry_notifier,
            config
                .rpc_addrs
                .is_some()
                .then(|| dependency_tracker.clone()),
        )
        .map_err(ValidatorError::Other)?;

        if !config.no_poh_speed_test {
            check_poh_speed(&bank_forks.read().unwrap().root_bank(), None)?;
        }

        let (root_slot, hard_forks) = {
            let root_bank = bank_forks.read().unwrap().root_bank();
            (root_bank.slot(), root_bank.hard_forks())
        };
        let shred_version = compute_shred_version(&genesis_config.hash(), Some(&hard_forks));
        info!("shred version: {shred_version}, hard forks: {hard_forks:?}");

        if let Some(expected_shred_version) = config.expected_shred_version {
            if expected_shred_version != shred_version {
                return Err(ValidatorError::ShredVersionMismatch {
                    actual: shred_version,
                    expected: expected_shred_version,
                }
                .into());
            }
        }

        if let Some(start_slot) = should_cleanup_blockstore_incorrect_shred_versions(
            config,
            &blockstore,
            root_slot,
            &hard_forks,
        )? {
            *start_progress.write().unwrap() = ValidatorStartProgress::CleaningBlockStore;
            cleanup_blockstore_incorrect_shred_versions(
                &blockstore,
                config,
                start_slot,
                shred_version,
            )?;
        } else {
            info!("Skipping the blockstore check for shreds with incorrect version");
        }

        node.info.set_shred_version(shred_version);
        node.info.set_wallclock(timestamp());
        Self::print_node_info(&node);

        let mut cluster_info = ClusterInfo::new(
            node.info.clone(),
            identity_keypair.clone(),
            socket_addr_space,
        );
        cluster_info.set_contact_debug_interval(config.contact_debug_interval);
        cluster_info.set_entrypoints(cluster_entrypoints);
        cluster_info.restore_contact_info(ledger_path, config.contact_save_interval);
        cluster_info.set_bind_ip_addrs(node.bind_ip_addrs.clone());
        let cluster_info = Arc::new(cluster_info);
        let node_multihoming = NodeMultihoming::from(&node);

        assert!(is_snapshot_config_valid(&config.snapshot_config));

        let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
        let snapshot_controller = Arc::new(SnapshotController::new(
            snapshot_request_sender.clone(),
            config.snapshot_config.clone(),
            bank_forks.read().unwrap().root(),
        ));

        let pending_snapshot_packages = Arc::new(Mutex::new(PendingSnapshotPackages::default()));
        let snapshot_packager_service = if snapshot_controller
            .snapshot_config()
            .should_generate_snapshots()
        {
            let exit_backpressure = config
                .validator_exit_backpressure
                .get(SnapshotPackagerService::NAME)
                .cloned();
            let enable_gossip_push = true;
            let snapshot_packager_service = SnapshotPackagerService::new(
                pending_snapshot_packages.clone(),
                starting_snapshot_hashes,
                exit.clone(),
                exit_backpressure,
                cluster_info.clone(),
                snapshot_controller.clone(),
                enable_gossip_push,
            );
            Some(snapshot_packager_service)
        } else {
            None
        };

        let snapshot_request_handler = SnapshotRequestHandler {
            snapshot_controller: snapshot_controller.clone(),
            snapshot_request_receiver,
            pending_snapshot_packages,
        };
        let pruned_banks_request_handler = PrunedBanksRequestHandler {
            pruned_banks_receiver,
        };
        let accounts_background_service = AccountsBackgroundService::new(
            bank_forks.clone(),
            exit.clone(),
            AbsRequestHandlers {
                snapshot_request_handler,
                pruned_banks_request_handler,
            },
        );
        info!(
            "Using: block-verification-method: {}, block-production-method: {}, \
             transaction-structure: {}",
            config.block_verification_method,
            config.block_production_method,
            config.transaction_struct
        );

        let (replay_vote_sender, replay_vote_receiver) = unbounded();

        // block min prioritization fee cache should be readable by RPC, and writable by validator
        // (by both replay stage and banking stage)
        let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::default());

        let leader_schedule_cache = Arc::new(leader_schedule_cache);
        let startup_verification_complete;
        let (mut poh_recorder, entry_receiver) = {
            let bank = &bank_forks.read().unwrap().working_bank();
            startup_verification_complete = Arc::clone(bank.get_startup_verification_complete());
            PohRecorder::new_with_clear_signal(
                bank.tick_height(),
                bank.last_blockhash(),
                bank.clone(),
                None,
                bank.ticks_per_slot(),
                config.delay_leader_block_for_pending_fork,
                blockstore.clone(),
                blockstore.get_new_shred_signal(0),
                &leader_schedule_cache,
                &genesis_config.poh_config,
                exit.clone(),
            )
        };
        if transaction_status_sender.is_some() {
            poh_recorder.track_transaction_indexes();
        }
        let (record_sender, record_receiver) = unbounded();
        let transaction_recorder =
            TransactionRecorder::new(record_sender, poh_recorder.is_exited.clone());
        let poh_recorder = Arc::new(RwLock::new(poh_recorder));

        let (banking_tracer, tracer_thread) =
            BankingTracer::new((config.banking_trace_dir_byte_limit > 0).then_some((
                &blockstore.banking_trace_path(),
                exit.clone(),
                config.banking_trace_dir_byte_limit,
            )))?;
        if banking_tracer.is_enabled() {
            info!(
                "Enabled banking trace (dir_byte_limit: {})",
                config.banking_trace_dir_byte_limit
            );
        } else {
            info!("Disabled banking trace");
        }
        let banking_tracer_channels = banking_tracer.create_channels(false);

        match &config.block_verification_method {
            BlockVerificationMethod::BlockstoreProcessor => {
                info!("no scheduler pool is installed for block verification...");
                if let Some(count) = config.unified_scheduler_handler_threads {
                    warn!(
                        "--unified-scheduler-handler-threads={count} is ignored because unified \
                         scheduler isn't enabled"
                    );
                }
            }
            BlockVerificationMethod::UnifiedScheduler => {
                let scheduler_pool = DefaultSchedulerPool::new_dyn(
                    config.unified_scheduler_handler_threads,
                    config.runtime_config.log_messages_bytes_limit,
                    transaction_status_sender.clone(),
                    Some(replay_vote_sender.clone()),
                    prioritization_fee_cache.clone(),
                );
                bank_forks
                    .write()
                    .unwrap()
                    .install_scheduler_pool(scheduler_pool);
            }
        }

        let entry_notification_sender = entry_notifier_service
            .as_ref()
            .map(|service| service.sender());
        let mut process_blockstore = ProcessBlockStore::new(
            &id,
            vote_account,
            &start_progress,
            &blockstore,
            original_blockstore_root,
            &bank_forks,
            &leader_schedule_cache,
            &blockstore_process_options,
            transaction_status_sender.as_ref(),
            entry_notification_sender,
            blockstore_root_scan,
            &snapshot_controller,
            config,
        );

        maybe_warp_slot(
            config,
            &mut process_blockstore,
            ledger_path,
            &bank_forks,
            &leader_schedule_cache,
            &snapshot_controller,
        )
        .map_err(ValidatorError::Other)?;

        if config.process_ledger_before_services {
            process_blockstore
                .process()
                .map_err(ValidatorError::Other)?;
        }
        *start_progress.write().unwrap() = ValidatorStartProgress::StartingServices;

        let sample_performance_service =
            if config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history {
                Some(SamplePerformanceService::new(
                    &bank_forks,
                    blockstore.clone(),
                    exit.clone(),
                ))
            } else {
                None
            };

        let mut block_commitment_cache = BlockCommitmentCache::default();
        let bank_forks_guard = bank_forks.read().unwrap();
        block_commitment_cache.initialize_slots(
            bank_forks_guard.working_bank().slot(),
            bank_forks_guard.root(),
        );
        drop(bank_forks_guard);
        let block_commitment_cache = Arc::new(RwLock::new(block_commitment_cache));

        let optimistically_confirmed_bank =
            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);

        let max_slots = Arc::new(MaxSlots::default());

        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));

        let mut tpu_transactions_forwards_client =
            Some(node.sockets.tpu_transaction_forwarding_client);

        let connection_cache = match (config.use_tpu_client_next, use_quic) {
            (false, true) => Some(Arc::new(ConnectionCache::new_with_client_options(
                "connection_cache_tpu_quic",
                tpu_connection_pool_size,
                Some(
                    tpu_transactions_forwards_client
                        .take()
                        .expect("Socket should exist."),
                ),
                Some((
                    &identity_keypair,
                    node.info
                        .tpu(Protocol::UDP)
                        .ok_or_else(|| {
                            ValidatorError::Other(String::from("Invalid UDP address for TPU"))
                        })?
                        .ip(),
                )),
                Some((&staked_nodes, &identity_keypair.pubkey())),
            ))),
            (false, false) => Some(Arc::new(ConnectionCache::with_udp(
                "connection_cache_tpu_udp",
                tpu_connection_pool_size,
            ))),
            (true, _) => None,
        };

        let vote_connection_cache = if vote_use_quic {
            let vote_connection_cache = ConnectionCache::new_with_client_options(
                "connection_cache_vote_quic",
                tpu_connection_pool_size,
                Some(node.sockets.quic_vote_client),
                Some((
                    &identity_keypair,
                    node.info
                        .tpu_vote(Protocol::QUIC)
                        .ok_or_else(|| {
                            ValidatorError::Other(String::from("Invalid QUIC address for TPU Vote"))
                        })?
                        .ip(),
                )),
                Some((&staked_nodes, &identity_keypair.pubkey())),
            );
            Arc::new(vote_connection_cache)
        } else {
            Arc::new(ConnectionCache::with_udp(
                "connection_cache_vote_udp",
                tpu_connection_pool_size,
            ))
        };

        // test-validator crate may start the validator in a tokio runtime
        // context which forces us to use the same runtime because a nested
        // runtime will cause panic at drop. Outside test-validator crate, we
        // always need a tokio runtime (and the respective handle) to initialize
        // the turbine QUIC endpoint.
        let current_runtime_handle = tokio::runtime::Handle::try_current();
        let tpu_client_next_runtime =
            (current_runtime_handle.is_err() && config.use_tpu_client_next).then(|| {
                tokio::runtime::Builder::new_multi_thread()
                    .enable_all()
                    .worker_threads(2)
                    .thread_name("solTpuClientRt")
                    .build()
                    .unwrap()
            });

        let rpc_override_health_check =
            Arc::new(AtomicBool::new(config.rpc_config.disable_health_check));
        let (
            json_rpc_service,
            rpc_subscriptions,
            pubsub_service,
            completed_data_sets_sender,
            completed_data_sets_service,
            rpc_completed_slots_service,
            optimistically_confirmed_bank_tracker,
            bank_notification_sender,
        ) = if let Some((rpc_addr, rpc_pubsub_addr)) = config.rpc_addrs {
            assert_eq!(
                node.info.rpc().map(|addr| socket_addr_space.check(&addr)),
                node.info
                    .rpc_pubsub()
                    .map(|addr| socket_addr_space.check(&addr))
            );
            let (bank_notification_sender, bank_notification_receiver) = unbounded();
            let confirmed_bank_subscribers = if !bank_notification_senders.is_empty() {
                Some(Arc::new(RwLock::new(bank_notification_senders)))
            } else {
                None
            };

            let client_option = if config.use_tpu_client_next {
                let runtime_handle = tpu_client_next_runtime
                    .as_ref()
                    .map(TokioRuntime::handle)
                    .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap());
                ClientOption::TpuClientNext(
                    Arc::as_ref(&identity_keypair),
                    node.sockets.rpc_sts_client,
                    runtime_handle.clone(),
                    cancel_tpu_client_next.clone(),
                )
            } else {
                let Some(connection_cache) = &connection_cache else {
                    panic!("ConnectionCache should exist by construction.");
                };
                ClientOption::ConnectionCache(connection_cache.clone())
            };
            let rpc_svc_config = JsonRpcServiceConfig {
                rpc_addr,
                rpc_config: config.rpc_config.clone(),
                snapshot_config: Some(snapshot_controller.snapshot_config().clone()),
                bank_forks: bank_forks.clone(),
                block_commitment_cache: block_commitment_cache.clone(),
                blockstore: blockstore.clone(),
                cluster_info: cluster_info.clone(),
                poh_recorder: Some(poh_recorder.clone()),
                genesis_hash: genesis_config.hash(),
                ledger_path: ledger_path.to_path_buf(),
                validator_exit: config.validator_exit.clone(),
                exit: exit.clone(),
                override_health_check: rpc_override_health_check.clone(),
                startup_verification_complete,
                optimistically_confirmed_bank: optimistically_confirmed_bank.clone(),
                send_transaction_service_config: config.send_transaction_service_config.clone(),
                max_slots: max_slots.clone(),
                leader_schedule_cache: leader_schedule_cache.clone(),
                max_complete_transaction_status_slot: max_complete_transaction_status_slot.clone(),
                prioritization_fee_cache: prioritization_fee_cache.clone(),
                client_option,
            };
            let json_rpc_service =
                JsonRpcService::new_with_config(rpc_svc_config).map_err(ValidatorError::Other)?;
            let rpc_subscriptions = Arc::new(RpcSubscriptions::new_with_config(
                exit.clone(),
                max_complete_transaction_status_slot,
                blockstore.clone(),
                bank_forks.clone(),
                block_commitment_cache.clone(),
                optimistically_confirmed_bank.clone(),
                &config.pubsub_config,
                None,
            ));
            let pubsub_service = if !config.rpc_config.full_api {
                None
            } else {
                let (trigger, pubsub_service) = PubSubService::new(
                    config.pubsub_config.clone(),
                    &rpc_subscriptions,
                    rpc_pubsub_addr,
                );
                config
                    .validator_exit
                    .write()
                    .unwrap()
                    .register_exit(Box::new(move || trigger.cancel()));

                Some(pubsub_service)
            };

            let (completed_data_sets_sender, completed_data_sets_service) =
                if !config.rpc_config.full_api {
                    (None, None)
                } else {
                    let (completed_data_sets_sender, completed_data_sets_receiver) =
                        bounded(MAX_COMPLETED_DATA_SETS_IN_CHANNEL);
                    let completed_data_sets_service = CompletedDataSetsService::new(
                        completed_data_sets_receiver,
                        blockstore.clone(),
                        rpc_subscriptions.clone(),
                        exit.clone(),
                        max_slots.clone(),
                    );
                    (
                        Some(completed_data_sets_sender),
                        Some(completed_data_sets_service),
                    )
                };

            let rpc_completed_slots_service =
                if config.rpc_config.full_api || geyser_plugin_service.is_some() {
                    let (completed_slots_sender, completed_slots_receiver) =
                        bounded(MAX_COMPLETED_SLOTS_IN_CHANNEL);
                    blockstore.add_completed_slots_signal(completed_slots_sender);

                    Some(RpcCompletedSlotsService::spawn(
                        completed_slots_receiver,
                        rpc_subscriptions.clone(),
                        slot_status_notifier.clone(),
                        exit.clone(),
                    ))
                } else {
                    None
                };

            let dependency_tracker = transaction_status_sender
                .is_some()
                .then_some(dependency_tracker);
            let optimistically_confirmed_bank_tracker =
                Some(OptimisticallyConfirmedBankTracker::new(
                    bank_notification_receiver,
                    exit.clone(),
                    bank_forks.clone(),
                    optimistically_confirmed_bank,
                    rpc_subscriptions.clone(),
                    confirmed_bank_subscribers,
                    prioritization_fee_cache.clone(),
                    dependency_tracker.clone(),
                ));
            let bank_notification_sender_config = Some(BankNotificationSenderConfig {
                sender: bank_notification_sender,
                should_send_parents: geyser_plugin_service.is_some(),
                dependency_tracker,
            });
            (
                Some(json_rpc_service),
                Some(rpc_subscriptions),
                pubsub_service,
                completed_data_sets_sender,
                completed_data_sets_service,
                rpc_completed_slots_service,
                optimistically_confirmed_bank_tracker,
                bank_notification_sender_config,
            )
        } else {
            (None, None, None, None, None, None, None, None)
        };
1276
1277        if config.halt_at_slot.is_some() {
1278            // Simulate a confirmed root to avoid RPC errors with CommitmentConfig::finalized() and
1279            // to ensure RPC endpoints like getConfirmedBlock, which require a confirmed root, work
1280            block_commitment_cache
1281                .write()
1282                .unwrap()
1283                .set_highest_super_majority_root(bank_forks.read().unwrap().root());
1284
1285            // Park with the RPC service running, ready for inspection!
1286            warn!("Validator halted");
1287            *start_progress.write().unwrap() = ValidatorStartProgress::Halted;
1288            std::thread::park();
1289        }
1290        let ip_echo_server = match node.sockets.ip_echo {
1291            None => None,
1292            Some(tcp_listener) => Some(solana_net_utils::ip_echo_server(
1293                tcp_listener,
1294                config.ip_echo_server_threads,
1295                Some(node.info.shred_version()),
1296            )),
1297        };
1298
1299        let (stats_reporter_sender, stats_reporter_receiver) = unbounded();
1300
1301        let stats_reporter_service =
1302            StatsReporterService::new(stats_reporter_receiver, exit.clone());
1303
1304        let gossip_service = GossipService::new(
1305            &cluster_info,
1306            Some(bank_forks.clone()),
1307            node.sockets.gossip.clone(),
1308            config.gossip_validators.clone(),
1309            should_check_duplicate_instance,
1310            Some(stats_reporter_sender.clone()),
1311            exit.clone(),
1312        );
1313        let serve_repair = config.repair_handler_type.create_serve_repair(
1314            blockstore.clone(),
1315            cluster_info.clone(),
1316            bank_forks.read().unwrap().sharable_root_bank(),
1317            config.repair_whitelist.clone(),
1318        );
1319        let (repair_request_quic_sender, repair_request_quic_receiver) = unbounded();
1320        let (repair_response_quic_sender, repair_response_quic_receiver) = unbounded();
1321        let (ancestor_hashes_response_quic_sender, ancestor_hashes_response_quic_receiver) =
1322            unbounded();
1323
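        // If --wait-for-supermajority is configured, this blocks (replaying the blockstore
        // first) until enough stake for the restart slot is visible in gossip.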
1324        let waited_for_supermajority = wait_for_supermajority(
1325            config,
1326            Some(&mut process_blockstore),
1327            &bank_forks,
1328            &cluster_info,
1329            rpc_override_health_check,
1330            &start_progress,
1331        )?;
1332
1333        let blockstore_metric_report_service =
1334            BlockstoreMetricReportService::new(blockstore.clone(), exit.clone());
1335
1336        let wait_for_vote_to_start_leader =
1337            !waited_for_supermajority && !config.no_wait_for_vote_to_start_leader;
1338
1339        let poh_service = PohService::new(
1340            poh_recorder.clone(),
1341            &genesis_config.poh_config,
1342            exit.clone(),
1343            bank_forks.read().unwrap().root_bank().ticks_per_slot(),
1344            config.poh_pinned_cpu_core,
1345            config.poh_hashes_per_batch,
1346            record_receiver,
1347        );
1348        assert_eq!(
1349            blockstore.get_new_shred_signals_len(),
1350            1,
1351            "New shred signal for the TVU should be the same as the clear bank signal."
1352        );
1353
1354        let vote_tracker = Arc::<VoteTracker>::default();
1355
1356        let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded();
1357        let (verified_vote_sender, verified_vote_receiver) = unbounded();
1358        let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
1359        let (duplicate_confirmed_slot_sender, duplicate_confirmed_slots_receiver) = unbounded();
1360
1361        let entry_notification_sender = entry_notifier_service
1362            .as_ref()
1363            .map(|service| service.sender_cloned());
1364
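        // Turbine over QUIC is disabled on MainnetBeta; elsewhere, a dedicated Tokio runtime is
        // created for the endpoint unless the validator is already running inside one.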
1365        let turbine_quic_endpoint_runtime = (current_runtime_handle.is_err()
1366            && genesis_config.cluster_type != ClusterType::MainnetBeta)
1367            .then(|| {
1368                tokio::runtime::Builder::new_multi_thread()
1369                    .enable_all()
1370                    .thread_name("solTurbineQuic")
1371                    .build()
1372                    .unwrap()
1373            });
1374        let (turbine_quic_endpoint_sender, turbine_quic_endpoint_receiver) = unbounded();
1375        let (
1376            turbine_quic_endpoint,
1377            turbine_quic_endpoint_sender,
1378            turbine_quic_endpoint_join_handle,
1379        ) = if genesis_config.cluster_type == ClusterType::MainnetBeta {
1380            let (sender, _receiver) = tokio::sync::mpsc::channel(1);
1381            (None, sender, None)
1382        } else {
1383            solana_turbine::quic_endpoint::new_quic_endpoint(
1384                turbine_quic_endpoint_runtime
1385                    .as_ref()
1386                    .map(TokioRuntime::handle)
1387                    .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()),
1388                &identity_keypair,
1389                node.sockets.tvu_quic,
1390                turbine_quic_endpoint_sender,
1391                bank_forks.clone(),
1392            )
1393            .map(|(endpoint, sender, join_handle)| (Some(endpoint), sender, Some(join_handle)))
1394            .unwrap()
1395        };
1396
1397        // Repair QUIC endpoints: dummy senders on MainnetBeta, otherwise bound to the serve-repair, repair, and ancestor-hashes sockets.
1398        let repair_quic_endpoints_runtime = (current_runtime_handle.is_err()
1399            && genesis_config.cluster_type != ClusterType::MainnetBeta)
1400            .then(|| {
1401                tokio::runtime::Builder::new_multi_thread()
1402                    .enable_all()
1403                    .thread_name("solRepairQuic")
1404                    .build()
1405                    .unwrap()
1406            });
1407        let (repair_quic_endpoints, repair_quic_async_senders, repair_quic_endpoints_join_handle) =
1408            if genesis_config.cluster_type == ClusterType::MainnetBeta {
1409                (None, RepairQuicAsyncSenders::new_dummy(), None)
1410            } else {
1411                let repair_quic_sockets = RepairQuicSockets {
1412                    repair_server_quic_socket: node.sockets.serve_repair_quic,
1413                    repair_client_quic_socket: node.sockets.repair_quic,
1414                    ancestor_hashes_quic_socket: node.sockets.ancestor_hashes_requests_quic,
1415                };
1416                let repair_quic_senders = RepairQuicSenders {
1417                    repair_request_quic_sender: repair_request_quic_sender.clone(),
1418                    repair_response_quic_sender,
1419                    ancestor_hashes_response_quic_sender,
1420                };
1421                repair::quic_endpoint::new_quic_endpoints(
1422                    repair_quic_endpoints_runtime
1423                        .as_ref()
1424                        .map(TokioRuntime::handle)
1425                        .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()),
1426                    &identity_keypair,
1427                    repair_quic_sockets,
1428                    repair_quic_senders,
1429                    bank_forks.clone(),
1430                )
1431                .map(|(endpoints, senders, join_handle)| {
1432                    (Some(endpoints), senders, Some(join_handle))
1433                })
1434                .unwrap()
1435            };
1436        let serve_repair_service = ServeRepairService::new(
1437            serve_repair,
1438            // Incoming UDP repair requests are adapted into RemoteRequest
1439            // and also sent through the same channel.
1440            repair_request_quic_sender,
1441            repair_request_quic_receiver,
1442            repair_quic_async_senders.repair_response_quic_sender,
1443            node.sockets.serve_repair,
1444            socket_addr_space,
1445            stats_reporter_sender,
1446            exit.clone(),
1447        );
1448
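        // Wen restart: when a wen_restart proto path is configured and we did not just wait for
        // supermajority, the validator runs the wen_restart protocol (after the TVU is built)
        // instead of continuing normal startup.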
1449        let in_wen_restart = config.wen_restart_proto_path.is_some() && !waited_for_supermajority;
1450        let wen_restart_repair_slots = if in_wen_restart {
1451            Some(Arc::new(RwLock::new(Vec::new())))
1452        } else {
1453            None
1454        };
1455        let tower = match process_blockstore.process_to_create_tower() {
1456            Ok(tower) => {
1457                info!("Tower state: {tower:?}");
1458                tower
1459            }
1460            Err(e) => {
1461                warn!("Unable to retrieve tower: {e:?}; creating default tower...");
1462                Tower::default()
1463            }
1464        };
1465        let last_vote = tower.last_vote();
1466
1467        let outstanding_repair_requests =
1468            Arc::<RwLock<repair::repair_service::OutstandingShredRepairs>>::default();
1469        let cluster_slots =
1470            Arc::new(crate::cluster_slots_service::cluster_slots::ClusterSlots::default());
1471
1472        // If RPC is supported and a ConnectionCache is in use, pass it along so it can be warmed up inside the Tvu.
1473        let connection_cache_for_warmup =
1474            if json_rpc_service.is_some() && connection_cache.is_some() {
1475                connection_cache.as_ref()
1476            } else {
1477                None
1478            };
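        // Optional XDP retransmit path: when configured, shreds are retransmitted through an
        // XdpRetransmitter using the first retransmit socket's local port as the source port
        // (IPv6 bind addresses are not supported).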
1479        let (xdp_retransmitter, xdp_sender) =
1480            if let Some(xdp_config) = config.retransmit_xdp.clone() {
1481                let src_port = node.sockets.retransmit_sockets[0]
1482                    .local_addr()
1483                    .expect("failed to get local address")
1484                    .port();
1485                let src_ip = match node.bind_ip_addrs.active() {
1486                    IpAddr::V4(ip) if !ip.is_unspecified() => Some(ip),
1487                    IpAddr::V4(_unspecified) => xdp_config
1488                        .interface
1489                        .as_ref()
1490                        .and_then(|iface| master_ip_if_bonded(iface)),
1491                    _ => panic!("IPv6 not supported"),
1492                };
1493                let (rtx, sender) = XdpRetransmitter::new(xdp_config, src_port, src_ip)
1494                    .expect("failed to create xdp retransmitter");
1495                (Some(rtx), Some(sender))
1496            } else {
1497                (None, None)
1498            };
1499
1500        // Disable all2all (alpenglow) testing on cluster types where it is not allowed
1501        let alpenglow_socket = if genesis_config.cluster_type == ClusterType::Testnet
1502            || genesis_config.cluster_type == ClusterType::Development
1503        {
1504            node.sockets.alpenglow
1505        } else {
1506            None
1507        };
1508
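        // The TVU consumes shreds (fetch, retransmit, repair) and drives replay and voting;
        // most of the channels and caches built above feed into it.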
1509        let tvu = Tvu::new(
1510            vote_account,
1511            authorized_voter_keypairs,
1512            &bank_forks,
1513            &cluster_info,
1514            TvuSockets {
1515                repair: node.sockets.repair.try_clone().unwrap(),
1516                retransmit: node.sockets.retransmit_sockets,
1517                fetch: node.sockets.tvu,
1518                ancestor_hashes_requests: node.sockets.ancestor_hashes_requests,
1519                alpenglow: alpenglow_socket,
1520            },
1521            blockstore.clone(),
1522            ledger_signal_receiver,
1523            rpc_subscriptions.clone(),
1524            &poh_recorder,
1525            tower,
1526            config.tower_storage.clone(),
1527            &leader_schedule_cache,
1528            exit.clone(),
1529            block_commitment_cache,
1530            config.turbine_disabled.clone(),
1531            transaction_status_sender.clone(),
1532            entry_notification_sender.clone(),
1533            vote_tracker.clone(),
1534            retransmit_slots_sender,
1535            gossip_verified_vote_hash_receiver,
1536            verified_vote_receiver,
1537            replay_vote_sender.clone(),
1538            completed_data_sets_sender,
1539            bank_notification_sender.clone(),
1540            duplicate_confirmed_slots_receiver,
1541            TvuConfig {
1542                max_ledger_shreds: config.max_ledger_shreds,
1543                shred_version: node.info.shred_version(),
1544                repair_validators: config.repair_validators.clone(),
1545                repair_whitelist: config.repair_whitelist.clone(),
1546                wait_for_vote_to_start_leader,
1547                replay_forks_threads: config.replay_forks_threads,
1548                replay_transactions_threads: config.replay_transactions_threads,
1549                shred_sigverify_threads: config.tvu_shred_sigverify_threads,
1550                xdp_sender: xdp_sender.clone(),
1551            },
1552            &max_slots,
1553            block_metadata_notifier,
1554            config.wait_to_vote_slot,
1555            Some(snapshot_controller.clone()),
1556            config.runtime_config.log_messages_bytes_limit,
1557            connection_cache_for_warmup,
1558            &prioritization_fee_cache,
1559            banking_tracer.clone(),
1560            turbine_quic_endpoint_sender.clone(),
1561            turbine_quic_endpoint_receiver,
1562            repair_response_quic_receiver,
1563            repair_quic_async_senders.repair_request_quic_sender,
1564            repair_quic_async_senders.ancestor_hashes_request_quic_sender,
1565            ancestor_hashes_response_quic_receiver,
1566            outstanding_repair_requests.clone(),
1567            cluster_slots.clone(),
1568            wen_restart_repair_slots.clone(),
1569            slot_status_notifier,
1570            vote_connection_cache,
1571        )
1572        .map_err(ValidatorError::Other)?;
1573
1574        if in_wen_restart {
1575            info!("Waiting for wen_restart to finish");
1576            wait_for_wen_restart(WenRestartConfig {
1577                wen_restart_path: config.wen_restart_proto_path.clone().unwrap(),
1578                wen_restart_coordinator: config.wen_restart_coordinator.unwrap(),
1579                last_vote,
1580                blockstore: blockstore.clone(),
1581                cluster_info: cluster_info.clone(),
1582                bank_forks: bank_forks.clone(),
1583                wen_restart_repair_slots: wen_restart_repair_slots.clone(),
1584                wait_for_supermajority_threshold_percent:
1585                    WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT,
1586                snapshot_controller: Some(snapshot_controller.clone()),
1587                abs_status: accounts_background_service.status().clone(),
1588                genesis_config_hash: genesis_config.hash(),
1589                exit: exit.clone(),
1590            })?;
1591            return Err(ValidatorError::WenRestartFinished.into());
1592        }
1593
1594        let key_notifiers = Arc::new(RwLock::new(KeyUpdaters::default()));
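        // Pick the client used to forward transactions out of the TPU: reuse the ConnectionCache
        // when one is configured, otherwise build a tpu-client-next client on the dedicated (or
        // ambient) Tokio runtime.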
1595        let forwarding_tpu_client = if let Some(connection_cache) = &connection_cache {
1596            ForwardingClientOption::ConnectionCache(connection_cache.clone())
1597        } else {
1598            let runtime_handle = tpu_client_next_runtime
1599                .as_ref()
1600                .map(TokioRuntime::handle)
1601                .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap());
1602            ForwardingClientOption::TpuClientNext((
1603                Arc::as_ref(&identity_keypair),
1604                tpu_transactions_forwards_client
1605                    .take()
1606                    .expect("Socket should exist."),
1607                runtime_handle.clone(),
1608                cancel_tpu_client_next,
1609            ))
1610        };
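        // The TPU owns transaction ingestion: UDP/QUIC fetch, sigverify, vote handling, banking,
        // and broadcast. The sockets, vote channels, and banking tracer channels created earlier
        // are handed off here.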
1611        let tpu = Tpu::new_with_client(
1612            &cluster_info,
1613            &poh_recorder,
1614            transaction_recorder,
1615            entry_receiver,
1616            retransmit_slots_receiver,
1617            TpuSockets {
1618                transactions: node.sockets.tpu,
1619                transaction_forwards: node.sockets.tpu_forwards,
1620                vote: node.sockets.tpu_vote,
1621                broadcast: node.sockets.broadcast,
1622                transactions_quic: node.sockets.tpu_quic,
1623                transactions_forwards_quic: node.sockets.tpu_forwards_quic,
1624                vote_quic: node.sockets.tpu_vote_quic,
1625                vote_forwarding_client: node.sockets.tpu_vote_forwarding_client,
1626                vortexor_receivers: node.sockets.vortexor_receivers,
1627            },
1628            rpc_subscriptions.clone(),
1629            transaction_status_sender,
1630            entry_notification_sender,
1631            blockstore.clone(),
1632            &config.broadcast_stage_type,
1633            xdp_sender,
1634            exit,
1635            node.info.shred_version(),
1636            vote_tracker,
1637            bank_forks.clone(),
1638            verified_vote_sender,
1639            gossip_verified_vote_hash_sender,
1640            replay_vote_receiver,
1641            replay_vote_sender,
1642            bank_notification_sender,
1643            config.tpu_coalesce,
1644            duplicate_confirmed_slot_sender,
1645            forwarding_tpu_client,
1646            turbine_quic_endpoint_sender,
1647            &identity_keypair,
1648            config.runtime_config.log_messages_bytes_limit,
1649            &staked_nodes,
1650            config.staked_nodes_overrides.clone(),
1651            banking_tracer_channels,
1652            tracer_thread,
1653            tpu_enable_udp,
1654            tpu_quic_server_config,
1655            tpu_fwd_quic_server_config,
1656            vote_quic_server_config,
1657            &prioritization_fee_cache,
1658            config.block_production_method.clone(),
1659            config.block_production_num_workers,
1660            config.transaction_struct.clone(),
1661            config.enable_block_production_forwarding,
1662            config.generator_config.clone(),
1663            key_notifiers.clone(),
1664        );
1665
1666        datapoint_info!(
1667            "validator-new",
1668            ("id", id.to_string(), String),
1669            ("version", solana_version::version!(), String),
1670            ("cluster_type", genesis_config.cluster_type as u32, i64),
1671            ("elapsed_ms", start_time.elapsed().as_millis() as i64, i64),
1672            ("waited_for_supermajority", waited_for_supermajority, bool),
1673            ("shred_version", shred_version as i64, i64),
1674        );
1675
1676        *start_progress.write().unwrap() = ValidatorStartProgress::Running;
1677        if config.use_tpu_client_next {
1678            if let Some(json_rpc_service) = &json_rpc_service {
1679                key_notifiers.write().unwrap().add(
1680                    KeyUpdaterType::RpcService,
1681                    json_rpc_service.get_client_key_updater(),
1682                );
1683            }
1684            // Note that we don't need to add the ConnectionClient to key_notifiers
1685            // because it is added inside the Tpu.
1686        }
1687
1688        *admin_rpc_service_post_init.write().unwrap() = Some(AdminRpcRequestMetadataPostInit {
1689            bank_forks: bank_forks.clone(),
1690            cluster_info: cluster_info.clone(),
1691            vote_account: *vote_account,
1692            repair_whitelist: config.repair_whitelist.clone(),
1693            notifies: key_notifiers,
1694            repair_socket: Arc::new(node.sockets.repair),
1695            outstanding_repair_requests,
1696            cluster_slots,
1697            node: Some(Arc::new(node_multihoming)),
1698        });
1699
1700        Ok(Self {
1701            stats_reporter_service,
1702            gossip_service,
1703            serve_repair_service,
1704            json_rpc_service,
1705            pubsub_service,
1706            rpc_completed_slots_service,
1707            optimistically_confirmed_bank_tracker,
1708            transaction_status_service,
1709            entry_notifier_service,
1710            system_monitor_service,
1711            sample_performance_service,
1712            snapshot_packager_service,
1713            completed_data_sets_service,
1714            tpu,
1715            tvu,
1716            poh_service,
1717            poh_recorder,
1718            ip_echo_server,
1719            validator_exit: config.validator_exit.clone(),
1720            cluster_info,
1721            bank_forks,
1722            blockstore,
1723            geyser_plugin_service,
1724            blockstore_metric_report_service,
1725            accounts_background_service,
1726            turbine_quic_endpoint,
1727            turbine_quic_endpoint_runtime,
1728            turbine_quic_endpoint_join_handle,
1729            repair_quic_endpoints,
1730            repair_quic_endpoints_runtime,
1731            repair_quic_endpoints_join_handle,
1732            xdp_retransmitter,
1733            _tpu_client_next_runtime: tpu_client_next_runtime,
1734        })
1735    }
1736
1737    // Used for notifying many nodes in parallel to exit
1738    pub fn exit(&mut self) {
1739        self.validator_exit.write().unwrap().exit();
1740
1741        // drop all signals in blockstore
1742        self.blockstore.drop_signal();
1743    }
1744
1745    pub fn close(mut self) {
1746        self.exit();
1747        self.join();
1748    }
1749
1750    fn print_node_info(node: &Node) {
1751        info!("{:?}", node.info);
1752        info!(
1753            "local gossip address: {}",
1754            node.sockets.gossip[0].local_addr().unwrap()
1755        );
1756        info!(
1757            "local broadcast address: {}",
1758            node.sockets
1759                .broadcast
1760                .first()
1761                .unwrap()
1762                .local_addr()
1763                .unwrap()
1764        );
1765        info!(
1766            "local repair address: {}",
1767            node.sockets.repair.local_addr().unwrap()
1768        );
1769        info!(
1770            "local retransmit address: {}",
1771            node.sockets.retransmit_sockets[0].local_addr().unwrap()
1772        );
1773    }
1774
1775    pub fn join(self) {
1776        drop(self.bank_forks);
1777        drop(self.cluster_info);
1778
1779        self.poh_service.join().expect("poh_service");
1780        drop(self.poh_recorder);
1781
1782        if let Some(json_rpc_service) = self.json_rpc_service {
1783            json_rpc_service.join().expect("rpc_service");
1784        }
1785
1786        if let Some(pubsub_service) = self.pubsub_service {
1787            pubsub_service.join().expect("pubsub_service");
1788        }
1789
1790        if let Some(rpc_completed_slots_service) = self.rpc_completed_slots_service {
1791            rpc_completed_slots_service
1792                .join()
1793                .expect("rpc_completed_slots_service");
1794        }
1795
1796        if let Some(optimistically_confirmed_bank_tracker) =
1797            self.optimistically_confirmed_bank_tracker
1798        {
1799            optimistically_confirmed_bank_tracker
1800                .join()
1801                .expect("optimistically_confirmed_bank_tracker");
1802        }
1803
1804        if let Some(transaction_status_service) = self.transaction_status_service {
1805            transaction_status_service
1806                .join()
1807                .expect("transaction_status_service");
1808        }
1809
1810        if let Some(system_monitor_service) = self.system_monitor_service {
1811            system_monitor_service
1812                .join()
1813                .expect("system_monitor_service");
1814        }
1815
1816        if let Some(sample_performance_service) = self.sample_performance_service {
1817            sample_performance_service
1818                .join()
1819                .expect("sample_performance_service");
1820        }
1821
1822        if let Some(entry_notifier_service) = self.entry_notifier_service {
1823            entry_notifier_service
1824                .join()
1825                .expect("entry_notifier_service");
1826        }
1827
1828        if let Some(s) = self.snapshot_packager_service {
1829            s.join().expect("snapshot_packager_service");
1830        }
1831
1832        self.gossip_service.join().expect("gossip_service");
1833        self.repair_quic_endpoints
1834            .iter()
1835            .flatten()
1836            .for_each(repair::quic_endpoint::close_quic_endpoint);
1837        self.serve_repair_service
1838            .join()
1839            .expect("serve_repair_service");
1840        if let Some(repair_quic_endpoints_join_handle) = self.repair_quic_endpoints_join_handle {
1841            self.repair_quic_endpoints_runtime
1842                .map(|runtime| runtime.block_on(repair_quic_endpoints_join_handle))
1843                .transpose()
1844                .unwrap();
1845        }
1846        self.stats_reporter_service
1847            .join()
1848            .expect("stats_reporter_service");
1849        self.blockstore_metric_report_service
1850            .join()
1851            .expect("ledger_metric_report_service");
1852        self.accounts_background_service
1853            .join()
1854            .expect("accounts_background_service");
1855        if let Some(turbine_quic_endpoint) = &self.turbine_quic_endpoint {
1856            solana_turbine::quic_endpoint::close_quic_endpoint(turbine_quic_endpoint);
1857        }
1858        if let Some(xdp_retransmitter) = self.xdp_retransmitter {
1859            xdp_retransmitter.join().expect("xdp_retransmitter");
1860        }
1861        self.tpu.join().expect("tpu");
1862        self.tvu.join().expect("tvu");
1863        if let Some(turbine_quic_endpoint_join_handle) = self.turbine_quic_endpoint_join_handle {
1864            self.turbine_quic_endpoint_runtime
1865                .map(|runtime| runtime.block_on(turbine_quic_endpoint_join_handle))
1866                .transpose()
1867                .unwrap();
1868        }
1869        if let Some(completed_data_sets_service) = self.completed_data_sets_service {
1870            completed_data_sets_service
1871                .join()
1872                .expect("completed_data_sets_service");
1873        }
1874        if let Some(ip_echo_server) = self.ip_echo_server {
1875            ip_echo_server.shutdown_background();
1876        }
1877
1878        if let Some(geyser_plugin_service) = self.geyser_plugin_service {
1879            geyser_plugin_service.join().expect("geyser_plugin_service");
1880        }
1881    }
1882}
1883
1884fn active_vote_account_exists_in_bank(bank: &Bank, vote_account: &Pubkey) -> bool {
1885    if let Some(account) = &bank.get_account(vote_account) {
1886        if let Some(vote_state) = vote_state::from(account) {
1887            return !vote_state.votes.is_empty();
1888        }
1889    }
1890    false
1891}
1892
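/// Measures the local PoH hash rate over `hash_samples` hashes and fails with
/// `ValidatorError::PohTooSlow` if it is below the rate needed to hash a full slot within the
/// target slot duration.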
1893fn check_poh_speed(bank: &Bank, maybe_hash_samples: Option<u64>) -> Result<(), ValidatorError> {
1894    let Some(hashes_per_tick) = bank.hashes_per_tick() else {
1895        warn!("Unable to read hashes per tick from Bank, skipping PoH speed check");
1896        return Ok(());
1897    };
1898
1899    let ticks_per_slot = bank.ticks_per_slot();
1900    let hashes_per_slot = hashes_per_tick * ticks_per_slot;
1901    let hash_samples = maybe_hash_samples.unwrap_or(hashes_per_slot);
1902
1903    let hash_time = compute_hash_time(hash_samples);
1904    let my_hashes_per_second = (hash_samples as f64 / hash_time.as_secs_f64()) as u64;
1905
1906    let target_slot_duration = Duration::from_nanos(bank.ns_per_slot as u64);
1907    let target_hashes_per_second =
1908        (hashes_per_slot as f64 / target_slot_duration.as_secs_f64()) as u64;
1909
1910    info!(
1911        "PoH speed check: computed hashes per second {my_hashes_per_second}, target hashes per \
1912         second {target_hashes_per_second}"
1913    );
1914    if my_hashes_per_second < target_hashes_per_second {
1915        return Err(ValidatorError::PohTooSlow {
1916            mine: my_hashes_per_second,
1917            target: target_hashes_per_second,
1918        });
1919    }
1920
1921    Ok(())
1922}
1923
1924fn maybe_cluster_restart_with_hard_fork(config: &ValidatorConfig, root_slot: Slot) -> Option<Slot> {
1925    // detect cluster restart (hard fork) indirectly via wait_for_supermajority...
1926    if let Some(wait_slot_for_supermajority) = config.wait_for_supermajority {
1927        if wait_slot_for_supermajority == root_slot {
1928            return Some(wait_slot_for_supermajority);
1929        }
1930    }
1931
1932    None
1933}
1934
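/// Sanity-checks a tower restored from disk against the loaded bank state: lockouts are adjusted
/// after replay, the restored tower is discarded across hard-fork or warp restarts, and a new
/// tower is rebuilt from BankForks when restoration fails (aborting instead if
/// `config.require_tower` is set and the vote account already contains votes).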
1935fn post_process_restored_tower(
1936    restored_tower: crate::consensus::Result<Tower>,
1937    validator_identity: &Pubkey,
1938    vote_account: &Pubkey,
1939    config: &ValidatorConfig,
1940    bank_forks: &BankForks,
1941) -> Result<Tower, String> {
1942    let mut should_require_tower = config.require_tower;
1943
1944    let restored_tower = restored_tower.and_then(|tower| {
1945        let root_bank = bank_forks.root_bank();
1946        let slot_history = root_bank.get_slot_history();
1947        // make sure the tower isn't corrupted before the following hard fork check
1948        let tower = tower.adjust_lockouts_after_replay(root_bank.slot(), &slot_history);
1949
1950        if let Some(hard_fork_restart_slot) =
1951            maybe_cluster_restart_with_hard_fork(config, root_bank.slot())
1952        {
1953            // Intentionally fail to restore the tower: we're presumably in a new hard fork, so
1954            // past out-of-chain vote state doesn't make sense at all.
1955            // (What if --wait-for-supermajority is passed again when the validator restarts?)
1956            let message =
1957                format!("Hard fork is detected; discarding tower restoration result: {tower:?}");
1958            datapoint_error!("tower_error", ("error", message, String),);
1959            error!("{message}");
1960
1961            // unconditionally relax tower requirement so that we can always restore tower
1962            // from root bank.
1963            should_require_tower = false;
1964            return Err(crate::consensus::TowerError::HardFork(
1965                hard_fork_restart_slot,
1966            ));
1967        }
1968
1969        if let Some(warp_slot) = config.warp_slot {
1970            // unconditionally relax tower requirement so that we can always restore tower
1971            // from root bank after the warp
1972            should_require_tower = false;
1973            return Err(crate::consensus::TowerError::HardFork(warp_slot));
1974        }
1975
1976        tower
1977    });
1978
1979    let restored_tower = match restored_tower {
1980        Ok(tower) => tower,
1981        Err(err) => {
1982            let voting_has_been_active =
1983                active_vote_account_exists_in_bank(&bank_forks.working_bank(), vote_account);
1984            if !err.is_file_missing() {
1985                datapoint_error!(
1986                    "tower_error",
1987                    ("error", format!("Unable to restore tower: {err}"), String),
1988                );
1989            }
1990            if should_require_tower && voting_has_been_active {
1991                return Err(format!(
1992                    "Requested mandatory tower restore failed: {err}. And there is an existing \
1993                     vote_account containing actual votes. Aborting due to possible conflicting \
1994                     duplicate votes"
1995                ));
1996            }
1997            if err.is_file_missing() && !voting_has_been_active {
1998                // Currently, don't protect against spoofed snapshots with no tower at all
1999                info!(
2000                    "Ignoring expected failed tower restore because this is the initial validator \
2001                     start with the vote account..."
2002                );
2003            } else {
2004                error!(
2005                    "Rebuilding a new tower from the latest vote account due to failed tower \
2006                     restore: {err}"
2007                );
2008            }
2009
2010            Tower::new_from_bankforks(bank_forks, validator_identity, vote_account)
2011        }
2012    };
2013
2014    Ok(restored_tower)
2015}
2016
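/// Loads the genesis config from the ledger directory, enforces the leader schedule epoch offset
/// bound, and checks the genesis hash against the expected hash when one is configured.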
2017fn load_genesis(
2018    config: &ValidatorConfig,
2019    ledger_path: &Path,
2020) -> Result<GenesisConfig, ValidatorError> {
2021    let genesis_config = open_genesis_config(ledger_path, config.max_genesis_archive_unpacked_size)
2022        .map_err(ValidatorError::OpenGenesisConfig)?;
2023
2024    // This needs to be limited, otherwise the state in the VoteAccount data
2025    // grows too large
2026    let leader_schedule_slot_offset = genesis_config.epoch_schedule.leader_schedule_slot_offset;
2027    let slots_per_epoch = genesis_config.epoch_schedule.slots_per_epoch;
2028    let leader_epoch_offset = leader_schedule_slot_offset.div_ceil(slots_per_epoch);
2029    assert!(leader_epoch_offset <= MAX_LEADER_SCHEDULE_EPOCH_OFFSET);
2030
2031    let genesis_hash = genesis_config.hash();
2032    info!("genesis hash: {genesis_hash}");
2033
2034    if let Some(expected_genesis_hash) = config.expected_genesis_hash {
2035        if genesis_hash != expected_genesis_hash {
2036            return Err(ValidatorError::GenesisHashMismatch(
2037                genesis_hash,
2038                expected_genesis_hash,
2039            ));
2040        }
2041    }
2042
2043    Ok(genesis_config)
2044}
2045
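/// Opens the blockstore, wires up the new-shred signal, builds `ProcessOptions` from the
/// validator config, optionally starts the transaction history and entry notifier services, and
/// loads `BankForks` from the latest snapshot (or from genesis).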
2046#[allow(clippy::type_complexity)]
2047fn load_blockstore(
2048    config: &ValidatorConfig,
2049    ledger_path: &Path,
2050    genesis_config: &GenesisConfig,
2051    exit: Arc<AtomicBool>,
2052    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
2053    accounts_update_notifier: Option<AccountsUpdateNotifier>,
2054    transaction_notifier: Option<TransactionNotifierArc>,
2055    entry_notifier: Option<EntryNotifierArc>,
2056    dependency_tracker: Option<Arc<DependencyTracker>>,
2057) -> Result<
2058    (
2059        Arc<RwLock<BankForks>>,
2060        Arc<Blockstore>,
2061        Slot,
2062        Receiver<bool>,
2063        LeaderScheduleCache,
2064        Option<StartingSnapshotHashes>,
2065        TransactionHistoryServices,
2066        blockstore_processor::ProcessOptions,
2067        BlockstoreRootScan,
2068        DroppedSlotsReceiver,
2069        Option<EntryNotifierService>,
2070    ),
2071    String,
2072> {
2073    info!("loading ledger from {ledger_path:?}...");
2074    *start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger;
2075
2076    let blockstore = Blockstore::open_with_options(ledger_path, config.blockstore_options.clone())
2077        .map_err(|err| format!("Failed to open Blockstore: {err:?}"))?;
2078
2079    let (ledger_signal_sender, ledger_signal_receiver) = bounded(MAX_REPLAY_WAKE_UP_SIGNALS);
2080    blockstore.add_new_shred_signal(ledger_signal_sender);
2081
2082    // The following boot sequence (especially BankForks) could set the root, so stash the
2083    // original blockstore root away here as soon as possible.
2084    let original_blockstore_root = blockstore.max_root();
2085
2086    let blockstore = Arc::new(blockstore);
2087    let blockstore_root_scan = BlockstoreRootScan::new(config, blockstore.clone(), exit.clone());
2088    let halt_at_slot = config
2089        .halt_at_slot
2090        .or_else(|| blockstore.highest_slot().unwrap_or(None));
2091
2092    let process_options = blockstore_processor::ProcessOptions {
2093        run_verification: config.run_verification,
2094        halt_at_slot,
2095        new_hard_forks: config.new_hard_forks.clone(),
2096        debug_keys: config.debug_keys.clone(),
2097        accounts_db_config: config.accounts_db_config.clone(),
2098        accounts_db_skip_shrink: config.accounts_db_skip_shrink,
2099        accounts_db_force_initial_clean: config.accounts_db_force_initial_clean,
2100        runtime_config: config.runtime_config.clone(),
2101        use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup,
2102        ..blockstore_processor::ProcessOptions::default()
2103    };
2104
2105    let enable_rpc_transaction_history =
2106        config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history;
2107    let is_plugin_transaction_history_required = transaction_notifier.is_some();
2108    let transaction_history_services =
2109        if enable_rpc_transaction_history || is_plugin_transaction_history_required {
2110            initialize_rpc_transaction_history_services(
2111                blockstore.clone(),
2112                exit.clone(),
2113                enable_rpc_transaction_history,
2114                config.rpc_config.enable_extended_tx_metadata_storage,
2115                transaction_notifier,
2116                dependency_tracker,
2117            )
2118        } else {
2119            TransactionHistoryServices::default()
2120        };
2121
2122    let entry_notifier_service = entry_notifier
2123        .map(|entry_notifier| EntryNotifierService::new(entry_notifier, exit.clone()));
2124
2125    let (bank_forks, mut leader_schedule_cache, starting_snapshot_hashes) =
2126        bank_forks_utils::load_bank_forks(
2127            genesis_config,
2128            &blockstore,
2129            config.account_paths.clone(),
2130            &config.snapshot_config,
2131            &process_options,
2132            transaction_history_services
2133                .transaction_status_sender
2134                .as_ref(),
2135            entry_notifier_service
2136                .as_ref()
2137                .map(|service| service.sender()),
2138            accounts_update_notifier,
2139            exit,
2140        )
2141        .map_err(|err| err.to_string())?;
2142
2143    // Before replay starts, set the callbacks in each of the banks in BankForks so that
2144    // all dropped banks come through the `pruned_banks_receiver` channel. This way all bank
2145    // drop behavior can be safely synchronized with any other ongoing accounts activity like
2146    // cache flush, clean, shrink, as long as the same thread performing those activities also
2147    // is processing the dropped banks from the `pruned_banks_receiver` channel.
2148    let pruned_banks_receiver =
2149        AccountsBackgroundService::setup_bank_drop_callback(bank_forks.clone());
2150
2151    leader_schedule_cache.set_fixed_leader_schedule(config.fixed_leader_schedule.clone());
2152
2153    Ok((
2154        bank_forks,
2155        blockstore,
2156        original_blockstore_root,
2157        ledger_signal_receiver,
2158        leader_schedule_cache,
2159        starting_snapshot_hashes,
2160        transaction_history_services,
2161        process_options,
2162        blockstore_root_scan,
2163        pruned_banks_receiver,
2164        entry_notifier_service,
2165    ))
2166}
2167
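/// Defers full blockstore replay until it is actually needed (tower creation, warp, or
/// wait-for-supermajority); once replay runs, the restored tower is reconciled with the
/// blockstore roots.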
2168pub struct ProcessBlockStore<'a> {
2169    id: &'a Pubkey,
2170    vote_account: &'a Pubkey,
2171    start_progress: &'a Arc<RwLock<ValidatorStartProgress>>,
2172    blockstore: &'a Blockstore,
2173    original_blockstore_root: Slot,
2174    bank_forks: &'a Arc<RwLock<BankForks>>,
2175    leader_schedule_cache: &'a LeaderScheduleCache,
2176    process_options: &'a blockstore_processor::ProcessOptions,
2177    transaction_status_sender: Option<&'a TransactionStatusSender>,
2178    entry_notification_sender: Option<&'a EntryNotifierSender>,
2179    blockstore_root_scan: Option<BlockstoreRootScan>,
2180    snapshot_controller: &'a SnapshotController,
2181    config: &'a ValidatorConfig,
2182    tower: Option<Tower>,
2183}
2184
2185impl<'a> ProcessBlockStore<'a> {
2186    #[allow(clippy::too_many_arguments)]
2187    fn new(
2188        id: &'a Pubkey,
2189        vote_account: &'a Pubkey,
2190        start_progress: &'a Arc<RwLock<ValidatorStartProgress>>,
2191        blockstore: &'a Blockstore,
2192        original_blockstore_root: Slot,
2193        bank_forks: &'a Arc<RwLock<BankForks>>,
2194        leader_schedule_cache: &'a LeaderScheduleCache,
2195        process_options: &'a blockstore_processor::ProcessOptions,
2196        transaction_status_sender: Option<&'a TransactionStatusSender>,
2197        entry_notification_sender: Option<&'a EntryNotifierSender>,
2198        blockstore_root_scan: BlockstoreRootScan,
2199        snapshot_controller: &'a SnapshotController,
2200        config: &'a ValidatorConfig,
2201    ) -> Self {
2202        Self {
2203            id,
2204            vote_account,
2205            start_progress,
2206            blockstore,
2207            original_blockstore_root,
2208            bank_forks,
2209            leader_schedule_cache,
2210            process_options,
2211            transaction_status_sender,
2212            entry_notification_sender,
2213            blockstore_root_scan: Some(blockstore_root_scan),
2214            snapshot_controller,
2215            config,
2216            tower: None,
2217        }
2218    }
2219
2220    pub(crate) fn process(&mut self) -> Result<(), String> {
2221        if self.tower.is_none() {
2222            let previous_start_process = *self.start_progress.read().unwrap();
2223            *self.start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger;
2224
2225            let exit = Arc::new(AtomicBool::new(false));
2226            if let Ok(Some(max_slot)) = self.blockstore.highest_slot() {
2227                let bank_forks = self.bank_forks.clone();
2228                let exit = exit.clone();
2229                let start_progress = self.start_progress.clone();
2230
2231                let _ = Builder::new()
2232                    .name("solRptLdgrStat".to_string())
2233                    .spawn(move || {
2234                        while !exit.load(Ordering::Relaxed) {
2235                            let slot = bank_forks.read().unwrap().working_bank().slot();
2236                            *start_progress.write().unwrap() =
2237                                ValidatorStartProgress::ProcessingLedger { slot, max_slot };
2238                            sleep(Duration::from_secs(2));
2239                        }
2240                    })
2241                    .unwrap();
2242            }
2243            blockstore_processor::process_blockstore_from_root(
2244                self.blockstore,
2245                self.bank_forks,
2246                self.leader_schedule_cache,
2247                self.process_options,
2248                self.transaction_status_sender,
2249                self.entry_notification_sender,
2250                Some(self.snapshot_controller),
2251            )
2252            .map_err(|err| {
2253                exit.store(true, Ordering::Relaxed);
2254                format!("Failed to load ledger: {err:?}")
2255            })?;
2256            exit.store(true, Ordering::Relaxed);
2257
2258            if let Some(blockstore_root_scan) = self.blockstore_root_scan.take() {
2259                blockstore_root_scan.join();
2260            }
2261
2262            self.tower = Some({
2263                let restored_tower = Tower::restore(self.config.tower_storage.as_ref(), self.id);
2264                if let Ok(tower) = &restored_tower {
2265                    // reconciliation attempt 1 of 2 with tower
2266                    reconcile_blockstore_roots_with_external_source(
2267                        ExternalRootSource::Tower(tower.root()),
2268                        self.blockstore,
2269                        &mut self.original_blockstore_root,
2270                    )
2271                    .map_err(|err| format!("Failed to reconcile blockstore with tower: {err:?}"))?;
2272                }
2273
2274                post_process_restored_tower(
2275                    restored_tower,
2276                    self.id,
2277                    self.vote_account,
2278                    self.config,
2279                    &self.bank_forks.read().unwrap(),
2280                )?
2281            });
2282
2283            if let Some(hard_fork_restart_slot) = maybe_cluster_restart_with_hard_fork(
2284                self.config,
2285                self.bank_forks.read().unwrap().root(),
2286            ) {
2287                // reconciliation attempt 2 of 2 with hard fork
2288                // this should be #2 because hard fork root > tower root in almost all cases
2289                reconcile_blockstore_roots_with_external_source(
2290                    ExternalRootSource::HardFork(hard_fork_restart_slot),
2291                    self.blockstore,
2292                    &mut self.original_blockstore_root,
2293                )
2294                .map_err(|err| format!("Failed to reconcile blockstore with hard fork: {err:?}"))?;
2295            }
2296
2297            *self.start_progress.write().unwrap() = previous_start_process;
2298        }
2299        Ok(())
2300    }
2301
2302    pub(crate) fn process_to_create_tower(mut self) -> Result<Tower, String> {
2303        self.process()?;
2304        Ok(self.tower.unwrap())
2305    }
2306}
2307
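/// If a warp slot is configured, roots and flushes the current root bank, warps `BankForks`
/// forward to the requested slot, writes a full snapshot of the warped root, and then replays
/// the blockstore so the tower and bank forks stay in sync.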
2308fn maybe_warp_slot(
2309    config: &ValidatorConfig,
2310    process_blockstore: &mut ProcessBlockStore,
2311    ledger_path: &Path,
2312    bank_forks: &RwLock<BankForks>,
2313    leader_schedule_cache: &LeaderScheduleCache,
2314    snapshot_controller: &SnapshotController,
2315) -> Result<(), String> {
2316    if let Some(warp_slot) = config.warp_slot {
2317        let mut bank_forks = bank_forks.write().unwrap();
2318
2319        let working_bank = bank_forks.working_bank();
2320
2321        if warp_slot <= working_bank.slot() {
2322            return Err(format!(
2323                "warp slot ({}) cannot be less than the working bank slot ({})",
2324                warp_slot,
2325                working_bank.slot()
2326            ));
2327        }
2328        info!("warping to slot {warp_slot}");
2329
2330        let root_bank = bank_forks.root_bank();
2331
2332        // An accounts hash calculation from storages will occur in warp_from_parent() below.  This
2333        // requires that the accounts cache has been flushed, which requires the parent slot to be
2334        // rooted.
2335        root_bank.squash();
2336        root_bank.force_flush_accounts_cache();
2337
2338        bank_forks.insert(Bank::warp_from_parent(
2339            root_bank,
2340            &Pubkey::default(),
2341            warp_slot,
2342        ));
2343        bank_forks
2344            .set_root(warp_slot, Some(snapshot_controller), Some(warp_slot))
2345            .map_err(|err| err.to_string())?;
2346        leader_schedule_cache.set_root(&bank_forks.root_bank());
2347
2348        let full_snapshot_archive_info = match snapshot_bank_utils::bank_to_full_snapshot_archive(
2349            ledger_path,
2350            &bank_forks.root_bank(),
2351            None,
2352            &config.snapshot_config.full_snapshot_archives_dir,
2353            &config.snapshot_config.incremental_snapshot_archives_dir,
2354            config.snapshot_config.archive_format,
2355        ) {
2356            Ok(archive_info) => archive_info,
2357            Err(e) => return Err(format!("Unable to create snapshot: {e}")),
2358        };
2359        info!(
2360            "created snapshot: {}",
2361            full_snapshot_archive_info.path().display()
2362        );
2363
2364        drop(bank_forks);
2365        // Process blockstore after warping bank forks to make sure tower and
2366        // bank forks are in sync.
2367        process_blockstore.process()?;
2368    }
2369    Ok(())
2370}
2371
2372/// Returns the starting slot at which the blockstore should be scanned for
2373/// shreds with an incorrect shred version, or None if the check is unnecessary
2374fn should_cleanup_blockstore_incorrect_shred_versions(
2375    config: &ValidatorConfig,
2376    blockstore: &Blockstore,
2377    root_slot: Slot,
2378    hard_forks: &HardForks,
2379) -> Result<Option<Slot>, BlockstoreError> {
2380    // Perform the check if we are booting as part of a cluster restart at slot root_slot
2381    let maybe_cluster_restart_slot = maybe_cluster_restart_with_hard_fork(config, root_slot);
2382    if maybe_cluster_restart_slot.is_some() {
2383        return Ok(Some(root_slot + 1));
2384    }
2385
2386    // If there are no hard forks, the shred version cannot have changed
2387    let Some(latest_hard_fork) = hard_forks.iter().last().map(|(slot, _)| *slot) else {
2388        return Ok(None);
2389    };
2390
2391    // If the blockstore is empty, there are certainly no shreds with an incorrect version
2392    let Some(blockstore_max_slot) = blockstore.highest_slot()? else {
2393        return Ok(None);
2394    };
2395    let blockstore_min_slot = blockstore.lowest_slot();
2396    info!(
2397        "Blockstore contains data from slot {blockstore_min_slot} to {blockstore_max_slot}, the \
2398         latest hard fork is {latest_hard_fork}"
2399    );
2400
2401    if latest_hard_fork < blockstore_min_slot {
2402        // latest_hard_fork < blockstore_min_slot <= blockstore_max_slot
2403        //
2404        // All slots in the blockstore are newer than the latest hard fork, and only shreds with
2405        // the correct shred version should have been inserted since the latest hard fork
2406        //
2407        // This is the normal case where the last cluster restart & hard fork was a while ago; we
2408        // can skip the check for this case
2409        Ok(None)
2410    } else if latest_hard_fork < blockstore_max_slot {
2411        // blockstore_min_slot < latest_hard_fork < blockstore_max_slot
2412        //
2413        // This could be a case where there was a cluster restart, but this node was not part of
2414        // the supermajority that actually restarted the cluster. Rather, this node likely
2415        // downloaded a new snapshot while retaining the blockstore, including slots beyond the
2416        // chosen restart slot. We need to perform the blockstore check for this case
2417        //
2418        // Note that the downloaded snapshot slot (root_slot) could be greater than the latest hard
2419        // fork slot. Even though this node will only replay slots after root_slot, start the check
2420        // at latest_hard_fork + 1 to check (and possibly purge) any invalid state.
2421        Ok(Some(latest_hard_fork + 1))
2422    } else {
2423        // blockstore_min_slot <= blockstore_max_slot <= latest_hard_fork
2424        //
2425        // All slots in the blockstore are older than the latest hard fork. The blockstore check
2426        // would start from latest_hard_fork + 1; skip the check as there are no slots to check
2427        //
2428        // This is an unusual case to hit; perhaps the node was offline for a long time and just
2429        // restarted with a newly downloaded snapshot while keeping its old blockstore
2430        Ok(None)
2431    }
2432}
2433
2434/// Searches the blockstore for data shreds with a shred version that differs
2435/// from the passed `expected_shred_version`
2436fn scan_blockstore_for_incorrect_shred_version(
2437    blockstore: &Blockstore,
2438    start_slot: Slot,
2439    expected_shred_version: u16,
2440) -> Result<Option<u16>, BlockstoreError> {
2441    const TIMEOUT: Duration = Duration::from_secs(60);
2442    let timer = Instant::now();
2443    // Search for shreds with incompatible version in blockstore
2444    let slot_meta_iterator = blockstore.slot_meta_iterator(start_slot)?;
2445
2446    info!("Searching blockstore for shred with incorrect version from slot {start_slot}");
2447    for (slot, _meta) in slot_meta_iterator {
2448        let shreds = blockstore.get_data_shreds_for_slot(slot, 0)?;
2449        for shred in &shreds {
2450            if shred.version() != expected_shred_version {
2451                return Ok(Some(shred.version()));
2452            }
2453        }
2454        if timer.elapsed() > TIMEOUT {
2455            info!("Didn't find incorrect shreds after 60 seconds, aborting");
2456            break;
2457        }
2458    }
2459    Ok(None)
2460}
2461
2462/// If the blockstore contains any shreds with the incorrect shred version,
2463/// copy them to a backup blockstore and purge them from the actual blockstore.
2464fn cleanup_blockstore_incorrect_shred_versions(
2465    blockstore: &Blockstore,
2466    config: &ValidatorConfig,
2467    start_slot: Slot,
2468    expected_shred_version: u16,
2469) -> Result<(), BlockstoreError> {
2470    let incorrect_shred_version = scan_blockstore_for_incorrect_shred_version(
2471        blockstore,
2472        start_slot,
2473        expected_shred_version,
2474    )?;
2475    let Some(incorrect_shred_version) = incorrect_shred_version else {
2476        info!("Only shreds with the correct version were found in the blockstore");
2477        return Ok(());
2478    };
2479
2480    // .unwrap() safe because getting to this point implies blockstore has slots/shreds
2481    let end_slot = blockstore.highest_slot()?.unwrap();
2482
2483    // Backing up the shreds that will be deleted from primary blockstore is
2484    // not critical, so swallow errors from backup blockstore operations.
2485    let backup_folder = format!(
2486        "{BLOCKSTORE_DIRECTORY_ROCKS_LEVEL}_backup_{incorrect_shred_version}_{start_slot}_{end_slot}"
2487    );
2488    match Blockstore::open_with_options(
2489        &blockstore.ledger_path().join(backup_folder),
2490        config.blockstore_options.clone(),
2491    ) {
2492        Ok(backup_blockstore) => {
2493            info!("Backing up slots from {start_slot} to {end_slot}");
2494            let mut timer = Measure::start("blockstore backup");
2495
2496            const PRINT_INTERVAL: Duration = Duration::from_secs(5);
2497            let mut print_timer = Instant::now();
2498            let mut num_slots_copied = 0;
2499            let slot_meta_iterator = blockstore.slot_meta_iterator(start_slot)?;
2500            for (slot, _meta) in slot_meta_iterator {
2501                let shreds = blockstore.get_data_shreds_for_slot(slot, 0)?;
2502                let shreds = shreds.into_iter().map(Cow::Owned);
2503                let _ = backup_blockstore.insert_cow_shreds(shreds, None, true);
2504                num_slots_copied += 1;
2505
2506                if print_timer.elapsed() > PRINT_INTERVAL {
2507                    info!("Backed up {num_slots_copied} slots thus far");
2508                    print_timer = Instant::now();
2509                }
2510            }
2511
2512            timer.stop();
2513            info!("Backing up slots done. {timer}");
2514        }
2515        Err(err) => {
2516            warn!("Unable to backup shreds with incorrect shred version: {err}");
2517        }
2518    }
2519
2520    info!("Purging slots {start_slot} to {end_slot} from blockstore");
2521    let mut timer = Measure::start("blockstore purge");
2522    blockstore.purge_from_next_slots(start_slot, end_slot);
2523    blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact);
2524    timer.stop();
2525    info!("Purging slots done. {timer}");
2526
2527    Ok(())
2528}
2529
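/// Builds the `TransactionStatusSender`/`TransactionStatusService` pair used to write
/// transaction statuses to the blockstore and to notify any geyser transaction notifier.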
2530fn initialize_rpc_transaction_history_services(
2531    blockstore: Arc<Blockstore>,
2532    exit: Arc<AtomicBool>,
2533    enable_rpc_transaction_history: bool,
2534    enable_extended_tx_metadata_storage: bool,
2535    transaction_notifier: Option<TransactionNotifierArc>,
2536    dependency_tracker: Option<Arc<DependencyTracker>>,
2537) -> TransactionHistoryServices {
2538    let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
2539    let (transaction_status_sender, transaction_status_receiver) = unbounded();
2540    let transaction_status_sender = Some(TransactionStatusSender {
2541        sender: transaction_status_sender,
2542        dependency_tracker: dependency_tracker.clone(),
2543    });
2544    let transaction_status_service = Some(TransactionStatusService::new(
2545        transaction_status_receiver,
2546        max_complete_transaction_status_slot.clone(),
2547        enable_rpc_transaction_history,
2548        transaction_notifier,
2549        blockstore.clone(),
2550        enable_extended_tx_metadata_storage,
2551        dependency_tracker,
2552        exit.clone(),
2553    ));
2554
2555    TransactionHistoryServices {
2556        transaction_status_sender,
2557        transaction_status_service,
2558        max_complete_transaction_status_slot,
2559    }
2560}
2561
2562#[derive(Error, Debug)]
2563pub enum ValidatorError {
2564    #[error("bank hash mismatch: actual={0}, expected={1}")]
2565    BankHashMismatch(Hash, Hash),
2566
2567    #[error("blockstore error: {0}")]
2568    Blockstore(#[source] BlockstoreError),
2569
2570    #[error("genesis hash mismatch: actual={0}, expected={1}")]
2571    GenesisHashMismatch(Hash, Hash),
2572
2573    #[error(
2574        "ledger does not have enough data to wait for supermajority: current slot={0}, needed \
2575         slot={1}"
2576    )]
2577    NotEnoughLedgerData(Slot, Slot),
2578
2579    #[error("failed to open genesis: {0}")]
2580    OpenGenesisConfig(#[source] OpenGenesisConfigError),
2581
2582    #[error("{0}")]
2583    Other(String),
2584
2585    #[error(
2586        "PoH hashes/second rate is slower than the cluster target: mine {mine}, cluster {target}"
2587    )]
2588    PohTooSlow { mine: u64, target: u64 },
2589
2590    #[error("shred version mismatch: actual {actual}, expected {expected}")]
2591    ShredVersionMismatch { actual: u16, expected: u16 },
2592
2593    #[error(transparent)]
2594    TraceError(#[from] TraceError),
2595
2596    #[error("Wen Restart finished, please continue with --wait-for-supermajority")]
2597    WenRestartFinished,
2598}
2599
2600// Returns whether the validator waited on other nodes to start. In this case
2601// it should not wait for one of its votes to land before producing blocks,
2602// because if the whole network is waiting, then it will stall.
2603//
2604// Error indicates that a bad hash was encountered or another condition
2605// that is unrecoverable and the validator should exit.
2606fn wait_for_supermajority(
2607    config: &ValidatorConfig,
2608    process_blockstore: Option<&mut ProcessBlockStore>,
2609    bank_forks: &RwLock<BankForks>,
2610    cluster_info: &ClusterInfo,
2611    rpc_override_health_check: Arc<AtomicBool>,
2612    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
2613) -> Result<bool, ValidatorError> {
2614    match config.wait_for_supermajority {
2615        None => Ok(false),
2616        Some(wait_for_supermajority_slot) => {
2617            if let Some(process_blockstore) = process_blockstore {
2618                process_blockstore
2619                    .process()
2620                    .map_err(ValidatorError::Other)?;
2621            }
2622
2623            let bank = bank_forks.read().unwrap().working_bank();
2624            match wait_for_supermajority_slot.cmp(&bank.slot()) {
2625                std::cmp::Ordering::Less => return Ok(false),
2626                std::cmp::Ordering::Greater => {
2627                    return Err(ValidatorError::NotEnoughLedgerData(
2628                        bank.slot(),
2629                        wait_for_supermajority_slot,
2630                    ));
2631                }
2632                _ => {}
2633            }
2634
2635            if let Some(expected_bank_hash) = config.expected_bank_hash {
2636                if bank.hash() != expected_bank_hash {
2637                    return Err(ValidatorError::BankHashMismatch(
2638                        bank.hash(),
2639                        expected_bank_hash,
2640                    ));
2641                }
2642            }
2643
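            // Poll gossip once per second until enough stake is visible,
            // logging progress on every 10th iteration to keep the logs quiet.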
2644            for i in 1.. {
2645                let logging = i % 10 == 1;
2646                if logging {
2647                    info!(
2648                        "Waiting for {}% of activated stake at slot {} to be in gossip...",
2649                        WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT,
2650                        bank.slot()
2651                    );
2652                }
2653
2654                let gossip_stake_percent =
2655                    get_stake_percent_in_gossip(&bank, cluster_info, logging);
2656
2657                *start_progress.write().unwrap() =
2658                    ValidatorStartProgress::WaitingForSupermajority {
2659                        slot: wait_for_supermajority_slot,
2660                        gossip_stake_percent,
2661                    };
2662
2663                if gossip_stake_percent >= WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT {
2664                    info!(
2665                        "Supermajority reached, {gossip_stake_percent}% active stake detected, \
2666                         starting up now.",
2667                    );
2668                    break;
2669                }
2670                // The normal RPC health checks don't apply as the node is waiting, so feign health to
2671                // prevent load balancers from removing the node from their list of candidates during a
2672                // manual restart.
2673                rpc_override_health_check.store(true, Ordering::Relaxed);
2674                sleep(Duration::new(1, 0));
2675            }
2676            rpc_override_health_check.store(false, Ordering::Relaxed);
2677            Ok(true)
2678        }
2679    }
2680}
2681
2682// Get the activated stake percentage (based on the provided bank) that is visible in gossip
2683fn get_stake_percent_in_gossip(bank: &Bank, cluster_info: &ClusterInfo, log: bool) -> u64 {
2684    let mut online_stake = 0;
2685    let mut wrong_shred_stake = 0;
2686    let mut wrong_shred_nodes = vec![];
2687    let mut offline_stake = 0;
2688    let mut offline_nodes = vec![];
2689
2690    let mut total_activated_stake = 0;
2691    let now = timestamp();
2692    // Nodes' contact infos are saved to disk and restored on validator startup.
2693    // Staked nodes' entries will not expire until an epoch later, so it is
2694    // necessary here to filter for recent entries to establish liveness.
2695    let peers: HashMap<_, _> = cluster_info
2696        .tvu_peers(ContactInfo::clone)
2697        .into_iter()
2698        .filter(|node| {
2699            let age = now.saturating_sub(node.wallclock());
2700            // Contact infos are refreshed twice during this period.
2701            age < CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS
2702        })
2703        .map(|node| (*node.pubkey(), node))
2704        .collect();
2705    let my_shred_version = cluster_info.my_shred_version();
2706    let my_id = cluster_info.id();
2707
2708    for (activated_stake, vote_account) in bank.vote_accounts().values() {
2709        let activated_stake = *activated_stake;
2710        total_activated_stake += activated_stake;
2711
2712        if activated_stake == 0 {
2713            continue;
2714        }
2715        let vote_state_node_pubkey = *vote_account.node_pubkey();
2716
2717        if let Some(peer) = peers.get(&vote_state_node_pubkey) {
2718            if peer.shred_version() == my_shred_version {
2719                trace!(
2720                    "observed {vote_state_node_pubkey} in gossip, \
2721                     (activated_stake={activated_stake})"
2722                );
2723                online_stake += activated_stake;
2724            } else {
2725                wrong_shred_stake += activated_stake;
2726                wrong_shred_nodes.push((activated_stake, vote_state_node_pubkey));
2727            }
2728        } else if vote_state_node_pubkey == my_id {
2729            online_stake += activated_stake; // This node is online
2730        } else {
2731            offline_stake += activated_stake;
2732            offline_nodes.push((activated_stake, vote_state_node_pubkey));
2733        }
2734    }
2735
2736    let online_stake_percentage = (online_stake as f64 / total_activated_stake as f64) * 100.;
2737    if log {
2738        info!("{online_stake_percentage:.3}% of active stake visible in gossip");
2739
2740        if !wrong_shred_nodes.is_empty() {
2741            info!(
2742                "{:.3}% of active stake has the wrong shred version in gossip",
2743                (wrong_shred_stake as f64 / total_activated_stake as f64) * 100.,
2744            );
2745            wrong_shred_nodes.sort_by(|b, a| a.0.cmp(&b.0)); // sort descending by stake weight
2746            for (stake, identity) in wrong_shred_nodes {
2747                info!(
2748                    "    {:.3}% - {}",
2749                    (stake as f64 / total_activated_stake as f64) * 100.,
2750                    identity
2751                );
2752            }
2753        }
2754
2755        if !offline_nodes.is_empty() {
2756            info!(
2757                "{:.3}% of active stake is not visible in gossip",
2758                (offline_stake as f64 / total_activated_stake as f64) * 100.
2759            );
2760            offline_nodes.sort_by(|b, a| a.0.cmp(&b.0)); // sort descending by stake weight
2761            for (stake, identity) in offline_nodes {
2762                info!(
2763                    "    {:.3}% - {}",
2764                    (stake as f64 / total_activated_stake as f64) * 100.,
2765                    identity
2766                );
2767            }
2768        }
2769    }
2770
2771    online_stake_percentage as u64
2772}
2773
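// Moves aside and asynchronously deletes the contents of the configured
// account paths (and any shrink paths) so the validator starts with clean
// accounts directories.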
2774fn cleanup_accounts_paths(config: &ValidatorConfig) {
2775    for account_path in &config.account_paths {
2776        move_and_async_delete_path_contents(account_path);
2777    }
2778    if let Some(shrink_paths) = config
2779        .accounts_db_config
2780        .as_ref()
2781        .and_then(|config| config.shrink_paths.as_ref())
2782    {
2783        for shrink_path in shrink_paths {
2784            move_and_async_delete_path_contents(shrink_path);
2785        }
2786    }
2787}
2788
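/// Returns true if the snapshot config is self-consistent: either snapshot
/// generation is disabled entirely, or a full snapshot interval is set and is
/// strictly greater than the incremental snapshot interval (when the latter
/// is enabled).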
2789pub fn is_snapshot_config_valid(snapshot_config: &SnapshotConfig) -> bool {
2790    // if the snapshot config is configured to *not* take snapshots, then it is valid
2791    if !snapshot_config.should_generate_snapshots() {
2792        return true;
2793    }
2794
2795    let SnapshotInterval::Slots(full_snapshot_interval_slots) =
2796        snapshot_config.full_snapshot_archive_interval
2797    else {
2798        // if we *are* generating snapshots, then the full snapshot interval cannot be disabled
2799        return false;
2800    };
2801
2802    match snapshot_config.incremental_snapshot_archive_interval {
2803        SnapshotInterval::Disabled => true,
2804        SnapshotInterval::Slots(incremental_snapshot_interval_slots) => {
2805            full_snapshot_interval_slots > incremental_snapshot_interval_slots
2806        }
2807    }
2808}
2809
2810#[cfg(test)]
2811mod tests {
2812    use {
2813        super::*,
2814        crossbeam_channel::{bounded, RecvTimeoutError},
2815        solana_entry::entry,
2816        solana_genesis_config::create_genesis_config,
2817        solana_gossip::contact_info::ContactInfo,
2818        solana_ledger::{
2819            blockstore, create_new_tmp_ledger, genesis_utils::create_genesis_config_with_leader,
2820            get_tmp_ledger_path_auto_delete,
2821        },
2822        solana_poh_config::PohConfig,
2823        solana_sha256_hasher::hash,
2824        solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP,
2825        std::{fs::remove_dir_all, num::NonZeroU64, thread, time::Duration},
2826    };
2827
2828    #[test]
2829    fn validator_exit() {
2830        solana_logger::setup();
2831        let leader_keypair = Keypair::new();
2832        let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
2833
2834        let validator_keypair = Keypair::new();
2835        let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
2836        let genesis_config =
2837            create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1000)
2838                .genesis_config;
2839        let (validator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
2840
2841        let voting_keypair = Arc::new(Keypair::new());
2842        let config = ValidatorConfig {
2843            rpc_addrs: Some((
2844                validator_node.info.rpc().unwrap(),
2845                validator_node.info.rpc_pubsub().unwrap(),
2846            )),
2847            ..ValidatorConfig::default_for_test()
2848        };
2849        let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
2850        let validator = Validator::new(
2851            validator_node,
2852            Arc::new(validator_keypair),
2853            &validator_ledger_path,
2854            &voting_keypair.pubkey(),
2855            Arc::new(RwLock::new(vec![voting_keypair])),
2856            vec![leader_node.info],
2857            &config,
2858            true, // should_check_duplicate_instance
2859            None, // rpc_to_plugin_manager_receiver
2860            start_progress.clone(),
2861            SocketAddrSpace::Unspecified,
2862            ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
2863            Arc::new(RwLock::new(None)),
2864        )
2865        .expect("assume successful validator start");
2866        assert_eq!(
2867            *start_progress.read().unwrap(),
2868            ValidatorStartProgress::Running
2869        );
2870        validator.close();
2871        remove_dir_all(validator_ledger_path).unwrap();
2872    }
2873
2874    #[test]
2875    fn test_should_cleanup_blockstore_incorrect_shred_versions() {
2876        solana_logger::setup();
2877
2878        let ledger_path = get_tmp_ledger_path_auto_delete!();
2879        let blockstore = Blockstore::open(ledger_path.path()).unwrap();
2880
2881        let mut validator_config = ValidatorConfig::default_for_test();
2882        let mut hard_forks = HardForks::default();
2883        let mut root_slot;
2884
2885        // Do check from root_slot + 1 if wait_for_supermajority (10) == root_slot (10)
2886        root_slot = 10;
2887        validator_config.wait_for_supermajority = Some(root_slot);
2888        assert_eq!(
2889            should_cleanup_blockstore_incorrect_shred_versions(
2890                &validator_config,
2891                &blockstore,
2892                root_slot,
2893                &hard_forks
2894            )
2895            .unwrap(),
2896            Some(root_slot + 1)
2897        );
2898
2899        // No check if wait_for_supermajority (10) < root_slot (15) (no hard forks)
2900        // Arguably operator error to pass a value for wait_for_supermajority in this case
2901        root_slot = 15;
2902        assert_eq!(
2903            should_cleanup_blockstore_incorrect_shred_versions(
2904                &validator_config,
2905                &blockstore,
2906                root_slot,
2907                &hard_forks
2908            )
2909            .unwrap(),
2910            None,
2911        );
2912
2913        // Emulate cluster restart at slot 10
2914        // No check if wait_for_supermajority (10) < root_slot (15) (empty blockstore)
2915        hard_forks.register(10);
2916        assert_eq!(
2917            should_cleanup_blockstore_incorrect_shred_versions(
2918                &validator_config,
2919                &blockstore,
2920                root_slot,
2921                &hard_forks
2922            )
2923            .unwrap(),
2924            None,
2925        );
2926
2927        // Insert some shreds at newer slots than hard fork
2928        let entries = entry::create_ticks(1, 0, Hash::default());
2929        for i in 20..35 {
2930            let shreds = blockstore::entries_to_test_shreds(
2931                &entries,
2932                i,     // slot
2933                i - 1, // parent_slot
2934                true,  // is_full_slot
2935                1,     // version
2936            );
2937            blockstore.insert_shreds(shreds, None, true).unwrap();
2938        }
2939
2940        // No check as all blockstore data is newer than latest hard fork
2941        assert_eq!(
2942            should_cleanup_blockstore_incorrect_shred_versions(
2943                &validator_config,
2944                &blockstore,
2945                root_slot,
2946                &hard_forks
2947            )
2948            .unwrap(),
2949            None,
2950        );
2951
2952        // Emulate cluster restart at slot 25
2953        // Do check from root_slot + 1 regardless of whether wait_for_supermajority is set correctly
2954        root_slot = 25;
2955        hard_forks.register(root_slot);
2956        validator_config.wait_for_supermajority = Some(root_slot);
2957        assert_eq!(
2958            should_cleanup_blockstore_incorrect_shred_versions(
2959                &validator_config,
2960                &blockstore,
2961                root_slot,
2962                &hard_forks
2963            )
2964            .unwrap(),
2965            Some(root_slot + 1),
2966        );
2967        validator_config.wait_for_supermajority = None;
2968        assert_eq!(
2969            should_cleanup_blockstore_incorrect_shred_versions(
2970                &validator_config,
2971                &blockstore,
2972                root_slot,
2973                &hard_forks
2974            )
2975            .unwrap(),
2976            Some(root_slot + 1),
2977        );
2978
2979        // Do check with advanced root slot, even without wait_for_supermajority set correctly
2980        // Check starts from latest hard fork + 1
2981        root_slot = 30;
2982        let latest_hard_fork = hard_forks.iter().last().unwrap().0;
2983        assert_eq!(
2984            should_cleanup_blockstore_incorrect_shred_versions(
2985                &validator_config,
2986                &blockstore,
2987                root_slot,
2988                &hard_forks
2989            )
2990            .unwrap(),
2991            Some(latest_hard_fork + 1),
2992        );
2993
2994        // Purge blockstore up to latest hard fork
2995        // No check since all blockstore data newer than latest hard fork
2996        blockstore.purge_slots(0, latest_hard_fork, PurgeType::Exact);
2997        assert_eq!(
2998            should_cleanup_blockstore_incorrect_shred_versions(
2999                &validator_config,
3000                &blockstore,
3001                root_slot,
3002                &hard_forks
3003            )
3004            .unwrap(),
3005            None,
3006        );
3007    }
3008
3009    #[test]
3010    fn test_cleanup_blockstore_incorrect_shred_versions() {
3011        solana_logger::setup();
3012
3013        let validator_config = ValidatorConfig::default_for_test();
3014        let ledger_path = get_tmp_ledger_path_auto_delete!();
3015        let blockstore = Blockstore::open(ledger_path.path()).unwrap();
3016
3017        let entries = entry::create_ticks(1, 0, Hash::default());
3018        for i in 1..10 {
3019            let shreds = blockstore::entries_to_test_shreds(
3020                &entries,
3021                i,     // slot
3022                i - 1, // parent_slot
3023                true,  // is_full_slot
3024                1,     // version
3025            );
3026            blockstore.insert_shreds(shreds, None, true).unwrap();
3027        }
3028
3029        // this purges and compacts all slots greater than or equal to 5
3030        cleanup_blockstore_incorrect_shred_versions(&blockstore, &validator_config, 5, 2).unwrap();
3031        // assert that slots less than 5 aren't affected
3032        assert!(blockstore.meta(4).unwrap().unwrap().next_slots.is_empty());
3033        for i in 5..10 {
3034            assert!(blockstore
3035                .get_data_shreds_for_slot(i, 0)
3036                .unwrap()
3037                .is_empty());
3038        }
3039    }
3040
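    // A minimal sketch verifying that the thiserror-derived Display messages
    // for ValidatorError interpolate their fields as written in the #[error]
    // attributes above; the field values used here are arbitrary.
    #[test]
    fn test_validator_error_display_sketch() {
        let err = ValidatorError::ShredVersionMismatch {
            actual: 1,
            expected: 2,
        };
        assert_eq!(
            err.to_string(),
            "shred version mismatch: actual 1, expected 2"
        );

        let err = ValidatorError::PohTooSlow {
            mine: 10,
            target: 20,
        };
        assert_eq!(
            err.to_string(),
            "PoH hashes/second rate is slower than the cluster target: mine 10, cluster 20"
        );
    }
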
3041    #[test]
3042    fn validator_parallel_exit() {
3043        let leader_keypair = Keypair::new();
3044        let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
3045        let genesis_config =
3046            create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1000)
3047                .genesis_config;
3048
3049        let mut ledger_paths = vec![];
3050        let mut validators: Vec<Validator> = (0..2)
3051            .map(|_| {
3052                let validator_keypair = Keypair::new();
3053                let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
3054                let (validator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
3055                ledger_paths.push(validator_ledger_path.clone());
3056                let vote_account_keypair = Keypair::new();
3057                let config = ValidatorConfig {
3058                    rpc_addrs: Some((
3059                        validator_node.info.rpc().unwrap(),
3060                        validator_node.info.rpc_pubsub().unwrap(),
3061                    )),
3062                    ..ValidatorConfig::default_for_test()
3063                };
3064                Validator::new(
3065                    validator_node,
3066                    Arc::new(validator_keypair),
3067                    &validator_ledger_path,
3068                    &vote_account_keypair.pubkey(),
3069                    Arc::new(RwLock::new(vec![Arc::new(vote_account_keypair)])),
3070                    vec![leader_node.info.clone()],
3071                    &config,
3072                    true, // should_check_duplicate_instance.
3073                    None, // rpc_to_plugin_manager_receiver
3074                    Arc::new(RwLock::new(ValidatorStartProgress::default())),
3075                    SocketAddrSpace::Unspecified,
3076                    ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
3077                    Arc::new(RwLock::new(None)),
3078                )
3079                .expect("assume successful validator start")
3080            })
3081            .collect();
3082
3083        // Each validator can exit in parallel to speed up the many sequential calls to `join`
3084        validators.iter_mut().for_each(|v| v.exit());
3085
3086        // spawn a new thread to wait for the validators to join
3087        let (sender, receiver) = bounded(0);
3088        let _ = thread::spawn(move || {
3089            validators.into_iter().for_each(|validator| {
3090                validator.join();
3091            });
3092            sender.send(()).unwrap();
3093        });
3094
3095        let timeout = Duration::from_secs(60);
3096        if let Err(RecvTimeoutError::Timeout) = receiver.recv_timeout(timeout) {
3097            panic!("timed out while shutting down validators");
3098        }
3099
3100        for path in ledger_paths {
3101            remove_dir_all(path).unwrap();
3102        }
3103    }
3104
3105    #[test]
3106    fn test_wait_for_supermajority() {
3107        solana_logger::setup();
3108        let node_keypair = Arc::new(Keypair::new());
3109        let cluster_info = ClusterInfo::new(
3110            ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
3111            node_keypair,
3112            SocketAddrSpace::Unspecified,
3113        );
3114
3115        let (genesis_config, _mint_keypair) = create_genesis_config(1);
3116        let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
3117        let mut config = ValidatorConfig::default_for_test();
3118        let rpc_override_health_check = Arc::new(AtomicBool::new(false));
3119        let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
3120
3121        assert!(!wait_for_supermajority(
3122            &config,
3123            None,
3124            &bank_forks,
3125            &cluster_info,
3126            rpc_override_health_check.clone(),
3127            &start_progress,
3128        )
3129        .unwrap());
3130
3131        // bank=0, wait=1, should fail
3132        config.wait_for_supermajority = Some(1);
3133        assert!(matches!(
3134            wait_for_supermajority(
3135                &config,
3136                None,
3137                &bank_forks,
3138                &cluster_info,
3139                rpc_override_health_check.clone(),
3140                &start_progress,
3141            ),
3142            Err(ValidatorError::NotEnoughLedgerData(_, _)),
3143        ));
3144
3145        // bank=1, wait=0, should pass, bank is past the wait slot
3146        let bank_forks = BankForks::new_rw_arc(Bank::new_from_parent(
3147            bank_forks.read().unwrap().root_bank(),
3148            &Pubkey::default(),
3149            1,
3150        ));
3151        config.wait_for_supermajority = Some(0);
3152        assert!(!wait_for_supermajority(
3153            &config,
3154            None,
3155            &bank_forks,
3156            &cluster_info,
3157            rpc_override_health_check.clone(),
3158            &start_progress,
3159        )
3160        .unwrap());
3161
3162        // bank=1, wait=1, equal, but bad hash provided
3163        config.wait_for_supermajority = Some(1);
3164        config.expected_bank_hash = Some(hash(&[1]));
3165        assert!(matches!(
3166            wait_for_supermajority(
3167                &config,
3168                None,
3169                &bank_forks,
3170                &cluster_info,
3171                rpc_override_health_check,
3172                &start_progress,
3173            ),
3174            Err(ValidatorError::BankHashMismatch(_, _)),
3175        ));
3176    }
3177
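    // A minimal sketch of the stake-percentage arithmetic used by
    // get_stake_percent_in_gossip: online stake over total activated stake,
    // scaled to a percentage and truncated to an integer. The stake values
    // here are arbitrary and gossip itself is not exercised.
    #[test]
    fn test_stake_percent_arithmetic_sketch() {
        let online_stake: u64 = 80;
        let total_activated_stake: u64 = 120;
        let online_stake_percentage =
            (online_stake as f64 / total_activated_stake as f64) * 100.;
        assert_eq!(online_stake_percentage as u64, 66);
    }
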
3178    #[test]
3179    fn test_is_snapshot_config_valid() {
3180        fn new_snapshot_config(
3181            full_snapshot_archive_interval_slots: Slot,
3182            incremental_snapshot_archive_interval_slots: Slot,
3183        ) -> SnapshotConfig {
3184            SnapshotConfig {
3185                full_snapshot_archive_interval: SnapshotInterval::Slots(
3186                    NonZeroU64::new(full_snapshot_archive_interval_slots).unwrap(),
3187                ),
3188                incremental_snapshot_archive_interval: SnapshotInterval::Slots(
3189                    NonZeroU64::new(incremental_snapshot_archive_interval_slots).unwrap(),
3190                ),
3191                ..SnapshotConfig::default()
3192            }
3193        }
3194
3195        // default config must be valid
3196        assert!(is_snapshot_config_valid(&SnapshotConfig::default()));
3197
3198        // disabled incremental snapshot must be valid
3199        assert!(is_snapshot_config_valid(&SnapshotConfig {
3200            incremental_snapshot_archive_interval: SnapshotInterval::Disabled,
3201            ..SnapshotConfig::default()
3202        }));
3203
3204        // disabled full snapshot must be invalid though (if generating snapshots)
3205        assert!(!is_snapshot_config_valid(&SnapshotConfig {
3206            full_snapshot_archive_interval: SnapshotInterval::Disabled,
3207            ..SnapshotConfig::default()
3208        }));
3209
3210        // simple config must be valid
3211        assert!(is_snapshot_config_valid(&new_snapshot_config(400, 200)));
3212        assert!(is_snapshot_config_valid(&new_snapshot_config(100, 42)));
3213        assert!(is_snapshot_config_valid(&new_snapshot_config(444, 200)));
3214        assert!(is_snapshot_config_valid(&new_snapshot_config(400, 222)));
3215
3216        // config where full interval is not larger than incremental interval must be invalid
3217        assert!(!is_snapshot_config_valid(&new_snapshot_config(42, 100)));
3218        assert!(!is_snapshot_config_valid(&new_snapshot_config(100, 100)));
3219        assert!(!is_snapshot_config_valid(&new_snapshot_config(100, 200)));
3220
3221        // config with snapshots disabled (or load-only) must be valid
3222        assert!(is_snapshot_config_valid(&SnapshotConfig::new_disabled()));
3223        assert!(is_snapshot_config_valid(&SnapshotConfig::new_load_only()));
3224        assert!(is_snapshot_config_valid(&SnapshotConfig {
3225            full_snapshot_archive_interval: SnapshotInterval::Slots(NonZeroU64::new(37).unwrap()),
3226            incremental_snapshot_archive_interval: SnapshotInterval::Slots(
3227                NonZeroU64::new(41).unwrap()
3228            ),
3229            ..SnapshotConfig::new_load_only()
3230        }));
3231        assert!(is_snapshot_config_valid(&SnapshotConfig {
3232            full_snapshot_archive_interval: SnapshotInterval::Disabled,
3233            incremental_snapshot_archive_interval: SnapshotInterval::Disabled,
3234            ..SnapshotConfig::new_load_only()
3235        }));
3236    }
3237
3238    fn target_tick_duration() -> Duration {
3239        // DEFAULT_MS_PER_SLOT = 400
3240        // DEFAULT_TICKS_PER_SLOT = 64
3241        // MS_PER_TICK = 6
3242        //
3243        // But, DEFAULT_MS_PER_SLOT / DEFAULT_TICKS_PER_SLOT = 6.25
3244        //
3245        // So, convert to microseconds first to avoid the integer rounding error
3246        let target_tick_duration_us =
3247            solana_clock::DEFAULT_MS_PER_SLOT * 1000 / solana_clock::DEFAULT_TICKS_PER_SLOT;
3248        assert_eq!(target_tick_duration_us, 6250);
3249        Duration::from_micros(target_tick_duration_us)
3250    }
3251
3252    #[test]
3253    fn test_poh_speed() {
3254        solana_logger::setup();
3255        let poh_config = PohConfig {
3256            target_tick_duration: target_tick_duration(),
3257            // make PoH rate really fast to cause the error condition
3258            hashes_per_tick: Some(100 * solana_clock::DEFAULT_HASHES_PER_TICK),
3259            ..PohConfig::default()
3260        };
3261        let genesis_config = GenesisConfig {
3262            poh_config,
3263            ..GenesisConfig::default()
3264        };
3265        let bank = Bank::new_for_tests(&genesis_config);
3266        assert!(check_poh_speed(&bank, Some(10_000)).is_err());
3267    }
3268
3269    #[test]
3270    fn test_poh_speed_no_hashes_per_tick() {
3271        solana_logger::setup();
3272        let poh_config = PohConfig {
3273            target_tick_duration: target_tick_duration(),
3274            hashes_per_tick: None,
3275            ..PohConfig::default()
3276        };
3277        let genesis_config = GenesisConfig {
3278            poh_config,
3279            ..GenesisConfig::default()
3280        };
3281        let bank = Bank::new_for_tests(&genesis_config);
3282        check_poh_speed(&bank, Some(10_000)).unwrap();
3283    }
3284}