1use {
5 crate::{
6 banking_trace::BankingTracer,
7 cluster_info_vote_listener::{
8 DuplicateConfirmedSlotsReceiver, GossipVerifiedVoteHashReceiver, VerifiedVoteReceiver,
9 VoteTracker,
10 },
11 cluster_slots_service::{cluster_slots::ClusterSlots, ClusterSlotsService},
12 completed_data_sets_service::CompletedDataSetsSender,
13 consensus::{tower_storage::TowerStorage, Tower},
14 cost_update_service::CostUpdateService,
15 drop_bank_service::DropBankService,
16 repair::repair_service::{OutstandingShredRepairs, RepairInfo, RepairServiceChannels},
17 replay_stage::{ReplayReceivers, ReplaySenders, ReplayStage, ReplayStageConfig},
18 shred_fetch_stage::{ShredFetchStage, SHRED_FETCH_CHANNEL_SIZE},
19 voting_service::VotingService,
20 warm_quic_cache_service::WarmQuicCacheService,
21 window_service::{WindowService, WindowServiceChannels},
22 },
23 bytes::Bytes,
24 crossbeam_channel::{unbounded, Receiver, Sender},
25 solana_client::connection_cache::ConnectionCache,
26 solana_clock::Slot,
27 solana_geyser_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierArc,
28 solana_gossip::{
29 cluster_info::ClusterInfo, duplicate_shred_handler::DuplicateShredHandler,
30 duplicate_shred_listener::DuplicateShredListener,
31 },
32 solana_keypair::Keypair,
33 solana_ledger::{
34 blockstore::Blockstore, blockstore_cleanup_service::BlockstoreCleanupService,
35 blockstore_processor::TransactionStatusSender, entry_notifier_service::EntryNotifierSender,
36 leader_schedule_cache::LeaderScheduleCache,
37 },
38 solana_poh::poh_recorder::PohRecorder,
39 solana_pubkey::Pubkey,
40 solana_rpc::{
41 max_slots::MaxSlots, optimistically_confirmed_bank_tracker::BankNotificationSenderConfig,
42 rpc_subscriptions::RpcSubscriptions, slot_status_notifier::SlotStatusNotifier,
43 },
44 solana_runtime::{
45 bank_forks::BankForks, commitment::BlockCommitmentCache,
46 prioritization_fee_cache::PrioritizationFeeCache, snapshot_controller::SnapshotController,
47 vote_sender_types::ReplayVoteSender,
48 },
49 solana_streamer::evicting_sender::EvictingSender,
50 solana_turbine::{retransmit_stage::RetransmitStage, xdp::XdpSender},
51 std::{
52 collections::HashSet,
53 net::{SocketAddr, UdpSocket},
54 num::NonZeroUsize,
55 sync::{atomic::AtomicBool, Arc, RwLock},
56 thread::{self, JoinHandle},
57 },
58 tokio::sync::mpsc::Sender as AsyncSender,
59};
60
/// Bounded capacity of the evicting channel that feeds verified shreds from
/// sigverify into the retransmit stage.
const CHANNEL_SIZE_RETRANSMIT_INGRESS: usize = 16 * 1024;
68
/// The Transaction Validation Unit: the set of service threads that ingest
/// shreds, verify and retransmit them, repair gaps, and replay blocks.
/// Constructed by [`Tvu::new`] and torn down via [`Tvu::join`].
pub struct Tvu {
    fetch_stage: ShredFetchStage,
    // Handle for the shred signature-verification thread.
    shred_sigverify: JoinHandle<()>,
    retransmit_stage: RetransmitStage,
    window_service: WindowService,
    cluster_slots_service: ClusterSlotsService,
    // `None` when the node was started in wen_restart mode (replay skipped).
    replay_stage: Option<ReplayStage>,
    // Present only when `TvuConfig::max_ledger_shreds` was set.
    blockstore_cleanup_service: Option<BlockstoreCleanupService>,
    cost_update_service: CostUpdateService,
    voting_service: VotingService,
    // Present only when at least one connection cache uses QUIC.
    warm_quic_cache_service: Option<WarmQuicCacheService>,
    drop_bank_service: DropBankService,
    duplicate_shred_listener: DuplicateShredListener,
}
83
/// UDP sockets consumed by [`Tvu::new`].
pub struct TvuSockets {
    /// Sockets on which turbine shreds are received (shred fetch stage).
    pub fetch: Vec<UdpSocket>,
    /// Socket shared by the shred fetch stage and the window/repair service.
    pub repair: UdpSocket,
    /// Sockets used by the retransmit stage.
    pub retransmit: Vec<UdpSocket>,
    /// Socket for ancestor-hashes repair requests.
    pub ancestor_hashes_requests: UdpSocket,
    /// Optional socket forwarded to the voting service (alpenglow voting).
    pub alpenglow: Option<UdpSocket>,
}
91
/// Tunable parameters for [`Tvu::new`]; see [`TvuConfig::default`] for the
/// baseline values.
pub struct TvuConfig {
    /// When `Some`, enables the blockstore cleanup service with this cap.
    pub max_ledger_shreds: Option<u64>,
    /// Expected shred version; forwarded to shred fetch and the duplicate
    /// shred handler.
    pub shred_version: u16,
    /// Forwarded to the repair service as `RepairInfo::repair_validators`.
    pub repair_validators: Option<HashSet<Pubkey>>,
    /// Forwarded to the repair service as `RepairInfo::repair_whitelist`.
    pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
    pub wait_for_vote_to_start_leader: bool,
    /// Thread-pool size for replay-stage fork replay.
    pub replay_forks_threads: NonZeroUsize,
    /// Thread-pool size for replay-stage transaction execution.
    pub replay_transactions_threads: NonZeroUsize,
    /// Thread-pool size for shred signature verification.
    pub shred_sigverify_threads: NonZeroUsize,
    /// Optional XDP sender handed to the retransmit stage.
    pub xdp_sender: Option<XdpSender>,
}
105
106impl Default for TvuConfig {
107 fn default() -> Self {
108 Self {
109 max_ledger_shreds: None,
110 shred_version: 0,
111 repair_validators: None,
112 repair_whitelist: Arc::new(RwLock::new(HashSet::default())),
113 wait_for_vote_to_start_leader: false,
114 replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
115 replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
116 shred_sigverify_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
117 xdp_sender: None,
118 }
119 }
120}
121
impl Tvu {
    /// Constructs the full TVU pipeline — shred fetch, signature verification,
    /// retransmit, window/repair, cluster-slots, replay, voting, and auxiliary
    /// services — wiring them together with channels.
    ///
    /// When `wen_restart_repair_slots` is `Some`, the node is in wen_restart
    /// mode and the replay stage is intentionally not started.
    ///
    /// # Errors
    /// Returns `Err(String)` if `ReplayStage::new` fails.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        vote_account: &Pubkey,
        authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
        bank_forks: &Arc<RwLock<BankForks>>,
        cluster_info: &Arc<ClusterInfo>,
        sockets: TvuSockets,
        blockstore: Arc<Blockstore>,
        ledger_signal_receiver: Receiver<bool>,
        rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
        poh_recorder: &Arc<RwLock<PohRecorder>>,
        tower: Tower,
        tower_storage: Arc<dyn TowerStorage>,
        leader_schedule_cache: &Arc<LeaderScheduleCache>,
        exit: Arc<AtomicBool>,
        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
        turbine_disabled: Arc<AtomicBool>,
        transaction_status_sender: Option<TransactionStatusSender>,
        entry_notification_sender: Option<EntryNotifierSender>,
        vote_tracker: Arc<VoteTracker>,
        retransmit_slots_sender: Sender<Slot>,
        gossip_verified_vote_hash_receiver: GossipVerifiedVoteHashReceiver,
        verified_vote_receiver: VerifiedVoteReceiver,
        replay_vote_sender: ReplayVoteSender,
        completed_data_sets_sender: Option<CompletedDataSetsSender>,
        bank_notification_sender: Option<BankNotificationSenderConfig>,
        duplicate_confirmed_slots_receiver: DuplicateConfirmedSlotsReceiver,
        tvu_config: TvuConfig,
        max_slots: &Arc<MaxSlots>,
        block_metadata_notifier: Option<BlockMetadataNotifierArc>,
        wait_to_vote_slot: Option<Slot>,
        snapshot_controller: Option<Arc<SnapshotController>>,
        log_messages_bytes_limit: Option<usize>,
        connection_cache: Option<&Arc<ConnectionCache>>,
        prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
        banking_tracer: Arc<BankingTracer>,
        turbine_quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>,
        turbine_quic_endpoint_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
        repair_response_quic_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
        repair_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>,
        ancestor_hashes_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>,
        ancestor_hashes_response_quic_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
        outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
        cluster_slots: Arc<ClusterSlots>,
        wen_restart_repair_slots: Option<Arc<RwLock<Vec<Slot>>>>,
        slot_status_notifier: Option<SlotStatusNotifier>,
        vote_connection_cache: Arc<ConnectionCache>,
    ) -> Result<Self, String> {
        // wen_restart mode is signalled purely by the presence of the repair
        // slots list; it suppresses replay-stage startup below.
        let in_wen_restart = wen_restart_repair_slots.is_some();

        let TvuSockets {
            repair: repair_socket,
            fetch: fetch_sockets,
            retransmit: retransmit_sockets,
            ancestor_hashes_requests: ancestor_hashes_socket,
            alpenglow: alpenglow_socket,
        } = sockets;

        // Shred fetch -> sigverify channel; evicting so a slow sigverify
        // cannot back-pressure the fetch sockets indefinitely.
        let (fetch_sender, fetch_receiver) = EvictingSender::new_bounded(SHRED_FETCH_CHANNEL_SIZE);

        let repair_socket = Arc::new(repair_socket);
        let ancestor_hashes_socket = Arc::new(ancestor_hashes_socket);
        let fetch_sockets: Vec<Arc<UdpSocket>> = fetch_sockets.into_iter().map(Arc::new).collect();
        let fetch_stage = ShredFetchStage::new(
            fetch_sockets,
            turbine_quic_endpoint_receiver,
            repair_response_quic_receiver,
            repair_socket.clone(),
            fetch_sender,
            tvu_config.shred_version,
            bank_forks.clone(),
            cluster_info.clone(),
            outstanding_repair_requests.clone(),
            turbine_disabled,
            exit.clone(),
        );

        // Sigverify -> window service channel.
        let (verified_sender, verified_receiver) = unbounded();

        // Sigverify/window -> retransmit stage channel.
        let (retransmit_sender, retransmit_receiver) =
            EvictingSender::new_bounded(CHANNEL_SIZE_RETRANSMIT_INGRESS);

        let shred_sigverify = solana_turbine::sigverify_shreds::spawn_shred_sigverify(
            cluster_info.clone(),
            bank_forks.clone(),
            leader_schedule_cache.clone(),
            fetch_receiver,
            retransmit_sender.clone(),
            verified_sender,
            tvu_config.shred_sigverify_threads,
        );

        let retransmit_stage = RetransmitStage::new(
            bank_forks.clone(),
            leader_schedule_cache.clone(),
            cluster_info.clone(),
            Arc::new(retransmit_sockets),
            turbine_quic_endpoint_sender,
            retransmit_receiver,
            max_slots.clone(),
            rpc_subscriptions.clone(),
            slot_status_notifier.clone(),
            tvu_config.xdp_sender,
        );

        // Channels linking replay, repair, and the duplicate-shred handler.
        let (ancestor_duplicate_slots_sender, ancestor_duplicate_slots_receiver) = unbounded();
        let (duplicate_slots_sender, duplicate_slots_receiver) = unbounded();
        let (ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_receiver) =
            unbounded();
        let (dumped_slots_sender, dumped_slots_receiver) = unbounded();
        let (popular_pruned_forks_sender, popular_pruned_forks_receiver) = unbounded();
        let window_service = {
            let epoch_schedule = bank_forks
                .read()
                .unwrap()
                .working_bank()
                .epoch_schedule()
                .clone();
            let repair_info = RepairInfo {
                bank_forks: bank_forks.clone(),
                epoch_schedule,
                ancestor_duplicate_slots_sender,
                repair_validators: tvu_config.repair_validators,
                repair_whitelist: tvu_config.repair_whitelist,
                cluster_info: cluster_info.clone(),
                cluster_slots: cluster_slots.clone(),
                wen_restart_repair_slots,
            };
            let repair_service_channels = RepairServiceChannels::new(
                repair_request_quic_sender,
                verified_vote_receiver,
                dumped_slots_receiver,
                popular_pruned_forks_sender,
                ancestor_hashes_request_quic_sender,
                ancestor_hashes_response_quic_receiver,
                ancestor_hashes_replay_update_receiver,
            );
            let window_service_channels = WindowServiceChannels::new(
                verified_receiver,
                retransmit_sender,
                completed_data_sets_sender,
                duplicate_slots_sender.clone(),
                repair_service_channels,
            );
            WindowService::new(
                blockstore.clone(),
                repair_socket,
                ancestor_hashes_socket,
                exit.clone(),
                repair_info,
                window_service_channels,
                leader_schedule_cache.clone(),
                outstanding_repair_requests,
            )
        };

        let (cluster_slots_update_sender, cluster_slots_update_receiver) = unbounded();
        let cluster_slots_service = ClusterSlotsService::new(
            blockstore.clone(),
            cluster_slots.clone(),
            bank_forks.clone(),
            cluster_info.clone(),
            cluster_slots_update_receiver,
            exit.clone(),
        );

        let (cost_update_sender, cost_update_receiver) = unbounded();
        let (drop_bank_sender, drop_bank_receiver) = unbounded();
        let (voting_sender, voting_receiver) = unbounded();

        let replay_senders = ReplaySenders {
            rpc_subscriptions,
            slot_status_notifier,
            transaction_status_sender,
            entry_notification_sender,
            bank_notification_sender,
            ancestor_hashes_replay_update_sender,
            retransmit_slots_sender,
            replay_vote_sender,
            cluster_slots_update_sender,
            cost_update_sender,
            voting_sender,
            drop_bank_sender,
            block_metadata_notifier,
            dumped_slots_sender,
        };

        let replay_receivers = ReplayReceivers {
            ledger_signal_receiver,
            duplicate_slots_receiver,
            ancestor_duplicate_slots_receiver,
            duplicate_confirmed_slots_receiver,
            gossip_verified_vote_hash_receiver,
            popular_pruned_forks_receiver,
        };

        let replay_stage_config = ReplayStageConfig {
            vote_account: *vote_account,
            authorized_voter_keypairs,
            exit: exit.clone(),
            leader_schedule_cache: leader_schedule_cache.clone(),
            block_commitment_cache,
            wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader,
            tower_storage: tower_storage.clone(),
            wait_to_vote_slot,
            replay_forks_threads: tvu_config.replay_forks_threads,
            replay_transactions_threads: tvu_config.replay_transactions_threads,
            blockstore: blockstore.clone(),
            bank_forks: bank_forks.clone(),
            cluster_info: cluster_info.clone(),
            poh_recorder: poh_recorder.clone(),
            tower,
            vote_tracker,
            cluster_slots,
            log_messages_bytes_limit,
            prioritization_fee_cache: prioritization_fee_cache.clone(),
            banking_tracer,
            snapshot_controller,
        };

        let voting_service = VotingService::new(
            voting_receiver,
            cluster_info.clone(),
            poh_recorder.clone(),
            tower_storage,
            vote_connection_cache.clone(),
            alpenglow_socket,
            bank_forks.clone(),
        );

        // Only spawned when a QUIC connection cache is in use; see
        // `create_cache_warmer_if_needed`.
        let warm_quic_cache_service = create_cache_warmer_if_needed(
            connection_cache,
            vote_connection_cache,
            cluster_info,
            poh_recorder,
            &exit,
        );

        let cost_update_service = CostUpdateService::new(cost_update_receiver);

        let drop_bank_service = DropBankService::new(drop_bank_receiver);

        // In wen_restart mode the node does not replay/vote.
        let replay_stage = if in_wen_restart {
            None
        } else {
            Some(ReplayStage::new(
                replay_stage_config,
                replay_senders,
                replay_receivers,
            )?)
        };

        let blockstore_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
            BlockstoreCleanupService::new(blockstore.clone(), max_ledger_shreds, exit.clone())
        });

        let duplicate_shred_listener = DuplicateShredListener::new(
            exit,
            cluster_info.clone(),
            DuplicateShredHandler::new(
                blockstore,
                leader_schedule_cache.clone(),
                bank_forks.clone(),
                duplicate_slots_sender,
                tvu_config.shred_version,
            ),
        );

        Ok(Tvu {
            fetch_stage,
            shred_sigverify,
            retransmit_stage,
            window_service,
            cluster_slots_service,
            replay_stage,
            blockstore_cleanup_service,
            cost_update_service,
            voting_service,
            warm_quic_cache_service,
            drop_bank_service,
            duplicate_shred_listener,
        })
    }
