solana_core/
tvu.rs

1//! The `tvu` module implements the Transaction Validation Unit, a multi-stage transaction
2//! validation pipeline in software.
3
4use {
5    crate::{
6        banking_trace::BankingTracer,
7        cluster_info_vote_listener::{
8            DuplicateConfirmedSlotsReceiver, GossipVerifiedVoteHashReceiver, VerifiedVoteReceiver,
9            VoteTracker,
10        },
11        cluster_slots_service::{cluster_slots::ClusterSlots, ClusterSlotsService},
12        completed_data_sets_service::CompletedDataSetsSender,
13        consensus::{tower_storage::TowerStorage, Tower},
14        cost_update_service::CostUpdateService,
15        drop_bank_service::DropBankService,
16        repair::repair_service::{OutstandingShredRepairs, RepairInfo, RepairServiceChannels},
17        replay_stage::{ReplayReceivers, ReplaySenders, ReplayStage, ReplayStageConfig},
18        shred_fetch_stage::ShredFetchStage,
19        voting_service::VotingService,
20        warm_quic_cache_service::WarmQuicCacheService,
21        window_service::{WindowService, WindowServiceChannels},
22    },
23    bytes::Bytes,
24    crossbeam_channel::{unbounded, Receiver, Sender},
25    solana_client::connection_cache::ConnectionCache,
26    solana_clock::Slot,
27    solana_geyser_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierArc,
28    solana_gossip::{
29        cluster_info::ClusterInfo, duplicate_shred_handler::DuplicateShredHandler,
30        duplicate_shred_listener::DuplicateShredListener,
31    },
32    solana_keypair::Keypair,
33    solana_ledger::{
34        blockstore::Blockstore, blockstore_cleanup_service::BlockstoreCleanupService,
35        blockstore_processor::TransactionStatusSender, entry_notifier_service::EntryNotifierSender,
36        leader_schedule_cache::LeaderScheduleCache,
37    },
38    solana_poh::poh_recorder::PohRecorder,
39    solana_pubkey::Pubkey,
40    solana_rpc::{
41        max_slots::MaxSlots, optimistically_confirmed_bank_tracker::BankNotificationSenderConfig,
42        rpc_subscriptions::RpcSubscriptions, slot_status_notifier::SlotStatusNotifier,
43    },
44    solana_runtime::{
45        bank_forks::BankForks, commitment::BlockCommitmentCache,
46        prioritization_fee_cache::PrioritizationFeeCache, snapshot_controller::SnapshotController,
47        vote_sender_types::ReplayVoteSender,
48    },
49    solana_streamer::evicting_sender::EvictingSender,
50    solana_turbine::{retransmit_stage::RetransmitStage, xdp::XdpConfig},
51    std::{
52        collections::HashSet,
53        net::{SocketAddr, UdpSocket},
54        num::NonZeroUsize,
55        sync::{atomic::AtomicBool, Arc, RwLock},
56        thread::{self, JoinHandle},
57    },
58    tokio::sync::mpsc::Sender as AsyncSender,
59};
60
/// Sets the upper bound on the number of batches stored in the retransmit
/// stage ingress channel.
/// Allows for a max of 16k batches of up to 64 packets each (NUM_RCVMMSGS).
/// This translates to about 1 GB of RAM for packet storage in the worst case.
/// In reality this means about 200K shreds since most batches are not full.
///
/// Passed to `EvictingSender::new_bounded` in [`Tvu::new`] to size the
/// sigverify -> retransmit channel.
const CHANNEL_SIZE_RETRANSMIT_INGRESS: usize = 16 * 1024;
67
/// Handles to every service/thread spawned by [`Tvu::new`]; all of them are
/// joined by [`Tvu::join`] during shutdown.
pub struct Tvu {
    fetch_stage: ShredFetchStage,
    // Raw thread handle returned by `spawn_shred_sigverify`.
    shred_sigverify: JoinHandle<()>,
    retransmit_stage: RetransmitStage,
    window_service: WindowService,
    cluster_slots_service: ClusterSlotsService,
    // `None` when running in wen-restart mode (see `Tvu::new`).
    replay_stage: Option<ReplayStage>,
    // Only spawned when `TvuConfig::max_ledger_shreds` is `Some`.
    blockstore_cleanup_service: Option<BlockstoreCleanupService>,
    cost_update_service: CostUpdateService,
    voting_service: VotingService,
    // Only spawned when at least one connection cache uses QUIC.
    warm_quic_cache_service: Option<WarmQuicCacheService>,
    drop_bank_service: DropBankService,
    duplicate_shred_listener: DuplicateShredListener,
}
82
/// UDP sockets consumed by [`Tvu::new`].
pub struct TvuSockets {
    /// Sockets the shred fetch stage listens on.
    pub fetch: Vec<UdpSocket>,
    /// Socket used for shred repair traffic (shared by the fetch stage and
    /// the window service).
    pub repair: UdpSocket,
    /// Sockets used by the retransmit stage.
    pub retransmit: Vec<UdpSocket>,
    /// Socket used for ancestor-hashes repair requests.
    pub ancestor_hashes_requests: UdpSocket,
}
89
/// Tunables for the TVU pipeline; see the [`Default`] impl for baseline values.
pub struct TvuConfig {
    /// When `Some`, a `BlockstoreCleanupService` is spawned to bound the
    /// ledger to this many shreds.
    pub max_ledger_shreds: Option<u64>,
    /// Expected shred version; forwarded to the fetch stage and the
    /// duplicate-shred handler.
    pub shred_version: u16,
    // Validators from which repairs are requested
    pub repair_validators: Option<HashSet<Pubkey>>,
    // Validators which should be given priority when serving repairs
    pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
    /// Forwarded to `ReplayStageConfig`.
    pub wait_for_vote_to_start_leader: bool,
    /// Thread count for replay fork processing (forwarded to `ReplayStageConfig`).
    pub replay_forks_threads: NonZeroUsize,
    /// Thread count for replay transaction execution (forwarded to `ReplayStageConfig`).
    pub replay_transactions_threads: NonZeroUsize,
    /// Thread count for the shred sigverify stage.
    pub shred_sigverify_threads: NonZeroUsize,
    /// Optional XDP configuration forwarded to the retransmit stage.
    pub retransmit_xdp: Option<XdpConfig>,
}
103
104impl Default for TvuConfig {
105    fn default() -> Self {
106        Self {
107            max_ledger_shreds: None,
108            shred_version: 0,
109            repair_validators: None,
110            repair_whitelist: Arc::new(RwLock::new(HashSet::default())),
111            wait_for_vote_to_start_leader: false,
112            replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
113            replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
114            shred_sigverify_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
115            retransmit_xdp: None,
116        }
117    }
118}
119
120impl Tvu {
    /// This service receives messages from a leader in the network and processes the transactions
    /// on the bank state.
    ///
    /// Wires together the full TVU pipeline — shred fetch -> sigverify ->
    /// retransmit/window -> replay — plus the auxiliary services (cluster
    /// slots, cost update, voting, drop-bank, optional blockstore cleanup,
    /// optional QUIC cache warmer, duplicate-shred listening). When
    /// `wen_restart_repair_slots` is `Some`, the replay stage is NOT started.
    ///
    /// # Arguments
    /// * `cluster_info` - The cluster_info state.
    /// * `sockets` - fetch, repair, and retransmit sockets
    /// * `blockstore` - the ledger itself
    ///
    /// # Errors
    /// Propagates the error string from `ReplayStage::new` if the replay
    /// stage fails to start.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        vote_account: &Pubkey,
        authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
        bank_forks: &Arc<RwLock<BankForks>>,
        cluster_info: &Arc<ClusterInfo>,
        sockets: TvuSockets,
        blockstore: Arc<Blockstore>,
        ledger_signal_receiver: Receiver<bool>,
        rpc_subscriptions: &Arc<RpcSubscriptions>,
        poh_recorder: &Arc<RwLock<PohRecorder>>,
        tower: Tower,
        tower_storage: Arc<dyn TowerStorage>,
        leader_schedule_cache: &Arc<LeaderScheduleCache>,
        exit: Arc<AtomicBool>,
        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
        turbine_disabled: Arc<AtomicBool>,
        transaction_status_sender: Option<TransactionStatusSender>,
        entry_notification_sender: Option<EntryNotifierSender>,
        vote_tracker: Arc<VoteTracker>,
        retransmit_slots_sender: Sender<Slot>,
        gossip_verified_vote_hash_receiver: GossipVerifiedVoteHashReceiver,
        verified_vote_receiver: VerifiedVoteReceiver,
        replay_vote_sender: ReplayVoteSender,
        completed_data_sets_sender: Option<CompletedDataSetsSender>,
        bank_notification_sender: Option<BankNotificationSenderConfig>,
        duplicate_confirmed_slots_receiver: DuplicateConfirmedSlotsReceiver,
        tvu_config: TvuConfig,
        max_slots: &Arc<MaxSlots>,
        block_metadata_notifier: Option<BlockMetadataNotifierArc>,
        wait_to_vote_slot: Option<Slot>,
        snapshot_controller: Option<Arc<SnapshotController>>,
        log_messages_bytes_limit: Option<usize>,
        connection_cache: Option<&Arc<ConnectionCache>>,
        prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
        banking_tracer: Arc<BankingTracer>,
        turbine_quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>,
        turbine_quic_endpoint_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
        repair_response_quic_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
        repair_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>,
        ancestor_hashes_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>,
        ancestor_hashes_response_quic_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
        outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
        cluster_slots: Arc<ClusterSlots>,
        wen_restart_repair_slots: Option<Arc<RwLock<Vec<Slot>>>>,
        slot_status_notifier: Option<SlotStatusNotifier>,
        vote_connection_cache: Arc<ConnectionCache>,
    ) -> Result<Self, String> {
        // Wen-restart mode is signalled by the presence of repair slots;
        // it suppresses replay stage creation below.
        let in_wen_restart = wen_restart_repair_slots.is_some();

        let TvuSockets {
            repair: repair_socket,
            fetch: fetch_sockets,
            retransmit: retransmit_sockets,
            ancestor_hashes_requests: ancestor_hashes_socket,
        } = sockets;

        // Fetch stage -> sigverify channel.
        let (fetch_sender, fetch_receiver) = unbounded();

        let repair_socket = Arc::new(repair_socket);
        let ancestor_hashes_socket = Arc::new(ancestor_hashes_socket);
        let fetch_sockets: Vec<Arc<UdpSocket>> = fetch_sockets.into_iter().map(Arc::new).collect();
        // Stage 1: ingest shreds from turbine (UDP + QUIC) and repair responses.
        let fetch_stage = ShredFetchStage::new(
            fetch_sockets,
            turbine_quic_endpoint_receiver,
            repair_response_quic_receiver,
            repair_socket.clone(),
            fetch_sender,
            tvu_config.shred_version,
            bank_forks.clone(),
            cluster_info.clone(),
            outstanding_repair_requests.clone(),
            turbine_disabled,
            exit.clone(),
        );

        // Sigverify -> window service channel.
        let (verified_sender, verified_receiver) = unbounded();

        // Bounded, evicting channel into the retransmit stage; see
        // CHANNEL_SIZE_RETRANSMIT_INGRESS for the sizing rationale.
        let (retransmit_sender, retransmit_receiver) =
            EvictingSender::new_bounded(CHANNEL_SIZE_RETRANSMIT_INGRESS);

        // Stage 2: verify shred signatures, forwarding to retransmit + window.
        let shred_sigverify = solana_turbine::sigverify_shreds::spawn_shred_sigverify(
            cluster_info.clone(),
            bank_forks.clone(),
            leader_schedule_cache.clone(),
            fetch_receiver,
            retransmit_sender.clone(),
            verified_sender,
            tvu_config.shred_sigverify_threads,
        );

        // Stage 3: retransmit verified shreds to downstream turbine peers.
        let retransmit_stage = RetransmitStage::new(
            bank_forks.clone(),
            leader_schedule_cache.clone(),
            cluster_info.clone(),
            Arc::new(retransmit_sockets),
            turbine_quic_endpoint_sender,
            retransmit_receiver,
            max_slots.clone(),
            Some(rpc_subscriptions.clone()),
            slot_status_notifier.clone(),
            tvu_config.retransmit_xdp.clone(),
        );

        // Channels connecting the repair/window machinery to replay.
        let (ancestor_duplicate_slots_sender, ancestor_duplicate_slots_receiver) = unbounded();
        let (duplicate_slots_sender, duplicate_slots_receiver) = unbounded();
        let (ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_receiver) =
            unbounded();
        let (dumped_slots_sender, dumped_slots_receiver) = unbounded();
        let (popular_pruned_forks_sender, popular_pruned_forks_receiver) = unbounded();
        // Stage 4: window service inserts verified shreds into the blockstore
        // and drives repair for missing ones.
        let window_service = {
            let epoch_schedule = bank_forks
                .read()
                .unwrap()
                .working_bank()
                .epoch_schedule()
                .clone();
            let repair_info = RepairInfo {
                bank_forks: bank_forks.clone(),
                epoch_schedule,
                ancestor_duplicate_slots_sender,
                repair_validators: tvu_config.repair_validators,
                repair_whitelist: tvu_config.repair_whitelist,
                cluster_info: cluster_info.clone(),
                cluster_slots: cluster_slots.clone(),
                wen_restart_repair_slots,
            };
            let repair_service_channels = RepairServiceChannels::new(
                repair_request_quic_sender,
                verified_vote_receiver,
                dumped_slots_receiver,
                popular_pruned_forks_sender,
                ancestor_hashes_request_quic_sender,
                ancestor_hashes_response_quic_receiver,
                ancestor_hashes_replay_update_receiver,
            );
            let window_service_channels = WindowServiceChannels::new(
                verified_receiver,
                retransmit_sender,
                completed_data_sets_sender,
                duplicate_slots_sender.clone(),
                repair_service_channels,
            );
            WindowService::new(
                blockstore.clone(),
                repair_socket,
                ancestor_hashes_socket,
                exit.clone(),
                repair_info,
                window_service_channels,
                leader_schedule_cache.clone(),
                outstanding_repair_requests,
            )
        };

        // Auxiliary: keep the cluster-slots view up to date.
        let (cluster_slots_update_sender, cluster_slots_update_receiver) = unbounded();
        let cluster_slots_service = ClusterSlotsService::new(
            blockstore.clone(),
            cluster_slots.clone(),
            bank_forks.clone(),
            cluster_info.clone(),
            cluster_slots_update_receiver,
            exit.clone(),
        );

        let (cost_update_sender, cost_update_receiver) = unbounded();
        let (drop_bank_sender, drop_bank_receiver) = unbounded();
        let (voting_sender, voting_receiver) = unbounded();

        // Stage 5: replay. Senders/receivers bundle the channel endpoints the
        // replay stage uses to talk to every other service above.
        let replay_senders = ReplaySenders {
            rpc_subscriptions: rpc_subscriptions.clone(),
            slot_status_notifier,
            transaction_status_sender,
            entry_notification_sender,
            bank_notification_sender,
            ancestor_hashes_replay_update_sender,
            retransmit_slots_sender,
            replay_vote_sender,
            cluster_slots_update_sender,
            cost_update_sender,
            voting_sender,
            drop_bank_sender,
            block_metadata_notifier,
            dumped_slots_sender,
        };

        let replay_receivers = ReplayReceivers {
            ledger_signal_receiver,
            duplicate_slots_receiver,
            ancestor_duplicate_slots_receiver,
            duplicate_confirmed_slots_receiver,
            gossip_verified_vote_hash_receiver,
            popular_pruned_forks_receiver,
        };

        let replay_stage_config = ReplayStageConfig {
            vote_account: *vote_account,
            authorized_voter_keypairs,
            exit: exit.clone(),
            leader_schedule_cache: leader_schedule_cache.clone(),
            block_commitment_cache,
            wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader,
            tower_storage: tower_storage.clone(),
            wait_to_vote_slot,
            replay_forks_threads: tvu_config.replay_forks_threads,
            replay_transactions_threads: tvu_config.replay_transactions_threads,
            blockstore: blockstore.clone(),
            bank_forks: bank_forks.clone(),
            cluster_info: cluster_info.clone(),
            poh_recorder: poh_recorder.clone(),
            tower,
            vote_tracker,
            cluster_slots,
            log_messages_bytes_limit,
            prioritization_fee_cache: prioritization_fee_cache.clone(),
            banking_tracer,
            snapshot_controller,
        };

        // Sends the votes produced by replay out to the cluster.
        let voting_service = VotingService::new(
            voting_receiver,
            cluster_info.clone(),
            poh_recorder.clone(),
            tower_storage,
            vote_connection_cache.clone(),
        );

        // Only created when at least one connection cache uses QUIC.
        let warm_quic_cache_service = create_cache_warmer_if_needed(
            connection_cache,
            vote_connection_cache,
            cluster_info,
            poh_recorder,
            &exit,
        );

        let cost_update_service = CostUpdateService::new(blockstore.clone(), cost_update_receiver);

        let drop_bank_service = DropBankService::new(drop_bank_receiver);

        // Replay is suppressed in wen-restart mode; the `?` propagates
        // replay-stage startup failures to the caller.
        let replay_stage = if in_wen_restart {
            None
        } else {
            Some(ReplayStage::new(
                replay_stage_config,
                replay_senders,
                replay_receivers,
            )?)
        };

        // Optional ledger-size bounding, controlled by `max_ledger_shreds`.
        let blockstore_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
            BlockstoreCleanupService::new(blockstore.clone(), max_ledger_shreds, exit.clone())
        });

        // Listens on gossip for duplicate-shred proofs and feeds confirmed
        // duplicates into the same channel the window service uses.
        let duplicate_shred_listener = DuplicateShredListener::new(
            exit,
            cluster_info.clone(),
            DuplicateShredHandler::new(
                blockstore,
                leader_schedule_cache.clone(),
                bank_forks.clone(),
                duplicate_slots_sender,
                tvu_config.shred_version,
            ),
        );

        Ok(Tvu {
            fetch_stage,
            shred_sigverify,
            retransmit_stage,
            window_service,
            cluster_slots_service,
            replay_stage,
            blockstore_cleanup_service,
            cost_update_service,
            voting_service,
            warm_quic_cache_service,
            drop_bank_service,
            duplicate_shred_listener,
        })
    }
