solana_core/
tvu.rs

1//! The `tvu` module implements the Transaction Validation Unit, a multi-stage transaction
2//! validation pipeline in software.
3
4use {
5    crate::{
6        banking_trace::BankingTracer,
7        cluster_info_vote_listener::{
8            DuplicateConfirmedSlotsReceiver, GossipVerifiedVoteHashReceiver, VerifiedVoteReceiver,
9            VoteTracker,
10        },
11        cluster_slots_service::{cluster_slots::ClusterSlots, ClusterSlotsService},
12        completed_data_sets_service::CompletedDataSetsSender,
13        consensus::{tower_storage::TowerStorage, Tower},
14        cost_update_service::CostUpdateService,
15        drop_bank_service::DropBankService,
16        repair::repair_service::{OutstandingShredRepairs, RepairInfo, RepairServiceChannels},
17        replay_stage::{ReplayReceivers, ReplaySenders, ReplayStage, ReplayStageConfig},
18        shred_fetch_stage::{ShredFetchStage, SHRED_FETCH_CHANNEL_SIZE},
19        voting_service::VotingService,
20        warm_quic_cache_service::WarmQuicCacheService,
21        window_service::{WindowService, WindowServiceChannels},
22    },
23    bytes::Bytes,
24    crossbeam_channel::{unbounded, Receiver, Sender},
25    solana_client::connection_cache::ConnectionCache,
26    solana_clock::Slot,
27    solana_geyser_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierArc,
28    solana_gossip::{
29        cluster_info::ClusterInfo, duplicate_shred_handler::DuplicateShredHandler,
30        duplicate_shred_listener::DuplicateShredListener,
31    },
32    solana_keypair::Keypair,
33    solana_ledger::{
34        blockstore::Blockstore, blockstore_cleanup_service::BlockstoreCleanupService,
35        blockstore_processor::TransactionStatusSender, entry_notifier_service::EntryNotifierSender,
36        leader_schedule_cache::LeaderScheduleCache,
37    },
38    solana_poh::poh_recorder::PohRecorder,
39    solana_pubkey::Pubkey,
40    solana_rpc::{
41        max_slots::MaxSlots, optimistically_confirmed_bank_tracker::BankNotificationSenderConfig,
42        rpc_subscriptions::RpcSubscriptions, slot_status_notifier::SlotStatusNotifier,
43    },
44    solana_runtime::{
45        bank_forks::BankForks, commitment::BlockCommitmentCache,
46        prioritization_fee_cache::PrioritizationFeeCache, snapshot_controller::SnapshotController,
47        vote_sender_types::ReplayVoteSender,
48    },
49    solana_streamer::evicting_sender::EvictingSender,
50    solana_turbine::{retransmit_stage::RetransmitStage, xdp::XdpSender},
51    std::{
52        collections::HashSet,
53        net::{SocketAddr, UdpSocket},
54        num::NonZeroUsize,
55        sync::{atomic::AtomicBool, Arc, RwLock},
56        thread::{self, JoinHandle},
57    },
58    tokio::sync::mpsc::Sender as AsyncSender,
59};
60
61/// Sets the upper bound on the number of batches stored in the retransmit
62/// stage ingress channel.
63/// Allows for a max of 16k batches of up to 64 packets each
64/// (PACKETS_PER_BATCH).
65/// This translates to about 1 GB of RAM for packet storage in the worst case.
66/// In reality this means about 200K shreds since most batches are not full.
67const CHANNEL_SIZE_RETRANSMIT_INGRESS: usize = 16 * 1024;
68
69pub struct Tvu {
70    fetch_stage: ShredFetchStage,
71    shred_sigverify: JoinHandle<()>,
72    retransmit_stage: RetransmitStage,
73    window_service: WindowService,
74    cluster_slots_service: ClusterSlotsService,
75    replay_stage: Option<ReplayStage>,
76    blockstore_cleanup_service: Option<BlockstoreCleanupService>,
77    cost_update_service: CostUpdateService,
78    voting_service: VotingService,
79    warm_quic_cache_service: Option<WarmQuicCacheService>,
80    drop_bank_service: DropBankService,
81    duplicate_shred_listener: DuplicateShredListener,
82}
83
84pub struct TvuSockets {
85    pub fetch: Vec<UdpSocket>,
86    pub repair: UdpSocket,
87    pub retransmit: Vec<UdpSocket>,
88    pub ancestor_hashes_requests: UdpSocket,
89    pub alpenglow: Option<UdpSocket>,
90}
91
92pub struct TvuConfig {
93    pub max_ledger_shreds: Option<u64>,
94    pub shred_version: u16,
95    // Validators from which repairs are requested
96    pub repair_validators: Option<HashSet<Pubkey>>,
97    // Validators which should be given priority when serving repairs
98    pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
99    pub wait_for_vote_to_start_leader: bool,
100    pub replay_forks_threads: NonZeroUsize,
101    pub replay_transactions_threads: NonZeroUsize,
102    pub shred_sigverify_threads: NonZeroUsize,
103    pub xdp_sender: Option<XdpSender>,
104}
105
106impl Default for TvuConfig {
107    fn default() -> Self {
108        Self {
109            max_ledger_shreds: None,
110            shred_version: 0,
111            repair_validators: None,
112            repair_whitelist: Arc::new(RwLock::new(HashSet::default())),
113            wait_for_vote_to_start_leader: false,
114            replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
115            replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
116            shred_sigverify_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
117            xdp_sender: None,
118        }
119    }
120}
121
122impl Tvu {
123    /// This service receives messages from a leader in the network and processes the transactions
124    /// on the bank state.
125    /// # Arguments
126    /// * `cluster_info` - The cluster_info state.
127    /// * `sockets` - fetch, repair, and retransmit sockets
128    /// * `blockstore` - the ledger itself
129    #[allow(clippy::too_many_arguments)]
130    pub fn new(
131        vote_account: &Pubkey,
132        authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
133        bank_forks: &Arc<RwLock<BankForks>>,
134        cluster_info: &Arc<ClusterInfo>,
135        sockets: TvuSockets,
136        blockstore: Arc<Blockstore>,
137        ledger_signal_receiver: Receiver<bool>,
138        rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
139        poh_recorder: &Arc<RwLock<PohRecorder>>,
140        tower: Tower,
141        tower_storage: Arc<dyn TowerStorage>,
142        leader_schedule_cache: &Arc<LeaderScheduleCache>,
143        exit: Arc<AtomicBool>,
144        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
145        turbine_disabled: Arc<AtomicBool>,
146        transaction_status_sender: Option<TransactionStatusSender>,
147        entry_notification_sender: Option<EntryNotifierSender>,
148        vote_tracker: Arc<VoteTracker>,
149        retransmit_slots_sender: Sender<Slot>,
150        gossip_verified_vote_hash_receiver: GossipVerifiedVoteHashReceiver,
151        verified_vote_receiver: VerifiedVoteReceiver,
152        replay_vote_sender: ReplayVoteSender,
153        completed_data_sets_sender: Option<CompletedDataSetsSender>,
154        bank_notification_sender: Option<BankNotificationSenderConfig>,
155        duplicate_confirmed_slots_receiver: DuplicateConfirmedSlotsReceiver,
156        tvu_config: TvuConfig,
157        max_slots: &Arc<MaxSlots>,
158        block_metadata_notifier: Option<BlockMetadataNotifierArc>,
159        wait_to_vote_slot: Option<Slot>,
160        snapshot_controller: Option<Arc<SnapshotController>>,
161        log_messages_bytes_limit: Option<usize>,
162        connection_cache: Option<&Arc<ConnectionCache>>,
163        prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
164        banking_tracer: Arc<BankingTracer>,
165        turbine_quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>,
166        turbine_quic_endpoint_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
167        repair_response_quic_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
168        repair_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>,
169        ancestor_hashes_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>,
170        ancestor_hashes_response_quic_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
171        outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
172        cluster_slots: Arc<ClusterSlots>,
173        wen_restart_repair_slots: Option<Arc<RwLock<Vec<Slot>>>>,
174        slot_status_notifier: Option<SlotStatusNotifier>,
175        vote_connection_cache: Arc<ConnectionCache>,
176    ) -> Result<Self, String> {
177        let in_wen_restart = wen_restart_repair_slots.is_some();
178
179        let TvuSockets {
180            repair: repair_socket,
181            fetch: fetch_sockets,
182            retransmit: retransmit_sockets,
183            ancestor_hashes_requests: ancestor_hashes_socket,
184            alpenglow: alpenglow_socket,
185        } = sockets;
186
187        let (fetch_sender, fetch_receiver) = EvictingSender::new_bounded(SHRED_FETCH_CHANNEL_SIZE);
188
189        let repair_socket = Arc::new(repair_socket);
190        let ancestor_hashes_socket = Arc::new(ancestor_hashes_socket);
191        let fetch_sockets: Vec<Arc<UdpSocket>> = fetch_sockets.into_iter().map(Arc::new).collect();
192        let fetch_stage = ShredFetchStage::new(
193            fetch_sockets,
194            turbine_quic_endpoint_receiver,
195            repair_response_quic_receiver,
196            repair_socket.clone(),
197            fetch_sender,
198            tvu_config.shred_version,
199            bank_forks.clone(),
200            cluster_info.clone(),
201            outstanding_repair_requests.clone(),
202            turbine_disabled,
203            exit.clone(),
204        );
205
206        let (verified_sender, verified_receiver) = unbounded();
207
208        let (retransmit_sender, retransmit_receiver) =
209            EvictingSender::new_bounded(CHANNEL_SIZE_RETRANSMIT_INGRESS);
210
211        let shred_sigverify = solana_turbine::sigverify_shreds::spawn_shred_sigverify(
212            cluster_info.clone(),
213            bank_forks.clone(),
214            leader_schedule_cache.clone(),
215            fetch_receiver,
216            retransmit_sender.clone(),
217            verified_sender,
218            tvu_config.shred_sigverify_threads,
219        );
220
221        let retransmit_stage = RetransmitStage::new(
222            bank_forks.clone(),
223            leader_schedule_cache.clone(),
224            cluster_info.clone(),
225            Arc::new(retransmit_sockets),
226            turbine_quic_endpoint_sender,
227            retransmit_receiver,
228            max_slots.clone(),
229            rpc_subscriptions.clone(),
230            slot_status_notifier.clone(),
231            tvu_config.xdp_sender,
232        );
233
234        let (ancestor_duplicate_slots_sender, ancestor_duplicate_slots_receiver) = unbounded();
235        let (duplicate_slots_sender, duplicate_slots_receiver) = unbounded();
236        let (ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_receiver) =
237            unbounded();
238        let (dumped_slots_sender, dumped_slots_receiver) = unbounded();
239        let (popular_pruned_forks_sender, popular_pruned_forks_receiver) = unbounded();
240        let window_service = {
241            let epoch_schedule = bank_forks
242                .read()
243                .unwrap()
244                .working_bank()
245                .epoch_schedule()
246                .clone();
247            let repair_info = RepairInfo {
248                bank_forks: bank_forks.clone(),
249                epoch_schedule,
250                ancestor_duplicate_slots_sender,
251                repair_validators: tvu_config.repair_validators,
252                repair_whitelist: tvu_config.repair_whitelist,
253                cluster_info: cluster_info.clone(),
254                cluster_slots: cluster_slots.clone(),
255                wen_restart_repair_slots,
256            };
257            let repair_service_channels = RepairServiceChannels::new(
258                repair_request_quic_sender,
259                verified_vote_receiver,
260                dumped_slots_receiver,
261                popular_pruned_forks_sender,
262                ancestor_hashes_request_quic_sender,
263                ancestor_hashes_response_quic_receiver,
264                ancestor_hashes_replay_update_receiver,
265            );
266            let window_service_channels = WindowServiceChannels::new(
267                verified_receiver,
268                retransmit_sender,
269                completed_data_sets_sender,
270                duplicate_slots_sender.clone(),
271                repair_service_channels,
272            );
273            WindowService::new(
274                blockstore.clone(),
275                repair_socket,
276                ancestor_hashes_socket,
277                exit.clone(),
278                repair_info,
279                window_service_channels,
280                leader_schedule_cache.clone(),
281                outstanding_repair_requests,
282            )
283        };
284
285        let (cluster_slots_update_sender, cluster_slots_update_receiver) = unbounded();
286        let cluster_slots_service = ClusterSlotsService::new(
287            blockstore.clone(),
288            cluster_slots.clone(),
289            bank_forks.clone(),
290            cluster_info.clone(),
291            cluster_slots_update_receiver,
292            exit.clone(),
293        );
294
295        let (cost_update_sender, cost_update_receiver) = unbounded();
296        let (drop_bank_sender, drop_bank_receiver) = unbounded();
297        let (voting_sender, voting_receiver) = unbounded();
298
299        let replay_senders = ReplaySenders {
300            rpc_subscriptions,
301            slot_status_notifier,
302            transaction_status_sender,
303            entry_notification_sender,
304            bank_notification_sender,
305            ancestor_hashes_replay_update_sender,
306            retransmit_slots_sender,
307            replay_vote_sender,
308            cluster_slots_update_sender,
309            cost_update_sender,
310            voting_sender,
311            drop_bank_sender,
312            block_metadata_notifier,
313            dumped_slots_sender,
314        };
315
316        let replay_receivers = ReplayReceivers {
317            ledger_signal_receiver,
318            duplicate_slots_receiver,
319            ancestor_duplicate_slots_receiver,
320            duplicate_confirmed_slots_receiver,
321            gossip_verified_vote_hash_receiver,
322            popular_pruned_forks_receiver,
323        };
324
325        let replay_stage_config = ReplayStageConfig {
326            vote_account: *vote_account,
327            authorized_voter_keypairs,
328            exit: exit.clone(),
329            leader_schedule_cache: leader_schedule_cache.clone(),
330            block_commitment_cache,
331            wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader,
332            tower_storage: tower_storage.clone(),
333            wait_to_vote_slot,
334            replay_forks_threads: tvu_config.replay_forks_threads,
335            replay_transactions_threads: tvu_config.replay_transactions_threads,
336            blockstore: blockstore.clone(),
337            bank_forks: bank_forks.clone(),
338            cluster_info: cluster_info.clone(),
339            poh_recorder: poh_recorder.clone(),
340            tower,
341            vote_tracker,
342            cluster_slots,
343            log_messages_bytes_limit,
344            prioritization_fee_cache: prioritization_fee_cache.clone(),
345            banking_tracer,
346            snapshot_controller,
347        };
348
349        let voting_service = VotingService::new(
350            voting_receiver,
351            cluster_info.clone(),
352            poh_recorder.clone(),
353            tower_storage,
354            vote_connection_cache.clone(),
355            alpenglow_socket,
356            bank_forks.clone(),
357        );
358
359        let warm_quic_cache_service = create_cache_warmer_if_needed(
360            connection_cache,
361            vote_connection_cache,
362            cluster_info,
363            poh_recorder,
364            &exit,
365        );
366
367        let cost_update_service = CostUpdateService::new(cost_update_receiver);
368
369        let drop_bank_service = DropBankService::new(drop_bank_receiver);
370
371        let replay_stage = if in_wen_restart {
372            None
373        } else {
374            Some(ReplayStage::new(
375                replay_stage_config,
376                replay_senders,
377                replay_receivers,
378            )?)
379        };
380
381        let blockstore_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
382            BlockstoreCleanupService::new(blockstore.clone(), max_ledger_shreds, exit.clone())
383        });
384
385        let duplicate_shred_listener = DuplicateShredListener::new(
386            exit,
387            cluster_info.clone(),
388            DuplicateShredHandler::new(
389                blockstore,
390                leader_schedule_cache.clone(),
391                bank_forks.clone(),
392                duplicate_slots_sender,
393                tvu_config.shred_version,
394            ),
395        );
396
397        Ok(Tvu {
398            fetch_stage,
399            shred_sigverify,
400            retransmit_stage,
401            window_service,
402            cluster_slots_service,
403            replay_stage,
404            blockstore_cleanup_service,
405            cost_update_service,
406            voting_service,
407            warm_quic_cache_service,
408            drop_bank_service,
409            duplicate_shred_listener,
410        })
411    }
412
413    pub fn join(self) -> thread::Result<()> {
414        self.retransmit_stage.join()?;
415        self.window_service.join()?;
416        self.cluster_slots_service.join()?;
417        self.fetch_stage.join()?;
418        self.shred_sigverify.join()?;
419        if self.blockstore_cleanup_service.is_some() {
420            self.blockstore_cleanup_service.unwrap().join()?;
421        }
422        if self.replay_stage.is_some() {
423            self.replay_stage.unwrap().join()?;
424        }
425        self.cost_update_service.join()?;
426        self.voting_service.join()?;
427        if let Some(warmup_service) = self.warm_quic_cache_service {
428            warmup_service.join()?;
429        }
430        self.drop_bank_service.join()?;
431        self.duplicate_shred_listener.join()?;
432        Ok(())
433    }
434}
435
436fn create_cache_warmer_if_needed(
437    connection_cache: Option<&Arc<ConnectionCache>>,
438    vote_connection_cache: Arc<ConnectionCache>,
439    cluster_info: &Arc<ClusterInfo>,
440    poh_recorder: &Arc<RwLock<PohRecorder>>,
441    exit: &Arc<AtomicBool>,
442) -> Option<WarmQuicCacheService> {
443    let tpu_connection_cache = connection_cache.filter(|cache| cache.use_quic()).cloned();
444    let vote_connection_cache = Some(vote_connection_cache).filter(|cache| cache.use_quic());
445
446    (tpu_connection_cache.is_some() || vote_connection_cache.is_some()).then(|| {
447        WarmQuicCacheService::new(
448            tpu_connection_cache,
449            vote_connection_cache,
450            cluster_info.clone(),
451            poh_recorder.clone(),
452            exit.clone(),
453        )
454    })
455}
456
457#[cfg(test)]
458pub mod tests {
459    use {
460        super::*,
461        crate::{
462            consensus::tower_storage::FileTowerStorage,
463            repair::quic_endpoint::RepairQuicAsyncSenders,
464        },
465        serial_test::serial,
466        solana_gossip::{cluster_info::ClusterInfo, node::Node},
467        solana_keypair::Keypair,
468        solana_ledger::{
469            blockstore::BlockstoreSignals,
470            blockstore_options::BlockstoreOptions,
471            create_new_tmp_ledger,
472            genesis_utils::{create_genesis_config, GenesisConfigInfo},
473        },
474        solana_poh::poh_recorder::create_test_recorder,
475        solana_rpc::optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
476        solana_runtime::bank::Bank,
477        solana_signer::Signer,
478        solana_streamer::socket::SocketAddrSpace,
479        solana_tpu_client::tpu_client::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_VOTE_USE_QUIC},
480        std::sync::atomic::{AtomicU64, Ordering},
481    };
482
483    fn test_tvu_exit(enable_wen_restart: bool) {
484        solana_logger::setup();
485        let leader = Node::new_localhost();
486        let target1_keypair = Keypair::new();
487        let target1 = Node::new_localhost_with_pubkey(&target1_keypair.pubkey());
488
489        let starting_balance = 10_000;
490        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(starting_balance);
491
492        let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
493
494        let (turbine_quic_endpoint_sender, _turbine_quic_endpoint_receiver) =
495            tokio::sync::mpsc::channel(/*capacity:*/ 128);
496        let (_turbine_quic_endpoint_sender, turbine_quic_endpoint_receiver) = unbounded();
497        let (_, repair_response_quic_receiver) = unbounded();
498        let repair_quic_async_senders = RepairQuicAsyncSenders::new_dummy();
499        let (_, ancestor_hashes_response_quic_receiver) = unbounded();
500        //start cluster_info1
501        let cluster_info1 = ClusterInfo::new(
502            target1.info.clone(),
503            target1_keypair.into(),
504            SocketAddrSpace::Unspecified,
505        );
506        cluster_info1.insert_info(leader.info);
507        let cref1 = Arc::new(cluster_info1);
508
509        let (blockstore_path, _) = create_new_tmp_ledger!(&genesis_config);
510        let BlockstoreSignals {
511            blockstore,
512            ledger_signal_receiver,
513            ..
514        } = Blockstore::open_with_signal(&blockstore_path, BlockstoreOptions::default())
515            .expect("Expected to successfully open ledger");
516        let blockstore = Arc::new(blockstore);
517        let bank = bank_forks.read().unwrap().working_bank();
518        let (exit, poh_recorder, _transaction_recorder, poh_service, _entry_receiver) =
519            create_test_recorder(bank.clone(), blockstore.clone(), None, None);
520        let vote_keypair = Keypair::new();
521        let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
522        let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
523        let (retransmit_slots_sender, _retransmit_slots_receiver) = unbounded();
524        let (_gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
525        let (_verified_vote_sender, verified_vote_receiver) = unbounded();
526        let (replay_vote_sender, _replay_vote_receiver) = unbounded();
527        let (_, gossip_confirmed_slots_receiver) = unbounded();
528        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
529        let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
530        let outstanding_repair_requests = Arc::<RwLock<OutstandingShredRepairs>>::default();
531        let cluster_slots = Arc::new(ClusterSlots::default());
532        let wen_restart_repair_slots = if enable_wen_restart {
533            Some(Arc::new(RwLock::new(vec![])))
534        } else {
535            None
536        };
537        let connection_cache = if DEFAULT_VOTE_USE_QUIC {
538            ConnectionCache::new_quic(
539                "connection_cache_vote_quic",
540                DEFAULT_TPU_CONNECTION_POOL_SIZE,
541            )
542        } else {
543            ConnectionCache::with_udp(
544                "connection_cache_vote_udp",
545                DEFAULT_TPU_CONNECTION_POOL_SIZE,
546            )
547        };
548
549        let tvu = Tvu::new(
550            &vote_keypair.pubkey(),
551            Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])),
552            &bank_forks,
553            &cref1,
554            {
555                TvuSockets {
556                    repair: target1.sockets.repair,
557                    retransmit: target1.sockets.retransmit_sockets,
558                    fetch: target1.sockets.tvu,
559                    ancestor_hashes_requests: target1.sockets.ancestor_hashes_requests,
560                    alpenglow: target1.sockets.alpenglow,
561                }
562            },
563            blockstore,
564            ledger_signal_receiver,
565            Some(Arc::new(RpcSubscriptions::new_for_tests(
566                exit.clone(),
567                max_complete_transaction_status_slot,
568                bank_forks.clone(),
569                block_commitment_cache.clone(),
570                OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
571            ))),
572            &poh_recorder,
573            Tower::default(),
574            Arc::new(FileTowerStorage::default()),
575            &leader_schedule_cache,
576            exit.clone(),
577            block_commitment_cache,
578            Arc::<AtomicBool>::default(),
579            None,
580            None,
581            Arc::<VoteTracker>::default(),
582            retransmit_slots_sender,
583            gossip_verified_vote_hash_receiver,
584            verified_vote_receiver,
585            replay_vote_sender,
586            /*completed_data_sets_sender:*/ None,
587            None,
588            gossip_confirmed_slots_receiver,
589            TvuConfig::default(),
590            &Arc::new(MaxSlots::default()),
591            None,
592            None,
593            None, // snapshot_controller
594            None,
595            Some(&Arc::new(ConnectionCache::new("connection_cache_test"))),
596            &ignored_prioritization_fee_cache,
597            BankingTracer::new_disabled(),
598            turbine_quic_endpoint_sender,
599            turbine_quic_endpoint_receiver,
600            repair_response_quic_receiver,
601            repair_quic_async_senders.repair_request_quic_sender,
602            repair_quic_async_senders.ancestor_hashes_request_quic_sender,
603            ancestor_hashes_response_quic_receiver,
604            outstanding_repair_requests,
605            cluster_slots,
606            wen_restart_repair_slots,
607            None,
608            Arc::new(connection_cache),
609        )
610        .expect("assume success");
611        if enable_wen_restart {
612            assert!(tvu.replay_stage.is_none())
613        } else {
614            assert!(tvu.replay_stage.is_some())
615        }
616        exit.store(true, Ordering::Relaxed);
617        tvu.join().unwrap();
618        poh_service.join().unwrap();
619    }
620
621    #[test]
622    #[serial]
623    fn test_tvu_exit_no_wen_restart() {
624        test_tvu_exit(false);
625    }
626
627    #[test]
628    #[serial]
629    fn test_tvu_exit_with_wen_restart() {
630        test_tvu_exit(true);
631    }
632}