use {
    crate::{
        banking_trace::BankingTracer,
        cluster_info_vote_listener::{
            DuplicateConfirmedSlotsReceiver, GossipVerifiedVoteHashReceiver, VerifiedVoteReceiver,
            VoteTracker,
        },
        cluster_slots_service::{cluster_slots::ClusterSlots, ClusterSlotsService},
        completed_data_sets_service::CompletedDataSetsSender,
        consensus::{tower_storage::TowerStorage, Tower},
        cost_update_service::CostUpdateService,
        drop_bank_service::DropBankService,
        repair::repair_service::{OutstandingShredRepairs, RepairInfo, RepairServiceChannels},
        replay_stage::{ReplayReceivers, ReplaySenders, ReplayStage, ReplayStageConfig},
        shred_fetch_stage::{ShredFetchStage, SHRED_FETCH_CHANNEL_SIZE},
        voting_service::VotingService,
        warm_quic_cache_service::WarmQuicCacheService,
        window_service::{WindowService, WindowServiceChannels},
    },
    bytes::Bytes,
    crossbeam_channel::{unbounded, Receiver, Sender},
    solana_client::connection_cache::ConnectionCache,
    solana_clock::Slot,
    solana_geyser_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierArc,
    solana_gossip::{
        cluster_info::ClusterInfo, duplicate_shred_handler::DuplicateShredHandler,
        duplicate_shred_listener::DuplicateShredListener,
    },
    solana_keypair::Keypair,
    solana_ledger::{
        blockstore::Blockstore, blockstore_cleanup_service::BlockstoreCleanupService,
        blockstore_processor::TransactionStatusSender, entry_notifier_service::EntryNotifierSender,
        leader_schedule_cache::LeaderScheduleCache,
    },
    solana_poh::{poh_controller::PohController, poh_recorder::PohRecorder},
    solana_pubkey::Pubkey,
    solana_rpc::{
        max_slots::MaxSlots, optimistically_confirmed_bank_tracker::BankNotificationSenderConfig,
        rpc_subscriptions::RpcSubscriptions, slot_status_notifier::SlotStatusNotifier,
    },
    solana_runtime::{
        bank_forks::BankForks, commitment::BlockCommitmentCache,
        prioritization_fee_cache::PrioritizationFeeCache, snapshot_controller::SnapshotController,
        vote_sender_types::ReplayVoteSender,
    },
    solana_streamer::evicting_sender::EvictingSender,
    solana_turbine::{retransmit_stage::RetransmitStage, xdp::XdpSender},
    std::{
        collections::HashSet,
        net::{SocketAddr, UdpSocket},
        num::NonZeroUsize,
        sync::{atomic::AtomicBool, Arc, RwLock},
        thread::{self, JoinHandle},
    },
    tokio::sync::mpsc::Sender as AsyncSender,
};

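// Upper bound on the channel feeding shreds into the retransmit stage; the EvictingSender sheds
// excess entries instead of letting the queue grow without bound when retransmit falls behind.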
const CHANNEL_SIZE_RETRANSMIT_INGRESS: usize = 16 * 1024;

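/// Thread handles for the services that make up the TVU (Transaction Validation Unit), the
/// validator's shred-ingest and replay pipeline.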
pub struct Tvu {
    fetch_stage: ShredFetchStage,
    shred_sigverify: JoinHandle<()>,
    retransmit_stage: RetransmitStage,
    window_service: WindowService,
    cluster_slots_service: ClusterSlotsService,
    replay_stage: Option<ReplayStage>,
    blockstore_cleanup_service: Option<BlockstoreCleanupService>,
    cost_update_service: CostUpdateService,
    voting_service: VotingService,
    warm_quic_cache_service: Option<WarmQuicCacheService>,
    drop_bank_service: DropBankService,
    duplicate_shred_listener: DuplicateShredListener,
}

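/// Sockets consumed by the TVU: shred fetch, repair, retransmit, ancestor hashes requests, and an
/// optional Alpenglow socket.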
pub struct TvuSockets {
    pub fetch: Vec<UdpSocket>,
    pub repair: UdpSocket,
    pub retransmit: Vec<UdpSocket>,
    pub ancestor_hashes_requests: UdpSocket,
    pub alpenglow: Option<UdpSocket>,
}

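/// Tunable parameters used when constructing a [`Tvu`].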
pub struct TvuConfig {
    pub max_ledger_shreds: Option<u64>,
    pub shred_version: u16,
    pub repair_validators: Option<HashSet<Pubkey>>,
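    // Validators which should be given priority when serving repairs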
    pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
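    // Wait for this validator's vote to land before producing blocks as leader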
    pub wait_for_vote_to_start_leader: bool,
    pub replay_forks_threads: NonZeroUsize,
    pub replay_transactions_threads: NonZeroUsize,
    pub shred_sigverify_threads: NonZeroUsize,
    pub xdp_sender: Option<XdpSender>,
}

impl Default for TvuConfig {
    fn default() -> Self {
        Self {
            max_ledger_shreds: None,
            shred_version: 0,
            repair_validators: None,
            repair_whitelist: Arc::new(RwLock::new(HashSet::default())),
            wait_for_vote_to_start_leader: false,
            replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            shred_sigverify_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            xdp_sender: None,
        }
    }
}

impl Tvu {
    #[allow(clippy::too_many_arguments)]
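    /// This service receives messages from a leader in the network and processes the transactions
    /// on the bank state.
    /// # Arguments
    /// * `cluster_info` - The cluster_info state.
    /// * `sockets` - fetch, repair, and retransmit sockets
    /// * `blockstore` - the ledger itself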
    pub fn new(
        vote_account: &Pubkey,
        authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
        bank_forks: &Arc<RwLock<BankForks>>,
        cluster_info: &Arc<ClusterInfo>,
        sockets: TvuSockets,
        blockstore: Arc<Blockstore>,
        ledger_signal_receiver: Receiver<bool>,
        rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
        poh_recorder: &Arc<RwLock<PohRecorder>>,
        poh_controller: PohController,
        tower: Tower,
        tower_storage: Arc<dyn TowerStorage>,
        leader_schedule_cache: &Arc<LeaderScheduleCache>,
        exit: Arc<AtomicBool>,
        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
        turbine_disabled: Arc<AtomicBool>,
        transaction_status_sender: Option<TransactionStatusSender>,
        entry_notification_sender: Option<EntryNotifierSender>,
        vote_tracker: Arc<VoteTracker>,
        retransmit_slots_sender: Sender<Slot>,
        gossip_verified_vote_hash_receiver: GossipVerifiedVoteHashReceiver,
        verified_vote_receiver: VerifiedVoteReceiver,
        replay_vote_sender: ReplayVoteSender,
        completed_data_sets_sender: Option<CompletedDataSetsSender>,
        bank_notification_sender: Option<BankNotificationSenderConfig>,
        duplicate_confirmed_slots_receiver: DuplicateConfirmedSlotsReceiver,
        tvu_config: TvuConfig,
        max_slots: &Arc<MaxSlots>,
        block_metadata_notifier: Option<BlockMetadataNotifierArc>,
        wait_to_vote_slot: Option<Slot>,
        snapshot_controller: Option<Arc<SnapshotController>>,
        log_messages_bytes_limit: Option<usize>,
        connection_cache: Option<&Arc<ConnectionCache>>,
        prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
        banking_tracer: Arc<BankingTracer>,
        turbine_quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>,
        turbine_quic_endpoint_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
        repair_response_quic_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
        repair_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>,
        ancestor_hashes_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>,
        ancestor_hashes_response_quic_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
        outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
        cluster_slots: Arc<ClusterSlots>,
        wen_restart_repair_slots: Option<Arc<RwLock<Vec<Slot>>>>,
        slot_status_notifier: Option<SlotStatusNotifier>,
        vote_connection_cache: Arc<ConnectionCache>,
    ) -> Result<Self, String> {
        let in_wen_restart = wen_restart_repair_slots.is_some();

        let TvuSockets {
            repair: repair_socket,
            fetch: fetch_sockets,
            retransmit: retransmit_sockets,
            ancestor_hashes_requests: ancestor_hashes_socket,
            alpenglow: alpenglow_socket,
        } = sockets;

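        // Shreds arriving over the turbine UDP/QUIC sockets and repair responses are funneled
        // through the fetch stage into a bounded channel consumed by shred sigverify.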
        let (fetch_sender, fetch_receiver) = EvictingSender::new_bounded(SHRED_FETCH_CHANNEL_SIZE);

        let repair_socket = Arc::new(repair_socket);
        let ancestor_hashes_socket = Arc::new(ancestor_hashes_socket);
        let fetch_sockets: Vec<Arc<UdpSocket>> = fetch_sockets.into_iter().map(Arc::new).collect();
        let fetch_stage = ShredFetchStage::new(
            fetch_sockets,
            turbine_quic_endpoint_receiver,
            repair_response_quic_receiver,
            repair_socket.clone(),
            fetch_sender,
            tvu_config.shred_version,
            bank_forks.clone(),
            cluster_info.clone(),
            outstanding_repair_requests.clone(),
            turbine_disabled,
            exit.clone(),
        );

        let (verified_sender, verified_receiver) = unbounded();

        let (retransmit_sender, retransmit_receiver) =
            EvictingSender::new_bounded(CHANNEL_SIZE_RETRANSMIT_INGRESS);

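        // Sigverify hands verified shreds both to the retransmit stage and, via the verified
        // channel, to the window service below.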
        let shred_sigverify = solana_turbine::sigverify_shreds::spawn_shred_sigverify(
            cluster_info.clone(),
            bank_forks.clone(),
            leader_schedule_cache.clone(),
            fetch_receiver,
            retransmit_sender.clone(),
            verified_sender,
            tvu_config.shred_sigverify_threads,
        );

        let retransmit_stage = RetransmitStage::new(
            bank_forks.clone(),
            leader_schedule_cache.clone(),
            cluster_info.clone(),
            Arc::new(retransmit_sockets),
            turbine_quic_endpoint_sender,
            retransmit_receiver,
            max_slots.clone(),
            rpc_subscriptions.clone(),
            slot_status_notifier.clone(),
            tvu_config.xdp_sender,
            None,
        );

        let (ancestor_duplicate_slots_sender, ancestor_duplicate_slots_receiver) = unbounded();
        let (duplicate_slots_sender, duplicate_slots_receiver) = unbounded();
        let (ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_receiver) =
            unbounded();
        let (dumped_slots_sender, dumped_slots_receiver) = unbounded();
        let (popular_pruned_forks_sender, popular_pruned_forks_receiver) = unbounded();
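        // The window service inserts verified shreds into the blockstore and runs the repair
        // service to fill in any missing ones.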
        let window_service = {
            let epoch_schedule = bank_forks
                .read()
                .unwrap()
                .working_bank()
                .epoch_schedule()
                .clone();
            let repair_info = RepairInfo {
                bank_forks: bank_forks.clone(),
                epoch_schedule,
                ancestor_duplicate_slots_sender,
                repair_validators: tvu_config.repair_validators,
                repair_whitelist: tvu_config.repair_whitelist,
                cluster_info: cluster_info.clone(),
                cluster_slots: cluster_slots.clone(),
                wen_restart_repair_slots,
            };
            let repair_service_channels = RepairServiceChannels::new(
                repair_request_quic_sender,
                verified_vote_receiver,
                dumped_slots_receiver,
                popular_pruned_forks_sender,
                ancestor_hashes_request_quic_sender,
                ancestor_hashes_response_quic_receiver,
                ancestor_hashes_replay_update_receiver,
            );
            let window_service_channels = WindowServiceChannels::new(
                verified_receiver,
                retransmit_sender,
                completed_data_sets_sender,
                duplicate_slots_sender.clone(),
                repair_service_channels,
            );
            WindowService::new(
                blockstore.clone(),
                repair_socket,
                ancestor_hashes_socket,
                exit.clone(),
                repair_info,
                window_service_channels,
                leader_schedule_cache.clone(),
                outstanding_repair_requests,
            )
        };

        let (cluster_slots_update_sender, cluster_slots_update_receiver) = unbounded();
        let cluster_slots_service = ClusterSlotsService::new(
            blockstore.clone(),
            cluster_slots.clone(),
            bank_forks.clone(),
            cluster_info.clone(),
            cluster_slots_update_receiver,
            exit.clone(),
        );

        let (cost_update_sender, cost_update_receiver) = unbounded();
        let (drop_bank_sender, drop_bank_receiver) = unbounded();
        let (voting_sender, voting_receiver) = unbounded();

        let replay_senders = ReplaySenders {
            rpc_subscriptions,
            slot_status_notifier,
            transaction_status_sender,
            entry_notification_sender,
            bank_notification_sender,
            ancestor_hashes_replay_update_sender,
            retransmit_slots_sender,
            replay_vote_sender,
            cluster_slots_update_sender,
            cost_update_sender,
            voting_sender,
            drop_bank_sender,
            block_metadata_notifier,
            dumped_slots_sender,
        };

        let replay_receivers = ReplayReceivers {
            ledger_signal_receiver,
            duplicate_slots_receiver,
            ancestor_duplicate_slots_receiver,
            duplicate_confirmed_slots_receiver,
            gossip_verified_vote_hash_receiver,
            popular_pruned_forks_receiver,
        };

        let replay_stage_config = ReplayStageConfig {
            vote_account: *vote_account,
            authorized_voter_keypairs,
            exit: exit.clone(),
            leader_schedule_cache: leader_schedule_cache.clone(),
            block_commitment_cache,
            wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader,
            tower_storage: tower_storage.clone(),
            wait_to_vote_slot,
            replay_forks_threads: tvu_config.replay_forks_threads,
            replay_transactions_threads: tvu_config.replay_transactions_threads,
            blockstore: blockstore.clone(),
            bank_forks: bank_forks.clone(),
            cluster_info: cluster_info.clone(),
            poh_recorder: poh_recorder.clone(),
            poh_controller,
            tower,
            vote_tracker,
            cluster_slots,
            log_messages_bytes_limit,
            prioritization_fee_cache: prioritization_fee_cache.clone(),
            banking_tracer,
            snapshot_controller,
        };

        let voting_service = VotingService::new(
            voting_receiver,
            cluster_info.clone(),
            poh_recorder.clone(),
            tower_storage,
            vote_connection_cache.clone(),
            alpenglow_socket,
            bank_forks.clone(),
        );

        let warm_quic_cache_service = create_cache_warmer_if_needed(
            connection_cache,
            vote_connection_cache,
            cluster_info,
            poh_recorder,
            &exit,
        );

        let cost_update_service = CostUpdateService::new(cost_update_receiver);

        let drop_bank_service = DropBankService::new(drop_bank_receiver);

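        // No replay stage is spawned while the validator is in wen_restart.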
        let replay_stage = if in_wen_restart {
            None
        } else {
            Some(ReplayStage::new(
                replay_stage_config,
                replay_senders,
                replay_receivers,
            )?)
        };

        let blockstore_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
            BlockstoreCleanupService::new(blockstore.clone(), max_ledger_shreds, exit.clone())
        });

        let duplicate_shred_listener = DuplicateShredListener::new(
            exit,
            cluster_info.clone(),
            DuplicateShredHandler::new(
                blockstore,
                leader_schedule_cache.clone(),
                bank_forks.clone(),
                duplicate_slots_sender,
                tvu_config.shred_version,
            ),
        );

        Ok(Tvu {
            fetch_stage,
            shred_sigverify,
            retransmit_stage,
            window_service,
            cluster_slots_service,
            replay_stage,
            blockstore_cleanup_service,
            cost_update_service,
            voting_service,
            warm_quic_cache_service,
            drop_bank_service,
            duplicate_shred_listener,
        })
    }

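    /// Waits for every TVU service thread to exit.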
    pub fn join(self) -> thread::Result<()> {
        self.retransmit_stage.join()?;
        self.window_service.join()?;
        self.cluster_slots_service.join()?;
        self.fetch_stage.join()?;
        self.shred_sigverify.join()?;
        if let Some(blockstore_cleanup_service) = self.blockstore_cleanup_service {
            blockstore_cleanup_service.join()?;
        }
        if let Some(replay_stage) = self.replay_stage {
            replay_stage.join()?;
        }
        self.cost_update_service.join()?;
        self.voting_service.join()?;
        if let Some(warmup_service) = self.warm_quic_cache_service {
            warmup_service.join()?;
        }
        self.drop_bank_service.join()?;
        self.duplicate_shred_listener.join()?;
        Ok(())
    }
}

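/// Spawns a [`WarmQuicCacheService`] if the TPU connection cache or the vote connection cache is
/// configured for QUIC; returns `None` when neither is.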
fn create_cache_warmer_if_needed(
    connection_cache: Option<&Arc<ConnectionCache>>,
    vote_connection_cache: Arc<ConnectionCache>,
    cluster_info: &Arc<ClusterInfo>,
    poh_recorder: &Arc<RwLock<PohRecorder>>,
    exit: &Arc<AtomicBool>,
) -> Option<WarmQuicCacheService> {
    let tpu_connection_cache = connection_cache.filter(|cache| cache.use_quic()).cloned();
    let vote_connection_cache = Some(vote_connection_cache).filter(|cache| cache.use_quic());

    (tpu_connection_cache.is_some() || vote_connection_cache.is_some()).then(|| {
        WarmQuicCacheService::new(
            tpu_connection_cache,
            vote_connection_cache,
            cluster_info.clone(),
            poh_recorder.clone(),
            exit.clone(),
        )
    })
}

#[cfg(test)]
pub mod tests {
    use {
        super::*,
        crate::{
            consensus::tower_storage::FileTowerStorage,
            repair::quic_endpoint::RepairQuicAsyncSenders,
        },
        serial_test::serial,
        solana_gossip::{cluster_info::ClusterInfo, node::Node},
        solana_keypair::Keypair,
        solana_ledger::{
            blockstore::BlockstoreSignals,
            blockstore_options::BlockstoreOptions,
            create_new_tmp_ledger,
            genesis_utils::{create_genesis_config, GenesisConfigInfo},
        },
        solana_poh::poh_recorder::create_test_recorder,
        solana_rpc::optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
        solana_runtime::bank::Bank,
        solana_signer::Signer,
        solana_streamer::socket::SocketAddrSpace,
        solana_tpu_client::tpu_client::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_VOTE_USE_QUIC},
        std::sync::atomic::{AtomicU64, Ordering},
    };

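    // Brings up a full Tvu wired to localhost sockets and dummy channels, then signals exit and
    // joins every service.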
    fn test_tvu_exit(enable_wen_restart: bool) {
        agave_logger::setup();
        let leader = Node::new_localhost();
        let target1_keypair = Keypair::new();
        let target1 = Node::new_localhost_with_pubkey(&target1_keypair.pubkey());

        let starting_balance = 10_000;
        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(starting_balance);

        let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));

        let (turbine_quic_endpoint_sender, _turbine_quic_endpoint_receiver) =
            tokio::sync::mpsc::channel(128);
        let (_turbine_quic_endpoint_sender, turbine_quic_endpoint_receiver) = unbounded();
        let (_, repair_response_quic_receiver) = unbounded();
        let repair_quic_async_senders = RepairQuicAsyncSenders::new_dummy();
        let (_, ancestor_hashes_response_quic_receiver) = unbounded();
        let cluster_info1 = ClusterInfo::new(
            target1.info.clone(),
            target1_keypair.into(),
            SocketAddrSpace::Unspecified,
        );
        cluster_info1.insert_info(leader.info);
        let cref1 = Arc::new(cluster_info1);

        let (blockstore_path, _) = create_new_tmp_ledger!(&genesis_config);
        let BlockstoreSignals {
            blockstore,
            ledger_signal_receiver,
            ..
        } = Blockstore::open_with_signal(&blockstore_path, BlockstoreOptions::default())
            .expect("Expected to successfully open ledger");
        let blockstore = Arc::new(blockstore);
        let bank = bank_forks.read().unwrap().working_bank();
        let (
            exit,
            poh_recorder,
            poh_controller,
            _transaction_recorder,
            poh_service,
            _entry_receiver,
        ) = create_test_recorder(bank.clone(), blockstore.clone(), None, None);
        let vote_keypair = Keypair::new();
        let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
        let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
        let (retransmit_slots_sender, _retransmit_slots_receiver) = unbounded();
        let (_gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
        let (_verified_vote_sender, verified_vote_receiver) = unbounded();
        let (replay_vote_sender, _replay_vote_receiver) = unbounded();
        let (_, gossip_confirmed_slots_receiver) = unbounded();
        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
        let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
        let outstanding_repair_requests = Arc::<RwLock<OutstandingShredRepairs>>::default();
        let cluster_slots = Arc::new(ClusterSlots::default_for_tests());
        let wen_restart_repair_slots = if enable_wen_restart {
            Some(Arc::new(RwLock::new(vec![])))
        } else {
            None
        };
        let connection_cache = if DEFAULT_VOTE_USE_QUIC {
            ConnectionCache::new_quic_for_tests(
                "connection_cache_vote_quic",
                DEFAULT_TPU_CONNECTION_POOL_SIZE,
            )
        } else {
            ConnectionCache::with_udp(
                "connection_cache_vote_udp",
                DEFAULT_TPU_CONNECTION_POOL_SIZE,
            )
        };

        let tvu = Tvu::new(
            &vote_keypair.pubkey(),
            Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])),
            &bank_forks,
            &cref1,
            {
                TvuSockets {
                    repair: target1.sockets.repair,
                    retransmit: target1.sockets.retransmit_sockets,
                    fetch: target1.sockets.tvu,
                    ancestor_hashes_requests: target1.sockets.ancestor_hashes_requests,
                    alpenglow: target1.sockets.alpenglow,
                }
            },
            blockstore,
            ledger_signal_receiver,
            Some(Arc::new(RpcSubscriptions::new_for_tests(
                exit.clone(),
                max_complete_transaction_status_slot,
                bank_forks.clone(),
                block_commitment_cache.clone(),
                OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
            ))),
            &poh_recorder,
            poh_controller,
            Tower::default(),
            Arc::new(FileTowerStorage::default()),
            &leader_schedule_cache,
            exit.clone(),
            block_commitment_cache,
            Arc::<AtomicBool>::default(),
            None,
            None,
            Arc::<VoteTracker>::default(),
            retransmit_slots_sender,
            gossip_verified_vote_hash_receiver,
            verified_vote_receiver,
            replay_vote_sender,
            None,
            None,
            gossip_confirmed_slots_receiver,
            TvuConfig::default(),
            &Arc::new(MaxSlots::default()),
            None,
            None,
            None,
            None,
            Some(&Arc::new(ConnectionCache::new("connection_cache_test"))),
            &ignored_prioritization_fee_cache,
            BankingTracer::new_disabled(),
            turbine_quic_endpoint_sender,
            turbine_quic_endpoint_receiver,
            repair_response_quic_receiver,
            repair_quic_async_senders.repair_request_quic_sender,
            repair_quic_async_senders.ancestor_hashes_request_quic_sender,
            ancestor_hashes_response_quic_receiver,
            outstanding_repair_requests,
            cluster_slots,
            wen_restart_repair_slots,
            None,
            Arc::new(connection_cache),
        )
        .expect("assume success");
        if enable_wen_restart {
            assert!(tvu.replay_stage.is_none())
        } else {
            assert!(tvu.replay_stage.is_some())
        }
        exit.store(true, Ordering::Relaxed);
        tvu.join().unwrap();
        poh_service.join().unwrap();
    }

    #[test]
    #[serial]
    fn test_tvu_exit_no_wen_restart() {
        test_tvu_exit(false);
    }

    #[test]
    #[serial]
    fn test_tvu_exit_with_wen_restart() {
        test_tvu_exit(true);
    }
}