solana_core/validator.rs

//! The `validator` module hosts all the validator microservices.

pub use solana_perf::report_target_features;
use {
    crate::{
        admin_rpc_post_init::{AdminRpcRequestMetadataPostInit, KeyUpdaterType, KeyUpdaters},
        banking_stage::{
            transaction_scheduler::scheduler_controller::SchedulerConfig, BankingStage,
        },
        banking_trace::{self, BankingTracer, TraceError},
        cluster_info_vote_listener::VoteTracker,
        completed_data_sets_service::CompletedDataSetsService,
        consensus::{
            reconcile_blockstore_roots_with_external_source,
            tower_storage::{NullTowerStorage, TowerStorage},
            ExternalRootSource, Tower,
        },
        repair::{
            self,
            quic_endpoint::{RepairQuicAsyncSenders, RepairQuicSenders, RepairQuicSockets},
            repair_handler::RepairHandlerType,
            serve_repair_service::ServeRepairService,
        },
        resource_limits::{
            adjust_nofile_limit, validate_memlock_limit_for_disk_io, ResourceLimitError,
        },
        sample_performance_service::SamplePerformanceService,
        sigverify,
        snapshot_packager_service::SnapshotPackagerService,
        stats_reporter_service::StatsReporterService,
        system_monitor_service::{
            verify_net_stats_access, SystemMonitorService, SystemMonitorStatsReportConfig,
        },
        tpu::{ForwardingClientOption, Tpu, TpuSockets},
        tvu::{Tvu, TvuConfig, TvuSockets},
    },
    agave_snapshots::{
        snapshot_archive_info::SnapshotArchiveInfoGetter as _, snapshot_config::SnapshotConfig,
        snapshot_hash::StartingSnapshotHashes, SnapshotInterval,
    },
    anyhow::{anyhow, Context, Result},
    crossbeam_channel::{bounded, unbounded, Receiver},
    quinn::Endpoint,
    serde::{Deserialize, Serialize},
    solana_account::ReadableAccount,
    solana_accounts_db::{
        accounts_db::{AccountsDbConfig, ACCOUNTS_DB_CONFIG_FOR_TESTING},
        accounts_update_notifier_interface::AccountsUpdateNotifier,
        utils::move_and_async_delete_path_contents,
    },
    solana_client::{
        client_option::ClientOption,
        connection_cache::{ConnectionCache, Protocol},
    },
    solana_clock::Slot,
    solana_cluster_type::ClusterType,
    solana_entry::poh::compute_hash_time,
    solana_epoch_schedule::MAX_LEADER_SCHEDULE_EPOCH_OFFSET,
    solana_genesis_config::GenesisConfig,
    solana_genesis_utils::{
        open_genesis_config, OpenGenesisConfigError, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
    },
    solana_geyser_plugin_manager::{
        geyser_plugin_service::GeyserPluginService, GeyserPluginManagerRequest,
    },
    solana_gossip::{
        cluster_info::{
            ClusterInfo, DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
            DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS,
        },
        contact_info::ContactInfo,
        crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
        gossip_service::GossipService,
        node::{Node, NodeMultihoming},
    },
    solana_hard_forks::HardForks,
    solana_hash::Hash,
    solana_keypair::Keypair,
    solana_ledger::{
        bank_forks_utils,
        blockstore::{
            Blockstore, BlockstoreError, PurgeType, MAX_COMPLETED_SLOTS_IN_CHANNEL,
            MAX_REPLAY_WAKE_UP_SIGNALS,
        },
        blockstore_metric_report_service::BlockstoreMetricReportService,
        blockstore_options::{BlockstoreOptions, BLOCKSTORE_DIRECTORY_ROCKS_LEVEL},
        blockstore_processor::{self, TransactionStatusSender},
        entry_notifier_interface::EntryNotifierArc,
        entry_notifier_service::{EntryNotifierSender, EntryNotifierService},
        leader_schedule::FixedSchedule,
        leader_schedule_cache::LeaderScheduleCache,
        use_snapshot_archives_at_startup::UseSnapshotArchivesAtStartup,
    },
    solana_measure::measure::Measure,
    solana_metrics::{datapoint_info, metrics::metrics_config_sanity_check},
    solana_poh::{
        poh_controller::PohController,
        poh_recorder::PohRecorder,
        poh_service::{self, PohService},
        record_channels::record_channels,
        transaction_recorder::TransactionRecorder,
    },
    solana_pubkey::Pubkey,
    solana_rayon_threadlimit::get_thread_count,
    solana_rpc::{
        max_slots::MaxSlots,
        optimistically_confirmed_bank_tracker::{
            BankNotificationSenderConfig, OptimisticallyConfirmedBank,
            OptimisticallyConfirmedBankTracker,
        },
        rpc::JsonRpcConfig,
        rpc_completed_slots_service::RpcCompletedSlotsService,
        rpc_pubsub_service::{PubSubConfig, PubSubService},
        rpc_service::{JsonRpcService, JsonRpcServiceConfig},
        rpc_subscriptions::RpcSubscriptions,
        transaction_notifier_interface::TransactionNotifierArc,
        transaction_status_service::TransactionStatusService,
    },
    solana_runtime::{
        accounts_background_service::{
            AbsRequestHandlers, AccountsBackgroundService, DroppedSlotsReceiver,
            PendingSnapshotPackages, PrunedBanksRequestHandler, SnapshotRequestHandler,
        },
        bank::Bank,
        bank_forks::BankForks,
        commitment::BlockCommitmentCache,
        dependency_tracker::DependencyTracker,
        prioritization_fee_cache::PrioritizationFeeCache,
        runtime_config::RuntimeConfig,
        snapshot_bank_utils,
        snapshot_controller::SnapshotController,
        snapshot_utils::{self, clean_orphaned_account_snapshot_dirs},
    },
    solana_send_transaction_service::send_transaction_service::Config as SendTransactionServiceConfig,
    solana_shred_version::compute_shred_version,
    solana_signer::Signer,
    solana_streamer::{
        nonblocking::{simple_qos::SimpleQosConfig, swqos::SwQosConfig},
        quic::{QuicStreamerConfig, SimpleQosQuicStreamerConfig, SwQosQuicStreamerConfig},
        socket::SocketAddrSpace,
        streamer::StakedNodes,
    },
    solana_time_utils::timestamp,
    solana_tpu_client::tpu_client::{
        DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC, DEFAULT_VOTE_USE_QUIC,
    },
    solana_turbine::{
        self,
        broadcast_stage::BroadcastStageType,
        xdp::{master_ip_if_bonded, XdpConfig, XdpRetransmitter},
    },
    solana_unified_scheduler_pool::DefaultSchedulerPool,
    solana_validator_exit::Exit,
    solana_vote_program::vote_state::VoteStateV4,
    solana_wen_restart::wen_restart::{wait_for_wen_restart, WenRestartConfig},
    std::{
        borrow::Cow,
        collections::{HashMap, HashSet},
        net::{IpAddr, SocketAddr},
        num::{NonZeroU64, NonZeroUsize},
        path::{Path, PathBuf},
        str::FromStr,
        sync::{
            atomic::{AtomicBool, AtomicU64, Ordering},
            Arc, Mutex, RwLock,
        },
        thread::{sleep, Builder, JoinHandle},
        time::{Duration, Instant},
    },
    strum::VariantNames,
    strum_macros::{Display, EnumCount, EnumIter, EnumString, EnumVariantNames, IntoStaticStr},
    thiserror::Error,
    tokio::runtime::Runtime as TokioRuntime,
    tokio_util::sync::CancellationToken,
};

const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000;
const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80;
// Right now, since we reuse the wait-for-supermajority code, the
// following threshold should always be greater than or equal to
// WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT.
const WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT: u64 =
    WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT;
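// One way (sketch) to enforce the relationship documented above at compile time would
// be a const assertion, e.g.:
//     const _: () = assert!(
//         WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT
//             >= WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT
//     );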

#[derive(
    Clone, EnumCount, EnumIter, EnumString, EnumVariantNames, Default, IntoStaticStr, Display,
)]
#[strum(serialize_all = "kebab-case")]
pub enum BlockVerificationMethod {
    BlockstoreProcessor,
    #[default]
    UnifiedScheduler,
}

impl BlockVerificationMethod {
    pub const fn cli_names() -> &'static [&'static str] {
        Self::VARIANTS
    }

    pub fn cli_message() -> &'static str {
        "Switch transaction scheduling method for verifying ledger entries"
    }
}
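// Because of `#[strum(serialize_all = "kebab-case")]` above, `cli_names()` yields the
// kebab-case variant names, i.e. `["blockstore-processor", "unified-scheduler"]`.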

#[derive(
    Clone,
    Debug,
    EnumString,
    EnumVariantNames,
    Default,
    IntoStaticStr,
    Display,
    Serialize,
    Deserialize,
    PartialEq,
    Eq,
)]
#[strum(serialize_all = "kebab-case")]
#[serde(rename_all = "kebab-case")]
pub enum BlockProductionMethod {
    CentralScheduler,
    #[default]
    CentralSchedulerGreedy,
}

impl BlockProductionMethod {
    pub const fn cli_names() -> &'static [&'static str] {
        Self::VARIANTS
    }

    pub fn cli_message() -> &'static str {
        "Switch transaction scheduling method for producing ledger entries"
    }
}

#[derive(
    Clone,
    Debug,
    EnumString,
    EnumVariantNames,
    Default,
    IntoStaticStr,
    Display,
    Serialize,
    Deserialize,
    PartialEq,
    Eq,
)]
#[strum(serialize_all = "kebab-case")]
#[serde(rename_all = "kebab-case")]
pub enum TransactionStructure {
    Sdk,
    #[default]
    View,
}

impl TransactionStructure {
    pub const fn cli_names() -> &'static [&'static str] {
        Self::VARIANTS
    }

    pub fn cli_message() -> &'static str {
        "DEPRECATED: has no impact on banking stage; will be removed in a future version"
    }
}

#[derive(
    Clone, Debug, EnumVariantNames, IntoStaticStr, Display, Serialize, Deserialize, PartialEq, Eq,
)]
#[strum(serialize_all = "kebab-case")]
#[serde(rename_all = "kebab-case")]
pub enum SchedulerPacing {
    Disabled,
    FillTimeMillis(NonZeroU64),
}

impl FromStr for SchedulerPacing {
    type Err = String;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        if s.eq_ignore_ascii_case("disabled") {
            Ok(SchedulerPacing::Disabled)
        } else {
            match s.parse::<u64>() {
                Ok(v) if v > 0 => Ok(SchedulerPacing::FillTimeMillis(
                    NonZeroU64::new(v).ok_or_else(|| "value must be non-zero".to_string())?,
                )),
                _ => Err("value must be a positive integer or 'disabled'".to_string()),
            }
        }
    }
}

impl SchedulerPacing {
    pub fn fill_time(&self) -> Option<Duration> {
        match self {
            SchedulerPacing::Disabled => None,
            SchedulerPacing::FillTimeMillis(millis) => Some(Duration::from_millis(millis.get())),
        }
    }
}
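// Example (sketch) of how CLI input maps to a fill time via `FromStr`:
//     assert_eq!("disabled".parse::<SchedulerPacing>().unwrap().fill_time(), None);
//     assert_eq!(
//         "250".parse::<SchedulerPacing>().unwrap().fill_time(),
//         Some(Duration::from_millis(250))
//     );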

/// Configuration for the block generator invalidator for replay.
#[derive(Clone, Debug)]
pub struct GeneratorConfig {
    pub accounts_path: String,
    pub starting_keypairs: Arc<Vec<Keypair>>,
}

pub struct ValidatorConfig {
    pub halt_at_slot: Option<Slot>,
    pub expected_genesis_hash: Option<Hash>,
    pub expected_bank_hash: Option<Hash>,
    pub expected_shred_version: Option<u16>,
    pub voting_disabled: bool,
    pub account_paths: Vec<PathBuf>,
    pub account_snapshot_paths: Vec<PathBuf>,
    pub rpc_config: JsonRpcConfig,
    /// Specifies which plugins to start up with
    pub on_start_geyser_plugin_config_files: Option<Vec<PathBuf>>,
    pub geyser_plugin_always_enabled: bool,
    pub rpc_addrs: Option<(SocketAddr, SocketAddr)>, // (JsonRpc, JsonRpcPubSub)
    pub pubsub_config: PubSubConfig,
    pub snapshot_config: SnapshotConfig,
    pub max_ledger_shreds: Option<u64>,
    pub blockstore_options: BlockstoreOptions,
    pub broadcast_stage_type: BroadcastStageType,
    pub turbine_disabled: Arc<AtomicBool>,
    pub fixed_leader_schedule: Option<FixedSchedule>,
    pub wait_for_supermajority: Option<Slot>,
    pub new_hard_forks: Option<Vec<Slot>>,
    pub known_validators: Option<HashSet<Pubkey>>, // None = trust all
    pub repair_validators: Option<HashSet<Pubkey>>, // None = repair from all
    pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>, // Empty = repair with all
    pub gossip_validators: Option<HashSet<Pubkey>>, // None = gossip with all
    pub max_genesis_archive_unpacked_size: u64,
    /// Run PoH, transaction signature and other transaction verifications during blockstore
    /// processing.
    pub run_verification: bool,
    pub require_tower: bool,
    pub tower_storage: Arc<dyn TowerStorage>,
    pub debug_keys: Option<Arc<HashSet<Pubkey>>>,
    pub contact_debug_interval: u64,
    pub contact_save_interval: u64,
    pub send_transaction_service_config: SendTransactionServiceConfig,
    pub no_poh_speed_test: bool,
    pub no_os_memory_stats_reporting: bool,
    pub no_os_network_stats_reporting: bool,
    pub no_os_cpu_stats_reporting: bool,
    pub no_os_disk_stats_reporting: bool,
    pub enforce_ulimit_nofile: bool,
    pub poh_pinned_cpu_core: usize,
    pub poh_hashes_per_batch: u64,
    pub process_ledger_before_services: bool,
    pub accounts_db_config: AccountsDbConfig,
    pub warp_slot: Option<Slot>,
    pub accounts_db_skip_shrink: bool,
    pub accounts_db_force_initial_clean: bool,
    pub staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
    pub validator_exit: Arc<RwLock<Exit>>,
    pub validator_exit_backpressure: HashMap<String, Arc<AtomicBool>>,
    pub no_wait_for_vote_to_start_leader: bool,
    pub wait_to_vote_slot: Option<Slot>,
    pub runtime_config: RuntimeConfig,
    pub banking_trace_dir_byte_limit: banking_trace::DirByteLimit,
    pub block_verification_method: BlockVerificationMethod,
    pub block_production_method: BlockProductionMethod,
    pub block_production_num_workers: NonZeroUsize,
    pub block_production_scheduler_config: SchedulerConfig,
    pub enable_block_production_forwarding: bool,
    pub generator_config: Option<GeneratorConfig>,
    pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup,
    pub wen_restart_proto_path: Option<PathBuf>,
    pub wen_restart_coordinator: Option<Pubkey>,
    pub unified_scheduler_handler_threads: Option<usize>,
    pub ip_echo_server_threads: NonZeroUsize,
    pub rayon_global_threads: NonZeroUsize,
    pub replay_forks_threads: NonZeroUsize,
    pub replay_transactions_threads: NonZeroUsize,
    pub tvu_shred_sigverify_threads: NonZeroUsize,
    pub delay_leader_block_for_pending_fork: bool,
    pub use_tpu_client_next: bool,
    pub retransmit_xdp: Option<XdpConfig>,
    pub repair_handler_type: RepairHandlerType,
}

impl ValidatorConfig {
    pub fn default_for_test() -> Self {
        let max_thread_count =
            NonZeroUsize::new(num_cpus::get()).expect("thread count is non-zero");

        Self {
            halt_at_slot: None,
            expected_genesis_hash: None,
            expected_bank_hash: None,
            expected_shred_version: None,
            voting_disabled: false,
            max_ledger_shreds: None,
            blockstore_options: BlockstoreOptions::default_for_tests(),
            account_paths: Vec::new(),
            account_snapshot_paths: Vec::new(),
            rpc_config: JsonRpcConfig::default_for_test(),
            on_start_geyser_plugin_config_files: None,
            geyser_plugin_always_enabled: false,
            rpc_addrs: None,
            pubsub_config: PubSubConfig::default(),
            snapshot_config: SnapshotConfig::new_load_only(),
            broadcast_stage_type: BroadcastStageType::Standard,
            turbine_disabled: Arc::<AtomicBool>::default(),
            fixed_leader_schedule: None,
            wait_for_supermajority: None,
            new_hard_forks: None,
            known_validators: None,
            repair_validators: None,
            repair_whitelist: Arc::new(RwLock::new(HashSet::default())),
            gossip_validators: None,
            max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
            run_verification: true,
            require_tower: false,
            tower_storage: Arc::new(NullTowerStorage::default()),
            debug_keys: None,
            contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
            contact_save_interval: DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS,
            send_transaction_service_config: SendTransactionServiceConfig::default(),
            no_poh_speed_test: true,
            no_os_memory_stats_reporting: true,
            no_os_network_stats_reporting: true,
            no_os_cpu_stats_reporting: true,
            no_os_disk_stats_reporting: true,
            // No need to enforce nofile limit in tests
            enforce_ulimit_nofile: false,
            poh_pinned_cpu_core: poh_service::DEFAULT_PINNED_CPU_CORE,
            poh_hashes_per_batch: poh_service::DEFAULT_HASHES_PER_BATCH,
            process_ledger_before_services: false,
            warp_slot: None,
            accounts_db_skip_shrink: false,
            accounts_db_force_initial_clean: false,
            staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
            validator_exit: Arc::new(RwLock::new(Exit::default())),
            validator_exit_backpressure: HashMap::default(),
            no_wait_for_vote_to_start_leader: true,
            accounts_db_config: ACCOUNTS_DB_CONFIG_FOR_TESTING,
            wait_to_vote_slot: None,
            runtime_config: RuntimeConfig::default(),
            banking_trace_dir_byte_limit: 0,
            block_verification_method: BlockVerificationMethod::default(),
            block_production_method: BlockProductionMethod::default(),
            block_production_num_workers: BankingStage::default_num_workers(),
            block_production_scheduler_config: SchedulerConfig::default(),
            // enable forwarding by default for tests
            enable_block_production_forwarding: true,
            generator_config: None,
            use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(),
            wen_restart_proto_path: None,
            wen_restart_coordinator: None,
            unified_scheduler_handler_threads: None,
            ip_echo_server_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            rayon_global_threads: max_thread_count,
            replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            replay_transactions_threads: max_thread_count,
            tvu_shred_sigverify_threads: NonZeroUsize::new(get_thread_count())
                .expect("thread count is non-zero"),
            delay_leader_block_for_pending_fork: false,
            use_tpu_client_next: true,
            retransmit_xdp: None,
            repair_handler_type: RepairHandlerType::default(),
        }
    }

    pub fn enable_default_rpc_block_subscribe(&mut self) {
        let pubsub_config = PubSubConfig {
            enable_block_subscription: true,
            ..PubSubConfig::default()
        };
        let rpc_config = JsonRpcConfig {
            enable_rpc_transaction_history: true,
            ..JsonRpcConfig::default_for_test()
        };

        self.pubsub_config = pubsub_config;
        self.rpc_config = rpc_config;
    }
}
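// Example (sketch): tests typically start from `default_for_test()` and opt in to
// extra features from there:
//     let mut config = ValidatorConfig::default_for_test();
//     config.enable_default_rpc_block_subscribe();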

// `ValidatorStartProgress` contains status information that is surfaced to the node operator over
// the admin RPC channel to help them follow the general progress of node startup without
// having to watch log messages.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum ValidatorStartProgress {
    #[default]
    Initializing, // Catch-all, default state
    SearchingForRpcService,
    DownloadingSnapshot {
        slot: Slot,
        rpc_addr: SocketAddr,
    },
    CleaningBlockStore,
    CleaningAccounts,
    LoadingLedger,
    ProcessingLedger {
        slot: Slot,
        max_slot: Slot,
    },
    StartingServices,
    Halted, // Validator halted due to `--dev-halt-at-slot` argument
    WaitingForSupermajority {
        slot: Slot,
        gossip_stake_percent: u64,
    },

    // `Running` is the terminal state once the validator fully starts and all services are
    // operational
    Running,
}
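// With the derived `Serialize` impl and serde defaults, unit variants serialize as
// plain strings (e.g. "Running") and struct variants as externally tagged objects,
// e.g. `{"ProcessingLedger":{"slot":100,"max_slot":200}}` (illustrative values).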

struct BlockstoreRootScan {
    thread: Option<JoinHandle<Result<usize, BlockstoreError>>>,
}

impl BlockstoreRootScan {
    fn new(config: &ValidatorConfig, blockstore: Arc<Blockstore>, exit: Arc<AtomicBool>) -> Self {
        let thread = if config.rpc_addrs.is_some()
            && config.rpc_config.enable_rpc_transaction_history
            && config.rpc_config.rpc_scan_and_fix_roots
        {
            Some(
                Builder::new()
                    .name("solBStoreRtScan".to_string())
                    .spawn(move || blockstore.scan_and_fix_roots(None, None, &exit))
                    .unwrap(),
            )
        } else {
            None
        };
        Self { thread }
    }

    fn join(self) {
        if let Some(blockstore_root_scan) = self.thread {
            if let Err(err) = blockstore_root_scan.join() {
                warn!("blockstore_root_scan failed to join {err:?}");
            }
        }
    }
}

#[derive(Default)]
struct TransactionHistoryServices {
    transaction_status_sender: Option<TransactionStatusSender>,
    transaction_status_service: Option<TransactionStatusService>,
    max_complete_transaction_status_slot: Arc<AtomicU64>,
}

/// A struct that eases passing validator TPU configuration
pub struct ValidatorTpuConfig {
    /// Controls whether to use QUIC for sending regular TPU transactions
    pub use_quic: bool,
    /// Controls whether to use QUIC for sending TPU votes
    pub vote_use_quic: bool,
    /// Controls the connection cache pool size
    pub tpu_connection_pool_size: usize,
    /// Controls whether to enable UDP for TPU transactions.
    pub tpu_enable_udp: bool,
    /// QUIC server config for regular TPU
    pub tpu_quic_server_config: SwQosQuicStreamerConfig,
    /// QUIC server config for TPU forward
    pub tpu_fwd_quic_server_config: SwQosQuicStreamerConfig,
    /// QUIC server config for Vote
    pub vote_quic_server_config: SimpleQosQuicStreamerConfig,
}

impl ValidatorTpuConfig {
    /// A convenience function to build a ValidatorTpuConfig for testing with good
    /// defaults.
    pub fn new_for_tests(tpu_enable_udp: bool) -> Self {
        let tpu_quic_server_config = SwQosQuicStreamerConfig {
            quic_streamer_config: QuicStreamerConfig {
                max_connections_per_ipaddr_per_min: 32,
                accumulator_channel_size: 100_000, // smaller channel size for faster test
                ..Default::default()
            },
            qos_config: SwQosConfig::default(),
        };

        let tpu_fwd_quic_server_config = SwQosQuicStreamerConfig {
            quic_streamer_config: QuicStreamerConfig {
                max_connections_per_ipaddr_per_min: 32,
                max_unstaked_connections: 0,
                accumulator_channel_size: 100_000, // smaller channel size for faster test
                ..Default::default()
            },
            qos_config: SwQosConfig::default(),
        };

        // vote and tpu_fwd share the same characteristics -- disallow non-staked connections:
        let vote_quic_server_config = SimpleQosQuicStreamerConfig {
            quic_streamer_config: QuicStreamerConfig {
                max_connections_per_ipaddr_per_min: 32,
                max_unstaked_connections: 0,
                accumulator_channel_size: 100_000, // smaller channel size for faster test
                ..Default::default()
            },
            qos_config: SimpleQosConfig::default(),
        };

        ValidatorTpuConfig {
            use_quic: DEFAULT_TPU_USE_QUIC,
            vote_use_quic: DEFAULT_VOTE_USE_QUIC,
            tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
            tpu_enable_udp,
            tpu_quic_server_config,
            tpu_fwd_quic_server_config,
            vote_quic_server_config,
        }
    }
}
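// Example (sketch): tests build this config via `new_for_tests` and hand it to
// `Validator::new`, which destructures it into the individual QUIC/UDP settings:
//     let tpu_config = ValidatorTpuConfig::new_for_tests(/* tpu_enable_udp: */ false);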

pub struct Validator {
    validator_exit: Arc<RwLock<Exit>>,
    json_rpc_service: Option<JsonRpcService>,
    pubsub_service: Option<PubSubService>,
    rpc_completed_slots_service: Option<JoinHandle<()>>,
    optimistically_confirmed_bank_tracker: Option<OptimisticallyConfirmedBankTracker>,
    transaction_status_service: Option<TransactionStatusService>,
    entry_notifier_service: Option<EntryNotifierService>,
    system_monitor_service: Option<SystemMonitorService>,
    sample_performance_service: Option<SamplePerformanceService>,
    stats_reporter_service: StatsReporterService,
    gossip_service: GossipService,
    serve_repair_service: ServeRepairService,
    completed_data_sets_service: Option<CompletedDataSetsService>,
    snapshot_packager_service: Option<SnapshotPackagerService>,
    poh_recorder: Arc<RwLock<PohRecorder>>,
    poh_service: PohService,
    tpu: Tpu,
    tvu: Tvu,
    ip_echo_server: Option<solana_net_utils::IpEchoServer>,
    pub cluster_info: Arc<ClusterInfo>,
    pub bank_forks: Arc<RwLock<BankForks>>,
    pub blockstore: Arc<Blockstore>,
    geyser_plugin_service: Option<GeyserPluginService>,
    blockstore_metric_report_service: BlockstoreMetricReportService,
    accounts_background_service: AccountsBackgroundService,
    turbine_quic_endpoint: Option<Endpoint>,
    turbine_quic_endpoint_runtime: Option<TokioRuntime>,
    turbine_quic_endpoint_join_handle: Option<solana_turbine::quic_endpoint::AsyncTryJoinHandle>,
    repair_quic_endpoints: Option<[Endpoint; 3]>,
    repair_quic_endpoints_runtime: Option<TokioRuntime>,
    repair_quic_endpoints_join_handle: Option<repair::quic_endpoint::AsyncTryJoinHandle>,
    xdp_retransmitter: Option<XdpRetransmitter>,
    // This runtime is used to run the client owned by SendTransactionService.
    // We don't wait for its JoinHandle here because ownership and shutdown
    // are managed elsewhere. This variable is intentionally unused.
    _tpu_client_next_runtime: Option<TokioRuntime>,
}

impl Validator {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        mut node: Node,
        identity_keypair: Arc<Keypair>,
        ledger_path: &Path,
        vote_account: &Pubkey,
        authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
        cluster_entrypoints: Vec<ContactInfo>,
        config: &ValidatorConfig,
        should_check_duplicate_instance: bool,
        rpc_to_plugin_manager_receiver: Option<Receiver<GeyserPluginManagerRequest>>,
        start_progress: Arc<RwLock<ValidatorStartProgress>>,
        socket_addr_space: SocketAddrSpace,
        tpu_config: ValidatorTpuConfig,
        admin_rpc_service_post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
    ) -> Result<Self> {
        #[cfg(debug_assertions)]
        const DEBUG_ASSERTION_STATUS: &str = "enabled";
        #[cfg(not(debug_assertions))]
        const DEBUG_ASSERTION_STATUS: &str = "disabled";
        info!("debug-assertion status: {DEBUG_ASSERTION_STATUS}");

        let ValidatorTpuConfig {
            use_quic,
            vote_use_quic,
            tpu_connection_pool_size,
            tpu_enable_udp,
            tpu_quic_server_config,
            tpu_fwd_quic_server_config,
            vote_quic_server_config,
        } = tpu_config;

        let start_time = Instant::now();

        adjust_nofile_limit(config.enforce_ulimit_nofile)?;

        // Initialize the global rayon pool first to ensure the value in config
        // is honored. Otherwise, some code accessing the global pool could
        // cause it to get initialized with Rayon's default (not ours)
        if rayon::ThreadPoolBuilder::new()
            .thread_name(|i| format!("solRayonGlob{i:02}"))
            .num_threads(config.rayon_global_threads.get())
            .build_global()
            .is_err()
        {
            warn!("Rayon global thread pool already initialized");
        }

        let id = identity_keypair.pubkey();
        assert_eq!(&id, node.info.pubkey());

        info!("identity pubkey: {id}");
        info!("vote account pubkey: {vote_account}");

        if !config.no_os_network_stats_reporting {
            verify_net_stats_access().map_err(|e| {
                ValidatorError::Other(format!("Failed to access network stats: {e:?}"))
            })?;
        }

        let mut bank_notification_senders = Vec::new();

        let exit = Arc::new(AtomicBool::new(false));

        let geyser_plugin_config_files = config
            .on_start_geyser_plugin_config_files
            .as_ref()
            .map(Cow::Borrowed)
            .or_else(|| {
                config
                    .geyser_plugin_always_enabled
                    .then_some(Cow::Owned(vec![]))
            });
        let geyser_plugin_service =
            if let Some(geyser_plugin_config_files) = geyser_plugin_config_files {
                let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
                bank_notification_senders.push(confirmed_bank_sender);
                let rpc_to_plugin_manager_receiver_and_exit =
                    rpc_to_plugin_manager_receiver.map(|receiver| (receiver, exit.clone()));
                Some(
                    GeyserPluginService::new_with_receiver(
                        confirmed_bank_receiver,
                        config.geyser_plugin_always_enabled,
                        geyser_plugin_config_files.as_ref(),
                        rpc_to_plugin_manager_receiver_and_exit,
                    )
                    .map_err(|err| {
                        ValidatorError::Other(format!("Failed to load the Geyser plugin: {err:?}"))
                    })?,
                )
            } else {
                None
            };

        if config.voting_disabled {
            warn!("voting disabled");
            authorized_voter_keypairs.write().unwrap().clear();
        } else {
            for authorized_voter_keypair in authorized_voter_keypairs.read().unwrap().iter() {
                warn!("authorized voter: {}", authorized_voter_keypair.pubkey());
            }
        }

        for cluster_entrypoint in &cluster_entrypoints {
            info!("entrypoint: {cluster_entrypoint:?}");
        }

        if solana_perf::perf_libs::api().is_some() {
            info!("Initializing sigverify, this could take a while...");
        } else {
            info!("Initializing sigverify...");
        }
        sigverify::init();
        info!("Initializing sigverify done.");

        validate_memlock_limit_for_disk_io(config.accounts_db_config.memlock_budget_size)?;

        if !ledger_path.is_dir() {
            return Err(anyhow!(
                "ledger directory does not exist or is not accessible: {ledger_path:?}"
            ));
        }
        let genesis_config = load_genesis(config, ledger_path)?;
        metrics_config_sanity_check(genesis_config.cluster_type)?;

        info!("Cleaning accounts paths..");
        *start_progress.write().unwrap() = ValidatorStartProgress::CleaningAccounts;
        let mut timer = Measure::start("clean_accounts_paths");
        cleanup_accounts_paths(config);
        timer.stop();
        info!("Cleaning accounts paths done. {timer}");

        snapshot_utils::purge_incomplete_bank_snapshots(&config.snapshot_config.bank_snapshots_dir);
        snapshot_utils::purge_old_bank_snapshots_at_startup(
            &config.snapshot_config.bank_snapshots_dir,
        );

        info!("Cleaning orphaned account snapshot directories..");
        let mut timer = Measure::start("clean_orphaned_account_snapshot_dirs");
        clean_orphaned_account_snapshot_dirs(
            &config.snapshot_config.bank_snapshots_dir,
            &config.account_snapshot_paths,
        )
        .context("failed to clean orphaned account snapshot directories")?;
        timer.stop();
        info!("Cleaning orphaned account snapshot directories done. {timer}");

        // Token used to cancel tpu-client-next and the streamer.
        let cancel = CancellationToken::new();
        {
            let exit = exit.clone();
            config
                .validator_exit
                .write()
                .unwrap()
                .register_exit(Box::new(move || exit.store(true, Ordering::Relaxed)));
            let cancel = cancel.clone();
            config
                .validator_exit
                .write()
                .unwrap()
                .register_exit(Box::new(move || cancel.cancel()));
        }

        let (
            accounts_update_notifier,
            transaction_notifier,
            entry_notifier,
            block_metadata_notifier,
            slot_status_notifier,
        ) = if let Some(service) = &geyser_plugin_service {
            (
                service.get_accounts_update_notifier(),
                service.get_transaction_notifier(),
                service.get_entry_notifier(),
                service.get_block_metadata_notifier(),
                service.get_slot_status_notifier(),
            )
        } else {
            (None, None, None, None, None)
        };

        info!(
            "Geyser plugin: accounts_update_notifier: {}, transaction_notifier: {}, \
             entry_notifier: {}",
            accounts_update_notifier.is_some(),
            transaction_notifier.is_some(),
            entry_notifier.is_some()
        );

        let system_monitor_service = Some(SystemMonitorService::new(
            exit.clone(),
            SystemMonitorStatsReportConfig {
                report_os_memory_stats: !config.no_os_memory_stats_reporting,
                report_os_network_stats: !config.no_os_network_stats_reporting,
                report_os_cpu_stats: !config.no_os_cpu_stats_reporting,
                report_os_disk_stats: !config.no_os_disk_stats_reporting,
            },
        ));

        let dependency_tracker = Arc::new(DependencyTracker::default());

        let (
            bank_forks,
            blockstore,
            original_blockstore_root,
            ledger_signal_receiver,
            leader_schedule_cache,
            starting_snapshot_hashes,
            TransactionHistoryServices {
                transaction_status_sender,
                transaction_status_service,
                max_complete_transaction_status_slot,
            },
            blockstore_process_options,
            blockstore_root_scan,
            pruned_banks_receiver,
            entry_notifier_service,
        ) = load_blockstore(
            config,
            ledger_path,
            &genesis_config,
            exit.clone(),
            &start_progress,
            accounts_update_notifier,
            transaction_notifier,
            entry_notifier,
            config
                .rpc_addrs
                .is_some()
                .then(|| dependency_tracker.clone()),
        )
        .map_err(ValidatorError::Other)?;

        if !config.no_poh_speed_test {
            check_poh_speed(&bank_forks.read().unwrap().root_bank(), None)?;
        }

        let (root_slot, hard_forks) = {
            let root_bank = bank_forks.read().unwrap().root_bank();
            (root_bank.slot(), root_bank.hard_forks())
        };
        let shred_version = compute_shred_version(&genesis_config.hash(), Some(&hard_forks));
        info!("shred version: {shred_version}, hard forks: {hard_forks:?}");

        if let Some(expected_shred_version) = config.expected_shred_version {
            if expected_shred_version != shred_version {
                return Err(ValidatorError::ShredVersionMismatch {
                    actual: shred_version,
                    expected: expected_shred_version,
                }
                .into());
            }
        }

        if let Some(start_slot) = should_cleanup_blockstore_incorrect_shred_versions(
            config,
            &blockstore,
            root_slot,
            &hard_forks,
        )? {
            *start_progress.write().unwrap() = ValidatorStartProgress::CleaningBlockStore;
            cleanup_blockstore_incorrect_shred_versions(
                &blockstore,
                config,
                start_slot,
                shred_version,
            )?;
        } else {
            info!("Skipping the blockstore check for shreds with incorrect version");
        }

        node.info.set_shred_version(shred_version);
        node.info.set_wallclock(timestamp());
        Self::print_node_info(&node);

        let mut cluster_info = ClusterInfo::new(
            node.info.clone(),
            identity_keypair.clone(),
            socket_addr_space,
        );
        cluster_info.set_contact_debug_interval(config.contact_debug_interval);
        if let Some(known_validators) = &config.known_validators {
            cluster_info
                .set_trim_keep_pubkeys(known_validators.iter().copied())
                .expect("set_trim_keep_pubkeys should succeed as ClusterInfo was just created");
        }
        cluster_info.set_entrypoints(cluster_entrypoints);
        cluster_info.restore_contact_info(ledger_path, config.contact_save_interval);
        cluster_info.set_bind_ip_addrs(node.bind_ip_addrs.clone());
        let cluster_info = Arc::new(cluster_info);
        let node_multihoming = Arc::new(NodeMultihoming::from(&node));

        assert!(is_snapshot_config_valid(&config.snapshot_config));

        let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
        let snapshot_controller = Arc::new(SnapshotController::new(
            snapshot_request_sender.clone(),
            config.snapshot_config.clone(),
            bank_forks.read().unwrap().root(),
        ));

        let pending_snapshot_packages = Arc::new(Mutex::new(PendingSnapshotPackages::default()));
        let snapshot_packager_service = if snapshot_controller
            .snapshot_config()
            .should_generate_snapshots()
        {
            let exit_backpressure = config
                .validator_exit_backpressure
                .get(SnapshotPackagerService::NAME)
                .cloned();
            let enable_gossip_push = true;
            let snapshot_packager_service = SnapshotPackagerService::new(
                pending_snapshot_packages.clone(),
                starting_snapshot_hashes,
                exit.clone(),
                exit_backpressure,
                cluster_info.clone(),
                snapshot_controller.clone(),
                enable_gossip_push,
            );
            Some(snapshot_packager_service)
        } else {
            None
        };

        let snapshot_request_handler = SnapshotRequestHandler {
            snapshot_controller: snapshot_controller.clone(),
            snapshot_request_receiver,
            pending_snapshot_packages,
        };
        let pruned_banks_request_handler = PrunedBanksRequestHandler {
            pruned_banks_receiver,
        };
        let accounts_background_service = AccountsBackgroundService::new(
            bank_forks.clone(),
            exit.clone(),
            AbsRequestHandlers {
                snapshot_request_handler,
                pruned_banks_request_handler,
            },
        );
        info!(
            "Using: block-verification-method: {}, block-production-method: {}",
            config.block_verification_method, config.block_production_method,
        );

        let (replay_vote_sender, replay_vote_receiver) = unbounded();

        // block min prioritization fee cache should be readable by RPC, and writable by validator
        // (by both replay stage and banking stage)
        let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::default());

        let leader_schedule_cache = Arc::new(leader_schedule_cache);
        let (poh_recorder, entry_receiver) = {
            let bank = &bank_forks.read().unwrap().working_bank();
            PohRecorder::new_with_clear_signal(
                bank.tick_height(),
                bank.last_blockhash(),
                bank.clone(),
                None,
                bank.ticks_per_slot(),
                config.delay_leader_block_for_pending_fork,
                blockstore.clone(),
                blockstore.get_new_shred_signal(0),
                &leader_schedule_cache,
                &genesis_config.poh_config,
                exit.clone(),
            )
        };
        let (record_sender, record_receiver) = record_channels(transaction_status_sender.is_some());
        let transaction_recorder = TransactionRecorder::new(record_sender);
        let poh_recorder = Arc::new(RwLock::new(poh_recorder));
        let (poh_controller, poh_service_message_receiver) = PohController::new();

        let (banking_tracer, tracer_thread) =
            BankingTracer::new((config.banking_trace_dir_byte_limit > 0).then_some((
                &blockstore.banking_trace_path(),
                exit.clone(),
                config.banking_trace_dir_byte_limit,
            )))?;
        if banking_tracer.is_enabled() {
            info!(
                "Enabled banking trace (dir_byte_limit: {})",
                config.banking_trace_dir_byte_limit
            );
        } else {
            info!("Disabled banking trace");
        }
        let banking_tracer_channels = banking_tracer.create_channels(false);

        match &config.block_verification_method {
            BlockVerificationMethod::BlockstoreProcessor => {
                info!("no scheduler pool is installed for block verification...");
                if let Some(count) = config.unified_scheduler_handler_threads {
                    warn!(
                        "--unified-scheduler-handler-threads={count} is ignored because unified \
                         scheduler isn't enabled"
                    );
                }
            }
            BlockVerificationMethod::UnifiedScheduler => {
                let scheduler_pool = DefaultSchedulerPool::new_dyn(
                    config.unified_scheduler_handler_threads,
                    config.runtime_config.log_messages_bytes_limit,
                    transaction_status_sender.clone(),
                    Some(replay_vote_sender.clone()),
                    prioritization_fee_cache.clone(),
                );
                bank_forks
                    .write()
                    .unwrap()
                    .install_scheduler_pool(scheduler_pool);
            }
        }

        let entry_notification_sender = entry_notifier_service
            .as_ref()
            .map(|service| service.sender());
        let mut process_blockstore = ProcessBlockStore::new(
            &id,
            vote_account,
            &start_progress,
            &blockstore,
            original_blockstore_root,
            &bank_forks,
            &leader_schedule_cache,
            &blockstore_process_options,
            transaction_status_sender.as_ref(),
            entry_notification_sender,
            blockstore_root_scan,
            &snapshot_controller,
            config,
        );

        maybe_warp_slot(
            config,
            &mut process_blockstore,
            ledger_path,
            &bank_forks,
            &leader_schedule_cache,
            &snapshot_controller,
        )
        .map_err(ValidatorError::Other)?;

        if config.process_ledger_before_services {
            process_blockstore
                .process()
                .map_err(ValidatorError::Other)?;
        }
        *start_progress.write().unwrap() = ValidatorStartProgress::StartingServices;

        let sample_performance_service =
            if config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history {
                Some(SamplePerformanceService::new(
                    &bank_forks,
                    blockstore.clone(),
                    exit.clone(),
                ))
            } else {
                None
            };

        let mut block_commitment_cache = BlockCommitmentCache::default();
        let bank_forks_guard = bank_forks.read().unwrap();
        block_commitment_cache.initialize_slots(
            bank_forks_guard.working_bank().slot(),
            bank_forks_guard.root(),
        );
        drop(bank_forks_guard);
        let block_commitment_cache = Arc::new(RwLock::new(block_commitment_cache));

        let optimistically_confirmed_bank =
            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);

        let max_slots = Arc::new(MaxSlots::default());

        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));

        let mut tpu_transactions_forwards_client_sockets =
            Some(node.sockets.tpu_transaction_forwarding_clients);
        let connection_cache = match (config.use_tpu_client_next, use_quic) {
            (false, true) => Some(Arc::new(ConnectionCache::new_with_client_options(
                "connection_cache_tpu_quic",
                tpu_connection_pool_size,
                Some({
                    // This conversion is not beautiful, but Rust does not allow popping single
                    // elements from a boxed slice.
                    let socketbox: Box<[_; 1]> = tpu_transactions_forwards_client_sockets
                        .take()
                        .unwrap()
                        .try_into()
                        .expect("Multihoming support for connection cache is not available");
                    let [sock] = *socketbox;
                    sock
                }),
                Some((
                    &identity_keypair,
                    node.info
                        .tpu(Protocol::UDP)
                        .ok_or_else(|| {
                            ValidatorError::Other(String::from("Invalid UDP address for TPU"))
                        })?
                        .ip(),
                )),
                Some((&staked_nodes, &identity_keypair.pubkey())),
            ))),
            (false, false) => Some(Arc::new(ConnectionCache::with_udp(
                "connection_cache_tpu_udp",
                tpu_connection_pool_size,
            ))),
            (true, _) => None,
        };
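        // At this point `connection_cache` is `Some` only when the legacy
        // ConnectionCache client is in use; with `use_tpu_client_next` no cache is
        // created and the tpu-client-next client is built later from
        // `tpu_client_next_runtime`.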
1171
1172        let vote_connection_cache = if vote_use_quic {
1173            let vote_connection_cache = ConnectionCache::new_with_client_options(
1174                "connection_cache_vote_quic",
1175                tpu_connection_pool_size,
1176                Some(node.sockets.quic_vote_client),
1177                Some((
1178                    &identity_keypair,
1179                    node.info
1180                        .tpu_vote(Protocol::QUIC)
1181                        .ok_or_else(|| {
1182                            ValidatorError::Other(String::from("Invalid QUIC address for TPU Vote"))
1183                        })?
1184                        .ip(),
1185                )),
1186                Some((&staked_nodes, &identity_keypair.pubkey())),
1187            );
1188            Arc::new(vote_connection_cache)
1189        } else {
1190            Arc::new(ConnectionCache::with_udp(
1191                "connection_cache_vote_udp",
1192                tpu_connection_pool_size,
1193            ))
1194        };
1195
1196        // test-validator crate may start the validator in a tokio runtime
1197        // context which forces us to use the same runtime because a nested
1198        // runtime will cause panic at drop. Outside test-validator crate, we
1199        // always need a tokio runtime (and the respective handle) to initialize
1200        // the turbine QUIC endpoint.
1201        let current_runtime_handle = tokio::runtime::Handle::try_current();
1202        let tpu_client_next_runtime =
1203            (current_runtime_handle.is_err() && config.use_tpu_client_next).then(|| {
1204                tokio::runtime::Builder::new_multi_thread()
1205                    .enable_all()
1206                    .worker_threads(2)
1207                    .thread_name("solTpuClientRt")
1208                    .build()
1209                    .unwrap()
1210            });
1211
1212        let rpc_override_health_check =
1213            Arc::new(AtomicBool::new(config.rpc_config.disable_health_check));
1214        let (
1215            json_rpc_service,
1216            rpc_subscriptions,
1217            pubsub_service,
1218            completed_data_sets_sender,
1219            completed_data_sets_service,
1220            rpc_completed_slots_service,
1221            optimistically_confirmed_bank_tracker,
1222            bank_notification_sender,
1223        ) = if let Some((rpc_addr, rpc_pubsub_addr)) = config.rpc_addrs {
1224            assert_eq!(
1225                node.info.rpc().map(|addr| socket_addr_space.check(&addr)),
1226                node.info
1227                    .rpc_pubsub()
1228                    .map(|addr| socket_addr_space.check(&addr))
1229            );
1230            let (bank_notification_sender, bank_notification_receiver) = unbounded();
1231            let confirmed_bank_subscribers = if !bank_notification_senders.is_empty() {
1232                Some(Arc::new(RwLock::new(bank_notification_senders)))
1233            } else {
1234                None
1235            };
1236
1237            let client_option = if config.use_tpu_client_next {
1238                let runtime_handle = tpu_client_next_runtime
1239                    .as_ref()
1240                    .map(TokioRuntime::handle)
1241                    .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap());
1242                ClientOption::TpuClientNext(
1243                    Arc::as_ref(&identity_keypair),
1244                    node.sockets.rpc_sts_client,
1245                    runtime_handle.clone(),
1246                    cancel.clone(),
1247                )
1248            } else {
1249                let Some(connection_cache) = &connection_cache else {
1250                    panic!("ConnectionCache should exist by construction.");
1251                };
1252                ClientOption::ConnectionCache(connection_cache.clone())
1253            };
1254            let rpc_svc_config = JsonRpcServiceConfig {
1255                rpc_addr,
1256                rpc_config: config.rpc_config.clone(),
1257                snapshot_config: Some(snapshot_controller.snapshot_config().clone()),
1258                bank_forks: bank_forks.clone(),
1259                block_commitment_cache: block_commitment_cache.clone(),
1260                blockstore: blockstore.clone(),
1261                cluster_info: cluster_info.clone(),
1262                poh_recorder: Some(poh_recorder.clone()),
1263                genesis_hash: genesis_config.hash(),
1264                ledger_path: ledger_path.to_path_buf(),
1265                validator_exit: config.validator_exit.clone(),
1266                exit: exit.clone(),
1267                override_health_check: rpc_override_health_check.clone(),
1268                optimistically_confirmed_bank: optimistically_confirmed_bank.clone(),
1269                send_transaction_service_config: config.send_transaction_service_config.clone(),
1270                max_slots: max_slots.clone(),
1271                leader_schedule_cache: leader_schedule_cache.clone(),
1272                max_complete_transaction_status_slot: max_complete_transaction_status_slot.clone(),
1273                prioritization_fee_cache: prioritization_fee_cache.clone(),
1274                client_option,
1275            };
1276            let json_rpc_service =
1277                JsonRpcService::new_with_config(rpc_svc_config).map_err(ValidatorError::Other)?;
1278            let rpc_subscriptions = Arc::new(RpcSubscriptions::new_with_config(
1279                exit.clone(),
1280                max_complete_transaction_status_slot,
1281                blockstore.clone(),
1282                bank_forks.clone(),
1283                block_commitment_cache.clone(),
1284                optimistically_confirmed_bank.clone(),
1285                &config.pubsub_config,
1286                None,
1287            ));
1288            let pubsub_service = if !config.rpc_config.full_api {
1289                None
1290            } else {
1291                let (trigger, pubsub_service) = PubSubService::new(
1292                    config.pubsub_config.clone(),
1293                    &rpc_subscriptions,
1294                    rpc_pubsub_addr,
1295                );
1296                config
1297                    .validator_exit
1298                    .write()
1299                    .unwrap()
1300                    .register_exit(Box::new(move || trigger.cancel()));
1301
1302                Some(pubsub_service)
1303            };
1304
1305            let (completed_data_sets_sender, completed_data_sets_service) =
1306                if !config.rpc_config.full_api {
1307                    (None, None)
1308                } else {
1309                    let (completed_data_sets_sender, completed_data_sets_receiver) =
1310                        bounded(MAX_COMPLETED_DATA_SETS_IN_CHANNEL);
1311                    let completed_data_sets_service = CompletedDataSetsService::new(
1312                        completed_data_sets_receiver,
1313                        blockstore.clone(),
1314                        rpc_subscriptions.clone(),
1315                        exit.clone(),
1316                        max_slots.clone(),
1317                    );
1318                    (
1319                        Some(completed_data_sets_sender),
1320                        Some(completed_data_sets_service),
1321                    )
1322                };
1323
1324            let rpc_completed_slots_service =
1325                if config.rpc_config.full_api || geyser_plugin_service.is_some() {
1326                    let (completed_slots_sender, completed_slots_receiver) =
1327                        bounded(MAX_COMPLETED_SLOTS_IN_CHANNEL);
1328                    blockstore.add_completed_slots_signal(completed_slots_sender);
1329
1330                    Some(RpcCompletedSlotsService::spawn(
1331                        completed_slots_receiver,
1332                        rpc_subscriptions.clone(),
1333                        slot_status_notifier.clone(),
1334                        exit.clone(),
1335                    ))
1336                } else {
1337                    None
1338                };
1339
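            // The OptimisticallyConfirmedBankTracker consumes bank notifications and keeps the
            // optimistically confirmed bank (and RPC subscriptions) up to date; the dependency
            // tracker is only propagated when transaction statuses are actually being recorded.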
1340            let dependency_tracker = transaction_status_sender
1341                .is_some()
1342                .then_some(dependency_tracker);
1343            let optimistically_confirmed_bank_tracker =
1344                Some(OptimisticallyConfirmedBankTracker::new(
1345                    bank_notification_receiver,
1346                    exit.clone(),
1347                    bank_forks.clone(),
1348                    optimistically_confirmed_bank,
1349                    rpc_subscriptions.clone(),
1350                    confirmed_bank_subscribers,
1351                    prioritization_fee_cache.clone(),
1352                    dependency_tracker.clone(),
1353                ));
1354            let bank_notification_sender_config = Some(BankNotificationSenderConfig {
1355                sender: bank_notification_sender,
1356                should_send_parents: geyser_plugin_service.is_some(),
1357                dependency_tracker,
1358            });
1359            (
1360                Some(json_rpc_service),
1361                Some(rpc_subscriptions),
1362                pubsub_service,
1363                completed_data_sets_sender,
1364                completed_data_sets_service,
1365                rpc_completed_slots_service,
1366                optimistically_confirmed_bank_tracker,
1367                bank_notification_sender_config,
1368            )
1369        } else {
1370            (None, None, None, None, None, None, None, None)
1371        };
1372
1373        if config.halt_at_slot.is_some() {
1374            // Simulate a confirmed root to avoid RPC errors with CommitmentConfig::finalized() and
1375            // to ensure RPC endpoints like getConfirmedBlock, which require a confirmed root, work
1376            block_commitment_cache
1377                .write()
1378                .unwrap()
1379                .set_highest_super_majority_root(bank_forks.read().unwrap().root());
1380
1381            // Park with the RPC service running, ready for inspection!
1382            warn!("Validator halted");
1383            *start_progress.write().unwrap() = ValidatorStartProgress::Halted;
1384            std::thread::park();
1385        }
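        // The IP echo server (when a TCP listener was provisioned) answers echo requests from
        // peers and advertises this node's shred version.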
1386        let ip_echo_server = match node.sockets.ip_echo {
1387            None => None,
1388            Some(tcp_listener) => Some(solana_net_utils::ip_echo_server(
1389                tcp_listener,
1390                config.ip_echo_server_threads,
1391                Some(node.info.shred_version()),
1392            )),
1393        };
1394
1395        let (stats_reporter_sender, stats_reporter_receiver) = unbounded();
1396
1397        let stats_reporter_service =
1398            StatsReporterService::new(stats_reporter_receiver, exit.clone());
1399
1400        let gossip_service = GossipService::new(
1401            &cluster_info,
1402            Some(bank_forks.clone()),
1403            node.sockets.gossip.clone(),
1404            config.gossip_validators.clone(),
1405            should_check_duplicate_instance,
1406            Some(stats_reporter_sender.clone()),
1407            exit.clone(),
1408        );
1409        let serve_repair = config.repair_handler_type.create_serve_repair(
1410            blockstore.clone(),
1411            cluster_info.clone(),
1412            bank_forks.read().unwrap().sharable_banks(),
1413            config.repair_whitelist.clone(),
1414        );
1415        let (repair_request_quic_sender, repair_request_quic_receiver) = unbounded();
1416        let (repair_response_quic_sender, repair_response_quic_receiver) = unbounded();
1417        let (ancestor_hashes_response_quic_sender, ancestor_hashes_response_quic_receiver) =
1418            unbounded();
1419
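        // If --wait-for-supermajority is configured, block here until enough stake is visible in
        // gossip at the restart slot; the return value records whether we actually waited.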
1420        let waited_for_supermajority = wait_for_supermajority(
1421            config,
1422            Some(&mut process_blockstore),
1423            &bank_forks,
1424            &cluster_info,
1425            rpc_override_health_check,
1426            &start_progress,
1427        )?;
1428
1429        let blockstore_metric_report_service =
1430            BlockstoreMetricReportService::new(blockstore.clone(), exit.clone());
1431
1432        let wait_for_vote_to_start_leader =
1433            !waited_for_supermajority && !config.no_wait_for_vote_to_start_leader;
1434
1435        let poh_service = PohService::new(
1436            poh_recorder.clone(),
1437            &genesis_config.poh_config,
1438            exit.clone(),
1439            bank_forks.read().unwrap().root_bank().ticks_per_slot(),
1440            config.poh_pinned_cpu_core,
1441            config.poh_hashes_per_batch,
1442            record_receiver,
1443            poh_service_message_receiver,
1444        );
1445        assert_eq!(
1446            blockstore.get_new_shred_signals_len(),
1447            1,
1448            "New shred signal for the TVU should be the same as the clear bank signal."
1449        );
1450
1451        let vote_tracker = Arc::<VoteTracker>::default();
1452
1453        let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded();
1454        let (verified_vote_sender, verified_vote_receiver) = unbounded();
1455        let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
1456        let (duplicate_confirmed_slot_sender, duplicate_confirmed_slots_receiver) = unbounded();
1457
1458        let entry_notification_sender = entry_notifier_service
1459            .as_ref()
1460            .map(|service| service.sender_cloned());
1461
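        // Turbine-over-QUIC: a dedicated Tokio runtime is built only when we are not already
        // inside a runtime and the cluster is not MainnetBeta; on MainnetBeta the endpoint is
        // replaced below with a dummy channel.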
1462        let turbine_quic_endpoint_runtime = (current_runtime_handle.is_err()
1463            && genesis_config.cluster_type != ClusterType::MainnetBeta)
1464            .then(|| {
1465                tokio::runtime::Builder::new_multi_thread()
1466                    .enable_all()
1467                    .thread_name("solTurbineQuic")
1468                    .build()
1469                    .unwrap()
1470            });
1471        let (turbine_quic_endpoint_sender, turbine_quic_endpoint_receiver) = unbounded();
1472        let (
1473            turbine_quic_endpoint,
1474            turbine_quic_endpoint_sender,
1475            turbine_quic_endpoint_join_handle,
1476        ) = if genesis_config.cluster_type == ClusterType::MainnetBeta {
1477            let (sender, _receiver) = tokio::sync::mpsc::channel(1);
1478            (None, sender, None)
1479        } else {
1480            solana_turbine::quic_endpoint::new_quic_endpoint(
1481                turbine_quic_endpoint_runtime
1482                    .as_ref()
1483                    .map(TokioRuntime::handle)
1484                    .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()),
1485                &identity_keypair,
1486                node.sockets.tvu_quic,
1487                turbine_quic_endpoint_sender,
1488                bank_forks.clone(),
1489            )
1490            .map(|(endpoint, sender, join_handle)| (Some(endpoint), sender, Some(join_handle)))
1491            .unwrap()
1492        };
1493
1494        // Repair QUIC endpoints.
1495        let repair_quic_endpoints_runtime = (current_runtime_handle.is_err()
1496            && genesis_config.cluster_type != ClusterType::MainnetBeta)
1497            .then(|| {
1498                tokio::runtime::Builder::new_multi_thread()
1499                    .enable_all()
1500                    .thread_name("solRepairQuic")
1501                    .build()
1502                    .unwrap()
1503            });
1504        let (repair_quic_endpoints, repair_quic_async_senders, repair_quic_endpoints_join_handle) =
1505            if genesis_config.cluster_type == ClusterType::MainnetBeta {
1506                (None, RepairQuicAsyncSenders::new_dummy(), None)
1507            } else {
1508                let repair_quic_sockets = RepairQuicSockets {
1509                    repair_server_quic_socket: node.sockets.serve_repair_quic,
1510                    repair_client_quic_socket: node.sockets.repair_quic,
1511                    ancestor_hashes_quic_socket: node.sockets.ancestor_hashes_requests_quic,
1512                };
1513                let repair_quic_senders = RepairQuicSenders {
1514                    repair_request_quic_sender: repair_request_quic_sender.clone(),
1515                    repair_response_quic_sender,
1516                    ancestor_hashes_response_quic_sender,
1517                };
1518                repair::quic_endpoint::new_quic_endpoints(
1519                    repair_quic_endpoints_runtime
1520                        .as_ref()
1521                        .map(TokioRuntime::handle)
1522                        .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()),
1523                    &identity_keypair,
1524                    repair_quic_sockets,
1525                    repair_quic_senders,
1526                    bank_forks.clone(),
1527                )
1528                .map(|(endpoints, senders, join_handle)| {
1529                    (Some(endpoints), senders, Some(join_handle))
1530                })
1531                .unwrap()
1532            };
1533        let serve_repair_service = ServeRepairService::new(
1534            serve_repair,
1535            // Incoming UDP repair requests are adapted into RemoteRequest
1536            // and also sent through the same channel.
1537            repair_request_quic_sender,
1538            repair_request_quic_receiver,
1539            repair_quic_async_senders.repair_response_quic_sender,
1540            node.sockets.serve_repair,
1541            socket_addr_space,
1542            stats_reporter_sender,
1543            exit.clone(),
1544        );
1545
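        // wen_restart is only entered when a restart proto path is configured and we did not just
        // wait for supermajority; the shared repair-slots list lets Tvu repair the slots needed
        // to complete the restart.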
1546        let in_wen_restart = config.wen_restart_proto_path.is_some() && !waited_for_supermajority;
1547        let wen_restart_repair_slots = if in_wen_restart {
1548            Some(Arc::new(RwLock::new(Vec::new())))
1549        } else {
1550            None
1551        };
1552        let tower = match process_blockstore.process_to_create_tower() {
1553            Ok(tower) => {
1554                info!("Tower state: {tower:?}");
1555                tower
1556            }
1557            Err(e) => {
1558                warn!("Unable to retrieve tower: {e:?}; creating default tower...");
1559                Tower::default()
1560            }
1561        };
1562        let last_vote = tower.last_vote();
1563
1564        let outstanding_repair_requests =
1565            Arc::<RwLock<repair::repair_service::OutstandingShredRepairs>>::default();
1566        let root_bank = bank_forks.read().unwrap().root_bank();
1567        let cluster_slots = Arc::new({
1568            crate::cluster_slots_service::cluster_slots::ClusterSlots::new(
1569                &root_bank,
1570                &cluster_info,
1571            )
1572        });
1573
1574        // If RPC is supported and a ConnectionCache is in use, pass the ConnectionCache so it can be warmed up inside Tvu.
1575        let connection_cache_for_warmup =
1576            if json_rpc_service.is_some() && connection_cache.is_some() {
1577                connection_cache.as_ref()
1578            } else {
1579                None
1580            };
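        // XDP retransmit: when configured, derive the source port from the first retransmit
        // socket and the source IP from the bind address (or the bonded master interface when the
        // bind address is unspecified); IPv6 is not supported.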
1581        let (xdp_retransmitter, xdp_sender) =
1582            if let Some(xdp_config) = config.retransmit_xdp.clone() {
1583                let src_port = node.sockets.retransmit_sockets[0]
1584                    .local_addr()
1585                    .expect("failed to get local address")
1586                    .port();
1587                let src_ip = match node.bind_ip_addrs.active() {
1588                    IpAddr::V4(ip) if !ip.is_unspecified() => Some(ip),
1589                    IpAddr::V4(_unspecified) => xdp_config
1590                        .interface
1591                        .as_ref()
1592                        .and_then(|iface| master_ip_if_bonded(iface)),
1593                    _ => panic!("IPv6 not supported"),
1594                };
1595                let (rtx, sender) = XdpRetransmitter::new(xdp_config, src_port, src_ip)
1596                    .expect("failed to create xdp retransmitter");
1597                (Some(rtx), Some(sender))
1598            } else {
1599                (None, None)
1600            };
1601
1602        // Disable the all2all (alpenglow) socket if it is not allowed for the given cluster type
1603        let alpenglow_socket = if genesis_config.cluster_type == ClusterType::Testnet
1604            || genesis_config.cluster_type == ClusterType::Development
1605        {
1606            node.sockets.alpenglow
1607        } else {
1608            None
1609        };
1610
1611        let tvu = Tvu::new(
1612            vote_account,
1613            authorized_voter_keypairs,
1614            &bank_forks,
1615            &cluster_info,
1616            TvuSockets {
1617                repair: node.sockets.repair.try_clone().unwrap(),
1618                retransmit: node.sockets.retransmit_sockets,
1619                fetch: node.sockets.tvu,
1620                ancestor_hashes_requests: node.sockets.ancestor_hashes_requests,
1621                alpenglow: alpenglow_socket,
1622            },
1623            blockstore.clone(),
1624            ledger_signal_receiver,
1625            rpc_subscriptions.clone(),
1626            &poh_recorder,
1627            poh_controller,
1628            tower,
1629            config.tower_storage.clone(),
1630            &leader_schedule_cache,
1631            exit.clone(),
1632            block_commitment_cache,
1633            config.turbine_disabled.clone(),
1634            transaction_status_sender.clone(),
1635            entry_notification_sender.clone(),
1636            vote_tracker.clone(),
1637            retransmit_slots_sender,
1638            gossip_verified_vote_hash_receiver,
1639            verified_vote_receiver,
1640            replay_vote_sender.clone(),
1641            completed_data_sets_sender,
1642            bank_notification_sender.clone(),
1643            duplicate_confirmed_slots_receiver,
1644            TvuConfig {
1645                max_ledger_shreds: config.max_ledger_shreds,
1646                shred_version: node.info.shred_version(),
1647                repair_validators: config.repair_validators.clone(),
1648                repair_whitelist: config.repair_whitelist.clone(),
1649                wait_for_vote_to_start_leader,
1650                replay_forks_threads: config.replay_forks_threads,
1651                replay_transactions_threads: config.replay_transactions_threads,
1652                shred_sigverify_threads: config.tvu_shred_sigverify_threads,
1653                xdp_sender: xdp_sender.clone(),
1654            },
1655            &max_slots,
1656            block_metadata_notifier,
1657            config.wait_to_vote_slot,
1658            Some(snapshot_controller.clone()),
1659            config.runtime_config.log_messages_bytes_limit,
1660            connection_cache_for_warmup,
1661            &prioritization_fee_cache,
1662            banking_tracer.clone(),
1663            turbine_quic_endpoint_sender.clone(),
1664            turbine_quic_endpoint_receiver,
1665            repair_response_quic_receiver,
1666            repair_quic_async_senders.repair_request_quic_sender,
1667            repair_quic_async_senders.ancestor_hashes_request_quic_sender,
1668            ancestor_hashes_response_quic_receiver,
1669            outstanding_repair_requests.clone(),
1670            cluster_slots.clone(),
1671            wen_restart_repair_slots.clone(),
1672            slot_status_notifier,
1673            vote_connection_cache,
1674        )
1675        .map_err(ValidatorError::Other)?;
1676
1677        if in_wen_restart {
1678            info!("Waiting for wen_restart to finish");
1679            wait_for_wen_restart(WenRestartConfig {
1680                wen_restart_path: config.wen_restart_proto_path.clone().unwrap(),
1681                wen_restart_coordinator: config.wen_restart_coordinator.unwrap(),
1682                last_vote,
1683                blockstore: blockstore.clone(),
1684                cluster_info: cluster_info.clone(),
1685                bank_forks: bank_forks.clone(),
1686                wen_restart_repair_slots: wen_restart_repair_slots.clone(),
1687                wait_for_supermajority_threshold_percent:
1688                    WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT,
1689                snapshot_controller: Some(snapshot_controller.clone()),
1690                abs_status: accounts_background_service.status().clone(),
1691                genesis_config_hash: genesis_config.hash(),
1692                exit: exit.clone(),
1693            })?;
1694            return Err(ValidatorError::WenRestartFinished.into());
1695        }
1696
1697        let key_notifiers = Arc::new(RwLock::new(KeyUpdaters::default()));
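        // The TPU forwarding client either reuses the ConnectionCache, or (with tpu-client-next)
        // is built from the identity keypair, the forwarding sockets, a runtime handle, and the
        // cancellation token.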
1698        let forwarding_tpu_client = if let Some(connection_cache) = &connection_cache {
1699            ForwardingClientOption::ConnectionCache(connection_cache.clone())
1700        } else {
1701            let runtime_handle = tpu_client_next_runtime
1702                .as_ref()
1703                .map(TokioRuntime::handle)
1704                .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap());
1705            ForwardingClientOption::TpuClientNext((
1706                Arc::as_ref(&identity_keypair),
1707                tpu_transactions_forwards_client_sockets.take().unwrap(),
1708                runtime_handle.clone(),
1709                cancel.clone(),
1710                node_multihoming.clone(),
1711            ))
1712        };
1713        let tpu = Tpu::new_with_client(
1714            &cluster_info,
1715            &poh_recorder,
1716            transaction_recorder,
1717            entry_receiver,
1718            retransmit_slots_receiver,
1719            TpuSockets {
1720                transactions: node.sockets.tpu,
1721                transaction_forwards: node.sockets.tpu_forwards,
1722                vote: node.sockets.tpu_vote,
1723                broadcast: node.sockets.broadcast,
1724                transactions_quic: node.sockets.tpu_quic,
1725                transactions_forwards_quic: node.sockets.tpu_forwards_quic,
1726                vote_quic: node.sockets.tpu_vote_quic,
1727                vote_forwarding_client: node.sockets.tpu_vote_forwarding_client,
1728                vortexor_receivers: node.sockets.vortexor_receivers,
1729            },
1730            rpc_subscriptions.clone(),
1731            transaction_status_sender,
1732            entry_notification_sender,
1733            blockstore.clone(),
1734            &config.broadcast_stage_type,
1735            xdp_sender,
1736            exit,
1737            node.info.shred_version(),
1738            vote_tracker,
1739            bank_forks.clone(),
1740            verified_vote_sender,
1741            gossip_verified_vote_hash_sender,
1742            replay_vote_receiver,
1743            replay_vote_sender,
1744            bank_notification_sender,
1745            duplicate_confirmed_slot_sender,
1746            forwarding_tpu_client,
1747            turbine_quic_endpoint_sender,
1748            &identity_keypair,
1749            config.runtime_config.log_messages_bytes_limit,
1750            &staked_nodes,
1751            config.staked_nodes_overrides.clone(),
1752            banking_tracer_channels,
1753            tracer_thread,
1754            tpu_enable_udp,
1755            tpu_quic_server_config,
1756            tpu_fwd_quic_server_config,
1757            vote_quic_server_config,
1758            &prioritization_fee_cache,
1759            config.block_production_method.clone(),
1760            config.block_production_num_workers,
1761            config.block_production_scheduler_config.clone(),
1762            config.enable_block_production_forwarding,
1763            config.generator_config.clone(),
1764            key_notifiers.clone(),
1765            cancel,
1766        );
1767
1768        datapoint_info!(
1769            "validator-new",
1770            ("id", id.to_string(), String),
1771            ("version", solana_version::version!(), String),
1772            ("cluster_type", genesis_config.cluster_type as u32, i64),
1773            ("elapsed_ms", start_time.elapsed().as_millis() as i64, i64),
1774            ("waited_for_supermajority", waited_for_supermajority, bool),
1775            ("shred_version", shred_version as i64, i64),
1776        );
1777
1778        *start_progress.write().unwrap() = ValidatorStartProgress::Running;
1779        if config.use_tpu_client_next {
1780            if let Some(json_rpc_service) = &json_rpc_service {
1781                key_notifiers.write().unwrap().add(
1782                    KeyUpdaterType::RpcService,
1783                    json_rpc_service.get_client_key_updater(),
1784                );
1785            }
1786            // Note that we don't need to add the connection client to key_notifiers
1787            // here because it is already added inside Tpu.
1788        }
1789
1790        *admin_rpc_service_post_init.write().unwrap() = Some(AdminRpcRequestMetadataPostInit {
1791            bank_forks: bank_forks.clone(),
1792            cluster_info: cluster_info.clone(),
1793            vote_account: *vote_account,
1794            repair_whitelist: config.repair_whitelist.clone(),
1795            notifies: key_notifiers,
1796            repair_socket: Arc::new(node.sockets.repair),
1797            outstanding_repair_requests,
1798            cluster_slots,
1799            node: Some(node_multihoming),
1800            banking_stage: tpu.banking_stage(),
1801        });
1802
1803        Ok(Self {
1804            stats_reporter_service,
1805            gossip_service,
1806            serve_repair_service,
1807            json_rpc_service,
1808            pubsub_service,
1809            rpc_completed_slots_service,
1810            optimistically_confirmed_bank_tracker,
1811            transaction_status_service,
1812            entry_notifier_service,
1813            system_monitor_service,
1814            sample_performance_service,
1815            snapshot_packager_service,
1816            completed_data_sets_service,
1817            tpu,
1818            tvu,
1819            poh_service,
1820            poh_recorder,
1821            ip_echo_server,
1822            validator_exit: config.validator_exit.clone(),
1823            cluster_info,
1824            bank_forks,
1825            blockstore,
1826            geyser_plugin_service,
1827            blockstore_metric_report_service,
1828            accounts_background_service,
1829            turbine_quic_endpoint,
1830            turbine_quic_endpoint_runtime,
1831            turbine_quic_endpoint_join_handle,
1832            repair_quic_endpoints,
1833            repair_quic_endpoints_runtime,
1834            repair_quic_endpoints_join_handle,
1835            xdp_retransmitter,
1836            _tpu_client_next_runtime: tpu_client_next_runtime,
1837        })
1838    }
1839
1840    // Used to notify many nodes in parallel to exit
1841    pub fn exit(&mut self) {
1842        self.validator_exit.write().unwrap().exit();
1843
1844        // drop all signals in blockstore
1845        self.blockstore.drop_signal();
1846    }
1847
1848    pub fn close(mut self) {
1849        self.exit();
1850        self.join();
1851    }
1852
1853    fn print_node_info(node: &Node) {
1854        info!("{:?}", node.info);
1855        info!(
1856            "local gossip address: {}",
1857            node.sockets.gossip[0].local_addr().unwrap()
1858        );
1859        info!(
1860            "local broadcast address: {}",
1861            node.sockets
1862                .broadcast
1863                .first()
1864                .unwrap()
1865                .local_addr()
1866                .unwrap()
1867        );
1868        info!(
1869            "local repair address: {}",
1870            node.sockets.repair.local_addr().unwrap()
1871        );
1872        info!(
1873            "local retransmit address: {}",
1874            node.sockets.retransmit_sockets[0].local_addr().unwrap()
1875        );
1876    }
1877
1878    pub fn join(self) {
1879        drop(self.bank_forks);
1880        drop(self.cluster_info);
1881
1882        self.poh_service.join().expect("poh_service");
1883        drop(self.poh_recorder);
1884
1885        if let Some(json_rpc_service) = self.json_rpc_service {
1886            json_rpc_service.join().expect("rpc_service");
1887        }
1888
1889        if let Some(pubsub_service) = self.pubsub_service {
1890            pubsub_service.join().expect("pubsub_service");
1891        }
1892
1893        if let Some(rpc_completed_slots_service) = self.rpc_completed_slots_service {
1894            rpc_completed_slots_service
1895                .join()
1896                .expect("rpc_completed_slots_service");
1897        }
1898
1899        if let Some(optimistically_confirmed_bank_tracker) =
1900            self.optimistically_confirmed_bank_tracker
1901        {
1902            optimistically_confirmed_bank_tracker
1903                .join()
1904                .expect("optimistically_confirmed_bank_tracker");
1905        }
1906
1907        if let Some(transaction_status_service) = self.transaction_status_service {
1908            transaction_status_service
1909                .join()
1910                .expect("transaction_status_service");
1911        }
1912
1913        if let Some(system_monitor_service) = self.system_monitor_service {
1914            system_monitor_service
1915                .join()
1916                .expect("system_monitor_service");
1917        }
1918
1919        if let Some(sample_performance_service) = self.sample_performance_service {
1920            sample_performance_service
1921                .join()
1922                .expect("sample_performance_service");
1923        }
1924
1925        if let Some(entry_notifier_service) = self.entry_notifier_service {
1926            entry_notifier_service
1927                .join()
1928                .expect("entry_notifier_service");
1929        }
1930
1931        if let Some(s) = self.snapshot_packager_service {
1932            s.join().expect("snapshot_packager_service");
1933        }
1934
1935        self.gossip_service.join().expect("gossip_service");
1936        self.repair_quic_endpoints
1937            .iter()
1938            .flatten()
1939            .for_each(repair::quic_endpoint::close_quic_endpoint);
1940        self.serve_repair_service
1941            .join()
1942            .expect("serve_repair_service");
1943        if let Some(repair_quic_endpoints_join_handle) = self.repair_quic_endpoints_join_handle {
1944            self.repair_quic_endpoints_runtime
1945                .map(|runtime| runtime.block_on(repair_quic_endpoints_join_handle))
1946                .transpose()
1947                .unwrap();
1948        }
1949        self.stats_reporter_service
1950            .join()
1951            .expect("stats_reporter_service");
1952        self.blockstore_metric_report_service
1953            .join()
1954            .expect("ledger_metric_report_service");
1955        self.accounts_background_service
1956            .join()
1957            .expect("accounts_background_service");
1958        if let Some(turbine_quic_endpoint) = &self.turbine_quic_endpoint {
1959            solana_turbine::quic_endpoint::close_quic_endpoint(turbine_quic_endpoint);
1960        }
1961        if let Some(xdp_retransmitter) = self.xdp_retransmitter {
1962            xdp_retransmitter.join().expect("xdp_retransmitter");
1963        }
1964        self.tpu.join().expect("tpu");
1965        self.tvu.join().expect("tvu");
1966        if let Some(turbine_quic_endpoint_join_handle) = self.turbine_quic_endpoint_join_handle {
1967            self.turbine_quic_endpoint_runtime
1968                .map(|runtime| runtime.block_on(turbine_quic_endpoint_join_handle))
1969                .transpose()
1970                .unwrap();
1971        }
1972        if let Some(completed_data_sets_service) = self.completed_data_sets_service {
1973            completed_data_sets_service
1974                .join()
1975                .expect("completed_data_sets_service");
1976        }
1977        if let Some(ip_echo_server) = self.ip_echo_server {
1978            ip_echo_server.shutdown_background();
1979        }
1980
1981        if let Some(geyser_plugin_service) = self.geyser_plugin_service {
1982            geyser_plugin_service.join().expect("geyser_plugin_service");
1983        }
1984    }
1985}
1986
1987fn active_vote_account_exists_in_bank(bank: &Bank, vote_account: &Pubkey) -> bool {
1988    if let Some(account) = &bank.get_account(vote_account) {
1989        if let Ok(vote_state) = VoteStateV4::deserialize(account.data(), vote_account) {
1990            return !vote_state.votes.is_empty();
1991        }
1992    }
1993    false
1994}
1995
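/// Checks that this machine hashes fast enough to keep up with the cluster's PoH rate.
///
/// Measures the local hash rate over `hashes_per_slot` samples (or `maybe_hash_samples` when
/// provided) and returns [`ValidatorError::PohTooSlow`] if it falls below the target rate implied
/// by the bank's hashes per tick, ticks per slot, and slot duration.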
1996fn check_poh_speed(bank: &Bank, maybe_hash_samples: Option<u64>) -> Result<(), ValidatorError> {
1997    let Some(hashes_per_tick) = bank.hashes_per_tick() else {
1998        warn!("Unable to read hashes per tick from Bank, skipping PoH speed check");
1999        return Ok(());
2000    };
2001
2002    let ticks_per_slot = bank.ticks_per_slot();
2003    let hashes_per_slot = hashes_per_tick * ticks_per_slot;
2004    let hash_samples = maybe_hash_samples.unwrap_or(hashes_per_slot);
2005
2006    let hash_time = compute_hash_time(hash_samples);
2007    let my_hashes_per_second = (hash_samples as f64 / hash_time.as_secs_f64()) as u64;
2008
2009    let target_slot_duration = Duration::from_nanos(bank.ns_per_slot as u64);
2010    let target_hashes_per_second =
2011        (hashes_per_slot as f64 / target_slot_duration.as_secs_f64()) as u64;
2012
2013    info!(
2014        "PoH speed check: computed hashes per second {my_hashes_per_second}, target hashes per \
2015         second {target_hashes_per_second}"
2016    );
2017    if my_hashes_per_second < target_hashes_per_second {
2018        return Err(ValidatorError::PohTooSlow {
2019            mine: my_hashes_per_second,
2020            target: target_hashes_per_second,
2021        });
2022    }
2023
2024    Ok(())
2025}
2026
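/// Returns the restart slot if this boot looks like part of a cluster restart (hard fork),
/// detected indirectly by `--wait-for-supermajority` matching the current root slot.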
2027fn maybe_cluster_restart_with_hard_fork(config: &ValidatorConfig, root_slot: Slot) -> Option<Slot> {
2028    // detect cluster restart (hard fork) indirectly via wait_for_supermajority...
2029    if let Some(wait_slot_for_supermajority) = config.wait_for_supermajority {
2030        if wait_slot_for_supermajority == root_slot {
2031            return Some(wait_slot_for_supermajority);
2032        }
2033    }
2034
2035    None
2036}
2037
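/// Reconciles a tower restored from storage with the loaded bank forks: lockouts are adjusted
/// after replay, restoration is deliberately rejected across hard forks and warp slots, and if
/// restoration fails (and is permitted to fail) a fresh tower is rebuilt from the latest vote
/// account state.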
2038fn post_process_restored_tower(
2039    restored_tower: crate::consensus::Result<Tower>,
2040    validator_identity: &Pubkey,
2041    vote_account: &Pubkey,
2042    config: &ValidatorConfig,
2043    bank_forks: &BankForks,
2044) -> Result<Tower, String> {
2045    let mut should_require_tower = config.require_tower;
2046
2047    let restored_tower = restored_tower.and_then(|tower| {
2048        let root_bank = bank_forks.root_bank();
2049        let slot_history = root_bank.get_slot_history();
2050        // Make sure the tower isn't corrupted before the hard fork check below
2051        let tower = tower.adjust_lockouts_after_replay(root_bank.slot(), &slot_history);
2052
2053        if let Some(hard_fork_restart_slot) =
2054            maybe_cluster_restart_with_hard_fork(config, root_bank.slot())
2055        {
2056            // Intentionally fail to restore the tower; we're supposedly in a new hard fork, so
2057            // past out-of-chain vote state doesn't make sense at all.
2058            // What happens if --wait-for-supermajority is given again when the validator restarts?
2059            let message =
2060                format!("Hard fork is detected; discarding tower restoration result: {tower:?}");
2061            datapoint_error!("tower_error", ("error", message, String),);
2062            error!("{message}");
2063
2064            // unconditionally relax tower requirement so that we can always restore tower
2065            // from root bank.
2066            should_require_tower = false;
2067            return Err(crate::consensus::TowerError::HardFork(
2068                hard_fork_restart_slot,
2069            ));
2070        }
2071
2072        if let Some(warp_slot) = config.warp_slot {
2073            // unconditionally relax tower requirement so that we can always restore tower
2074            // from root bank after the warp
2075            should_require_tower = false;
2076            return Err(crate::consensus::TowerError::HardFork(warp_slot));
2077        }
2078
2079        tower
2080    });
2081
2082    let restored_tower = match restored_tower {
2083        Ok(tower) => tower,
2084        Err(err) => {
2085            let voting_has_been_active =
2086                active_vote_account_exists_in_bank(&bank_forks.working_bank(), vote_account);
2087            if !err.is_file_missing() {
2088                datapoint_error!(
2089                    "tower_error",
2090                    ("error", format!("Unable to restore tower: {err}"), String),
2091                );
2092            }
2093            if should_require_tower && voting_has_been_active {
2094                return Err(format!(
2095                    "Requested mandatory tower restore failed: {err}. And there is an existing \
2096                     vote_account containing actual votes. Aborting due to possible conflicting \
2097                     duplicate votes"
2098                ));
2099            }
2100            if err.is_file_missing() && !voting_has_been_active {
2101                // Currently, don't protect against spoofed snapshots with no tower at all
2102                info!(
2103                    "Ignoring expected failed tower restore because this is the initial validator \
2104                     start with the vote account..."
2105                );
2106            } else {
2107                error!(
2108                    "Rebuilding a new tower from the latest vote account due to failed tower \
2109                     restore: {err}"
2110                );
2111            }
2112
2113            Tower::new_from_bankforks(bank_forks, validator_identity, vote_account)
2114        }
2115    };
2116
2117    Ok(restored_tower)
2118}
2119
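/// Opens the genesis config from the ledger path, asserts that the leader schedule epoch offset
/// is within bounds, and verifies the genesis hash against the expected hash when one is
/// configured.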
2120fn load_genesis(
2121    config: &ValidatorConfig,
2122    ledger_path: &Path,
2123) -> Result<GenesisConfig, ValidatorError> {
2124    let genesis_config = open_genesis_config(ledger_path, config.max_genesis_archive_unpacked_size)
2125        .map_err(ValidatorError::OpenGenesisConfig)?;
2126
2127    // This needs to be limited, otherwise the state in the VoteAccount data
2128    // grows too large
2129    let leader_schedule_slot_offset = genesis_config.epoch_schedule.leader_schedule_slot_offset;
2130    let slots_per_epoch = genesis_config.epoch_schedule.slots_per_epoch;
2131    let leader_epoch_offset = leader_schedule_slot_offset.div_ceil(slots_per_epoch);
2132    assert!(leader_epoch_offset <= MAX_LEADER_SCHEDULE_EPOCH_OFFSET);
2133
2134    let genesis_hash = genesis_config.hash();
2135    info!("genesis hash: {genesis_hash}");
2136
2137    if let Some(expected_genesis_hash) = config.expected_genesis_hash {
2138        if genesis_hash != expected_genesis_hash {
2139            return Err(ValidatorError::GenesisHashMismatch(
2140                genesis_hash,
2141                expected_genesis_hash,
2142            ));
2143        }
2144    }
2145
2146    Ok(genesis_config)
2147}
2148
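/// Opens the blockstore, registers the new-shred signal, loads bank forks from snapshots (or
/// genesis) and the blockstore, and starts the optional transaction-history and entry-notifier
/// services.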
2149#[allow(clippy::type_complexity)]
2150fn load_blockstore(
2151    config: &ValidatorConfig,
2152    ledger_path: &Path,
2153    genesis_config: &GenesisConfig,
2154    exit: Arc<AtomicBool>,
2155    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
2156    accounts_update_notifier: Option<AccountsUpdateNotifier>,
2157    transaction_notifier: Option<TransactionNotifierArc>,
2158    entry_notifier: Option<EntryNotifierArc>,
2159    dependency_tracker: Option<Arc<DependencyTracker>>,
2160) -> Result<
2161    (
2162        Arc<RwLock<BankForks>>,
2163        Arc<Blockstore>,
2164        Slot,
2165        Receiver<bool>,
2166        LeaderScheduleCache,
2167        Option<StartingSnapshotHashes>,
2168        TransactionHistoryServices,
2169        blockstore_processor::ProcessOptions,
2170        BlockstoreRootScan,
2171        DroppedSlotsReceiver,
2172        Option<EntryNotifierService>,
2173    ),
2174    String,
2175> {
2176    info!("loading ledger from {ledger_path:?}...");
2177    *start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger;
2178
2179    let blockstore = Blockstore::open_with_options(ledger_path, config.blockstore_options.clone())
2180        .map_err(|err| format!("Failed to open Blockstore: {err:?}"))?;
2181
2182    let (ledger_signal_sender, ledger_signal_receiver) = bounded(MAX_REPLAY_WAKE_UP_SIGNALS);
2183    blockstore.add_new_shred_signal(ledger_signal_sender);
2184
2185    // The following boot sequence (esp. BankForks) could set the root, so stash the original
2186    // value of the blockstore root here as soon as possible.
2187    let original_blockstore_root = blockstore.max_root();
2188
2189    let blockstore = Arc::new(blockstore);
2190    let blockstore_root_scan = BlockstoreRootScan::new(config, blockstore.clone(), exit.clone());
2191    let halt_at_slot = config
2192        .halt_at_slot
2193        .or_else(|| blockstore.highest_slot().unwrap_or(None));
2194
2195    let process_options = blockstore_processor::ProcessOptions {
2196        run_verification: config.run_verification,
2197        halt_at_slot,
2198        new_hard_forks: config.new_hard_forks.clone(),
2199        debug_keys: config.debug_keys.clone(),
2200        accounts_db_config: config.accounts_db_config.clone(),
2201        accounts_db_skip_shrink: config.accounts_db_skip_shrink,
2202        accounts_db_force_initial_clean: config.accounts_db_force_initial_clean,
2203        runtime_config: config.runtime_config.clone(),
2204        use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup,
2205        ..blockstore_processor::ProcessOptions::default()
2206    };
2207
2208    let enable_rpc_transaction_history =
2209        config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history;
2210    let is_plugin_transaction_history_required = transaction_notifier.is_some();
2211    let transaction_history_services =
2212        if enable_rpc_transaction_history || is_plugin_transaction_history_required {
2213            initialize_rpc_transaction_history_services(
2214                blockstore.clone(),
2215                exit.clone(),
2216                enable_rpc_transaction_history,
2217                config.rpc_config.enable_extended_tx_metadata_storage,
2218                transaction_notifier,
2219                dependency_tracker,
2220            )
2221        } else {
2222            TransactionHistoryServices::default()
2223        };
2224
2225    let entry_notifier_service = entry_notifier
2226        .map(|entry_notifier| EntryNotifierService::new(entry_notifier, exit.clone()));
2227
2228    let (bank_forks, mut leader_schedule_cache, starting_snapshot_hashes) =
2229        bank_forks_utils::load_bank_forks(
2230            genesis_config,
2231            &blockstore,
2232            config.account_paths.clone(),
2233            &config.snapshot_config,
2234            &process_options,
2235            transaction_history_services
2236                .transaction_status_sender
2237                .as_ref(),
2238            entry_notifier_service
2239                .as_ref()
2240                .map(|service| service.sender()),
2241            accounts_update_notifier,
2242            exit,
2243        )
2244        .map_err(|err| err.to_string())?;
2245
2246    // Before replay starts, set the callbacks in each of the banks in BankForks so that
2247    // all dropped banks come through the `pruned_banks_receiver` channel. This way all bank
2248    // drop behavior can be safely synchronized with any other ongoing accounts activity like
2249    // cache flush, clean, shrink, as long as the same thread performing those activities also
2250    // is processing the dropped banks from the `pruned_banks_receiver` channel.
2251    let pruned_banks_receiver =
2252        AccountsBackgroundService::setup_bank_drop_callback(bank_forks.clone());
2253
2254    leader_schedule_cache.set_fixed_leader_schedule(config.fixed_leader_schedule.clone());
2255
2256    Ok((
2257        bank_forks,
2258        blockstore,
2259        original_blockstore_root,
2260        ledger_signal_receiver,
2261        leader_schedule_cache,
2262        starting_snapshot_hashes,
2263        transaction_history_services,
2264        process_options,
2265        blockstore_root_scan,
2266        pruned_banks_receiver,
2267        entry_notifier_service,
2268    ))
2269}
2270
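/// Bundles everything needed to replay the blockstore from the root bank; `process()` performs
/// the replay and tower restoration lazily, at most once.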
2271pub struct ProcessBlockStore<'a> {
2272    id: &'a Pubkey,
2273    vote_account: &'a Pubkey,
2274    start_progress: &'a Arc<RwLock<ValidatorStartProgress>>,
2275    blockstore: &'a Blockstore,
2276    original_blockstore_root: Slot,
2277    bank_forks: &'a Arc<RwLock<BankForks>>,
2278    leader_schedule_cache: &'a LeaderScheduleCache,
2279    process_options: &'a blockstore_processor::ProcessOptions,
2280    transaction_status_sender: Option<&'a TransactionStatusSender>,
2281    entry_notification_sender: Option<&'a EntryNotifierSender>,
2282    blockstore_root_scan: Option<BlockstoreRootScan>,
2283    snapshot_controller: &'a SnapshotController,
2284    config: &'a ValidatorConfig,
2285    tower: Option<Tower>,
2286}
2287
2288impl<'a> ProcessBlockStore<'a> {
2289    #[allow(clippy::too_many_arguments)]
2290    fn new(
2291        id: &'a Pubkey,
2292        vote_account: &'a Pubkey,
2293        start_progress: &'a Arc<RwLock<ValidatorStartProgress>>,
2294        blockstore: &'a Blockstore,
2295        original_blockstore_root: Slot,
2296        bank_forks: &'a Arc<RwLock<BankForks>>,
2297        leader_schedule_cache: &'a LeaderScheduleCache,
2298        process_options: &'a blockstore_processor::ProcessOptions,
2299        transaction_status_sender: Option<&'a TransactionStatusSender>,
2300        entry_notification_sender: Option<&'a EntryNotifierSender>,
2301        blockstore_root_scan: BlockstoreRootScan,
2302        snapshot_controller: &'a SnapshotController,
2303        config: &'a ValidatorConfig,
2304    ) -> Self {
2305        Self {
2306            id,
2307            vote_account,
2308            start_progress,
2309            blockstore,
2310            original_blockstore_root,
2311            bank_forks,
2312            leader_schedule_cache,
2313            process_options,
2314            transaction_status_sender,
2315            entry_notification_sender,
2316            blockstore_root_scan: Some(blockstore_root_scan),
2317            snapshot_controller,
2318            config,
2319            tower: None,
2320        }
2321    }
2322
2323    pub(crate) fn process(&mut self) -> Result<(), String> {
2324        if self.tower.is_none() {
2325            let previous_start_process = *self.start_progress.read().unwrap();
2326            *self.start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger;
2327
2328            let exit = Arc::new(AtomicBool::new(false));
2329            if let Ok(Some(max_slot)) = self.blockstore.highest_slot() {
2330                let bank_forks = self.bank_forks.clone();
2331                let exit = exit.clone();
2332                let start_progress = self.start_progress.clone();
2333
2334                let _ = Builder::new()
2335                    .name("solRptLdgrStat".to_string())
2336                    .spawn(move || {
2337                        while !exit.load(Ordering::Relaxed) {
2338                            let slot = bank_forks.read().unwrap().working_bank().slot();
2339                            *start_progress.write().unwrap() =
2340                                ValidatorStartProgress::ProcessingLedger { slot, max_slot };
2341                            sleep(Duration::from_secs(2));
2342                        }
2343                    })
2344                    .unwrap();
2345            }
2346            blockstore_processor::process_blockstore_from_root(
2347                self.blockstore,
2348                self.bank_forks,
2349                self.leader_schedule_cache,
2350                self.process_options,
2351                self.transaction_status_sender,
2352                self.entry_notification_sender,
2353                Some(self.snapshot_controller),
2354            )
2355            .map_err(|err| {
2356                exit.store(true, Ordering::Relaxed);
2357                format!("Failed to load ledger: {err:?}")
2358            })?;
2359            exit.store(true, Ordering::Relaxed);
2360
2361            if let Some(blockstore_root_scan) = self.blockstore_root_scan.take() {
2362                blockstore_root_scan.join();
2363            }
2364
2365            self.tower = Some({
2366                let restored_tower = Tower::restore(self.config.tower_storage.as_ref(), self.id);
2367                if let Ok(tower) = &restored_tower {
2368                    // reconciliation attempt 1 of 2 with tower
2369                    reconcile_blockstore_roots_with_external_source(
2370                        ExternalRootSource::Tower(tower.root()),
2371                        self.blockstore,
2372                        &mut self.original_blockstore_root,
2373                    )
2374                    .map_err(|err| format!("Failed to reconcile blockstore with tower: {err:?}"))?;
2375                }
2376
2377                post_process_restored_tower(
2378                    restored_tower,
2379                    self.id,
2380                    self.vote_account,
2381                    self.config,
2382                    &self.bank_forks.read().unwrap(),
2383                )?
2384            });
2385
2386            if let Some(hard_fork_restart_slot) = maybe_cluster_restart_with_hard_fork(
2387                self.config,
2388                self.bank_forks.read().unwrap().root(),
2389            ) {
2390                // reconciliation attempt 2 of 2 with hard fork
2391                // this should be #2 because hard fork root > tower root in almost all cases
2392                reconcile_blockstore_roots_with_external_source(
2393                    ExternalRootSource::HardFork(hard_fork_restart_slot),
2394                    self.blockstore,
2395                    &mut self.original_blockstore_root,
2396                )
2397                .map_err(|err| format!("Failed to reconcile blockstore with hard fork: {err:?}"))?;
2398            }
2399
2400            *self.start_progress.write().unwrap() = previous_start_process;
2401        }
2402        Ok(())
2403    }
2404
2405    pub(crate) fn process_to_create_tower(mut self) -> Result<Tower, String> {
2406        self.process()?;
2407        Ok(self.tower.unwrap())
2408    }
2409}
2410
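/// If `--warp-slot` is configured, roots and flushes the current working bank, warps bank forks
/// forward to the requested slot, writes a full snapshot archive of the warped bank, and then
/// processes the blockstore so that the tower and bank forks stay in sync.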
2411fn maybe_warp_slot(
2412    config: &ValidatorConfig,
2413    process_blockstore: &mut ProcessBlockStore,
2414    ledger_path: &Path,
2415    bank_forks: &RwLock<BankForks>,
2416    leader_schedule_cache: &LeaderScheduleCache,
2417    snapshot_controller: &SnapshotController,
2418) -> Result<(), String> {
2419    if let Some(warp_slot) = config.warp_slot {
2420        let mut bank_forks = bank_forks.write().unwrap();
2421
2422        let working_bank = bank_forks.working_bank();
2423
2424        if warp_slot <= working_bank.slot() {
2425            return Err(format!(
2426                "warp slot ({}) cannot be less than the working bank slot ({})",
2427                warp_slot,
2428                working_bank.slot()
2429            ));
2430        }
2431        info!("warping to slot {warp_slot}");
2432
2433        let root_bank = bank_forks.root_bank();
2434
2435        // An accounts hash calculation from storages will occur in warp_from_parent() below.  This
2436        // requires that the accounts cache has been flushed, which requires the parent slot to be
2437        // rooted.
2438        root_bank.squash();
2439        root_bank.force_flush_accounts_cache();
2440
2441        bank_forks.insert(Bank::warp_from_parent(
2442            root_bank,
2443            &Pubkey::default(),
2444            warp_slot,
2445        ));
2446        bank_forks.set_root(warp_slot, Some(snapshot_controller), Some(warp_slot));
2447        leader_schedule_cache.set_root(&bank_forks.root_bank());
2448
2449        let full_snapshot_archive_info = match snapshot_bank_utils::bank_to_full_snapshot_archive(
2450            ledger_path,
2451            &bank_forks.root_bank(),
2452            None,
2453            &config.snapshot_config.full_snapshot_archives_dir,
2454            &config.snapshot_config.incremental_snapshot_archives_dir,
2455            config.snapshot_config.archive_format,
2456        ) {
2457            Ok(archive_info) => archive_info,
2458            Err(e) => return Err(format!("Unable to create snapshot: {e}")),
2459        };
2460        info!(
2461            "created snapshot: {}",
2462            full_snapshot_archive_info.path().display()
2463        );
2464
2465        drop(bank_forks);
2466        // Process blockstore after warping bank forks to make sure tower and
2467        // bank forks are in sync.
2468        process_blockstore.process()?;
2469    }
2470    Ok(())
2471}
2472
2473/// Returns the starting slot at which the blockstore should be scanned for
2474/// shreds with an incorrect shred version, or None if the check is unnecessary
2475fn should_cleanup_blockstore_incorrect_shred_versions(
2476    config: &ValidatorConfig,
2477    blockstore: &Blockstore,
2478    root_slot: Slot,
2479    hard_forks: &HardForks,
2480) -> Result<Option<Slot>, BlockstoreError> {
2481    // Perform the check if we are booting as part of a cluster restart at slot root_slot
2482    let maybe_cluster_restart_slot = maybe_cluster_restart_with_hard_fork(config, root_slot);
2483    if maybe_cluster_restart_slot.is_some() {
2484        return Ok(Some(root_slot + 1));
2485    }
2486
2487    // If there are no hard forks, the shred version cannot have changed
2488    let Some(latest_hard_fork) = hard_forks.iter().last().map(|(slot, _)| *slot) else {
2489        return Ok(None);
2490    };
2491
2492    // If the blockstore is empty, there are certainly no shreds with an incorrect version
2493    let Some(blockstore_max_slot) = blockstore.highest_slot()? else {
2494        return Ok(None);
2495    };
2496    let blockstore_min_slot = blockstore.lowest_slot();
2497    info!(
2498        "Blockstore contains data from slot {blockstore_min_slot} to {blockstore_max_slot}, the \
2499         latest hard fork is {latest_hard_fork}"
2500    );
2501
2502    if latest_hard_fork < blockstore_min_slot {
2503        // latest_hard_fork < blockstore_min_slot <= blockstore_max_slot
2504        //
2505        // All slots in the blockstore are newer than the latest hard fork, and only shreds with
2506        // the correct shred version should have been inserted since the latest hard fork
2507        //
2508        // This is the normal case where the last cluster restart & hard fork was a while ago; we
2509        // can skip the check for this case
2510        Ok(None)
2511    } else if latest_hard_fork < blockstore_max_slot {
2512        // blockstore_min_slot < latest_hard_fork < blockstore_max_slot
2513        //
2514        // This could be a case where there was a cluster restart, but this node was not part of
2515        // the supermajority that actually restarted the cluster. Rather, this node likely
2516        // downloaded a new snapshot while retaining the blockstore, including slots beyond the
2517        // chosen restart slot. We need to perform the blockstore check for this case
2518        //
2519        // Note that the downloaded snapshot slot (root_slot) could be greater than the latest hard
2520        // fork slot. Even though this node will only replay slots after root_slot, start the check
2521        // at latest_hard_fork + 1 to check (and possibly purge) any invalid state.
2522        Ok(Some(latest_hard_fork + 1))
2523    } else {
2524        // blockstore_min_slot <= blockstore_max_slot <= latest_hard_fork
2525        //
2526        // All slots in the blockstore are older than the latest hard fork. The blockstore check
2527        // would start from latest_hard_fork + 1; skip the check as there are no slots to check
2528        //
2529        // This is an unusual case to hit: perhaps the node was offline for a long time and
2530        // just restarted with a newly downloaded snapshot but its old blockstore.
2531        Ok(None)
2532    }
2533}
2534
2535/// Searches the blockstore for data shreds with a shred version that differs
2536/// from the passed `expected_shred_version`
2537fn scan_blockstore_for_incorrect_shred_version(
2538    blockstore: &Blockstore,
2539    start_slot: Slot,
2540    expected_shred_version: u16,
2541) -> Result<Option<u16>, BlockstoreError> {
2542    const TIMEOUT: Duration = Duration::from_secs(60);
2543    let timer = Instant::now();
2544    // Search for shreds with incompatible version in blockstore
2545    let slot_meta_iterator = blockstore.slot_meta_iterator(start_slot)?;
2546
2547    info!("Searching blockstore for shred with incorrect version from slot {start_slot}");
2548    for (slot, _meta) in slot_meta_iterator {
2549        let shreds = blockstore.get_data_shreds_for_slot(slot, 0)?;
2550        for shred in &shreds {
2551            if shred.version() != expected_shred_version {
2552                return Ok(Some(shred.version()));
2553            }
2554        }
2555        if timer.elapsed() > TIMEOUT {
2556            info!("Didn't find incorrect shreds after 60 seconds, aborting");
2557            break;
2558        }
2559    }
2560    Ok(None)
2561}
2562
2563/// If the blockstore contains any shreds with the incorrect shred version,
2564/// copy them to a backup blockstore and purge them from the actual blockstore.
2565fn cleanup_blockstore_incorrect_shred_versions(
2566    blockstore: &Blockstore,
2567    config: &ValidatorConfig,
2568    start_slot: Slot,
2569    expected_shred_version: u16,
2570) -> Result<(), BlockstoreError> {
2571    let incorrect_shred_version = scan_blockstore_for_incorrect_shred_version(
2572        blockstore,
2573        start_slot,
2574        expected_shred_version,
2575    )?;
2576    let Some(incorrect_shred_version) = incorrect_shred_version else {
2577        info!("Only shreds with the correct version were found in the blockstore");
2578        return Ok(());
2579    };
2580
2581    // .unwrap() safe because getting to this point implies blockstore has slots/shreds
2582    let end_slot = blockstore.highest_slot()?.unwrap();
2583
2584    // Backing up the shreds that will be deleted from primary blockstore is
2585    // not critical, so swallow errors from backup blockstore operations.
2586    let backup_folder = format!(
2587        "{BLOCKSTORE_DIRECTORY_ROCKS_LEVEL}_backup_{incorrect_shred_version}_{start_slot}_{end_slot}"
2588    );
2589    match Blockstore::open_with_options(
2590        &blockstore.ledger_path().join(backup_folder),
2591        config.blockstore_options.clone(),
2592    ) {
2593        Ok(backup_blockstore) => {
2594            info!("Backing up slots from {start_slot} to {end_slot}");
2595            let mut timer = Measure::start("blockstore backup");
2596
2597            const PRINT_INTERVAL: Duration = Duration::from_secs(5);
2598            let mut print_timer = Instant::now();
2599            let mut num_slots_copied = 0;
2600            let slot_meta_iterator = blockstore.slot_meta_iterator(start_slot)?;
2601            for (slot, _meta) in slot_meta_iterator {
2602                let shreds = blockstore.get_data_shreds_for_slot(slot, 0)?;
2603                let shreds = shreds.into_iter().map(Cow::Owned);
2604                let _ = backup_blockstore.insert_cow_shreds(shreds, None, true);
2605                num_slots_copied += 1;
2606
2607                if print_timer.elapsed() > PRINT_INTERVAL {
2608                    info!("Backed up {num_slots_copied} slots thus far");
2609                    print_timer = Instant::now();
2610                }
2611            }
2612
2613            timer.stop();
2614            info!("Backing up slots done. {timer}");
2615        }
2616        Err(err) => {
2617            warn!("Unable to backup shreds with incorrect shred version: {err}");
2618        }
2619    }
2620
2621    info!("Purging slots {start_slot} to {end_slot} from blockstore");
2622    let mut timer = Measure::start("blockstore purge");
2623    blockstore.purge_from_next_slots(start_slot, end_slot);
2624    blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact);
2625    timer.stop();
2626    info!("Purging slots done. {timer}");
2627
2628    Ok(())
2629}
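
// Editorial illustration (not part of the original source; the numbers are arbitrary): if the
// scan found shreds with version 3999 and the affected range were slots 100..=250, the backup
// blockstore above would be opened under the ledger path in a directory named
// "<BLOCKSTORE_DIRECTORY_ROCKS_LEVEL>_backup_3999_100_250", and slots 100..=250 would then be
// purged from the primary blockstore with `PurgeType::Exact`.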
2630
2631fn initialize_rpc_transaction_history_services(
2632    blockstore: Arc<Blockstore>,
2633    exit: Arc<AtomicBool>,
2634    enable_rpc_transaction_history: bool,
2635    enable_extended_tx_metadata_storage: bool,
2636    transaction_notifier: Option<TransactionNotifierArc>,
2637    dependency_tracker: Option<Arc<DependencyTracker>>,
2638) -> TransactionHistoryServices {
2639    let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
2640    let (transaction_status_sender, transaction_status_receiver) = unbounded();
2641    let transaction_status_sender = Some(TransactionStatusSender {
2642        sender: transaction_status_sender,
2643        dependency_tracker: dependency_tracker.clone(),
2644    });
2645    let transaction_status_service = Some(TransactionStatusService::new(
2646        transaction_status_receiver,
2647        max_complete_transaction_status_slot.clone(),
2648        enable_rpc_transaction_history,
2649        transaction_notifier,
2650        blockstore.clone(),
2651        enable_extended_tx_metadata_storage,
2652        dependency_tracker,
2653        exit.clone(),
2654    ));
2655
2656    TransactionHistoryServices {
2657        transaction_status_sender,
2658        transaction_status_service,
2659        max_complete_transaction_status_slot,
2660    }
2661}
2662
2663#[derive(Error, Debug)]
2664pub enum ValidatorError {
2665    #[error("bank hash mismatch: actual={0}, expected={1}")]
2666    BankHashMismatch(Hash, Hash),
2667
2668    #[error("blockstore error: {0}")]
2669    Blockstore(#[source] BlockstoreError),
2670
2671    #[error("genesis hash mismatch: actual={0}, expected={1}")]
2672    GenesisHashMismatch(Hash, Hash),
2673
2674    #[error(
2675        "ledger does not have enough data to wait for supermajority: current slot={0}, needed \
2676         slot={1}"
2677    )]
2678    NotEnoughLedgerData(Slot, Slot),
2679
2680    #[error("failed to open genesis: {0}")]
2681    OpenGenesisConfig(#[source] OpenGenesisConfigError),
2682
2683    #[error("{0}")]
2684    Other(String),
2685
2686    #[error(
2687        "PoH hashes/second rate is slower than the cluster target: mine {mine}, cluster {target}"
2688    )]
2689    PohTooSlow { mine: u64, target: u64 },
2690
2691    #[error(transparent)]
2692    ResourceLimitError(#[from] ResourceLimitError),
2693
2694    #[error("shred version mismatch: actual {actual}, expected {expected}")]
2695    ShredVersionMismatch { actual: u16, expected: u16 },
2696
2697    #[error(transparent)]
2698    TraceError(#[from] TraceError),
2699
2700    #[error("Wen Restart finished, please continue with --wait-for-supermajority")]
2701    WenRestartFinished,
2702}
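
// Editorial sketch (not part of the original source; `check_something` is hypothetical):
// because of the `#[from]` attributes above, the `?` operator converts a `ResourceLimitError`
// or `TraceError` into a `ValidatorError` automatically, e.g.:
//
//     fn check_something(limits: Result<(), ResourceLimitError>) -> Result<(), ValidatorError> {
//         limits?; // ResourceLimitError -> ValidatorError via #[from]
//         Ok(())
//     }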
2703
2704// Returns whether the validator waited on other nodes to start. If it did, the validator
2705// should not wait for one of its own votes to land before producing blocks, because if
2706// the whole network is waiting, then it will stall.
2707//
2708// Error indicates that a bad hash was encountered or another condition
2709// that is unrecoverable and the validator should exit.
2710fn wait_for_supermajority(
2711    config: &ValidatorConfig,
2712    process_blockstore: Option<&mut ProcessBlockStore>,
2713    bank_forks: &RwLock<BankForks>,
2714    cluster_info: &ClusterInfo,
2715    rpc_override_health_check: Arc<AtomicBool>,
2716    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
2717) -> Result<bool, ValidatorError> {
2718    match config.wait_for_supermajority {
2719        None => Ok(false),
2720        Some(wait_for_supermajority_slot) => {
2721            if let Some(process_blockstore) = process_blockstore {
2722                process_blockstore
2723                    .process()
2724                    .map_err(ValidatorError::Other)?;
2725            }
2726
2727            let bank = bank_forks.read().unwrap().working_bank();
2728            match wait_for_supermajority_slot.cmp(&bank.slot()) {
2729                std::cmp::Ordering::Less => return Ok(false),
2730                std::cmp::Ordering::Greater => {
2731                    return Err(ValidatorError::NotEnoughLedgerData(
2732                        bank.slot(),
2733                        wait_for_supermajority_slot,
2734                    ));
2735                }
2736                _ => {}
2737            }
2738
2739            if let Some(expected_bank_hash) = config.expected_bank_hash {
2740                if bank.hash() != expected_bank_hash {
2741                    return Err(ValidatorError::BankHashMismatch(
2742                        bank.hash(),
2743                        expected_bank_hash,
2744                    ));
2745                }
2746            }
2747
2748            for i in 1.. {
2749                let logging = i % 10 == 1;
2750                if logging {
2751                    info!(
2752                        "Waiting for {}% of activated stake at slot {} to be in gossip...",
2753                        WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT,
2754                        bank.slot()
2755                    );
2756                }
2757
2758                let gossip_stake_percent =
2759                    get_stake_percent_in_gossip(&bank, cluster_info, logging);
2760
2761                *start_progress.write().unwrap() =
2762                    ValidatorStartProgress::WaitingForSupermajority {
2763                        slot: wait_for_supermajority_slot,
2764                        gossip_stake_percent,
2765                    };
2766
2767                if gossip_stake_percent >= WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT {
2768                    info!(
2769                        "Supermajority reached, {gossip_stake_percent}% active stake detected, \
2770                         starting up now.",
2771                    );
2772                    break;
2773                }
2774                // The normal RPC health checks don't apply as the node is waiting, so feign health to
2775                // prevent load balancers from removing the node from their list of candidates during a
2776                // manual restart.
2777                rpc_override_health_check.store(true, Ordering::Relaxed);
2778                sleep(Duration::new(1, 0));
2779            }
2780            rpc_override_health_check.store(false, Ordering::Relaxed);
2781            Ok(true)
2782        }
2783    }
2784}
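
// Editorial sketch (not part of the original source; the caller shown is hypothetical): the
// boolean contract documented above, seen from a caller's side. A `true` return means the
// whole cluster was waiting, so block production should not additionally be gated on one of
// this validator's own votes landing.
//
//     let did_wait = wait_for_supermajority(
//         &config,
//         Some(&mut process_blockstore),
//         &bank_forks,
//         &cluster_info,
//         rpc_override_health_check.clone(),
//         &start_progress,
//     )?;
//     let wait_for_own_vote = !did_wait;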
2785
2786// Get the activated stake percentage (based on the provided bank) that is visible in gossip
2787fn get_stake_percent_in_gossip(bank: &Bank, cluster_info: &ClusterInfo, log: bool) -> u64 {
2788    let mut online_stake = 0;
2789    let mut wrong_shred_stake = 0;
2790    let mut wrong_shred_nodes = vec![];
2791    let mut offline_stake = 0;
2792    let mut offline_nodes = vec![];
2793
2794    let mut total_activated_stake = 0;
2795    let now = timestamp();
2796    // Nodes' contact infos are saved to disk and restored on validator startup.
2797    // Staked nodes' entries will not expire until an epoch later, so it is
2798    // necessary here to filter for recent entries to establish liveness.
2799    let peers: HashMap<_, _> = cluster_info
2800        .tvu_peers(ContactInfo::clone)
2801        .into_iter()
2802        .filter(|node| {
2803            let age = now.saturating_sub(node.wallclock());
2804            // Contact infos are refreshed twice during this period.
2805            age < CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS
2806        })
2807        .map(|node| (*node.pubkey(), node))
2808        .collect();
2809    let my_shred_version = cluster_info.my_shred_version();
2810    let my_id = cluster_info.id();
2811
2812    for (activated_stake, vote_account) in bank.vote_accounts().values() {
2813        let activated_stake = *activated_stake;
2814        total_activated_stake += activated_stake;
2815
2816        if activated_stake == 0 {
2817            continue;
2818        }
2819        let vote_state_node_pubkey = *vote_account.node_pubkey();
2820
2821        if let Some(peer) = peers.get(&vote_state_node_pubkey) {
2822            if peer.shred_version() == my_shred_version {
2823                trace!(
2824                    "observed {vote_state_node_pubkey} in gossip, \
2825                     (activated_stake={activated_stake})"
2826                );
2827                online_stake += activated_stake;
2828            } else {
2829                wrong_shred_stake += activated_stake;
2830                wrong_shred_nodes.push((activated_stake, vote_state_node_pubkey));
2831            }
2832        } else if vote_state_node_pubkey == my_id {
2833            online_stake += activated_stake; // This node is online
2834        } else {
2835            offline_stake += activated_stake;
2836            offline_nodes.push((activated_stake, vote_state_node_pubkey));
2837        }
2838    }
2839
2840    let online_stake_percentage = (online_stake as f64 / total_activated_stake as f64) * 100.;
2841    if log {
2842        info!("{online_stake_percentage:.3}% of active stake visible in gossip");
2843
2844        if !wrong_shred_nodes.is_empty() {
2845            info!(
2846                "{:.3}% of active stake has the wrong shred version in gossip",
2847                (wrong_shred_stake as f64 / total_activated_stake as f64) * 100.,
2848            );
2849            wrong_shred_nodes.sort_by(|a, b| b.0.cmp(&a.0)); // sort by stake weight, descending
2850            for (stake, identity) in wrong_shred_nodes {
2851                info!(
2852                    "    {:.3}% - {}",
2853                    (stake as f64 / total_activated_stake as f64) * 100.,
2854                    identity
2855                );
2856            }
2857        }
2858
2859        if !offline_nodes.is_empty() {
2860            info!(
2861                "{:.3}% of active stake is not visible in gossip",
2862                (offline_stake as f64 / total_activated_stake as f64) * 100.
2863            );
2864            offline_nodes.sort_by(|a, b| b.0.cmp(&a.0)); // sort by stake weight, descending
2865            for (stake, identity) in offline_nodes {
2866                info!(
2867                    "    {:.3}% - {}",
2868                    (stake as f64 / total_activated_stake as f64) * 100.,
2869                    identity
2870                );
2871            }
2872        }
2873        datapoint_info!(
2874            "wfsm_gossip",
2875            ("online_stake", online_stake, i64),
2876            ("offline_stake", offline_stake, i64),
2877            ("total_activated_stake", total_activated_stake, i64),
2878        );
2879    }
2880
2881    online_stake_percentage as u64
2882}
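
// Editorial worked example (not part of the original source; numbers are arbitrary): with a
// total activated stake of 100 units, of which 76 belong to peers visible in gossip with a
// matching shred version (or to this node itself), the function returns
// ((76.0 / 100.0) * 100.) as u64 == 76, and wait_for_supermajority() keeps looping until this
// value reaches WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT.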
2883
2884fn cleanup_accounts_paths(config: &ValidatorConfig) {
2885    for account_path in &config.account_paths {
2886        move_and_async_delete_path_contents(account_path);
2887    }
2888    if let Some(shrink_paths) = &config.accounts_db_config.shrink_paths {
2889        for shrink_path in shrink_paths {
2890            move_and_async_delete_path_contents(shrink_path);
2891        }
2892    }
2893}
2894
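/// Editorial doc example (not part of the original source; interval values are arbitrary):
/// when snapshot generation is enabled, the full snapshot interval must be a fixed slot count
/// and strictly larger than the incremental interval, if the latter is enabled.
///
/// ```ignore
/// use agave_snapshots::{snapshot_config::SnapshotConfig, SnapshotInterval};
/// use std::num::NonZeroU64;
///
/// let valid = SnapshotConfig {
///     full_snapshot_archive_interval: SnapshotInterval::Slots(NonZeroU64::new(400).unwrap()),
///     incremental_snapshot_archive_interval: SnapshotInterval::Slots(NonZeroU64::new(100).unwrap()),
///     ..SnapshotConfig::default()
/// };
/// assert!(is_snapshot_config_valid(&valid));
///
/// // Full interval not strictly larger than the incremental interval => invalid.
/// let invalid = SnapshotConfig {
///     full_snapshot_archive_interval: SnapshotInterval::Slots(NonZeroU64::new(100).unwrap()),
///     incremental_snapshot_archive_interval: SnapshotInterval::Slots(NonZeroU64::new(100).unwrap()),
///     ..SnapshotConfig::default()
/// };
/// assert!(!is_snapshot_config_valid(&invalid));
/// ```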
2895pub fn is_snapshot_config_valid(snapshot_config: &SnapshotConfig) -> bool {
2896    // if the snapshot config is configured to *not* take snapshots, then it is valid
2897    if !snapshot_config.should_generate_snapshots() {
2898        return true;
2899    }
2900
2901    let SnapshotInterval::Slots(full_snapshot_interval_slots) =
2902        snapshot_config.full_snapshot_archive_interval
2903    else {
2904        // if we *are* generating snapshots, then the full snapshot interval cannot be disabled
2905        return false;
2906    };
2907
2908    match snapshot_config.incremental_snapshot_archive_interval {
2909        SnapshotInterval::Disabled => true,
2910        SnapshotInterval::Slots(incremental_snapshot_interval_slots) => {
2911            full_snapshot_interval_slots > incremental_snapshot_interval_slots
2912        }
2913    }
2914}
2915
2916#[cfg(test)]
2917mod tests {
2918    use {
2919        super::*,
2920        crossbeam_channel::{bounded, RecvTimeoutError},
2921        solana_entry::entry,
2922        solana_genesis_config::create_genesis_config,
2923        solana_gossip::contact_info::ContactInfo,
2924        solana_ledger::{
2925            blockstore, create_new_tmp_ledger, genesis_utils::create_genesis_config_with_leader,
2926            get_tmp_ledger_path_auto_delete,
2927        },
2928        solana_poh_config::PohConfig,
2929        solana_sha256_hasher::hash,
2930        solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP,
2931        std::{fs::remove_dir_all, num::NonZeroU64, thread, time::Duration},
2932    };
2933
2934    #[test]
2935    fn validator_exit() {
2936        agave_logger::setup();
2937        let leader_keypair = Keypair::new();
2938        let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
2939
2940        let validator_keypair = Keypair::new();
2941        let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
2942        let genesis_config =
2943            create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1000)
2944                .genesis_config;
2945        let (validator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
2946
2947        let voting_keypair = Arc::new(Keypair::new());
2948        let config = ValidatorConfig {
2949            rpc_addrs: Some((
2950                validator_node.info.rpc().unwrap(),
2951                validator_node.info.rpc_pubsub().unwrap(),
2952            )),
2953            ..ValidatorConfig::default_for_test()
2954        };
2955        let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
2956        let validator = Validator::new(
2957            validator_node,
2958            Arc::new(validator_keypair),
2959            &validator_ledger_path,
2960            &voting_keypair.pubkey(),
2961            Arc::new(RwLock::new(vec![voting_keypair])),
2962            vec![leader_node.info],
2963            &config,
2964            true, // should_check_duplicate_instance
2965            None, // rpc_to_plugin_manager_receiver
2966            start_progress.clone(),
2967            SocketAddrSpace::Unspecified,
2968            ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
2969            Arc::new(RwLock::new(None)),
2970        )
2971        .expect("assume successful validator start");
2972        assert_eq!(
2973            *start_progress.read().unwrap(),
2974            ValidatorStartProgress::Running
2975        );
2976        validator.close();
2977        remove_dir_all(validator_ledger_path).unwrap();
2978    }
2979
2980    #[test]
2981    fn test_should_cleanup_blockstore_incorrect_shred_versions() {
2982        agave_logger::setup();
2983
2984        let ledger_path = get_tmp_ledger_path_auto_delete!();
2985        let blockstore = Blockstore::open(ledger_path.path()).unwrap();
2986
2987        let mut validator_config = ValidatorConfig::default_for_test();
2988        let mut hard_forks = HardForks::default();
2989        let mut root_slot;
2990
2991        // Do check from root_slot + 1 if wait_for_supermajority (10) == root_slot (10)
2992        root_slot = 10;
2993        validator_config.wait_for_supermajority = Some(root_slot);
2994        assert_eq!(
2995            should_cleanup_blockstore_incorrect_shred_versions(
2996                &validator_config,
2997                &blockstore,
2998                root_slot,
2999                &hard_forks
3000            )
3001            .unwrap(),
3002            Some(root_slot + 1)
3003        );
3004
3005        // No check if wait_for_supermajority (10) < root_slot (15) (no hard forks)
3006        // Arguably operator error to pass a value for wait_for_supermajority in this case
3007        root_slot = 15;
3008        assert_eq!(
3009            should_cleanup_blockstore_incorrect_shred_versions(
3010                &validator_config,
3011                &blockstore,
3012                root_slot,
3013                &hard_forks
3014            )
3015            .unwrap(),
3016            None,
3017        );
3018
3019        // Emulate cluster restart at slot 10
3020        // No check if wait_for_supermajority (10) < root_slot (15) (empty blockstore)
3021        hard_forks.register(10);
3022        assert_eq!(
3023            should_cleanup_blockstore_incorrect_shred_versions(
3024                &validator_config,
3025                &blockstore,
3026                root_slot,
3027                &hard_forks
3028            )
3029            .unwrap(),
3030            None,
3031        );
3032
3033        // Insert some shreds at newer slots than hard fork
3034        let entries = entry::create_ticks(1, 0, Hash::default());
3035        for i in 20..35 {
3036            let shreds = blockstore::entries_to_test_shreds(
3037                &entries,
3038                i,     // slot
3039                i - 1, // parent_slot
3040                true,  // is_full_slot
3041                1,     // version
3042            );
3043            blockstore.insert_shreds(shreds, None, true).unwrap();
3044        }
3045
3046        // No check as all blockstore data is newer than latest hard fork
3047        assert_eq!(
3048            should_cleanup_blockstore_incorrect_shred_versions(
3049                &validator_config,
3050                &blockstore,
3051                root_slot,
3052                &hard_forks
3053            )
3054            .unwrap(),
3055            None,
3056        );
3057
3058        // Emulate cluster restart at slot 25
3059        // Do check from root_slot + 1 regardless of whether wait_for_supermajority is set correctly
3060        root_slot = 25;
3061        hard_forks.register(root_slot);
3062        validator_config.wait_for_supermajority = Some(root_slot);
3063        assert_eq!(
3064            should_cleanup_blockstore_incorrect_shred_versions(
3065                &validator_config,
3066                &blockstore,
3067                root_slot,
3068                &hard_forks
3069            )
3070            .unwrap(),
3071            Some(root_slot + 1),
3072        );
3073        validator_config.wait_for_supermajority = None;
3074        assert_eq!(
3075            should_cleanup_blockstore_incorrect_shred_versions(
3076                &validator_config,
3077                &blockstore,
3078                root_slot,
3079                &hard_forks
3080            )
3081            .unwrap(),
3082            Some(root_slot + 1),
3083        );
3084
3085        // Do check with advanced root slot, even without wait_for_supermajority set correctly
3086        // Check starts from latest hard fork + 1
3087        root_slot = 30;
3088        let latest_hard_fork = hard_forks.iter().last().unwrap().0;
3089        assert_eq!(
3090            should_cleanup_blockstore_incorrect_shred_versions(
3091                &validator_config,
3092                &blockstore,
3093                root_slot,
3094                &hard_forks
3095            )
3096            .unwrap(),
3097            Some(latest_hard_fork + 1),
3098        );
3099
3100        // Purge blockstore up to latest hard fork
3101        // No check since all blockstore data is newer than the latest hard fork
3102        blockstore.purge_slots(0, latest_hard_fork, PurgeType::Exact);
3103        assert_eq!(
3104            should_cleanup_blockstore_incorrect_shred_versions(
3105                &validator_config,
3106                &blockstore,
3107                root_slot,
3108                &hard_forks
3109            )
3110            .unwrap(),
3111            None,
3112        );
3113    }
3114
3115    #[test]
3116    fn test_cleanup_blockstore_incorrect_shred_versions() {
3117        agave_logger::setup();
3118
3119        let validator_config = ValidatorConfig::default_for_test();
3120        let ledger_path = get_tmp_ledger_path_auto_delete!();
3121        let blockstore = Blockstore::open(ledger_path.path()).unwrap();
3122
3123        let entries = entry::create_ticks(1, 0, Hash::default());
3124        for i in 1..10 {
3125            let shreds = blockstore::entries_to_test_shreds(
3126                &entries,
3127                i,     // slot
3128                i - 1, // parent_slot
3129                true,  // is_full_slot
3130                1,     // version
3131            );
3132            blockstore.insert_shreds(shreds, None, true).unwrap();
3133        }
3134
3135        // this backs up and then purges all slots greater than or equal to 5
3136        cleanup_blockstore_incorrect_shred_versions(&blockstore, &validator_config, 5, 2).unwrap();
3137        // assert that slots less than 5 aren't affected
3138        assert!(blockstore.meta(4).unwrap().unwrap().next_slots.is_empty());
3139        for i in 5..10 {
3140            assert!(blockstore
3141                .get_data_shreds_for_slot(i, 0)
3142                .unwrap()
3143                .is_empty());
3144        }
3145    }
3146
3147    #[test]
3148    fn validator_parallel_exit() {
3149        let leader_keypair = Keypair::new();
3150        let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
3151        let genesis_config =
3152            create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1000)
3153                .genesis_config;
3154
3155        let mut ledger_paths = vec![];
3156        let mut validators: Vec<Validator> = (0..2)
3157            .map(|_| {
3158                let validator_keypair = Keypair::new();
3159                let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
3160                let (validator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
3161                ledger_paths.push(validator_ledger_path.clone());
3162                let vote_account_keypair = Keypair::new();
3163                let config = ValidatorConfig {
3164                    rpc_addrs: Some((
3165                        validator_node.info.rpc().unwrap(),
3166                        validator_node.info.rpc_pubsub().unwrap(),
3167                    )),
3168                    ..ValidatorConfig::default_for_test()
3169                };
3170                Validator::new(
3171                    validator_node,
3172                    Arc::new(validator_keypair),
3173                    &validator_ledger_path,
3174                    &vote_account_keypair.pubkey(),
3175                    Arc::new(RwLock::new(vec![Arc::new(vote_account_keypair)])),
3176                    vec![leader_node.info.clone()],
3177                    &config,
3178                    true, // should_check_duplicate_instance.
3179                    None, // rpc_to_plugin_manager_receiver
3180                    Arc::new(RwLock::new(ValidatorStartProgress::default())),
3181                    SocketAddrSpace::Unspecified,
3182                    ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
3183                    Arc::new(RwLock::new(None)),
3184                )
3185                .expect("assume successful validator start")
3186            })
3187            .collect();
3188
3189        // Each validator can exit in parallel to speed up the many otherwise-sequential calls to `join`
3190        validators.iter_mut().for_each(|v| v.exit());
3191
3192        // spawn a new thread to wait for all of the validators to join
3193        let (sender, receiver) = bounded(0);
3194        let _ = thread::spawn(move || {
3195            validators.into_iter().for_each(|validator| {
3196                validator.join();
3197            });
3198            sender.send(()).unwrap();
3199        });
3200
3201        let timeout = Duration::from_secs(60);
3202        if let Err(RecvTimeoutError::Timeout) = receiver.recv_timeout(timeout) {
3203            panic!("timed out waiting for validators to shut down");
3204        }
3205
3206        for path in ledger_paths {
3207            remove_dir_all(path).unwrap();
3208        }
3209    }
3210
3211    #[test]
3212    fn test_wait_for_supermajority() {
3213        agave_logger::setup();
3214        let node_keypair = Arc::new(Keypair::new());
3215        let cluster_info = ClusterInfo::new(
3216            ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
3217            node_keypair,
3218            SocketAddrSpace::Unspecified,
3219        );
3220
3221        let (genesis_config, _mint_keypair) = create_genesis_config(1);
3222        let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
3223        let mut config = ValidatorConfig::default_for_test();
3224        let rpc_override_health_check = Arc::new(AtomicBool::new(false));
3225        let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
3226
3227        assert!(!wait_for_supermajority(
3228            &config,
3229            None,
3230            &bank_forks,
3231            &cluster_info,
3232            rpc_override_health_check.clone(),
3233            &start_progress,
3234        )
3235        .unwrap());
3236
3237        // bank=0, wait=1, should fail
3238        config.wait_for_supermajority = Some(1);
3239        assert!(matches!(
3240            wait_for_supermajority(
3241                &config,
3242                None,
3243                &bank_forks,
3244                &cluster_info,
3245                rpc_override_health_check.clone(),
3246                &start_progress,
3247            ),
3248            Err(ValidatorError::NotEnoughLedgerData(_, _)),
3249        ));
3250
3251        // bank=1, wait=0, should pass, bank is past the wait slot
3252        let bank_forks = BankForks::new_rw_arc(Bank::new_from_parent(
3253            bank_forks.read().unwrap().root_bank(),
3254            &Pubkey::default(),
3255            1,
3256        ));
3257        config.wait_for_supermajority = Some(0);
3258        assert!(!wait_for_supermajority(
3259            &config,
3260            None,
3261            &bank_forks,
3262            &cluster_info,
3263            rpc_override_health_check.clone(),
3264            &start_progress,
3265        )
3266        .unwrap());
3267
3268        // bank=1, wait=1, equal, but bad hash provided
3269        config.wait_for_supermajority = Some(1);
3270        config.expected_bank_hash = Some(hash(&[1]));
3271        assert!(matches!(
3272            wait_for_supermajority(
3273                &config,
3274                None,
3275                &bank_forks,
3276                &cluster_info,
3277                rpc_override_health_check,
3278                &start_progress,
3279            ),
3280            Err(ValidatorError::BankHashMismatch(_, _)),
3281        ));
3282    }
3283
3284    #[test]
3285    fn test_is_snapshot_config_valid() {
3286        fn new_snapshot_config(
3287            full_snapshot_archive_interval_slots: Slot,
3288            incremental_snapshot_archive_interval_slots: Slot,
3289        ) -> SnapshotConfig {
3290            SnapshotConfig {
3291                full_snapshot_archive_interval: SnapshotInterval::Slots(
3292                    NonZeroU64::new(full_snapshot_archive_interval_slots).unwrap(),
3293                ),
3294                incremental_snapshot_archive_interval: SnapshotInterval::Slots(
3295                    NonZeroU64::new(incremental_snapshot_archive_interval_slots).unwrap(),
3296                ),
3297                ..SnapshotConfig::default()
3298            }
3299        }
3300
3301        // default config must be valid
3302        assert!(is_snapshot_config_valid(&SnapshotConfig::default()));
3303
3304        // disabled incremental snapshot must be valid
3305        assert!(is_snapshot_config_valid(&SnapshotConfig {
3306            incremental_snapshot_archive_interval: SnapshotInterval::Disabled,
3307            ..SnapshotConfig::default()
3308        }));
3309
3310        // disabled full snapshot must be invalid though (if generating snapshots)
3311        assert!(!is_snapshot_config_valid(&SnapshotConfig {
3312            full_snapshot_archive_interval: SnapshotInterval::Disabled,
3313            ..SnapshotConfig::default()
3314        }));
3315
3316        // simple config must be valid
3317        assert!(is_snapshot_config_valid(&new_snapshot_config(400, 200)));
3318        assert!(is_snapshot_config_valid(&new_snapshot_config(100, 42)));
3319        assert!(is_snapshot_config_valid(&new_snapshot_config(444, 200)));
3320        assert!(is_snapshot_config_valid(&new_snapshot_config(400, 222)));
3321
3322        // config where full interval is not larger than incremental interval must be invalid
3323        assert!(!is_snapshot_config_valid(&new_snapshot_config(42, 100)));
3324        assert!(!is_snapshot_config_valid(&new_snapshot_config(100, 100)));
3325        assert!(!is_snapshot_config_valid(&new_snapshot_config(100, 200)));
3326
3327        // config with snapshots disabled (or load-only) must be valid
3328        assert!(is_snapshot_config_valid(&SnapshotConfig::new_disabled()));
3329        assert!(is_snapshot_config_valid(&SnapshotConfig::new_load_only()));
3330        assert!(is_snapshot_config_valid(&SnapshotConfig {
3331            full_snapshot_archive_interval: SnapshotInterval::Slots(NonZeroU64::new(37).unwrap()),
3332            incremental_snapshot_archive_interval: SnapshotInterval::Slots(
3333                NonZeroU64::new(41).unwrap()
3334            ),
3335            ..SnapshotConfig::new_load_only()
3336        }));
3337        assert!(is_snapshot_config_valid(&SnapshotConfig {
3338            full_snapshot_archive_interval: SnapshotInterval::Disabled,
3339            incremental_snapshot_archive_interval: SnapshotInterval::Disabled,
3340            ..SnapshotConfig::new_load_only()
3341        }));
3342    }
3343
3344    fn target_tick_duration() -> Duration {
3345        // DEFAULT_MS_PER_SLOT = 400
3346        // DEFAULT_TICKS_PER_SLOT = 64
3347        // MS_PER_TICK = 6
3348        //
3349        // But, DEFAULT_MS_PER_SLOT / DEFAULT_TICKS_PER_SLOT = 6.25
3350        //
3351        // So, convert to microseconds first to avoid the integer rounding error
3352        let target_tick_duration_us =
3353            solana_clock::DEFAULT_MS_PER_SLOT * 1000 / solana_clock::DEFAULT_TICKS_PER_SLOT;
3354        assert_eq!(target_tick_duration_us, 6250);
3355        Duration::from_micros(target_tick_duration_us)
3356    }
3357
3358    #[test]
3359    fn test_poh_speed() {
3360        agave_logger::setup();
3361        let poh_config = PohConfig {
3362            target_tick_duration: target_tick_duration(),
3363            // make the cluster's target hash rate unrealistically fast so check_poh_speed errors
3364            hashes_per_tick: Some(100 * solana_clock::DEFAULT_HASHES_PER_TICK),
3365            ..PohConfig::default()
3366        };
3367        let genesis_config = GenesisConfig {
3368            poh_config,
3369            ..GenesisConfig::default()
3370        };
3371        let bank = Bank::new_for_tests(&genesis_config);
3372        assert!(check_poh_speed(&bank, Some(10_000)).is_err());
3373    }
3374
3375    #[test]
3376    fn test_poh_speed_no_hashes_per_tick() {
3377        agave_logger::setup();
3378        let poh_config = PohConfig {
3379            target_tick_duration: target_tick_duration(),
3380            hashes_per_tick: None,
3381            ..PohConfig::default()
3382        };
3383        let genesis_config = GenesisConfig {
3384            poh_config,
3385            ..GenesisConfig::default()
3386        };
3387        let bank = Bank::new_for_tests(&genesis_config);
3388        check_poh_speed(&bank, Some(10_000)).unwrap();
3389    }
3390}