solana_core/validator.rs

//! The `validator` module hosts all the validator microservices.

pub use solana_perf::report_target_features;
use {
    crate::{
        accounts_hash_verifier::AccountsHashVerifier,
        admin_rpc_post_init::AdminRpcRequestMetadataPostInit,
        banking_trace::{self, BankingTracer, TraceError},
        cluster_info_vote_listener::VoteTracker,
        completed_data_sets_service::CompletedDataSetsService,
        consensus::{
            reconcile_blockstore_roots_with_external_source,
            tower_storage::{NullTowerStorage, TowerStorage},
            ExternalRootSource, Tower,
        },
        poh_timing_report_service::PohTimingReportService,
        repair::{
            self,
            quic_endpoint::{RepairQuicAsyncSenders, RepairQuicSenders, RepairQuicSockets},
            serve_repair::ServeRepair,
            serve_repair_service::ServeRepairService,
        },
        sample_performance_service::SamplePerformanceService,
        sigverify,
        snapshot_packager_service::{PendingSnapshotPackages, SnapshotPackagerService},
        stats_reporter_service::StatsReporterService,
        system_monitor_service::{
            verify_net_stats_access, SystemMonitorService, SystemMonitorStatsReportConfig,
        },
        tpu::{Tpu, TpuSockets, DEFAULT_TPU_COALESCE},
        tvu::{Tvu, TvuConfig, TvuSockets},
    },
    agave_thread_manager::{ThreadManager, ThreadManagerConfig},
    anyhow::{anyhow, Context, Result},
    crossbeam_channel::{bounded, unbounded, Receiver},
    lazy_static::lazy_static,
    quinn::Endpoint,
    serde::{Deserialize, Serialize},
    solana_accounts_db::{
        accounts_db::{AccountsDbConfig, ACCOUNTS_DB_CONFIG_FOR_TESTING},
        accounts_update_notifier_interface::AccountsUpdateNotifier,
        hardened_unpack::{
            open_genesis_config, OpenGenesisConfigError, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
        },
        utils::{move_and_async_delete_path, move_and_async_delete_path_contents},
    },
    solana_client::connection_cache::{ConnectionCache, Protocol},
    solana_entry::poh::compute_hash_time,
    solana_geyser_plugin_manager::{
        geyser_plugin_service::GeyserPluginService, GeyserPluginManagerRequest,
    },
    solana_gossip::{
        cluster_info::{
            ClusterInfo, Node, DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
            DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS,
        },
        contact_info::ContactInfo,
        crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
        gossip_service::GossipService,
    },
    solana_ledger::{
        bank_forks_utils,
        blockstore::{
            Blockstore, BlockstoreError, PurgeType, MAX_COMPLETED_SLOTS_IN_CHANNEL,
            MAX_REPLAY_WAKE_UP_SIGNALS,
        },
        blockstore_metric_report_service::BlockstoreMetricReportService,
        blockstore_options::{BlockstoreOptions, BLOCKSTORE_DIRECTORY_ROCKS_LEVEL},
        blockstore_processor::{self, TransactionStatusSender},
        entry_notifier_interface::EntryNotifierArc,
        entry_notifier_service::{EntryNotifierSender, EntryNotifierService},
        leader_schedule::FixedSchedule,
        leader_schedule_cache::LeaderScheduleCache,
        use_snapshot_archives_at_startup::UseSnapshotArchivesAtStartup,
    },
    solana_measure::measure::Measure,
    solana_metrics::{
        datapoint_info, metrics::metrics_config_sanity_check, poh_timing_point::PohTimingSender,
    },
    solana_poh::{
        poh_recorder::PohRecorder,
        poh_service::{self, PohService},
    },
    solana_rayon_threadlimit::{get_max_thread_count, get_thread_count},
    solana_rpc::{
        block_meta_service::{BlockMetaSender, BlockMetaService},
        max_slots::MaxSlots,
        optimistically_confirmed_bank_tracker::{
            BankNotificationSenderConfig, OptimisticallyConfirmedBank,
            OptimisticallyConfirmedBankTracker,
        },
        rpc::JsonRpcConfig,
        rpc_completed_slots_service::RpcCompletedSlotsService,
        rpc_pubsub_service::{PubSubConfig, PubSubService},
        rpc_service::JsonRpcService,
        rpc_subscriptions::RpcSubscriptions,
        transaction_notifier_interface::TransactionNotifierArc,
        transaction_status_service::TransactionStatusService,
    },
    solana_runtime::{
        accounts_background_service::{
            AbsRequestHandlers, AbsRequestSender, AccountsBackgroundService, DroppedSlotsReceiver,
            PrunedBanksRequestHandler, SnapshotRequestHandler,
        },
        bank::Bank,
        bank_forks::BankForks,
        commitment::BlockCommitmentCache,
        prioritization_fee_cache::PrioritizationFeeCache,
        runtime_config::RuntimeConfig,
        snapshot_archive_info::SnapshotArchiveInfoGetter,
        snapshot_bank_utils::{self, DISABLED_SNAPSHOT_ARCHIVE_INTERVAL},
        snapshot_config::SnapshotConfig,
        snapshot_hash::StartingSnapshotHashes,
        snapshot_utils::{self, clean_orphaned_account_snapshot_dirs},
    },
    solana_sdk::{
        clock::Slot,
        epoch_schedule::MAX_LEADER_SCHEDULE_EPOCH_OFFSET,
        exit::Exit,
        genesis_config::{ClusterType, GenesisConfig},
        hard_forks::HardForks,
        hash::Hash,
        pubkey::Pubkey,
        shred_version::compute_shred_version,
        signature::{Keypair, Signer},
        timing::timestamp,
    },
    solana_send_transaction_service::send_transaction_service,
    solana_streamer::{quic::QuicServerParams, socket::SocketAddrSpace, streamer::StakedNodes},
    solana_tpu_client::tpu_client::{
        DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC, DEFAULT_VOTE_USE_QUIC,
    },
    solana_turbine::{self, broadcast_stage::BroadcastStageType},
    solana_unified_scheduler_pool::DefaultSchedulerPool,
    solana_vote_program::vote_state,
    solana_wen_restart::wen_restart::{wait_for_wen_restart, WenRestartConfig},
    std::{
        borrow::Cow,
        collections::{HashMap, HashSet},
        net::SocketAddr,
        num::NonZeroUsize,
        path::{Path, PathBuf},
        sync::{
            atomic::{AtomicBool, AtomicU64, Ordering},
            Arc, Mutex, RwLock,
        },
        thread::{sleep, Builder, JoinHandle},
        time::{Duration, Instant},
    },
    strum::VariantNames,
    strum_macros::{Display, EnumCount, EnumIter, EnumString, EnumVariantNames, IntoStaticStr},
    thiserror::Error,
    tokio::runtime::Runtime as TokioRuntime,
};

const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000;
const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80;
// Right now, since we reuse the wait-for-supermajority code, the following
// threshold should always be greater than or equal to
// WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT.
const WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT: u64 =
    WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT;

#[derive(
    Clone, EnumCount, EnumIter, EnumString, EnumVariantNames, Default, IntoStaticStr, Display,
)]
#[strum(serialize_all = "kebab-case")]
pub enum BlockVerificationMethod {
    BlockstoreProcessor,
    #[default]
    UnifiedScheduler,
}

impl BlockVerificationMethod {
    pub const fn cli_names() -> &'static [&'static str] {
        Self::VARIANTS
    }

    pub fn cli_message() -> &'static str {
        lazy_static! {
            static ref MESSAGE: String = format!(
                "Switch transaction scheduling method for verifying ledger entries [default: {}]",
                BlockVerificationMethod::default()
            );
        };

        &MESSAGE
    }
}
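
// For illustration: `EnumString`/`Display` with kebab-case serialization mean the
// values returned by `cli_names()` round-trip through `FromStr`, e.g.
//
//     use std::str::FromStr;
//     let method = BlockVerificationMethod::from_str("unified-scheduler").unwrap();
//     assert_eq!(method.to_string(), "unified-scheduler");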

#[derive(Clone, EnumString, EnumVariantNames, Default, IntoStaticStr, Display)]
#[strum(serialize_all = "kebab-case")]
pub enum BlockProductionMethod {
    #[default]
    CentralScheduler,
    CentralSchedulerGreedy,
}

impl BlockProductionMethod {
    pub const fn cli_names() -> &'static [&'static str] {
        Self::VARIANTS
    }

    pub fn cli_message() -> &'static str {
        lazy_static! {
            static ref MESSAGE: String = format!(
                "Switch transaction scheduling method for producing ledger entries [default: {}]",
                BlockProductionMethod::default()
            );
        };

        &MESSAGE
    }
}

#[derive(Clone, EnumString, EnumVariantNames, Default, IntoStaticStr, Display)]
#[strum(serialize_all = "kebab-case")]
pub enum TransactionStructure {
    #[default]
    Sdk,
    View,
}

impl TransactionStructure {
    pub const fn cli_names() -> &'static [&'static str] {
        Self::VARIANTS
    }

    pub fn cli_message() -> &'static str {
        lazy_static! {
            static ref MESSAGE: String = format!(
                "Switch internal transaction structure/representation [default: {}]",
                TransactionStructure::default()
            );
        };

        &MESSAGE
    }
}

/// Configuration for the block generator invalidator for replay.
#[derive(Clone, Debug)]
pub struct GeneratorConfig {
    pub accounts_path: String,
    pub starting_keypairs: Arc<Vec<Keypair>>,
}

pub struct ValidatorConfig {
    pub halt_at_slot: Option<Slot>,
    pub expected_genesis_hash: Option<Hash>,
    pub expected_bank_hash: Option<Hash>,
    pub expected_shred_version: Option<u16>,
    pub voting_disabled: bool,
    pub account_paths: Vec<PathBuf>,
    pub account_snapshot_paths: Vec<PathBuf>,
    pub rpc_config: JsonRpcConfig,
    /// Specifies which plugins to start up with
    pub on_start_geyser_plugin_config_files: Option<Vec<PathBuf>>,
    pub geyser_plugin_always_enabled: bool,
    pub rpc_addrs: Option<(SocketAddr, SocketAddr)>, // (JsonRpc, JsonRpcPubSub)
    pub pubsub_config: PubSubConfig,
    pub snapshot_config: SnapshotConfig,
    pub max_ledger_shreds: Option<u64>,
    pub blockstore_options: BlockstoreOptions,
    pub broadcast_stage_type: BroadcastStageType,
    pub turbine_disabled: Arc<AtomicBool>,
    pub fixed_leader_schedule: Option<FixedSchedule>,
    pub wait_for_supermajority: Option<Slot>,
    pub new_hard_forks: Option<Vec<Slot>>,
    pub known_validators: Option<HashSet<Pubkey>>, // None = trust all
    pub repair_validators: Option<HashSet<Pubkey>>, // None = repair from all
    pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>, // Empty = repair with all
    pub gossip_validators: Option<HashSet<Pubkey>>, // None = gossip with all
    pub accounts_hash_interval_slots: u64,
    pub max_genesis_archive_unpacked_size: u64,
    /// Run PoH, transaction signature and other transaction verifications during blockstore
    /// processing.
    pub run_verification: bool,
    pub require_tower: bool,
    pub tower_storage: Arc<dyn TowerStorage>,
    pub debug_keys: Option<Arc<HashSet<Pubkey>>>,
    pub contact_debug_interval: u64,
    pub contact_save_interval: u64,
    pub send_transaction_service_config: send_transaction_service::Config,
    pub no_poh_speed_test: bool,
    pub no_os_memory_stats_reporting: bool,
    pub no_os_network_stats_reporting: bool,
    pub no_os_cpu_stats_reporting: bool,
    pub no_os_disk_stats_reporting: bool,
    pub poh_pinned_cpu_core: usize,
    pub poh_hashes_per_batch: u64,
    pub process_ledger_before_services: bool,
    pub accounts_db_config: Option<AccountsDbConfig>,
    pub warp_slot: Option<Slot>,
    pub accounts_db_test_hash_calculation: bool,
    pub accounts_db_skip_shrink: bool,
    pub accounts_db_force_initial_clean: bool,
    pub tpu_coalesce: Duration,
    pub staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
    pub validator_exit: Arc<RwLock<Exit>>,
    pub no_wait_for_vote_to_start_leader: bool,
    pub wait_to_vote_slot: Option<Slot>,
    pub runtime_config: RuntimeConfig,
    pub banking_trace_dir_byte_limit: banking_trace::DirByteLimit,
    pub block_verification_method: BlockVerificationMethod,
    pub block_production_method: BlockProductionMethod,
    pub transaction_struct: TransactionStructure,
    pub enable_block_production_forwarding: bool,
    pub generator_config: Option<GeneratorConfig>,
    pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup,
    pub wen_restart_proto_path: Option<PathBuf>,
    pub wen_restart_coordinator: Option<Pubkey>,
    pub unified_scheduler_handler_threads: Option<usize>,
    pub ip_echo_server_threads: NonZeroUsize,
    pub rayon_global_threads: NonZeroUsize,
    pub replay_forks_threads: NonZeroUsize,
    pub replay_transactions_threads: NonZeroUsize,
    pub tvu_shred_sigverify_threads: NonZeroUsize,
    pub thread_manager_config: ThreadManagerConfig,
    pub delay_leader_block_for_pending_fork: bool,
}

impl Default for ValidatorConfig {
    fn default() -> Self {
        Self {
            halt_at_slot: None,
            expected_genesis_hash: None,
            expected_bank_hash: None,
            expected_shred_version: None,
            voting_disabled: false,
            max_ledger_shreds: None,
            blockstore_options: BlockstoreOptions::default(),
            account_paths: Vec::new(),
            account_snapshot_paths: Vec::new(),
            rpc_config: JsonRpcConfig::default(),
            on_start_geyser_plugin_config_files: None,
            geyser_plugin_always_enabled: false,
            rpc_addrs: None,
            pubsub_config: PubSubConfig::default(),
            snapshot_config: SnapshotConfig::new_load_only(),
            broadcast_stage_type: BroadcastStageType::Standard,
            turbine_disabled: Arc::<AtomicBool>::default(),
            fixed_leader_schedule: None,
            wait_for_supermajority: None,
            new_hard_forks: None,
            known_validators: None,
            repair_validators: None,
            repair_whitelist: Arc::new(RwLock::new(HashSet::default())),
            gossip_validators: None,
            accounts_hash_interval_slots: u64::MAX,
            max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
            run_verification: true,
            require_tower: false,
            tower_storage: Arc::new(NullTowerStorage::default()),
            debug_keys: None,
            contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
            contact_save_interval: DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS,
            send_transaction_service_config: send_transaction_service::Config::default(),
            no_poh_speed_test: true,
            no_os_memory_stats_reporting: true,
            no_os_network_stats_reporting: true,
            no_os_cpu_stats_reporting: true,
            no_os_disk_stats_reporting: true,
            poh_pinned_cpu_core: poh_service::DEFAULT_PINNED_CPU_CORE,
            poh_hashes_per_batch: poh_service::DEFAULT_HASHES_PER_BATCH,
            process_ledger_before_services: false,
            warp_slot: None,
            accounts_db_test_hash_calculation: false,
            accounts_db_skip_shrink: false,
            accounts_db_force_initial_clean: false,
            tpu_coalesce: DEFAULT_TPU_COALESCE,
            staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
            validator_exit: Arc::new(RwLock::new(Exit::default())),
            no_wait_for_vote_to_start_leader: true,
            accounts_db_config: None,
            wait_to_vote_slot: None,
            runtime_config: RuntimeConfig::default(),
            banking_trace_dir_byte_limit: 0,
            block_verification_method: BlockVerificationMethod::default(),
            block_production_method: BlockProductionMethod::default(),
            transaction_struct: TransactionStructure::default(),
            enable_block_production_forwarding: false,
            generator_config: None,
            use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(),
            wen_restart_proto_path: None,
            wen_restart_coordinator: None,
            unified_scheduler_handler_threads: None,
            ip_echo_server_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            rayon_global_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            thread_manager_config: ThreadManagerConfig::default_for_agave(),
            tvu_shred_sigverify_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            delay_leader_block_for_pending_fork: false,
        }
    }
}

impl ValidatorConfig {
    pub fn default_for_test() -> Self {
        let max_thread_count =
            NonZeroUsize::new(get_max_thread_count()).expect("thread count is non-zero");

        Self {
            accounts_db_config: Some(ACCOUNTS_DB_CONFIG_FOR_TESTING),
            blockstore_options: BlockstoreOptions::default_for_tests(),
            rpc_config: JsonRpcConfig::default_for_test(),
            block_production_method: BlockProductionMethod::default(),
            enable_block_production_forwarding: true, // enable forwarding by default for tests
            rayon_global_threads: max_thread_count,
            replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            replay_transactions_threads: max_thread_count,
            tvu_shred_sigverify_threads: NonZeroUsize::new(get_thread_count())
                .expect("thread count is non-zero"),
            ..Self::default()
        }
    }

    pub fn enable_default_rpc_block_subscribe(&mut self) {
        let pubsub_config = PubSubConfig {
            enable_block_subscription: true,
            ..PubSubConfig::default()
        };
        let rpc_config = JsonRpcConfig {
            enable_rpc_transaction_history: true,
            ..JsonRpcConfig::default_for_test()
        };

        self.pubsub_config = pubsub_config;
        self.rpc_config = rpc_config;
    }
}
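
// Usage sketch (tests): combining the test constructor with the block-subscribe
// helper yields a config with pubsub block subscriptions and RPC transaction
// history enabled.
//
//     let mut config = ValidatorConfig::default_for_test();
//     config.enable_default_rpc_block_subscribe();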

// `ValidatorStartProgress` contains status information that is surfaced to the node operator over
// the admin RPC channel to help them follow the general progress of node startup without
// having to watch log messages.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum ValidatorStartProgress {
    Initializing, // Catch all, default state
    SearchingForRpcService,
    DownloadingSnapshot {
        slot: Slot,
        rpc_addr: SocketAddr,
    },
    CleaningBlockStore,
    CleaningAccounts,
    LoadingLedger,
    ProcessingLedger {
        slot: Slot,
        max_slot: Slot,
    },
    StartingServices,
    Halted, // Validator halted due to `--dev-halt-at-slot` argument
    WaitingForSupermajority {
        slot: Slot,
        gossip_stake_percent: u64,
    },

    // `Running` is the terminal state once the validator fully starts and all services are
    // operational
    Running,
}

impl Default for ValidatorStartProgress {
    fn default() -> Self {
        Self::Initializing
    }
}
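
// For illustration: `Validator::new` receives this state as an
// `Arc<RwLock<ValidatorStartProgress>>`, so a reader (e.g. the admin RPC side)
// only needs a read lock to snapshot the current phase:
//
//     let progress: ValidatorStartProgress = *start_progress.read().unwrap();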

struct BlockstoreRootScan {
    thread: Option<JoinHandle<Result<usize, BlockstoreError>>>,
}

impl BlockstoreRootScan {
    fn new(config: &ValidatorConfig, blockstore: Arc<Blockstore>, exit: Arc<AtomicBool>) -> Self {
        let thread = if config.rpc_addrs.is_some()
            && config.rpc_config.enable_rpc_transaction_history
            && config.rpc_config.rpc_scan_and_fix_roots
        {
            Some(
                Builder::new()
                    .name("solBStoreRtScan".to_string())
                    .spawn(move || blockstore.scan_and_fix_roots(None, None, &exit))
                    .unwrap(),
            )
        } else {
            None
        };
        Self { thread }
    }

    fn join(self) {
        if let Some(blockstore_root_scan) = self.thread {
            if let Err(err) = blockstore_root_scan.join() {
                warn!("blockstore_root_scan failed to join {:?}", err);
            }
        }
    }
}

#[derive(Default)]
struct TransactionHistoryServices {
    transaction_status_sender: Option<TransactionStatusSender>,
    transaction_status_service: Option<TransactionStatusService>,
    max_complete_transaction_status_slot: Arc<AtomicU64>,
    max_complete_rewards_slot: Arc<AtomicU64>,
    block_meta_sender: Option<BlockMetaSender>,
    block_meta_service: Option<BlockMetaService>,
}
/// A struct to ease passing validator TPU configuration.
pub struct ValidatorTpuConfig {
    /// Controls whether to use QUIC for sending regular TPU transactions
    pub use_quic: bool,
    /// Controls whether to use QUIC for sending TPU votes
    pub vote_use_quic: bool,
    /// Controls the connection cache pool size
    pub tpu_connection_pool_size: usize,
    /// Controls whether to enable UDP for TPU transactions.
    pub tpu_enable_udp: bool,
    /// QUIC server config for regular TPU
    pub tpu_quic_server_config: QuicServerParams,
    /// QUIC server config for TPU forward
    pub tpu_fwd_quic_server_config: QuicServerParams,
    /// QUIC server config for Vote
    pub vote_quic_server_config: QuicServerParams,
}

impl ValidatorTpuConfig {
    /// A convenience function to build a ValidatorTpuConfig for testing with good
    /// defaults.
    pub fn new_for_tests(tpu_enable_udp: bool) -> Self {
        let tpu_quic_server_config = QuicServerParams {
            max_connections_per_ipaddr_per_min: 32,
            coalesce_channel_size: 100_000, // smaller channel size for faster test
            ..Default::default()
        };

        let tpu_fwd_quic_server_config = QuicServerParams {
            max_connections_per_ipaddr_per_min: 32,
            max_unstaked_connections: 0,
            coalesce_channel_size: 100_000, // smaller channel size for faster test
            ..Default::default()
        };

        // vote and tpu_fwd share the same characteristics -- disallow non-staked connections:
        let vote_quic_server_config = tpu_fwd_quic_server_config.clone();

        ValidatorTpuConfig {
            use_quic: DEFAULT_TPU_USE_QUIC,
            vote_use_quic: DEFAULT_VOTE_USE_QUIC,
            tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
            tpu_enable_udp,
            tpu_quic_server_config,
            tpu_fwd_quic_server_config,
            vote_quic_server_config,
        }
    }
}
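
// Usage sketch (tests): build a TPU config with UDP ingress enabled while keeping
// the QUIC defaults above.
//
//     let tpu_config = ValidatorTpuConfig::new_for_tests(/* tpu_enable_udp: */ true);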

pub struct Validator {
    validator_exit: Arc<RwLock<Exit>>,
    json_rpc_service: Option<JsonRpcService>,
    pubsub_service: Option<PubSubService>,
    rpc_completed_slots_service: Option<JoinHandle<()>>,
    optimistically_confirmed_bank_tracker: Option<OptimisticallyConfirmedBankTracker>,
    transaction_status_service: Option<TransactionStatusService>,
    block_meta_service: Option<BlockMetaService>,
    entry_notifier_service: Option<EntryNotifierService>,
    system_monitor_service: Option<SystemMonitorService>,
    sample_performance_service: Option<SamplePerformanceService>,
    poh_timing_report_service: PohTimingReportService,
    stats_reporter_service: StatsReporterService,
    gossip_service: GossipService,
    serve_repair_service: ServeRepairService,
    completed_data_sets_service: Option<CompletedDataSetsService>,
    snapshot_packager_service: Option<SnapshotPackagerService>,
    poh_recorder: Arc<RwLock<PohRecorder>>,
    poh_service: PohService,
    tpu: Tpu,
    tvu: Tvu,
    ip_echo_server: Option<solana_net_utils::IpEchoServer>,
    pub cluster_info: Arc<ClusterInfo>,
    pub bank_forks: Arc<RwLock<BankForks>>,
    pub blockstore: Arc<Blockstore>,
    geyser_plugin_service: Option<GeyserPluginService>,
    blockstore_metric_report_service: BlockstoreMetricReportService,
    accounts_background_service: AccountsBackgroundService,
    accounts_hash_verifier: AccountsHashVerifier,
    turbine_quic_endpoint: Option<Endpoint>,
    turbine_quic_endpoint_runtime: Option<TokioRuntime>,
    turbine_quic_endpoint_join_handle: Option<solana_turbine::quic_endpoint::AsyncTryJoinHandle>,
    repair_quic_endpoints: Option<[Endpoint; 3]>,
    repair_quic_endpoints_runtime: Option<TokioRuntime>,
    repair_quic_endpoints_join_handle: Option<repair::quic_endpoint::AsyncTryJoinHandle>,
    thread_manager: ThreadManager,
}

impl Validator {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        mut node: Node,
        identity_keypair: Arc<Keypair>,
        ledger_path: &Path,
        vote_account: &Pubkey,
        authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
        cluster_entrypoints: Vec<ContactInfo>,
        config: &ValidatorConfig,
        should_check_duplicate_instance: bool,
        rpc_to_plugin_manager_receiver: Option<Receiver<GeyserPluginManagerRequest>>,
        start_progress: Arc<RwLock<ValidatorStartProgress>>,
        socket_addr_space: SocketAddrSpace,
        tpu_config: ValidatorTpuConfig,
        admin_rpc_service_post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
    ) -> Result<Self> {
        let ValidatorTpuConfig {
            use_quic,
            vote_use_quic,
            tpu_connection_pool_size,
            tpu_enable_udp,
            tpu_quic_server_config,
            tpu_fwd_quic_server_config,
            vote_quic_server_config,
        } = tpu_config;

        let start_time = Instant::now();

        let thread_manager = ThreadManager::new(&config.thread_manager_config)?;
        // Initialize the global rayon pool first to ensure the value in config
        // is honored. Otherwise, some code accessing the global pool could
        // cause it to get initialized with Rayon's default (not ours)
        if rayon::ThreadPoolBuilder::new()
            .thread_name(|i| format!("solRayonGlob{i:02}"))
            .num_threads(config.rayon_global_threads.get())
            .build_global()
            .is_err()
        {
            warn!("Rayon global thread pool already initialized");
        }

        let id = identity_keypair.pubkey();
        assert_eq!(&id, node.info.pubkey());

        info!("identity pubkey: {id}");
        info!("vote account pubkey: {vote_account}");

        if !config.no_os_network_stats_reporting {
            verify_net_stats_access().map_err(|e| {
                ValidatorError::Other(format!("Failed to access network stats: {e:?}"))
            })?;
        }

        let mut bank_notification_senders = Vec::new();

        let exit = Arc::new(AtomicBool::new(false));

        let geyser_plugin_config_files = config
            .on_start_geyser_plugin_config_files
            .as_ref()
            .map(Cow::Borrowed)
            .or_else(|| {
                config
                    .geyser_plugin_always_enabled
                    .then_some(Cow::Owned(vec![]))
            });
        let geyser_plugin_service =
            if let Some(geyser_plugin_config_files) = geyser_plugin_config_files {
                let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
                bank_notification_senders.push(confirmed_bank_sender);
                let rpc_to_plugin_manager_receiver_and_exit =
                    rpc_to_plugin_manager_receiver.map(|receiver| (receiver, exit.clone()));
                Some(
                    GeyserPluginService::new_with_receiver(
                        confirmed_bank_receiver,
                        config.geyser_plugin_always_enabled,
                        geyser_plugin_config_files.as_ref(),
                        rpc_to_plugin_manager_receiver_and_exit,
                    )
                    .map_err(|err| {
                        ValidatorError::Other(format!("Failed to load the Geyser plugin: {err:?}"))
                    })?,
                )
            } else {
                None
            };

        if config.voting_disabled {
            warn!("voting disabled");
            authorized_voter_keypairs.write().unwrap().clear();
        } else {
            for authorized_voter_keypair in authorized_voter_keypairs.read().unwrap().iter() {
                warn!("authorized voter: {}", authorized_voter_keypair.pubkey());
            }
        }

        for cluster_entrypoint in &cluster_entrypoints {
            info!("entrypoint: {:?}", cluster_entrypoint);
        }

        if solana_perf::perf_libs::api().is_some() {
            info!("Initializing sigverify, this could take a while...");
        } else {
            info!("Initializing sigverify...");
        }
        sigverify::init();
        info!("Initializing sigverify done.");

        if !ledger_path.is_dir() {
            return Err(anyhow!(
                "ledger directory does not exist or is not accessible: {ledger_path:?}"
            ));
        }
        let genesis_config = load_genesis(config, ledger_path)?;
        metrics_config_sanity_check(genesis_config.cluster_type)?;

        info!("Cleaning accounts paths..");
        *start_progress.write().unwrap() = ValidatorStartProgress::CleaningAccounts;
        let mut timer = Measure::start("clean_accounts_paths");
        cleanup_accounts_paths(config);
        timer.stop();
        info!("Cleaning accounts paths done. {timer}");

        snapshot_utils::purge_incomplete_bank_snapshots(&config.snapshot_config.bank_snapshots_dir);
        snapshot_utils::purge_old_bank_snapshots_at_startup(
            &config.snapshot_config.bank_snapshots_dir,
        );

        info!("Cleaning orphaned account snapshot directories..");
        let mut timer = Measure::start("clean_orphaned_account_snapshot_dirs");
        clean_orphaned_account_snapshot_dirs(
            &config.snapshot_config.bank_snapshots_dir,
            &config.account_snapshot_paths,
        )
        .context("failed to clean orphaned account snapshot directories")?;
        timer.stop();
        info!("Cleaning orphaned account snapshot directories done. {timer}");

        // The accounts hash cache dir was renamed, so cleanup any old dirs that exist.
        let accounts_hash_cache_path = config
            .accounts_db_config
            .as_ref()
            .and_then(|config| config.accounts_hash_cache_path.as_ref())
            .map(PathBuf::as_path)
            .unwrap_or(ledger_path);
        let old_accounts_hash_cache_dirs = [
            ledger_path.join("calculate_accounts_hash_cache"),
            accounts_hash_cache_path.join("full"),
            accounts_hash_cache_path.join("incremental"),
            accounts_hash_cache_path.join("transient"),
        ];
        for old_accounts_hash_cache_dir in old_accounts_hash_cache_dirs {
            if old_accounts_hash_cache_dir.exists() {
                move_and_async_delete_path(old_accounts_hash_cache_dir);
            }
        }

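        // Wire the validator-wide exit (`config.validator_exit`) into the local `exit`
        // flag that the services spawned below poll.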
        {
            let exit = exit.clone();
            config
                .validator_exit
                .write()
                .unwrap()
                .register_exit(Box::new(move || exit.store(true, Ordering::Relaxed)));
        }

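        // Fetch the optional Geyser notifier hooks; each is `None` when there is no
        // plugin service or no loaded plugin consumes the corresponding interface.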
        let accounts_update_notifier = geyser_plugin_service
            .as_ref()
            .and_then(|geyser_plugin_service| geyser_plugin_service.get_accounts_update_notifier());

        let transaction_notifier = geyser_plugin_service
            .as_ref()
            .and_then(|geyser_plugin_service| geyser_plugin_service.get_transaction_notifier());

        let entry_notifier = geyser_plugin_service
            .as_ref()
            .and_then(|geyser_plugin_service| geyser_plugin_service.get_entry_notifier());

        let block_metadata_notifier = geyser_plugin_service
            .as_ref()
            .and_then(|geyser_plugin_service| geyser_plugin_service.get_block_metadata_notifier());

        let slot_status_notifier = geyser_plugin_service
            .as_ref()
            .and_then(|geyser_plugin_service| geyser_plugin_service.get_slot_status_notifier());

        info!(
            "Geyser plugin: accounts_update_notifier: {}, transaction_notifier: {}, \
             entry_notifier: {}",
            accounts_update_notifier.is_some(),
            transaction_notifier.is_some(),
            entry_notifier.is_some()
        );

        let system_monitor_service = Some(SystemMonitorService::new(
            exit.clone(),
            SystemMonitorStatsReportConfig {
                report_os_memory_stats: !config.no_os_memory_stats_reporting,
                report_os_network_stats: !config.no_os_network_stats_reporting,
                report_os_cpu_stats: !config.no_os_cpu_stats_reporting,
                report_os_disk_stats: !config.no_os_disk_stats_reporting,
            },
        ));

        let (poh_timing_point_sender, poh_timing_point_receiver) = unbounded();
        let poh_timing_report_service =
            PohTimingReportService::new(poh_timing_point_receiver, exit.clone());

        let (
            bank_forks,
            blockstore,
            original_blockstore_root,
            ledger_signal_receiver,
            leader_schedule_cache,
            starting_snapshot_hashes,
            TransactionHistoryServices {
                transaction_status_sender,
                transaction_status_service,
                max_complete_transaction_status_slot,
                max_complete_rewards_slot,
                block_meta_sender,
                block_meta_service,
            },
            blockstore_process_options,
            blockstore_root_scan,
            pruned_banks_receiver,
            entry_notifier_service,
        ) = load_blockstore(
            config,
            ledger_path,
            &genesis_config,
            exit.clone(),
            &start_progress,
            accounts_update_notifier,
            transaction_notifier,
            entry_notifier,
            Some(poh_timing_point_sender.clone()),
        )
        .map_err(ValidatorError::Other)?;

        if !config.no_poh_speed_test {
            check_poh_speed(&bank_forks.read().unwrap().root_bank(), None)?;
        }

        let (root_slot, hard_forks) = {
            let root_bank = bank_forks.read().unwrap().root_bank();
            (root_bank.slot(), root_bank.hard_forks())
        };
        let shred_version = compute_shred_version(&genesis_config.hash(), Some(&hard_forks));
        info!(
            "shred version: {shred_version}, hard forks: {:?}",
            hard_forks
        );

        if let Some(expected_shred_version) = config.expected_shred_version {
            if expected_shred_version != shred_version {
                return Err(ValidatorError::ShredVersionMismatch {
                    actual: shred_version,
                    expected: expected_shred_version,
                }
                .into());
            }
        }

        if let Some(start_slot) = should_cleanup_blockstore_incorrect_shred_versions(
            config,
            &blockstore,
            root_slot,
            &hard_forks,
        )? {
            *start_progress.write().unwrap() = ValidatorStartProgress::CleaningBlockStore;
            cleanup_blockstore_incorrect_shred_versions(
                &blockstore,
                config,
                start_slot,
                shred_version,
            )?;
        } else {
            info!("Skipping the blockstore check for shreds with incorrect version");
        }

        node.info.set_shred_version(shred_version);
        node.info.set_wallclock(timestamp());
        Self::print_node_info(&node);

        let mut cluster_info = ClusterInfo::new(
            node.info.clone(),
            identity_keypair.clone(),
            socket_addr_space,
        );
        cluster_info.set_contact_debug_interval(config.contact_debug_interval);
        cluster_info.set_entrypoints(cluster_entrypoints);
        cluster_info.restore_contact_info(ledger_path, config.contact_save_interval);
        let cluster_info = Arc::new(cluster_info);

        assert!(is_snapshot_config_valid(
            &config.snapshot_config,
            config.accounts_hash_interval_slots,
        ));

        let pending_snapshot_packages = Arc::new(Mutex::new(PendingSnapshotPackages::default()));
        let snapshot_packager_service = if config.snapshot_config.should_generate_snapshots() {
            let enable_gossip_push = true;
            let snapshot_packager_service = SnapshotPackagerService::new(
                pending_snapshot_packages.clone(),
                starting_snapshot_hashes,
                exit.clone(),
                cluster_info.clone(),
                config.snapshot_config.clone(),
                enable_gossip_push,
            );
            Some(snapshot_packager_service)
        } else {
            None
        };

        let (accounts_package_sender, accounts_package_receiver) = crossbeam_channel::unbounded();
        let accounts_hash_verifier = AccountsHashVerifier::new(
            accounts_package_sender.clone(),
            accounts_package_receiver,
            pending_snapshot_packages,
            exit.clone(),
            config.snapshot_config.clone(),
        );

        let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
        let accounts_background_request_sender =
            AbsRequestSender::new(snapshot_request_sender.clone());
        let snapshot_request_handler = SnapshotRequestHandler {
            snapshot_config: config.snapshot_config.clone(),
            snapshot_request_sender,
            snapshot_request_receiver,
            accounts_package_sender,
        };
        let pruned_banks_request_handler = PrunedBanksRequestHandler {
            pruned_banks_receiver,
        };
        let accounts_background_service = AccountsBackgroundService::new(
            bank_forks.clone(),
            exit.clone(),
            AbsRequestHandlers {
                snapshot_request_handler,
                pruned_banks_request_handler,
            },
            config.accounts_db_test_hash_calculation,
        );
        info!(
            "Using: block-verification-method: {}, block-production-method: {}, transaction-structure: {}",
            config.block_verification_method, config.block_production_method, config.transaction_struct
        );

        let (replay_vote_sender, replay_vote_receiver) = unbounded();

        // block min prioritization fee cache should be readable by RPC, and writable by validator
        // (by both replay stage and banking stage)
        let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::default());

        let leader_schedule_cache = Arc::new(leader_schedule_cache);
        let startup_verification_complete;
        let (poh_recorder, entry_receiver, record_receiver) = {
            let bank = &bank_forks.read().unwrap().working_bank();
            startup_verification_complete = Arc::clone(bank.get_startup_verification_complete());
            PohRecorder::new_with_clear_signal(
                bank.tick_height(),
                bank.last_blockhash(),
                bank.clone(),
                None,
                bank.ticks_per_slot(),
                config.delay_leader_block_for_pending_fork,
                blockstore.clone(),
                blockstore.get_new_shred_signal(0),
                &leader_schedule_cache,
                &genesis_config.poh_config,
                Some(poh_timing_point_sender),
                exit.clone(),
            )
        };
        let poh_recorder = Arc::new(RwLock::new(poh_recorder));

        let (banking_tracer, tracer_thread) =
            BankingTracer::new((config.banking_trace_dir_byte_limit > 0).then_some((
                &blockstore.banking_trace_path(),
                exit.clone(),
                config.banking_trace_dir_byte_limit,
            )))?;
        if banking_tracer.is_enabled() {
            info!(
                "Enabled banking trace (dir_byte_limit: {})",
                config.banking_trace_dir_byte_limit
            );
        } else {
            info!("Disabled banking trace");
        }
        let banking_tracer_channels = banking_tracer.create_channels(false);

        match &config.block_verification_method {
            BlockVerificationMethod::BlockstoreProcessor => {
                info!("no scheduler pool is installed for block verification...");
                if let Some(count) = config.unified_scheduler_handler_threads {
                    warn!(
                        "--unified-scheduler-handler-threads={count} is ignored because unified \
                         scheduler isn't enabled"
                    );
                }
            }
            BlockVerificationMethod::UnifiedScheduler => {
                let scheduler_pool = DefaultSchedulerPool::new_dyn(
                    config.unified_scheduler_handler_threads,
                    config.runtime_config.log_messages_bytes_limit,
                    transaction_status_sender.clone(),
                    Some(replay_vote_sender.clone()),
                    prioritization_fee_cache.clone(),
                );
                bank_forks
                    .write()
                    .unwrap()
                    .install_scheduler_pool(scheduler_pool);
            }
        }

        let entry_notification_sender = entry_notifier_service
            .as_ref()
            .map(|service| service.sender());
        let mut process_blockstore = ProcessBlockStore::new(
            &id,
            vote_account,
            &start_progress,
            &blockstore,
            original_blockstore_root,
            &bank_forks,
            &leader_schedule_cache,
            &blockstore_process_options,
            transaction_status_sender.as_ref(),
            block_meta_sender.clone(),
            entry_notification_sender,
            blockstore_root_scan,
            accounts_background_request_sender.clone(),
            config,
        );

        maybe_warp_slot(
            config,
            &mut process_blockstore,
            ledger_path,
            &bank_forks,
            &leader_schedule_cache,
            &accounts_background_request_sender,
        )
        .map_err(ValidatorError::Other)?;

        if config.process_ledger_before_services {
            process_blockstore
                .process()
                .map_err(ValidatorError::Other)?;
        }
        *start_progress.write().unwrap() = ValidatorStartProgress::StartingServices;

        let sample_performance_service =
            if config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history {
                Some(SamplePerformanceService::new(
                    &bank_forks,
                    blockstore.clone(),
                    exit.clone(),
                ))
            } else {
                None
            };

        let mut block_commitment_cache = BlockCommitmentCache::default();
        let bank_forks_guard = bank_forks.read().unwrap();
        block_commitment_cache.initialize_slots(
            bank_forks_guard.working_bank().slot(),
            bank_forks_guard.root(),
        );
        drop(bank_forks_guard);
        let block_commitment_cache = Arc::new(RwLock::new(block_commitment_cache));

        let optimistically_confirmed_bank =
            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);

        let rpc_subscriptions = Arc::new(RpcSubscriptions::new_with_config(
            exit.clone(),
            max_complete_transaction_status_slot.clone(),
            max_complete_rewards_slot.clone(),
            blockstore.clone(),
            bank_forks.clone(),
            block_commitment_cache.clone(),
            optimistically_confirmed_bank.clone(),
            &config.pubsub_config,
            None,
        ));

        let max_slots = Arc::new(MaxSlots::default());

        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));

        let connection_cache = if use_quic {
            let connection_cache = ConnectionCache::new_with_client_options(
                "connection_cache_tpu_quic",
                tpu_connection_pool_size,
                None,
                Some((
                    &identity_keypair,
                    node.info
                        .tpu(Protocol::UDP)
                        .ok_or_else(|| {
                            ValidatorError::Other(String::from("Invalid UDP address for TPU"))
                        })?
                        .ip(),
                )),
                Some((&staked_nodes, &identity_keypair.pubkey())),
            );
            Arc::new(connection_cache)
        } else {
            Arc::new(ConnectionCache::with_udp(
                "connection_cache_tpu_udp",
                tpu_connection_pool_size,
            ))
        };

        let vote_connection_cache = if vote_use_quic {
            let vote_connection_cache = ConnectionCache::new_with_client_options(
                "connection_cache_vote_quic",
                tpu_connection_pool_size,
                None, // client_endpoint
                Some((
                    &identity_keypair,
                    node.info
                        .tpu_vote(Protocol::QUIC)
                        .ok_or_else(|| {
                            ValidatorError::Other(String::from("Invalid QUIC address for TPU Vote"))
                        })?
                        .ip(),
                )),
                Some((&staked_nodes, &identity_keypair.pubkey())),
            );
            Arc::new(vote_connection_cache)
        } else {
            Arc::new(ConnectionCache::with_udp(
                "connection_cache_vote_udp",
                tpu_connection_pool_size,
            ))
        };

        let rpc_override_health_check =
            Arc::new(AtomicBool::new(config.rpc_config.disable_health_check));
        let (
            json_rpc_service,
            pubsub_service,
            completed_data_sets_sender,
            completed_data_sets_service,
            rpc_completed_slots_service,
            optimistically_confirmed_bank_tracker,
            bank_notification_sender,
        ) = if let Some((rpc_addr, rpc_pubsub_addr)) = config.rpc_addrs {
            assert_eq!(
                node.info.rpc().map(|addr| socket_addr_space.check(&addr)),
                node.info
                    .rpc_pubsub()
                    .map(|addr| socket_addr_space.check(&addr))
            );
            let (bank_notification_sender, bank_notification_receiver) = unbounded();
            let confirmed_bank_subscribers = if !bank_notification_senders.is_empty() {
                Some(Arc::new(RwLock::new(bank_notification_senders)))
            } else {
                None
            };

            let json_rpc_service = JsonRpcService::new(
                rpc_addr,
                config.rpc_config.clone(),
                Some(config.snapshot_config.clone()),
                bank_forks.clone(),
                block_commitment_cache.clone(),
                blockstore.clone(),
                cluster_info.clone(),
                Some(poh_recorder.clone()),
                genesis_config.hash(),
                ledger_path,
                config.validator_exit.clone(),
                exit.clone(),
                rpc_override_health_check.clone(),
                startup_verification_complete,
                optimistically_confirmed_bank.clone(),
                config.send_transaction_service_config.clone(),
                max_slots.clone(),
                leader_schedule_cache.clone(),
                connection_cache.clone(),
                max_complete_transaction_status_slot,
                max_complete_rewards_slot,
                prioritization_fee_cache.clone(),
            )
            .map_err(ValidatorError::Other)?;

            let pubsub_service = if !config.rpc_config.full_api {
                None
            } else {
                let (trigger, pubsub_service) = PubSubService::new(
                    config.pubsub_config.clone(),
                    &rpc_subscriptions,
                    rpc_pubsub_addr,
                );
                config
                    .validator_exit
                    .write()
                    .unwrap()
                    .register_exit(Box::new(move || trigger.cancel()));

                Some(pubsub_service)
            };

            let (completed_data_sets_sender, completed_data_sets_service) =
                if !config.rpc_config.full_api {
                    (None, None)
                } else {
                    let (completed_data_sets_sender, completed_data_sets_receiver) =
                        bounded(MAX_COMPLETED_DATA_SETS_IN_CHANNEL);
                    let completed_data_sets_service = CompletedDataSetsService::new(
                        completed_data_sets_receiver,
                        blockstore.clone(),
                        rpc_subscriptions.clone(),
                        exit.clone(),
                        max_slots.clone(),
                    );
                    (
                        Some(completed_data_sets_sender),
                        Some(completed_data_sets_service),
                    )
                };

            let rpc_completed_slots_service =
                if config.rpc_config.full_api || geyser_plugin_service.is_some() {
                    let (completed_slots_sender, completed_slots_receiver) =
                        bounded(MAX_COMPLETED_SLOTS_IN_CHANNEL);
                    blockstore.add_completed_slots_signal(completed_slots_sender);

                    Some(RpcCompletedSlotsService::spawn(
                        completed_slots_receiver,
                        rpc_subscriptions.clone(),
                        slot_status_notifier.clone(),
                        exit.clone(),
                    ))
                } else {
                    None
                };

            let optimistically_confirmed_bank_tracker =
                Some(OptimisticallyConfirmedBankTracker::new(
                    bank_notification_receiver,
                    exit.clone(),
                    bank_forks.clone(),
                    optimistically_confirmed_bank,
                    rpc_subscriptions.clone(),
                    confirmed_bank_subscribers,
                    prioritization_fee_cache.clone(),
                ));
            let bank_notification_sender_config = Some(BankNotificationSenderConfig {
                sender: bank_notification_sender,
                should_send_parents: geyser_plugin_service.is_some(),
            });
            (
                Some(json_rpc_service),
                pubsub_service,
                completed_data_sets_sender,
                completed_data_sets_service,
                rpc_completed_slots_service,
                optimistically_confirmed_bank_tracker,
                bank_notification_sender_config,
            )
        } else {
            (None, None, None, None, None, None, None)
        };

        if config.halt_at_slot.is_some() {
            // Simulate a confirmed root to avoid RPC errors with CommitmentConfig::finalized() and
            // to ensure RPC endpoints like getConfirmedBlock, which require a confirmed root, work
            block_commitment_cache
                .write()
                .unwrap()
                .set_highest_super_majority_root(bank_forks.read().unwrap().root());

            // Park with the RPC service running, ready for inspection!
            warn!("Validator halted");
            *start_progress.write().unwrap() = ValidatorStartProgress::Halted;
            std::thread::park();
        }
        let ip_echo_server = match node.sockets.ip_echo {
            None => None,
            Some(tcp_listener) => Some(solana_net_utils::ip_echo_server(
                tcp_listener,
                config.ip_echo_server_threads,
                Some(node.info.shred_version()),
            )),
        };

        let (stats_reporter_sender, stats_reporter_receiver) = unbounded();

        let stats_reporter_service =
            StatsReporterService::new(stats_reporter_receiver, exit.clone());

        let gossip_service = GossipService::new(
            &cluster_info,
            Some(bank_forks.clone()),
            node.sockets.gossip,
            config.gossip_validators.clone(),
            should_check_duplicate_instance,
            Some(stats_reporter_sender.clone()),
            exit.clone(),
        );
        let serve_repair = ServeRepair::new(
            cluster_info.clone(),
            bank_forks.clone(),
            config.repair_whitelist.clone(),
        );
        let (repair_request_quic_sender, repair_request_quic_receiver) = unbounded();
        let (repair_response_quic_sender, repair_response_quic_receiver) = unbounded();
        let (ancestor_hashes_response_quic_sender, ancestor_hashes_response_quic_receiver) =
            unbounded();

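        // Optionally block here until the configured share of stake (see
        // WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT) is visible in gossip before
        // bringing up the remaining services.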
1318        let waited_for_supermajority = wait_for_supermajority(
1319            config,
1320            Some(&mut process_blockstore),
1321            &bank_forks,
1322            &cluster_info,
1323            rpc_override_health_check,
1324            &start_progress,
1325        )?;
1326
1327        let blockstore_metric_report_service =
1328            BlockstoreMetricReportService::new(blockstore.clone(), exit.clone());
1329
1330        let wait_for_vote_to_start_leader =
1331            !waited_for_supermajority && !config.no_wait_for_vote_to_start_leader;
1332
1333        let poh_service = PohService::new(
1334            poh_recorder.clone(),
1335            &genesis_config.poh_config,
1336            exit.clone(),
1337            bank_forks.read().unwrap().root_bank().ticks_per_slot(),
1338            config.poh_pinned_cpu_core,
1339            config.poh_hashes_per_batch,
1340            record_receiver,
1341        );
1342        assert_eq!(
1343            blockstore.get_new_shred_signals_len(),
1344            1,
1345            "New shred signal for the TVU should be the same as the clear bank signal."
1346        );
1347
1348        let vote_tracker = Arc::<VoteTracker>::default();
1349
1350        let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded();
1351        let (verified_vote_sender, verified_vote_receiver) = unbounded();
1352        let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
1353        let (duplicate_confirmed_slot_sender, duplicate_confirmed_slots_receiver) = unbounded();
1354
1355        let entry_notification_sender = entry_notifier_service
1356            .as_ref()
1357            .map(|service| service.sender_cloned());
1358
1359        // The test-validator crate may start the validator inside a tokio
1360        // runtime context, which forces us to reuse that runtime because a
1361        // nested runtime panics on drop.
1362        // Outside the test-validator crate, we always need a tokio runtime
1363        // (and the respective handle) to initialize the turbine QUIC endpoint.
1364        let current_runtime_handle = tokio::runtime::Handle::try_current();
1365        let turbine_quic_endpoint_runtime = (current_runtime_handle.is_err()
1366            && genesis_config.cluster_type != ClusterType::MainnetBeta)
1367            .then(|| {
1368                tokio::runtime::Builder::new_multi_thread()
1369                    .enable_all()
1370                    .thread_name("solTurbineQuic")
1371                    .build()
1372                    .unwrap()
1373            });
1374        let (turbine_quic_endpoint_sender, turbine_quic_endpoint_receiver) = unbounded();
1375        let (
1376            turbine_quic_endpoint,
1377            turbine_quic_endpoint_sender,
1378            turbine_quic_endpoint_join_handle,
1379        ) = if genesis_config.cluster_type == ClusterType::MainnetBeta {
1380            let (sender, _receiver) = tokio::sync::mpsc::channel(1);
1381            (None, sender, None)
1382        } else {
1383            solana_turbine::quic_endpoint::new_quic_endpoint(
1384                turbine_quic_endpoint_runtime
1385                    .as_ref()
1386                    .map(TokioRuntime::handle)
1387                    .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()),
1388                &identity_keypair,
1389                node.sockets.tvu_quic,
1390                turbine_quic_endpoint_sender,
1391                bank_forks.clone(),
1392            )
1393            .map(|(endpoint, sender, join_handle)| (Some(endpoint), sender, Some(join_handle)))
1394            .unwrap()
1395        };
1396
1397        // Repair quic endpoint.
1398        let repair_quic_endpoints_runtime = (current_runtime_handle.is_err()
1399            && genesis_config.cluster_type != ClusterType::MainnetBeta)
1400            .then(|| {
1401                tokio::runtime::Builder::new_multi_thread()
1402                    .enable_all()
1403                    .thread_name("solRepairQuic")
1404                    .build()
1405                    .unwrap()
1406            });
1407        let (repair_quic_endpoints, repair_quic_async_senders, repair_quic_endpoints_join_handle) =
1408            if genesis_config.cluster_type == ClusterType::MainnetBeta {
1409                (None, RepairQuicAsyncSenders::new_dummy(), None)
1410            } else {
1411                let repair_quic_sockets = RepairQuicSockets {
1412                    repair_server_quic_socket: node.sockets.serve_repair_quic,
1413                    repair_client_quic_socket: node.sockets.repair_quic,
1414                    ancestor_hashes_quic_socket: node.sockets.ancestor_hashes_requests_quic,
1415                };
1416                let repair_quic_senders = RepairQuicSenders {
1417                    repair_request_quic_sender: repair_request_quic_sender.clone(),
1418                    repair_response_quic_sender,
1419                    ancestor_hashes_response_quic_sender,
1420                };
1421                repair::quic_endpoint::new_quic_endpoints(
1422                    repair_quic_endpoints_runtime
1423                        .as_ref()
1424                        .map(TokioRuntime::handle)
1425                        .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()),
1426                    &identity_keypair,
1427                    repair_quic_sockets,
1428                    repair_quic_senders,
1429                    bank_forks.clone(),
1430                )
1431                .map(|(endpoints, senders, join_handle)| {
1432                    (Some(endpoints), senders, Some(join_handle))
1433                })
1434                .unwrap()
1435            };
1436        let serve_repair_service = ServeRepairService::new(
1437            serve_repair,
1438            // Incoming UDP repair requests are adapted into RemoteRequest
1439            // and also sent through the same channel.
1440            repair_request_quic_sender,
1441            repair_request_quic_receiver,
1442            repair_quic_async_senders.repair_response_quic_sender,
1443            blockstore.clone(),
1444            node.sockets.serve_repair,
1445            socket_addr_space,
1446            stats_reporter_sender,
1447            exit.clone(),
1448        );
1449
1450        let in_wen_restart = config.wen_restart_proto_path.is_some() && !waited_for_supermajority;
1451        let wen_restart_repair_slots = if in_wen_restart {
1452            Some(Arc::new(RwLock::new(Vec::new())))
1453        } else {
1454            None
1455        };
1456        let tower = match process_blockstore.process_to_create_tower() {
1457            Ok(tower) => {
1458                info!("Tower state: {:?}", tower);
1459                tower
1460            }
1461            Err(e) => {
1462                warn!(
1463                    "Unable to retrieve tower: {:?}; creating default tower...",
1464                    e
1465                );
1466                Tower::default()
1467            }
1468        };
1469        let last_vote = tower.last_vote();
1470
1471        let outstanding_repair_requests =
1472            Arc::<RwLock<repair::repair_service::OutstandingShredRepairs>>::default();
1473        let cluster_slots =
1474            Arc::new(crate::cluster_slots_service::cluster_slots::ClusterSlots::default());
1475
1476        let tvu = Tvu::new(
1477            vote_account,
1478            authorized_voter_keypairs,
1479            &bank_forks,
1480            &cluster_info,
1481            TvuSockets {
1482                repair: node.sockets.repair.try_clone().unwrap(),
1483                retransmit: node.sockets.retransmit_sockets,
1484                fetch: node.sockets.tvu,
1485                ancestor_hashes_requests: node.sockets.ancestor_hashes_requests,
1486            },
1487            blockstore.clone(),
1488            ledger_signal_receiver,
1489            &rpc_subscriptions,
1490            &poh_recorder,
1491            tower,
1492            config.tower_storage.clone(),
1493            &leader_schedule_cache,
1494            exit.clone(),
1495            block_commitment_cache,
1496            config.turbine_disabled.clone(),
1497            transaction_status_sender.clone(),
1498            block_meta_sender,
1499            entry_notification_sender.clone(),
1500            vote_tracker.clone(),
1501            retransmit_slots_sender,
1502            gossip_verified_vote_hash_receiver,
1503            verified_vote_receiver,
1504            replay_vote_sender.clone(),
1505            completed_data_sets_sender,
1506            bank_notification_sender.clone(),
1507            duplicate_confirmed_slots_receiver,
1508            TvuConfig {
1509                max_ledger_shreds: config.max_ledger_shreds,
1510                shred_version: node.info.shred_version(),
1511                repair_validators: config.repair_validators.clone(),
1512                repair_whitelist: config.repair_whitelist.clone(),
1513                wait_for_vote_to_start_leader,
1514                replay_forks_threads: config.replay_forks_threads,
1515                replay_transactions_threads: config.replay_transactions_threads,
1516                shred_sigverify_threads: config.tvu_shred_sigverify_threads,
1517            },
1518            &max_slots,
1519            block_metadata_notifier,
1520            config.wait_to_vote_slot,
1521            accounts_background_request_sender.clone(),
1522            config.runtime_config.log_messages_bytes_limit,
1523            json_rpc_service.is_some().then_some(&connection_cache), // for the connection cache warmer; only used by STS for the RPC service
1524            &prioritization_fee_cache,
1525            banking_tracer.clone(),
1526            turbine_quic_endpoint_sender.clone(),
1527            turbine_quic_endpoint_receiver,
1528            repair_response_quic_receiver,
1529            repair_quic_async_senders.repair_request_quic_sender,
1530            repair_quic_async_senders.ancestor_hashes_request_quic_sender,
1531            ancestor_hashes_response_quic_receiver,
1532            outstanding_repair_requests.clone(),
1533            cluster_slots.clone(),
1534            wen_restart_repair_slots.clone(),
1535            slot_status_notifier,
1536            vote_connection_cache,
1537        )
1538        .map_err(ValidatorError::Other)?;
1539
1540        if in_wen_restart {
1541            info!("Waiting for wen_restart to finish");
1542            wait_for_wen_restart(WenRestartConfig {
1543                wen_restart_path: config.wen_restart_proto_path.clone().unwrap(),
1544                wen_restart_coordinator: config.wen_restart_coordinator.unwrap(),
1545                last_vote,
1546                blockstore: blockstore.clone(),
1547                cluster_info: cluster_info.clone(),
1548                bank_forks: bank_forks.clone(),
1549                wen_restart_repair_slots: wen_restart_repair_slots.clone(),
1550                wait_for_supermajority_threshold_percent:
1551                    WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT,
1552                snapshot_config: config.snapshot_config.clone(),
1553                accounts_background_request_sender: accounts_background_request_sender.clone(),
1554                genesis_config_hash: genesis_config.hash(),
1555                exit: exit.clone(),
1556            })?;
1557            return Err(ValidatorError::WenRestartFinished.into());
1558        }
1559
1560        let (tpu, mut key_notifies) = Tpu::new(
1561            &cluster_info,
1562            &poh_recorder,
1563            entry_receiver,
1564            retransmit_slots_receiver,
1565            TpuSockets {
1566                transactions: node.sockets.tpu,
1567                transaction_forwards: node.sockets.tpu_forwards,
1568                vote: node.sockets.tpu_vote,
1569                broadcast: node.sockets.broadcast,
1570                transactions_quic: node.sockets.tpu_quic,
1571                transactions_forwards_quic: node.sockets.tpu_forwards_quic,
1572                vote_quic: node.sockets.tpu_vote_quic,
1573            },
1574            &rpc_subscriptions,
1575            transaction_status_sender,
1576            entry_notification_sender,
1577            blockstore.clone(),
1578            &config.broadcast_stage_type,
1579            exit,
1580            node.info.shred_version(),
1581            vote_tracker,
1582            bank_forks.clone(),
1583            verified_vote_sender,
1584            gossip_verified_vote_hash_sender,
1585            replay_vote_receiver,
1586            replay_vote_sender,
1587            bank_notification_sender.map(|sender| sender.sender),
1588            config.tpu_coalesce,
1589            duplicate_confirmed_slot_sender,
1590            &connection_cache,
1591            turbine_quic_endpoint_sender,
1592            &identity_keypair,
1593            config.runtime_config.log_messages_bytes_limit,
1594            &staked_nodes,
1595            config.staked_nodes_overrides.clone(),
1596            banking_tracer_channels,
1597            tracer_thread,
1598            tpu_enable_udp,
1599            tpu_quic_server_config,
1600            tpu_fwd_quic_server_config,
1601            vote_quic_server_config,
1602            &prioritization_fee_cache,
1603            config.block_production_method.clone(),
1604            config.transaction_struct.clone(),
1605            config.enable_block_production_forwarding,
1606            config.generator_config.clone(),
1607        );
1608
1609        datapoint_info!(
1610            "validator-new",
1611            ("id", id.to_string(), String),
1612            ("version", solana_version::version!(), String),
1613            ("cluster_type", genesis_config.cluster_type as u32, i64),
1614            ("elapsed_ms", start_time.elapsed().as_millis() as i64, i64),
1615            ("waited_for_supermajority", waited_for_supermajority, bool),
1616            ("shred_version", shred_version as i64, i64),
1617        );
1618
1619        *start_progress.write().unwrap() = ValidatorStartProgress::Running;
1620        key_notifies.push(connection_cache);
1621
1622        *admin_rpc_service_post_init.write().unwrap() = Some(AdminRpcRequestMetadataPostInit {
1623            bank_forks: bank_forks.clone(),
1624            cluster_info: cluster_info.clone(),
1625            vote_account: *vote_account,
1626            repair_whitelist: config.repair_whitelist.clone(),
1627            notifies: key_notifies,
1628            repair_socket: Arc::new(node.sockets.repair),
1629            outstanding_repair_requests,
1630            cluster_slots,
1631        });
1632
1633        Ok(Self {
1634            stats_reporter_service,
1635            gossip_service,
1636            serve_repair_service,
1637            json_rpc_service,
1638            pubsub_service,
1639            rpc_completed_slots_service,
1640            optimistically_confirmed_bank_tracker,
1641            transaction_status_service,
1642            block_meta_service,
1643            entry_notifier_service,
1644            system_monitor_service,
1645            sample_performance_service,
1646            poh_timing_report_service,
1647            snapshot_packager_service,
1648            completed_data_sets_service,
1649            tpu,
1650            tvu,
1651            poh_service,
1652            poh_recorder,
1653            ip_echo_server,
1654            validator_exit: config.validator_exit.clone(),
1655            cluster_info,
1656            bank_forks,
1657            blockstore,
1658            geyser_plugin_service,
1659            blockstore_metric_report_service,
1660            accounts_background_service,
1661            accounts_hash_verifier,
1662            turbine_quic_endpoint,
1663            turbine_quic_endpoint_runtime,
1664            turbine_quic_endpoint_join_handle,
1665            repair_quic_endpoints,
1666            repair_quic_endpoints_runtime,
1667            repair_quic_endpoints_join_handle,
1668            thread_manager,
1669        })
1670    }
1671
1672    // Used for notifying many nodes in parallel to exit
1673    pub fn exit(&mut self) {
1674        self.validator_exit.write().unwrap().exit();
1675
1676        // drop all signals in blockstore
1677        self.blockstore.drop_signal();
1678    }
1679
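    /// Signals the validator to exit and then blocks until all services have shut down.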
1680    pub fn close(mut self) {
1681        self.exit();
1682        self.join();
1683    }
1684
1685    fn print_node_info(node: &Node) {
1686        info!("{:?}", node.info);
1687        info!(
1688            "local gossip address: {}",
1689            node.sockets.gossip.local_addr().unwrap()
1690        );
1691        info!(
1692            "local broadcast address: {}",
1693            node.sockets
1694                .broadcast
1695                .first()
1696                .unwrap()
1697                .local_addr()
1698                .unwrap()
1699        );
1700        info!(
1701            "local repair address: {}",
1702            node.sockets.repair.local_addr().unwrap()
1703        );
1704        info!(
1705            "local retransmit address: {}",
1706            node.sockets.retransmit_sockets[0].local_addr().unwrap()
1707        );
1708    }
1709
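    /// Blocks until every validator service has shut down, joining each service
    /// thread in turn and closing the turbine/repair QUIC endpoints.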
1710    pub fn join(self) {
1711        drop(self.bank_forks);
1712        drop(self.cluster_info);
1713
1714        self.poh_service.join().expect("poh_service");
1715        drop(self.poh_recorder);
1716
1717        if let Some(json_rpc_service) = self.json_rpc_service {
1718            json_rpc_service.join().expect("rpc_service");
1719        }
1720
1721        if let Some(pubsub_service) = self.pubsub_service {
1722            pubsub_service.join().expect("pubsub_service");
1723        }
1724
1725        if let Some(rpc_completed_slots_service) = self.rpc_completed_slots_service {
1726            rpc_completed_slots_service
1727                .join()
1728                .expect("rpc_completed_slots_service");
1729        }
1730
1731        if let Some(optimistically_confirmed_bank_tracker) =
1732            self.optimistically_confirmed_bank_tracker
1733        {
1734            optimistically_confirmed_bank_tracker
1735                .join()
1736                .expect("optimistically_confirmed_bank_tracker");
1737        }
1738
1739        if let Some(transaction_status_service) = self.transaction_status_service {
1740            transaction_status_service
1741                .join()
1742                .expect("transaction_status_service");
1743        }
1744
1745        if let Some(block_meta_service) = self.block_meta_service {
1746            block_meta_service.join().expect("block_meta_service");
1747        }
1748
1749        if let Some(system_monitor_service) = self.system_monitor_service {
1750            system_monitor_service
1751                .join()
1752                .expect("system_monitor_service");
1753        }
1754
1755        if let Some(sample_performance_service) = self.sample_performance_service {
1756            sample_performance_service
1757                .join()
1758                .expect("sample_performance_service");
1759        }
1760
1761        if let Some(entry_notifier_service) = self.entry_notifier_service {
1762            entry_notifier_service
1763                .join()
1764                .expect("entry_notifier_service");
1765        }
1766
1767        if let Some(s) = self.snapshot_packager_service {
1768            s.join().expect("snapshot_packager_service");
1769        }
1770
1771        self.gossip_service.join().expect("gossip_service");
1772        self.repair_quic_endpoints
1773            .iter()
1774            .flatten()
1775            .for_each(repair::quic_endpoint::close_quic_endpoint);
1776        self.serve_repair_service
1777            .join()
1778            .expect("serve_repair_service");
1779        if let Some(repair_quic_endpoints_join_handle) = self.repair_quic_endpoints_join_handle {
1780            self.repair_quic_endpoints_runtime
1781                .map(|runtime| runtime.block_on(repair_quic_endpoints_join_handle))
1782                .transpose()
1783                .unwrap();
1784        }
1785        self.stats_reporter_service
1786            .join()
1787            .expect("stats_reporter_service");
1788        self.blockstore_metric_report_service
1789            .join()
1790            .expect("blockstore_metric_report_service");
1791        self.accounts_background_service
1792            .join()
1793            .expect("accounts_background_service");
1794        self.accounts_hash_verifier
1795            .join()
1796            .expect("accounts_hash_verifier");
1797        if let Some(turbine_quic_endpoint) = &self.turbine_quic_endpoint {
1798            solana_turbine::quic_endpoint::close_quic_endpoint(turbine_quic_endpoint);
1799        }
1800        self.tpu.join().expect("tpu");
1801        self.tvu.join().expect("tvu");
1802        if let Some(turbine_quic_endpoint_join_handle) = self.turbine_quic_endpoint_join_handle {
1803            self.turbine_quic_endpoint_runtime
1804                .map(|runtime| runtime.block_on(turbine_quic_endpoint_join_handle))
1805                .transpose()
1806                .unwrap();
1807        }
1808        if let Some(completed_data_sets_service) = self.completed_data_sets_service {
1809            completed_data_sets_service
1810                .join()
1811                .expect("completed_data_sets_service");
1812        }
1813        if let Some(ip_echo_server) = self.ip_echo_server {
1814            ip_echo_server.shutdown_background();
1815        }
1816
1817        if let Some(geyser_plugin_service) = self.geyser_plugin_service {
1818            geyser_plugin_service.join().expect("geyser_plugin_service");
1819        }
1820
1821        self.poh_timing_report_service
1822            .join()
1823            .expect("poh_timing_report_service");
1824        self.thread_manager.destroy();
1825    }
1826}
1827
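/// Returns true if the given vote account exists in `bank` and has recorded at
/// least one vote in its vote state.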
1828fn active_vote_account_exists_in_bank(bank: &Bank, vote_account: &Pubkey) -> bool {
1829    if let Some(account) = &bank.get_account(vote_account) {
1830        if let Some(vote_state) = vote_state::from(account) {
1831            return !vote_state.votes.is_empty();
1832        }
1833    }
1834    false
1835}
1836
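/// Measures the local PoH hashing rate (over `maybe_hash_samples` hashes, or one
/// slot's worth by default) and errors if it is slower than the rate required to
/// meet the cluster's target slot duration. Skipped if the bank does not report
/// hashes per tick.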
1837fn check_poh_speed(bank: &Bank, maybe_hash_samples: Option<u64>) -> Result<(), ValidatorError> {
1838    let Some(hashes_per_tick) = bank.hashes_per_tick() else {
1839        warn!("Unable to read hashes per tick from Bank, skipping PoH speed check");
1840        return Ok(());
1841    };
1842
1843    let ticks_per_slot = bank.ticks_per_slot();
1844    let hashes_per_slot = hashes_per_tick * ticks_per_slot;
1845    let hash_samples = maybe_hash_samples.unwrap_or(hashes_per_slot);
1846
1847    let hash_time = compute_hash_time(hash_samples);
1848    let my_hashes_per_second = (hash_samples as f64 / hash_time.as_secs_f64()) as u64;
1849
1850    let target_slot_duration = Duration::from_nanos(bank.ns_per_slot as u64);
1851    let target_hashes_per_second =
1852        (hashes_per_slot as f64 / target_slot_duration.as_secs_f64()) as u64;
1853
1854    info!(
1855        "PoH speed check: computed hashes per second {my_hashes_per_second}, target hashes per \
1856         second {target_hashes_per_second}"
1857    );
1858    if my_hashes_per_second < target_hashes_per_second {
1859        return Err(ValidatorError::PohTooSlow {
1860            mine: my_hashes_per_second,
1861            target: target_hashes_per_second,
1862        });
1863    }
1864
1865    Ok(())
1866}
1867
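/// Returns the restart slot if this boot is part of a cluster restart (hard fork),
/// i.e. if the configured `--wait-for-supermajority` slot equals the current root slot.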
1868fn maybe_cluster_restart_with_hard_fork(config: &ValidatorConfig, root_slot: Slot) -> Option<Slot> {
1869    // detect cluster restart (hard fork) indirectly via wait_for_supermajority...
1870    if let Some(wait_slot_for_supermajority) = config.wait_for_supermajority {
1871        if wait_slot_for_supermajority == root_slot {
1872            return Some(wait_slot_for_supermajority);
1873        }
1874    }
1875
1876    None
1877}
1878
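/// Validates a restored tower against the loaded bank forks, falling back to
/// rebuilding a tower from the latest vote account when restoration failed or a
/// hard fork / warp slot invalidated it.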
1879fn post_process_restored_tower(
1880    restored_tower: crate::consensus::Result<Tower>,
1881    validator_identity: &Pubkey,
1882    vote_account: &Pubkey,
1883    config: &ValidatorConfig,
1884    bank_forks: &BankForks,
1885) -> Result<Tower, String> {
1886    let mut should_require_tower = config.require_tower;
1887
1888    let restored_tower = restored_tower.and_then(|tower| {
1889        let root_bank = bank_forks.root_bank();
1890        let slot_history = root_bank.get_slot_history();
1891        // make sure the tower isn't corrupted before the following hard fork check
1892        let tower = tower.adjust_lockouts_after_replay(root_bank.slot(), &slot_history);
1893
1894        if let Some(hard_fork_restart_slot) =
1895            maybe_cluster_restart_with_hard_fork(config, root_bank.slot())
1896        {
1897            // Intentionally fail to restore the tower; we're supposedly in a new hard
1898            // fork, so past out-of-chain vote state doesn't make sense at all.
1899            // (What if --wait-for-supermajority is passed again when the validator restarts?)
1900            let message =
1901                format!("Hard fork is detected; discarding tower restoration result: {tower:?}");
1902            datapoint_error!("tower_error", ("error", message, String),);
1903            error!("{}", message);
1904
1905            // unconditionally relax tower requirement so that we can always restore tower
1906            // from root bank.
1907            should_require_tower = false;
1908            return Err(crate::consensus::TowerError::HardFork(
1909                hard_fork_restart_slot,
1910            ));
1911        }
1912
1913        if let Some(warp_slot) = config.warp_slot {
1914            // unconditionally relax tower requirement so that we can always restore tower
1915            // from root bank after the warp
1916            should_require_tower = false;
1917            return Err(crate::consensus::TowerError::HardFork(warp_slot));
1918        }
1919
1920        tower
1921    });
1922
1923    let restored_tower = match restored_tower {
1924        Ok(tower) => tower,
1925        Err(err) => {
1926            let voting_has_been_active =
1927                active_vote_account_exists_in_bank(&bank_forks.working_bank(), vote_account);
1928            if !err.is_file_missing() {
1929                datapoint_error!(
1930                    "tower_error",
1931                    ("error", format!("Unable to restore tower: {err}"), String),
1932                );
1933            }
1934            if should_require_tower && voting_has_been_active {
1935                return Err(format!(
1936                    "Requested mandatory tower restore failed: {err}. And there is an existing \
1937                     vote_account containing actual votes. Aborting due to possible conflicting \
1938                     duplicate votes"
1939                ));
1940            }
1941            if err.is_file_missing() && !voting_has_been_active {
1942                // Currently, don't protect against spoofed snapshots with no tower at all
1943                info!(
1944                    "Ignoring expected failed tower restore because this is the initial validator \
1945                     start with the vote account..."
1946                );
1947            } else {
1948                error!(
1949                    "Rebuilding a new tower from the latest vote account due to failed tower \
1950                     restore: {}",
1951                    err
1952                );
1953            }
1954
1955            Tower::new_from_bankforks(bank_forks, validator_identity, vote_account)
1956        }
1957    };
1958
1959    Ok(restored_tower)
1960}
1961
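/// Opens the genesis config from the ledger path and verifies it against the
/// expected genesis hash, if one was configured.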
1962fn load_genesis(
1963    config: &ValidatorConfig,
1964    ledger_path: &Path,
1965) -> Result<GenesisConfig, ValidatorError> {
1966    let genesis_config = open_genesis_config(ledger_path, config.max_genesis_archive_unpacked_size)
1967        .map_err(ValidatorError::OpenGenesisConfig)?;
1968
1969    // This needs to be limited, otherwise the state in the VoteAccount data
1970    // grows too large
1971    let leader_schedule_slot_offset = genesis_config.epoch_schedule.leader_schedule_slot_offset;
1972    let slots_per_epoch = genesis_config.epoch_schedule.slots_per_epoch;
1973    let leader_epoch_offset = leader_schedule_slot_offset.div_ceil(slots_per_epoch);
1974    assert!(leader_epoch_offset <= MAX_LEADER_SCHEDULE_EPOCH_OFFSET);
1975
1976    let genesis_hash = genesis_config.hash();
1977    info!("genesis hash: {}", genesis_hash);
1978
1979    if let Some(expected_genesis_hash) = config.expected_genesis_hash {
1980        if genesis_hash != expected_genesis_hash {
1981            return Err(ValidatorError::GenesisHashMismatch(
1982                genesis_hash,
1983                expected_genesis_hash,
1984            ));
1985        }
1986    }
1987
1988    Ok(genesis_config)
1989}
1990
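/// Opens the blockstore and loads bank forks (from a snapshot when available,
/// otherwise from genesis), wiring up the optional transaction history and entry
/// notification services along the way.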
1991#[allow(clippy::type_complexity)]
1992fn load_blockstore(
1993    config: &ValidatorConfig,
1994    ledger_path: &Path,
1995    genesis_config: &GenesisConfig,
1996    exit: Arc<AtomicBool>,
1997    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
1998    accounts_update_notifier: Option<AccountsUpdateNotifier>,
1999    transaction_notifier: Option<TransactionNotifierArc>,
2000    entry_notifier: Option<EntryNotifierArc>,
2001    poh_timing_point_sender: Option<PohTimingSender>,
2002) -> Result<
2003    (
2004        Arc<RwLock<BankForks>>,
2005        Arc<Blockstore>,
2006        Slot,
2007        Receiver<bool>,
2008        LeaderScheduleCache,
2009        Option<StartingSnapshotHashes>,
2010        TransactionHistoryServices,
2011        blockstore_processor::ProcessOptions,
2012        BlockstoreRootScan,
2013        DroppedSlotsReceiver,
2014        Option<EntryNotifierService>,
2015    ),
2016    String,
2017> {
2018    info!("loading ledger from {:?}...", ledger_path);
2019    *start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger;
2020
2021    let mut blockstore =
2022        Blockstore::open_with_options(ledger_path, config.blockstore_options.clone())
2023            .map_err(|err| format!("Failed to open Blockstore: {err:?}"))?;
2024
2025    let (ledger_signal_sender, ledger_signal_receiver) = bounded(MAX_REPLAY_WAKE_UP_SIGNALS);
2026    blockstore.add_new_shred_signal(ledger_signal_sender);
2027
2028    blockstore.shred_timing_point_sender = poh_timing_point_sender;
2029    // The following boot sequence (especially BankForks) could set a root, so stash
2030    // the original blockstore root value away here as soon as possible.
2031    let original_blockstore_root = blockstore.max_root();
2032
2033    let blockstore = Arc::new(blockstore);
2034    let blockstore_root_scan = BlockstoreRootScan::new(config, blockstore.clone(), exit.clone());
2035    let halt_at_slot = config
2036        .halt_at_slot
2037        .or_else(|| blockstore.highest_slot().unwrap_or(None));
2038
2039    let process_options = blockstore_processor::ProcessOptions {
2040        run_verification: config.run_verification,
2041        halt_at_slot,
2042        new_hard_forks: config.new_hard_forks.clone(),
2043        debug_keys: config.debug_keys.clone(),
2044        accounts_db_config: config.accounts_db_config.clone(),
2045        accounts_db_test_hash_calculation: config.accounts_db_test_hash_calculation,
2046        accounts_db_skip_shrink: config.accounts_db_skip_shrink,
2047        accounts_db_force_initial_clean: config.accounts_db_force_initial_clean,
2048        runtime_config: config.runtime_config.clone(),
2049        use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup,
2050        ..blockstore_processor::ProcessOptions::default()
2051    };
2052
2053    let enable_rpc_transaction_history =
2054        config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history;
2055    let is_plugin_transaction_history_required = transaction_notifier.as_ref().is_some();
2056    let transaction_history_services =
2057        if enable_rpc_transaction_history || is_plugin_transaction_history_required {
2058            initialize_rpc_transaction_history_services(
2059                blockstore.clone(),
2060                exit.clone(),
2061                enable_rpc_transaction_history,
2062                config.rpc_config.enable_extended_tx_metadata_storage,
2063                transaction_notifier,
2064            )
2065        } else {
2066            TransactionHistoryServices::default()
2067        };
2068
2069    let entry_notifier_service = entry_notifier
2070        .map(|entry_notifier| EntryNotifierService::new(entry_notifier, exit.clone()));
2071
2072    let (bank_forks, mut leader_schedule_cache, starting_snapshot_hashes) =
2073        bank_forks_utils::load_bank_forks(
2074            genesis_config,
2075            &blockstore,
2076            config.account_paths.clone(),
2077            Some(&config.snapshot_config),
2078            &process_options,
2079            transaction_history_services.block_meta_sender.as_ref(),
2080            entry_notifier_service
2081                .as_ref()
2082                .map(|service| service.sender()),
2083            accounts_update_notifier,
2084            exit,
2085        )
2086        .map_err(|err| err.to_string())?;
2087
2088    // Before replay starts, set the callbacks in each of the banks in BankForks so that
2089    // all dropped banks come through the `pruned_banks_receiver` channel. This way all bank
2090    // drop behavior can be safely synchronized with any other ongoing accounts activity like
2091    // cache flush, clean, shrink, as long as the same thread performing those activities also
2092    // is processing the dropped banks from the `pruned_banks_receiver` channel.
2093    let pruned_banks_receiver =
2094        AccountsBackgroundService::setup_bank_drop_callback(bank_forks.clone());
2095
2096    leader_schedule_cache.set_fixed_leader_schedule(config.fixed_leader_schedule.clone());
2097    {
2098        let mut bank_forks = bank_forks.write().unwrap();
2099        bank_forks.set_snapshot_config(Some(config.snapshot_config.clone()));
2100        bank_forks.set_accounts_hash_interval_slots(config.accounts_hash_interval_slots);
2101    }
2102
2103    Ok((
2104        bank_forks,
2105        blockstore,
2106        original_blockstore_root,
2107        ledger_signal_receiver,
2108        leader_schedule_cache,
2109        starting_snapshot_hashes,
2110        transaction_history_services,
2111        process_options,
2112        blockstore_root_scan,
2113        pruned_banks_receiver,
2114        entry_notifier_service,
2115    ))
2116}
2117
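/// Bundles everything needed to replay the blockstore so that replay can be
/// deferred until first needed (e.g. by `wait_for_supermajority` or tower
/// creation); the resulting tower is cached.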
2118pub struct ProcessBlockStore<'a> {
2119    id: &'a Pubkey,
2120    vote_account: &'a Pubkey,
2121    start_progress: &'a Arc<RwLock<ValidatorStartProgress>>,
2122    blockstore: &'a Blockstore,
2123    original_blockstore_root: Slot,
2124    bank_forks: &'a Arc<RwLock<BankForks>>,
2125    leader_schedule_cache: &'a LeaderScheduleCache,
2126    process_options: &'a blockstore_processor::ProcessOptions,
2127    transaction_status_sender: Option<&'a TransactionStatusSender>,
2128    block_meta_sender: Option<BlockMetaSender>,
2129    entry_notification_sender: Option<&'a EntryNotifierSender>,
2130    blockstore_root_scan: Option<BlockstoreRootScan>,
2131    accounts_background_request_sender: AbsRequestSender,
2132    config: &'a ValidatorConfig,
2133    tower: Option<Tower>,
2134}
2135
2136impl<'a> ProcessBlockStore<'a> {
2137    #[allow(clippy::too_many_arguments)]
2138    fn new(
2139        id: &'a Pubkey,
2140        vote_account: &'a Pubkey,
2141        start_progress: &'a Arc<RwLock<ValidatorStartProgress>>,
2142        blockstore: &'a Blockstore,
2143        original_blockstore_root: Slot,
2144        bank_forks: &'a Arc<RwLock<BankForks>>,
2145        leader_schedule_cache: &'a LeaderScheduleCache,
2146        process_options: &'a blockstore_processor::ProcessOptions,
2147        transaction_status_sender: Option<&'a TransactionStatusSender>,
2148        block_meta_sender: Option<BlockMetaSender>,
2149        entry_notification_sender: Option<&'a EntryNotifierSender>,
2150        blockstore_root_scan: BlockstoreRootScan,
2151        accounts_background_request_sender: AbsRequestSender,
2152        config: &'a ValidatorConfig,
2153    ) -> Self {
2154        Self {
2155            id,
2156            vote_account,
2157            start_progress,
2158            blockstore,
2159            original_blockstore_root,
2160            bank_forks,
2161            leader_schedule_cache,
2162            process_options,
2163            transaction_status_sender,
2164            block_meta_sender,
2165            entry_notification_sender,
2166            blockstore_root_scan: Some(blockstore_root_scan),
2167            accounts_background_request_sender,
2168            config,
2169            tower: None,
2170        }
2171    }
2172
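    /// Replays the blockstore from the current root while a background thread
    /// reports progress, then restores the tower and reconciles blockstore roots
    /// with it (and with any hard fork). No-op once `self.tower` is set.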
2173    pub(crate) fn process(&mut self) -> Result<(), String> {
2174        if self.tower.is_none() {
2175            let previous_start_process = *self.start_progress.read().unwrap();
2176            *self.start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger;
2177
2178            let exit = Arc::new(AtomicBool::new(false));
2179            if let Ok(Some(max_slot)) = self.blockstore.highest_slot() {
2180                let bank_forks = self.bank_forks.clone();
2181                let exit = exit.clone();
2182                let start_progress = self.start_progress.clone();
2183
2184                let _ = Builder::new()
2185                    .name("solRptLdgrStat".to_string())
2186                    .spawn(move || {
2187                        while !exit.load(Ordering::Relaxed) {
2188                            let slot = bank_forks.read().unwrap().working_bank().slot();
2189                            *start_progress.write().unwrap() =
2190                                ValidatorStartProgress::ProcessingLedger { slot, max_slot };
2191                            sleep(Duration::from_secs(2));
2192                        }
2193                    })
2194                    .unwrap();
2195            }
2196            blockstore_processor::process_blockstore_from_root(
2197                self.blockstore,
2198                self.bank_forks,
2199                self.leader_schedule_cache,
2200                self.process_options,
2201                self.transaction_status_sender,
2202                self.block_meta_sender.as_ref(),
2203                self.entry_notification_sender,
2204                &self.accounts_background_request_sender,
2205            )
2206            .map_err(|err| {
2207                exit.store(true, Ordering::Relaxed);
2208                format!("Failed to load ledger: {err:?}")
2209            })?;
2210            exit.store(true, Ordering::Relaxed);
2211
2212            if let Some(blockstore_root_scan) = self.blockstore_root_scan.take() {
2213                blockstore_root_scan.join();
2214            }
2215
2216            self.tower = Some({
2217                let restored_tower = Tower::restore(self.config.tower_storage.as_ref(), self.id);
2218                if let Ok(tower) = &restored_tower {
2219                    // reconciliation attempt 1 of 2 with tower
2220                    reconcile_blockstore_roots_with_external_source(
2221                        ExternalRootSource::Tower(tower.root()),
2222                        self.blockstore,
2223                        &mut self.original_blockstore_root,
2224                    )
2225                    .map_err(|err| format!("Failed to reconcile blockstore with tower: {err:?}"))?;
2226                }
2227
2228                post_process_restored_tower(
2229                    restored_tower,
2230                    self.id,
2231                    self.vote_account,
2232                    self.config,
2233                    &self.bank_forks.read().unwrap(),
2234                )?
2235            });
2236
2237            if let Some(hard_fork_restart_slot) = maybe_cluster_restart_with_hard_fork(
2238                self.config,
2239                self.bank_forks.read().unwrap().root(),
2240            ) {
2241                // reconciliation attempt 2 of 2 with hard fork
2242                // this should be #2 because hard fork root > tower root in almost all cases
2243                reconcile_blockstore_roots_with_external_source(
2244                    ExternalRootSource::HardFork(hard_fork_restart_slot),
2245                    self.blockstore,
2246                    &mut self.original_blockstore_root,
2247                )
2248                .map_err(|err| format!("Failed to reconcile blockstore with hard fork: {err:?}"))?;
2249            }
2250
2251            *self.start_progress.write().unwrap() = previous_start_process;
2252        }
2253        Ok(())
2254    }
2255
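    /// Ensures the blockstore has been processed and returns the resulting tower.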
2256    pub(crate) fn process_to_create_tower(mut self) -> Result<Tower, String> {
2257        self.process()?;
2258        Ok(self.tower.unwrap())
2259    }
2260}
2261
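/// If a warp slot is configured, warps the root bank forward to that slot, sets it
/// as the new root, writes a full snapshot of the warped bank, and then processes
/// the blockstore so that the tower and bank forks stay in sync.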
2262fn maybe_warp_slot(
2263    config: &ValidatorConfig,
2264    process_blockstore: &mut ProcessBlockStore,
2265    ledger_path: &Path,
2266    bank_forks: &RwLock<BankForks>,
2267    leader_schedule_cache: &LeaderScheduleCache,
2268    accounts_background_request_sender: &AbsRequestSender,
2269) -> Result<(), String> {
2270    if let Some(warp_slot) = config.warp_slot {
2271        let mut bank_forks = bank_forks.write().unwrap();
2272
2273        let working_bank = bank_forks.working_bank();
2274
2275        if warp_slot <= working_bank.slot() {
2276            return Err(format!(
2277                "warp slot ({}) cannot be less than the working bank slot ({})",
2278                warp_slot,
2279                working_bank.slot()
2280            ));
2281        }
2282        info!("warping to slot {}", warp_slot);
2283
2284        let root_bank = bank_forks.root_bank();
2285
2286        // An accounts hash calculation from storages will occur in warp_from_parent() below.  This
2287        // requires that the accounts cache has been flushed, which requires the parent slot to be
2288        // rooted.
2289        root_bank.squash();
2290        root_bank.force_flush_accounts_cache();
2291
2292        bank_forks.insert(Bank::warp_from_parent(
2293            root_bank,
2294            &Pubkey::default(),
2295            warp_slot,
2296            solana_accounts_db::accounts_db::CalcAccountsHashDataSource::Storages,
2297        ));
2298        bank_forks
2299            .set_root(
2300                warp_slot,
2301                accounts_background_request_sender,
2302                Some(warp_slot),
2303            )
2304            .map_err(|err| err.to_string())?;
2305        leader_schedule_cache.set_root(&bank_forks.root_bank());
2306
2307        let full_snapshot_archive_info = match snapshot_bank_utils::bank_to_full_snapshot_archive(
2308            ledger_path,
2309            &bank_forks.root_bank(),
2310            None,
2311            &config.snapshot_config.full_snapshot_archives_dir,
2312            &config.snapshot_config.incremental_snapshot_archives_dir,
2313            config.snapshot_config.archive_format,
2314        ) {
2315            Ok(archive_info) => archive_info,
2316            Err(e) => return Err(format!("Unable to create snapshot: {e}")),
2317        };
2318        info!(
2319            "created snapshot: {}",
2320            full_snapshot_archive_info.path().display()
2321        );
2322
2323        drop(bank_forks);
2324        // Process blockstore after warping bank forks to make sure tower and
2325        // bank forks are in sync.
2326        process_blockstore.process()?;
2327    }
2328    Ok(())
2329}
2330
2331/// Returns the starting slot at which the blockstore should be scanned for
2332/// shreds with an incorrect shred version, or None if the check is unnecessary
2333fn should_cleanup_blockstore_incorrect_shred_versions(
2334    config: &ValidatorConfig,
2335    blockstore: &Blockstore,
2336    root_slot: Slot,
2337    hard_forks: &HardForks,
2338) -> Result<Option<Slot>, BlockstoreError> {
2339    // Perform the check if we are booting as part of a cluster restart at slot root_slot
2340    let maybe_cluster_restart_slot = maybe_cluster_restart_with_hard_fork(config, root_slot);
2341    if maybe_cluster_restart_slot.is_some() {
2342        return Ok(Some(root_slot + 1));
2343    }
2344
2345    // If there are no hard forks, the shred version cannot have changed
2346    let Some(latest_hard_fork) = hard_forks.iter().last().map(|(slot, _)| *slot) else {
2347        return Ok(None);
2348    };
2349
2350    // If the blockstore is empty, there are certainly no shreds with an incorrect version
2351    let Some(blockstore_max_slot) = blockstore.highest_slot()? else {
2352        return Ok(None);
2353    };
2354    let blockstore_min_slot = blockstore.lowest_slot();
2355    info!(
2356        "Blockstore contains data from slot {blockstore_min_slot} to {blockstore_max_slot}, the \
2357        latest hard fork is {latest_hard_fork}"
2358    );
2359
2360    if latest_hard_fork < blockstore_min_slot {
2361        // latest_hard_fork < blockstore_min_slot <= blockstore_max_slot
2362        //
2363        // All slots in the blockstore are newer than the latest hard fork, and only shreds with
2364        // the correct shred version should have been inserted since the latest hard fork
2365        //
2366        // This is the normal case where the last cluster restart & hard fork was a while ago; we
2367        // can skip the check for this case
2368        Ok(None)
2369    } else if latest_hard_fork < blockstore_max_slot {
2370        // blockstore_min_slot < latest_hard_fork < blockstore_max_slot
2371        //
2372        // This could be a case where there was a cluster restart, but this node was not part of
2373        // the supermajority that actually restarted the cluster. Rather, this node likely
2374        // downloaded a new snapshot while retaining the blockstore, including slots beyond the
2375        // chosen restart slot. We need to perform the blockstore check for this case
2376        //
2377        // Note that the downloaded snapshot slot (root_slot) could be greater than the latest hard
2378        // fork slot. Even though this node will only replay slots after root_slot, start the check
2379        // at latest_hard_fork + 1 to check (and possibly purge) any invalid state.
2380        Ok(Some(latest_hard_fork + 1))
2381    } else {
2382        // blockstore_min_slot <= blockstore_max_slot <= latest_hard_fork
2383        //
2384        // All slots in the blockstore are older than the latest hard fork. The blockstore check
2385        // would start from latest_hard_fork + 1; skip the check as there are no slots to check
2386        //
2387        // This is an unusual case to hit: perhaps a node has been offline for a long time
2388        // and just restarted with a newly downloaded snapshot but its old blockstore
2389        Ok(None)
2390    }
2391}
2392
2393/// Searches the blockstore for data shreds with a shred version that differs
2394/// from the passed `expected_shred_version`
2395fn scan_blockstore_for_incorrect_shred_version(
2396    blockstore: &Blockstore,
2397    start_slot: Slot,
2398    expected_shred_version: u16,
2399) -> Result<Option<u16>, BlockstoreError> {
2400    const TIMEOUT: Duration = Duration::from_secs(60);
2401    let timer = Instant::now();
2402    // Search the blockstore for shreds with an incompatible version
2403    let slot_meta_iterator = blockstore.slot_meta_iterator(start_slot)?;
2404
2405    info!("Searching blockstore for shred with incorrect version from slot {start_slot}");
2406    for (slot, _meta) in slot_meta_iterator {
2407        let shreds = blockstore.get_data_shreds_for_slot(slot, 0)?;
2408        for shred in &shreds {
2409            if shred.version() != expected_shred_version {
2410                return Ok(Some(shred.version()));
2411            }
2412        }
2413        if timer.elapsed() > TIMEOUT {
2414            info!("Didn't find incorrect shreds after 60 seconds, aborting");
2415            break;
2416        }
2417    }
2418    Ok(None)
2419}
2420
2421/// If the blockstore contains any shreds with the incorrect shred version,
2422/// copy them to a backup blockstore and purge them from the actual blockstore.
2423fn cleanup_blockstore_incorrect_shred_versions(
2424    blockstore: &Blockstore,
2425    config: &ValidatorConfig,
2426    start_slot: Slot,
2427    expected_shred_version: u16,
2428) -> Result<(), BlockstoreError> {
2429    let incorrect_shred_version = scan_blockstore_for_incorrect_shred_version(
2430        blockstore,
2431        start_slot,
2432        expected_shred_version,
2433    )?;
2434    let Some(incorrect_shred_version) = incorrect_shred_version else {
2435        info!("Only shreds with the correct version were found in the blockstore");
2436        return Ok(());
2437    };
2438
2439    // .unwrap() safe because getting to this point implies blockstore has slots/shreds
2440    let end_slot = blockstore.highest_slot()?.unwrap();
2441
2442    // Backing up the shreds that will be deleted from the primary blockstore is
2443    // not critical, so swallow errors from backup blockstore operations.
2444    let backup_folder = format!(
2445        "{}_backup_{}_{}_{}",
2446        BLOCKSTORE_DIRECTORY_ROCKS_LEVEL, incorrect_shred_version, start_slot, end_slot
2447    );
2448    match Blockstore::open_with_options(
2449        &blockstore.ledger_path().join(backup_folder),
2450        config.blockstore_options.clone(),
2451    ) {
2452        Ok(backup_blockstore) => {
2453            info!("Backing up slots from {start_slot} to {end_slot}");
2454            let mut timer = Measure::start("blockstore backup");
2455
2456            const PRINT_INTERVAL: Duration = Duration::from_secs(5);
2457            let mut print_timer = Instant::now();
2458            let mut num_slots_copied = 0;
2459            let slot_meta_iterator = blockstore.slot_meta_iterator(start_slot)?;
2460            for (slot, _meta) in slot_meta_iterator {
2461                let shreds = blockstore.get_data_shreds_for_slot(slot, 0)?;
2462                let _ = backup_blockstore.insert_shreds(shreds, None, true);
2463                num_slots_copied += 1;
2464
2465                if print_timer.elapsed() > PRINT_INTERVAL {
2466                    info!("Backed up {num_slots_copied} slots thus far");
2467                    print_timer = Instant::now();
2468                }
2469            }
2470
2471            timer.stop();
2472            info!("Backing up slots done. {timer}");
2473        }
2474        Err(err) => {
2475            warn!("Unable to backup shreds with incorrect shred version: {err}");
2476        }
2477    }
2478
2479    info!("Purging slots {start_slot} to {end_slot} from blockstore");
2480    let mut timer = Measure::start("blockstore purge");
2481    blockstore.purge_from_next_slots(start_slot, end_slot);
2482    blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact);
2483    timer.stop();
2484    info!("Purging slots done. {timer}");
2485
2486    Ok(())
2487}
2488
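/// Builds the transaction status and block metadata services (and their senders)
/// that persist RPC transaction history to the blockstore.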
2489fn initialize_rpc_transaction_history_services(
2490    blockstore: Arc<Blockstore>,
2491    exit: Arc<AtomicBool>,
2492    enable_rpc_transaction_history: bool,
2493    enable_extended_tx_metadata_storage: bool,
2494    transaction_notifier: Option<TransactionNotifierArc>,
2495) -> TransactionHistoryServices {
2496    let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
2497    let (transaction_status_sender, transaction_status_receiver) = unbounded();
2498    let transaction_status_sender = Some(TransactionStatusSender {
2499        sender: transaction_status_sender,
2500    });
2501    let transaction_status_service = Some(TransactionStatusService::new(
2502        transaction_status_receiver,
2503        max_complete_transaction_status_slot.clone(),
2504        enable_rpc_transaction_history,
2505        transaction_notifier,
2506        blockstore.clone(),
2507        enable_extended_tx_metadata_storage,
2508        exit.clone(),
2509    ));
2510
2511    let max_complete_rewards_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
2512    let (block_meta_sender, block_meta_receiver) = unbounded();
2513    let block_meta_sender = Some(block_meta_sender);
2514    let block_meta_service = Some(BlockMetaService::new(
2515        block_meta_receiver,
2516        blockstore,
2517        max_complete_rewards_slot.clone(),
2518        exit,
2519    ));
2520    TransactionHistoryServices {
2521        transaction_status_sender,
2522        transaction_status_service,
2523        max_complete_transaction_status_slot,
2524        max_complete_rewards_slot,
2525        block_meta_sender,
2526        block_meta_service,
2527    }
2528}
2529
2530#[derive(Error, Debug)]
2531pub enum ValidatorError {
2532    #[error("Bad expected bank hash")]
2533    BadExpectedBankHash,
2534
2535    #[error("blockstore error: {0}")]
2536    Blockstore(#[source] BlockstoreError),
2537
2538    #[error("genesis hash mismatch: actual={0}, expected={1}")]
2539    GenesisHashMismatch(Hash, Hash),
2540
2541    #[error("Ledger does not have enough data to wait for supermajority")]
2542    NotEnoughLedgerData,
2543
2544    #[error("failed to open genesis: {0}")]
2545    OpenGenesisConfig(#[source] OpenGenesisConfigError),
2546
2547    #[error("{0}")]
2548    Other(String),
2549
2550    #[error(
2551        "PoH hashes/second rate is slower than the cluster target: mine {mine}, cluster {target}"
2552    )]
2553    PohTooSlow { mine: u64, target: u64 },
2554
2555    #[error("shred version mismatch: actual {actual}, expected {expected}")]
2556    ShredVersionMismatch { actual: u16, expected: u16 },
2557
2558    #[error(transparent)]
2559    TraceError(#[from] TraceError),
2560
2561    #[error("Wen Restart finished, please continue with --wait-for-supermajority")]
2562    WenRestartFinished,
2563}
2564
2565// Returns whether the validator waited on other nodes to start. In this case
2566// it should not wait for one of its votes to land to produce blocks
2567// because if the whole network is waiting, then it will stall.
2568//
2569// Error indicates that a bad hash was encountered or another condition
2570// that is unrecoverable and the validator should exit.
2571fn wait_for_supermajority(
2572    config: &ValidatorConfig,
2573    process_blockstore: Option<&mut ProcessBlockStore>,
2574    bank_forks: &RwLock<BankForks>,
2575    cluster_info: &ClusterInfo,
2576    rpc_override_health_check: Arc<AtomicBool>,
2577    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
2578) -> Result<bool, ValidatorError> {
2579    match config.wait_for_supermajority {
2580        None => Ok(false),
2581        Some(wait_for_supermajority_slot) => {
2582            if let Some(process_blockstore) = process_blockstore {
2583                process_blockstore
2584                    .process()
2585                    .map_err(ValidatorError::Other)?;
2586            }
2587
2588            let bank = bank_forks.read().unwrap().working_bank();
2589            match wait_for_supermajority_slot.cmp(&bank.slot()) {
2590                std::cmp::Ordering::Less => return Ok(false),
2591                std::cmp::Ordering::Greater => {
2592                    error!(
2593                        "Ledger does not have enough data to wait for supermajority, please \
2594                         enable snapshot fetch. Has {} needs {}",
2595                        bank.slot(),
2596                        wait_for_supermajority_slot
2597                    );
2598                    return Err(ValidatorError::NotEnoughLedgerData);
2599                }
2600                _ => {}
2601            }
2602
2603            if let Some(expected_bank_hash) = config.expected_bank_hash {
2604                if bank.hash() != expected_bank_hash {
2605                    error!(
2606                        "Bank hash({}) does not match expected value: {}",
2607                        bank.hash(),
2608                        expected_bank_hash
2609                    );
2610                    return Err(ValidatorError::BadExpectedBankHash);
2611                }
2612            }
2613
2614            for i in 1.. {
2615                let logging = i % 10 == 1;
2616                if logging {
2617                    info!(
2618                        "Waiting for {}% of activated stake at slot {} to be in gossip...",
2619                        WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT,
2620                        bank.slot()
2621                    );
2622                }
2623
2624                let gossip_stake_percent =
2625                    get_stake_percent_in_gossip(&bank, cluster_info, logging);
2626
2627                *start_progress.write().unwrap() =
2628                    ValidatorStartProgress::WaitingForSupermajority {
2629                        slot: wait_for_supermajority_slot,
2630                        gossip_stake_percent,
2631                    };
2632
2633                if gossip_stake_percent >= WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT {
2634                    info!(
2635                        "Supermajority reached, {}% active stake detected, starting up now.",
2636                        gossip_stake_percent,
2637                    );
2638                    break;
2639                }
2640                // The normal RPC health checks don't apply as the node is waiting, so feign health to
2641                // prevent load balancers from removing the node from their list of candidates during a
2642                // manual restart.
2643                rpc_override_health_check.store(true, Ordering::Relaxed);
2644                sleep(Duration::new(1, 0));
2645            }
2646            rpc_override_health_check.store(false, Ordering::Relaxed);
2647            Ok(true)
2648        }
2649    }
2650}
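
// Illustrative sketch of how the boolean returned above is consumed
// (hypothetical caller and config field, not the actual startup path):
//
//     let waited = wait_for_supermajority(
//         config,
//         Some(&mut process_blockstore),
//         &bank_forks,
//         &cluster_info,
//         rpc_override_health_check.clone(),
//         &start_progress,
//     )?;
//     // If the whole cluster waited together, don't also wait for this node's
//     // own vote to land before producing blocks.
//     let wait_for_vote_to_start_leader = !waited && config.wait_for_vote_to_start_leader;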
2651
2652// Get the activated stake percentage (based on the provided bank) that is visible in gossip
2653fn get_stake_percent_in_gossip(bank: &Bank, cluster_info: &ClusterInfo, log: bool) -> u64 {
2654    let mut online_stake = 0;
2655    let mut wrong_shred_stake = 0;
2656    let mut wrong_shred_nodes = vec![];
2657    let mut offline_stake = 0;
2658    let mut offline_nodes = vec![];
2659
2660    let mut total_activated_stake = 0;
2661    let now = timestamp();
2662    // Node contact infos are saved to disk and restored on validator startup.
2663    // Entries for staked nodes will not expire until an epoch later, so it is
2664    // necessary to filter for recent entries here to establish liveness.
2665    let peers: HashMap<_, _> = cluster_info
2666        .all_tvu_peers()
2667        .into_iter()
2668        .filter(|node| {
2669            let age = now.saturating_sub(node.wallclock());
2670            // Contact infos are refreshed twice during this period.
2671            age < CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS
2672        })
2673        .map(|node| (*node.pubkey(), node))
2674        .collect();
2675    let my_shred_version = cluster_info.my_shred_version();
2676    let my_id = cluster_info.id();
2677
2678    for (activated_stake, vote_account) in bank.vote_accounts().values() {
2679        let activated_stake = *activated_stake;
2680        total_activated_stake += activated_stake;
2681
2682        if activated_stake == 0 {
2683            continue;
2684        }
2685        let vote_state_node_pubkey = *vote_account.node_pubkey();
2686
2687        if let Some(peer) = peers.get(&vote_state_node_pubkey) {
2688            if peer.shred_version() == my_shred_version {
2689                trace!(
2690                    "observed {} in gossip, (activated_stake={})",
2691                    vote_state_node_pubkey,
2692                    activated_stake
2693                );
2694                online_stake += activated_stake;
2695            } else {
2696                wrong_shred_stake += activated_stake;
2697                wrong_shred_nodes.push((activated_stake, vote_state_node_pubkey));
2698            }
2699        } else if vote_state_node_pubkey == my_id {
2700            online_stake += activated_stake; // This node is online
2701        } else {
2702            offline_stake += activated_stake;
2703            offline_nodes.push((activated_stake, vote_state_node_pubkey));
2704        }
2705    }
2706
2707    let online_stake_percentage = (online_stake as f64 / total_activated_stake as f64) * 100.;
2708    if log {
2709        info!(
2710            "{:.3}% of active stake visible in gossip",
2711            online_stake_percentage
2712        );
2713
2714        if !wrong_shred_nodes.is_empty() {
2715            info!(
2716                "{:.3}% of active stake has the wrong shred version in gossip",
2717                (wrong_shred_stake as f64 / total_activated_stake as f64) * 100.,
2718            );
2719            wrong_shred_nodes.sort_by(|b, a| a.0.cmp(&b.0)); // sort by descending stake weight
2720            for (stake, identity) in wrong_shred_nodes {
2721                info!(
2722                    "    {:.3}% - {}",
2723                    (stake as f64 / total_activated_stake as f64) * 100.,
2724                    identity
2725                );
2726            }
2727        }
2728
2729        if !offline_nodes.is_empty() {
2730            info!(
2731                "{:.3}% of active stake is not visible in gossip",
2732                (offline_stake as f64 / total_activated_stake as f64) * 100.
2733            );
2734            offline_nodes.sort_by(|b, a| a.0.cmp(&b.0)); // sort by descending stake weight
2735            for (stake, identity) in offline_nodes {
2736                info!(
2737                    "    {:.3}% - {}",
2738                    (stake as f64 / total_activated_stake as f64) * 100.,
2739                    identity
2740                );
2741            }
2742        }
2743    }
2744
2745    online_stake_percentage as u64
2746}
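
// Worked example of the truncation above (hypothetical stake distribution): if
// 799 of 1_000 lamports of activated stake are visible in gossip, the computed
// online_stake_percentage is 79.9 and the function returns 79. Since the
// threshold is an integer percentage, truncation means the comparison against
// WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT only passes once the true percentage
// has actually reached the threshold.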
2747
2748fn cleanup_accounts_paths(config: &ValidatorConfig) {
2749    for account_path in &config.account_paths {
2750        move_and_async_delete_path_contents(account_path);
2751    }
2752    if let Some(shrink_paths) = config
2753        .accounts_db_config
2754        .as_ref()
2755        .and_then(|config| config.shrink_paths.as_ref())
2756    {
2757        for shrink_path in shrink_paths {
2758            move_and_async_delete_path_contents(shrink_path);
2759        }
2760    }
2761}
2762
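// Worked example of the interval rules checked below, using the same
// illustrative numbers as the unit tests (not cluster defaults): with an
// accounts hash interval of 100 slots, full = 300 / incremental = 200 is valid
// because both are multiples of 100 and 300 > 200; full = 444 / incremental =
// 200 is rejected because 444 is not a multiple of 100; and full = 100 /
// incremental = 100 is rejected because the full interval must be strictly
// greater than the incremental interval.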
2763pub fn is_snapshot_config_valid(
2764    snapshot_config: &SnapshotConfig,
2765    accounts_hash_interval_slots: Slot,
2766) -> bool {
2767    // if the snapshot config is configured to *not* take snapshots, then it is valid
2768    if !snapshot_config.should_generate_snapshots() {
2769        return true;
2770    }
2771
2772    let full_snapshot_interval_slots = snapshot_config.full_snapshot_archive_interval_slots;
2773    let incremental_snapshot_interval_slots =
2774        snapshot_config.incremental_snapshot_archive_interval_slots;
2775
2776    let is_incremental_config_valid =
2777        if incremental_snapshot_interval_slots == DISABLED_SNAPSHOT_ARCHIVE_INTERVAL {
2778            true
2779        } else {
2780            incremental_snapshot_interval_slots >= accounts_hash_interval_slots
2781                && incremental_snapshot_interval_slots % accounts_hash_interval_slots == 0
2782                && full_snapshot_interval_slots > incremental_snapshot_interval_slots
2783        };
2784
2785    full_snapshot_interval_slots >= accounts_hash_interval_slots
2786        && full_snapshot_interval_slots % accounts_hash_interval_slots == 0
2787        && is_incremental_config_valid
2788}
2789
2790#[cfg(test)]
2791mod tests {
2792    use {
2793        super::*,
2794        crossbeam_channel::{bounded, RecvTimeoutError},
2795        solana_entry::entry,
2796        solana_gossip::contact_info::ContactInfo,
2797        solana_ledger::{
2798            blockstore, create_new_tmp_ledger, genesis_utils::create_genesis_config_with_leader,
2799            get_tmp_ledger_path_auto_delete,
2800        },
2801        solana_sdk::{genesis_config::create_genesis_config, poh_config::PohConfig},
2802        solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP,
2803        std::{fs::remove_dir_all, thread, time::Duration},
2804    };
2805
2806    #[test]
2807    fn validator_exit() {
2808        solana_logger::setup();
2809        let leader_keypair = Keypair::new();
2810        let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
2811
2812        let validator_keypair = Keypair::new();
2813        let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
2814        let genesis_config =
2815            create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1000)
2816                .genesis_config;
2817        let (validator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
2818
2819        let voting_keypair = Arc::new(Keypair::new());
2820        let config = ValidatorConfig {
2821            rpc_addrs: Some((
2822                validator_node.info.rpc().unwrap(),
2823                validator_node.info.rpc_pubsub().unwrap(),
2824            )),
2825            ..ValidatorConfig::default_for_test()
2826        };
2827        let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
2828        let validator = Validator::new(
2829            validator_node,
2830            Arc::new(validator_keypair),
2831            &validator_ledger_path,
2832            &voting_keypair.pubkey(),
2833            Arc::new(RwLock::new(vec![voting_keypair])),
2834            vec![leader_node.info],
2835            &config,
2836            true, // should_check_duplicate_instance
2837            None, // rpc_to_plugin_manager_receiver
2838            start_progress.clone(),
2839            SocketAddrSpace::Unspecified,
2840            ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
2841            Arc::new(RwLock::new(None)),
2842        )
2843        .expect("assume successful validator start");
2844        assert_eq!(
2845            *start_progress.read().unwrap(),
2846            ValidatorStartProgress::Running
2847        );
2848        validator.close();
2849        remove_dir_all(validator_ledger_path).unwrap();
2850    }
2851
2852    #[test]
2853    fn test_should_cleanup_blockstore_incorrect_shred_versions() {
2854        solana_logger::setup();
2855
2856        let ledger_path = get_tmp_ledger_path_auto_delete!();
2857        let blockstore = Blockstore::open(ledger_path.path()).unwrap();
2858
2859        let mut validator_config = ValidatorConfig::default_for_test();
2860        let mut hard_forks = HardForks::default();
2861        let mut root_slot;
2862
2863        // Do check from root_slot + 1 if wait_for_supermajority (10) == root_slot (10)
2864        root_slot = 10;
2865        validator_config.wait_for_supermajority = Some(root_slot);
2866        assert_eq!(
2867            should_cleanup_blockstore_incorrect_shred_versions(
2868                &validator_config,
2869                &blockstore,
2870                root_slot,
2871                &hard_forks
2872            )
2873            .unwrap(),
2874            Some(root_slot + 1)
2875        );
2876
2877        // No check if wait_for_supermajority (10) < root_slot (15) (no hard forks)
2878        // Arguably an operator error to pass a value for wait_for_supermajority in this case
2879        root_slot = 15;
2880        assert_eq!(
2881            should_cleanup_blockstore_incorrect_shred_versions(
2882                &validator_config,
2883                &blockstore,
2884                root_slot,
2885                &hard_forks
2886            )
2887            .unwrap(),
2888            None,
2889        );
2890
2891        // Emulate cluster restart at slot 10
2892        // No check if wait_for_supermajority (10) < root_slot (15) (empty blockstore)
2893        hard_forks.register(10);
2894        assert_eq!(
2895            should_cleanup_blockstore_incorrect_shred_versions(
2896                &validator_config,
2897                &blockstore,
2898                root_slot,
2899                &hard_forks
2900            )
2901            .unwrap(),
2902            None,
2903        );
2904
2905        // Insert some shreds at newer slots than hard fork
2906        let entries = entry::create_ticks(1, 0, Hash::default());
2907        for i in 20..35 {
2908            let shreds = blockstore::entries_to_test_shreds(
2909                &entries,
2910                i,     // slot
2911                i - 1, // parent_slot
2912                true,  // is_full_slot
2913                1,     // version
2914                true,  // merkle_variant
2915            );
2916            blockstore.insert_shreds(shreds, None, true).unwrap();
2917        }
2918
2919        // No check as all blockstore data is newer than latest hard fork
2920        assert_eq!(
2921            should_cleanup_blockstore_incorrect_shred_versions(
2922                &validator_config,
2923                &blockstore,
2924                root_slot,
2925                &hard_forks
2926            )
2927            .unwrap(),
2928            None,
2929        );
2930
2931        // Emulate cluster restart at slot 25
2932        // Do check from root_slot + 1 regardless of whether wait_for_supermajority is set correctly
2933        root_slot = 25;
2934        hard_forks.register(root_slot);
2935        validator_config.wait_for_supermajority = Some(root_slot);
2936        assert_eq!(
2937            should_cleanup_blockstore_incorrect_shred_versions(
2938                &validator_config,
2939                &blockstore,
2940                root_slot,
2941                &hard_forks
2942            )
2943            .unwrap(),
2944            Some(root_slot + 1),
2945        );
2946        validator_config.wait_for_supermajority = None;
2947        assert_eq!(
2948            should_cleanup_blockstore_incorrect_shred_versions(
2949                &validator_config,
2950                &blockstore,
2951                root_slot,
2952                &hard_forks
2953            )
2954            .unwrap(),
2955            Some(root_slot + 1),
2956        );
2957
2958        // Do check with advanced root slot, even without wait_for_supermajority set correctly
2959        // Check starts from latest hard fork + 1
2960        root_slot = 30;
2961        let latest_hard_fork = hard_forks.iter().last().unwrap().0;
2962        assert_eq!(
2963            should_cleanup_blockstore_incorrect_shred_versions(
2964                &validator_config,
2965                &blockstore,
2966                root_slot,
2967                &hard_forks
2968            )
2969            .unwrap(),
2970            Some(latest_hard_fork + 1),
2971        );
2972
2973        // Purge blockstore up to latest hard fork
2974        // No check since all blockstore data newer than latest hard fork
2975        blockstore.purge_slots(0, latest_hard_fork, PurgeType::Exact);
2976        assert_eq!(
2977            should_cleanup_blockstore_incorrect_shred_versions(
2978                &validator_config,
2979                &blockstore,
2980                root_slot,
2981                &hard_forks
2982            )
2983            .unwrap(),
2984            None,
2985        );
2986    }
2987
2988    #[test]
2989    fn test_cleanup_blockstore_incorrect_shred_versions() {
2990        solana_logger::setup();
2991
2992        let validator_config = ValidatorConfig::default_for_test();
2993        let ledger_path = get_tmp_ledger_path_auto_delete!();
2994        let blockstore = Blockstore::open(ledger_path.path()).unwrap();
2995
2996        let entries = entry::create_ticks(1, 0, Hash::default());
2997        for i in 1..10 {
2998            let shreds = blockstore::entries_to_test_shreds(
2999                &entries,
3000                i,     // slot
3001                i - 1, // parent_slot
3002                true,  // is_full_slot
3003                1,     // version
3004                true,  // merkle_variant
3005            );
3006            blockstore.insert_shreds(shreds, None, true).unwrap();
3007        }
3008
3009        // this purges and compacts all slots greater than or equal to 5
3010        cleanup_blockstore_incorrect_shred_versions(&blockstore, &validator_config, 5, 2).unwrap();
3011        // assert that slots less than 5 aren't affected
3012        assert!(blockstore.meta(4).unwrap().unwrap().next_slots.is_empty());
3013        for i in 5..10 {
3014            assert!(blockstore
3015                .get_data_shreds_for_slot(i, 0)
3016                .unwrap()
3017                .is_empty());
3018        }
3019    }
3020
3021    #[test]
3022    fn validator_parallel_exit() {
3023        let leader_keypair = Keypair::new();
3024        let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
3025        let genesis_config =
3026            create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1000)
3027                .genesis_config;
3028
3029        let mut ledger_paths = vec![];
3030        let mut validators: Vec<Validator> = (0..2)
3031            .map(|_| {
3032                let validator_keypair = Keypair::new();
3033                let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
3034                let (validator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
3035                ledger_paths.push(validator_ledger_path.clone());
3036                let vote_account_keypair = Keypair::new();
3037                let config = ValidatorConfig {
3038                    rpc_addrs: Some((
3039                        validator_node.info.rpc().unwrap(),
3040                        validator_node.info.rpc_pubsub().unwrap(),
3041                    )),
3042                    ..ValidatorConfig::default_for_test()
3043                };
3044                Validator::new(
3045                    validator_node,
3046                    Arc::new(validator_keypair),
3047                    &validator_ledger_path,
3048                    &vote_account_keypair.pubkey(),
3049                    Arc::new(RwLock::new(vec![Arc::new(vote_account_keypair)])),
3050                    vec![leader_node.info.clone()],
3051                    &config,
3052                    true, // should_check_duplicate_instance.
3053                    None, // rpc_to_plugin_manager_receiver
3054                    Arc::new(RwLock::new(ValidatorStartProgress::default())),
3055                    SocketAddrSpace::Unspecified,
3056                    ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
3057                    Arc::new(RwLock::new(None)),
3058                )
3059                .expect("assume successful validator start")
3060            })
3061            .collect();
3062
3063        // Each validator can exit in parallel to speed up many otherwise-sequential calls to `join()`
3064        validators.iter_mut().for_each(|v| v.exit());
3065
3066        // spawn a new thread to wait for all validators to finish joining
3067        let (sender, receiver) = bounded(0);
3068        let _ = thread::spawn(move || {
3069            validators.into_iter().for_each(|validator| {
3070                validator.join();
3071            });
3072            sender.send(()).unwrap();
3073        });
3074
3075        let timeout = Duration::from_secs(60);
3076        if let Err(RecvTimeoutError::Timeout) = receiver.recv_timeout(timeout) {
3077            panic!("timeout while shutting down validators");
3078        }
3079
3080        for path in ledger_paths {
3081            remove_dir_all(path).unwrap();
3082        }
3083    }
3084
3085    #[test]
3086    fn test_wait_for_supermajority() {
3087        solana_logger::setup();
3088        use solana_sdk::hash::hash;
3089        let node_keypair = Arc::new(Keypair::new());
3090        let cluster_info = ClusterInfo::new(
3091            ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
3092            node_keypair,
3093            SocketAddrSpace::Unspecified,
3094        );
3095
3096        let (genesis_config, _mint_keypair) = create_genesis_config(1);
3097        let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
3098        let mut config = ValidatorConfig::default_for_test();
3099        let rpc_override_health_check = Arc::new(AtomicBool::new(false));
3100        let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
3101
3102        assert!(!wait_for_supermajority(
3103            &config,
3104            None,
3105            &bank_forks,
3106            &cluster_info,
3107            rpc_override_health_check.clone(),
3108            &start_progress,
3109        )
3110        .unwrap());
3111
3112        // bank=0, wait=1, should fail
3113        config.wait_for_supermajority = Some(1);
3114        assert!(matches!(
3115            wait_for_supermajority(
3116                &config,
3117                None,
3118                &bank_forks,
3119                &cluster_info,
3120                rpc_override_health_check.clone(),
3121                &start_progress,
3122            ),
3123            Err(ValidatorError::NotEnoughLedgerData),
3124        ));
3125
3126        // bank=1, wait=0, should pass, bank is past the wait slot
3127        let bank_forks = BankForks::new_rw_arc(Bank::new_from_parent(
3128            bank_forks.read().unwrap().root_bank(),
3129            &Pubkey::default(),
3130            1,
3131        ));
3132        config.wait_for_supermajority = Some(0);
3133        assert!(!wait_for_supermajority(
3134            &config,
3135            None,
3136            &bank_forks,
3137            &cluster_info,
3138            rpc_override_health_check.clone(),
3139            &start_progress,
3140        )
3141        .unwrap());
3142
3143        // bank=1, wait=1, equal, but bad hash provided
3144        config.wait_for_supermajority = Some(1);
3145        config.expected_bank_hash = Some(hash(&[1]));
3146        assert!(matches!(
3147            wait_for_supermajority(
3148                &config,
3149                None,
3150                &bank_forks,
3151                &cluster_info,
3152                rpc_override_health_check,
3153                &start_progress,
3154            ),
3155            Err(ValidatorError::BadExpectedBankHash),
3156        ));
3157    }
3158
3159    #[test]
3160    fn test_interval_check() {
3161        fn new_snapshot_config(
3162            full_snapshot_archive_interval_slots: Slot,
3163            incremental_snapshot_archive_interval_slots: Slot,
3164        ) -> SnapshotConfig {
3165            SnapshotConfig {
3166                full_snapshot_archive_interval_slots,
3167                incremental_snapshot_archive_interval_slots,
3168                ..SnapshotConfig::default()
3169            }
3170        }
3171
3172        assert!(is_snapshot_config_valid(
3173            &new_snapshot_config(300, 200),
3174            100
3175        ));
3176
3177        let default_accounts_hash_interval =
3178            snapshot_bank_utils::DEFAULT_INCREMENTAL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS;
3179        assert!(is_snapshot_config_valid(
3180            &new_snapshot_config(
3181                snapshot_bank_utils::DEFAULT_FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS,
3182                snapshot_bank_utils::DEFAULT_INCREMENTAL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS
3183            ),
3184            default_accounts_hash_interval,
3185        ));
3186        assert!(is_snapshot_config_valid(
3187            &new_snapshot_config(
3188                snapshot_bank_utils::DEFAULT_FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS,
3189                DISABLED_SNAPSHOT_ARCHIVE_INTERVAL
3190            ),
3191            default_accounts_hash_interval
3192        ));
3193        assert!(is_snapshot_config_valid(
3194            &new_snapshot_config(
3195                snapshot_bank_utils::DEFAULT_INCREMENTAL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS,
3196                DISABLED_SNAPSHOT_ARCHIVE_INTERVAL
3197            ),
3198            default_accounts_hash_interval
3199        ));
3200        assert!(is_snapshot_config_valid(
3201            &new_snapshot_config(
3202                DISABLED_SNAPSHOT_ARCHIVE_INTERVAL,
3203                DISABLED_SNAPSHOT_ARCHIVE_INTERVAL
3204            ),
3205            Slot::MAX
3206        ));
3207
3208        assert!(!is_snapshot_config_valid(&new_snapshot_config(0, 100), 100));
3209        assert!(!is_snapshot_config_valid(&new_snapshot_config(100, 0), 100));
3210        assert!(!is_snapshot_config_valid(
3211            &new_snapshot_config(42, 100),
3212            100
3213        ));
3214        assert!(!is_snapshot_config_valid(
3215            &new_snapshot_config(100, 42),
3216            100
3217        ));
3218        assert!(!is_snapshot_config_valid(
3219            &new_snapshot_config(100, 100),
3220            100
3221        ));
3222        assert!(!is_snapshot_config_valid(
3223            &new_snapshot_config(100, 200),
3224            100
3225        ));
3226        assert!(!is_snapshot_config_valid(
3227            &new_snapshot_config(444, 200),
3228            100
3229        ));
3230        assert!(!is_snapshot_config_valid(
3231            &new_snapshot_config(400, 222),
3232            100
3233        ));
3234
3235        assert!(is_snapshot_config_valid(
3236            &SnapshotConfig::new_load_only(),
3237            100
3238        ));
3239        assert!(is_snapshot_config_valid(
3240            &SnapshotConfig {
3241                full_snapshot_archive_interval_slots: 41,
3242                incremental_snapshot_archive_interval_slots: 37,
3243                ..SnapshotConfig::new_load_only()
3244            },
3245            100
3246        ));
3247        assert!(is_snapshot_config_valid(
3248            &SnapshotConfig {
3249                full_snapshot_archive_interval_slots: DISABLED_SNAPSHOT_ARCHIVE_INTERVAL,
3250                incremental_snapshot_archive_interval_slots: DISABLED_SNAPSHOT_ARCHIVE_INTERVAL,
3251                ..SnapshotConfig::new_load_only()
3252            },
3253            100
3254        ));
3255    }
3256
3257    fn target_tick_duration() -> Duration {
3258        // DEFAULT_MS_PER_SLOT = 400
3259        // DEFAULT_TICKS_PER_SLOT = 64
3260        // MS_PER_TICK = 6
3261        //
3262        // But, DEFAULT_MS_PER_SLOT / DEFAULT_TICKS_PER_SLOT = 6.25
3263        //
3264        // So, convert to microseconds first to avoid the integer rounding error
3265        let target_tick_duration_us = solana_sdk::clock::DEFAULT_MS_PER_SLOT * 1000
3266            / solana_sdk::clock::DEFAULT_TICKS_PER_SLOT;
3267        assert_eq!(target_tick_duration_us, 6250);
3268        Duration::from_micros(target_tick_duration_us)
3269    }
3270
3271    #[test]
3272    fn test_poh_speed() {
3273        solana_logger::setup();
3274        let poh_config = PohConfig {
3275            target_tick_duration: target_tick_duration(),
3276            // make the PoH rate really fast to trigger the failure condition
3277            hashes_per_tick: Some(100 * solana_sdk::clock::DEFAULT_HASHES_PER_TICK),
3278            ..PohConfig::default()
3279        };
3280        let genesis_config = GenesisConfig {
3281            poh_config,
3282            ..GenesisConfig::default()
3283        };
3284        let bank = Bank::new_for_tests(&genesis_config);
3285        assert!(check_poh_speed(&bank, Some(10_000)).is_err());
3286    }
3287
3288    #[test]
3289    fn test_poh_speed_no_hashes_per_tick() {
3290        solana_logger::setup();
3291        let poh_config = PohConfig {
3292            target_tick_duration: target_tick_duration(),
3293            hashes_per_tick: None,
3294            ..PohConfig::default()
3295        };
3296        let genesis_config = GenesisConfig {
3297            poh_config,
3298            ..GenesisConfig::default()
3299        };
3300        let bank = Bank::new_for_tests(&genesis_config);
3301        check_poh_speed(&bank, Some(10_000)).unwrap();
3302    }
3303}