407
408    pub fn join(self) -> thread::Result<()> {
409        self.retransmit_stage.join()?;
410        self.window_service.join()?;
411        self.cluster_slots_service.join()?;
412        self.fetch_stage.join()?;
413        self.shred_sigverify.join()?;
414        if self.blockstore_cleanup_service.is_some() {
415            self.blockstore_cleanup_service.unwrap().join()?;
416        }
417        if self.replay_stage.is_some() {
418            self.replay_stage.unwrap().join()?;
419        }
420        self.cost_update_service.join()?;
421        self.voting_service.join()?;
422        if let Some(warmup_service) = self.warm_quic_cache_service {
423            warmup_service.join()?;
424        }
425        self.drop_bank_service.join()?;
426        self.duplicate_shred_listener.join()?;
427        Ok(())
428    }
429}
430
431fn create_cache_warmer_if_needed(
432    connection_cache: Option<&Arc<ConnectionCache>>,
433    vote_connection_cache: Arc<ConnectionCache>,
434    cluster_info: &Arc<ClusterInfo>,
435    poh_recorder: &Arc<RwLock<PohRecorder>>,
436    exit: &Arc<AtomicBool>,
437) -> Option<WarmQuicCacheService> {
438    let tpu_connection_cache = connection_cache.filter(|cache| cache.use_quic()).cloned();
439    let vote_connection_cache = Some(vote_connection_cache).filter(|cache| cache.use_quic());
440
441    (tpu_connection_cache.is_some() || vote_connection_cache.is_some()).then(|| {
442        WarmQuicCacheService::new(
443            tpu_connection_cache,
444            vote_connection_cache,
445            cluster_info.clone(),
446            poh_recorder.clone(),
447            exit.clone(),
448        )
449    })
450}
451
452#[cfg(test)]
453pub mod tests {
454    use {
455        super::*,
456        crate::{
457            consensus::tower_storage::FileTowerStorage,
458            repair::quic_endpoint::RepairQuicAsyncSenders,
459        },
460        serial_test::serial,
461        solana_gossip::cluster_info::{ClusterInfo, Node},
462        solana_keypair::Keypair,
463        solana_ledger::{
464            blockstore::BlockstoreSignals,
465            blockstore_options::BlockstoreOptions,
466            create_new_tmp_ledger,
467            genesis_utils::{create_genesis_config, GenesisConfigInfo},
468        },
469        solana_poh::poh_recorder::create_test_recorder,
470        solana_rpc::optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
471        solana_runtime::bank::Bank,
472        solana_signer::Signer,
473        solana_streamer::socket::SocketAddrSpace,
474        solana_tpu_client::tpu_client::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_VOTE_USE_QUIC},
475        std::sync::atomic::{AtomicU64, Ordering},
476    };
477
    /// Spins up a full `Tvu` against a fresh temporary ledger, then
    /// immediately signals exit and joins every service. With
    /// `enable_wen_restart` the replay stage must be absent; otherwise it
    /// must be present.
    fn test_tvu_exit(enable_wen_restart: bool) {
        solana_logger::setup();
        let leader = Node::new_localhost();
        let target1_keypair = Keypair::new();
        let target1 = Node::new_localhost_with_pubkey(&target1_keypair.pubkey());

        let starting_balance = 10_000;
        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(starting_balance);

        let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));

        // Dummy QUIC endpoints: only the half that the Tvu consumes is kept;
        // the opposite halves are bound to `_`-prefixed names and dropped.
        let (turbine_quic_endpoint_sender, _turbine_quic_endpoint_receiver) =
            tokio::sync::mpsc::channel(/*capacity:*/ 128);
        let (_turbine_quic_endpoint_sender, turbine_quic_endpoint_receiver) = unbounded();
        let (_, repair_response_quic_receiver) = unbounded();
        let repair_quic_async_senders = RepairQuicAsyncSenders::new_dummy();
        let (_, ancestor_hashes_response_quic_receiver) = unbounded();
        //start cluster_info1
        let cluster_info1 = ClusterInfo::new(
            target1.info.clone(),
            target1_keypair.into(),
            SocketAddrSpace::Unspecified,
        );
        cluster_info1.insert_info(leader.info);
        let cref1 = Arc::new(cluster_info1);

        let (blockstore_path, _) = create_new_tmp_ledger!(&genesis_config);
        let BlockstoreSignals {
            blockstore,
            ledger_signal_receiver,
            ..
        } = Blockstore::open_with_signal(&blockstore_path, BlockstoreOptions::default())
            .expect("Expected to successfully open ledger");
        let blockstore = Arc::new(blockstore);
        let bank = bank_forks.read().unwrap().working_bank();
        // `exit` returned here is the shared shutdown flag flipped at the end
        // of the test.
        let (exit, poh_recorder, _transaction_recorder, poh_service, _entry_receiver) =
            create_test_recorder(bank.clone(), blockstore.clone(), None, None);
        let vote_keypair = Keypair::new();
        let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
        let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
        let (retransmit_slots_sender, _retransmit_slots_receiver) = unbounded();
        let (_gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
        let (_verified_vote_sender, verified_vote_receiver) = unbounded();
        let (replay_vote_sender, _replay_vote_receiver) = unbounded();
        let (_, gossip_confirmed_slots_receiver) = unbounded();
        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
        let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
        let outstanding_repair_requests = Arc::<RwLock<OutstandingShredRepairs>>::default();
        let cluster_slots = Arc::new(ClusterSlots::default());
        // `Some(...)` puts Tvu::new into wen-restart mode (no replay stage).
        let wen_restart_repair_slots = if enable_wen_restart {
            Some(Arc::new(RwLock::new(vec![])))
        } else {
            None
        };
        let connection_cache = if DEFAULT_VOTE_USE_QUIC {
            ConnectionCache::new_quic(
                "connection_cache_vote_quic",
                DEFAULT_TPU_CONNECTION_POOL_SIZE,
            )
        } else {
            ConnectionCache::with_udp(
                "connection_cache_vote_udp",
                DEFAULT_TPU_CONNECTION_POOL_SIZE,
            )
        };

        let tvu = Tvu::new(
            &vote_keypair.pubkey(),
            Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])),
            &bank_forks,
            &cref1,
            {
                TvuSockets {
                    repair: target1.sockets.repair,
                    retransmit: target1.sockets.retransmit_sockets,
                    fetch: target1.sockets.tvu,
                    ancestor_hashes_requests: target1.sockets.ancestor_hashes_requests,
                }
            },
            blockstore,
            ledger_signal_receiver,
            &Arc::new(RpcSubscriptions::new_for_tests(
                exit.clone(),
                max_complete_transaction_status_slot,
                bank_forks.clone(),
                block_commitment_cache.clone(),
                OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
            )),
            &poh_recorder,
            Tower::default(),
            Arc::new(FileTowerStorage::default()),
            &leader_schedule_cache,
            exit.clone(),
            block_commitment_cache,
            Arc::<AtomicBool>::default(),
            None,
            None,
            Arc::<VoteTracker>::default(),
            retransmit_slots_sender,
            gossip_verified_vote_hash_receiver,
            verified_vote_receiver,
            replay_vote_sender,
            /*completed_data_sets_sender:*/ None,
            None,
            gossip_confirmed_slots_receiver,
            TvuConfig::default(),
            &Arc::new(MaxSlots::default()),
            None,
            None,
            None, // snapshot_controller
            None,
            Some(&Arc::new(ConnectionCache::new("connection_cache_test"))),
            &ignored_prioritization_fee_cache,
            BankingTracer::new_disabled(),
            turbine_quic_endpoint_sender,
            turbine_quic_endpoint_receiver,
            repair_response_quic_receiver,
            repair_quic_async_senders.repair_request_quic_sender,
            repair_quic_async_senders.ancestor_hashes_request_quic_sender,
            ancestor_hashes_response_quic_receiver,
            outstanding_repair_requests,
            cluster_slots,
            wen_restart_repair_slots,
            None,
            Arc::new(connection_cache),
        )
        .expect("assume success");
        if enable_wen_restart {
            assert!(tvu.replay_stage.is_none())
        } else {
            assert!(tvu.replay_stage.is_some())
        }
        // Signal shutdown and verify every service joins cleanly.
        exit.store(true, Ordering::Relaxed);
        tvu.join().unwrap();
        poh_service.join().unwrap();
    }
614
    /// Full startup/shutdown with replay enabled (normal operation).
    #[test]
    #[serial]
    fn test_tvu_exit_no_wen_restart() {
        test_tvu_exit(false);
    }
620
    /// Full startup/shutdown in wen-restart mode (replay must be absent).
    #[test]
    #[serial]
    fn test_tvu_exit_with_wen_restart() {
        test_tvu_exit(true);
    }
626}