solana_core/
validator.rs

1//! The `validator` module hosts all the validator microservices.
2
3pub use solana_perf::report_target_features;
4use {
5    crate::{
6        accounts_hash_verifier::AccountsHashVerifier,
7        admin_rpc_post_init::{AdminRpcRequestMetadataPostInit, KeyUpdaterType, KeyUpdaters},
8        banking_trace::{self, BankingTracer, TraceError},
9        cluster_info_vote_listener::VoteTracker,
10        completed_data_sets_service::CompletedDataSetsService,
11        consensus::{
12            reconcile_blockstore_roots_with_external_source,
13            tower_storage::{NullTowerStorage, TowerStorage},
14            ExternalRootSource, Tower,
15        },
16        repair::{
17            self,
18            quic_endpoint::{RepairQuicAsyncSenders, RepairQuicSenders, RepairQuicSockets},
19            serve_repair::ServeRepair,
20            serve_repair_service::ServeRepairService,
21        },
22        sample_performance_service::SamplePerformanceService,
23        sigverify,
24        snapshot_packager_service::{PendingSnapshotPackages, SnapshotPackagerService},
25        stats_reporter_service::StatsReporterService,
26        system_monitor_service::{
27            verify_net_stats_access, SystemMonitorService, SystemMonitorStatsReportConfig,
28        },
29        tpu::{ForwardingClientOption, Tpu, TpuSockets, DEFAULT_TPU_COALESCE},
30        tvu::{Tvu, TvuConfig, TvuSockets},
31    },
32    anyhow::{anyhow, Context, Result},
33    crossbeam_channel::{bounded, unbounded, Receiver},
34    quinn::Endpoint,
35    solana_accounts_db::{
36        accounts_db::{AccountsDbConfig, ACCOUNTS_DB_CONFIG_FOR_TESTING},
37        accounts_update_notifier_interface::AccountsUpdateNotifier,
38        hardened_unpack::{
39            open_genesis_config, OpenGenesisConfigError, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
40        },
41        utils::{move_and_async_delete_path, move_and_async_delete_path_contents},
42    },
43    solana_client::connection_cache::{ConnectionCache, Protocol},
44    solana_clock::Slot,
45    solana_entry::poh::compute_hash_time,
46    solana_epoch_schedule::MAX_LEADER_SCHEDULE_EPOCH_OFFSET,
47    solana_genesis_config::{ClusterType, GenesisConfig},
48    solana_geyser_plugin_manager::{
49        geyser_plugin_service::GeyserPluginService, GeyserPluginManagerRequest,
50    },
51    solana_gossip::{
52        cluster_info::{
53            ClusterInfo, Node, DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
54            DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS,
55        },
56        contact_info::ContactInfo,
57        crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
58        gossip_service::GossipService,
59    },
60    solana_hard_forks::HardForks,
61    solana_hash::Hash,
62    solana_keypair::Keypair,
63    solana_ledger::{
64        bank_forks_utils,
65        blockstore::{
66            Blockstore, BlockstoreError, PurgeType, MAX_COMPLETED_SLOTS_IN_CHANNEL,
67            MAX_REPLAY_WAKE_UP_SIGNALS,
68        },
69        blockstore_metric_report_service::BlockstoreMetricReportService,
70        blockstore_options::{BlockstoreOptions, BLOCKSTORE_DIRECTORY_ROCKS_LEVEL},
71        blockstore_processor::{self, TransactionStatusSender},
72        entry_notifier_interface::EntryNotifierArc,
73        entry_notifier_service::{EntryNotifierSender, EntryNotifierService},
74        leader_schedule::FixedSchedule,
75        leader_schedule_cache::LeaderScheduleCache,
76        use_snapshot_archives_at_startup::UseSnapshotArchivesAtStartup,
77    },
78    solana_measure::measure::Measure,
79    solana_metrics::{datapoint_info, metrics::metrics_config_sanity_check},
80    solana_poh::{
81        poh_recorder::PohRecorder,
82        poh_service::{self, PohService},
83        transaction_recorder::TransactionRecorder,
84    },
85    solana_pubkey::Pubkey,
86    solana_rayon_threadlimit::{get_max_thread_count, get_thread_count},
87    solana_rpc::{
88        max_slots::MaxSlots,
89        optimistically_confirmed_bank_tracker::{
90            BankNotificationSenderConfig, OptimisticallyConfirmedBank,
91            OptimisticallyConfirmedBankTracker,
92        },
93        rpc::JsonRpcConfig,
94        rpc_completed_slots_service::RpcCompletedSlotsService,
95        rpc_pubsub_service::{PubSubConfig, PubSubService},
96        rpc_service::{ClientOption, JsonRpcService, JsonRpcServiceConfig},
97        rpc_subscriptions::RpcSubscriptions,
98        transaction_notifier_interface::TransactionNotifierArc,
99        transaction_status_service::TransactionStatusService,
100    },
101    solana_runtime::{
102        accounts_background_service::{
103            AbsRequestHandlers, AccountsBackgroundService, DroppedSlotsReceiver,
104            PrunedBanksRequestHandler, SnapshotRequestHandler,
105        },
106        bank::Bank,
107        bank_forks::BankForks,
108        commitment::BlockCommitmentCache,
109        prioritization_fee_cache::PrioritizationFeeCache,
110        runtime_config::RuntimeConfig,
111        snapshot_archive_info::SnapshotArchiveInfoGetter,
112        snapshot_bank_utils::{self, DISABLED_SNAPSHOT_ARCHIVE_INTERVAL},
113        snapshot_config::SnapshotConfig,
114        snapshot_controller::SnapshotController,
115        snapshot_hash::StartingSnapshotHashes,
116        snapshot_utils::{self, clean_orphaned_account_snapshot_dirs},
117    },
118    solana_send_transaction_service::send_transaction_service::Config as SendTransactionServiceConfig,
119    solana_shred_version::compute_shred_version,
120    solana_signer::Signer,
121    solana_streamer::{quic::QuicServerParams, socket::SocketAddrSpace, streamer::StakedNodes},
122    solana_time_utils::timestamp,
123    solana_tpu_client::tpu_client::{
124        DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC, DEFAULT_VOTE_USE_QUIC,
125    },
126    solana_turbine::{self, broadcast_stage::BroadcastStageType, xdp::XdpConfig},
127    solana_unified_scheduler_pool::DefaultSchedulerPool,
128    solana_validator_exit::Exit,
129    solana_vote_program::vote_state,
130    solana_wen_restart::wen_restart::{wait_for_wen_restart, WenRestartConfig},
131    std::{
132        borrow::Cow,
133        collections::{HashMap, HashSet},
134        net::SocketAddr,
135        num::NonZeroUsize,
136        path::{Path, PathBuf},
137        sync::{
138            atomic::{AtomicBool, AtomicU64, Ordering},
139            Arc, Mutex, RwLock,
140        },
141        thread::{sleep, Builder, JoinHandle},
142        time::{Duration, Instant},
143    },
144    strum::VariantNames,
145    strum_macros::{Display, EnumCount, EnumIter, EnumString, EnumVariantNames, IntoStaticStr},
146    thiserror::Error,
147    tokio::runtime::Runtime as TokioRuntime,
148    tokio_util::sync::CancellationToken,
149};
150
151const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000;
152const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80;
153// Right now since we reuse the wait for supermajority code, the
154// following threshold should always greater than or equal to
155// WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT.
156const WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT: u64 =
157    WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT;
158
159#[derive(
160    Clone, EnumCount, EnumIter, EnumString, EnumVariantNames, Default, IntoStaticStr, Display,
161)]
162#[strum(serialize_all = "kebab-case")]
163pub enum BlockVerificationMethod {
164    BlockstoreProcessor,
165    #[default]
166    UnifiedScheduler,
167}
168
169impl BlockVerificationMethod {
170    pub const fn cli_names() -> &'static [&'static str] {
171        Self::VARIANTS
172    }
173
174    pub fn cli_message() -> &'static str {
175        "Switch transaction scheduling method for verifying ledger entries"
176    }
177}
178
179#[derive(Clone, EnumString, EnumVariantNames, Default, IntoStaticStr, Display)]
180#[strum(serialize_all = "kebab-case")]
181pub enum BlockProductionMethod {
182    CentralScheduler,
183    #[default]
184    CentralSchedulerGreedy,
185}
186
187impl BlockProductionMethod {
188    pub const fn cli_names() -> &'static [&'static str] {
189        Self::VARIANTS
190    }
191
192    pub fn cli_message() -> &'static str {
193        "Switch transaction scheduling method for producing ledger entries"
194    }
195}
196
197#[derive(Clone, EnumString, EnumVariantNames, Default, IntoStaticStr, Display)]
198#[strum(serialize_all = "kebab-case")]
199pub enum TransactionStructure {
200    #[default]
201    Sdk,
202    View,
203}
204
205impl TransactionStructure {
206    pub const fn cli_names() -> &'static [&'static str] {
207        Self::VARIANTS
208    }
209
210    pub fn cli_message() -> &'static str {
211        "Switch internal transaction structure/representation"
212    }
213}
214
215/// Configuration for the block generator invalidator for replay.
216#[derive(Clone, Debug)]
217pub struct GeneratorConfig {
218    pub accounts_path: String,
219    pub starting_keypairs: Arc<Vec<Keypair>>,
220}
221
222pub struct ValidatorConfig {
223    pub halt_at_slot: Option<Slot>,
224    pub expected_genesis_hash: Option<Hash>,
225    pub expected_bank_hash: Option<Hash>,
226    pub expected_shred_version: Option<u16>,
227    pub voting_disabled: bool,
228    pub account_paths: Vec<PathBuf>,
229    pub account_snapshot_paths: Vec<PathBuf>,
230    pub rpc_config: JsonRpcConfig,
231    /// Specifies which plugins to start up with
232    pub on_start_geyser_plugin_config_files: Option<Vec<PathBuf>>,
233    pub geyser_plugin_always_enabled: bool,
234    pub rpc_addrs: Option<(SocketAddr, SocketAddr)>, // (JsonRpc, JsonRpcPubSub)
235    pub pubsub_config: PubSubConfig,
236    pub snapshot_config: SnapshotConfig,
237    pub max_ledger_shreds: Option<u64>,
238    pub blockstore_options: BlockstoreOptions,
239    pub broadcast_stage_type: BroadcastStageType,
240    pub turbine_disabled: Arc<AtomicBool>,
241    pub fixed_leader_schedule: Option<FixedSchedule>,
242    pub wait_for_supermajority: Option<Slot>,
243    pub new_hard_forks: Option<Vec<Slot>>,
244    pub known_validators: Option<HashSet<Pubkey>>, // None = trust all
245    pub repair_validators: Option<HashSet<Pubkey>>, // None = repair from all
246    pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>, // Empty = repair with all
247    pub gossip_validators: Option<HashSet<Pubkey>>, // None = gossip with all
248    pub max_genesis_archive_unpacked_size: u64,
249    /// Run PoH, transaction signature and other transaction verifications during blockstore
250    /// processing.
251    pub run_verification: bool,
252    pub require_tower: bool,
253    pub tower_storage: Arc<dyn TowerStorage>,
254    pub debug_keys: Option<Arc<HashSet<Pubkey>>>,
255    pub contact_debug_interval: u64,
256    pub contact_save_interval: u64,
257    pub send_transaction_service_config: SendTransactionServiceConfig,
258    pub no_poh_speed_test: bool,
259    pub no_os_memory_stats_reporting: bool,
260    pub no_os_network_stats_reporting: bool,
261    pub no_os_cpu_stats_reporting: bool,
262    pub no_os_disk_stats_reporting: bool,
263    pub poh_pinned_cpu_core: usize,
264    pub poh_hashes_per_batch: u64,
265    pub process_ledger_before_services: bool,
266    pub accounts_db_config: Option<AccountsDbConfig>,
267    pub warp_slot: Option<Slot>,
268    pub accounts_db_test_hash_calculation: bool,
269    pub accounts_db_skip_shrink: bool,
270    pub accounts_db_force_initial_clean: bool,
271    pub tpu_coalesce: Duration,
272    pub staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
273    pub validator_exit: Arc<RwLock<Exit>>,
274    pub validator_exit_backpressure: HashMap<String, Arc<AtomicBool>>,
275    pub no_wait_for_vote_to_start_leader: bool,
276    pub wait_to_vote_slot: Option<Slot>,
277    pub runtime_config: RuntimeConfig,
278    pub banking_trace_dir_byte_limit: banking_trace::DirByteLimit,
279    pub block_verification_method: BlockVerificationMethod,
280    pub block_production_method: BlockProductionMethod,
281    pub transaction_struct: TransactionStructure,
282    pub enable_block_production_forwarding: bool,
283    pub generator_config: Option<GeneratorConfig>,
284    pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup,
285    pub wen_restart_proto_path: Option<PathBuf>,
286    pub wen_restart_coordinator: Option<Pubkey>,
287    pub unified_scheduler_handler_threads: Option<usize>,
288    pub ip_echo_server_threads: NonZeroUsize,
289    pub rayon_global_threads: NonZeroUsize,
290    pub replay_forks_threads: NonZeroUsize,
291    pub replay_transactions_threads: NonZeroUsize,
292    pub tvu_shred_sigverify_threads: NonZeroUsize,
293    pub delay_leader_block_for_pending_fork: bool,
294    pub use_tpu_client_next: bool,
295    pub retransmit_xdp: Option<XdpConfig>,
296}
297
298impl Default for ValidatorConfig {
299    fn default() -> Self {
300        Self {
301            halt_at_slot: None,
302            expected_genesis_hash: None,
303            expected_bank_hash: None,
304            expected_shred_version: None,
305            voting_disabled: false,
306            max_ledger_shreds: None,
307            blockstore_options: BlockstoreOptions::default(),
308            account_paths: Vec::new(),
309            account_snapshot_paths: Vec::new(),
310            rpc_config: JsonRpcConfig::default(),
311            on_start_geyser_plugin_config_files: None,
312            geyser_plugin_always_enabled: false,
313            rpc_addrs: None,
314            pubsub_config: PubSubConfig::default(),
315            snapshot_config: SnapshotConfig::new_load_only(),
316            broadcast_stage_type: BroadcastStageType::Standard,
317            turbine_disabled: Arc::<AtomicBool>::default(),
318            fixed_leader_schedule: None,
319            wait_for_supermajority: None,
320            new_hard_forks: None,
321            known_validators: None,
322            repair_validators: None,
323            repair_whitelist: Arc::new(RwLock::new(HashSet::default())),
324            gossip_validators: None,
325            max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
326            run_verification: true,
327            require_tower: false,
328            tower_storage: Arc::new(NullTowerStorage::default()),
329            debug_keys: None,
330            contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
331            contact_save_interval: DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS,
332            send_transaction_service_config: SendTransactionServiceConfig::default(),
333            no_poh_speed_test: true,
334            no_os_memory_stats_reporting: true,
335            no_os_network_stats_reporting: true,
336            no_os_cpu_stats_reporting: true,
337            no_os_disk_stats_reporting: true,
338            poh_pinned_cpu_core: poh_service::DEFAULT_PINNED_CPU_CORE,
339            poh_hashes_per_batch: poh_service::DEFAULT_HASHES_PER_BATCH,
340            process_ledger_before_services: false,
341            warp_slot: None,
342            accounts_db_test_hash_calculation: false,
343            accounts_db_skip_shrink: false,
344            accounts_db_force_initial_clean: false,
345            tpu_coalesce: DEFAULT_TPU_COALESCE,
346            staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
347            validator_exit: Arc::new(RwLock::new(Exit::default())),
348            validator_exit_backpressure: HashMap::default(),
349            no_wait_for_vote_to_start_leader: true,
350            accounts_db_config: None,
351            wait_to_vote_slot: None,
352            runtime_config: RuntimeConfig::default(),
353            banking_trace_dir_byte_limit: 0,
354            block_verification_method: BlockVerificationMethod::default(),
355            block_production_method: BlockProductionMethod::default(),
356            transaction_struct: TransactionStructure::default(),
357            enable_block_production_forwarding: false,
358            generator_config: None,
359            use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(),
360            wen_restart_proto_path: None,
361            wen_restart_coordinator: None,
362            unified_scheduler_handler_threads: None,
363            ip_echo_server_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
364            rayon_global_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
365            replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
366            replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
367            tvu_shred_sigverify_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
368            delay_leader_block_for_pending_fork: false,
369            use_tpu_client_next: true,
370            retransmit_xdp: None,
371        }
372    }
373}
374
375impl ValidatorConfig {
376    pub fn default_for_test() -> Self {
377        let max_thread_count =
378            NonZeroUsize::new(get_max_thread_count()).expect("thread count is non-zero");
379
380        Self {
381            accounts_db_config: Some(ACCOUNTS_DB_CONFIG_FOR_TESTING),
382            blockstore_options: BlockstoreOptions::default_for_tests(),
383            rpc_config: JsonRpcConfig::default_for_test(),
384            block_production_method: BlockProductionMethod::default(),
385            enable_block_production_forwarding: true, // enable forwarding by default for tests
386            rayon_global_threads: max_thread_count,
387            replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
388            replay_transactions_threads: max_thread_count,
389            tvu_shred_sigverify_threads: NonZeroUsize::new(get_thread_count())
390                .expect("thread count is non-zero"),
391            ..Self::default()
392        }
393    }
394
395    pub fn enable_default_rpc_block_subscribe(&mut self) {
396        let pubsub_config = PubSubConfig {
397            enable_block_subscription: true,
398            ..PubSubConfig::default()
399        };
400        let rpc_config = JsonRpcConfig {
401            enable_rpc_transaction_history: true,
402            ..JsonRpcConfig::default_for_test()
403        };
404
405        self.pubsub_config = pubsub_config;
406        self.rpc_config = rpc_config;
407    }
408}
409
410// `ValidatorStartProgress` contains status information that is surfaced to the node operator over
411// the admin RPC channel to help them to follow the general progress of node startup without
412// having to watch log messages.
413#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
414pub enum ValidatorStartProgress {
415    Initializing, // Catch all, default state
416    SearchingForRpcService,
417    DownloadingSnapshot {
418        slot: Slot,
419        rpc_addr: SocketAddr,
420    },
421    CleaningBlockStore,
422    CleaningAccounts,
423    LoadingLedger,
424    ProcessingLedger {
425        slot: Slot,
426        max_slot: Slot,
427    },
428    StartingServices,
429    Halted, // Validator halted due to `--dev-halt-at-slot` argument
430    WaitingForSupermajority {
431        slot: Slot,
432        gossip_stake_percent: u64,
433    },
434
435    // `Running` is the terminal state once the validator fully starts and all services are
436    // operational
437    Running,
438}
439
440impl Default for ValidatorStartProgress {
441    fn default() -> Self {
442        Self::Initializing
443    }
444}
445
446struct BlockstoreRootScan {
447    thread: Option<JoinHandle<Result<usize, BlockstoreError>>>,
448}
449
450impl BlockstoreRootScan {
451    fn new(config: &ValidatorConfig, blockstore: Arc<Blockstore>, exit: Arc<AtomicBool>) -> Self {
452        let thread = if config.rpc_addrs.is_some()
453            && config.rpc_config.enable_rpc_transaction_history
454            && config.rpc_config.rpc_scan_and_fix_roots
455        {
456            Some(
457                Builder::new()
458                    .name("solBStoreRtScan".to_string())
459                    .spawn(move || blockstore.scan_and_fix_roots(None, None, &exit))
460                    .unwrap(),
461            )
462        } else {
463            None
464        };
465        Self { thread }
466    }
467
468    fn join(self) {
469        if let Some(blockstore_root_scan) = self.thread {
470            if let Err(err) = blockstore_root_scan.join() {
471                warn!("blockstore_root_scan failed to join {:?}", err);
472            }
473        }
474    }
475}
476
477#[derive(Default)]
478struct TransactionHistoryServices {
479    transaction_status_sender: Option<TransactionStatusSender>,
480    transaction_status_service: Option<TransactionStatusService>,
481    max_complete_transaction_status_slot: Arc<AtomicU64>,
482}
483
484/// A struct easing passing Validator TPU Configurations
485pub struct ValidatorTpuConfig {
486    /// Controls if to use QUIC for sending regular TPU transaction
487    pub use_quic: bool,
488    /// Controls if to use QUIC for sending TPU votes
489    pub vote_use_quic: bool,
490    /// Controls the connection cache pool size
491    pub tpu_connection_pool_size: usize,
492    /// Controls if to enable UDP for TPU tansactions.
493    pub tpu_enable_udp: bool,
494    /// QUIC server config for regular TPU
495    pub tpu_quic_server_config: QuicServerParams,
496    /// QUIC server config for TPU forward
497    pub tpu_fwd_quic_server_config: QuicServerParams,
498    /// QUIC server config for Vote
499    pub vote_quic_server_config: QuicServerParams,
500}
501
502impl ValidatorTpuConfig {
503    /// A convenient function to build a ValidatorTpuConfig for testing with good
504    /// default.
505    pub fn new_for_tests(tpu_enable_udp: bool) -> Self {
506        let tpu_quic_server_config = QuicServerParams {
507            max_connections_per_ipaddr_per_min: 32,
508            coalesce_channel_size: 100_000, // smaller channel size for faster test
509            ..Default::default()
510        };
511
512        let tpu_fwd_quic_server_config = QuicServerParams {
513            max_connections_per_ipaddr_per_min: 32,
514            max_unstaked_connections: 0,
515            coalesce_channel_size: 100_000, // smaller channel size for faster test
516            ..Default::default()
517        };
518
519        // vote and tpu_fwd share the same characteristics -- disallow non-staked connections:
520        let vote_quic_server_config = tpu_fwd_quic_server_config.clone();
521
522        ValidatorTpuConfig {
523            use_quic: DEFAULT_TPU_USE_QUIC,
524            vote_use_quic: DEFAULT_VOTE_USE_QUIC,
525            tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
526            tpu_enable_udp,
527            tpu_quic_server_config,
528            tpu_fwd_quic_server_config,
529            vote_quic_server_config,
530        }
531    }
532}
533
534pub struct Validator {
535    validator_exit: Arc<RwLock<Exit>>,
536    json_rpc_service: Option<JsonRpcService>,
537    pubsub_service: Option<PubSubService>,
538    rpc_completed_slots_service: Option<JoinHandle<()>>,
539    optimistically_confirmed_bank_tracker: Option<OptimisticallyConfirmedBankTracker>,
540    transaction_status_service: Option<TransactionStatusService>,
541    entry_notifier_service: Option<EntryNotifierService>,
542    system_monitor_service: Option<SystemMonitorService>,
543    sample_performance_service: Option<SamplePerformanceService>,
544    stats_reporter_service: StatsReporterService,
545    gossip_service: GossipService,
546    serve_repair_service: ServeRepairService,
547    completed_data_sets_service: Option<CompletedDataSetsService>,
548    snapshot_packager_service: Option<SnapshotPackagerService>,
549    poh_recorder: Arc<RwLock<PohRecorder>>,
550    poh_service: PohService,
551    tpu: Tpu,
552    tvu: Tvu,
553    ip_echo_server: Option<solana_net_utils::IpEchoServer>,
554    pub cluster_info: Arc<ClusterInfo>,
555    pub bank_forks: Arc<RwLock<BankForks>>,
556    pub blockstore: Arc<Blockstore>,
557    geyser_plugin_service: Option<GeyserPluginService>,
558    blockstore_metric_report_service: BlockstoreMetricReportService,
559    accounts_background_service: AccountsBackgroundService,
560    accounts_hash_verifier: AccountsHashVerifier,
561    turbine_quic_endpoint: Option<Endpoint>,
562    turbine_quic_endpoint_runtime: Option<TokioRuntime>,
563    turbine_quic_endpoint_join_handle: Option<solana_turbine::quic_endpoint::AsyncTryJoinHandle>,
564    repair_quic_endpoints: Option<[Endpoint; 3]>,
565    repair_quic_endpoints_runtime: Option<TokioRuntime>,
566    repair_quic_endpoints_join_handle: Option<repair::quic_endpoint::AsyncTryJoinHandle>,
567    // This runtime is used to run the client owned by SendTransactionService.
568    // We don't wait for its JoinHandle here because ownership and shutdown
569    // are managed elsewhere. This variable is intentionally unused.
570    _tpu_client_next_runtime: Option<TokioRuntime>,
571}
572
573impl Validator {
574    #[allow(clippy::too_many_arguments)]
575    pub fn new(
576        mut node: Node,
577        identity_keypair: Arc<Keypair>,
578        ledger_path: &Path,
579        vote_account: &Pubkey,
580        authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
581        cluster_entrypoints: Vec<ContactInfo>,
582        config: &ValidatorConfig,
583        should_check_duplicate_instance: bool,
584        rpc_to_plugin_manager_receiver: Option<Receiver<GeyserPluginManagerRequest>>,
585        start_progress: Arc<RwLock<ValidatorStartProgress>>,
586        socket_addr_space: SocketAddrSpace,
587        tpu_config: ValidatorTpuConfig,
588        admin_rpc_service_post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
589    ) -> Result<Self> {
590        #[cfg(debug_assertions)]
591        const DEBUG_ASSERTION_STATUS: &str = "enabled";
592        #[cfg(not(debug_assertions))]
593        const DEBUG_ASSERTION_STATUS: &str = "disabled";
594        info!("debug-assertion status: {DEBUG_ASSERTION_STATUS}");
595
596        let ValidatorTpuConfig {
597            use_quic,
598            vote_use_quic,
599            tpu_connection_pool_size,
600            tpu_enable_udp,
601            tpu_quic_server_config,
602            tpu_fwd_quic_server_config,
603            vote_quic_server_config,
604        } = tpu_config;
605
606        let start_time = Instant::now();
607
608        // Initialize the global rayon pool first to ensure the value in config
609        // is honored. Otherwise, some code accessing the global pool could
610        // cause it to get initialized with Rayon's default (not ours)
611        if rayon::ThreadPoolBuilder::new()
612            .thread_name(|i| format!("solRayonGlob{i:02}"))
613            .num_threads(config.rayon_global_threads.get())
614            .build_global()
615            .is_err()
616        {
617            warn!("Rayon global thread pool already initialized");
618        }
619
620        let id = identity_keypair.pubkey();
621        assert_eq!(&id, node.info.pubkey());
622
623        info!("identity pubkey: {id}");
624        info!("vote account pubkey: {vote_account}");
625
626        if !config.no_os_network_stats_reporting {
627            verify_net_stats_access().map_err(|e| {
628                ValidatorError::Other(format!("Failed to access network stats: {e:?}"))
629            })?;
630        }
631
632        let mut bank_notification_senders = Vec::new();
633
634        let exit = Arc::new(AtomicBool::new(false));
635
636        let geyser_plugin_config_files = config
637            .on_start_geyser_plugin_config_files
638            .as_ref()
639            .map(Cow::Borrowed)
640            .or_else(|| {
641                config
642                    .geyser_plugin_always_enabled
643                    .then_some(Cow::Owned(vec![]))
644            });
645        let geyser_plugin_service =
646            if let Some(geyser_plugin_config_files) = geyser_plugin_config_files {
647                let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
648                bank_notification_senders.push(confirmed_bank_sender);
649                let rpc_to_plugin_manager_receiver_and_exit =
650                    rpc_to_plugin_manager_receiver.map(|receiver| (receiver, exit.clone()));
651                Some(
652                    GeyserPluginService::new_with_receiver(
653                        confirmed_bank_receiver,
654                        config.geyser_plugin_always_enabled,
655                        geyser_plugin_config_files.as_ref(),
656                        rpc_to_plugin_manager_receiver_and_exit,
657                    )
658                    .map_err(|err| {
659                        ValidatorError::Other(format!("Failed to load the Geyser plugin: {err:?}"))
660                    })?,
661                )
662            } else {
663                None
664            };
665
666        if config.voting_disabled {
667            warn!("voting disabled");
668            authorized_voter_keypairs.write().unwrap().clear();
669        } else {
670            for authorized_voter_keypair in authorized_voter_keypairs.read().unwrap().iter() {
671                warn!("authorized voter: {}", authorized_voter_keypair.pubkey());
672            }
673        }
674
675        for cluster_entrypoint in &cluster_entrypoints {
676            info!("entrypoint: {:?}", cluster_entrypoint);
677        }
678
679        if solana_perf::perf_libs::api().is_some() {
680            info!("Initializing sigverify, this could take a while...");
681        } else {
682            info!("Initializing sigverify...");
683        }
684        sigverify::init();
685        info!("Initializing sigverify done.");
686
687        if !ledger_path.is_dir() {
688            return Err(anyhow!(
689                "ledger directory does not exist or is not accessible: {ledger_path:?}"
690            ));
691        }
692        let genesis_config = load_genesis(config, ledger_path)?;
693        metrics_config_sanity_check(genesis_config.cluster_type)?;
694
695        info!("Cleaning accounts paths..");
696        *start_progress.write().unwrap() = ValidatorStartProgress::CleaningAccounts;
697        let mut timer = Measure::start("clean_accounts_paths");
698        cleanup_accounts_paths(config);
699        timer.stop();
700        info!("Cleaning accounts paths done. {timer}");
701
702        snapshot_utils::purge_incomplete_bank_snapshots(&config.snapshot_config.bank_snapshots_dir);
703        snapshot_utils::purge_old_bank_snapshots_at_startup(
704            &config.snapshot_config.bank_snapshots_dir,
705        );
706
707        info!("Cleaning orphaned account snapshot directories..");
708        let mut timer = Measure::start("clean_orphaned_account_snapshot_dirs");
709        clean_orphaned_account_snapshot_dirs(
710            &config.snapshot_config.bank_snapshots_dir,
711            &config.account_snapshot_paths,
712        )
713        .context("failed to clean orphaned account snapshot directories")?;
714        timer.stop();
715        info!("Cleaning orphaned account snapshot directories done. {timer}");
716
717        // The accounts hash cache dir was renamed, so cleanup any old dirs that exist.
718        let accounts_hash_cache_path = config
719            .accounts_db_config
720            .as_ref()
721            .and_then(|config| config.accounts_hash_cache_path.as_ref())
722            .map(PathBuf::as_path)
723            .unwrap_or(ledger_path);
724        let old_accounts_hash_cache_dirs = [
725            ledger_path.join("calculate_accounts_hash_cache"),
726            accounts_hash_cache_path.join("full"),
727            accounts_hash_cache_path.join("incremental"),
728            accounts_hash_cache_path.join("transient"),
729        ];
730        for old_accounts_hash_cache_dir in old_accounts_hash_cache_dirs {
731            if old_accounts_hash_cache_dir.exists() {
732                move_and_async_delete_path(old_accounts_hash_cache_dir);
733            }
734        }
735
736        // token used to cancel tpu-client-next.
737        let cancel_tpu_client_next = CancellationToken::new();
738        {
739            let exit = exit.clone();
740            config
741                .validator_exit
742                .write()
743                .unwrap()
744                .register_exit(Box::new(move || exit.store(true, Ordering::Relaxed)));
745            let cancel_tpu_client_next = cancel_tpu_client_next.clone();
746            config
747                .validator_exit
748                .write()
749                .unwrap()
750                .register_exit(Box::new(move || cancel_tpu_client_next.cancel()));
751        }
752
753        let accounts_update_notifier = geyser_plugin_service
754            .as_ref()
755            .and_then(|geyser_plugin_service| geyser_plugin_service.get_accounts_update_notifier());
756
757        let transaction_notifier = geyser_plugin_service
758            .as_ref()
759            .and_then(|geyser_plugin_service| geyser_plugin_service.get_transaction_notifier());
760
761        let entry_notifier = geyser_plugin_service
762            .as_ref()
763            .and_then(|geyser_plugin_service| geyser_plugin_service.get_entry_notifier());
764
765        let block_metadata_notifier = geyser_plugin_service
766            .as_ref()
767            .and_then(|geyser_plugin_service| geyser_plugin_service.get_block_metadata_notifier());
768
769        let slot_status_notifier = geyser_plugin_service
770            .as_ref()
771            .and_then(|geyser_plugin_service| geyser_plugin_service.get_slot_status_notifier());
772
773        info!(
774            "Geyser plugin: accounts_update_notifier: {}, transaction_notifier: {}, \
775             entry_notifier: {}",
776            accounts_update_notifier.is_some(),
777            transaction_notifier.is_some(),
778            entry_notifier.is_some()
779        );
780
781        let system_monitor_service = Some(SystemMonitorService::new(
782            exit.clone(),
783            SystemMonitorStatsReportConfig {
784                report_os_memory_stats: !config.no_os_memory_stats_reporting,
785                report_os_network_stats: !config.no_os_network_stats_reporting,
786                report_os_cpu_stats: !config.no_os_cpu_stats_reporting,
787                report_os_disk_stats: !config.no_os_disk_stats_reporting,
788            },
789        ));
790
791        let (
792            bank_forks,
793            blockstore,
794            original_blockstore_root,
795            ledger_signal_receiver,
796            leader_schedule_cache,
797            starting_snapshot_hashes,
798            TransactionHistoryServices {
799                transaction_status_sender,
800                transaction_status_service,
801                max_complete_transaction_status_slot,
802            },
803            blockstore_process_options,
804            blockstore_root_scan,
805            pruned_banks_receiver,
806            entry_notifier_service,
807        ) = load_blockstore(
808            config,
809            ledger_path,
810            &genesis_config,
811            exit.clone(),
812            &start_progress,
813            accounts_update_notifier,
814            transaction_notifier,
815            entry_notifier,
816        )
817        .map_err(ValidatorError::Other)?;
818
819        if !config.no_poh_speed_test {
820            check_poh_speed(&bank_forks.read().unwrap().root_bank(), None)?;
821        }
822
823        let (root_slot, hard_forks) = {
824            let root_bank = bank_forks.read().unwrap().root_bank();
825            (root_bank.slot(), root_bank.hard_forks())
826        };
827        let shred_version = compute_shred_version(&genesis_config.hash(), Some(&hard_forks));
828        info!(
829            "shred version: {shred_version}, hard forks: {:?}",
830            hard_forks
831        );
832
833        if let Some(expected_shred_version) = config.expected_shred_version {
834            if expected_shred_version != shred_version {
835                return Err(ValidatorError::ShredVersionMismatch {
836                    actual: shred_version,
837                    expected: expected_shred_version,
838                }
839                .into());
840            }
841        }
842
843        if let Some(start_slot) = should_cleanup_blockstore_incorrect_shred_versions(
844            config,
845            &blockstore,
846            root_slot,
847            &hard_forks,
848        )? {
849            *start_progress.write().unwrap() = ValidatorStartProgress::CleaningBlockStore;
850            cleanup_blockstore_incorrect_shred_versions(
851                &blockstore,
852                config,
853                start_slot,
854                shred_version,
855            )?;
856        } else {
857            info!("Skipping the blockstore check for shreds with incorrect version");
858        }
859
860        node.info.set_shred_version(shred_version);
861        node.info.set_wallclock(timestamp());
862        Self::print_node_info(&node);
863
864        let mut cluster_info = ClusterInfo::new(
865            node.info.clone(),
866            identity_keypair.clone(),
867            socket_addr_space,
868        );
869        cluster_info.set_contact_debug_interval(config.contact_debug_interval);
870        cluster_info.set_entrypoints(cluster_entrypoints);
871        cluster_info.restore_contact_info(ledger_path, config.contact_save_interval);
872        let cluster_info = Arc::new(cluster_info);
873
874        assert!(is_snapshot_config_valid(&config.snapshot_config));
875
876        let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
877        let snapshot_controller = Arc::new(SnapshotController::new(
878            snapshot_request_sender.clone(),
879            config.snapshot_config.clone(),
880            bank_forks.read().unwrap().root(),
881        ));
882
883        let pending_snapshot_packages = Arc::new(Mutex::new(PendingSnapshotPackages::default()));
884        let snapshot_packager_service = if snapshot_controller
885            .snapshot_config()
886            .should_generate_snapshots()
887        {
888            let exit_backpressure = config
889                .validator_exit_backpressure
890                .get(SnapshotPackagerService::NAME)
891                .cloned();
892            let enable_gossip_push = true;
893            let snapshot_packager_service = SnapshotPackagerService::new(
894                pending_snapshot_packages.clone(),
895                starting_snapshot_hashes,
896                exit.clone(),
897                exit_backpressure,
898                cluster_info.clone(),
899                snapshot_controller.clone(),
900                enable_gossip_push,
901            );
902            Some(snapshot_packager_service)
903        } else {
904            None
905        };
906
907        let (accounts_package_sender, accounts_package_receiver) = crossbeam_channel::unbounded();
908        let accounts_hash_verifier = AccountsHashVerifier::new(
909            accounts_package_sender.clone(),
910            accounts_package_receiver,
911            pending_snapshot_packages,
912            exit.clone(),
913            snapshot_controller.clone(),
914        );
915        let snapshot_request_handler = SnapshotRequestHandler {
916            snapshot_controller: snapshot_controller.clone(),
917            snapshot_request_receiver,
918            accounts_package_sender,
919        };
920        let pruned_banks_request_handler = PrunedBanksRequestHandler {
921            pruned_banks_receiver,
922        };
923        let accounts_background_service = AccountsBackgroundService::new(
924            bank_forks.clone(),
925            exit.clone(),
926            AbsRequestHandlers {
927                snapshot_request_handler,
928                pruned_banks_request_handler,
929            },
930            config.accounts_db_test_hash_calculation,
931        );
932        info!(
933            "Using: block-verification-method: {}, block-production-method: {}, transaction-structure: {}",
934            config.block_verification_method, config.block_production_method, config.transaction_struct
935        );
936
937        let (replay_vote_sender, replay_vote_receiver) = unbounded();
938
939        // block min prioritization fee cache should be readable by RPC, and writable by validator
940        // (by both replay stage and banking stage)
941        let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::default());
942
943        let leader_schedule_cache = Arc::new(leader_schedule_cache);
944        let startup_verification_complete;
945        let (poh_recorder, entry_receiver) = {
946            let bank = &bank_forks.read().unwrap().working_bank();
947            startup_verification_complete = Arc::clone(bank.get_startup_verification_complete());
948            PohRecorder::new_with_clear_signal(
949                bank.tick_height(),
950                bank.last_blockhash(),
951                bank.clone(),
952                None,
953                bank.ticks_per_slot(),
954                config.delay_leader_block_for_pending_fork,
955                blockstore.clone(),
956                blockstore.get_new_shred_signal(0),
957                &leader_schedule_cache,
958                &genesis_config.poh_config,
959                exit.clone(),
960            )
961        };
962        let (record_sender, record_receiver) = unbounded();
963        let transaction_recorder =
964            TransactionRecorder::new(record_sender, poh_recorder.is_exited.clone());
965        let poh_recorder = Arc::new(RwLock::new(poh_recorder));
966
967        let (banking_tracer, tracer_thread) =
968            BankingTracer::new((config.banking_trace_dir_byte_limit > 0).then_some((
969                &blockstore.banking_trace_path(),
970                exit.clone(),
971                config.banking_trace_dir_byte_limit,
972            )))?;
973        if banking_tracer.is_enabled() {
974            info!(
975                "Enabled banking trace (dir_byte_limit: {})",
976                config.banking_trace_dir_byte_limit
977            );
978        } else {
979            info!("Disabled banking trace");
980        }
981        let banking_tracer_channels = banking_tracer.create_channels(false);
982
983        match &config.block_verification_method {
984            BlockVerificationMethod::BlockstoreProcessor => {
985                info!("no scheduler pool is installed for block verification...");
986                if let Some(count) = config.unified_scheduler_handler_threads {
987                    warn!(
988                        "--unified-scheduler-handler-threads={count} is ignored because unified \
989                         scheduler isn't enabled"
990                    );
991                }
992            }
993            BlockVerificationMethod::UnifiedScheduler => {
994                let scheduler_pool = DefaultSchedulerPool::new_dyn(
995                    config.unified_scheduler_handler_threads,
996                    config.runtime_config.log_messages_bytes_limit,
997                    transaction_status_sender.clone(),
998                    Some(replay_vote_sender.clone()),
999                    prioritization_fee_cache.clone(),
1000                );
1001                bank_forks
1002                    .write()
1003                    .unwrap()
1004                    .install_scheduler_pool(scheduler_pool);
1005            }
1006        }
1007
1008        let entry_notification_sender = entry_notifier_service
1009            .as_ref()
1010            .map(|service| service.sender());
1011        let mut process_blockstore = ProcessBlockStore::new(
1012            &id,
1013            vote_account,
1014            &start_progress,
1015            &blockstore,
1016            original_blockstore_root,
1017            &bank_forks,
1018            &leader_schedule_cache,
1019            &blockstore_process_options,
1020            transaction_status_sender.as_ref(),
1021            entry_notification_sender,
1022            blockstore_root_scan,
1023            &snapshot_controller,
1024            config,
1025        );
1026
1027        maybe_warp_slot(
1028            config,
1029            &mut process_blockstore,
1030            ledger_path,
1031            &bank_forks,
1032            &leader_schedule_cache,
1033            &snapshot_controller,
1034        )
1035        .map_err(ValidatorError::Other)?;
1036
1037        if config.process_ledger_before_services {
1038            process_blockstore
1039                .process()
1040                .map_err(ValidatorError::Other)?;
1041        }
1042        *start_progress.write().unwrap() = ValidatorStartProgress::StartingServices;
1043
1044        let sample_performance_service =
1045            if config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history {
1046                Some(SamplePerformanceService::new(
1047                    &bank_forks,
1048                    blockstore.clone(),
1049                    exit.clone(),
1050                ))
1051            } else {
1052                None
1053            };
1054
1055        let mut block_commitment_cache = BlockCommitmentCache::default();
1056        let bank_forks_guard = bank_forks.read().unwrap();
1057        block_commitment_cache.initialize_slots(
1058            bank_forks_guard.working_bank().slot(),
1059            bank_forks_guard.root(),
1060        );
1061        drop(bank_forks_guard);
1062        let block_commitment_cache = Arc::new(RwLock::new(block_commitment_cache));
1063
1064        let optimistically_confirmed_bank =
1065            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
1066
1067        let rpc_subscriptions = Arc::new(RpcSubscriptions::new_with_config(
1068            exit.clone(),
1069            max_complete_transaction_status_slot.clone(),
1070            blockstore.clone(),
1071            bank_forks.clone(),
1072            block_commitment_cache.clone(),
1073            optimistically_confirmed_bank.clone(),
1074            &config.pubsub_config,
1075            None,
1076        ));
1077
1078        let max_slots = Arc::new(MaxSlots::default());
1079
1080        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
1081
1082        let mut tpu_transactions_forwards_client =
1083            Some(node.sockets.tpu_transaction_forwarding_client);
1084
1085        let connection_cache = match (config.use_tpu_client_next, use_quic) {
1086            (false, true) => Some(Arc::new(ConnectionCache::new_with_client_options(
1087                "connection_cache_tpu_quic",
1088                tpu_connection_pool_size,
1089                Some(
1090                    tpu_transactions_forwards_client
1091                        .take()
1092                        .expect("Socket should exist."),
1093                ),
1094                Some((
1095                    &identity_keypair,
1096                    node.info
1097                        .tpu(Protocol::UDP)
1098                        .ok_or_else(|| {
1099                            ValidatorError::Other(String::from("Invalid UDP address for TPU"))
1100                        })?
1101                        .ip(),
1102                )),
1103                Some((&staked_nodes, &identity_keypair.pubkey())),
1104            ))),
1105            (false, false) => Some(Arc::new(ConnectionCache::with_udp(
1106                "connection_cache_tpu_udp",
1107                tpu_connection_pool_size,
1108            ))),
1109            (true, _) => None,
1110        };
1111
1112        let vote_connection_cache = if vote_use_quic {
1113            let vote_connection_cache = ConnectionCache::new_with_client_options(
1114                "connection_cache_vote_quic",
1115                tpu_connection_pool_size,
1116                Some(node.sockets.quic_vote_client),
1117                Some((
1118                    &identity_keypair,
1119                    node.info
1120                        .tpu_vote(Protocol::QUIC)
1121                        .ok_or_else(|| {
1122                            ValidatorError::Other(String::from("Invalid QUIC address for TPU Vote"))
1123                        })?
1124                        .ip(),
1125                )),
1126                Some((&staked_nodes, &identity_keypair.pubkey())),
1127            );
1128            Arc::new(vote_connection_cache)
1129        } else {
1130            Arc::new(ConnectionCache::with_udp(
1131                "connection_cache_vote_udp",
1132                tpu_connection_pool_size,
1133            ))
1134        };
1135
1136        // test-validator crate may start the validator in a tokio runtime
1137        // context which forces us to use the same runtime because a nested
1138        // runtime will cause panic at drop. Outside test-validator crate, we
1139        // always need a tokio runtime (and the respective handle) to initialize
1140        // the turbine QUIC endpoint.
1141        let current_runtime_handle = tokio::runtime::Handle::try_current();
1142        let tpu_client_next_runtime =
1143            (current_runtime_handle.is_err() && config.use_tpu_client_next).then(|| {
1144                tokio::runtime::Builder::new_multi_thread()
1145                    .enable_all()
1146                    .worker_threads(2)
1147                    .thread_name("solTpuClientRt")
1148                    .build()
1149                    .unwrap()
1150            });
1151
1152        let rpc_override_health_check =
1153            Arc::new(AtomicBool::new(config.rpc_config.disable_health_check));
1154        let (
1155            json_rpc_service,
1156            pubsub_service,
1157            completed_data_sets_sender,
1158            completed_data_sets_service,
1159            rpc_completed_slots_service,
1160            optimistically_confirmed_bank_tracker,
1161            bank_notification_sender,
1162        ) = if let Some((rpc_addr, rpc_pubsub_addr)) = config.rpc_addrs {
1163            assert_eq!(
1164                node.info.rpc().map(|addr| socket_addr_space.check(&addr)),
1165                node.info
1166                    .rpc_pubsub()
1167                    .map(|addr| socket_addr_space.check(&addr))
1168            );
1169            let (bank_notification_sender, bank_notification_receiver) = unbounded();
1170            let confirmed_bank_subscribers = if !bank_notification_senders.is_empty() {
1171                Some(Arc::new(RwLock::new(bank_notification_senders)))
1172            } else {
1173                None
1174            };
1175
1176            let client_option = if config.use_tpu_client_next {
1177                let runtime_handle = tpu_client_next_runtime
1178                    .as_ref()
1179                    .map(TokioRuntime::handle)
1180                    .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap());
1181                ClientOption::TpuClientNext(
1182                    Arc::as_ref(&identity_keypair),
1183                    node.sockets.rpc_sts_client,
1184                    runtime_handle.clone(),
1185                    cancel_tpu_client_next.clone(),
1186                )
1187            } else {
1188                let Some(connection_cache) = &connection_cache else {
1189                    panic!("ConnectionCache should exist by construction.");
1190                };
1191                ClientOption::ConnectionCache(connection_cache.clone())
1192            };
1193            let rpc_svc_config = JsonRpcServiceConfig {
1194                rpc_addr,
1195                rpc_config: config.rpc_config.clone(),
1196                snapshot_config: Some(snapshot_controller.snapshot_config().clone()),
1197                bank_forks: bank_forks.clone(),
1198                block_commitment_cache: block_commitment_cache.clone(),
1199                blockstore: blockstore.clone(),
1200                cluster_info: cluster_info.clone(),
1201                poh_recorder: Some(poh_recorder.clone()),
1202                genesis_hash: genesis_config.hash(),
1203                ledger_path: ledger_path.to_path_buf(),
1204                validator_exit: config.validator_exit.clone(),
1205                exit: exit.clone(),
1206                override_health_check: rpc_override_health_check.clone(),
1207                startup_verification_complete,
1208                optimistically_confirmed_bank: optimistically_confirmed_bank.clone(),
1209                send_transaction_service_config: config.send_transaction_service_config.clone(),
1210                max_slots: max_slots.clone(),
1211                leader_schedule_cache: leader_schedule_cache.clone(),
1212                max_complete_transaction_status_slot,
1213                prioritization_fee_cache: prioritization_fee_cache.clone(),
1214                client_option,
1215            };
1216            let json_rpc_service =
1217                JsonRpcService::new_with_config(rpc_svc_config).map_err(ValidatorError::Other)?;
1218
1219            let pubsub_service = if !config.rpc_config.full_api {
1220                None
1221            } else {
1222                let (trigger, pubsub_service) = PubSubService::new(
1223                    config.pubsub_config.clone(),
1224                    &rpc_subscriptions,
1225                    rpc_pubsub_addr,
1226                );
1227                config
1228                    .validator_exit
1229                    .write()
1230                    .unwrap()
1231                    .register_exit(Box::new(move || trigger.cancel()));
1232
1233                Some(pubsub_service)
1234            };
1235
1236            let (completed_data_sets_sender, completed_data_sets_service) =
1237                if !config.rpc_config.full_api {
1238                    (None, None)
1239                } else {
1240                    let (completed_data_sets_sender, completed_data_sets_receiver) =
1241                        bounded(MAX_COMPLETED_DATA_SETS_IN_CHANNEL);
1242                    let completed_data_sets_service = CompletedDataSetsService::new(
1243                        completed_data_sets_receiver,
1244                        blockstore.clone(),
1245                        rpc_subscriptions.clone(),
1246                        exit.clone(),
1247                        max_slots.clone(),
1248                    );
1249                    (
1250                        Some(completed_data_sets_sender),
1251                        Some(completed_data_sets_service),
1252                    )
1253                };
1254
1255            let rpc_completed_slots_service =
1256                if config.rpc_config.full_api || geyser_plugin_service.is_some() {
1257                    let (completed_slots_sender, completed_slots_receiver) =
1258                        bounded(MAX_COMPLETED_SLOTS_IN_CHANNEL);
1259                    blockstore.add_completed_slots_signal(completed_slots_sender);
1260
1261                    Some(RpcCompletedSlotsService::spawn(
1262                        completed_slots_receiver,
1263                        rpc_subscriptions.clone(),
1264                        slot_status_notifier.clone(),
1265                        exit.clone(),
1266                    ))
1267                } else {
1268                    None
1269                };
1270
1271            let optimistically_confirmed_bank_tracker =
1272                Some(OptimisticallyConfirmedBankTracker::new(
1273                    bank_notification_receiver,
1274                    exit.clone(),
1275                    bank_forks.clone(),
1276                    optimistically_confirmed_bank,
1277                    rpc_subscriptions.clone(),
1278                    confirmed_bank_subscribers,
1279                    prioritization_fee_cache.clone(),
1280                ));
1281            let bank_notification_sender_config = Some(BankNotificationSenderConfig {
1282                sender: bank_notification_sender,
1283                should_send_parents: geyser_plugin_service.is_some(),
1284            });
1285            (
1286                Some(json_rpc_service),
1287                pubsub_service,
1288                completed_data_sets_sender,
1289                completed_data_sets_service,
1290                rpc_completed_slots_service,
1291                optimistically_confirmed_bank_tracker,
1292                bank_notification_sender_config,
1293            )
1294        } else {
1295            (None, None, None, None, None, None, None)
1296        };
1297
1298        if config.halt_at_slot.is_some() {
1299            // Simulate a confirmed root to avoid RPC errors with CommitmentConfig::finalized() and
1300            // to ensure RPC endpoints like getConfirmedBlock, which require a confirmed root, work
1301            block_commitment_cache
1302                .write()
1303                .unwrap()
1304                .set_highest_super_majority_root(bank_forks.read().unwrap().root());
1305
1306            // Park with the RPC service running, ready for inspection!
1307            warn!("Validator halted");
1308            *start_progress.write().unwrap() = ValidatorStartProgress::Halted;
1309            std::thread::park();
1310        }
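        // When an ip-echo TCP socket is provided, spawn the echo server; presumably peers use
        // it to discover their public address and to confirm this node's shred version.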
1311        let ip_echo_server = match node.sockets.ip_echo {
1312            None => None,
1313            Some(tcp_listener) => Some(solana_net_utils::ip_echo_server(
1314                tcp_listener,
1315                config.ip_echo_server_threads,
1316                Some(node.info.shred_version()),
1317            )),
1318        };
1319
1320        let (stats_reporter_sender, stats_reporter_receiver) = unbounded();
1321
1322        let stats_reporter_service =
1323            StatsReporterService::new(stats_reporter_receiver, exit.clone());
1324
1325        let gossip_service = GossipService::new(
1326            &cluster_info,
1327            Some(bank_forks.clone()),
1328            node.sockets.gossip,
1329            config.gossip_validators.clone(),
1330            should_check_duplicate_instance,
1331            Some(stats_reporter_sender.clone()),
1332            exit.clone(),
1333        );
1334        let serve_repair = ServeRepair::new(
1335            cluster_info.clone(),
1336            bank_forks.clone(),
1337            config.repair_whitelist.clone(),
1338        );
1339        let (repair_request_quic_sender, repair_request_quic_receiver) = unbounded();
1340        let (repair_response_quic_sender, repair_response_quic_receiver) = unbounded();
1341        let (ancestor_hashes_response_quic_sender, ancestor_hashes_response_quic_receiver) =
1342            unbounded();
1343
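        // If --wait-for-supermajority is configured, this call blocks until enough of the
        // activated stake is visible in gossip (see wait_for_supermajority below); the return
        // value records whether we actually waited.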
1344        let waited_for_supermajority = wait_for_supermajority(
1345            config,
1346            Some(&mut process_blockstore),
1347            &bank_forks,
1348            &cluster_info,
1349            rpc_override_health_check,
1350            &start_progress,
1351        )?;
1352
1353        let blockstore_metric_report_service =
1354            BlockstoreMetricReportService::new(blockstore.clone(), exit.clone());
1355
1356        let wait_for_vote_to_start_leader =
1357            !waited_for_supermajority && !config.no_wait_for_vote_to_start_leader;
1358
1359        let poh_service = PohService::new(
1360            poh_recorder.clone(),
1361            &genesis_config.poh_config,
1362            exit.clone(),
1363            bank_forks.read().unwrap().root_bank().ticks_per_slot(),
1364            config.poh_pinned_cpu_core,
1365            config.poh_hashes_per_batch,
1366            record_receiver,
1367        );
1368        assert_eq!(
1369            blockstore.get_new_shred_signals_len(),
1370            1,
1371            "New shred signal for the TVU should be the same as the clear bank signal."
1372        );
1373
1374        let vote_tracker = Arc::<VoteTracker>::default();
1375
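        // Channels wired between the TVU and TPU stages constructed below: retransmit-slot
        // requests, verified votes observed via gossip and replay, and duplicate-confirmed
        // slot notifications.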
1376        let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded();
1377        let (verified_vote_sender, verified_vote_receiver) = unbounded();
1378        let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
1379        let (duplicate_confirmed_slot_sender, duplicate_confirmed_slots_receiver) = unbounded();
1380
1381        let entry_notification_sender = entry_notifier_service
1382            .as_ref()
1383            .map(|service| service.sender_cloned());
1384
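        // Turbine over QUIC is skipped on MainnetBeta. Elsewhere, a dedicated Tokio runtime is
        // built unless current_runtime_handle already holds one (i.e. this thread is presumably
        // already running inside an existing runtime).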
1385        let turbine_quic_endpoint_runtime = (current_runtime_handle.is_err()
1386            && genesis_config.cluster_type != ClusterType::MainnetBeta)
1387            .then(|| {
1388                tokio::runtime::Builder::new_multi_thread()
1389                    .enable_all()
1390                    .thread_name("solTurbineQuic")
1391                    .build()
1392                    .unwrap()
1393            });
1394        let (turbine_quic_endpoint_sender, turbine_quic_endpoint_receiver) = unbounded();
1395        let (
1396            turbine_quic_endpoint,
1397            turbine_quic_endpoint_sender,
1398            turbine_quic_endpoint_join_handle,
1399        ) = if genesis_config.cluster_type == ClusterType::MainnetBeta {
1400            let (sender, _receiver) = tokio::sync::mpsc::channel(1);
1401            (None, sender, None)
1402        } else {
1403            solana_turbine::quic_endpoint::new_quic_endpoint(
1404                turbine_quic_endpoint_runtime
1405                    .as_ref()
1406                    .map(TokioRuntime::handle)
1407                    .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()),
1408                &identity_keypair,
1409                node.sockets.tvu_quic,
1410                turbine_quic_endpoint_sender,
1411                bank_forks.clone(),
1412            )
1413            .map(|(endpoint, sender, join_handle)| (Some(endpoint), sender, Some(join_handle)))
1414            .unwrap()
1415        };
1416
1417        // Repair quic endpoint.
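        // As with turbine above, repair over QUIC is skipped on MainnetBeta; otherwise the
        // serve-repair server socket, the repair client socket, and the ancestor-hashes socket
        // each get a QUIC endpoint backed by the runtime created here.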
1418        let repair_quic_endpoints_runtime = (current_runtime_handle.is_err()
1419            && genesis_config.cluster_type != ClusterType::MainnetBeta)
1420            .then(|| {
1421                tokio::runtime::Builder::new_multi_thread()
1422                    .enable_all()
1423                    .thread_name("solRepairQuic")
1424                    .build()
1425                    .unwrap()
1426            });
1427        let (repair_quic_endpoints, repair_quic_async_senders, repair_quic_endpoints_join_handle) =
1428            if genesis_config.cluster_type == ClusterType::MainnetBeta {
1429                (None, RepairQuicAsyncSenders::new_dummy(), None)
1430            } else {
1431                let repair_quic_sockets = RepairQuicSockets {
1432                    repair_server_quic_socket: node.sockets.serve_repair_quic,
1433                    repair_client_quic_socket: node.sockets.repair_quic,
1434                    ancestor_hashes_quic_socket: node.sockets.ancestor_hashes_requests_quic,
1435                };
1436                let repair_quic_senders = RepairQuicSenders {
1437                    repair_request_quic_sender: repair_request_quic_sender.clone(),
1438                    repair_response_quic_sender,
1439                    ancestor_hashes_response_quic_sender,
1440                };
1441                repair::quic_endpoint::new_quic_endpoints(
1442                    repair_quic_endpoints_runtime
1443                        .as_ref()
1444                        .map(TokioRuntime::handle)
1445                        .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()),
1446                    &identity_keypair,
1447                    repair_quic_sockets,
1448                    repair_quic_senders,
1449                    bank_forks.clone(),
1450                )
1451                .map(|(endpoints, senders, join_handle)| {
1452                    (Some(endpoints), senders, Some(join_handle))
1453                })
1454                .unwrap()
1455            };
1456        let serve_repair_service = ServeRepairService::new(
1457            serve_repair,
1458            // Incoming UDP repair requests are adapted into RemoteRequest
1459            // and also sent through the same channel.
1460            repair_request_quic_sender,
1461            repair_request_quic_receiver,
1462            repair_quic_async_senders.repair_response_quic_sender,
1463            blockstore.clone(),
1464            node.sockets.serve_repair,
1465            socket_addr_space,
1466            stats_reporter_sender,
1467            exit.clone(),
1468        );
1469
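        // wen_restart is only entered when a restart proto path is configured and we did not
        // just wait for supermajority; the repair-slots list created here is shared with Tvu,
        // presumably so repair can fetch the slots wen_restart decides it needs.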
1470        let in_wen_restart = config.wen_restart_proto_path.is_some() && !waited_for_supermajority;
1471        let wen_restart_repair_slots = if in_wen_restart {
1472            Some(Arc::new(RwLock::new(Vec::new())))
1473        } else {
1474            None
1475        };
1476        let tower = match process_blockstore.process_to_create_tower() {
1477            Ok(tower) => {
1478                info!("Tower state: {:?}", tower);
1479                tower
1480            }
1481            Err(e) => {
1482                warn!(
1483                    "Unable to retrieve tower: {:?}; creating default tower...",
1484                    e
1485                );
1486                Tower::default()
1487            }
1488        };
1489        let last_vote = tower.last_vote();
1490
1491        let outstanding_repair_requests =
1492            Arc::<RwLock<repair::repair_service::OutstandingShredRepairs>>::default();
1493        let cluster_slots =
1494            Arc::new(crate::cluster_slots_service::cluster_slots::ClusterSlots::default());
1495
1496        // If RPC is supported and a ConnectionCache is in use, pass the ConnectionCache so it can be warmed up inside Tvu.
1497        let connection_cache_for_warmup =
1498            if json_rpc_service.is_some() && connection_cache.is_some() {
1499                connection_cache.as_ref()
1500            } else {
1501                None
1502            };
1503
1504        let tvu = Tvu::new(
1505            vote_account,
1506            authorized_voter_keypairs,
1507            &bank_forks,
1508            &cluster_info,
1509            TvuSockets {
1510                repair: node.sockets.repair.try_clone().unwrap(),
1511                retransmit: node.sockets.retransmit_sockets,
1512                fetch: node.sockets.tvu,
1513                ancestor_hashes_requests: node.sockets.ancestor_hashes_requests,
1514            },
1515            blockstore.clone(),
1516            ledger_signal_receiver,
1517            &rpc_subscriptions,
1518            &poh_recorder,
1519            tower,
1520            config.tower_storage.clone(),
1521            &leader_schedule_cache,
1522            exit.clone(),
1523            block_commitment_cache,
1524            config.turbine_disabled.clone(),
1525            transaction_status_sender.clone(),
1526            entry_notification_sender.clone(),
1527            vote_tracker.clone(),
1528            retransmit_slots_sender,
1529            gossip_verified_vote_hash_receiver,
1530            verified_vote_receiver,
1531            replay_vote_sender.clone(),
1532            completed_data_sets_sender,
1533            bank_notification_sender.clone(),
1534            duplicate_confirmed_slots_receiver,
1535            TvuConfig {
1536                max_ledger_shreds: config.max_ledger_shreds,
1537                shred_version: node.info.shred_version(),
1538                repair_validators: config.repair_validators.clone(),
1539                repair_whitelist: config.repair_whitelist.clone(),
1540                wait_for_vote_to_start_leader,
1541                replay_forks_threads: config.replay_forks_threads,
1542                replay_transactions_threads: config.replay_transactions_threads,
1543                shred_sigverify_threads: config.tvu_shred_sigverify_threads,
1544                retransmit_xdp: config.retransmit_xdp.clone(),
1545            },
1546            &max_slots,
1547            block_metadata_notifier,
1548            config.wait_to_vote_slot,
1549            Some(snapshot_controller.clone()),
1550            config.runtime_config.log_messages_bytes_limit,
1551            connection_cache_for_warmup,
1552            &prioritization_fee_cache,
1553            banking_tracer.clone(),
1554            turbine_quic_endpoint_sender.clone(),
1555            turbine_quic_endpoint_receiver,
1556            repair_response_quic_receiver,
1557            repair_quic_async_senders.repair_request_quic_sender,
1558            repair_quic_async_senders.ancestor_hashes_request_quic_sender,
1559            ancestor_hashes_response_quic_receiver,
1560            outstanding_repair_requests.clone(),
1561            cluster_slots.clone(),
1562            wen_restart_repair_slots.clone(),
1563            slot_status_notifier,
1564            vote_connection_cache,
1565        )
1566        .map_err(ValidatorError::Other)?;
1567
1568        if in_wen_restart {
1569            info!("Waiting for wen_restart to finish");
1570            wait_for_wen_restart(WenRestartConfig {
1571                wen_restart_path: config.wen_restart_proto_path.clone().unwrap(),
1572                wen_restart_coordinator: config.wen_restart_coordinator.unwrap(),
1573                last_vote,
1574                blockstore: blockstore.clone(),
1575                cluster_info: cluster_info.clone(),
1576                bank_forks: bank_forks.clone(),
1577                wen_restart_repair_slots: wen_restart_repair_slots.clone(),
1578                wait_for_supermajority_threshold_percent:
1579                    WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT,
1580                snapshot_controller: Some(snapshot_controller.clone()),
1581                abs_status: accounts_background_service.status().clone(),
1582                genesis_config_hash: genesis_config.hash(),
1583                exit: exit.clone(),
1584            })?;
1585            return Err(ValidatorError::WenRestartFinished.into());
1586        }
1587
1588        let key_notifiers = Arc::new(RwLock::new(KeyUpdaters::default()));
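        // Choose the client used for forwarding transactions: reuse the legacy ConnectionCache
        // when one is configured, otherwise hand the TPU a tpu-client-next setup (identity
        // keypair, forwarding socket, runtime handle, and cancellation token).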
1589        let forwarding_tpu_client = if let Some(connection_cache) = &connection_cache {
1590            ForwardingClientOption::ConnectionCache(connection_cache.clone())
1591        } else {
1592            let runtime_handle = tpu_client_next_runtime
1593                .as_ref()
1594                .map(TokioRuntime::handle)
1595                .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap());
1596            ForwardingClientOption::TpuClientNext((
1597                Arc::as_ref(&identity_keypair),
1598                tpu_transactions_forwards_client
1599                    .take()
1600                    .expect("Socket should exist."),
1601                runtime_handle.clone(),
1602                cancel_tpu_client_next,
1603            ))
1604        };
1605        let tpu = Tpu::new_with_client(
1606            &cluster_info,
1607            &poh_recorder,
1608            transaction_recorder,
1609            entry_receiver,
1610            retransmit_slots_receiver,
1611            TpuSockets {
1612                transactions: node.sockets.tpu,
1613                transaction_forwards: node.sockets.tpu_forwards,
1614                vote: node.sockets.tpu_vote,
1615                broadcast: node.sockets.broadcast,
1616                transactions_quic: node.sockets.tpu_quic,
1617                transactions_forwards_quic: node.sockets.tpu_forwards_quic,
1618                vote_quic: node.sockets.tpu_vote_quic,
1619                vote_forwarding_client: node.sockets.tpu_vote_forwarding_client,
1620            },
1621            &rpc_subscriptions,
1622            transaction_status_sender,
1623            entry_notification_sender,
1624            blockstore.clone(),
1625            &config.broadcast_stage_type,
1626            exit,
1627            node.info.shred_version(),
1628            vote_tracker,
1629            bank_forks.clone(),
1630            verified_vote_sender,
1631            gossip_verified_vote_hash_sender,
1632            replay_vote_receiver,
1633            replay_vote_sender,
1634            bank_notification_sender.map(|sender| sender.sender),
1635            config.tpu_coalesce,
1636            duplicate_confirmed_slot_sender,
1637            forwarding_tpu_client,
1638            turbine_quic_endpoint_sender,
1639            &identity_keypair,
1640            config.runtime_config.log_messages_bytes_limit,
1641            &staked_nodes,
1642            config.staked_nodes_overrides.clone(),
1643            banking_tracer_channels,
1644            tracer_thread,
1645            tpu_enable_udp,
1646            tpu_quic_server_config,
1647            tpu_fwd_quic_server_config,
1648            vote_quic_server_config,
1649            &prioritization_fee_cache,
1650            config.block_production_method.clone(),
1651            config.transaction_struct.clone(),
1652            config.enable_block_production_forwarding,
1653            config.generator_config.clone(),
1654            key_notifiers.clone(),
1655        );
1656
1657        datapoint_info!(
1658            "validator-new",
1659            ("id", id.to_string(), String),
1660            ("version", solana_version::version!(), String),
1661            ("cluster_type", genesis_config.cluster_type as u32, i64),
1662            ("elapsed_ms", start_time.elapsed().as_millis() as i64, i64),
1663            ("waited_for_supermajority", waited_for_supermajority, bool),
1664            ("shred_version", shred_version as i64, i64),
1665        );
1666
1667        *start_progress.write().unwrap() = ValidatorStartProgress::Running;
1668        if config.use_tpu_client_next {
1669            if let Some(json_rpc_service) = &json_rpc_service {
1670                key_notifiers.write().unwrap().add(
1671                    KeyUpdaterType::RpcService,
1672                    json_rpc_service.get_client_key_updater(),
1673                );
1674            }
1675            // Note that we don't need to add the ConnectionClient to key_notifiers
1676            // because it is added inside Tpu.
1677        }
1678
1679        *admin_rpc_service_post_init.write().unwrap() = Some(AdminRpcRequestMetadataPostInit {
1680            bank_forks: bank_forks.clone(),
1681            cluster_info: cluster_info.clone(),
1682            vote_account: *vote_account,
1683            repair_whitelist: config.repair_whitelist.clone(),
1684            notifies: key_notifiers,
1685            repair_socket: Arc::new(node.sockets.repair),
1686            outstanding_repair_requests,
1687            cluster_slots,
1688        });
1689
1690        Ok(Self {
1691            stats_reporter_service,
1692            gossip_service,
1693            serve_repair_service,
1694            json_rpc_service,
1695            pubsub_service,
1696            rpc_completed_slots_service,
1697            optimistically_confirmed_bank_tracker,
1698            transaction_status_service,
1699            entry_notifier_service,
1700            system_monitor_service,
1701            sample_performance_service,
1702            snapshot_packager_service,
1703            completed_data_sets_service,
1704            tpu,
1705            tvu,
1706            poh_service,
1707            poh_recorder,
1708            ip_echo_server,
1709            validator_exit: config.validator_exit.clone(),
1710            cluster_info,
1711            bank_forks,
1712            blockstore,
1713            geyser_plugin_service,
1714            blockstore_metric_report_service,
1715            accounts_background_service,
1716            accounts_hash_verifier,
1717            turbine_quic_endpoint,
1718            turbine_quic_endpoint_runtime,
1719            turbine_quic_endpoint_join_handle,
1720            repair_quic_endpoints,
1721            repair_quic_endpoints_runtime,
1722            repair_quic_endpoints_join_handle,
1723            _tpu_client_next_runtime: tpu_client_next_runtime,
1724        })
1725    }
1726
1727    // Used to notify many nodes in parallel that they should exit
1728    pub fn exit(&mut self) {
1729        self.validator_exit.write().unwrap().exit();
1730
1731        // drop all signals in blockstore
1732        self.blockstore.drop_signal();
1733    }
1734
1735    pub fn close(mut self) {
1736        self.exit();
1737        self.join();
1738    }
1739
1740    fn print_node_info(node: &Node) {
1741        info!("{:?}", node.info);
1742        info!(
1743            "local gossip address: {}",
1744            node.sockets.gossip.local_addr().unwrap()
1745        );
1746        info!(
1747            "local broadcast address: {}",
1748            node.sockets
1749                .broadcast
1750                .first()
1751                .unwrap()
1752                .local_addr()
1753                .unwrap()
1754        );
1755        info!(
1756            "local repair address: {}",
1757            node.sockets.repair.local_addr().unwrap()
1758        );
1759        info!(
1760            "local retransmit address: {}",
1761            node.sockets.retransmit_sockets[0].local_addr().unwrap()
1762        );
1763    }
1764
1765    pub fn join(self) {
1766        drop(self.bank_forks);
1767        drop(self.cluster_info);
1768
1769        self.poh_service.join().expect("poh_service");
1770        drop(self.poh_recorder);
1771
1772        if let Some(json_rpc_service) = self.json_rpc_service {
1773            json_rpc_service.join().expect("rpc_service");
1774        }
1775
1776        if let Some(pubsub_service) = self.pubsub_service {
1777            pubsub_service.join().expect("pubsub_service");
1778        }
1779
1780        if let Some(rpc_completed_slots_service) = self.rpc_completed_slots_service {
1781            rpc_completed_slots_service
1782                .join()
1783                .expect("rpc_completed_slots_service");
1784        }
1785
1786        if let Some(optimistically_confirmed_bank_tracker) =
1787            self.optimistically_confirmed_bank_tracker
1788        {
1789            optimistically_confirmed_bank_tracker
1790                .join()
1791                .expect("optimistically_confirmed_bank_tracker");
1792        }
1793
1794        if let Some(transaction_status_service) = self.transaction_status_service {
1795            transaction_status_service
1796                .join()
1797                .expect("transaction_status_service");
1798        }
1799
1800        if let Some(system_monitor_service) = self.system_monitor_service {
1801            system_monitor_service
1802                .join()
1803                .expect("system_monitor_service");
1804        }
1805
1806        if let Some(sample_performance_service) = self.sample_performance_service {
1807            sample_performance_service
1808                .join()
1809                .expect("sample_performance_service");
1810        }
1811
1812        if let Some(entry_notifier_service) = self.entry_notifier_service {
1813            entry_notifier_service
1814                .join()
1815                .expect("entry_notifier_service");
1816        }
1817
1818        if let Some(s) = self.snapshot_packager_service {
1819            s.join().expect("snapshot_packager_service");
1820        }
1821
1822        self.gossip_service.join().expect("gossip_service");
1823        self.repair_quic_endpoints
1824            .iter()
1825            .flatten()
1826            .for_each(repair::quic_endpoint::close_quic_endpoint);
1827        self.serve_repair_service
1828            .join()
1829            .expect("serve_repair_service");
1830        if let Some(repair_quic_endpoints_join_handle) = self.repair_quic_endpoints_join_handle {
1831            self.repair_quic_endpoints_runtime
1832                .map(|runtime| runtime.block_on(repair_quic_endpoints_join_handle))
1833                .transpose()
1834                .unwrap();
1835        }
1836        self.stats_reporter_service
1837            .join()
1838            .expect("stats_reporter_service");
1839        self.blockstore_metric_report_service
1840            .join()
1841            .expect("ledger_metric_report_service");
1842        self.accounts_background_service
1843            .join()
1844            .expect("accounts_background_service");
1845        self.accounts_hash_verifier
1846            .join()
1847            .expect("accounts_hash_verifier");
1848        if let Some(turbine_quic_endpoint) = &self.turbine_quic_endpoint {
1849            solana_turbine::quic_endpoint::close_quic_endpoint(turbine_quic_endpoint);
1850        }
1851        self.tpu.join().expect("tpu");
1852        self.tvu.join().expect("tvu");
1853        if let Some(turbine_quic_endpoint_join_handle) = self.turbine_quic_endpoint_join_handle {
1854            self.turbine_quic_endpoint_runtime
1855                .map(|runtime| runtime.block_on(turbine_quic_endpoint_join_handle))
1856                .transpose()
1857                .unwrap();
1858        }
1859        if let Some(completed_data_sets_service) = self.completed_data_sets_service {
1860            completed_data_sets_service
1861                .join()
1862                .expect("completed_data_sets_service");
1863        }
1864        if let Some(ip_echo_server) = self.ip_echo_server {
1865            ip_echo_server.shutdown_background();
1866        }
1867
1868        if let Some(geyser_plugin_service) = self.geyser_plugin_service {
1869            geyser_plugin_service.join().expect("geyser_plugin_service");
1870        }
1871    }
1872}
1873
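// A vote account counts as "active" here only if its vote state contains at least one vote.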
1874fn active_vote_account_exists_in_bank(bank: &Bank, vote_account: &Pubkey) -> bool {
1875    if let Some(account) = &bank.get_account(vote_account) {
1876        if let Some(vote_state) = vote_state::from(account) {
1877            return !vote_state.votes.is_empty();
1878        }
1879    }
1880    false
1881}
1882
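// Benchmarks this machine's hashing rate against the cluster target (hashes_per_tick *
// ticks_per_slot over the target slot duration) and errors out if the node cannot keep up.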
1883fn check_poh_speed(bank: &Bank, maybe_hash_samples: Option<u64>) -> Result<(), ValidatorError> {
1884    let Some(hashes_per_tick) = bank.hashes_per_tick() else {
1885        warn!("Unable to read hashes per tick from Bank, skipping PoH speed check");
1886        return Ok(());
1887    };
1888
1889    let ticks_per_slot = bank.ticks_per_slot();
1890    let hashes_per_slot = hashes_per_tick * ticks_per_slot;
1891    let hash_samples = maybe_hash_samples.unwrap_or(hashes_per_slot);
1892
1893    let hash_time = compute_hash_time(hash_samples);
1894    let my_hashes_per_second = (hash_samples as f64 / hash_time.as_secs_f64()) as u64;
1895
1896    let target_slot_duration = Duration::from_nanos(bank.ns_per_slot as u64);
1897    let target_hashes_per_second =
1898        (hashes_per_slot as f64 / target_slot_duration.as_secs_f64()) as u64;
1899
1900    info!(
1901        "PoH speed check: computed hashes per second {my_hashes_per_second}, target hashes per \
1902         second {target_hashes_per_second}"
1903    );
1904    if my_hashes_per_second < target_hashes_per_second {
1905        return Err(ValidatorError::PohTooSlow {
1906            mine: my_hashes_per_second,
1907            target: target_hashes_per_second,
1908        });
1909    }
1910
1911    Ok(())
1912}
1913
1914fn maybe_cluster_restart_with_hard_fork(config: &ValidatorConfig, root_slot: Slot) -> Option<Slot> {
1915    // detect cluster restart (hard fork) indirectly via wait_for_supermajority...
1916    if let Some(wait_slot_for_supermajority) = config.wait_for_supermajority {
1917        if wait_slot_for_supermajority == root_slot {
1918            return Some(wait_slot_for_supermajority);
1919        }
1920    }
1921
1922    None
1923}
1924
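// Reconciles a tower restored from disk with the loaded bank state: hard forks and warp slots
// intentionally invalidate the saved tower, and a missing or unusable tower is only fatal when
// config.require_tower is set and the vote account has already voted.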
1925fn post_process_restored_tower(
1926    restored_tower: crate::consensus::Result<Tower>,
1927    validator_identity: &Pubkey,
1928    vote_account: &Pubkey,
1929    config: &ValidatorConfig,
1930    bank_forks: &BankForks,
1931) -> Result<Tower, String> {
1932    let mut should_require_tower = config.require_tower;
1933
1934    let restored_tower = restored_tower.and_then(|tower| {
1935        let root_bank = bank_forks.root_bank();
1936        let slot_history = root_bank.get_slot_history();
1937        // make sure the tower isn't corrupted before the following hard fork check
1938        let tower = tower.adjust_lockouts_after_replay(root_bank.slot(), &slot_history);
1939
1940        if let Some(hard_fork_restart_slot) =
1941            maybe_cluster_restart_with_hard_fork(config, root_bank.slot())
1942        {
1943            // intentionally fail to restore the tower; we're supposedly in a new hard fork, so the
1944            // past out-of-chain vote state doesn't make sense at all
1945            // (open question: what if --wait-for-supermajority is passed again when the validator restarts?)
1946            let message =
1947                format!("Hard fork is detected; discarding tower restoration result: {tower:?}");
1948            datapoint_error!("tower_error", ("error", message, String),);
1949            error!("{}", message);
1950
1951            // unconditionally relax tower requirement so that we can always restore tower
1952            // from root bank.
1953            should_require_tower = false;
1954            return Err(crate::consensus::TowerError::HardFork(
1955                hard_fork_restart_slot,
1956            ));
1957        }
1958
1959        if let Some(warp_slot) = config.warp_slot {
1960            // unconditionally relax tower requirement so that we can always restore tower
1961            // from root bank after the warp
1962            should_require_tower = false;
1963            return Err(crate::consensus::TowerError::HardFork(warp_slot));
1964        }
1965
1966        tower
1967    });
1968
1969    let restored_tower = match restored_tower {
1970        Ok(tower) => tower,
1971        Err(err) => {
1972            let voting_has_been_active =
1973                active_vote_account_exists_in_bank(&bank_forks.working_bank(), vote_account);
1974            if !err.is_file_missing() {
1975                datapoint_error!(
1976                    "tower_error",
1977                    ("error", format!("Unable to restore tower: {err}"), String),
1978                );
1979            }
1980            if should_require_tower && voting_has_been_active {
1981                return Err(format!(
1982                    "Requested mandatory tower restore failed: {err}. And there is an existing \
1983                     vote_account containing actual votes. Aborting due to possible conflicting \
1984                     duplicate votes"
1985                ));
1986            }
1987            if err.is_file_missing() && !voting_has_been_active {
1988                // Currently, don't protect against spoofed snapshots with no tower at all
1989                info!(
1990                    "Ignoring expected failed tower restore because this is the initial validator \
1991                     start with the vote account..."
1992                );
1993            } else {
1994                error!(
1995                    "Rebuilding a new tower from the latest vote account due to failed tower \
1996                     restore: {}",
1997                    err
1998                );
1999            }
2000
2001            Tower::new_from_bankforks(bank_forks, validator_identity, vote_account)
2002        }
2003    };
2004
2005    Ok(restored_tower)
2006}
2007
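// Loads the genesis config from the ledger directory and, when an expected genesis hash is
// configured, verifies that the loaded config hashes to that value.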
2008fn load_genesis(
2009    config: &ValidatorConfig,
2010    ledger_path: &Path,
2011) -> Result<GenesisConfig, ValidatorError> {
2012    let genesis_config = open_genesis_config(ledger_path, config.max_genesis_archive_unpacked_size)
2013        .map_err(ValidatorError::OpenGenesisConfig)?;
2014
2015    // This needs to be limited, otherwise the state in the VoteAccount data
2016    // grows too large
2017    let leader_schedule_slot_offset = genesis_config.epoch_schedule.leader_schedule_slot_offset;
2018    let slots_per_epoch = genesis_config.epoch_schedule.slots_per_epoch;
2019    let leader_epoch_offset = leader_schedule_slot_offset.div_ceil(slots_per_epoch);
2020    assert!(leader_epoch_offset <= MAX_LEADER_SCHEDULE_EPOCH_OFFSET);
2021
2022    let genesis_hash = genesis_config.hash();
2023    info!("genesis hash: {}", genesis_hash);
2024
2025    if let Some(expected_genesis_hash) = config.expected_genesis_hash {
2026        if genesis_hash != expected_genesis_hash {
2027            return Err(ValidatorError::GenesisHashMismatch(
2028                genesis_hash,
2029                expected_genesis_hash,
2030            ));
2031        }
2032    }
2033
2034    Ok(genesis_config)
2035}
2036
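// Opens the blockstore, loads bank forks from the configured snapshots (or from genesis), and
// wires up the optional transaction-history and entry-notifier services along the way.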
2037#[allow(clippy::type_complexity)]
2038fn load_blockstore(
2039    config: &ValidatorConfig,
2040    ledger_path: &Path,
2041    genesis_config: &GenesisConfig,
2042    exit: Arc<AtomicBool>,
2043    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
2044    accounts_update_notifier: Option<AccountsUpdateNotifier>,
2045    transaction_notifier: Option<TransactionNotifierArc>,
2046    entry_notifier: Option<EntryNotifierArc>,
2047) -> Result<
2048    (
2049        Arc<RwLock<BankForks>>,
2050        Arc<Blockstore>,
2051        Slot,
2052        Receiver<bool>,
2053        LeaderScheduleCache,
2054        Option<StartingSnapshotHashes>,
2055        TransactionHistoryServices,
2056        blockstore_processor::ProcessOptions,
2057        BlockstoreRootScan,
2058        DroppedSlotsReceiver,
2059        Option<EntryNotifierService>,
2060    ),
2061    String,
2062> {
2063    info!("loading ledger from {:?}...", ledger_path);
2064    *start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger;
2065
2066    let blockstore = Blockstore::open_with_options(ledger_path, config.blockstore_options.clone())
2067        .map_err(|err| format!("Failed to open Blockstore: {err:?}"))?;
2068
2069    let (ledger_signal_sender, ledger_signal_receiver) = bounded(MAX_REPLAY_WAKE_UP_SIGNALS);
2070    blockstore.add_new_shred_signal(ledger_signal_sender);
2071
2072    // The following boot sequence (esp. BankForks) could set a root, so stash the original value
2073    // of the blockstore root away here as soon as possible.
2074    let original_blockstore_root = blockstore.max_root();
2075
2076    let blockstore = Arc::new(blockstore);
2077    let blockstore_root_scan = BlockstoreRootScan::new(config, blockstore.clone(), exit.clone());
2078    let halt_at_slot = config
2079        .halt_at_slot
2080        .or_else(|| blockstore.highest_slot().unwrap_or(None));
2081
2082    let process_options = blockstore_processor::ProcessOptions {
2083        run_verification: config.run_verification,
2084        halt_at_slot,
2085        new_hard_forks: config.new_hard_forks.clone(),
2086        debug_keys: config.debug_keys.clone(),
2087        accounts_db_config: config.accounts_db_config.clone(),
2088        accounts_db_test_hash_calculation: config.accounts_db_test_hash_calculation,
2089        accounts_db_skip_shrink: config.accounts_db_skip_shrink,
2090        accounts_db_force_initial_clean: config.accounts_db_force_initial_clean,
2091        runtime_config: config.runtime_config.clone(),
2092        use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup,
2093        ..blockstore_processor::ProcessOptions::default()
2094    };
2095
2096    let enable_rpc_transaction_history =
2097        config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history;
2098    let is_plugin_transaction_history_required = transaction_notifier.as_ref().is_some();
2099    let transaction_history_services =
2100        if enable_rpc_transaction_history || is_plugin_transaction_history_required {
2101            initialize_rpc_transaction_history_services(
2102                blockstore.clone(),
2103                exit.clone(),
2104                enable_rpc_transaction_history,
2105                config.rpc_config.enable_extended_tx_metadata_storage,
2106                transaction_notifier,
2107            )
2108        } else {
2109            TransactionHistoryServices::default()
2110        };
2111
2112    let entry_notifier_service = entry_notifier
2113        .map(|entry_notifier| EntryNotifierService::new(entry_notifier, exit.clone()));
2114
2115    let (bank_forks, mut leader_schedule_cache, starting_snapshot_hashes) =
2116        bank_forks_utils::load_bank_forks(
2117            genesis_config,
2118            &blockstore,
2119            config.account_paths.clone(),
2120            &config.snapshot_config,
2121            &process_options,
2122            transaction_history_services
2123                .transaction_status_sender
2124                .as_ref(),
2125            entry_notifier_service
2126                .as_ref()
2127                .map(|service| service.sender()),
2128            accounts_update_notifier,
2129            exit,
2130        )
2131        .map_err(|err| err.to_string())?;
2132
2133    // Before replay starts, set the callbacks in each of the banks in BankForks so that
2134    // all dropped banks come through the `pruned_banks_receiver` channel. This way all bank
2135    // drop behavior can be safely synchronized with any other ongoing accounts activity like
2136    // cache flush, clean, shrink, as long as the same thread performing those activities also
2137    // is processing the dropped banks from the `pruned_banks_receiver` channel.
2138    let pruned_banks_receiver =
2139        AccountsBackgroundService::setup_bank_drop_callback(bank_forks.clone());
2140
2141    leader_schedule_cache.set_fixed_leader_schedule(config.fixed_leader_schedule.clone());
2142
2143    Ok((
2144        bank_forks,
2145        blockstore,
2146        original_blockstore_root,
2147        ledger_signal_receiver,
2148        leader_schedule_cache,
2149        starting_snapshot_hashes,
2150        transaction_history_services,
2151        process_options,
2152        blockstore_root_scan,
2153        pruned_banks_receiver,
2154        entry_notifier_service,
2155    ))
2156}
2157
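// Defers full ledger replay until it is actually needed (e.g. by wait_for_supermajority or by
// tower creation) and caches the resulting Tower so replay happens at most once.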
2158pub struct ProcessBlockStore<'a> {
2159    id: &'a Pubkey,
2160    vote_account: &'a Pubkey,
2161    start_progress: &'a Arc<RwLock<ValidatorStartProgress>>,
2162    blockstore: &'a Blockstore,
2163    original_blockstore_root: Slot,
2164    bank_forks: &'a Arc<RwLock<BankForks>>,
2165    leader_schedule_cache: &'a LeaderScheduleCache,
2166    process_options: &'a blockstore_processor::ProcessOptions,
2167    transaction_status_sender: Option<&'a TransactionStatusSender>,
2168    entry_notification_sender: Option<&'a EntryNotifierSender>,
2169    blockstore_root_scan: Option<BlockstoreRootScan>,
2170    snapshot_controller: &'a SnapshotController,
2171    config: &'a ValidatorConfig,
2172    tower: Option<Tower>,
2173}
2174
2175impl<'a> ProcessBlockStore<'a> {
2176    #[allow(clippy::too_many_arguments)]
2177    fn new(
2178        id: &'a Pubkey,
2179        vote_account: &'a Pubkey,
2180        start_progress: &'a Arc<RwLock<ValidatorStartProgress>>,
2181        blockstore: &'a Blockstore,
2182        original_blockstore_root: Slot,
2183        bank_forks: &'a Arc<RwLock<BankForks>>,
2184        leader_schedule_cache: &'a LeaderScheduleCache,
2185        process_options: &'a blockstore_processor::ProcessOptions,
2186        transaction_status_sender: Option<&'a TransactionStatusSender>,
2187        entry_notification_sender: Option<&'a EntryNotifierSender>,
2188        blockstore_root_scan: BlockstoreRootScan,
2189        snapshot_controller: &'a SnapshotController,
2190        config: &'a ValidatorConfig,
2191    ) -> Self {
2192        Self {
2193            id,
2194            vote_account,
2195            start_progress,
2196            blockstore,
2197            original_blockstore_root,
2198            bank_forks,
2199            leader_schedule_cache,
2200            process_options,
2201            transaction_status_sender,
2202            entry_notification_sender,
2203            blockstore_root_scan: Some(blockstore_root_scan),
2204            snapshot_controller,
2205            config,
2206            tower: None,
2207        }
2208    }
2209
2210    pub(crate) fn process(&mut self) -> Result<(), String> {
2211        if self.tower.is_none() {
2212            let previous_start_process = *self.start_progress.read().unwrap();
2213            *self.start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger;
2214
2215            let exit = Arc::new(AtomicBool::new(false));
2216            if let Ok(Some(max_slot)) = self.blockstore.highest_slot() {
2217                let bank_forks = self.bank_forks.clone();
2218                let exit = exit.clone();
2219                let start_progress = self.start_progress.clone();
2220
2221                let _ = Builder::new()
2222                    .name("solRptLdgrStat".to_string())
2223                    .spawn(move || {
2224                        while !exit.load(Ordering::Relaxed) {
2225                            let slot = bank_forks.read().unwrap().working_bank().slot();
2226                            *start_progress.write().unwrap() =
2227                                ValidatorStartProgress::ProcessingLedger { slot, max_slot };
2228                            sleep(Duration::from_secs(2));
2229                        }
2230                    })
2231                    .unwrap();
2232            }
2233            blockstore_processor::process_blockstore_from_root(
2234                self.blockstore,
2235                self.bank_forks,
2236                self.leader_schedule_cache,
2237                self.process_options,
2238                self.transaction_status_sender,
2239                self.entry_notification_sender,
2240                Some(self.snapshot_controller),
2241            )
2242            .map_err(|err| {
2243                exit.store(true, Ordering::Relaxed);
2244                format!("Failed to load ledger: {err:?}")
2245            })?;
2246            exit.store(true, Ordering::Relaxed);
2247
2248            if let Some(blockstore_root_scan) = self.blockstore_root_scan.take() {
2249                blockstore_root_scan.join();
2250            }
2251
2252            self.tower = Some({
2253                let restored_tower = Tower::restore(self.config.tower_storage.as_ref(), self.id);
2254                if let Ok(tower) = &restored_tower {
2255                    // reconciliation attempt 1 of 2 with tower
2256                    reconcile_blockstore_roots_with_external_source(
2257                        ExternalRootSource::Tower(tower.root()),
2258                        self.blockstore,
2259                        &mut self.original_blockstore_root,
2260                    )
2261                    .map_err(|err| format!("Failed to reconcile blockstore with tower: {err:?}"))?;
2262                }
2263
2264                post_process_restored_tower(
2265                    restored_tower,
2266                    self.id,
2267                    self.vote_account,
2268                    self.config,
2269                    &self.bank_forks.read().unwrap(),
2270                )?
2271            });
2272
2273            if let Some(hard_fork_restart_slot) = maybe_cluster_restart_with_hard_fork(
2274                self.config,
2275                self.bank_forks.read().unwrap().root(),
2276            ) {
2277                // reconciliation attempt 2 of 2 with hard fork
2278                // this should be #2 because hard fork root > tower root in almost all cases
2279                reconcile_blockstore_roots_with_external_source(
2280                    ExternalRootSource::HardFork(hard_fork_restart_slot),
2281                    self.blockstore,
2282                    &mut self.original_blockstore_root,
2283                )
2284                .map_err(|err| format!("Failed to reconcile blockstore with hard fork: {err:?}"))?;
2285            }
2286
2287            *self.start_progress.write().unwrap() = previous_start_process;
2288        }
2289        Ok(())
2290    }
2291
2292    pub(crate) fn process_to_create_tower(mut self) -> Result<Tower, String> {
2293        self.process()?;
2294        Ok(self.tower.unwrap())
2295    }
2296}
2297
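// With a warp slot configured, the root bank is warped forward to the requested slot, a full
// snapshot archive is written for the new root, and the blockstore is then processed so the
// tower and bank forks stay in sync.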
2298fn maybe_warp_slot(
2299    config: &ValidatorConfig,
2300    process_blockstore: &mut ProcessBlockStore,
2301    ledger_path: &Path,
2302    bank_forks: &RwLock<BankForks>,
2303    leader_schedule_cache: &LeaderScheduleCache,
2304    snapshot_controller: &SnapshotController,
2305) -> Result<(), String> {
2306    if let Some(warp_slot) = config.warp_slot {
2307        let mut bank_forks = bank_forks.write().unwrap();
2308
2309        let working_bank = bank_forks.working_bank();
2310
2311        if warp_slot <= working_bank.slot() {
2312            return Err(format!(
2313                "warp slot ({}) cannot be less than the working bank slot ({})",
2314                warp_slot,
2315                working_bank.slot()
2316            ));
2317        }
2318        info!("warping to slot {}", warp_slot);
2319
2320        let root_bank = bank_forks.root_bank();
2321
2322        // An accounts hash calculation from storages will occur in warp_from_parent() below.  This
2323        // requires that the accounts cache has been flushed, which requires the parent slot to be
2324        // rooted.
2325        root_bank.squash();
2326        root_bank.force_flush_accounts_cache();
2327
2328        bank_forks.insert(Bank::warp_from_parent(
2329            root_bank,
2330            &Pubkey::default(),
2331            warp_slot,
2332            solana_accounts_db::accounts_db::CalcAccountsHashDataSource::Storages,
2333        ));
2334        bank_forks
2335            .set_root(warp_slot, Some(snapshot_controller), Some(warp_slot))
2336            .map_err(|err| err.to_string())?;
2337        leader_schedule_cache.set_root(&bank_forks.root_bank());
2338
2339        let full_snapshot_archive_info = match snapshot_bank_utils::bank_to_full_snapshot_archive(
2340            ledger_path,
2341            &bank_forks.root_bank(),
2342            None,
2343            &config.snapshot_config.full_snapshot_archives_dir,
2344            &config.snapshot_config.incremental_snapshot_archives_dir,
2345            config.snapshot_config.archive_format,
2346        ) {
2347            Ok(archive_info) => archive_info,
2348            Err(e) => return Err(format!("Unable to create snapshot: {e}")),
2349        };
2350        info!(
2351            "created snapshot: {}",
2352            full_snapshot_archive_info.path().display()
2353        );
2354
2355        drop(bank_forks);
2356        // Process blockstore after warping bank forks to make sure tower and
2357        // bank forks are in sync.
2358        process_blockstore.process()?;
2359    }
2360    Ok(())
2361}
2362
2363/// Returns the starting slot at which the blockstore should be scanned for
2364/// shreds with an incorrect shred version, or None if the check is unnecessary
2365fn should_cleanup_blockstore_incorrect_shred_versions(
2366    config: &ValidatorConfig,
2367    blockstore: &Blockstore,
2368    root_slot: Slot,
2369    hard_forks: &HardForks,
2370) -> Result<Option<Slot>, BlockstoreError> {
2371    // Perform the check if we are booting as part of a cluster restart at slot root_slot
2372    let maybe_cluster_restart_slot = maybe_cluster_restart_with_hard_fork(config, root_slot);
2373    if maybe_cluster_restart_slot.is_some() {
2374        return Ok(Some(root_slot + 1));
2375    }
2376
2377    // If there are no hard forks, the shred version cannot have changed
2378    let Some(latest_hard_fork) = hard_forks.iter().last().map(|(slot, _)| *slot) else {
2379        return Ok(None);
2380    };
2381
2382    // If the blockstore is empty, there are certainly no shreds with an incorrect version
2383    let Some(blockstore_max_slot) = blockstore.highest_slot()? else {
2384        return Ok(None);
2385    };
2386    let blockstore_min_slot = blockstore.lowest_slot();
2387    info!(
2388        "Blockstore contains data from slot {blockstore_min_slot} to {blockstore_max_slot}, the \
2389        latest hard fork is {latest_hard_fork}"
2390    );
2391
2392    if latest_hard_fork < blockstore_min_slot {
2393        // latest_hard_fork < blockstore_min_slot <= blockstore_max_slot
2394        //
2395        // All slots in the blockstore are newer than the latest hard fork, and only shreds with
2396        // the correct shred version should have been inserted since the latest hard fork
2397        //
2398        // This is the normal case where the last cluster restart & hard fork was a while ago; we
2399        // can skip the check for this case
2400        Ok(None)
2401    } else if latest_hard_fork < blockstore_max_slot {
2402        // blockstore_min_slot < latest_hard_fork < blockstore_max_slot
2403        //
2404        // This could be a case where there was a cluster restart, but this node was not part of
2405        // the supermajority that actually restarted the cluster. Rather, this node likely
2406        // downloaded a new snapshot while retaining the blockstore, including slots beyond the
2407        // chosen restart slot. We need to perform the blockstore check for this case
2408        //
2409        // Note that the downloaded snapshot slot (root_slot) could be greater than the latest hard
2410        // fork slot. Even though this node will only replay slots after root_slot, start the check
2411        // at latest_hard_fork + 1 to check (and possibly purge) any invalid state.
2412        Ok(Some(latest_hard_fork + 1))
2413    } else {
2414        // blockstore_min_slot <= blockstore_max_slot <= latest_hard_fork
2415        //
2416        // All slots in the blockstore are older than the latest hard fork. The blockstore check
2417        // would start from latest_hard_fork + 1; skip the check as there are no slots to check
2418        //
2419        // This is an unusual case to hit; perhaps a node was offline for a long time
2420        // and just restarted with a newly downloaded snapshot but its old blockstore
2421        Ok(None)
2422    }
2423}
2424
2425/// Searches the blockstore for data shreds with a shred version that differs
2426/// from the passed `expected_shred_version`
2427fn scan_blockstore_for_incorrect_shred_version(
2428    blockstore: &Blockstore,
2429    start_slot: Slot,
2430    expected_shred_version: u16,
2431) -> Result<Option<u16>, BlockstoreError> {
2432    const TIMEOUT: Duration = Duration::from_secs(60);
2433    let timer = Instant::now();
2434    // Search for shreds with incompatible version in blockstore
2435    let slot_meta_iterator = blockstore.slot_meta_iterator(start_slot)?;
2436
2437    info!("Searching blockstore for shreds with an incorrect version, starting from slot {start_slot}");
2438    for (slot, _meta) in slot_meta_iterator {
2439        let shreds = blockstore.get_data_shreds_for_slot(slot, 0)?;
2440        for shred in &shreds {
2441            if shred.version() != expected_shred_version {
2442                return Ok(Some(shred.version()));
2443            }
2444        }
2445        if timer.elapsed() > TIMEOUT {
2446            info!("Did not find any incorrect shreds within {} seconds; stopping the scan", TIMEOUT.as_secs());
2447            break;
2448        }
2449    }
2450    Ok(None)
2451}
2452
2453/// If the blockstore contains any shreds with the incorrect shred version,
2454/// copy them to a backup blockstore and purge them from the actual blockstore.
2455fn cleanup_blockstore_incorrect_shred_versions(
2456    blockstore: &Blockstore,
2457    config: &ValidatorConfig,
2458    start_slot: Slot,
2459    expected_shred_version: u16,
2460) -> Result<(), BlockstoreError> {
2461    let incorrect_shred_version = scan_blockstore_for_incorrect_shred_version(
2462        blockstore,
2463        start_slot,
2464        expected_shred_version,
2465    )?;
2466    let Some(incorrect_shred_version) = incorrect_shred_version else {
2467        info!("Only shreds with the correct version were found in the blockstore");
2468        return Ok(());
2469    };
2470
2471    // .unwrap() is safe because getting to this point implies the blockstore has slots/shreds
2472    let end_slot = blockstore.highest_slot()?.unwrap();
2473
2474    // Backing up the shreds that will be deleted from the primary blockstore is
2475    // not critical, so swallow errors from the backup blockstore operations.
2476    let backup_folder = format!(
2477        "{}_backup_{}_{}_{}",
2478        BLOCKSTORE_DIRECTORY_ROCKS_LEVEL, incorrect_shred_version, start_slot, end_slot
2479    );
2480    match Blockstore::open_with_options(
2481        &blockstore.ledger_path().join(backup_folder),
2482        config.blockstore_options.clone(),
2483    ) {
2484        Ok(backup_blockstore) => {
2485            info!("Backing up slots from {start_slot} to {end_slot}");
2486            let mut timer = Measure::start("blockstore backup");
2487
2488            const PRINT_INTERVAL: Duration = Duration::from_secs(5);
2489            let mut print_timer = Instant::now();
2490            let mut num_slots_copied = 0;
2491            let slot_meta_iterator = blockstore.slot_meta_iterator(start_slot)?;
2492            for (slot, _meta) in slot_meta_iterator {
2493                let shreds = blockstore.get_data_shreds_for_slot(slot, 0)?;
2494                let shreds = shreds.into_iter().map(Cow::Owned);
2495                let _ = backup_blockstore.insert_cow_shreds(shreds, None, true);
2496                num_slots_copied += 1;
2497
2498                if print_timer.elapsed() > PRINT_INTERVAL {
2499                    info!("Backed up {num_slots_copied} slots thus far");
2500                    print_timer = Instant::now();
2501                }
2502            }
2503
2504            timer.stop();
2505            info!("Backing up slots done. {timer}");
2506        }
2507        Err(err) => {
2508            warn!("Unable to backup shreds with incorrect shred version: {err}");
2509        }
2510    }
2511
2512    info!("Purging slots {start_slot} to {end_slot} from blockstore");
2513    let mut timer = Measure::start("blockstore purge");
2514    blockstore.purge_from_next_slots(start_slot, end_slot);
2515    blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact);
2516    timer.stop();
2517    info!("Purging slots done. {timer}");
2518
2519    Ok(())
2520}
2521
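// Builds the channel/service pair that records transaction statuses into the blockstore (when
// transaction history is enabled) and forwards them to a geyser transaction notifier when one
// is attached.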
2522fn initialize_rpc_transaction_history_services(
2523    blockstore: Arc<Blockstore>,
2524    exit: Arc<AtomicBool>,
2525    enable_rpc_transaction_history: bool,
2526    enable_extended_tx_metadata_storage: bool,
2527    transaction_notifier: Option<TransactionNotifierArc>,
2528) -> TransactionHistoryServices {
2529    let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
2530    let (transaction_status_sender, transaction_status_receiver) = unbounded();
2531    let transaction_status_sender = Some(TransactionStatusSender {
2532        sender: transaction_status_sender,
2533    });
2534    let transaction_status_service = Some(TransactionStatusService::new(
2535        transaction_status_receiver,
2536        max_complete_transaction_status_slot.clone(),
2537        enable_rpc_transaction_history,
2538        transaction_notifier,
2539        blockstore.clone(),
2540        enable_extended_tx_metadata_storage,
2541        exit.clone(),
2542    ));
2543
2544    TransactionHistoryServices {
2545        transaction_status_sender,
2546        transaction_status_service,
2547        max_complete_transaction_status_slot,
2548    }
2549}
2550
2551#[derive(Error, Debug)]
2552pub enum ValidatorError {
2553    #[error("bank hash mismatch: actual={0}, expected={1}")]
2554    BankHashMismatch(Hash, Hash),
2555
2556    #[error("blockstore error: {0}")]
2557    Blockstore(#[source] BlockstoreError),
2558
2559    #[error("genesis hash mismatch: actual={0}, expected={1}")]
2560    GenesisHashMismatch(Hash, Hash),
2561
2562    #[error(
2563        "ledger does not have enough data to wait for supermajority: \
2564        current slot={0}, needed slot={1}"
2565    )]
2566    NotEnoughLedgerData(Slot, Slot),
2567
2568    #[error("failed to open genesis: {0}")]
2569    OpenGenesisConfig(#[source] OpenGenesisConfigError),
2570
2571    #[error("{0}")]
2572    Other(String),
2573
2574    #[error(
2575        "PoH hashes/second rate is slower than the cluster target: mine {mine}, cluster {target}"
2576    )]
2577    PohTooSlow { mine: u64, target: u64 },
2578
2579    #[error("shred version mismatch: actual {actual}, expected {expected}")]
2580    ShredVersionMismatch { actual: u16, expected: u16 },
2581
2582    #[error(transparent)]
2583    TraceError(#[from] TraceError),
2584
2585    #[error("Wen Restart finished, please continue with --wait-for-supermajority")]
2586    WenRestartFinished,
2587}
2588
2589// Returns whether the validator waited on other nodes to start. In that case
2590// it should not wait for one of its votes to land before producing blocks,
2591// because if the whole network is waiting, then it will stall.
2592//
2593// An error indicates that a bad hash was encountered or some other unrecoverable
2594// condition was hit, and the validator should exit.
2595fn wait_for_supermajority(
2596    config: &ValidatorConfig,
2597    process_blockstore: Option<&mut ProcessBlockStore>,
2598    bank_forks: &RwLock<BankForks>,
2599    cluster_info: &ClusterInfo,
2600    rpc_override_health_check: Arc<AtomicBool>,
2601    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
2602) -> Result<bool, ValidatorError> {
2603    match config.wait_for_supermajority {
2604        None => Ok(false),
2605        Some(wait_for_supermajority_slot) => {
2606            if let Some(process_blockstore) = process_blockstore {
2607                process_blockstore
2608                    .process()
2609                    .map_err(ValidatorError::Other)?;
2610            }
2611
2612            let bank = bank_forks.read().unwrap().working_bank();
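            // If the local bank is already past the wait slot, there is nothing to wait
            // for; if it has not yet reached the wait slot, the ledger lacks the data
            // needed to proceed.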
2613            match wait_for_supermajority_slot.cmp(&bank.slot()) {
2614                std::cmp::Ordering::Less => return Ok(false),
2615                std::cmp::Ordering::Greater => {
2616                    return Err(ValidatorError::NotEnoughLedgerData(
2617                        bank.slot(),
2618                        wait_for_supermajority_slot,
2619                    ));
2620                }
2621                _ => {}
2622            }
2623
2624            if let Some(expected_bank_hash) = config.expected_bank_hash {
2625                if bank.hash() != expected_bank_hash {
2626                    return Err(ValidatorError::BankHashMismatch(
2627                        bank.hash(),
2628                        expected_bank_hash,
2629                    ));
2630                }
2631            }
2632
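            // Poll gossip roughly once per second (see the sleep below), logging
            // progress every tenth iteration, until the stake threshold is reached.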
2633            for i in 1.. {
2634                let logging = i % 10 == 1;
2635                if logging {
2636                    info!(
2637                        "Waiting for {}% of activated stake at slot {} to be in gossip...",
2638                        WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT,
2639                        bank.slot()
2640                    );
2641                }
2642
2643                let gossip_stake_percent =
2644                    get_stake_percent_in_gossip(&bank, cluster_info, logging);
2645
2646                *start_progress.write().unwrap() =
2647                    ValidatorStartProgress::WaitingForSupermajority {
2648                        slot: wait_for_supermajority_slot,
2649                        gossip_stake_percent,
2650                    };
2651
2652                if gossip_stake_percent >= WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT {
2653                    info!(
2654                        "Supermajority reached, {}% active stake detected, starting up now.",
2655                        gossip_stake_percent,
2656                    );
2657                    break;
2658                }
2659                // The normal RPC health checks don't apply as the node is waiting, so feign health to
2660                // prevent load balancers from removing the node from their list of candidates during a
2661                // manual restart.
2662                rpc_override_health_check.store(true, Ordering::Relaxed);
2663                sleep(Duration::new(1, 0));
2664            }
2665            rpc_override_health_check.store(false, Ordering::Relaxed);
2666            Ok(true)
2667        }
2668    }
2669}
2670
2671// Get the activated stake percentage (based on the provided bank) that is visible in gossip
2672fn get_stake_percent_in_gossip(bank: &Bank, cluster_info: &ClusterInfo, log: bool) -> u64 {
2673    let mut online_stake = 0;
2674    let mut wrong_shred_stake = 0;
2675    let mut wrong_shred_nodes = vec![];
2676    let mut offline_stake = 0;
2677    let mut offline_nodes = vec![];
2678
2679    let mut total_activated_stake = 0;
2680    let now = timestamp();
2681    // Nodes' contact infos are saved to disk and restored on validator startup.
2682    // Staked nodes' entries will not expire until an epoch later, so it is
2683    // necessary here to filter for recent entries to establish liveness.
2684    let peers: HashMap<_, _> = cluster_info
2685        .all_tvu_peers()
2686        .into_iter()
2687        .filter(|node| {
2688            let age = now.saturating_sub(node.wallclock());
2689            // Contact infos are refreshed twice during this period.
2690            age < CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS
2691        })
2692        .map(|node| (*node.pubkey(), node))
2693        .collect();
2694    let my_shred_version = cluster_info.my_shred_version();
2695    let my_id = cluster_info.id();
2696
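    // Bucket each staked node as online (visible in gossip with a matching shred
    // version), wrong shred version, or offline, accumulating stake per bucket.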
2697    for (activated_stake, vote_account) in bank.vote_accounts().values() {
2698        let activated_stake = *activated_stake;
2699        total_activated_stake += activated_stake;
2700
2701        if activated_stake == 0 {
2702            continue;
2703        }
2704        let vote_state_node_pubkey = *vote_account.node_pubkey();
2705
2706        if let Some(peer) = peers.get(&vote_state_node_pubkey) {
2707            if peer.shred_version() == my_shred_version {
2708                trace!(
2709                    "observed {} in gossip, (activated_stake={})",
2710                    vote_state_node_pubkey,
2711                    activated_stake
2712                );
2713                online_stake += activated_stake;
2714            } else {
2715                wrong_shred_stake += activated_stake;
2716                wrong_shred_nodes.push((activated_stake, vote_state_node_pubkey));
2717            }
2718        } else if vote_state_node_pubkey == my_id {
2719            online_stake += activated_stake; // This node is online
2720        } else {
2721            offline_stake += activated_stake;
2722            offline_nodes.push((activated_stake, vote_state_node_pubkey));
2723        }
2724    }
2725
2726    let online_stake_percentage = (online_stake as f64 / total_activated_stake as f64) * 100.;
2727    if log {
2728        info!(
2729            "{:.3}% of active stake visible in gossip",
2730            online_stake_percentage
2731        );
2732
2733        if !wrong_shred_nodes.is_empty() {
2734            info!(
2735                "{:.3}% of active stake has the wrong shred version in gossip",
2736                (wrong_shred_stake as f64 / total_activated_stake as f64) * 100.,
2737            );
2738            wrong_shred_nodes.sort_by(|b, a| a.0.cmp(&b.0)); // sort descending by stake weight
2739            for (stake, identity) in wrong_shred_nodes {
2740                info!(
2741                    "    {:.3}% - {}",
2742                    (stake as f64 / total_activated_stake as f64) * 100.,
2743                    identity
2744                );
2745            }
2746        }
2747
2748        if !offline_nodes.is_empty() {
2749            info!(
2750                "{:.3}% of active stake is not visible in gossip",
2751                (offline_stake as f64 / total_activated_stake as f64) * 100.
2752            );
2753            offline_nodes.sort_by(|b, a| a.0.cmp(&b.0)); // sort descending by stake weight
2754            for (stake, identity) in offline_nodes {
2755                info!(
2756                    "    {:.3}% - {}",
2757                    (stake as f64 / total_activated_stake as f64) * 100.,
2758                    identity
2759                );
2760            }
2761        }
2762    }
2763
2764    online_stake_percentage as u64
2765}
2766
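/// Asynchronously clears the contents of the configured account paths (and any
/// accounts-db shrink paths) so the validator starts with empty accounts directories.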
2767fn cleanup_accounts_paths(config: &ValidatorConfig) {
2768    for account_path in &config.account_paths {
2769        move_and_async_delete_path_contents(account_path);
2770    }
2771    if let Some(shrink_paths) = config
2772        .accounts_db_config
2773        .as_ref()
2774        .and_then(|config| config.shrink_paths.as_ref())
2775    {
2776        for shrink_path in shrink_paths {
2777            move_and_async_delete_path_contents(shrink_path);
2778        }
2779    }
2780}
2781
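/// Returns true if the snapshot config is internally consistent: either snapshot
/// generation is disabled entirely, incremental snapshots are disabled, or the
/// full snapshot interval is strictly greater than the (non-zero) incremental
/// snapshot interval.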
2782pub fn is_snapshot_config_valid(snapshot_config: &SnapshotConfig) -> bool {
2783    // if the snapshot config is configured to *not* take snapshots, then it is valid
2784    if !snapshot_config.should_generate_snapshots() {
2785        return true;
2786    }
2787
2788    let full_snapshot_interval_slots = snapshot_config.full_snapshot_archive_interval_slots;
2789    let incremental_snapshot_interval_slots =
2790        snapshot_config.incremental_snapshot_archive_interval_slots;
2791
2792    if incremental_snapshot_interval_slots == DISABLED_SNAPSHOT_ARCHIVE_INTERVAL {
2793        true
2794    } else if incremental_snapshot_interval_slots == 0 {
2795        false
2796    } else {
2797        full_snapshot_interval_slots > incremental_snapshot_interval_slots
2798    }
2799}
2800
2801#[cfg(test)]
2802mod tests {
2803    use {
2804        super::*,
2805        crossbeam_channel::{bounded, RecvTimeoutError},
2806        solana_entry::entry,
2807        solana_genesis_config::create_genesis_config,
2808        solana_gossip::contact_info::ContactInfo,
2809        solana_ledger::{
2810            blockstore, create_new_tmp_ledger, genesis_utils::create_genesis_config_with_leader,
2811            get_tmp_ledger_path_auto_delete,
2812        },
2813        solana_poh_config::PohConfig,
2814        solana_sha256_hasher::hash,
2815        solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP,
2816        std::{fs::remove_dir_all, thread, time::Duration},
2817    };
2818
2819    #[test]
2820    fn validator_exit() {
2821        solana_logger::setup();
2822        let leader_keypair = Keypair::new();
2823        let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
2824
2825        let validator_keypair = Keypair::new();
2826        let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
2827        let genesis_config =
2828            create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1000)
2829                .genesis_config;
2830        let (validator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
2831
2832        let voting_keypair = Arc::new(Keypair::new());
2833        let config = ValidatorConfig {
2834            rpc_addrs: Some((
2835                validator_node.info.rpc().unwrap(),
2836                validator_node.info.rpc_pubsub().unwrap(),
2837            )),
2838            ..ValidatorConfig::default_for_test()
2839        };
2840        let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
2841        let validator = Validator::new(
2842            validator_node,
2843            Arc::new(validator_keypair),
2844            &validator_ledger_path,
2845            &voting_keypair.pubkey(),
2846            Arc::new(RwLock::new(vec![voting_keypair])),
2847            vec![leader_node.info],
2848            &config,
2849            true, // should_check_duplicate_instance
2850            None, // rpc_to_plugin_manager_receiver
2851            start_progress.clone(),
2852            SocketAddrSpace::Unspecified,
2853            ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
2854            Arc::new(RwLock::new(None)),
2855        )
2856        .expect("assume successful validator start");
2857        assert_eq!(
2858            *start_progress.read().unwrap(),
2859            ValidatorStartProgress::Running
2860        );
2861        validator.close();
2862        remove_dir_all(validator_ledger_path).unwrap();
2863    }
2864
2865    #[test]
2866    fn test_should_cleanup_blockstore_incorrect_shred_versions() {
2867        solana_logger::setup();
2868
2869        let ledger_path = get_tmp_ledger_path_auto_delete!();
2870        let blockstore = Blockstore::open(ledger_path.path()).unwrap();
2871
2872        let mut validator_config = ValidatorConfig::default_for_test();
2873        let mut hard_forks = HardForks::default();
2874        let mut root_slot;
2875
2876        // Do check from root_slot + 1 if wait_for_supermajority (10) == root_slot (10)
2877        root_slot = 10;
2878        validator_config.wait_for_supermajority = Some(root_slot);
2879        assert_eq!(
2880            should_cleanup_blockstore_incorrect_shred_versions(
2881                &validator_config,
2882                &blockstore,
2883                root_slot,
2884                &hard_forks
2885            )
2886            .unwrap(),
2887            Some(root_slot + 1)
2888        );
2889
2890        // No check if wait_for_supermajority (10) < root_slot (15) (no hard forks)
2891        // Arguably an operator error to pass a value for wait_for_supermajority in this case
2892        root_slot = 15;
2893        assert_eq!(
2894            should_cleanup_blockstore_incorrect_shred_versions(
2895                &validator_config,
2896                &blockstore,
2897                root_slot,
2898                &hard_forks
2899            )
2900            .unwrap(),
2901            None,
2902        );
2903
2904        // Emulate cluster restart at slot 10
2905        // No check if wait_for_supermajority (10) < root_slot (15) (empty blockstore)
2906        hard_forks.register(10);
2907        assert_eq!(
2908            should_cleanup_blockstore_incorrect_shred_versions(
2909                &validator_config,
2910                &blockstore,
2911                root_slot,
2912                &hard_forks
2913            )
2914            .unwrap(),
2915            None,
2916        );
2917
2918        // Insert some shreds at newer slots than hard fork
2919        let entries = entry::create_ticks(1, 0, Hash::default());
2920        for i in 20..35 {
2921            let shreds = blockstore::entries_to_test_shreds(
2922                &entries,
2923                i,     // slot
2924                i - 1, // parent_slot
2925                true,  // is_full_slot
2926                1,     // version
2927                true,  // merkle_variant
2928            );
2929            blockstore.insert_shreds(shreds, None, true).unwrap();
2930        }
2931
2932        // No check as all blockstore data is newer than latest hard fork
2933        assert_eq!(
2934            should_cleanup_blockstore_incorrect_shred_versions(
2935                &validator_config,
2936                &blockstore,
2937                root_slot,
2938                &hard_forks
2939            )
2940            .unwrap(),
2941            None,
2942        );
2943
2944        // Emulate cluster restart at slot 25
2945        // Do check from root_slot + 1 regardless of whether wait_for_supermajority is set correctly
2946        root_slot = 25;
2947        hard_forks.register(root_slot);
2948        validator_config.wait_for_supermajority = Some(root_slot);
2949        assert_eq!(
2950            should_cleanup_blockstore_incorrect_shred_versions(
2951                &validator_config,
2952                &blockstore,
2953                root_slot,
2954                &hard_forks
2955            )
2956            .unwrap(),
2957            Some(root_slot + 1),
2958        );
2959        validator_config.wait_for_supermajority = None;
2960        assert_eq!(
2961            should_cleanup_blockstore_incorrect_shred_versions(
2962                &validator_config,
2963                &blockstore,
2964                root_slot,
2965                &hard_forks
2966            )
2967            .unwrap(),
2968            Some(root_slot + 1),
2969        );
2970
2971        // Do check with advanced root slot, even without wait_for_supermajority set correctly
2972        // Check starts from latest hard fork + 1
2973        root_slot = 30;
2974        let latest_hard_fork = hard_forks.iter().last().unwrap().0;
2975        assert_eq!(
2976            should_cleanup_blockstore_incorrect_shred_versions(
2977                &validator_config,
2978                &blockstore,
2979                root_slot,
2980                &hard_forks
2981            )
2982            .unwrap(),
2983            Some(latest_hard_fork + 1),
2984        );
2985
2986        // Purge blockstore up to latest hard fork
2987        // No check since all blockstore data is newer than latest hard fork
2988        blockstore.purge_slots(0, latest_hard_fork, PurgeType::Exact);
2989        assert_eq!(
2990            should_cleanup_blockstore_incorrect_shred_versions(
2991                &validator_config,
2992                &blockstore,
2993                root_slot,
2994                &hard_forks
2995            )
2996            .unwrap(),
2997            None,
2998        );
2999    }
3000
3001    #[test]
3002    fn test_cleanup_blockstore_incorrect_shred_versions() {
3003        solana_logger::setup();
3004
3005        let validator_config = ValidatorConfig::default_for_test();
3006        let ledger_path = get_tmp_ledger_path_auto_delete!();
3007        let blockstore = Blockstore::open(ledger_path.path()).unwrap();
3008
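        // Populate slots 1..10 with single-tick shreds at shred version 1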
3009        let entries = entry::create_ticks(1, 0, Hash::default());
3010        for i in 1..10 {
3011            let shreds = blockstore::entries_to_test_shreds(
3012                &entries,
3013                i,     // slot
3014                i - 1, // parent_slot
3015                true,  // is_full_slot
3016                1,     // version
3017                true,  // merkle_variant
3018            );
3019            blockstore.insert_shreds(shreds, None, true).unwrap();
3020        }
3021
3022        // this purges and compacts all slots greater than or equal to 5
3023        cleanup_blockstore_incorrect_shred_versions(&blockstore, &validator_config, 5, 2).unwrap();
3024        // assert that slots less than 5 aren't affected
3025        assert!(blockstore.meta(4).unwrap().unwrap().next_slots.is_empty());
3026        for i in 5..10 {
3027            assert!(blockstore
3028                .get_data_shreds_for_slot(i, 0)
3029                .unwrap()
3030                .is_empty());
3031        }
3032    }
3033
3034    #[test]
3035    fn validator_parallel_exit() {
3036        let leader_keypair = Keypair::new();
3037        let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
3038        let genesis_config =
3039            create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1000)
3040                .genesis_config;
3041
3042        let mut ledger_paths = vec![];
3043        let mut validators: Vec<Validator> = (0..2)
3044            .map(|_| {
3045                let validator_keypair = Keypair::new();
3046                let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
3047                let (validator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
3048                ledger_paths.push(validator_ledger_path.clone());
3049                let vote_account_keypair = Keypair::new();
3050                let config = ValidatorConfig {
3051                    rpc_addrs: Some((
3052                        validator_node.info.rpc().unwrap(),
3053                        validator_node.info.rpc_pubsub().unwrap(),
3054                    )),
3055                    ..ValidatorConfig::default_for_test()
3056                };
3057                Validator::new(
3058                    validator_node,
3059                    Arc::new(validator_keypair),
3060                    &validator_ledger_path,
3061                    &vote_account_keypair.pubkey(),
3062                    Arc::new(RwLock::new(vec![Arc::new(vote_account_keypair)])),
3063                    vec![leader_node.info.clone()],
3064                    &config,
3065                    true, // should_check_duplicate_instance
3066                    None, // rpc_to_plugin_manager_receiver
3067                    Arc::new(RwLock::new(ValidatorStartProgress::default())),
3068                    SocketAddrSpace::Unspecified,
3069                    ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
3070                    Arc::new(RwLock::new(None)),
3071                )
3072                .expect("assume successful validator start")
3073            })
3074            .collect();
3075
3076        // Each validator can exit in parallel to speed up the many sequential calls to `join()`
3077        validators.iter_mut().for_each(|v| v.exit());
3078
3079        // spawn a new thread to wait for the validators to finish joining
3080        let (sender, receiver) = bounded(0);
3081        let _ = thread::spawn(move || {
3082            validators.into_iter().for_each(|validator| {
3083                validator.join();
3084            });
3085            sender.send(()).unwrap();
3086        });
3087
3088        let timeout = Duration::from_secs(60);
3089        if let Err(RecvTimeoutError::Timeout) = receiver.recv_timeout(timeout) {
3090            panic!("timed out waiting for validators to shut down");
3091        }
3092
3093        for path in ledger_paths {
3094            remove_dir_all(path).unwrap();
3095        }
3096    }
3097
3098    #[test]
3099    fn test_wait_for_supermajority() {
3100        solana_logger::setup();
3101        let node_keypair = Arc::new(Keypair::new());
3102        let cluster_info = ClusterInfo::new(
3103            ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
3104            node_keypair,
3105            SocketAddrSpace::Unspecified,
3106        );
3107
3108        let (genesis_config, _mint_keypair) = create_genesis_config(1);
3109        let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
3110        let mut config = ValidatorConfig::default_for_test();
3111        let rpc_override_health_check = Arc::new(AtomicBool::new(false));
3112        let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
3113
3114        assert!(!wait_for_supermajority(
3115            &config,
3116            None,
3117            &bank_forks,
3118            &cluster_info,
3119            rpc_override_health_check.clone(),
3120            &start_progress,
3121        )
3122        .unwrap());
3123
3124        // bank=0, wait=1, should fail
3125        config.wait_for_supermajority = Some(1);
3126        assert!(matches!(
3127            wait_for_supermajority(
3128                &config,
3129                None,
3130                &bank_forks,
3131                &cluster_info,
3132                rpc_override_health_check.clone(),
3133                &start_progress,
3134            ),
3135            Err(ValidatorError::NotEnoughLedgerData(_, _)),
3136        ));
3137
3138        // bank=1, wait=0, should pass, bank is past the wait slot
3139        let bank_forks = BankForks::new_rw_arc(Bank::new_from_parent(
3140            bank_forks.read().unwrap().root_bank(),
3141            &Pubkey::default(),
3142            1,
3143        ));
3144        config.wait_for_supermajority = Some(0);
3145        assert!(!wait_for_supermajority(
3146            &config,
3147            None,
3148            &bank_forks,
3149            &cluster_info,
3150            rpc_override_health_check.clone(),
3151            &start_progress,
3152        )
3153        .unwrap());
3154
3155        // bank=1, wait=1, equal, but bad hash provided
3156        config.wait_for_supermajority = Some(1);
3157        config.expected_bank_hash = Some(hash(&[1]));
3158        assert!(matches!(
3159            wait_for_supermajority(
3160                &config,
3161                None,
3162                &bank_forks,
3163                &cluster_info,
3164                rpc_override_health_check,
3165                &start_progress,
3166            ),
3167            Err(ValidatorError::BankHashMismatch(_, _)),
3168        ));
3169    }
3170
3171    #[test]
3172    fn test_interval_check() {
3173        fn new_snapshot_config(
3174            full_snapshot_archive_interval_slots: Slot,
3175            incremental_snapshot_archive_interval_slots: Slot,
3176        ) -> SnapshotConfig {
3177            SnapshotConfig {
3178                full_snapshot_archive_interval_slots,
3179                incremental_snapshot_archive_interval_slots,
3180                ..SnapshotConfig::default()
3181            }
3182        }
3183
3184        assert!(is_snapshot_config_valid(&new_snapshot_config(300, 200)));
3185
3186        assert!(is_snapshot_config_valid(&new_snapshot_config(
3187            snapshot_bank_utils::DEFAULT_FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS,
3188            snapshot_bank_utils::DEFAULT_INCREMENTAL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS
3189        )));
3190        assert!(is_snapshot_config_valid(&new_snapshot_config(
3191            snapshot_bank_utils::DEFAULT_FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS,
3192            DISABLED_SNAPSHOT_ARCHIVE_INTERVAL
3193        )));
3194        assert!(is_snapshot_config_valid(&new_snapshot_config(
3195            snapshot_bank_utils::DEFAULT_INCREMENTAL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS,
3196            DISABLED_SNAPSHOT_ARCHIVE_INTERVAL
3197        )));
3198        assert!(is_snapshot_config_valid(&new_snapshot_config(
3199            DISABLED_SNAPSHOT_ARCHIVE_INTERVAL,
3200            DISABLED_SNAPSHOT_ARCHIVE_INTERVAL
3201        )));
3202
3203        // Full snapshot intervals used to be required to be a multiple of
3204        // incremental snapshot intervals, but that's no longer the case,
3205        // so test that the check is relaxed.
3206        assert!(is_snapshot_config_valid(&new_snapshot_config(100, 42)));
3207        assert!(is_snapshot_config_valid(&new_snapshot_config(444, 200)));
3208        assert!(is_snapshot_config_valid(&new_snapshot_config(400, 222)));
3209
3210        assert!(!is_snapshot_config_valid(&new_snapshot_config(0, 100)));
3211        assert!(!is_snapshot_config_valid(&new_snapshot_config(100, 0)));
3212        assert!(!is_snapshot_config_valid(&new_snapshot_config(0, 0)));
3213        assert!(!is_snapshot_config_valid(&new_snapshot_config(42, 100)));
3214        assert!(!is_snapshot_config_valid(&new_snapshot_config(100, 100)));
3215        assert!(!is_snapshot_config_valid(&new_snapshot_config(100, 200)));
3216
3217        assert!(is_snapshot_config_valid(&SnapshotConfig::new_load_only()));
3218        assert!(is_snapshot_config_valid(&SnapshotConfig {
3219            full_snapshot_archive_interval_slots: 37,
3220            incremental_snapshot_archive_interval_slots: 41,
3221            ..SnapshotConfig::new_load_only()
3222        }));
3223        assert!(is_snapshot_config_valid(&SnapshotConfig {
3224            full_snapshot_archive_interval_slots: DISABLED_SNAPSHOT_ARCHIVE_INTERVAL,
3225            incremental_snapshot_archive_interval_slots: DISABLED_SNAPSHOT_ARCHIVE_INTERVAL,
3226            ..SnapshotConfig::new_load_only()
3227        }));
3228    }
3229
3230    fn target_tick_duration() -> Duration {
3231        // DEFAULT_MS_PER_SLOT = 400
3232        // DEFAULT_TICKS_PER_SLOT = 64
3233        // MS_PER_TICK = 6
3234        //
3235        // But, DEFAULT_MS_PER_SLOT / DEFAULT_TICKS_PER_SLOT = 6.25
3236        //
3237        // So, convert to microseconds first to avoid the integer rounding error
3238        let target_tick_duration_us =
3239            solana_clock::DEFAULT_MS_PER_SLOT * 1000 / solana_clock::DEFAULT_TICKS_PER_SLOT;
3240        assert_eq!(target_tick_duration_us, 6250);
3241        Duration::from_micros(target_tick_duration_us)
3242    }
3243
3244    #[test]
3245    fn test_poh_speed() {
3246        solana_logger::setup();
3247        let poh_config = PohConfig {
3248            target_tick_duration: target_tick_duration(),
3249            // make PoH rate really fast to cause the panic condition
3250            hashes_per_tick: Some(100 * solana_clock::DEFAULT_HASHES_PER_TICK),
3251            ..PohConfig::default()
3252        };
3253        let genesis_config = GenesisConfig {
3254            poh_config,
3255            ..GenesisConfig::default()
3256        };
3257        let bank = Bank::new_for_tests(&genesis_config);
3258        assert!(check_poh_speed(&bank, Some(10_000)).is_err());
3259    }
3260
3261    #[test]
3262    fn test_poh_speed_no_hashes_per_tick() {
3263        solana_logger::setup();
3264        let poh_config = PohConfig {
3265            target_tick_duration: target_tick_duration(),
3266            hashes_per_tick: None,
3267            ..PohConfig::default()
3268        };
3269        let genesis_config = GenesisConfig {
3270            poh_config,
3271            ..GenesisConfig::default()
3272        };
3273        let bank = Bank::new_for_tests(&genesis_config);
3274        check_poh_speed(&bank, Some(10_000)).unwrap();
3275    }
3276}