solana_core/validator.rs

//! The `validator` module hosts all the validator microservices.

pub use solana_perf::report_target_features;
use {
    crate::{
        accounts_hash_verifier::AccountsHashVerifier,
        admin_rpc_post_init::{AdminRpcRequestMetadataPostInit, KeyUpdaterType, KeyUpdaters},
        banking_trace::{self, BankingTracer, TraceError},
        cluster_info_vote_listener::VoteTracker,
        completed_data_sets_service::CompletedDataSetsService,
        consensus::{
            reconcile_blockstore_roots_with_external_source,
            tower_storage::{NullTowerStorage, TowerStorage},
            ExternalRootSource, Tower,
        },
        repair::{
            self,
            quic_endpoint::{RepairQuicAsyncSenders, RepairQuicSenders, RepairQuicSockets},
            serve_repair::ServeRepair,
            serve_repair_service::ServeRepairService,
        },
        sample_performance_service::SamplePerformanceService,
        sigverify,
        snapshot_packager_service::{PendingSnapshotPackages, SnapshotPackagerService},
        stats_reporter_service::StatsReporterService,
        system_monitor_service::{
            verify_net_stats_access, SystemMonitorService, SystemMonitorStatsReportConfig,
        },
        tpu::{ForwardingClientOption, Tpu, TpuSockets, DEFAULT_TPU_COALESCE},
        tvu::{Tvu, TvuConfig, TvuSockets},
    },
    anyhow::{anyhow, Context, Result},
    crossbeam_channel::{bounded, unbounded, Receiver},
    quinn::Endpoint,
    serde::{Deserialize, Serialize},
    solana_accounts_db::{
        accounts_db::{AccountsDbConfig, ACCOUNTS_DB_CONFIG_FOR_TESTING},
        accounts_update_notifier_interface::AccountsUpdateNotifier,
        hardened_unpack::{
            open_genesis_config, OpenGenesisConfigError, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
        },
        utils::{move_and_async_delete_path, move_and_async_delete_path_contents},
    },
    solana_client::connection_cache::{ConnectionCache, Protocol},
    solana_clock::Slot,
    solana_entry::poh::compute_hash_time,
    solana_epoch_schedule::MAX_LEADER_SCHEDULE_EPOCH_OFFSET,
    solana_genesis_config::{ClusterType, GenesisConfig},
    solana_geyser_plugin_manager::{
        geyser_plugin_service::GeyserPluginService, GeyserPluginManagerRequest,
    },
    solana_gossip::{
        cluster_info::{
            ClusterInfo, Node, DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
            DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS,
        },
        contact_info::ContactInfo,
        crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
        gossip_service::GossipService,
    },
    solana_hard_forks::HardForks,
    solana_hash::Hash,
    solana_keypair::Keypair,
    solana_ledger::{
        bank_forks_utils,
        blockstore::{
            Blockstore, BlockstoreError, PurgeType, MAX_COMPLETED_SLOTS_IN_CHANNEL,
            MAX_REPLAY_WAKE_UP_SIGNALS,
        },
        blockstore_metric_report_service::BlockstoreMetricReportService,
        blockstore_options::{BlockstoreOptions, BLOCKSTORE_DIRECTORY_ROCKS_LEVEL},
        blockstore_processor::{self, TransactionStatusSender},
        entry_notifier_interface::EntryNotifierArc,
        entry_notifier_service::{EntryNotifierSender, EntryNotifierService},
        leader_schedule::FixedSchedule,
        leader_schedule_cache::LeaderScheduleCache,
        use_snapshot_archives_at_startup::UseSnapshotArchivesAtStartup,
    },
    solana_measure::measure::Measure,
    solana_metrics::{datapoint_info, metrics::metrics_config_sanity_check},
    solana_poh::{
        poh_recorder::PohRecorder,
        poh_service::{self, PohService},
        transaction_recorder::TransactionRecorder,
    },
    solana_pubkey::Pubkey,
    solana_rayon_threadlimit::{get_max_thread_count, get_thread_count},
    solana_rpc::{
        max_slots::MaxSlots,
        optimistically_confirmed_bank_tracker::{
            BankNotificationSenderConfig, OptimisticallyConfirmedBank,
            OptimisticallyConfirmedBankTracker,
        },
        rpc::JsonRpcConfig,
        rpc_completed_slots_service::RpcCompletedSlotsService,
        rpc_pubsub_service::{PubSubConfig, PubSubService},
        rpc_service::{ClientOption, JsonRpcService, JsonRpcServiceConfig},
        rpc_subscriptions::RpcSubscriptions,
        transaction_notifier_interface::TransactionNotifierArc,
        transaction_status_service::TransactionStatusService,
    },
    solana_runtime::{
        accounts_background_service::{
            AbsRequestHandlers, AccountsBackgroundService, DroppedSlotsReceiver,
            PrunedBanksRequestHandler, SnapshotRequestHandler,
        },
        bank::Bank,
        bank_forks::BankForks,
        commitment::BlockCommitmentCache,
        prioritization_fee_cache::PrioritizationFeeCache,
        runtime_config::RuntimeConfig,
        snapshot_archive_info::SnapshotArchiveInfoGetter,
        snapshot_bank_utils::{self, DISABLED_SNAPSHOT_ARCHIVE_INTERVAL},
        snapshot_config::SnapshotConfig,
        snapshot_controller::SnapshotController,
        snapshot_hash::StartingSnapshotHashes,
        snapshot_utils::{self, clean_orphaned_account_snapshot_dirs},
    },
    solana_send_transaction_service::send_transaction_service::Config as SendTransactionServiceConfig,
    solana_shred_version::compute_shred_version,
    solana_signer::Signer,
    solana_streamer::{quic::QuicServerParams, socket::SocketAddrSpace, streamer::StakedNodes},
    solana_time_utils::timestamp,
    solana_tpu_client::tpu_client::{
        DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC, DEFAULT_VOTE_USE_QUIC,
    },
    solana_turbine::{self, broadcast_stage::BroadcastStageType, xdp::XdpConfig},
    solana_unified_scheduler_pool::DefaultSchedulerPool,
    solana_validator_exit::Exit,
    solana_vote_program::vote_state,
    solana_wen_restart::wen_restart::{wait_for_wen_restart, WenRestartConfig},
    std::{
        borrow::Cow,
        collections::{HashMap, HashSet},
        net::SocketAddr,
        num::NonZeroUsize,
        path::{Path, PathBuf},
        sync::{
            atomic::{AtomicBool, AtomicU64, Ordering},
            Arc, Mutex, RwLock,
        },
        thread::{sleep, Builder, JoinHandle},
        time::{Duration, Instant},
    },
    strum::VariantNames,
    strum_macros::{Display, EnumCount, EnumIter, EnumString, EnumVariantNames, IntoStaticStr},
    thiserror::Error,
    tokio::runtime::Runtime as TokioRuntime,
    tokio_util::sync::CancellationToken,
};

const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000;
const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80;
// Since we currently reuse the wait-for-supermajority code, the following
// threshold should always be greater than or equal to
// WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT.
const WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT: u64 =
    WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT;
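// A minimal compile-time sketch of the invariant described above (illustrative
// addition, not in the original source; assumes the relationship is meant to hold
// statically for these two constants).
const _: () = assert!(
    WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT
        >= WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT
);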

#[derive(
    Clone, EnumCount, EnumIter, EnumString, EnumVariantNames, Default, IntoStaticStr, Display,
)]
#[strum(serialize_all = "kebab-case")]
pub enum BlockVerificationMethod {
    BlockstoreProcessor,
    #[default]
    UnifiedScheduler,
}

impl BlockVerificationMethod {
    pub const fn cli_names() -> &'static [&'static str] {
        Self::VARIANTS
    }

    pub fn cli_message() -> &'static str {
        "Switch transaction scheduling method for verifying ledger entries"
    }
}
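
// Illustrative sketch (not from the original source): the strum derives above
// serialize variants in kebab-case, so CLI names round-trip like this:
//
//     use std::str::FromStr;
//     assert!(BlockVerificationMethod::cli_names().contains(&"unified-scheduler"));
//     let method = BlockVerificationMethod::from_str("unified-scheduler").unwrap();
//     assert_eq!(method.to_string(), "unified-scheduler");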

#[derive(Clone, EnumString, EnumVariantNames, Default, IntoStaticStr, Display)]
#[strum(serialize_all = "kebab-case")]
pub enum BlockProductionMethod {
    CentralScheduler,
    #[default]
    CentralSchedulerGreedy,
}

impl BlockProductionMethod {
    pub const fn cli_names() -> &'static [&'static str] {
        Self::VARIANTS
    }

    pub fn cli_message() -> &'static str {
        "Switch transaction scheduling method for producing ledger entries"
    }
}

#[derive(Clone, EnumString, EnumVariantNames, Default, IntoStaticStr, Display)]
#[strum(serialize_all = "kebab-case")]
pub enum TransactionStructure {
    #[default]
    Sdk,
    View,
}

impl TransactionStructure {
    pub const fn cli_names() -> &'static [&'static str] {
        Self::VARIANTS
    }

    pub fn cli_message() -> &'static str {
        "Switch internal transaction structure/representation"
    }
}

/// Configuration for the block generator invalidator for replay.
#[derive(Clone, Debug)]
pub struct GeneratorConfig {
    pub accounts_path: String,
    pub starting_keypairs: Arc<Vec<Keypair>>,
}

pub struct ValidatorConfig {
    pub halt_at_slot: Option<Slot>,
    pub expected_genesis_hash: Option<Hash>,
    pub expected_bank_hash: Option<Hash>,
    pub expected_shred_version: Option<u16>,
    pub voting_disabled: bool,
    pub account_paths: Vec<PathBuf>,
    pub account_snapshot_paths: Vec<PathBuf>,
    pub rpc_config: JsonRpcConfig,
    /// Specifies which plugins to start up with
    pub on_start_geyser_plugin_config_files: Option<Vec<PathBuf>>,
    pub geyser_plugin_always_enabled: bool,
    pub rpc_addrs: Option<(SocketAddr, SocketAddr)>, // (JsonRpc, JsonRpcPubSub)
    pub pubsub_config: PubSubConfig,
    pub snapshot_config: SnapshotConfig,
    pub max_ledger_shreds: Option<u64>,
    pub blockstore_options: BlockstoreOptions,
    pub broadcast_stage_type: BroadcastStageType,
    pub turbine_disabled: Arc<AtomicBool>,
    pub fixed_leader_schedule: Option<FixedSchedule>,
    pub wait_for_supermajority: Option<Slot>,
    pub new_hard_forks: Option<Vec<Slot>>,
    pub known_validators: Option<HashSet<Pubkey>>, // None = trust all
    pub repair_validators: Option<HashSet<Pubkey>>, // None = repair from all
    pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>, // Empty = repair with all
    pub gossip_validators: Option<HashSet<Pubkey>>, // None = gossip with all
    pub max_genesis_archive_unpacked_size: u64,
    /// Run PoH, transaction signature and other transaction verifications during blockstore
    /// processing.
    pub run_verification: bool,
    pub require_tower: bool,
    pub tower_storage: Arc<dyn TowerStorage>,
    pub debug_keys: Option<Arc<HashSet<Pubkey>>>,
    pub contact_debug_interval: u64,
    pub contact_save_interval: u64,
    pub send_transaction_service_config: SendTransactionServiceConfig,
    pub no_poh_speed_test: bool,
    pub no_os_memory_stats_reporting: bool,
    pub no_os_network_stats_reporting: bool,
    pub no_os_cpu_stats_reporting: bool,
    pub no_os_disk_stats_reporting: bool,
    pub poh_pinned_cpu_core: usize,
    pub poh_hashes_per_batch: u64,
    pub process_ledger_before_services: bool,
    pub accounts_db_config: Option<AccountsDbConfig>,
    pub warp_slot: Option<Slot>,
    pub accounts_db_test_hash_calculation: bool,
    pub accounts_db_skip_shrink: bool,
    pub accounts_db_force_initial_clean: bool,
    pub tpu_coalesce: Duration,
    pub staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
    pub validator_exit: Arc<RwLock<Exit>>,
    pub validator_exit_backpressure: HashMap<String, Arc<AtomicBool>>,
    pub no_wait_for_vote_to_start_leader: bool,
    pub wait_to_vote_slot: Option<Slot>,
    pub runtime_config: RuntimeConfig,
    pub banking_trace_dir_byte_limit: banking_trace::DirByteLimit,
    pub block_verification_method: BlockVerificationMethod,
    pub block_production_method: BlockProductionMethod,
    pub transaction_struct: TransactionStructure,
    pub enable_block_production_forwarding: bool,
    pub generator_config: Option<GeneratorConfig>,
    pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup,
    pub wen_restart_proto_path: Option<PathBuf>,
    pub wen_restart_coordinator: Option<Pubkey>,
    pub unified_scheduler_handler_threads: Option<usize>,
    pub ip_echo_server_threads: NonZeroUsize,
    pub rayon_global_threads: NonZeroUsize,
    pub replay_forks_threads: NonZeroUsize,
    pub replay_transactions_threads: NonZeroUsize,
    pub tvu_shred_sigverify_threads: NonZeroUsize,
    pub delay_leader_block_for_pending_fork: bool,
    pub use_tpu_client_next: bool,
    pub retransmit_xdp: Option<XdpConfig>,
}

impl Default for ValidatorConfig {
    fn default() -> Self {
        Self {
            halt_at_slot: None,
            expected_genesis_hash: None,
            expected_bank_hash: None,
            expected_shred_version: None,
            voting_disabled: false,
            max_ledger_shreds: None,
            blockstore_options: BlockstoreOptions::default(),
            account_paths: Vec::new(),
            account_snapshot_paths: Vec::new(),
            rpc_config: JsonRpcConfig::default(),
            on_start_geyser_plugin_config_files: None,
            geyser_plugin_always_enabled: false,
            rpc_addrs: None,
            pubsub_config: PubSubConfig::default(),
            snapshot_config: SnapshotConfig::new_load_only(),
            broadcast_stage_type: BroadcastStageType::Standard,
            turbine_disabled: Arc::<AtomicBool>::default(),
            fixed_leader_schedule: None,
            wait_for_supermajority: None,
            new_hard_forks: None,
            known_validators: None,
            repair_validators: None,
            repair_whitelist: Arc::new(RwLock::new(HashSet::default())),
            gossip_validators: None,
            max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
            run_verification: true,
            require_tower: false,
            tower_storage: Arc::new(NullTowerStorage::default()),
            debug_keys: None,
            contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
            contact_save_interval: DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS,
            send_transaction_service_config: SendTransactionServiceConfig::default(),
            no_poh_speed_test: true,
            no_os_memory_stats_reporting: true,
            no_os_network_stats_reporting: true,
            no_os_cpu_stats_reporting: true,
            no_os_disk_stats_reporting: true,
            poh_pinned_cpu_core: poh_service::DEFAULT_PINNED_CPU_CORE,
            poh_hashes_per_batch: poh_service::DEFAULT_HASHES_PER_BATCH,
            process_ledger_before_services: false,
            warp_slot: None,
            accounts_db_test_hash_calculation: false,
            accounts_db_skip_shrink: false,
            accounts_db_force_initial_clean: false,
            tpu_coalesce: DEFAULT_TPU_COALESCE,
            staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
            validator_exit: Arc::new(RwLock::new(Exit::default())),
            validator_exit_backpressure: HashMap::default(),
            no_wait_for_vote_to_start_leader: true,
            accounts_db_config: None,
            wait_to_vote_slot: None,
            runtime_config: RuntimeConfig::default(),
            banking_trace_dir_byte_limit: 0,
            block_verification_method: BlockVerificationMethod::default(),
            block_production_method: BlockProductionMethod::default(),
            transaction_struct: TransactionStructure::default(),
            enable_block_production_forwarding: false,
            generator_config: None,
            use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(),
            wen_restart_proto_path: None,
            wen_restart_coordinator: None,
            unified_scheduler_handler_threads: None,
            ip_echo_server_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            rayon_global_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            tvu_shred_sigverify_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            delay_leader_block_for_pending_fork: false,
            use_tpu_client_next: true,
            retransmit_xdp: None,
        }
    }
}

impl ValidatorConfig {
    pub fn default_for_test() -> Self {
        let max_thread_count =
            NonZeroUsize::new(get_max_thread_count()).expect("thread count is non-zero");

        Self {
            accounts_db_config: Some(ACCOUNTS_DB_CONFIG_FOR_TESTING),
            blockstore_options: BlockstoreOptions::default_for_tests(),
            rpc_config: JsonRpcConfig::default_for_test(),
            block_production_method: BlockProductionMethod::default(),
            enable_block_production_forwarding: true, // enable forwarding by default for tests
            rayon_global_threads: max_thread_count,
            replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            replay_transactions_threads: max_thread_count,
            tvu_shred_sigverify_threads: NonZeroUsize::new(get_thread_count())
                .expect("thread count is non-zero"),
            ..Self::default()
        }
    }

    pub fn enable_default_rpc_block_subscribe(&mut self) {
        let pubsub_config = PubSubConfig {
            enable_block_subscription: true,
            ..PubSubConfig::default()
        };
        let rpc_config = JsonRpcConfig {
            enable_rpc_transaction_history: true,
            ..JsonRpcConfig::default_for_test()
        };

        self.pubsub_config = pubsub_config;
        self.rpc_config = rpc_config;
    }
}
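
// Illustrative usage sketch (assumed, based only on the items defined above):
// building a test configuration with RPC block subscriptions enabled.
//
//     let mut config = ValidatorConfig::default_for_test();
//     config.enable_default_rpc_block_subscribe();
//     assert!(config.pubsub_config.enable_block_subscription);
//     assert!(config.rpc_config.enable_rpc_transaction_history);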

// `ValidatorStartProgress` contains status information that is surfaced to the node operator
// over the admin RPC channel to help them follow the general progress of node startup without
// having to watch log messages.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum ValidatorStartProgress {
    Initializing, // Catch all, default state
    SearchingForRpcService,
    DownloadingSnapshot {
        slot: Slot,
        rpc_addr: SocketAddr,
    },
    CleaningBlockStore,
    CleaningAccounts,
    LoadingLedger,
    ProcessingLedger {
        slot: Slot,
        max_slot: Slot,
    },
    StartingServices,
    Halted, // Validator halted due to `--dev-halt-at-slot` argument
    WaitingForSupermajority {
        slot: Slot,
        gossip_stake_percent: u64,
    },

    // `Running` is the terminal state once the validator fully starts and all services are
    // operational
    Running,
}

impl Default for ValidatorStartProgress {
    fn default() -> Self {
        Self::Initializing
    }
}
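
// Illustrative sketch (assumed): callers typically share the startup progress behind an
// Arc<RwLock<..>> so the admin RPC side can poll the current phase, e.g.:
//
//     let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
//     assert_eq!(
//         *start_progress.read().unwrap(),
//         ValidatorStartProgress::Initializing
//     );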

struct BlockstoreRootScan {
    thread: Option<JoinHandle<Result<usize, BlockstoreError>>>,
}

impl BlockstoreRootScan {
    fn new(config: &ValidatorConfig, blockstore: Arc<Blockstore>, exit: Arc<AtomicBool>) -> Self {
        let thread = if config.rpc_addrs.is_some()
            && config.rpc_config.enable_rpc_transaction_history
            && config.rpc_config.rpc_scan_and_fix_roots
        {
            Some(
                Builder::new()
                    .name("solBStoreRtScan".to_string())
                    .spawn(move || blockstore.scan_and_fix_roots(None, None, &exit))
                    .unwrap(),
            )
        } else {
            None
        };
        Self { thread }
    }

    fn join(self) {
        if let Some(blockstore_root_scan) = self.thread {
            if let Err(err) = blockstore_root_scan.join() {
                warn!("blockstore_root_scan failed to join {:?}", err);
            }
        }
    }
}

#[derive(Default)]
struct TransactionHistoryServices {
    transaction_status_sender: Option<TransactionStatusSender>,
    transaction_status_service: Option<TransactionStatusService>,
    max_complete_transaction_status_slot: Arc<AtomicU64>,
}

/// A struct to ease passing validator TPU configuration.
pub struct ValidatorTpuConfig {
    /// Controls whether to use QUIC for sending regular TPU transactions
    pub use_quic: bool,
    /// Controls whether to use QUIC for sending TPU votes
    pub vote_use_quic: bool,
    /// Controls the connection cache pool size
    pub tpu_connection_pool_size: usize,
    /// Controls whether to enable UDP for TPU transactions.
    pub tpu_enable_udp: bool,
    /// QUIC server config for regular TPU
    pub tpu_quic_server_config: QuicServerParams,
    /// QUIC server config for TPU forward
    pub tpu_fwd_quic_server_config: QuicServerParams,
    /// QUIC server config for Vote
    pub vote_quic_server_config: QuicServerParams,
}

impl ValidatorTpuConfig {
    /// A convenience function to build a ValidatorTpuConfig for testing with good
    /// defaults.
    pub fn new_for_tests(tpu_enable_udp: bool) -> Self {
        let tpu_quic_server_config = QuicServerParams {
            max_connections_per_ipaddr_per_min: 32,
            coalesce_channel_size: 100_000, // smaller channel size for faster tests
            ..Default::default()
        };

        let tpu_fwd_quic_server_config = QuicServerParams {
            max_connections_per_ipaddr_per_min: 32,
            max_unstaked_connections: 0,
            coalesce_channel_size: 100_000, // smaller channel size for faster tests
            ..Default::default()
        };

        // vote and tpu_fwd share the same characteristics -- disallow non-staked connections:
        let vote_quic_server_config = tpu_fwd_quic_server_config.clone();

        ValidatorTpuConfig {
            use_quic: DEFAULT_TPU_USE_QUIC,
            vote_use_quic: DEFAULT_VOTE_USE_QUIC,
            tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
            tpu_enable_udp,
            tpu_quic_server_config,
            tpu_fwd_quic_server_config,
            vote_quic_server_config,
        }
    }
}
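
// Illustrative usage sketch (assumed): the test defaults built above disallow unstaked
// connections on the forward and vote QUIC servers.
//
//     let tpu_config = ValidatorTpuConfig::new_for_tests(/* tpu_enable_udp: */ true);
//     assert_eq!(tpu_config.tpu_fwd_quic_server_config.max_unstaked_connections, 0);
//     assert_eq!(tpu_config.vote_quic_server_config.max_unstaked_connections, 0);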

pub struct Validator {
    validator_exit: Arc<RwLock<Exit>>,
    json_rpc_service: Option<JsonRpcService>,
    pubsub_service: Option<PubSubService>,
    rpc_completed_slots_service: Option<JoinHandle<()>>,
    optimistically_confirmed_bank_tracker: Option<OptimisticallyConfirmedBankTracker>,
    transaction_status_service: Option<TransactionStatusService>,
    entry_notifier_service: Option<EntryNotifierService>,
    system_monitor_service: Option<SystemMonitorService>,
    sample_performance_service: Option<SamplePerformanceService>,
    stats_reporter_service: StatsReporterService,
    gossip_service: GossipService,
    serve_repair_service: ServeRepairService,
    completed_data_sets_service: Option<CompletedDataSetsService>,
    snapshot_packager_service: Option<SnapshotPackagerService>,
    poh_recorder: Arc<RwLock<PohRecorder>>,
    poh_service: PohService,
    tpu: Tpu,
    tvu: Tvu,
    ip_echo_server: Option<solana_net_utils::IpEchoServer>,
    pub cluster_info: Arc<ClusterInfo>,
    pub bank_forks: Arc<RwLock<BankForks>>,
    pub blockstore: Arc<Blockstore>,
    geyser_plugin_service: Option<GeyserPluginService>,
    blockstore_metric_report_service: BlockstoreMetricReportService,
    accounts_background_service: AccountsBackgroundService,
    accounts_hash_verifier: AccountsHashVerifier,
    turbine_quic_endpoint: Option<Endpoint>,
    turbine_quic_endpoint_runtime: Option<TokioRuntime>,
    turbine_quic_endpoint_join_handle: Option<solana_turbine::quic_endpoint::AsyncTryJoinHandle>,
    repair_quic_endpoints: Option<[Endpoint; 3]>,
    repair_quic_endpoints_runtime: Option<TokioRuntime>,
    repair_quic_endpoints_join_handle: Option<repair::quic_endpoint::AsyncTryJoinHandle>,
    // This runtime is used to run the client owned by SendTransactionService.
    // We don't wait for its JoinHandle here because ownership and shutdown
    // are managed elsewhere. This variable is intentionally unused.
    _tpu_client_next_runtime: Option<TokioRuntime>,
}

impl Validator {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        mut node: Node,
        identity_keypair: Arc<Keypair>,
        ledger_path: &Path,
        vote_account: &Pubkey,
        authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
        cluster_entrypoints: Vec<ContactInfo>,
        config: &ValidatorConfig,
        should_check_duplicate_instance: bool,
        rpc_to_plugin_manager_receiver: Option<Receiver<GeyserPluginManagerRequest>>,
        start_progress: Arc<RwLock<ValidatorStartProgress>>,
        socket_addr_space: SocketAddrSpace,
        tpu_config: ValidatorTpuConfig,
        admin_rpc_service_post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
    ) -> Result<Self> {
        let ValidatorTpuConfig {
            use_quic,
            vote_use_quic,
            tpu_connection_pool_size,
            tpu_enable_udp,
            tpu_quic_server_config,
            tpu_fwd_quic_server_config,
            vote_quic_server_config,
        } = tpu_config;

        let start_time = Instant::now();

        // Initialize the global rayon pool first to ensure the value in config
        // is honored. Otherwise, some code accessing the global pool could
        // cause it to get initialized with Rayon's default (not ours)
        if rayon::ThreadPoolBuilder::new()
            .thread_name(|i| format!("solRayonGlob{i:02}"))
            .num_threads(config.rayon_global_threads.get())
            .build_global()
            .is_err()
        {
            warn!("Rayon global thread pool already initialized");
        }

        let id = identity_keypair.pubkey();
        assert_eq!(&id, node.info.pubkey());

        info!("identity pubkey: {id}");
        info!("vote account pubkey: {vote_account}");

        if !config.no_os_network_stats_reporting {
            verify_net_stats_access().map_err(|e| {
                ValidatorError::Other(format!("Failed to access network stats: {e:?}"))
            })?;
        }

        let mut bank_notification_senders = Vec::new();

        let exit = Arc::new(AtomicBool::new(false));

        let geyser_plugin_config_files = config
            .on_start_geyser_plugin_config_files
            .as_ref()
            .map(Cow::Borrowed)
            .or_else(|| {
                config
                    .geyser_plugin_always_enabled
                    .then_some(Cow::Owned(vec![]))
            });
        let geyser_plugin_service =
            if let Some(geyser_plugin_config_files) = geyser_plugin_config_files {
                let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
                bank_notification_senders.push(confirmed_bank_sender);
                let rpc_to_plugin_manager_receiver_and_exit =
                    rpc_to_plugin_manager_receiver.map(|receiver| (receiver, exit.clone()));
                Some(
                    GeyserPluginService::new_with_receiver(
                        confirmed_bank_receiver,
                        config.geyser_plugin_always_enabled,
                        geyser_plugin_config_files.as_ref(),
                        rpc_to_plugin_manager_receiver_and_exit,
                    )
                    .map_err(|err| {
                        ValidatorError::Other(format!("Failed to load the Geyser plugin: {err:?}"))
                    })?,
                )
            } else {
                None
            };

        if config.voting_disabled {
            warn!("voting disabled");
            authorized_voter_keypairs.write().unwrap().clear();
        } else {
            for authorized_voter_keypair in authorized_voter_keypairs.read().unwrap().iter() {
                warn!("authorized voter: {}", authorized_voter_keypair.pubkey());
            }
        }

        for cluster_entrypoint in &cluster_entrypoints {
            info!("entrypoint: {:?}", cluster_entrypoint);
        }

        if solana_perf::perf_libs::api().is_some() {
            info!("Initializing sigverify, this could take a while...");
        } else {
            info!("Initializing sigverify...");
        }
        sigverify::init();
        info!("Initializing sigverify done.");

        if !ledger_path.is_dir() {
            return Err(anyhow!(
                "ledger directory does not exist or is not accessible: {ledger_path:?}"
            ));
        }
        let genesis_config = load_genesis(config, ledger_path)?;
        metrics_config_sanity_check(genesis_config.cluster_type)?;

        info!("Cleaning accounts paths..");
        *start_progress.write().unwrap() = ValidatorStartProgress::CleaningAccounts;
        let mut timer = Measure::start("clean_accounts_paths");
        cleanup_accounts_paths(config);
        timer.stop();
        info!("Cleaning accounts paths done. {timer}");

        snapshot_utils::purge_incomplete_bank_snapshots(&config.snapshot_config.bank_snapshots_dir);
        snapshot_utils::purge_old_bank_snapshots_at_startup(
            &config.snapshot_config.bank_snapshots_dir,
        );

        info!("Cleaning orphaned account snapshot directories..");
        let mut timer = Measure::start("clean_orphaned_account_snapshot_dirs");
        clean_orphaned_account_snapshot_dirs(
            &config.snapshot_config.bank_snapshots_dir,
            &config.account_snapshot_paths,
        )
        .context("failed to clean orphaned account snapshot directories")?;
        timer.stop();
        info!("Cleaning orphaned account snapshot directories done. {timer}");

        // The accounts hash cache dir was renamed, so cleanup any old dirs that exist.
        let accounts_hash_cache_path = config
            .accounts_db_config
            .as_ref()
            .and_then(|config| config.accounts_hash_cache_path.as_ref())
            .map(PathBuf::as_path)
            .unwrap_or(ledger_path);
        let old_accounts_hash_cache_dirs = [
            ledger_path.join("calculate_accounts_hash_cache"),
            accounts_hash_cache_path.join("full"),
            accounts_hash_cache_path.join("incremental"),
            accounts_hash_cache_path.join("transient"),
        ];
        for old_accounts_hash_cache_dir in old_accounts_hash_cache_dirs {
            if old_accounts_hash_cache_dir.exists() {
                move_and_async_delete_path(old_accounts_hash_cache_dir);
            }
        }

        // token used to cancel tpu-client-next.
        let cancel_tpu_client_next = CancellationToken::new();
        {
            let exit = exit.clone();
            config
                .validator_exit
                .write()
                .unwrap()
                .register_exit(Box::new(move || exit.store(true, Ordering::Relaxed)));
            let cancel_tpu_client_next = cancel_tpu_client_next.clone();
            config
                .validator_exit
                .write()
                .unwrap()
                .register_exit(Box::new(move || cancel_tpu_client_next.cancel()));
        }

        let accounts_update_notifier = geyser_plugin_service
            .as_ref()
            .and_then(|geyser_plugin_service| geyser_plugin_service.get_accounts_update_notifier());

        let transaction_notifier = geyser_plugin_service
            .as_ref()
            .and_then(|geyser_plugin_service| geyser_plugin_service.get_transaction_notifier());

        let entry_notifier = geyser_plugin_service
            .as_ref()
            .and_then(|geyser_plugin_service| geyser_plugin_service.get_entry_notifier());

        let block_metadata_notifier = geyser_plugin_service
            .as_ref()
            .and_then(|geyser_plugin_service| geyser_plugin_service.get_block_metadata_notifier());

        let slot_status_notifier = geyser_plugin_service
            .as_ref()
            .and_then(|geyser_plugin_service| geyser_plugin_service.get_slot_status_notifier());

        info!(
            "Geyser plugin: accounts_update_notifier: {}, transaction_notifier: {}, \
             entry_notifier: {}",
            accounts_update_notifier.is_some(),
            transaction_notifier.is_some(),
            entry_notifier.is_some()
        );

        let system_monitor_service = Some(SystemMonitorService::new(
            exit.clone(),
            SystemMonitorStatsReportConfig {
                report_os_memory_stats: !config.no_os_memory_stats_reporting,
                report_os_network_stats: !config.no_os_network_stats_reporting,
                report_os_cpu_stats: !config.no_os_cpu_stats_reporting,
                report_os_disk_stats: !config.no_os_disk_stats_reporting,
            },
        ));

        let (
            bank_forks,
            blockstore,
            original_blockstore_root,
            ledger_signal_receiver,
            leader_schedule_cache,
            starting_snapshot_hashes,
            TransactionHistoryServices {
                transaction_status_sender,
                transaction_status_service,
                max_complete_transaction_status_slot,
            },
            blockstore_process_options,
            blockstore_root_scan,
            pruned_banks_receiver,
            entry_notifier_service,
        ) = load_blockstore(
            config,
            ledger_path,
            &genesis_config,
            exit.clone(),
            &start_progress,
            accounts_update_notifier,
            transaction_notifier,
            entry_notifier,
        )
        .map_err(ValidatorError::Other)?;

        if !config.no_poh_speed_test {
            check_poh_speed(&bank_forks.read().unwrap().root_bank(), None)?;
        }

        let (root_slot, hard_forks) = {
            let root_bank = bank_forks.read().unwrap().root_bank();
            (root_bank.slot(), root_bank.hard_forks())
        };
        let shred_version = compute_shred_version(&genesis_config.hash(), Some(&hard_forks));
        info!(
            "shred version: {shred_version}, hard forks: {:?}",
            hard_forks
        );

        if let Some(expected_shred_version) = config.expected_shred_version {
            if expected_shred_version != shred_version {
                return Err(ValidatorError::ShredVersionMismatch {
                    actual: shred_version,
                    expected: expected_shred_version,
                }
                .into());
            }
        }

        if let Some(start_slot) = should_cleanup_blockstore_incorrect_shred_versions(
            config,
            &blockstore,
            root_slot,
            &hard_forks,
        )? {
            *start_progress.write().unwrap() = ValidatorStartProgress::CleaningBlockStore;
            cleanup_blockstore_incorrect_shred_versions(
                &blockstore,
                config,
                start_slot,
                shred_version,
            )?;
        } else {
            info!("Skipping the blockstore check for shreds with incorrect version");
        }

        node.info.set_shred_version(shred_version);
        node.info.set_wallclock(timestamp());
        Self::print_node_info(&node);

        let mut cluster_info = ClusterInfo::new(
            node.info.clone(),
            identity_keypair.clone(),
            socket_addr_space,
        );
        cluster_info.set_contact_debug_interval(config.contact_debug_interval);
        cluster_info.set_entrypoints(cluster_entrypoints);
        cluster_info.restore_contact_info(ledger_path, config.contact_save_interval);
        let cluster_info = Arc::new(cluster_info);

        assert!(is_snapshot_config_valid(&config.snapshot_config));

        let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
        let snapshot_controller = Arc::new(SnapshotController::new(
            snapshot_request_sender.clone(),
            config.snapshot_config.clone(),
            bank_forks.read().unwrap().root(),
        ));

        let pending_snapshot_packages = Arc::new(Mutex::new(PendingSnapshotPackages::default()));
        let snapshot_packager_service = if snapshot_controller
            .snapshot_config()
            .should_generate_snapshots()
        {
            let exit_backpressure = config
                .validator_exit_backpressure
                .get(SnapshotPackagerService::NAME)
                .cloned();
            let enable_gossip_push = true;
            let snapshot_packager_service = SnapshotPackagerService::new(
                pending_snapshot_packages.clone(),
                starting_snapshot_hashes,
                exit.clone(),
                exit_backpressure,
                cluster_info.clone(),
                snapshot_controller.clone(),
                enable_gossip_push,
            );
            Some(snapshot_packager_service)
        } else {
            None
        };

        let (accounts_package_sender, accounts_package_receiver) = crossbeam_channel::unbounded();
        let accounts_hash_verifier = AccountsHashVerifier::new(
            accounts_package_sender.clone(),
            accounts_package_receiver,
            pending_snapshot_packages,
            exit.clone(),
            snapshot_controller.clone(),
        );
        let snapshot_request_handler = SnapshotRequestHandler {
            snapshot_controller: snapshot_controller.clone(),
            snapshot_request_receiver,
            accounts_package_sender,
        };
        let pruned_banks_request_handler = PrunedBanksRequestHandler {
            pruned_banks_receiver,
        };
        let accounts_background_service = AccountsBackgroundService::new(
            bank_forks.clone(),
            exit.clone(),
            AbsRequestHandlers {
                snapshot_request_handler,
                pruned_banks_request_handler,
            },
            config.accounts_db_test_hash_calculation,
        );
        info!(
            "Using: block-verification-method: {}, block-production-method: {}, transaction-structure: {}",
            config.block_verification_method, config.block_production_method, config.transaction_struct
        );

        let (replay_vote_sender, replay_vote_receiver) = unbounded();

        // block min prioritization fee cache should be readable by RPC, and writable by validator
        // (by both replay stage and banking stage)
        let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::default());

        let leader_schedule_cache = Arc::new(leader_schedule_cache);
        let startup_verification_complete;
        let (poh_recorder, entry_receiver) = {
            let bank = &bank_forks.read().unwrap().working_bank();
            startup_verification_complete = Arc::clone(bank.get_startup_verification_complete());
            PohRecorder::new_with_clear_signal(
                bank.tick_height(),
                bank.last_blockhash(),
                bank.clone(),
                None,
                bank.ticks_per_slot(),
                config.delay_leader_block_for_pending_fork,
                blockstore.clone(),
                blockstore.get_new_shred_signal(0),
                &leader_schedule_cache,
                &genesis_config.poh_config,
                exit.clone(),
            )
        };
        let (record_sender, record_receiver) = unbounded();
        let transaction_recorder =
            TransactionRecorder::new(record_sender, poh_recorder.is_exited.clone());
        let poh_recorder = Arc::new(RwLock::new(poh_recorder));

        let (banking_tracer, tracer_thread) =
            BankingTracer::new((config.banking_trace_dir_byte_limit > 0).then_some((
                &blockstore.banking_trace_path(),
                exit.clone(),
                config.banking_trace_dir_byte_limit,
            )))?;
        if banking_tracer.is_enabled() {
            info!(
                "Enabled banking trace (dir_byte_limit: {})",
                config.banking_trace_dir_byte_limit
            );
        } else {
            info!("Disabled banking trace");
        }
        let banking_tracer_channels = banking_tracer.create_channels(false);

        match &config.block_verification_method {
            BlockVerificationMethod::BlockstoreProcessor => {
                info!("no scheduler pool is installed for block verification...");
                if let Some(count) = config.unified_scheduler_handler_threads {
                    warn!(
                        "--unified-scheduler-handler-threads={count} is ignored because unified \
                         scheduler isn't enabled"
                    );
                }
            }
            BlockVerificationMethod::UnifiedScheduler => {
                let scheduler_pool = DefaultSchedulerPool::new_dyn(
                    config.unified_scheduler_handler_threads,
                    config.runtime_config.log_messages_bytes_limit,
                    transaction_status_sender.clone(),
                    Some(replay_vote_sender.clone()),
                    prioritization_fee_cache.clone(),
                );
                bank_forks
                    .write()
                    .unwrap()
                    .install_scheduler_pool(scheduler_pool);
            }
        }

        let entry_notification_sender = entry_notifier_service
            .as_ref()
            .map(|service| service.sender());
        let mut process_blockstore = ProcessBlockStore::new(
            &id,
            vote_account,
            &start_progress,
            &blockstore,
            original_blockstore_root,
            &bank_forks,
            &leader_schedule_cache,
            &blockstore_process_options,
            transaction_status_sender.as_ref(),
            entry_notification_sender,
            blockstore_root_scan,
            &snapshot_controller,
            config,
        );

        maybe_warp_slot(
            config,
            &mut process_blockstore,
            ledger_path,
            &bank_forks,
            &leader_schedule_cache,
            &snapshot_controller,
        )
        .map_err(ValidatorError::Other)?;

        if config.process_ledger_before_services {
            process_blockstore
                .process()
                .map_err(ValidatorError::Other)?;
        }
        *start_progress.write().unwrap() = ValidatorStartProgress::StartingServices;

        let sample_performance_service =
            if config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history {
                Some(SamplePerformanceService::new(
                    &bank_forks,
                    blockstore.clone(),
                    exit.clone(),
                ))
            } else {
                None
            };

        let mut block_commitment_cache = BlockCommitmentCache::default();
        let bank_forks_guard = bank_forks.read().unwrap();
        block_commitment_cache.initialize_slots(
            bank_forks_guard.working_bank().slot(),
            bank_forks_guard.root(),
        );
        drop(bank_forks_guard);
        let block_commitment_cache = Arc::new(RwLock::new(block_commitment_cache));

        let optimistically_confirmed_bank =
            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);

        let rpc_subscriptions = Arc::new(RpcSubscriptions::new_with_config(
            exit.clone(),
            max_complete_transaction_status_slot.clone(),
            blockstore.clone(),
            bank_forks.clone(),
            block_commitment_cache.clone(),
            optimistically_confirmed_bank.clone(),
            &config.pubsub_config,
            None,
        ));

        let max_slots = Arc::new(MaxSlots::default());

        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));

        let mut tpu_transactions_forwards_client =
            Some(node.sockets.tpu_transaction_forwarding_client);

        let connection_cache = match (config.use_tpu_client_next, use_quic) {
            (false, true) => Some(Arc::new(ConnectionCache::new_with_client_options(
                "connection_cache_tpu_quic",
                tpu_connection_pool_size,
                Some(
                    tpu_transactions_forwards_client
                        .take()
                        .expect("Socket should exist."),
                ),
                Some((
                    &identity_keypair,
                    node.info
                        .tpu(Protocol::UDP)
                        .ok_or_else(|| {
                            ValidatorError::Other(String::from("Invalid UDP address for TPU"))
                        })?
                        .ip(),
                )),
                Some((&staked_nodes, &identity_keypair.pubkey())),
            ))),
            (false, false) => Some(Arc::new(ConnectionCache::with_udp(
                "connection_cache_tpu_udp",
                tpu_connection_pool_size,
            ))),
            (true, _) => None,
        };

        let vote_connection_cache = if vote_use_quic {
            let vote_connection_cache = ConnectionCache::new_with_client_options(
                "connection_cache_vote_quic",
                tpu_connection_pool_size,
                Some(node.sockets.quic_vote_client),
                Some((
                    &identity_keypair,
                    node.info
                        .tpu_vote(Protocol::QUIC)
                        .ok_or_else(|| {
                            ValidatorError::Other(String::from("Invalid QUIC address for TPU Vote"))
                        })?
                        .ip(),
                )),
                Some((&staked_nodes, &identity_keypair.pubkey())),
            );
            Arc::new(vote_connection_cache)
        } else {
            Arc::new(ConnectionCache::with_udp(
                "connection_cache_vote_udp",
                tpu_connection_pool_size,
            ))
        };

        // test-validator crate may start the validator in a tokio runtime
        // context which forces us to use the same runtime because a nested
        // runtime will cause panic at drop. Outside test-validator crate, we
        // always need a tokio runtime (and the respective handle) to initialize
        // the turbine QUIC endpoint.
        let current_runtime_handle = tokio::runtime::Handle::try_current();
        let tpu_client_next_runtime =
            (current_runtime_handle.is_err() && config.use_tpu_client_next).then(|| {
                tokio::runtime::Builder::new_multi_thread()
                    .enable_all()
                    .worker_threads(2)
                    .thread_name("solTpuClientRt")
                    .build()
                    .unwrap()
            });

        let rpc_override_health_check =
            Arc::new(AtomicBool::new(config.rpc_config.disable_health_check));
        let (
            json_rpc_service,
            pubsub_service,
            completed_data_sets_sender,
            completed_data_sets_service,
            rpc_completed_slots_service,
            optimistically_confirmed_bank_tracker,
            bank_notification_sender,
        ) = if let Some((rpc_addr, rpc_pubsub_addr)) = config.rpc_addrs {
            assert_eq!(
                node.info.rpc().map(|addr| socket_addr_space.check(&addr)),
                node.info
                    .rpc_pubsub()
                    .map(|addr| socket_addr_space.check(&addr))
            );
            let (bank_notification_sender, bank_notification_receiver) = unbounded();
            let confirmed_bank_subscribers = if !bank_notification_senders.is_empty() {
                Some(Arc::new(RwLock::new(bank_notification_senders)))
            } else {
                None
            };

            let client_option = if config.use_tpu_client_next {
                let runtime_handle = tpu_client_next_runtime
                    .as_ref()
                    .map(TokioRuntime::handle)
                    .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap());
                ClientOption::TpuClientNext(
                    Arc::as_ref(&identity_keypair),
                    node.sockets.rpc_sts_client,
                    runtime_handle.clone(),
                    cancel_tpu_client_next.clone(),
                )
            } else {
                let Some(connection_cache) = &connection_cache else {
                    panic!("ConnectionCache should exist by construction.");
                };
                ClientOption::ConnectionCache(connection_cache.clone())
            };
            let rpc_svc_config = JsonRpcServiceConfig {
                rpc_addr,
                rpc_config: config.rpc_config.clone(),
                snapshot_config: Some(snapshot_controller.snapshot_config().clone()),
                bank_forks: bank_forks.clone(),
                block_commitment_cache: block_commitment_cache.clone(),
                blockstore: blockstore.clone(),
                cluster_info: cluster_info.clone(),
                poh_recorder: Some(poh_recorder.clone()),
                genesis_hash: genesis_config.hash(),
                ledger_path: ledger_path.to_path_buf(),
                validator_exit: config.validator_exit.clone(),
                exit: exit.clone(),
                override_health_check: rpc_override_health_check.clone(),
                startup_verification_complete,
                optimistically_confirmed_bank: optimistically_confirmed_bank.clone(),
                send_transaction_service_config: config.send_transaction_service_config.clone(),
                max_slots: max_slots.clone(),
                leader_schedule_cache: leader_schedule_cache.clone(),
                max_complete_transaction_status_slot,
                prioritization_fee_cache: prioritization_fee_cache.clone(),
                client_option,
            };
            let json_rpc_service =
                JsonRpcService::new_with_config(rpc_svc_config).map_err(ValidatorError::Other)?;

            let pubsub_service = if !config.rpc_config.full_api {
                None
            } else {
                let (trigger, pubsub_service) = PubSubService::new(
                    config.pubsub_config.clone(),
                    &rpc_subscriptions,
                    rpc_pubsub_addr,
                );
                config
                    .validator_exit
                    .write()
                    .unwrap()
                    .register_exit(Box::new(move || trigger.cancel()));

                Some(pubsub_service)
            };

            let (completed_data_sets_sender, completed_data_sets_service) =
                if !config.rpc_config.full_api {
                    (None, None)
                } else {
                    let (completed_data_sets_sender, completed_data_sets_receiver) =
                        bounded(MAX_COMPLETED_DATA_SETS_IN_CHANNEL);
                    let completed_data_sets_service = CompletedDataSetsService::new(
                        completed_data_sets_receiver,
                        blockstore.clone(),
                        rpc_subscriptions.clone(),
                        exit.clone(),
                        max_slots.clone(),
                    );
                    (
                        Some(completed_data_sets_sender),
                        Some(completed_data_sets_service),
                    )
                };

            let rpc_completed_slots_service =
                if config.rpc_config.full_api || geyser_plugin_service.is_some() {
                    let (completed_slots_sender, completed_slots_receiver) =
                        bounded(MAX_COMPLETED_SLOTS_IN_CHANNEL);
                    blockstore.add_completed_slots_signal(completed_slots_sender);

                    Some(RpcCompletedSlotsService::spawn(
                        completed_slots_receiver,
                        rpc_subscriptions.clone(),
                        slot_status_notifier.clone(),
                        exit.clone(),
                    ))
                } else {
                    None
                };

            let optimistically_confirmed_bank_tracker =
                Some(OptimisticallyConfirmedBankTracker::new(
                    bank_notification_receiver,
                    exit.clone(),
                    bank_forks.clone(),
                    optimistically_confirmed_bank,
                    rpc_subscriptions.clone(),
                    confirmed_bank_subscribers,
                    prioritization_fee_cache.clone(),
                ));
            let bank_notification_sender_config = Some(BankNotificationSenderConfig {
                sender: bank_notification_sender,
                should_send_parents: geyser_plugin_service.is_some(),
            });
            (
                Some(json_rpc_service),
                pubsub_service,
                completed_data_sets_sender,
                completed_data_sets_service,
                rpc_completed_slots_service,
                optimistically_confirmed_bank_tracker,
                bank_notification_sender_config,
            )
        } else {
            (None, None, None, None, None, None, None)
        };

        if config.halt_at_slot.is_some() {
            // Simulate a confirmed root to avoid RPC errors with CommitmentConfig::finalized() and
            // to ensure RPC endpoints like getConfirmedBlock, which require a confirmed root, work
            block_commitment_cache
                .write()
                .unwrap()
                .set_highest_super_majority_root(bank_forks.read().unwrap().root());

            // Park with the RPC service running, ready for inspection!
            warn!("Validator halted");
            *start_progress.write().unwrap() = ValidatorStartProgress::Halted;
            std::thread::park();
        }
        let ip_echo_server = match node.sockets.ip_echo {
            None => None,
            Some(tcp_listener) => Some(solana_net_utils::ip_echo_server(
                tcp_listener,
                config.ip_echo_server_threads,
                Some(node.info.shred_version()),
            )),
        };

        let (stats_reporter_sender, stats_reporter_receiver) = unbounded();

        let stats_reporter_service =
            StatsReporterService::new(stats_reporter_receiver, exit.clone());

        let gossip_service = GossipService::new(
            &cluster_info,
            Some(bank_forks.clone()),
            node.sockets.gossip,
            config.gossip_validators.clone(),
            should_check_duplicate_instance,
            Some(stats_reporter_sender.clone()),
            exit.clone(),
        );
        let serve_repair = ServeRepair::new(
            cluster_info.clone(),
            bank_forks.clone(),
            config.repair_whitelist.clone(),
        );
        let (repair_request_quic_sender, repair_request_quic_receiver) = unbounded();
        let (repair_response_quic_sender, repair_response_quic_receiver) = unbounded();
        let (ancestor_hashes_response_quic_sender, ancestor_hashes_response_quic_receiver) =
            unbounded();

        let waited_for_supermajority = wait_for_supermajority(
            config,
            Some(&mut process_blockstore),
            &bank_forks,
            &cluster_info,
            rpc_override_health_check,
            &start_progress,
        )?;

        let blockstore_metric_report_service =
            BlockstoreMetricReportService::new(blockstore.clone(), exit.clone());

        let wait_for_vote_to_start_leader =
            !waited_for_supermajority && !config.no_wait_for_vote_to_start_leader;

        let poh_service = PohService::new(
            poh_recorder.clone(),
            &genesis_config.poh_config,
            exit.clone(),
            bank_forks.read().unwrap().root_bank().ticks_per_slot(),
            config.poh_pinned_cpu_core,
            config.poh_hashes_per_batch,
            record_receiver,
        );
        assert_eq!(
            blockstore.get_new_shred_signals_len(),
            1,
            "New shred signal for the TVU should be the same as the clear bank signal."
        );

        let vote_tracker = Arc::<VoteTracker>::default();

        let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded();
        let (verified_vote_sender, verified_vote_receiver) = unbounded();
        let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
        let (duplicate_confirmed_slot_sender, duplicate_confirmed_slots_receiver) = unbounded();

        let entry_notification_sender = entry_notifier_service
            .as_ref()
            .map(|service| service.sender_cloned());
1378
1379        let turbine_quic_endpoint_runtime = (current_runtime_handle.is_err()
1380            && genesis_config.cluster_type != ClusterType::MainnetBeta)
1381            .then(|| {
1382                tokio::runtime::Builder::new_multi_thread()
1383                    .enable_all()
1384                    .thread_name("solTurbineQuic")
1385                    .build()
1386                    .unwrap()
1387            });
1388        let (turbine_quic_endpoint_sender, turbine_quic_endpoint_receiver) = unbounded();
1389        let (
1390            turbine_quic_endpoint,
1391            turbine_quic_endpoint_sender,
1392            turbine_quic_endpoint_join_handle,
1393        ) = if genesis_config.cluster_type == ClusterType::MainnetBeta {
1394            let (sender, _receiver) = tokio::sync::mpsc::channel(1);
1395            (None, sender, None)
1396        } else {
1397            solana_turbine::quic_endpoint::new_quic_endpoint(
1398                turbine_quic_endpoint_runtime
1399                    .as_ref()
1400                    .map(TokioRuntime::handle)
1401                    .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()),
1402                &identity_keypair,
1403                node.sockets.tvu_quic,
1404                turbine_quic_endpoint_sender,
1405                bank_forks.clone(),
1406            )
1407            .map(|(endpoint, sender, join_handle)| (Some(endpoint), sender, Some(join_handle)))
1408            .unwrap()
1409        };
1410
1411        // Repair QUIC endpoints.
1412        let repair_quic_endpoints_runtime = (current_runtime_handle.is_err()
1413            && genesis_config.cluster_type != ClusterType::MainnetBeta)
1414            .then(|| {
1415                tokio::runtime::Builder::new_multi_thread()
1416                    .enable_all()
1417                    .thread_name("solRepairQuic")
1418                    .build()
1419                    .unwrap()
1420            });
1421        let (repair_quic_endpoints, repair_quic_async_senders, repair_quic_endpoints_join_handle) =
1422            if genesis_config.cluster_type == ClusterType::MainnetBeta {
1423                (None, RepairQuicAsyncSenders::new_dummy(), None)
1424            } else {
1425                let repair_quic_sockets = RepairQuicSockets {
1426                    repair_server_quic_socket: node.sockets.serve_repair_quic,
1427                    repair_client_quic_socket: node.sockets.repair_quic,
1428                    ancestor_hashes_quic_socket: node.sockets.ancestor_hashes_requests_quic,
1429                };
1430                let repair_quic_senders = RepairQuicSenders {
1431                    repair_request_quic_sender: repair_request_quic_sender.clone(),
1432                    repair_response_quic_sender,
1433                    ancestor_hashes_response_quic_sender,
1434                };
1435                repair::quic_endpoint::new_quic_endpoints(
1436                    repair_quic_endpoints_runtime
1437                        .as_ref()
1438                        .map(TokioRuntime::handle)
1439                        .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()),
1440                    &identity_keypair,
1441                    repair_quic_sockets,
1442                    repair_quic_senders,
1443                    bank_forks.clone(),
1444                )
1445                .map(|(endpoints, senders, join_handle)| {
1446                    (Some(endpoints), senders, Some(join_handle))
1447                })
1448                .unwrap()
1449            };
1450        let serve_repair_service = ServeRepairService::new(
1451            serve_repair,
1452            // Incoming UDP repair requests are adapted into RemoteRequest
1453            // and also sent through the same channel.
1454            repair_request_quic_sender,
1455            repair_request_quic_receiver,
1456            repair_quic_async_senders.repair_response_quic_sender,
1457            blockstore.clone(),
1458            node.sockets.serve_repair,
1459            socket_addr_space,
1460            stats_reporter_sender,
1461            exit.clone(),
1462        );
1463
1464        let in_wen_restart = config.wen_restart_proto_path.is_some() && !waited_for_supermajority;
1465        let wen_restart_repair_slots = if in_wen_restart {
1466            Some(Arc::new(RwLock::new(Vec::new())))
1467        } else {
1468            None
1469        };
1470        let tower = match process_blockstore.process_to_create_tower() {
1471            Ok(tower) => {
1472                info!("Tower state: {:?}", tower);
1473                tower
1474            }
1475            Err(e) => {
1476                warn!(
1477                    "Unable to retrieve tower: {:?}; creating default tower...",
1478                    e
1479                );
1480                Tower::default()
1481            }
1482        };
1483        let last_vote = tower.last_vote();
1484
1485        let outstanding_repair_requests =
1486            Arc::<RwLock<repair::repair_service::OutstandingShredRepairs>>::default();
1487        let cluster_slots =
1488            Arc::new(crate::cluster_slots_service::cluster_slots::ClusterSlots::default());
1489
1490        // If RPC is enabled and a ConnectionCache is in use, pass it along so it can be warmed up inside the Tvu.
1491        let connection_cache_for_warmup =
1492            if json_rpc_service.is_some() && connection_cache.is_some() {
1493                connection_cache.as_ref()
1494            } else {
1495                None
1496            };
1497
1498        let tvu = Tvu::new(
1499            vote_account,
1500            authorized_voter_keypairs,
1501            &bank_forks,
1502            &cluster_info,
1503            TvuSockets {
1504                repair: node.sockets.repair.try_clone().unwrap(),
1505                retransmit: node.sockets.retransmit_sockets,
1506                fetch: node.sockets.tvu,
1507                ancestor_hashes_requests: node.sockets.ancestor_hashes_requests,
1508            },
1509            blockstore.clone(),
1510            ledger_signal_receiver,
1511            &rpc_subscriptions,
1512            &poh_recorder,
1513            tower,
1514            config.tower_storage.clone(),
1515            &leader_schedule_cache,
1516            exit.clone(),
1517            block_commitment_cache,
1518            config.turbine_disabled.clone(),
1519            transaction_status_sender.clone(),
1520            entry_notification_sender.clone(),
1521            vote_tracker.clone(),
1522            retransmit_slots_sender,
1523            gossip_verified_vote_hash_receiver,
1524            verified_vote_receiver,
1525            replay_vote_sender.clone(),
1526            completed_data_sets_sender,
1527            bank_notification_sender.clone(),
1528            duplicate_confirmed_slots_receiver,
1529            TvuConfig {
1530                max_ledger_shreds: config.max_ledger_shreds,
1531                shred_version: node.info.shred_version(),
1532                repair_validators: config.repair_validators.clone(),
1533                repair_whitelist: config.repair_whitelist.clone(),
1534                wait_for_vote_to_start_leader,
1535                replay_forks_threads: config.replay_forks_threads,
1536                replay_transactions_threads: config.replay_transactions_threads,
1537                shred_sigverify_threads: config.tvu_shred_sigverify_threads,
1538                retransmit_xdp: config.retransmit_xdp.clone(),
1539            },
1540            &max_slots,
1541            block_metadata_notifier,
1542            config.wait_to_vote_slot,
1543            Some(snapshot_controller.clone()),
1544            config.runtime_config.log_messages_bytes_limit,
1545            connection_cache_for_warmup,
1546            &prioritization_fee_cache,
1547            banking_tracer.clone(),
1548            turbine_quic_endpoint_sender.clone(),
1549            turbine_quic_endpoint_receiver,
1550            repair_response_quic_receiver,
1551            repair_quic_async_senders.repair_request_quic_sender,
1552            repair_quic_async_senders.ancestor_hashes_request_quic_sender,
1553            ancestor_hashes_response_quic_receiver,
1554            outstanding_repair_requests.clone(),
1555            cluster_slots.clone(),
1556            wen_restart_repair_slots.clone(),
1557            slot_status_notifier,
1558            vote_connection_cache,
1559        )
1560        .map_err(ValidatorError::Other)?;
1561
1562        if in_wen_restart {
1563            info!("Waiting for wen_restart to finish");
1564            wait_for_wen_restart(WenRestartConfig {
1565                wen_restart_path: config.wen_restart_proto_path.clone().unwrap(),
1566                wen_restart_coordinator: config.wen_restart_coordinator.unwrap(),
1567                last_vote,
1568                blockstore: blockstore.clone(),
1569                cluster_info: cluster_info.clone(),
1570                bank_forks: bank_forks.clone(),
1571                wen_restart_repair_slots: wen_restart_repair_slots.clone(),
1572                wait_for_supermajority_threshold_percent:
1573                    WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT,
1574                snapshot_controller: Some(snapshot_controller.clone()),
1575                abs_status: accounts_background_service.status().clone(),
1576                genesis_config_hash: genesis_config.hash(),
1577                exit: exit.clone(),
1578            })?;
1579            return Err(ValidatorError::WenRestartFinished.into());
1580        }
1581
1582        let key_notifiers = Arc::new(RwLock::new(KeyUpdaters::default()));
1583        let forwarding_tpu_client = if let Some(connection_cache) = &connection_cache {
1584            ForwardingClientOption::ConnectionCache(connection_cache.clone())
1585        } else {
1586            let runtime_handle = tpu_client_next_runtime
1587                .as_ref()
1588                .map(TokioRuntime::handle)
1589                .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap());
1590            ForwardingClientOption::TpuClientNext((
1591                Arc::as_ref(&identity_keypair),
1592                tpu_transactions_forwards_client
1593                    .take()
1594                    .expect("Socket should exist."),
1595                runtime_handle.clone(),
1596                cancel_tpu_client_next,
1597            ))
1598        };
1599        let tpu = Tpu::new_with_client(
1600            &cluster_info,
1601            &poh_recorder,
1602            transaction_recorder,
1603            entry_receiver,
1604            retransmit_slots_receiver,
1605            TpuSockets {
1606                transactions: node.sockets.tpu,
1607                transaction_forwards: node.sockets.tpu_forwards,
1608                vote: node.sockets.tpu_vote,
1609                broadcast: node.sockets.broadcast,
1610                transactions_quic: node.sockets.tpu_quic,
1611                transactions_forwards_quic: node.sockets.tpu_forwards_quic,
1612                vote_quic: node.sockets.tpu_vote_quic,
1613                vote_forwarding_client: node.sockets.tpu_vote_forwarding_client,
1614            },
1615            &rpc_subscriptions,
1616            transaction_status_sender,
1617            entry_notification_sender,
1618            blockstore.clone(),
1619            &config.broadcast_stage_type,
1620            exit,
1621            node.info.shred_version(),
1622            vote_tracker,
1623            bank_forks.clone(),
1624            verified_vote_sender,
1625            gossip_verified_vote_hash_sender,
1626            replay_vote_receiver,
1627            replay_vote_sender,
1628            bank_notification_sender.map(|sender| sender.sender),
1629            config.tpu_coalesce,
1630            duplicate_confirmed_slot_sender,
1631            forwarding_tpu_client,
1632            turbine_quic_endpoint_sender,
1633            &identity_keypair,
1634            config.runtime_config.log_messages_bytes_limit,
1635            &staked_nodes,
1636            config.staked_nodes_overrides.clone(),
1637            banking_tracer_channels,
1638            tracer_thread,
1639            tpu_enable_udp,
1640            tpu_quic_server_config,
1641            tpu_fwd_quic_server_config,
1642            vote_quic_server_config,
1643            &prioritization_fee_cache,
1644            config.block_production_method.clone(),
1645            config.transaction_struct.clone(),
1646            config.enable_block_production_forwarding,
1647            config.generator_config.clone(),
1648            key_notifiers.clone(),
1649        );
1650
1651        datapoint_info!(
1652            "validator-new",
1653            ("id", id.to_string(), String),
1654            ("version", solana_version::version!(), String),
1655            ("cluster_type", genesis_config.cluster_type as u32, i64),
1656            ("elapsed_ms", start_time.elapsed().as_millis() as i64, i64),
1657            ("waited_for_supermajority", waited_for_supermajority, bool),
1658            ("shred_version", shred_version as i64, i64),
1659        );
1660
1661        *start_progress.write().unwrap() = ValidatorStartProgress::Running;
1662        if config.use_tpu_client_next {
1663            if let Some(json_rpc_service) = &json_rpc_service {
1664                key_notifiers.write().unwrap().add(
1665                    KeyUpdaterType::RpcService,
1666                    json_rpc_service.get_client_key_updater(),
1667                );
1668            }
1669            // Note that we don't need to add ConnectionClient to key_notifiers
1670            // because it is added inside Tpu.
1671        }
1672
1673        *admin_rpc_service_post_init.write().unwrap() = Some(AdminRpcRequestMetadataPostInit {
1674            bank_forks: bank_forks.clone(),
1675            cluster_info: cluster_info.clone(),
1676            vote_account: *vote_account,
1677            repair_whitelist: config.repair_whitelist.clone(),
1678            notifies: key_notifiers,
1679            repair_socket: Arc::new(node.sockets.repair),
1680            outstanding_repair_requests,
1681            cluster_slots,
1682        });
1683
1684        Ok(Self {
1685            stats_reporter_service,
1686            gossip_service,
1687            serve_repair_service,
1688            json_rpc_service,
1689            pubsub_service,
1690            rpc_completed_slots_service,
1691            optimistically_confirmed_bank_tracker,
1692            transaction_status_service,
1693            entry_notifier_service,
1694            system_monitor_service,
1695            sample_performance_service,
1696            snapshot_packager_service,
1697            completed_data_sets_service,
1698            tpu,
1699            tvu,
1700            poh_service,
1701            poh_recorder,
1702            ip_echo_server,
1703            validator_exit: config.validator_exit.clone(),
1704            cluster_info,
1705            bank_forks,
1706            blockstore,
1707            geyser_plugin_service,
1708            blockstore_metric_report_service,
1709            accounts_background_service,
1710            accounts_hash_verifier,
1711            turbine_quic_endpoint,
1712            turbine_quic_endpoint_runtime,
1713            turbine_quic_endpoint_join_handle,
1714            repair_quic_endpoints,
1715            repair_quic_endpoints_runtime,
1716            repair_quic_endpoints_join_handle,
1717            _tpu_client_next_runtime: tpu_client_next_runtime,
1718        })
1719    }
1720
1721    // Used to notify many nodes in parallel to exit.
1722    pub fn exit(&mut self) {
1723        self.validator_exit.write().unwrap().exit();
1724
1725        // drop all signals in blockstore
1726        self.blockstore.drop_signal();
1727    }
1728
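    /// Signals all services to exit and then joins them, blocking until shutdown completes.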
1729    pub fn close(mut self) {
1730        self.exit();
1731        self.join();
1732    }
1733
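    /// Logs the node's contact info and the local addresses of its gossip, broadcast, repair,
    /// and retransmit sockets.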
1734    fn print_node_info(node: &Node) {
1735        info!("{:?}", node.info);
1736        info!(
1737            "local gossip address: {}",
1738            node.sockets.gossip.local_addr().unwrap()
1739        );
1740        info!(
1741            "local broadcast address: {}",
1742            node.sockets
1743                .broadcast
1744                .first()
1745                .unwrap()
1746                .local_addr()
1747                .unwrap()
1748        );
1749        info!(
1750            "local repair address: {}",
1751            node.sockets.repair.local_addr().unwrap()
1752        );
1753        info!(
1754            "local retransmit address: {}",
1755            node.sockets.retransmit_sockets[0].local_addr().unwrap()
1756        );
1757    }
1758
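    /// Joins every spawned service thread (PoH, RPC, gossip, repair, TPU/TVU, QUIC endpoints,
    /// and the various notifier/reporter services); blocks until the validator has fully shut
    /// down.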
1759    pub fn join(self) {
1760        drop(self.bank_forks);
1761        drop(self.cluster_info);
1762
1763        self.poh_service.join().expect("poh_service");
1764        drop(self.poh_recorder);
1765
1766        if let Some(json_rpc_service) = self.json_rpc_service {
1767            json_rpc_service.join().expect("rpc_service");
1768        }
1769
1770        if let Some(pubsub_service) = self.pubsub_service {
1771            pubsub_service.join().expect("pubsub_service");
1772        }
1773
1774        if let Some(rpc_completed_slots_service) = self.rpc_completed_slots_service {
1775            rpc_completed_slots_service
1776                .join()
1777                .expect("rpc_completed_slots_service");
1778        }
1779
1780        if let Some(optimistically_confirmed_bank_tracker) =
1781            self.optimistically_confirmed_bank_tracker
1782        {
1783            optimistically_confirmed_bank_tracker
1784                .join()
1785                .expect("optimistically_confirmed_bank_tracker");
1786        }
1787
1788        if let Some(transaction_status_service) = self.transaction_status_service {
1789            transaction_status_service
1790                .join()
1791                .expect("transaction_status_service");
1792        }
1793
1794        if let Some(system_monitor_service) = self.system_monitor_service {
1795            system_monitor_service
1796                .join()
1797                .expect("system_monitor_service");
1798        }
1799
1800        if let Some(sample_performance_service) = self.sample_performance_service {
1801            sample_performance_service
1802                .join()
1803                .expect("sample_performance_service");
1804        }
1805
1806        if let Some(entry_notifier_service) = self.entry_notifier_service {
1807            entry_notifier_service
1808                .join()
1809                .expect("entry_notifier_service");
1810        }
1811
1812        if let Some(s) = self.snapshot_packager_service {
1813            s.join().expect("snapshot_packager_service");
1814        }
1815
1816        self.gossip_service.join().expect("gossip_service");
1817        self.repair_quic_endpoints
1818            .iter()
1819            .flatten()
1820            .for_each(repair::quic_endpoint::close_quic_endpoint);
1821        self.serve_repair_service
1822            .join()
1823            .expect("serve_repair_service");
1824        if let Some(repair_quic_endpoints_join_handle) = self.repair_quic_endpoints_join_handle {
1825            self.repair_quic_endpoints_runtime
1826                .map(|runtime| runtime.block_on(repair_quic_endpoints_join_handle))
1827                .transpose()
1828                .unwrap();
1829        }
1830        self.stats_reporter_service
1831            .join()
1832            .expect("stats_reporter_service");
1833        self.blockstore_metric_report_service
1834            .join()
1835            .expect("ledger_metric_report_service");
1836        self.accounts_background_service
1837            .join()
1838            .expect("accounts_background_service");
1839        self.accounts_hash_verifier
1840            .join()
1841            .expect("accounts_hash_verifier");
1842        if let Some(turbine_quic_endpoint) = &self.turbine_quic_endpoint {
1843            solana_turbine::quic_endpoint::close_quic_endpoint(turbine_quic_endpoint);
1844        }
1845        self.tpu.join().expect("tpu");
1846        self.tvu.join().expect("tvu");
1847        if let Some(turbine_quic_endpoint_join_handle) = self.turbine_quic_endpoint_join_handle {
1848            self.turbine_quic_endpoint_runtime
1849                .map(|runtime| runtime.block_on(turbine_quic_endpoint_join_handle))
1850                .transpose()
1851                .unwrap();
1852        }
1853        if let Some(completed_data_sets_service) = self.completed_data_sets_service {
1854            completed_data_sets_service
1855                .join()
1856                .expect("completed_data_sets_service");
1857        }
1858        if let Some(ip_echo_server) = self.ip_echo_server {
1859            ip_echo_server.shutdown_background();
1860        }
1861
1862        if let Some(geyser_plugin_service) = self.geyser_plugin_service {
1863            geyser_plugin_service.join().expect("geyser_plugin_service");
1864        }
1865    }
1866}
1867
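/// Returns true if `vote_account` exists in `bank` and its vote state contains at least one vote.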
1868fn active_vote_account_exists_in_bank(bank: &Bank, vote_account: &Pubkey) -> bool {
1869    if let Some(account) = &bank.get_account(vote_account) {
1870        if let Some(vote_state) = vote_state::from(account) {
1871            return !vote_state.votes.is_empty();
1872        }
1873    }
1874    false
1875}
1876
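/// Measures the local PoH hashing rate and compares it against the cluster target of
/// `hashes_per_tick * ticks_per_slot` hashes per slot duration; returns
/// `ValidatorError::PohTooSlow` if this machine cannot keep up.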
1877fn check_poh_speed(bank: &Bank, maybe_hash_samples: Option<u64>) -> Result<(), ValidatorError> {
1878    let Some(hashes_per_tick) = bank.hashes_per_tick() else {
1879        warn!("Unable to read hashes per tick from Bank, skipping PoH speed check");
1880        return Ok(());
1881    };
1882
1883    let ticks_per_slot = bank.ticks_per_slot();
1884    let hashes_per_slot = hashes_per_tick * ticks_per_slot;
1885    let hash_samples = maybe_hash_samples.unwrap_or(hashes_per_slot);
1886
1887    let hash_time = compute_hash_time(hash_samples);
1888    let my_hashes_per_second = (hash_samples as f64 / hash_time.as_secs_f64()) as u64;
1889
1890    let target_slot_duration = Duration::from_nanos(bank.ns_per_slot as u64);
1891    let target_hashes_per_second =
1892        (hashes_per_slot as f64 / target_slot_duration.as_secs_f64()) as u64;
1893
1894    info!(
1895        "PoH speed check: computed hashes per second {my_hashes_per_second}, target hashes per \
1896         second {target_hashes_per_second}"
1897    );
1898    if my_hashes_per_second < target_hashes_per_second {
1899        return Err(ValidatorError::PohTooSlow {
1900            mine: my_hashes_per_second,
1901            target: target_hashes_per_second,
1902        });
1903    }
1904
1905    Ok(())
1906}
1907
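/// Returns the restart slot when the validator appears to be taking part in a cluster restart
/// (hard fork), inferred from `--wait-for-supermajority` matching the current root slot.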
1908fn maybe_cluster_restart_with_hard_fork(config: &ValidatorConfig, root_slot: Slot) -> Option<Slot> {
1909    // detect cluster restart (hard fork) indirectly via wait_for_supermajority...
1910    if let Some(wait_slot_for_supermajority) = config.wait_for_supermajority {
1911        if wait_slot_for_supermajority == root_slot {
1912            return Some(wait_slot_for_supermajority);
1913        }
1914    }
1915
1916    None
1917}
1918
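/// Validates a restored tower against the root bank, discards it on a hard fork or warp slot,
/// and falls back to rebuilding a tower from `bank_forks` when restoration fails, unless a
/// mandatory tower restore was requested while the vote account already holds votes.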
1919fn post_process_restored_tower(
1920    restored_tower: crate::consensus::Result<Tower>,
1921    validator_identity: &Pubkey,
1922    vote_account: &Pubkey,
1923    config: &ValidatorConfig,
1924    bank_forks: &BankForks,
1925) -> Result<Tower, String> {
1926    let mut should_require_tower = config.require_tower;
1927
1928    let restored_tower = restored_tower.and_then(|tower| {
1929        let root_bank = bank_forks.root_bank();
1930        let slot_history = root_bank.get_slot_history();
1931        // make sure tower isn't corrupted first before the following hard fork check
1932        let tower = tower.adjust_lockouts_after_replay(root_bank.slot(), &slot_history);
1933
1934        if let Some(hard_fork_restart_slot) =
1935            maybe_cluster_restart_with_hard_fork(config, root_bank.slot())
1936        {
1937            // Intentionally fail to restore the tower; we're supposedly in a new hard fork, so the
1938            // past out-of-chain vote state doesn't make sense at all.
1939            // (What if --wait-for-supermajority is passed again when the validator restarts?)
1940            let message =
1941                format!("Hard fork is detected; discarding tower restoration result: {tower:?}");
1942            datapoint_error!("tower_error", ("error", message, String),);
1943            error!("{}", message);
1944
1945            // unconditionally relax tower requirement so that we can always restore tower
1946            // from root bank.
1947            should_require_tower = false;
1948            return Err(crate::consensus::TowerError::HardFork(
1949                hard_fork_restart_slot,
1950            ));
1951        }
1952
1953        if let Some(warp_slot) = config.warp_slot {
1954            // unconditionally relax tower requirement so that we can always restore tower
1955            // from root bank after the warp
1956            should_require_tower = false;
1957            return Err(crate::consensus::TowerError::HardFork(warp_slot));
1958        }
1959
1960        tower
1961    });
1962
1963    let restored_tower = match restored_tower {
1964        Ok(tower) => tower,
1965        Err(err) => {
1966            let voting_has_been_active =
1967                active_vote_account_exists_in_bank(&bank_forks.working_bank(), vote_account);
1968            if !err.is_file_missing() {
1969                datapoint_error!(
1970                    "tower_error",
1971                    ("error", format!("Unable to restore tower: {err}"), String),
1972                );
1973            }
1974            if should_require_tower && voting_has_been_active {
1975                return Err(format!(
1976                    "Requested mandatory tower restore failed: {err}. And there is an existing \
1977                     vote_account containing actual votes. Aborting due to possible conflicting \
1978                     duplicate votes"
1979                ));
1980            }
1981            if err.is_file_missing() && !voting_has_been_active {
1982                // Currently, don't protect against spoofed snapshots with no tower at all
1983                info!(
1984                    "Ignoring expected failed tower restore because this is the initial validator \
1985                     start with the vote account..."
1986                );
1987            } else {
1988                error!(
1989                    "Rebuilding a new tower from the latest vote account due to failed tower \
1990                     restore: {}",
1991                    err
1992                );
1993            }
1994
1995            Tower::new_from_bankforks(bank_forks, validator_identity, vote_account)
1996        }
1997    };
1998
1999    Ok(restored_tower)
2000}
2001
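/// Opens the genesis config from the ledger path, checks that the leader schedule epoch offset
/// is within bounds, and verifies the genesis hash against the expected value when configured.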
2002fn load_genesis(
2003    config: &ValidatorConfig,
2004    ledger_path: &Path,
2005) -> Result<GenesisConfig, ValidatorError> {
2006    let genesis_config = open_genesis_config(ledger_path, config.max_genesis_archive_unpacked_size)
2007        .map_err(ValidatorError::OpenGenesisConfig)?;
2008
2009    // This needs to be limited, otherwise the state in the VoteAccount data
2010    // grows too large.
2011    let leader_schedule_slot_offset = genesis_config.epoch_schedule.leader_schedule_slot_offset;
2012    let slots_per_epoch = genesis_config.epoch_schedule.slots_per_epoch;
2013    let leader_epoch_offset = leader_schedule_slot_offset.div_ceil(slots_per_epoch);
2014    assert!(leader_epoch_offset <= MAX_LEADER_SCHEDULE_EPOCH_OFFSET);
2015
2016    let genesis_hash = genesis_config.hash();
2017    info!("genesis hash: {}", genesis_hash);
2018
2019    if let Some(expected_genesis_hash) = config.expected_genesis_hash {
2020        if genesis_hash != expected_genesis_hash {
2021            return Err(ValidatorError::GenesisHashMismatch(
2022                genesis_hash,
2023                expected_genesis_hash,
2024            ));
2025        }
2026    }
2027
2028    Ok(genesis_config)
2029}
2030
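/// Opens the blockstore and loads bank forks via `bank_forks_utils::load_bank_forks`, wiring up
/// shred signals, the transaction history services, the entry notifier service, and the bank
/// drop callback used by the accounts background service.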
2031#[allow(clippy::type_complexity)]
2032fn load_blockstore(
2033    config: &ValidatorConfig,
2034    ledger_path: &Path,
2035    genesis_config: &GenesisConfig,
2036    exit: Arc<AtomicBool>,
2037    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
2038    accounts_update_notifier: Option<AccountsUpdateNotifier>,
2039    transaction_notifier: Option<TransactionNotifierArc>,
2040    entry_notifier: Option<EntryNotifierArc>,
2041) -> Result<
2042    (
2043        Arc<RwLock<BankForks>>,
2044        Arc<Blockstore>,
2045        Slot,
2046        Receiver<bool>,
2047        LeaderScheduleCache,
2048        Option<StartingSnapshotHashes>,
2049        TransactionHistoryServices,
2050        blockstore_processor::ProcessOptions,
2051        BlockstoreRootScan,
2052        DroppedSlotsReceiver,
2053        Option<EntryNotifierService>,
2054    ),
2055    String,
2056> {
2057    info!("loading ledger from {:?}...", ledger_path);
2058    *start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger;
2059
2060    let blockstore = Blockstore::open_with_options(ledger_path, config.blockstore_options.clone())
2061        .map_err(|err| format!("Failed to open Blockstore: {err:?}"))?;
2062
2063    let (ledger_signal_sender, ledger_signal_receiver) = bounded(MAX_REPLAY_WAKE_UP_SIGNALS);
2064    blockstore.add_new_shred_signal(ledger_signal_sender);
2065
2066    // The following boot sequence (esp. BankForks) could set the root, so stash the original
2067    // value of the blockstore root here as soon as possible.
2068    let original_blockstore_root = blockstore.max_root();
2069
2070    let blockstore = Arc::new(blockstore);
2071    let blockstore_root_scan = BlockstoreRootScan::new(config, blockstore.clone(), exit.clone());
2072    let halt_at_slot = config
2073        .halt_at_slot
2074        .or_else(|| blockstore.highest_slot().unwrap_or(None));
2075
2076    let process_options = blockstore_processor::ProcessOptions {
2077        run_verification: config.run_verification,
2078        halt_at_slot,
2079        new_hard_forks: config.new_hard_forks.clone(),
2080        debug_keys: config.debug_keys.clone(),
2081        accounts_db_config: config.accounts_db_config.clone(),
2082        accounts_db_test_hash_calculation: config.accounts_db_test_hash_calculation,
2083        accounts_db_skip_shrink: config.accounts_db_skip_shrink,
2084        accounts_db_force_initial_clean: config.accounts_db_force_initial_clean,
2085        runtime_config: config.runtime_config.clone(),
2086        use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup,
2087        ..blockstore_processor::ProcessOptions::default()
2088    };
2089
2090    let enable_rpc_transaction_history =
2091        config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history;
2092    let is_plugin_transaction_history_required = transaction_notifier.is_some();
2093    let transaction_history_services =
2094        if enable_rpc_transaction_history || is_plugin_transaction_history_required {
2095            initialize_rpc_transaction_history_services(
2096                blockstore.clone(),
2097                exit.clone(),
2098                enable_rpc_transaction_history,
2099                config.rpc_config.enable_extended_tx_metadata_storage,
2100                transaction_notifier,
2101            )
2102        } else {
2103            TransactionHistoryServices::default()
2104        };
2105
2106    let entry_notifier_service = entry_notifier
2107        .map(|entry_notifier| EntryNotifierService::new(entry_notifier, exit.clone()));
2108
2109    let (bank_forks, mut leader_schedule_cache, starting_snapshot_hashes) =
2110        bank_forks_utils::load_bank_forks(
2111            genesis_config,
2112            &blockstore,
2113            config.account_paths.clone(),
2114            &config.snapshot_config,
2115            &process_options,
2116            transaction_history_services
2117                .transaction_status_sender
2118                .as_ref(),
2119            entry_notifier_service
2120                .as_ref()
2121                .map(|service| service.sender()),
2122            accounts_update_notifier,
2123            exit,
2124        )
2125        .map_err(|err| err.to_string())?;
2126
2127    // Before replay starts, set the callbacks in each of the banks in BankForks so that
2128    // all dropped banks come through the `pruned_banks_receiver` channel. This way all bank
2129    // drop behavior can be safely synchronized with any other ongoing accounts activity like
2130    // cache flush, clean, shrink, as long as the same thread performing those activities also
2131    // is processing the dropped banks from the `pruned_banks_receiver` channel.
2132    let pruned_banks_receiver =
2133        AccountsBackgroundService::setup_bank_drop_callback(bank_forks.clone());
2134
2135    leader_schedule_cache.set_fixed_leader_schedule(config.fixed_leader_schedule.clone());
2136
2137    Ok((
2138        bank_forks,
2139        blockstore,
2140        original_blockstore_root,
2141        ledger_signal_receiver,
2142        leader_schedule_cache,
2143        starting_snapshot_hashes,
2144        transaction_history_services,
2145        process_options,
2146        blockstore_root_scan,
2147        pruned_banks_receiver,
2148        entry_notifier_service,
2149    ))
2150}
2151
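/// Bundles everything needed to replay the blockstore from the root and restore the tower, so
/// that the (potentially slow) replay can be deferred until it is actually required.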
2152pub struct ProcessBlockStore<'a> {
2153    id: &'a Pubkey,
2154    vote_account: &'a Pubkey,
2155    start_progress: &'a Arc<RwLock<ValidatorStartProgress>>,
2156    blockstore: &'a Blockstore,
2157    original_blockstore_root: Slot,
2158    bank_forks: &'a Arc<RwLock<BankForks>>,
2159    leader_schedule_cache: &'a LeaderScheduleCache,
2160    process_options: &'a blockstore_processor::ProcessOptions,
2161    transaction_status_sender: Option<&'a TransactionStatusSender>,
2162    entry_notification_sender: Option<&'a EntryNotifierSender>,
2163    blockstore_root_scan: Option<BlockstoreRootScan>,
2164    snapshot_controller: &'a SnapshotController,
2165    config: &'a ValidatorConfig,
2166    tower: Option<Tower>,
2167}
2168
2169impl<'a> ProcessBlockStore<'a> {
2170    #[allow(clippy::too_many_arguments)]
2171    fn new(
2172        id: &'a Pubkey,
2173        vote_account: &'a Pubkey,
2174        start_progress: &'a Arc<RwLock<ValidatorStartProgress>>,
2175        blockstore: &'a Blockstore,
2176        original_blockstore_root: Slot,
2177        bank_forks: &'a Arc<RwLock<BankForks>>,
2178        leader_schedule_cache: &'a LeaderScheduleCache,
2179        process_options: &'a blockstore_processor::ProcessOptions,
2180        transaction_status_sender: Option<&'a TransactionStatusSender>,
2181        entry_notification_sender: Option<&'a EntryNotifierSender>,
2182        blockstore_root_scan: BlockstoreRootScan,
2183        snapshot_controller: &'a SnapshotController,
2184        config: &'a ValidatorConfig,
2185    ) -> Self {
2186        Self {
2187            id,
2188            vote_account,
2189            start_progress,
2190            blockstore,
2191            original_blockstore_root,
2192            bank_forks,
2193            leader_schedule_cache,
2194            process_options,
2195            transaction_status_sender,
2196            entry_notification_sender,
2197            blockstore_root_scan: Some(blockstore_root_scan),
2198            snapshot_controller,
2199            config,
2200            tower: None,
2201        }
2202    }
2203
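    /// Replays the blockstore from the root (at most once), restores the tower, and reconciles
    /// blockstore roots with the tower root and any hard-fork restart slot.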
2204    pub(crate) fn process(&mut self) -> Result<(), String> {
2205        if self.tower.is_none() {
2206            let previous_start_process = *self.start_progress.read().unwrap();
2207            *self.start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger;
2208
2209            let exit = Arc::new(AtomicBool::new(false));
2210            if let Ok(Some(max_slot)) = self.blockstore.highest_slot() {
2211                let bank_forks = self.bank_forks.clone();
2212                let exit = exit.clone();
2213                let start_progress = self.start_progress.clone();
2214
2215                let _ = Builder::new()
2216                    .name("solRptLdgrStat".to_string())
2217                    .spawn(move || {
2218                        while !exit.load(Ordering::Relaxed) {
2219                            let slot = bank_forks.read().unwrap().working_bank().slot();
2220                            *start_progress.write().unwrap() =
2221                                ValidatorStartProgress::ProcessingLedger { slot, max_slot };
2222                            sleep(Duration::from_secs(2));
2223                        }
2224                    })
2225                    .unwrap();
2226            }
2227            blockstore_processor::process_blockstore_from_root(
2228                self.blockstore,
2229                self.bank_forks,
2230                self.leader_schedule_cache,
2231                self.process_options,
2232                self.transaction_status_sender,
2233                self.entry_notification_sender,
2234                Some(self.snapshot_controller),
2235            )
2236            .map_err(|err| {
2237                exit.store(true, Ordering::Relaxed);
2238                format!("Failed to load ledger: {err:?}")
2239            })?;
2240            exit.store(true, Ordering::Relaxed);
2241
2242            if let Some(blockstore_root_scan) = self.blockstore_root_scan.take() {
2243                blockstore_root_scan.join();
2244            }
2245
2246            self.tower = Some({
2247                let restored_tower = Tower::restore(self.config.tower_storage.as_ref(), self.id);
2248                if let Ok(tower) = &restored_tower {
2249                    // reconciliation attempt 1 of 2 with tower
2250                    reconcile_blockstore_roots_with_external_source(
2251                        ExternalRootSource::Tower(tower.root()),
2252                        self.blockstore,
2253                        &mut self.original_blockstore_root,
2254                    )
2255                    .map_err(|err| format!("Failed to reconcile blockstore with tower: {err:?}"))?;
2256                }
2257
2258                post_process_restored_tower(
2259                    restored_tower,
2260                    self.id,
2261                    self.vote_account,
2262                    self.config,
2263                    &self.bank_forks.read().unwrap(),
2264                )?
2265            });
2266
2267            if let Some(hard_fork_restart_slot) = maybe_cluster_restart_with_hard_fork(
2268                self.config,
2269                self.bank_forks.read().unwrap().root(),
2270            ) {
2271                // reconciliation attempt 2 of 2 with hard fork
2272                // this should be #2 because hard fork root > tower root in almost all cases
2273                reconcile_blockstore_roots_with_external_source(
2274                    ExternalRootSource::HardFork(hard_fork_restart_slot),
2275                    self.blockstore,
2276                    &mut self.original_blockstore_root,
2277                )
2278                .map_err(|err| format!("Failed to reconcile blockstore with hard fork: {err:?}"))?;
2279            }
2280
2281            *self.start_progress.write().unwrap() = previous_start_process;
2282        }
2283        Ok(())
2284    }
2285
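    /// Runs `process()` if it has not run yet and returns the restored (or rebuilt) tower.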
2286    pub(crate) fn process_to_create_tower(mut self) -> Result<Tower, String> {
2287        self.process()?;
2288        Ok(self.tower.unwrap())
2289    }
2290}
2291
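/// If `--warp-slot` is configured, warps the root bank forward to that slot, writes a full
/// snapshot archive, and then replays the blockstore so the tower and bank forks stay in sync.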
2292fn maybe_warp_slot(
2293    config: &ValidatorConfig,
2294    process_blockstore: &mut ProcessBlockStore,
2295    ledger_path: &Path,
2296    bank_forks: &RwLock<BankForks>,
2297    leader_schedule_cache: &LeaderScheduleCache,
2298    snapshot_controller: &SnapshotController,
2299) -> Result<(), String> {
2300    if let Some(warp_slot) = config.warp_slot {
2301        let mut bank_forks = bank_forks.write().unwrap();
2302
2303        let working_bank = bank_forks.working_bank();
2304
2305        if warp_slot <= working_bank.slot() {
2306            return Err(format!(
2307                "warp slot ({}) must be greater than the working bank slot ({})",
2308                warp_slot,
2309                working_bank.slot()
2310            ));
2311        }
2312        info!("warping to slot {}", warp_slot);
2313
2314        let root_bank = bank_forks.root_bank();
2315
2316        // An accounts hash calculation from storages will occur in warp_from_parent() below.  This
2317        // requires that the accounts cache has been flushed, which requires the parent slot to be
2318        // rooted.
2319        root_bank.squash();
2320        root_bank.force_flush_accounts_cache();
2321
2322        bank_forks.insert(Bank::warp_from_parent(
2323            root_bank,
2324            &Pubkey::default(),
2325            warp_slot,
2326            solana_accounts_db::accounts_db::CalcAccountsHashDataSource::Storages,
2327        ));
2328        bank_forks
2329            .set_root(warp_slot, Some(snapshot_controller), Some(warp_slot))
2330            .map_err(|err| err.to_string())?;
2331        leader_schedule_cache.set_root(&bank_forks.root_bank());
2332
2333        let full_snapshot_archive_info = match snapshot_bank_utils::bank_to_full_snapshot_archive(
2334            ledger_path,
2335            &bank_forks.root_bank(),
2336            None,
2337            &config.snapshot_config.full_snapshot_archives_dir,
2338            &config.snapshot_config.incremental_snapshot_archives_dir,
2339            config.snapshot_config.archive_format,
2340        ) {
2341            Ok(archive_info) => archive_info,
2342            Err(e) => return Err(format!("Unable to create snapshot: {e}")),
2343        };
2344        info!(
2345            "created snapshot: {}",
2346            full_snapshot_archive_info.path().display()
2347        );
2348
2349        drop(bank_forks);
2350        // Process blockstore after warping bank forks to make sure tower and
2351        // bank forks are in sync.
2352        process_blockstore.process()?;
2353    }
2354    Ok(())
2355}
2356
2357/// Returns the starting slot at which the blockstore should be scanned for
2358/// shreds with an incorrect shred version, or None if the check is unnecessary
2359fn should_cleanup_blockstore_incorrect_shred_versions(
2360    config: &ValidatorConfig,
2361    blockstore: &Blockstore,
2362    root_slot: Slot,
2363    hard_forks: &HardForks,
2364) -> Result<Option<Slot>, BlockstoreError> {
2365    // Perform the check if we are booting as part of a cluster restart at slot root_slot
2366    let maybe_cluster_restart_slot = maybe_cluster_restart_with_hard_fork(config, root_slot);
2367    if maybe_cluster_restart_slot.is_some() {
2368        return Ok(Some(root_slot + 1));
2369    }
2370
2371    // If there are no hard forks, the shred version cannot have changed
2372    let Some(latest_hard_fork) = hard_forks.iter().last().map(|(slot, _)| *slot) else {
2373        return Ok(None);
2374    };
2375
2376    // If the blockstore is empty, there are certainly no shreds with an incorrect version
2377    let Some(blockstore_max_slot) = blockstore.highest_slot()? else {
2378        return Ok(None);
2379    };
2380    let blockstore_min_slot = blockstore.lowest_slot();
2381    info!(
2382        "Blockstore contains data from slot {blockstore_min_slot} to {blockstore_max_slot}, the \
2383        latest hard fork is {latest_hard_fork}"
2384    );
2385
2386    if latest_hard_fork < blockstore_min_slot {
2387        // latest_hard_fork < blockstore_min_slot <= blockstore_max_slot
2388        //
2389        // All slots in the blockstore are newer than the latest hard fork, and only shreds with
2390        // the correct shred version should have been inserted since the latest hard fork
2391        //
2392        // This is the normal case where the last cluster restart & hard fork was a while ago; we
2393        // can skip the check for this case
2394        Ok(None)
2395    } else if latest_hard_fork < blockstore_max_slot {
2396        // blockstore_min_slot < latest_hard_fork < blockstore_max_slot
2397        //
2398        // This could be a case where there was a cluster restart, but this node was not part of
2399        // the supermajority that actually restarted the cluster. Rather, this node likely
2400        // downloaded a new snapshot while retaining the blockstore, including slots beyond the
2401        // chosen restart slot. We need to perform the blockstore check for this case
2402        //
2403        // Note that the downloaded snapshot slot (root_slot) could be greater than the latest hard
2404        // fork slot. Even though this node will only replay slots after root_slot, start the check
2405        // at latest_hard_fork + 1 to check (and possibly purge) any invalid state.
2406        Ok(Some(latest_hard_fork + 1))
2407    } else {
2408        // blockstore_min_slot <= blockstore_max_slot <= latest_hard_fork
2409        //
2410        // All slots in the blockstore are older than the latest hard fork. The blockstore check
2411        // would start from latest_hard_fork + 1; skip the check as there are no slots to check
2412        //
2413        // This is kind of an unusual case to hit; maybe a node has been offline for a long time
2414        // and just restarted with a newly downloaded snapshot but its old blockstore.
2415        Ok(None)
2416    }
2417}
2418
2419/// Searches the blockstore for data shreds with a shred version that differs
2420/// from the passed `expected_shred_version`
2421fn scan_blockstore_for_incorrect_shred_version(
2422    blockstore: &Blockstore,
2423    start_slot: Slot,
2424    expected_shred_version: u16,
2425) -> Result<Option<u16>, BlockstoreError> {
2426    const TIMEOUT: Duration = Duration::from_secs(60);
2427    let timer = Instant::now();
2428    // Search for shreds with incompatible version in blockstore
2429    let slot_meta_iterator = blockstore.slot_meta_iterator(start_slot)?;
2430
2431    info!("Searching blockstore for shred with incorrect version from slot {start_slot}");
2432    for (slot, _meta) in slot_meta_iterator {
2433        let shreds = blockstore.get_data_shreds_for_slot(slot, 0)?;
2434        for shred in &shreds {
2435            if shred.version() != expected_shred_version {
2436                return Ok(Some(shred.version()));
2437            }
2438        }
2439        if timer.elapsed() > TIMEOUT {
2440            info!("Didn't find incorrect shreds after 60 seconds, stopping the search");
2441            break;
2442        }
2443    }
2444    Ok(None)
2445}
2446
2447/// If the blockstore contains any shreds with the incorrect shred version,
2448/// copy them to a backup blockstore and purge them from the actual blockstore.
2449fn cleanup_blockstore_incorrect_shred_versions(
2450    blockstore: &Blockstore,
2451    config: &ValidatorConfig,
2452    start_slot: Slot,
2453    expected_shred_version: u16,
2454) -> Result<(), BlockstoreError> {
2455    let incorrect_shred_version = scan_blockstore_for_incorrect_shred_version(
2456        blockstore,
2457        start_slot,
2458        expected_shred_version,
2459    )?;
2460    let Some(incorrect_shred_version) = incorrect_shred_version else {
2461        info!("Only shreds with the correct version were found in the blockstore");
2462        return Ok(());
2463    };
2464
2465    // .unwrap() safe because getting to this point implies blockstore has slots/shreds
2466    let end_slot = blockstore.highest_slot()?.unwrap();
2467
2468    // Backing up the shreds that will be deleted from primary blockstore is
2469    // not critical, so swallow errors from backup blockstore operations.
2470    let backup_folder = format!(
2471        "{}_backup_{}_{}_{}",
2472        BLOCKSTORE_DIRECTORY_ROCKS_LEVEL, incorrect_shred_version, start_slot, end_slot
2473    );
2474    match Blockstore::open_with_options(
2475        &blockstore.ledger_path().join(backup_folder),
2476        config.blockstore_options.clone(),
2477    ) {
2478        Ok(backup_blockstore) => {
2479            info!("Backing up slots from {start_slot} to {end_slot}");
2480            let mut timer = Measure::start("blockstore backup");
2481
2482            const PRINT_INTERVAL: Duration = Duration::from_secs(5);
2483            let mut print_timer = Instant::now();
2484            let mut num_slots_copied = 0;
2485            let slot_meta_iterator = blockstore.slot_meta_iterator(start_slot)?;
2486            for (slot, _meta) in slot_meta_iterator {
2487                let shreds = blockstore.get_data_shreds_for_slot(slot, 0)?;
2488                let shreds = shreds.into_iter().map(Cow::Owned);
2489                let _ = backup_blockstore.insert_cow_shreds(shreds, None, true);
2490                num_slots_copied += 1;
2491
2492                if print_timer.elapsed() > PRINT_INTERVAL {
2493                    info!("Backed up {num_slots_copied} slots thus far");
2494                    print_timer = Instant::now();
2495                }
2496            }
2497
2498            timer.stop();
2499            info!("Backing up slots done. {timer}");
2500        }
2501        Err(err) => {
2502            warn!("Unable to backup shreds with incorrect shred version: {err}");
2503        }
2504    }
2505
2506    info!("Purging slots {start_slot} to {end_slot} from blockstore");
2507    let mut timer = Measure::start("blockstore purge");
2508    blockstore.purge_from_next_slots(start_slot, end_slot);
2509    blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact);
2510    timer.stop();
2511    info!("Purging slots done. {timer}");
2512
2513    Ok(())
2514}
2515
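/// Creates the `TransactionStatusSender` channel and spawns the `TransactionStatusService` that
/// writes transaction history to the blockstore and notifies any configured transaction
/// notifier.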
2516fn initialize_rpc_transaction_history_services(
2517    blockstore: Arc<Blockstore>,
2518    exit: Arc<AtomicBool>,
2519    enable_rpc_transaction_history: bool,
2520    enable_extended_tx_metadata_storage: bool,
2521    transaction_notifier: Option<TransactionNotifierArc>,
2522) -> TransactionHistoryServices {
2523    let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
2524    let (transaction_status_sender, transaction_status_receiver) = unbounded();
2525    let transaction_status_sender = Some(TransactionStatusSender {
2526        sender: transaction_status_sender,
2527    });
2528    let transaction_status_service = Some(TransactionStatusService::new(
2529        transaction_status_receiver,
2530        max_complete_transaction_status_slot.clone(),
2531        enable_rpc_transaction_history,
2532        transaction_notifier,
2533        blockstore.clone(),
2534        enable_extended_tx_metadata_storage,
2535        exit.clone(),
2536    ));
2537
2538    TransactionHistoryServices {
2539        transaction_status_sender,
2540        transaction_status_service,
2541        max_complete_transaction_status_slot,
2542    }
2543}
2544
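/// Errors that can abort validator startup or force an early shutdown.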
2545#[derive(Error, Debug)]
2546pub enum ValidatorError {
2547    #[error("bank hash mismatch: actual={0}, expected={1}")]
2548    BankHashMismatch(Hash, Hash),
2549
2550    #[error("blockstore error: {0}")]
2551    Blockstore(#[source] BlockstoreError),
2552
2553    #[error("genesis hash mismatch: actual={0}, expected={1}")]
2554    GenesisHashMismatch(Hash, Hash),
2555
2556    #[error(
2557        "ledger does not have enough data to wait for supermajority: \
2558        current slot={0}, needed slot={1}"
2559    )]
2560    NotEnoughLedgerData(Slot, Slot),
2561
2562    #[error("failed to open genesis: {0}")]
2563    OpenGenesisConfig(#[source] OpenGenesisConfigError),
2564
2565    #[error("{0}")]
2566    Other(String),
2567
2568    #[error(
2569        "PoH hashes/second rate is slower than the cluster target: mine {mine}, cluster {target}"
2570    )]
2571    PohTooSlow { mine: u64, target: u64 },
2572
2573    #[error("shred version mismatch: actual {actual}, expected {expected}")]
2574    ShredVersionMismatch { actual: u16, expected: u16 },
2575
2576    #[error(transparent)]
2577    TraceError(#[from] TraceError),
2578
2579    #[error("Wen Restart finished, please continue with --wait-for-supermajority")]
2580    WenRestartFinished,
2581}
2582
2583// Returns whether the validator waited on other nodes to start. In this case
2584// it should not wait for one of its votes to land before producing blocks,
2585// because if the whole network is waiting, then it will stall.
2586//
2587// Error indicates that a bad hash was encountered or another condition
2588// that is unrecoverable and the validator should exit.
2589fn wait_for_supermajority(
2590    config: &ValidatorConfig,
2591    process_blockstore: Option<&mut ProcessBlockStore>,
2592    bank_forks: &RwLock<BankForks>,
2593    cluster_info: &ClusterInfo,
2594    rpc_override_health_check: Arc<AtomicBool>,
2595    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
2596) -> Result<bool, ValidatorError> {
2597    match config.wait_for_supermajority {
2598        None => Ok(false),
2599        Some(wait_for_supermajority_slot) => {
2600            if let Some(process_blockstore) = process_blockstore {
2601                process_blockstore
2602                    .process()
2603                    .map_err(ValidatorError::Other)?;
2604            }
2605
2606            let bank = bank_forks.read().unwrap().working_bank();
2607            match wait_for_supermajority_slot.cmp(&bank.slot()) {
2608                std::cmp::Ordering::Less => return Ok(false),
2609                std::cmp::Ordering::Greater => {
2610                    return Err(ValidatorError::NotEnoughLedgerData(
2611                        bank.slot(),
2612                        wait_for_supermajority_slot,
2613                    ));
2614                }
2615                _ => {}
2616            }
2617
2618            if let Some(expected_bank_hash) = config.expected_bank_hash {
2619                if bank.hash() != expected_bank_hash {
2620                    return Err(ValidatorError::BankHashMismatch(
2621                        bank.hash(),
2622                        expected_bank_hash,
2623                    ));
2624                }
2625            }
2626
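            // Poll gossip roughly once per second until the required percentage of
            // activated stake is visible, logging progress every tenth iteration and
            // publishing the latest percentage through the start-progress state.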
2627            for i in 1.. {
2628                let logging = i % 10 == 1;
2629                if logging {
2630                    info!(
2631                        "Waiting for {}% of activated stake at slot {} to be in gossip...",
2632                        WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT,
2633                        bank.slot()
2634                    );
2635                }
2636
2637                let gossip_stake_percent =
2638                    get_stake_percent_in_gossip(&bank, cluster_info, logging);
2639
2640                *start_progress.write().unwrap() =
2641                    ValidatorStartProgress::WaitingForSupermajority {
2642                        slot: wait_for_supermajority_slot,
2643                        gossip_stake_percent,
2644                    };
2645
2646                if gossip_stake_percent >= WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT {
2647                    info!(
2648                        "Supermajority reached, {}% active stake detected, starting up now.",
2649                        gossip_stake_percent,
2650                    );
2651                    break;
2652                }
2653                // The normal RPC health checks don't apply as the node is waiting, so feign health to
2654                // prevent load balancers from removing the node from their list of candidates during a
2655                // manual restart.
2656                rpc_override_health_check.store(true, Ordering::Relaxed);
2657                sleep(Duration::from_secs(1));
2658            }
2659            rpc_override_health_check.store(false, Ordering::Relaxed);
2660            Ok(true)
2661        }
2662    }
2663}
2664
2665// Get the activated stake percentage (based on the provided bank) that is visible in gossip
2666fn get_stake_percent_in_gossip(bank: &Bank, cluster_info: &ClusterInfo, log: bool) -> u64 {
2667    let mut online_stake = 0;
2668    let mut wrong_shred_stake = 0;
2669    let mut wrong_shred_nodes = vec![];
2670    let mut offline_stake = 0;
2671    let mut offline_nodes = vec![];
2672
2673    let mut total_activated_stake = 0;
2674    let now = timestamp();
2675    // Nodes' contact infos are saved to disk and restored on validator startup.
2676    // Entries for staked nodes do not expire until an epoch later, so it is
2677    // necessary to filter for recent entries here to establish liveness.
2678    let peers: HashMap<_, _> = cluster_info
2679        .all_tvu_peers()
2680        .into_iter()
2681        .filter(|node| {
2682            let age = now.saturating_sub(node.wallclock());
2683            // Contact infos are refreshed twice during this period.
2684            age < CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS
2685        })
2686        .map(|node| (*node.pubkey(), node))
2687        .collect();
2688    let my_shred_version = cluster_info.my_shred_version();
2689    let my_id = cluster_info.id();
2690
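    // Classify each staked vote account's node as online (visible in gossip with a
    // matching shred version, or this node itself), wrong shred version, or offline.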
2691    for (activated_stake, vote_account) in bank.vote_accounts().values() {
2692        let activated_stake = *activated_stake;
2693        total_activated_stake += activated_stake;
2694
2695        if activated_stake == 0 {
2696            continue;
2697        }
2698        let vote_state_node_pubkey = *vote_account.node_pubkey();
2699
2700        if let Some(peer) = peers.get(&vote_state_node_pubkey) {
2701            if peer.shred_version() == my_shred_version {
2702                trace!(
2703                    "observed {} in gossip, (activated_stake={})",
2704                    vote_state_node_pubkey,
2705                    activated_stake
2706                );
2707                online_stake += activated_stake;
2708            } else {
2709                wrong_shred_stake += activated_stake;
2710                wrong_shred_nodes.push((activated_stake, vote_state_node_pubkey));
2711            }
2712        } else if vote_state_node_pubkey == my_id {
2713            online_stake += activated_stake; // This node is online
2714        } else {
2715            offline_stake += activated_stake;
2716            offline_nodes.push((activated_stake, vote_state_node_pubkey));
2717        }
2718    }
2719
2720    let online_stake_percentage = (online_stake as f64 / total_activated_stake as f64) * 100.;
2721    if log {
2722        info!(
2723            "{:.3}% of active stake visible in gossip",
2724            online_stake_percentage
2725        );
2726
2727        if !wrong_shred_nodes.is_empty() {
2728            info!(
2729                "{:.3}% of active stake has the wrong shred version in gossip",
2730                (wrong_shred_stake as f64 / total_activated_stake as f64) * 100.,
2731            );
2732            wrong_shred_nodes.sort_by(|a, b| b.0.cmp(&a.0)); // sort descending by stake weight
2733            for (stake, identity) in wrong_shred_nodes {
2734                info!(
2735                    "    {:.3}% - {}",
2736                    (stake as f64 / total_activated_stake as f64) * 100.,
2737                    identity
2738                );
2739            }
2740        }
2741
2742        if !offline_nodes.is_empty() {
2743            info!(
2744                "{:.3}% of active stake is not visible in gossip",
2745                (offline_stake as f64 / total_activated_stake as f64) * 100.
2746            );
2747            offline_nodes.sort_by(|a, b| b.0.cmp(&a.0)); // sort descending by stake weight
2748            for (stake, identity) in offline_nodes {
2749                info!(
2750                    "    {:.3}% - {}",
2751                    (stake as f64 / total_activated_stake as f64) * 100.,
2752                    identity
2753                );
2754            }
2755        }
2756    }
2757
2758    online_stake_percentage as u64
2759}
2760
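/// Moves the contents of each configured account path (and any accounts-db
/// shrink paths) aside and deletes them asynchronously, so validator startup
/// is not blocked waiting on filesystem cleanup.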
2761fn cleanup_accounts_paths(config: &ValidatorConfig) {
2762    for account_path in &config.account_paths {
2763        move_and_async_delete_path_contents(account_path);
2764    }
2765    if let Some(shrink_paths) = config
2766        .accounts_db_config
2767        .as_ref()
2768        .and_then(|config| config.shrink_paths.as_ref())
2769    {
2770        for shrink_path in shrink_paths {
2771            move_and_async_delete_path_contents(shrink_path);
2772        }
2773    }
2774}
2775
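/// A snapshot config is valid when snapshot generation is disabled, when
/// incremental snapshots are disabled, or when the full snapshot interval is
/// strictly greater than the (non-zero) incremental snapshot interval.
///
/// A minimal sketch (interval values are illustrative, not defaults):
///
/// ```ignore
/// let snapshot_config = SnapshotConfig {
///     full_snapshot_archive_interval_slots: 25_000,
///     incremental_snapshot_archive_interval_slots: 100,
///     ..SnapshotConfig::default()
/// };
/// assert!(is_snapshot_config_valid(&snapshot_config));
/// ```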
2776pub fn is_snapshot_config_valid(snapshot_config: &SnapshotConfig) -> bool {
2777    // if the config is set to *not* generate snapshots, then it is valid
2778    if !snapshot_config.should_generate_snapshots() {
2779        return true;
2780    }
2781
2782    let full_snapshot_interval_slots = snapshot_config.full_snapshot_archive_interval_slots;
2783    let incremental_snapshot_interval_slots =
2784        snapshot_config.incremental_snapshot_archive_interval_slots;
2785
2786    if incremental_snapshot_interval_slots == DISABLED_SNAPSHOT_ARCHIVE_INTERVAL {
2787        true
2788    } else if incremental_snapshot_interval_slots == 0 {
2789        false
2790    } else {
2791        full_snapshot_interval_slots > incremental_snapshot_interval_slots
2792    }
2793}
2794
2795#[cfg(test)]
2796mod tests {
2797    use {
2798        super::*,
2799        crossbeam_channel::{bounded, RecvTimeoutError},
2800        solana_entry::entry,
2801        solana_genesis_config::create_genesis_config,
2802        solana_gossip::contact_info::ContactInfo,
2803        solana_ledger::{
2804            blockstore, create_new_tmp_ledger, genesis_utils::create_genesis_config_with_leader,
2805            get_tmp_ledger_path_auto_delete,
2806        },
2807        solana_poh_config::PohConfig,
2808        solana_sha256_hasher::hash,
2809        solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP,
2810        std::{fs::remove_dir_all, thread, time::Duration},
2811    };
2812
2813    #[test]
2814    fn validator_exit() {
2815        solana_logger::setup();
2816        let leader_keypair = Keypair::new();
2817        let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
2818
2819        let validator_keypair = Keypair::new();
2820        let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
2821        let genesis_config =
2822            create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1000)
2823                .genesis_config;
2824        let (validator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
2825
2826        let voting_keypair = Arc::new(Keypair::new());
2827        let config = ValidatorConfig {
2828            rpc_addrs: Some((
2829                validator_node.info.rpc().unwrap(),
2830                validator_node.info.rpc_pubsub().unwrap(),
2831            )),
2832            ..ValidatorConfig::default_for_test()
2833        };
2834        let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
2835        let validator = Validator::new(
2836            validator_node,
2837            Arc::new(validator_keypair),
2838            &validator_ledger_path,
2839            &voting_keypair.pubkey(),
2840            Arc::new(RwLock::new(vec![voting_keypair])),
2841            vec![leader_node.info],
2842            &config,
2843            true, // should_check_duplicate_instance
2844            None, // rpc_to_plugin_manager_receiver
2845            start_progress.clone(),
2846            SocketAddrSpace::Unspecified,
2847            ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
2848            Arc::new(RwLock::new(None)),
2849        )
2850        .expect("assume successful validator start");
2851        assert_eq!(
2852            *start_progress.read().unwrap(),
2853            ValidatorStartProgress::Running
2854        );
2855        validator.close();
2856        remove_dir_all(validator_ledger_path).unwrap();
2857    }
2858
2859    #[test]
2860    fn test_should_cleanup_blockstore_incorrect_shred_versions() {
2861        solana_logger::setup();
2862
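        // Walk through restart scenarios: the check should return the first slot to
        // inspect only when existing blockstore data could straddle the restart slot
        // (wait_for_supermajority and/or the latest hard fork), and None otherwise.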
2863        let ledger_path = get_tmp_ledger_path_auto_delete!();
2864        let blockstore = Blockstore::open(ledger_path.path()).unwrap();
2865
2866        let mut validator_config = ValidatorConfig::default_for_test();
2867        let mut hard_forks = HardForks::default();
2868        let mut root_slot;
2869
2870        // Do check from root_slot + 1 if wait_for_supermajority (10) == root_slot (10)
2871        root_slot = 10;
2872        validator_config.wait_for_supermajority = Some(root_slot);
2873        assert_eq!(
2874            should_cleanup_blockstore_incorrect_shred_versions(
2875                &validator_config,
2876                &blockstore,
2877                root_slot,
2878                &hard_forks
2879            )
2880            .unwrap(),
2881            Some(root_slot + 1)
2882        );
2883
2884        // No check if wait_for_supermajority (10) < root_slot (15) (no hard forks)
2885        // Arguably operator error to pass a value for wait_for_supermajority in this case
2886        root_slot = 15;
2887        assert_eq!(
2888            should_cleanup_blockstore_incorrect_shred_versions(
2889                &validator_config,
2890                &blockstore,
2891                root_slot,
2892                &hard_forks
2893            )
2894            .unwrap(),
2895            None,
2896        );
2897
2898        // Emulate cluster restart at slot 10
2899        // No check if wait_for_supermajority (10) < root_slot (15) (empty blockstore)
2900        hard_forks.register(10);
2901        assert_eq!(
2902            should_cleanup_blockstore_incorrect_shred_versions(
2903                &validator_config,
2904                &blockstore,
2905                root_slot,
2906                &hard_forks
2907            )
2908            .unwrap(),
2909            None,
2910        );
2911
2912        // Insert some shreds at newer slots than hard fork
2913        let entries = entry::create_ticks(1, 0, Hash::default());
2914        for i in 20..35 {
2915            let shreds = blockstore::entries_to_test_shreds(
2916                &entries,
2917                i,     // slot
2918                i - 1, // parent_slot
2919                true,  // is_full_slot
2920                1,     // version
2921                true,  // merkle_variant
2922            );
2923            blockstore.insert_shreds(shreds, None, true).unwrap();
2924        }
2925
2926        // No check as all blockstore data is newer than the latest hard fork
2927        assert_eq!(
2928            should_cleanup_blockstore_incorrect_shred_versions(
2929                &validator_config,
2930                &blockstore,
2931                root_slot,
2932                &hard_forks
2933            )
2934            .unwrap(),
2935            None,
2936        );
2937
2938        // Emulate cluster restart at slot 25
2939        // Do check from root_slot + 1 regardless of whether wait_for_supermajority is set correctly
2940        root_slot = 25;
2941        hard_forks.register(root_slot);
2942        validator_config.wait_for_supermajority = Some(root_slot);
2943        assert_eq!(
2944            should_cleanup_blockstore_incorrect_shred_versions(
2945                &validator_config,
2946                &blockstore,
2947                root_slot,
2948                &hard_forks
2949            )
2950            .unwrap(),
2951            Some(root_slot + 1),
2952        );
2953        validator_config.wait_for_supermajority = None;
2954        assert_eq!(
2955            should_cleanup_blockstore_incorrect_shred_versions(
2956                &validator_config,
2957                &blockstore,
2958                root_slot,
2959                &hard_forks
2960            )
2961            .unwrap(),
2962            Some(root_slot + 1),
2963        );
2964
2965        // Do check with advanced root slot, even when wait_for_supermajority is not set correctly
2966        // Check starts from latest hard fork + 1
2967        root_slot = 30;
2968        let latest_hard_fork = hard_forks.iter().last().unwrap().0;
2969        assert_eq!(
2970            should_cleanup_blockstore_incorrect_shred_versions(
2971                &validator_config,
2972                &blockstore,
2973                root_slot,
2974                &hard_forks
2975            )
2976            .unwrap(),
2977            Some(latest_hard_fork + 1),
2978        );
2979
2980        // Purge blockstore up to latest hard fork
2981        // No check since all remaining blockstore data is newer than the latest hard fork
2982        blockstore.purge_slots(0, latest_hard_fork, PurgeType::Exact);
2983        assert_eq!(
2984            should_cleanup_blockstore_incorrect_shred_versions(
2985                &validator_config,
2986                &blockstore,
2987                root_slot,
2988                &hard_forks
2989            )
2990            .unwrap(),
2991            None,
2992        );
2993    }
2994
2995    #[test]
2996    fn test_cleanup_blockstore_incorrect_shred_versions() {
2997        solana_logger::setup();
2998
2999        let validator_config = ValidatorConfig::default_for_test();
3000        let ledger_path = get_tmp_ledger_path_auto_delete!();
3001        let blockstore = Blockstore::open(ledger_path.path()).unwrap();
3002
3003        let entries = entry::create_ticks(1, 0, Hash::default());
3004        for i in 1..10 {
3005            let shreds = blockstore::entries_to_test_shreds(
3006                &entries,
3007                i,     // slot
3008                i - 1, // parent_slot
3009                true,  // is_full_slot
3010                1,     // version
3011                true,  // merkle_variant
3012            );
3013            blockstore.insert_shreds(shreds, None, true).unwrap();
3014        }
3015
3016        // this purges and compacts all slots greater than or equal to 5
3017        cleanup_blockstore_incorrect_shred_versions(&blockstore, &validator_config, 5, 2).unwrap();
3018        // assert that slots less than 5 aren't affected
3019        assert!(blockstore.meta(4).unwrap().unwrap().next_slots.is_empty());
3020        for i in 5..10 {
3021            assert!(blockstore
3022                .get_data_shreds_for_slot(i, 0)
3023                .unwrap()
3024                .is_empty());
3025        }
3026    }
3027
3028    #[test]
3029    fn validator_parallel_exit() {
3030        let leader_keypair = Keypair::new();
3031        let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
3032        let genesis_config =
3033            create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1000)
3034                .genesis_config;
3035
3036        let mut ledger_paths = vec![];
3037        let mut validators: Vec<Validator> = (0..2)
3038            .map(|_| {
3039                let validator_keypair = Keypair::new();
3040                let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
3041                let (validator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
3042                ledger_paths.push(validator_ledger_path.clone());
3043                let vote_account_keypair = Keypair::new();
3044                let config = ValidatorConfig {
3045                    rpc_addrs: Some((
3046                        validator_node.info.rpc().unwrap(),
3047                        validator_node.info.rpc_pubsub().unwrap(),
3048                    )),
3049                    ..ValidatorConfig::default_for_test()
3050                };
3051                Validator::new(
3052                    validator_node,
3053                    Arc::new(validator_keypair),
3054                    &validator_ledger_path,
3055                    &vote_account_keypair.pubkey(),
3056                    Arc::new(RwLock::new(vec![Arc::new(vote_account_keypair)])),
3057                    vec![leader_node.info.clone()],
3058                    &config,
3059                    true, // should_check_duplicate_instance.
3060                    None, // rpc_to_plugin_manager_receiver
3061                    Arc::new(RwLock::new(ValidatorStartProgress::default())),
3062                    SocketAddrSpace::Unspecified,
3063                    ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
3064                    Arc::new(RwLock::new(None)),
3065                )
3066                .expect("assume successful validator start")
3067            })
3068            .collect();
3069
3070        // Signal each validator to exit in parallel so the subsequent, sequential join() calls complete quickly
3071        validators.iter_mut().for_each(|v| v.exit());
3072
3073        // Spawn a new thread to wait for all of the validators to join
3074        let (sender, receiver) = bounded(0);
3075        let _ = thread::spawn(move || {
3076            validators.into_iter().for_each(|validator| {
3077                validator.join();
3078            });
3079            sender.send(()).unwrap();
3080        });
3081
3082        let timeout = Duration::from_secs(60);
3083        if let Err(RecvTimeoutError::Timeout) = receiver.recv_timeout(timeout) {
3084            panic!("timed out waiting for validators to shut down");
3085        }
3086
3087        for path in ledger_paths {
3088            remove_dir_all(path).unwrap();
3089        }
3090    }
3091
3092    #[test]
3093    fn test_wait_for_supermajority() {
3094        solana_logger::setup();
3095        let node_keypair = Arc::new(Keypair::new());
3096        let cluster_info = ClusterInfo::new(
3097            ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
3098            node_keypair,
3099            SocketAddrSpace::Unspecified,
3100        );
3101
3102        let (genesis_config, _mint_keypair) = create_genesis_config(1);
3103        let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
3104        let mut config = ValidatorConfig::default_for_test();
3105        let rpc_override_health_check = Arc::new(AtomicBool::new(false));
3106        let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
3107
3108        assert!(!wait_for_supermajority(
3109            &config,
3110            None,
3111            &bank_forks,
3112            &cluster_info,
3113            rpc_override_health_check.clone(),
3114            &start_progress,
3115        )
3116        .unwrap());
3117
3118        // bank=0, wait=1, should fail
3119        config.wait_for_supermajority = Some(1);
3120        assert!(matches!(
3121            wait_for_supermajority(
3122                &config,
3123                None,
3124                &bank_forks,
3125                &cluster_info,
3126                rpc_override_health_check.clone(),
3127                &start_progress,
3128            ),
3129            Err(ValidatorError::NotEnoughLedgerData(_, _)),
3130        ));
3131
3132        // bank=1, wait=0, should pass, bank is past the wait slot
3133        let bank_forks = BankForks::new_rw_arc(Bank::new_from_parent(
3134            bank_forks.read().unwrap().root_bank(),
3135            &Pubkey::default(),
3136            1,
3137        ));
3138        config.wait_for_supermajority = Some(0);
3139        assert!(!wait_for_supermajority(
3140            &config,
3141            None,
3142            &bank_forks,
3143            &cluster_info,
3144            rpc_override_health_check.clone(),
3145            &start_progress,
3146        )
3147        .unwrap());
3148
3149        // bank=1, wait=1, equal, but bad hash provided
3150        config.wait_for_supermajority = Some(1);
3151        config.expected_bank_hash = Some(hash(&[1]));
3152        assert!(matches!(
3153            wait_for_supermajority(
3154                &config,
3155                None,
3156                &bank_forks,
3157                &cluster_info,
3158                rpc_override_health_check,
3159                &start_progress,
3160            ),
3161            Err(ValidatorError::BankHashMismatch(_, _)),
3162        ));
3163    }
3164
3165    #[test]
3166    fn test_interval_check() {
3167        fn new_snapshot_config(
3168            full_snapshot_archive_interval_slots: Slot,
3169            incremental_snapshot_archive_interval_slots: Slot,
3170        ) -> SnapshotConfig {
3171            SnapshotConfig {
3172                full_snapshot_archive_interval_slots,
3173                incremental_snapshot_archive_interval_slots,
3174                ..SnapshotConfig::default()
3175            }
3176        }
3177
3178        assert!(is_snapshot_config_valid(&new_snapshot_config(300, 200)));
3179
3180        assert!(is_snapshot_config_valid(&new_snapshot_config(
3181            snapshot_bank_utils::DEFAULT_FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS,
3182            snapshot_bank_utils::DEFAULT_INCREMENTAL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS
3183        )));
3184        assert!(is_snapshot_config_valid(&new_snapshot_config(
3185            snapshot_bank_utils::DEFAULT_FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS,
3186            DISABLED_SNAPSHOT_ARCHIVE_INTERVAL
3187        )));
3188        assert!(is_snapshot_config_valid(&new_snapshot_config(
3189            snapshot_bank_utils::DEFAULT_INCREMENTAL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS,
3190            DISABLED_SNAPSHOT_ARCHIVE_INTERVAL
3191        )));
3192        assert!(is_snapshot_config_valid(&new_snapshot_config(
3193            DISABLED_SNAPSHOT_ARCHIVE_INTERVAL,
3194            DISABLED_SNAPSHOT_ARCHIVE_INTERVAL
3195        )));
3196
3197        // Full snapshot intervals used to be required to be a multiple of
3198        // incremental snapshot intervals, but that is no longer the case,
3199        // so test that the check has been relaxed.
3200        assert!(is_snapshot_config_valid(&new_snapshot_config(100, 42)));
3201        assert!(is_snapshot_config_valid(&new_snapshot_config(444, 200)));
3202        assert!(is_snapshot_config_valid(&new_snapshot_config(400, 222)));
3203
3204        assert!(!is_snapshot_config_valid(&new_snapshot_config(0, 100)));
3205        assert!(!is_snapshot_config_valid(&new_snapshot_config(100, 0)));
3206        assert!(!is_snapshot_config_valid(&new_snapshot_config(0, 0)));
3207        assert!(!is_snapshot_config_valid(&new_snapshot_config(42, 100)));
3208        assert!(!is_snapshot_config_valid(&new_snapshot_config(100, 100)));
3209        assert!(!is_snapshot_config_valid(&new_snapshot_config(100, 200)));
3210
3211        assert!(is_snapshot_config_valid(&SnapshotConfig::new_load_only()));
3212        assert!(is_snapshot_config_valid(&SnapshotConfig {
3213            full_snapshot_archive_interval_slots: 37,
3214            incremental_snapshot_archive_interval_slots: 41,
3215            ..SnapshotConfig::new_load_only()
3216        }));
3217        assert!(is_snapshot_config_valid(&SnapshotConfig {
3218            full_snapshot_archive_interval_slots: DISABLED_SNAPSHOT_ARCHIVE_INTERVAL,
3219            incremental_snapshot_archive_interval_slots: DISABLED_SNAPSHOT_ARCHIVE_INTERVAL,
3220            ..SnapshotConfig::new_load_only()
3221        }));
3222    }
3223
3224    fn target_tick_duration() -> Duration {
3225        // DEFAULT_MS_PER_SLOT = 400
3226        // DEFAULT_TICKS_PER_SLOT = 64
3227        // MS_PER_TICK = 6
3228        //
3229        // But the exact ratio DEFAULT_MS_PER_SLOT / DEFAULT_TICKS_PER_SLOT is 6.25,
3230        // so MS_PER_TICK loses 0.25 ms per tick to integer truncation. Convert to
3231        // microseconds first to avoid that rounding error.
3232        let target_tick_duration_us =
3233            solana_clock::DEFAULT_MS_PER_SLOT * 1000 / solana_clock::DEFAULT_TICKS_PER_SLOT;
3234        assert_eq!(target_tick_duration_us, 6250);
3235        Duration::from_micros(target_tick_duration_us)
3236    }
3237
3238    #[test]
3239    fn test_poh_speed() {
3240        solana_logger::setup();
3241        let poh_config = PohConfig {
3242            target_tick_duration: target_tick_duration(),
3243            // make the target PoH rate unattainably fast so the check fails
3244            hashes_per_tick: Some(100 * solana_clock::DEFAULT_HASHES_PER_TICK),
3245            ..PohConfig::default()
3246        };
3247        let genesis_config = GenesisConfig {
3248            poh_config,
3249            ..GenesisConfig::default()
3250        };
3251        let bank = Bank::new_for_tests(&genesis_config);
3252        assert!(check_poh_speed(&bank, Some(10_000)).is_err());
3253    }
3254
3255    #[test]
3256    fn test_poh_speed_no_hashes_per_tick() {
3257        solana_logger::setup();
3258        let poh_config = PohConfig {
3259            target_tick_duration: target_tick_duration(),
3260            hashes_per_tick: None,
3261            ..PohConfig::default()
3262        };
3263        let genesis_config = GenesisConfig {
3264            poh_config,
3265            ..GenesisConfig::default()
3266        };
3267        let bank = Bank::new_for_tests(&genesis_config);
3268        check_poh_speed(&bank, Some(10_000)).unwrap();
3269    }
3270}