412
413 pub fn join(self) -> thread::Result<()> {
414 self.retransmit_stage.join()?;
415 self.window_service.join()?;
416 self.cluster_slots_service.join()?;
417 self.fetch_stage.join()?;
418 self.shred_sigverify.join()?;
419 if self.blockstore_cleanup_service.is_some() {
420 self.blockstore_cleanup_service.unwrap().join()?;
421 }
422 if self.replay_stage.is_some() {
423 self.replay_stage.unwrap().join()?;
424 }
425 self.cost_update_service.join()?;
426 self.voting_service.join()?;
427 if let Some(warmup_service) = self.warm_quic_cache_service {
428 warmup_service.join()?;
429 }
430 self.drop_bank_service.join()?;
431 self.duplicate_shred_listener.join()?;
432 Ok(())
433 }
434}
435
436fn create_cache_warmer_if_needed(
437 connection_cache: Option<&Arc<ConnectionCache>>,
438 vote_connection_cache: Arc<ConnectionCache>,
439 cluster_info: &Arc<ClusterInfo>,
440 poh_recorder: &Arc<RwLock<PohRecorder>>,
441 exit: &Arc<AtomicBool>,
442) -> Option<WarmQuicCacheService> {
443 let tpu_connection_cache = connection_cache.filter(|cache| cache.use_quic()).cloned();
444 let vote_connection_cache = Some(vote_connection_cache).filter(|cache| cache.use_quic());
445
446 (tpu_connection_cache.is_some() || vote_connection_cache.is_some()).then(|| {
447 WarmQuicCacheService::new(
448 tpu_connection_cache,
449 vote_connection_cache,
450 cluster_info.clone(),
451 poh_recorder.clone(),
452 exit.clone(),
453 )
454 })
455}
456
457#[cfg(test)]
458pub mod tests {
459 use {
460 super::*,
461 crate::{
462 consensus::tower_storage::FileTowerStorage,
463 repair::quic_endpoint::RepairQuicAsyncSenders,
464 },
465 serial_test::serial,
466 solana_gossip::{cluster_info::ClusterInfo, node::Node},
467 solana_keypair::Keypair,
468 solana_ledger::{
469 blockstore::BlockstoreSignals,
470 blockstore_options::BlockstoreOptions,
471 create_new_tmp_ledger,
472 genesis_utils::{create_genesis_config, GenesisConfigInfo},
473 },
474 solana_poh::poh_recorder::create_test_recorder,
475 solana_rpc::optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
476 solana_runtime::bank::Bank,
477 solana_signer::Signer,
478 solana_streamer::socket::SocketAddrSpace,
479 solana_tpu_client::tpu_client::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_VOTE_USE_QUIC},
480 std::sync::atomic::{AtomicU64, Ordering},
481 };
482
    /// End-to-end smoke test: builds a full `Tvu` against a temporary ledger,
    /// immediately signals exit, and verifies every service thread joins.
    /// With `enable_wen_restart` set, also asserts that the replay stage is
    /// suppressed (and present otherwise).
    fn test_tvu_exit(enable_wen_restart: bool) {
        solana_logger::setup();
        let leader = Node::new_localhost();
        let target1_keypair = Keypair::new();
        let target1 = Node::new_localhost_with_pubkey(&target1_keypair.pubkey());

        let starting_balance = 10_000;
        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(starting_balance);

        let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));

        // Dummy QUIC endpoints: the async sender is kept alive, the paired
        // receivers are simply never fed.
        let (turbine_quic_endpoint_sender, _turbine_quic_endpoint_receiver) =
            tokio::sync::mpsc::channel(128);
        let (_turbine_quic_endpoint_sender, turbine_quic_endpoint_receiver) = unbounded();
        let (_, repair_response_quic_receiver) = unbounded();
        let repair_quic_async_senders = RepairQuicAsyncSenders::new_dummy();
        let (_, ancestor_hashes_response_quic_receiver) = unbounded();
        let cluster_info1 = ClusterInfo::new(
            target1.info.clone(),
            target1_keypair.into(),
            SocketAddrSpace::Unspecified,
        );
        cluster_info1.insert_info(leader.info);
        let cref1 = Arc::new(cluster_info1);

        let (blockstore_path, _) = create_new_tmp_ledger!(&genesis_config);
        let BlockstoreSignals {
            blockstore,
            ledger_signal_receiver,
            ..
        } = Blockstore::open_with_signal(&blockstore_path, BlockstoreOptions::default())
            .expect("Expected to successfully open ledger");
        let blockstore = Arc::new(blockstore);
        let bank = bank_forks.read().unwrap().working_bank();
        let (exit, poh_recorder, _transaction_recorder, poh_service, _entry_receiver) =
            create_test_recorder(bank.clone(), blockstore.clone(), None, None);
        let vote_keypair = Keypair::new();
        let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
        let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
        let (retransmit_slots_sender, _retransmit_slots_receiver) = unbounded();
        let (_gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
        let (_verified_vote_sender, verified_vote_receiver) = unbounded();
        let (replay_vote_sender, _replay_vote_receiver) = unbounded();
        let (_, gossip_confirmed_slots_receiver) = unbounded();
        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
        let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
        let outstanding_repair_requests = Arc::<RwLock<OutstandingShredRepairs>>::default();
        let cluster_slots = Arc::new(ClusterSlots::default());
        // Presence of the slots list is what puts `Tvu::new` into
        // wen_restart mode.
        let wen_restart_repair_slots = if enable_wen_restart {
            Some(Arc::new(RwLock::new(vec![])))
        } else {
            None
        };
        let connection_cache = if DEFAULT_VOTE_USE_QUIC {
            ConnectionCache::new_quic(
                "connection_cache_vote_quic",
                DEFAULT_TPU_CONNECTION_POOL_SIZE,
            )
        } else {
            ConnectionCache::with_udp(
                "connection_cache_vote_udp",
                DEFAULT_TPU_CONNECTION_POOL_SIZE,
            )
        };

        let tvu = Tvu::new(
            &vote_keypair.pubkey(),
            Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])),
            &bank_forks,
            &cref1,
            {
                TvuSockets {
                    repair: target1.sockets.repair,
                    retransmit: target1.sockets.retransmit_sockets,
                    fetch: target1.sockets.tvu,
                    ancestor_hashes_requests: target1.sockets.ancestor_hashes_requests,
                    alpenglow: target1.sockets.alpenglow,
                }
            },
            blockstore,
            ledger_signal_receiver,
            Some(Arc::new(RpcSubscriptions::new_for_tests(
                exit.clone(),
                max_complete_transaction_status_slot,
                bank_forks.clone(),
                block_commitment_cache.clone(),
                OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
            ))),
            &poh_recorder,
            Tower::default(),
            Arc::new(FileTowerStorage::default()),
            &leader_schedule_cache,
            exit.clone(),
            block_commitment_cache,
            Arc::<AtomicBool>::default(),
            None, // transaction_status_sender
            None, // entry_notification_sender
            Arc::<VoteTracker>::default(),
            retransmit_slots_sender,
            gossip_verified_vote_hash_receiver,
            verified_vote_receiver,
            replay_vote_sender,
            None, // completed_data_sets_sender
            None, // bank_notification_sender
            gossip_confirmed_slots_receiver,
            TvuConfig::default(),
            &Arc::new(MaxSlots::default()),
            None, // block_metadata_notifier
            None, // wait_to_vote_slot
            None, // snapshot_controller
            None, // log_messages_bytes_limit
            Some(&Arc::new(ConnectionCache::new("connection_cache_test"))),
            &ignored_prioritization_fee_cache,
            BankingTracer::new_disabled(),
            turbine_quic_endpoint_sender,
            turbine_quic_endpoint_receiver,
            repair_response_quic_receiver,
            repair_quic_async_senders.repair_request_quic_sender,
            repair_quic_async_senders.ancestor_hashes_request_quic_sender,
            ancestor_hashes_response_quic_receiver,
            outstanding_repair_requests,
            cluster_slots,
            wen_restart_repair_slots,
            None, // slot_status_notifier
            Arc::new(connection_cache),
        )
        .expect("assume success");
        if enable_wen_restart {
            assert!(tvu.replay_stage.is_none())
        } else {
            assert!(tvu.replay_stage.is_some())
        }
        // Signal shutdown and make sure every service thread exits cleanly.
        exit.store(true, Ordering::Relaxed);
        tvu.join().unwrap();
        poh_service.join().unwrap();
    }
620
    // Serialized because both variants bind the same localhost sockets and
    // share global test state.
    #[test]
    #[serial]
    fn test_tvu_exit_no_wen_restart() {
        test_tvu_exit(false);
    }

    #[test]
    #[serial]
    fn test_tvu_exit_with_wen_restart() {
        test_tvu_exit(true);
    }
}