solana_core/
tvu.rs

1//! The `tvu` module implements the Transaction Validation Unit, a multi-stage transaction
2//! validation pipeline in software.
3
4use {
5    crate::{
6        banking_trace::BankingTracer,
7        cluster_info_vote_listener::{
8            DuplicateConfirmedSlotsReceiver, GossipVerifiedVoteHashReceiver, VerifiedVoteReceiver,
9            VoteTracker,
10        },
11        cluster_slots_service::{cluster_slots::ClusterSlots, ClusterSlotsService},
12        completed_data_sets_service::CompletedDataSetsSender,
13        consensus::{tower_storage::TowerStorage, Tower},
14        cost_update_service::CostUpdateService,
15        drop_bank_service::DropBankService,
16        repair::repair_service::{OutstandingShredRepairs, RepairInfo, RepairServiceChannels},
17        replay_stage::{ReplayReceivers, ReplaySenders, ReplayStage, ReplayStageConfig},
18        shred_fetch_stage::{ShredFetchStage, SHRED_FETCH_CHANNEL_SIZE},
19        voting_service::VotingService,
20        warm_quic_cache_service::WarmQuicCacheService,
21        window_service::{WindowService, WindowServiceChannels},
22    },
23    bytes::Bytes,
24    crossbeam_channel::{unbounded, Receiver, Sender},
25    solana_client::connection_cache::ConnectionCache,
26    solana_clock::Slot,
27    solana_geyser_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierArc,
28    solana_gossip::{
29        cluster_info::ClusterInfo, duplicate_shred_handler::DuplicateShredHandler,
30        duplicate_shred_listener::DuplicateShredListener,
31    },
32    solana_keypair::Keypair,
33    solana_ledger::{
34        blockstore::Blockstore, blockstore_cleanup_service::BlockstoreCleanupService,
35        blockstore_processor::TransactionStatusSender, entry_notifier_service::EntryNotifierSender,
36        leader_schedule_cache::LeaderScheduleCache,
37    },
38    solana_poh::{poh_controller::PohController, poh_recorder::PohRecorder},
39    solana_pubkey::Pubkey,
40    solana_rpc::{
41        max_slots::MaxSlots, optimistically_confirmed_bank_tracker::BankNotificationSenderConfig,
42        rpc_subscriptions::RpcSubscriptions, slot_status_notifier::SlotStatusNotifier,
43    },
44    solana_runtime::{
45        bank_forks::BankForks, commitment::BlockCommitmentCache,
46        prioritization_fee_cache::PrioritizationFeeCache, snapshot_controller::SnapshotController,
47        vote_sender_types::ReplayVoteSender,
48    },
49    solana_streamer::evicting_sender::EvictingSender,
50    solana_turbine::{retransmit_stage::RetransmitStage, xdp::XdpSender},
51    std::{
52        collections::HashSet,
53        net::{SocketAddr, UdpSocket},
54        num::NonZeroUsize,
55        sync::{atomic::AtomicBool, Arc, RwLock},
56        thread::{self, JoinHandle},
57    },
58    tokio::sync::mpsc::Sender as AsyncSender,
59};
60
61/// Sets the upper bound on the number of batches stored in the retransmit
62/// stage ingress channel.
63/// Allows for a max of 16k batches of up to 64 packets each
64/// (PACKETS_PER_BATCH).
65/// This translates to about 1 GB of RAM for packet storage in the worst case.
66/// In reality this means about 200K shreds since most batches are not full.
67const CHANNEL_SIZE_RETRANSMIT_INGRESS: usize = 16 * 1024;
68
69pub struct Tvu {
70    fetch_stage: ShredFetchStage,
71    shred_sigverify: JoinHandle<()>,
72    retransmit_stage: RetransmitStage,
73    window_service: WindowService,
74    cluster_slots_service: ClusterSlotsService,
75    replay_stage: Option<ReplayStage>,
76    blockstore_cleanup_service: Option<BlockstoreCleanupService>,
77    cost_update_service: CostUpdateService,
78    voting_service: VotingService,
79    warm_quic_cache_service: Option<WarmQuicCacheService>,
80    drop_bank_service: DropBankService,
81    duplicate_shred_listener: DuplicateShredListener,
82}
83
84pub struct TvuSockets {
85    pub fetch: Vec<UdpSocket>,
86    pub repair: UdpSocket,
87    pub retransmit: Vec<UdpSocket>,
88    pub ancestor_hashes_requests: UdpSocket,
89    pub alpenglow: Option<UdpSocket>,
90}
91
92pub struct TvuConfig {
93    pub max_ledger_shreds: Option<u64>,
94    pub shred_version: u16,
95    // Validators from which repairs are requested
96    pub repair_validators: Option<HashSet<Pubkey>>,
97    // Validators which should be given priority when serving repairs
98    pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
99    pub wait_for_vote_to_start_leader: bool,
100    pub replay_forks_threads: NonZeroUsize,
101    pub replay_transactions_threads: NonZeroUsize,
102    pub shred_sigverify_threads: NonZeroUsize,
103    pub xdp_sender: Option<XdpSender>,
104}
105
106impl Default for TvuConfig {
107    fn default() -> Self {
108        Self {
109            max_ledger_shreds: None,
110            shred_version: 0,
111            repair_validators: None,
112            repair_whitelist: Arc::new(RwLock::new(HashSet::default())),
113            wait_for_vote_to_start_leader: false,
114            replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
115            replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
116            shred_sigverify_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
117            xdp_sender: None,
118        }
119    }
120}
121
122impl Tvu {
123    /// This service receives messages from a leader in the network and processes the transactions
124    /// on the bank state.
125    /// # Arguments
126    /// * `cluster_info` - The cluster_info state.
127    /// * `sockets` - fetch, repair, and retransmit sockets
128    /// * `blockstore` - the ledger itself
129    #[allow(clippy::too_many_arguments)]
130    pub fn new(
131        vote_account: &Pubkey,
132        authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
133        bank_forks: &Arc<RwLock<BankForks>>,
134        cluster_info: &Arc<ClusterInfo>,
135        sockets: TvuSockets,
136        blockstore: Arc<Blockstore>,
137        ledger_signal_receiver: Receiver<bool>,
138        rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
139        poh_recorder: &Arc<RwLock<PohRecorder>>,
140        poh_controller: PohController,
141        tower: Tower,
142        tower_storage: Arc<dyn TowerStorage>,
143        leader_schedule_cache: &Arc<LeaderScheduleCache>,
144        exit: Arc<AtomicBool>,
145        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
146        turbine_disabled: Arc<AtomicBool>,
147        transaction_status_sender: Option<TransactionStatusSender>,
148        entry_notification_sender: Option<EntryNotifierSender>,
149        vote_tracker: Arc<VoteTracker>,
150        retransmit_slots_sender: Sender<Slot>,
151        gossip_verified_vote_hash_receiver: GossipVerifiedVoteHashReceiver,
152        verified_vote_receiver: VerifiedVoteReceiver,
153        replay_vote_sender: ReplayVoteSender,
154        completed_data_sets_sender: Option<CompletedDataSetsSender>,
155        bank_notification_sender: Option<BankNotificationSenderConfig>,
156        duplicate_confirmed_slots_receiver: DuplicateConfirmedSlotsReceiver,
157        tvu_config: TvuConfig,
158        max_slots: &Arc<MaxSlots>,
159        block_metadata_notifier: Option<BlockMetadataNotifierArc>,
160        wait_to_vote_slot: Option<Slot>,
161        snapshot_controller: Option<Arc<SnapshotController>>,
162        log_messages_bytes_limit: Option<usize>,
163        connection_cache: Option<&Arc<ConnectionCache>>,
164        prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
165        banking_tracer: Arc<BankingTracer>,
166        turbine_quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>,
167        turbine_quic_endpoint_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
168        repair_response_quic_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
169        repair_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>,
170        ancestor_hashes_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>,
171        ancestor_hashes_response_quic_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
172        outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
173        cluster_slots: Arc<ClusterSlots>,
174        wen_restart_repair_slots: Option<Arc<RwLock<Vec<Slot>>>>,
175        slot_status_notifier: Option<SlotStatusNotifier>,
176        vote_connection_cache: Arc<ConnectionCache>,
177    ) -> Result<Self, String> {
178        let in_wen_restart = wen_restart_repair_slots.is_some();
179
180        let TvuSockets {
181            repair: repair_socket,
182            fetch: fetch_sockets,
183            retransmit: retransmit_sockets,
184            ancestor_hashes_requests: ancestor_hashes_socket,
185            alpenglow: alpenglow_socket,
186        } = sockets;
187
188        let (fetch_sender, fetch_receiver) = EvictingSender::new_bounded(SHRED_FETCH_CHANNEL_SIZE);
189
190        let repair_socket = Arc::new(repair_socket);
191        let ancestor_hashes_socket = Arc::new(ancestor_hashes_socket);
192        let fetch_sockets: Vec<Arc<UdpSocket>> = fetch_sockets.into_iter().map(Arc::new).collect();
193        let fetch_stage = ShredFetchStage::new(
194            fetch_sockets,
195            turbine_quic_endpoint_receiver,
196            repair_response_quic_receiver,
197            repair_socket.clone(),
198            fetch_sender,
199            tvu_config.shred_version,
200            bank_forks.clone(),
201            cluster_info.clone(),
202            outstanding_repair_requests.clone(),
203            turbine_disabled,
204            exit.clone(),
205        );
206
207        let (verified_sender, verified_receiver) = unbounded();
208
209        let (retransmit_sender, retransmit_receiver) =
210            EvictingSender::new_bounded(CHANNEL_SIZE_RETRANSMIT_INGRESS);
211
212        let shred_sigverify = solana_turbine::sigverify_shreds::spawn_shred_sigverify(
213            cluster_info.clone(),
214            bank_forks.clone(),
215            leader_schedule_cache.clone(),
216            fetch_receiver,
217            retransmit_sender.clone(),
218            verified_sender,
219            tvu_config.shred_sigverify_threads,
220        );
221
222        let retransmit_stage = RetransmitStage::new(
223            bank_forks.clone(),
224            leader_schedule_cache.clone(),
225            cluster_info.clone(),
226            Arc::new(retransmit_sockets),
227            turbine_quic_endpoint_sender,
228            retransmit_receiver,
229            max_slots.clone(),
230            rpc_subscriptions.clone(),
231            slot_status_notifier.clone(),
232            tvu_config.xdp_sender,
233            // votor_event_sender is Alpenglow specific sender, it is None if Alpenglow is not enabled.
234            None,
235        );
236
237        let (ancestor_duplicate_slots_sender, ancestor_duplicate_slots_receiver) = unbounded();
238        let (duplicate_slots_sender, duplicate_slots_receiver) = unbounded();
239        let (ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_receiver) =
240            unbounded();
241        let (dumped_slots_sender, dumped_slots_receiver) = unbounded();
242        let (popular_pruned_forks_sender, popular_pruned_forks_receiver) = unbounded();
243        let window_service = {
244            let epoch_schedule = bank_forks
245                .read()
246                .unwrap()
247                .working_bank()
248                .epoch_schedule()
249                .clone();
250            let repair_info = RepairInfo {
251                bank_forks: bank_forks.clone(),
252                epoch_schedule,
253                ancestor_duplicate_slots_sender,
254                repair_validators: tvu_config.repair_validators,
255                repair_whitelist: tvu_config.repair_whitelist,
256                cluster_info: cluster_info.clone(),
257                cluster_slots: cluster_slots.clone(),
258                wen_restart_repair_slots,
259            };
260            let repair_service_channels = RepairServiceChannels::new(
261                repair_request_quic_sender,
262                verified_vote_receiver,
263                dumped_slots_receiver,
264                popular_pruned_forks_sender,
265                ancestor_hashes_request_quic_sender,
266                ancestor_hashes_response_quic_receiver,
267                ancestor_hashes_replay_update_receiver,
268            );
269            let window_service_channels = WindowServiceChannels::new(
270                verified_receiver,
271                retransmit_sender,
272                completed_data_sets_sender,
273                duplicate_slots_sender.clone(),
274                repair_service_channels,
275            );
276            WindowService::new(
277                blockstore.clone(),
278                repair_socket,
279                ancestor_hashes_socket,
280                exit.clone(),
281                repair_info,
282                window_service_channels,
283                leader_schedule_cache.clone(),
284                outstanding_repair_requests,
285            )
286        };
287
288        let (cluster_slots_update_sender, cluster_slots_update_receiver) = unbounded();
289        let cluster_slots_service = ClusterSlotsService::new(
290            blockstore.clone(),
291            cluster_slots.clone(),
292            bank_forks.clone(),
293            cluster_info.clone(),
294            cluster_slots_update_receiver,
295            exit.clone(),
296        );
297
298        let (cost_update_sender, cost_update_receiver) = unbounded();
299        let (drop_bank_sender, drop_bank_receiver) = unbounded();
300        let (voting_sender, voting_receiver) = unbounded();
301
302        let replay_senders = ReplaySenders {
303            rpc_subscriptions,
304            slot_status_notifier,
305            transaction_status_sender,
306            entry_notification_sender,
307            bank_notification_sender,
308            ancestor_hashes_replay_update_sender,
309            retransmit_slots_sender,
310            replay_vote_sender,
311            cluster_slots_update_sender,
312            cost_update_sender,
313            voting_sender,
314            drop_bank_sender,
315            block_metadata_notifier,
316            dumped_slots_sender,
317        };
318
319        let replay_receivers = ReplayReceivers {
320            ledger_signal_receiver,
321            duplicate_slots_receiver,
322            ancestor_duplicate_slots_receiver,
323            duplicate_confirmed_slots_receiver,
324            gossip_verified_vote_hash_receiver,
325            popular_pruned_forks_receiver,
326        };
327
328        let replay_stage_config = ReplayStageConfig {
329            vote_account: *vote_account,
330            authorized_voter_keypairs,
331            exit: exit.clone(),
332            leader_schedule_cache: leader_schedule_cache.clone(),
333            block_commitment_cache,
334            wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader,
335            tower_storage: tower_storage.clone(),
336            wait_to_vote_slot,
337            replay_forks_threads: tvu_config.replay_forks_threads,
338            replay_transactions_threads: tvu_config.replay_transactions_threads,
339            blockstore: blockstore.clone(),
340            bank_forks: bank_forks.clone(),
341            cluster_info: cluster_info.clone(),
342            poh_recorder: poh_recorder.clone(),
343            poh_controller,
344            tower,
345            vote_tracker,
346            cluster_slots,
347            log_messages_bytes_limit,
348            prioritization_fee_cache: prioritization_fee_cache.clone(),
349            banking_tracer,
350            snapshot_controller,
351        };
352
353        let voting_service = VotingService::new(
354            voting_receiver,
355            cluster_info.clone(),
356            poh_recorder.clone(),
357            tower_storage,
358            vote_connection_cache.clone(),
359            alpenglow_socket,
360            bank_forks.clone(),
361        );
362
363        let warm_quic_cache_service = create_cache_warmer_if_needed(
364            connection_cache,
365            vote_connection_cache,
366            cluster_info,
367            poh_recorder,
368            &exit,
369        );
370
371        let cost_update_service = CostUpdateService::new(cost_update_receiver);
372
373        let drop_bank_service = DropBankService::new(drop_bank_receiver);
374
375        let replay_stage = if in_wen_restart {
376            None
377        } else {
378            Some(ReplayStage::new(
379                replay_stage_config,
380                replay_senders,
381                replay_receivers,
382            )?)
383        };
384
385        let blockstore_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
386            BlockstoreCleanupService::new(blockstore.clone(), max_ledger_shreds, exit.clone())
387        });
388
389        let duplicate_shred_listener = DuplicateShredListener::new(
390            exit,
391            cluster_info.clone(),
392            DuplicateShredHandler::new(
393                blockstore,
394                leader_schedule_cache.clone(),
395                bank_forks.clone(),
396                duplicate_slots_sender,
397                tvu_config.shred_version,
398            ),
399        );
400
401        Ok(Tvu {
402            fetch_stage,
403            shred_sigverify,
404            retransmit_stage,
405            window_service,
406            cluster_slots_service,
407            replay_stage,
408            blockstore_cleanup_service,
409            cost_update_service,
410            voting_service,
411            warm_quic_cache_service,
412            drop_bank_service,
413            duplicate_shred_listener,
414        })
415    }
416
417    pub fn join(self) -> thread::Result<()> {
418        self.retransmit_stage.join()?;
419        self.window_service.join()?;
420        self.cluster_slots_service.join()?;
421        self.fetch_stage.join()?;
422        self.shred_sigverify.join()?;
423        if self.blockstore_cleanup_service.is_some() {
424            self.blockstore_cleanup_service.unwrap().join()?;
425        }
426        if self.replay_stage.is_some() {
427            self.replay_stage.unwrap().join()?;
428        }
429        self.cost_update_service.join()?;
430        self.voting_service.join()?;
431        if let Some(warmup_service) = self.warm_quic_cache_service {
432            warmup_service.join()?;
433        }
434        self.drop_bank_service.join()?;
435        self.duplicate_shred_listener.join()?;
436        Ok(())
437    }
438}
439
440fn create_cache_warmer_if_needed(
441    connection_cache: Option<&Arc<ConnectionCache>>,
442    vote_connection_cache: Arc<ConnectionCache>,
443    cluster_info: &Arc<ClusterInfo>,
444    poh_recorder: &Arc<RwLock<PohRecorder>>,
445    exit: &Arc<AtomicBool>,
446) -> Option<WarmQuicCacheService> {
447    let tpu_connection_cache = connection_cache.filter(|cache| cache.use_quic()).cloned();
448    let vote_connection_cache = Some(vote_connection_cache).filter(|cache| cache.use_quic());
449
450    (tpu_connection_cache.is_some() || vote_connection_cache.is_some()).then(|| {
451        WarmQuicCacheService::new(
452            tpu_connection_cache,
453            vote_connection_cache,
454            cluster_info.clone(),
455            poh_recorder.clone(),
456            exit.clone(),
457        )
458    })
459}
460
461#[cfg(test)]
462pub mod tests {
463    use {
464        super::*,
465        crate::{
466            consensus::tower_storage::FileTowerStorage,
467            repair::quic_endpoint::RepairQuicAsyncSenders,
468        },
469        serial_test::serial,
470        solana_gossip::{cluster_info::ClusterInfo, node::Node},
471        solana_keypair::Keypair,
472        solana_ledger::{
473            blockstore::BlockstoreSignals,
474            blockstore_options::BlockstoreOptions,
475            create_new_tmp_ledger,
476            genesis_utils::{create_genesis_config, GenesisConfigInfo},
477        },
478        solana_poh::poh_recorder::create_test_recorder,
479        solana_rpc::optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
480        solana_runtime::bank::Bank,
481        solana_signer::Signer,
482        solana_streamer::socket::SocketAddrSpace,
483        solana_tpu_client::tpu_client::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_VOTE_USE_QUIC},
484        std::sync::atomic::{AtomicU64, Ordering},
485    };
486
487    fn test_tvu_exit(enable_wen_restart: bool) {
488        agave_logger::setup();
489        let leader = Node::new_localhost();
490        let target1_keypair = Keypair::new();
491        let target1 = Node::new_localhost_with_pubkey(&target1_keypair.pubkey());
492
493        let starting_balance = 10_000;
494        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(starting_balance);
495
496        let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
497
498        let (turbine_quic_endpoint_sender, _turbine_quic_endpoint_receiver) =
499            tokio::sync::mpsc::channel(/*capacity:*/ 128);
500        let (_turbine_quic_endpoint_sender, turbine_quic_endpoint_receiver) = unbounded();
501        let (_, repair_response_quic_receiver) = unbounded();
502        let repair_quic_async_senders = RepairQuicAsyncSenders::new_dummy();
503        let (_, ancestor_hashes_response_quic_receiver) = unbounded();
504        //start cluster_info1
505        let cluster_info1 = ClusterInfo::new(
506            target1.info.clone(),
507            target1_keypair.into(),
508            SocketAddrSpace::Unspecified,
509        );
510        cluster_info1.insert_info(leader.info);
511        let cref1 = Arc::new(cluster_info1);
512
513        let (blockstore_path, _) = create_new_tmp_ledger!(&genesis_config);
514        let BlockstoreSignals {
515            blockstore,
516            ledger_signal_receiver,
517            ..
518        } = Blockstore::open_with_signal(&blockstore_path, BlockstoreOptions::default())
519            .expect("Expected to successfully open ledger");
520        let blockstore = Arc::new(blockstore);
521        let bank = bank_forks.read().unwrap().working_bank();
522        let (
523            exit,
524            poh_recorder,
525            poh_controller,
526            _transaction_recorder,
527            poh_service,
528            _entry_receiver,
529        ) = create_test_recorder(bank.clone(), blockstore.clone(), None, None);
530        let vote_keypair = Keypair::new();
531        let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
532        let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
533        let (retransmit_slots_sender, _retransmit_slots_receiver) = unbounded();
534        let (_gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
535        let (_verified_vote_sender, verified_vote_receiver) = unbounded();
536        let (replay_vote_sender, _replay_vote_receiver) = unbounded();
537        let (_, gossip_confirmed_slots_receiver) = unbounded();
538        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
539        let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
540        let outstanding_repair_requests = Arc::<RwLock<OutstandingShredRepairs>>::default();
541        let cluster_slots = Arc::new(ClusterSlots::default_for_tests());
542        let wen_restart_repair_slots = if enable_wen_restart {
543            Some(Arc::new(RwLock::new(vec![])))
544        } else {
545            None
546        };
547        let connection_cache = if DEFAULT_VOTE_USE_QUIC {
548            ConnectionCache::new_quic_for_tests(
549                "connection_cache_vote_quic",
550                DEFAULT_TPU_CONNECTION_POOL_SIZE,
551            )
552        } else {
553            ConnectionCache::with_udp(
554                "connection_cache_vote_udp",
555                DEFAULT_TPU_CONNECTION_POOL_SIZE,
556            )
557        };
558
559        let tvu = Tvu::new(
560            &vote_keypair.pubkey(),
561            Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])),
562            &bank_forks,
563            &cref1,
564            {
565                TvuSockets {
566                    repair: target1.sockets.repair,
567                    retransmit: target1.sockets.retransmit_sockets,
568                    fetch: target1.sockets.tvu,
569                    ancestor_hashes_requests: target1.sockets.ancestor_hashes_requests,
570                    alpenglow: target1.sockets.alpenglow,
571                }
572            },
573            blockstore,
574            ledger_signal_receiver,
575            Some(Arc::new(RpcSubscriptions::new_for_tests(
576                exit.clone(),
577                max_complete_transaction_status_slot,
578                bank_forks.clone(),
579                block_commitment_cache.clone(),
580                OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
581            ))),
582            &poh_recorder,
583            poh_controller,
584            Tower::default(),
585            Arc::new(FileTowerStorage::default()),
586            &leader_schedule_cache,
587            exit.clone(),
588            block_commitment_cache,
589            Arc::<AtomicBool>::default(),
590            None,
591            None,
592            Arc::<VoteTracker>::default(),
593            retransmit_slots_sender,
594            gossip_verified_vote_hash_receiver,
595            verified_vote_receiver,
596            replay_vote_sender,
597            /*completed_data_sets_sender:*/ None,
598            None,
599            gossip_confirmed_slots_receiver,
600            TvuConfig::default(),
601            &Arc::new(MaxSlots::default()),
602            None,
603            None,
604            None, // snapshot_controller
605            None,
606            Some(&Arc::new(ConnectionCache::new("connection_cache_test"))),
607            &ignored_prioritization_fee_cache,
608            BankingTracer::new_disabled(),
609            turbine_quic_endpoint_sender,
610            turbine_quic_endpoint_receiver,
611            repair_response_quic_receiver,
612            repair_quic_async_senders.repair_request_quic_sender,
613            repair_quic_async_senders.ancestor_hashes_request_quic_sender,
614            ancestor_hashes_response_quic_receiver,
615            outstanding_repair_requests,
616            cluster_slots,
617            wen_restart_repair_slots,
618            None,
619            Arc::new(connection_cache),
620        )
621        .expect("assume success");
622        if enable_wen_restart {
623            assert!(tvu.replay_stage.is_none())
624        } else {
625            assert!(tvu.replay_stage.is_some())
626        }
627        exit.store(true, Ordering::Relaxed);
628        tvu.join().unwrap();
629        poh_service.join().unwrap();
630    }
631
632    #[test]
633    #[serial]
634    fn test_tvu_exit_no_wen_restart() {
635        test_tvu_exit(false);
636    }
637
638    #[test]
639    #[serial]
640    fn test_tvu_exit_with_wen_restart() {
641        test_tvu_exit(true);
642    }
643}