1use {
5 crate::{
6 banking_trace::BankingTracer,
7 cluster_info_vote_listener::{
8 DuplicateConfirmedSlotsReceiver, GossipVerifiedVoteHashReceiver, VerifiedVoteReceiver,
9 VoteTracker,
10 },
11 cluster_slots_service::{cluster_slots::ClusterSlots, ClusterSlotsService},
12 completed_data_sets_service::CompletedDataSetsSender,
13 consensus::{tower_storage::TowerStorage, Tower},
14 cost_update_service::CostUpdateService,
15 drop_bank_service::DropBankService,
16 repair::repair_service::{OutstandingShredRepairs, RepairInfo, RepairServiceChannels},
17 replay_stage::{ReplayReceivers, ReplaySenders, ReplayStage, ReplayStageConfig},
18 shred_fetch_stage::ShredFetchStage,
19 voting_service::VotingService,
20 warm_quic_cache_service::WarmQuicCacheService,
21 window_service::{WindowService, WindowServiceChannels},
22 },
23 bytes::Bytes,
24 crossbeam_channel::{unbounded, Receiver, Sender},
25 solana_client::connection_cache::ConnectionCache,
26 solana_clock::Slot,
27 solana_geyser_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierArc,
28 solana_gossip::{
29 cluster_info::ClusterInfo, duplicate_shred_handler::DuplicateShredHandler,
30 duplicate_shred_listener::DuplicateShredListener,
31 },
32 solana_keypair::Keypair,
33 solana_ledger::{
34 blockstore::Blockstore, blockstore_cleanup_service::BlockstoreCleanupService,
35 blockstore_processor::TransactionStatusSender, entry_notifier_service::EntryNotifierSender,
36 leader_schedule_cache::LeaderScheduleCache,
37 },
38 solana_poh::poh_recorder::PohRecorder,
39 solana_pubkey::Pubkey,
40 solana_rpc::{
41 max_slots::MaxSlots, optimistically_confirmed_bank_tracker::BankNotificationSenderConfig,
42 rpc_subscriptions::RpcSubscriptions, slot_status_notifier::SlotStatusNotifier,
43 },
44 solana_runtime::{
45 bank_forks::BankForks, commitment::BlockCommitmentCache,
46 prioritization_fee_cache::PrioritizationFeeCache, snapshot_controller::SnapshotController,
47 vote_sender_types::ReplayVoteSender,
48 },
49 solana_streamer::evicting_sender::EvictingSender,
50 solana_turbine::{retransmit_stage::RetransmitStage, xdp::XdpConfig},
51 std::{
52 collections::HashSet,
53 net::{SocketAddr, UdpSocket},
54 num::NonZeroUsize,
55 sync::{atomic::AtomicBool, Arc, RwLock},
56 thread::{self, JoinHandle},
57 },
58 tokio::sync::mpsc::Sender as AsyncSender,
59};
60
/// Capacity handed to `EvictingSender::new_bounded` when `Tvu::new` creates the
/// channel whose receiving end is given to the `RetransmitStage`.
// NOTE(review): the eviction policy applied once the channel is full lives in
// `solana_streamer::evicting_sender` and is not visible here — confirm there.
const CHANNEL_SIZE_RETRANSMIT_INGRESS: usize = 16 * 1024;
67
/// Handles to every service and thread created by [`Tvu::new`].
///
/// [`Tvu::join`] consumes the struct and joins each handle in turn.
pub struct Tvu {
    /// Stage built from the fetch sockets and the QUIC receivers.
    fetch_stage: ShredFetchStage,
    /// Handle returned by `solana_turbine::sigverify_shreds::spawn_shred_sigverify`.
    shred_sigverify: JoinHandle<()>,
    /// Stage built from the retransmit sockets and the retransmit channel receiver.
    retransmit_stage: RetransmitStage,
    /// Service built from the blockstore, repair sockets and repair channels.
    window_service: WindowService,
    /// Service fed by the `cluster_slots_update` channel.
    cluster_slots_service: ClusterSlotsService,
    /// `None` when `wen_restart_repair_slots` was supplied to [`Tvu::new`].
    replay_stage: Option<ReplayStage>,
    /// `Some` only when `TvuConfig::max_ledger_shreds` is set.
    blockstore_cleanup_service: Option<BlockstoreCleanupService>,
    /// Service fed by the `cost_update` channel.
    cost_update_service: CostUpdateService,
    /// Service fed by the `voting` channel.
    voting_service: VotingService,
    /// `Some` only when at least one connection cache reports `use_quic()`.
    warm_quic_cache_service: Option<WarmQuicCacheService>,
    /// Service fed by the `drop_bank` channel.
    drop_bank_service: DropBankService,
    /// Listener wrapping a `DuplicateShredHandler`.
    duplicate_shred_listener: DuplicateShredListener,
}
82
/// UDP sockets consumed by [`Tvu::new`].
pub struct TvuSockets {
    /// Sockets handed to the `ShredFetchStage`.
    pub fetch: Vec<UdpSocket>,
    /// Socket shared (via `Arc`) by the `ShredFetchStage` and the `WindowService`.
    pub repair: UdpSocket,
    /// Sockets handed to the `RetransmitStage`.
    pub retransmit: Vec<UdpSocket>,
    /// Socket handed to the `WindowService` for ancestor-hashes requests.
    pub ancestor_hashes_requests: UdpSocket,
}
89
/// Tunables read by [`Tvu::new`].
pub struct TvuConfig {
    /// When `Some`, a `BlockstoreCleanupService` is started with this value.
    pub max_ledger_shreds: Option<u64>,
    /// Passed to the `ShredFetchStage` and to the `DuplicateShredHandler`.
    pub shred_version: u16,
    /// Forwarded to `RepairInfo::repair_validators`.
    pub repair_validators: Option<HashSet<Pubkey>>,
    /// Forwarded to `RepairInfo::repair_whitelist`.
    pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
    /// Forwarded to `ReplayStageConfig::wait_for_vote_to_start_leader`.
    pub wait_for_vote_to_start_leader: bool,
    /// Forwarded to `ReplayStageConfig::replay_forks_threads`.
    pub replay_forks_threads: NonZeroUsize,
    /// Forwarded to `ReplayStageConfig::replay_transactions_threads`.
    pub replay_transactions_threads: NonZeroUsize,
    /// Thread count passed to `spawn_shred_sigverify`.
    pub shred_sigverify_threads: NonZeroUsize,
    /// Optional XDP configuration passed to `RetransmitStage::new`.
    pub retransmit_xdp: Option<XdpConfig>,
}
103
104impl Default for TvuConfig {
105 fn default() -> Self {
106 Self {
107 max_ledger_shreds: None,
108 shred_version: 0,
109 repair_validators: None,
110 repair_whitelist: Arc::new(RwLock::new(HashSet::default())),
111 wait_for_vote_to_start_leader: false,
112 replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
113 replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
114 shred_sigverify_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
115 retransmit_xdp: None,
116 }
117 }
118}
119
120impl Tvu {
    /// Wires up the channels between the TVU services and constructs each of
    /// them, returning the collected handles as a [`Tvu`].
    ///
    /// Construction order: `ShredFetchStage` → shred sigverify thread →
    /// `RetransmitStage` → `WindowService` → `ClusterSlotsService` →
    /// `VotingService` → optional `WarmQuicCacheService` → `CostUpdateService`
    /// → `DropBankService` → optional `ReplayStage` → optional
    /// `BlockstoreCleanupService` → `DuplicateShredListener`.
    ///
    /// The `ReplayStage` is skipped when `wen_restart_repair_slots` is `Some`.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `ReplayStage::new`.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        vote_account: &Pubkey,
        authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
        bank_forks: &Arc<RwLock<BankForks>>,
        cluster_info: &Arc<ClusterInfo>,
        sockets: TvuSockets,
        blockstore: Arc<Blockstore>,
        ledger_signal_receiver: Receiver<bool>,
        rpc_subscriptions: &Arc<RpcSubscriptions>,
        poh_recorder: &Arc<RwLock<PohRecorder>>,
        tower: Tower,
        tower_storage: Arc<dyn TowerStorage>,
        leader_schedule_cache: &Arc<LeaderScheduleCache>,
        exit: Arc<AtomicBool>,
        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
        turbine_disabled: Arc<AtomicBool>,
        transaction_status_sender: Option<TransactionStatusSender>,
        entry_notification_sender: Option<EntryNotifierSender>,
        vote_tracker: Arc<VoteTracker>,
        retransmit_slots_sender: Sender<Slot>,
        gossip_verified_vote_hash_receiver: GossipVerifiedVoteHashReceiver,
        verified_vote_receiver: VerifiedVoteReceiver,
        replay_vote_sender: ReplayVoteSender,
        completed_data_sets_sender: Option<CompletedDataSetsSender>,
        bank_notification_sender: Option<BankNotificationSenderConfig>,
        duplicate_confirmed_slots_receiver: DuplicateConfirmedSlotsReceiver,
        tvu_config: TvuConfig,
        max_slots: &Arc<MaxSlots>,
        block_metadata_notifier: Option<BlockMetadataNotifierArc>,
        wait_to_vote_slot: Option<Slot>,
        snapshot_controller: Option<Arc<SnapshotController>>,
        log_messages_bytes_limit: Option<usize>,
        connection_cache: Option<&Arc<ConnectionCache>>,
        prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
        banking_tracer: Arc<BankingTracer>,
        turbine_quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>,
        turbine_quic_endpoint_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
        repair_response_quic_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
        repair_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>,
        ancestor_hashes_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>,
        ancestor_hashes_response_quic_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
        outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
        cluster_slots: Arc<ClusterSlots>,
        wen_restart_repair_slots: Option<Arc<RwLock<Vec<Slot>>>>,
        slot_status_notifier: Option<SlotStatusNotifier>,
        vote_connection_cache: Arc<ConnectionCache>,
    ) -> Result<Self, String> {
        // Record this before `wen_restart_repair_slots` is moved into `RepairInfo`.
        let in_wen_restart = wen_restart_repair_slots.is_some();

        let TvuSockets {
            repair: repair_socket,
            fetch: fetch_sockets,
            retransmit: retransmit_sockets,
            ancestor_hashes_requests: ancestor_hashes_socket,
        } = sockets;

        // fetch stage -> shred sigverify
        let (fetch_sender, fetch_receiver) = unbounded();

        // The repair socket is shared by the fetch stage and the window service.
        let repair_socket = Arc::new(repair_socket);
        let ancestor_hashes_socket = Arc::new(ancestor_hashes_socket);
        let fetch_sockets: Vec<Arc<UdpSocket>> = fetch_sockets.into_iter().map(Arc::new).collect();
        let fetch_stage = ShredFetchStage::new(
            fetch_sockets,
            turbine_quic_endpoint_receiver,
            repair_response_quic_receiver,
            repair_socket.clone(),
            fetch_sender,
            tvu_config.shred_version,
            bank_forks.clone(),
            cluster_info.clone(),
            outstanding_repair_requests.clone(),
            turbine_disabled,
            exit.clone(),
        );

        // shred sigverify -> window service
        let (verified_sender, verified_receiver) = unbounded();

        // Bounded channel into the retransmit stage; both the sigverify thread
        // and the window service hold a sender.
        let (retransmit_sender, retransmit_receiver) =
            EvictingSender::new_bounded(CHANNEL_SIZE_RETRANSMIT_INGRESS);

        let shred_sigverify = solana_turbine::sigverify_shreds::spawn_shred_sigverify(
            cluster_info.clone(),
            bank_forks.clone(),
            leader_schedule_cache.clone(),
            fetch_receiver,
            retransmit_sender.clone(),
            verified_sender,
            tvu_config.shred_sigverify_threads,
        );

        let retransmit_stage = RetransmitStage::new(
            bank_forks.clone(),
            leader_schedule_cache.clone(),
            cluster_info.clone(),
            Arc::new(retransmit_sockets),
            turbine_quic_endpoint_sender,
            retransmit_receiver,
            max_slots.clone(),
            Some(rpc_subscriptions.clone()),
            slot_status_notifier.clone(),
            tvu_config.retransmit_xdp.clone(),
        );

        // Channels linking the repair/window side with the replay side:
        //   ancestor_duplicate_slots:     RepairInfo            -> ReplayReceivers
        //   duplicate_slots:              window service and
        //                                 DuplicateShredHandler -> ReplayReceivers
        //   ancestor_hashes_replay_update: ReplaySenders        -> RepairServiceChannels
        //   dumped_slots:                 ReplaySenders         -> RepairServiceChannels
        //   popular_pruned_forks:         RepairServiceChannels -> ReplayReceivers
        let (ancestor_duplicate_slots_sender, ancestor_duplicate_slots_receiver) = unbounded();
        let (duplicate_slots_sender, duplicate_slots_receiver) = unbounded();
        let (ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_receiver) =
            unbounded();
        let (dumped_slots_sender, dumped_slots_receiver) = unbounded();
        let (popular_pruned_forks_sender, popular_pruned_forks_receiver) = unbounded();
        let window_service = {
            // Epoch schedule is copied from the current working bank.
            let epoch_schedule = bank_forks
                .read()
                .unwrap()
                .working_bank()
                .epoch_schedule()
                .clone();
            let repair_info = RepairInfo {
                bank_forks: bank_forks.clone(),
                epoch_schedule,
                ancestor_duplicate_slots_sender,
                repair_validators: tvu_config.repair_validators,
                repair_whitelist: tvu_config.repair_whitelist,
                cluster_info: cluster_info.clone(),
                cluster_slots: cluster_slots.clone(),
                wen_restart_repair_slots,
            };
            let repair_service_channels = RepairServiceChannels::new(
                repair_request_quic_sender,
                verified_vote_receiver,
                dumped_slots_receiver,
                popular_pruned_forks_sender,
                ancestor_hashes_request_quic_sender,
                ancestor_hashes_response_quic_receiver,
                ancestor_hashes_replay_update_receiver,
            );
            let window_service_channels = WindowServiceChannels::new(
                verified_receiver,
                retransmit_sender,
                completed_data_sets_sender,
                duplicate_slots_sender.clone(),
                repair_service_channels,
            );
            WindowService::new(
                blockstore.clone(),
                repair_socket,
                ancestor_hashes_socket,
                exit.clone(),
                repair_info,
                window_service_channels,
                leader_schedule_cache.clone(),
                outstanding_repair_requests,
            )
        };

        // replay -> cluster slots service
        let (cluster_slots_update_sender, cluster_slots_update_receiver) = unbounded();
        let cluster_slots_service = ClusterSlotsService::new(
            blockstore.clone(),
            cluster_slots.clone(),
            bank_forks.clone(),
            cluster_info.clone(),
            cluster_slots_update_receiver,
            exit.clone(),
        );

        // replay -> cost update / drop bank / voting services
        let (cost_update_sender, cost_update_receiver) = unbounded();
        let (drop_bank_sender, drop_bank_receiver) = unbounded();
        let (voting_sender, voting_receiver) = unbounded();

        let replay_senders = ReplaySenders {
            rpc_subscriptions: rpc_subscriptions.clone(),
            slot_status_notifier,
            transaction_status_sender,
            entry_notification_sender,
            bank_notification_sender,
            ancestor_hashes_replay_update_sender,
            retransmit_slots_sender,
            replay_vote_sender,
            cluster_slots_update_sender,
            cost_update_sender,
            voting_sender,
            drop_bank_sender,
            block_metadata_notifier,
            dumped_slots_sender,
        };

        let replay_receivers = ReplayReceivers {
            ledger_signal_receiver,
            duplicate_slots_receiver,
            ancestor_duplicate_slots_receiver,
            duplicate_confirmed_slots_receiver,
            gossip_verified_vote_hash_receiver,
            popular_pruned_forks_receiver,
        };

        let replay_stage_config = ReplayStageConfig {
            vote_account: *vote_account,
            authorized_voter_keypairs,
            exit: exit.clone(),
            leader_schedule_cache: leader_schedule_cache.clone(),
            block_commitment_cache,
            wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader,
            tower_storage: tower_storage.clone(),
            wait_to_vote_slot,
            replay_forks_threads: tvu_config.replay_forks_threads,
            replay_transactions_threads: tvu_config.replay_transactions_threads,
            blockstore: blockstore.clone(),
            bank_forks: bank_forks.clone(),
            cluster_info: cluster_info.clone(),
            poh_recorder: poh_recorder.clone(),
            tower,
            vote_tracker,
            cluster_slots,
            log_messages_bytes_limit,
            prioritization_fee_cache: prioritization_fee_cache.clone(),
            banking_tracer,
            snapshot_controller,
        };

        let voting_service = VotingService::new(
            voting_receiver,
            cluster_info.clone(),
            poh_recorder.clone(),
            tower_storage,
            vote_connection_cache.clone(),
        );

        // Only created when at least one of the two caches uses QUIC.
        let warm_quic_cache_service = create_cache_warmer_if_needed(
            connection_cache,
            vote_connection_cache,
            cluster_info,
            poh_recorder,
            &exit,
        );

        let cost_update_service = CostUpdateService::new(blockstore.clone(), cost_update_receiver);

        let drop_bank_service = DropBankService::new(drop_bank_receiver);

        // No replay stage is created in wen-restart mode; the config, senders
        // and receivers built above are then dropped unused when `new` returns.
        // NOTE(review): if `ReplayStage::new` fails, `?` returns early and the
        // services constructed above are dropped without being joined —
        // confirm that is acceptable for callers.
        let replay_stage = if in_wen_restart {
            None
        } else {
            Some(ReplayStage::new(
                replay_stage_config,
                replay_senders,
                replay_receivers,
            )?)
        };

        let blockstore_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
            BlockstoreCleanupService::new(blockstore.clone(), max_ledger_shreds, exit.clone())
        });

        // Last users of `exit`, `blockstore` and `duplicate_slots_sender`, so
        // they are moved rather than cloned.
        let duplicate_shred_listener = DuplicateShredListener::new(
            exit,
            cluster_info.clone(),
            DuplicateShredHandler::new(
                blockstore,
                leader_schedule_cache.clone(),
                bank_forks.clone(),
                duplicate_slots_sender,
                tvu_config.shred_version,
            ),
        );

        Ok(Tvu {
            fetch_stage,
            shred_sigverify,
            retransmit_stage,
            window_service,
            cluster_slots_service,
            replay_stage,
            blockstore_cleanup_service,
            cost_update_service,
            voting_service,
            warm_quic_cache_service,
            drop_bank_service,
            duplicate_shred_listener,
        })
    }
407
408 pub fn join(self) -> thread::Result<()> {
409 self.retransmit_stage.join()?;
410 self.window_service.join()?;
411 self.cluster_slots_service.join()?;
412 self.fetch_stage.join()?;
413 self.shred_sigverify.join()?;
414 if self.blockstore_cleanup_service.is_some() {
415 self.blockstore_cleanup_service.unwrap().join()?;
416 }
417 if self.replay_stage.is_some() {
418 self.replay_stage.unwrap().join()?;
419 }
420 self.cost_update_service.join()?;
421 self.voting_service.join()?;
422 if let Some(warmup_service) = self.warm_quic_cache_service {
423 warmup_service.join()?;
424 }
425 self.drop_bank_service.join()?;
426 self.duplicate_shred_listener.join()?;
427 Ok(())
428 }
429}
430
431fn create_cache_warmer_if_needed(
432 connection_cache: Option<&Arc<ConnectionCache>>,
433 vote_connection_cache: Arc<ConnectionCache>,
434 cluster_info: &Arc<ClusterInfo>,
435 poh_recorder: &Arc<RwLock<PohRecorder>>,
436 exit: &Arc<AtomicBool>,
437) -> Option<WarmQuicCacheService> {
438 let tpu_connection_cache = connection_cache.filter(|cache| cache.use_quic()).cloned();
439 let vote_connection_cache = Some(vote_connection_cache).filter(|cache| cache.use_quic());
440
441 (tpu_connection_cache.is_some() || vote_connection_cache.is_some()).then(|| {
442 WarmQuicCacheService::new(
443 tpu_connection_cache,
444 vote_connection_cache,
445 cluster_info.clone(),
446 poh_recorder.clone(),
447 exit.clone(),
448 )
449 })
450}
451
452#[cfg(test)]
453pub mod tests {
454 use {
455 super::*,
456 crate::{
457 consensus::tower_storage::FileTowerStorage,
458 repair::quic_endpoint::RepairQuicAsyncSenders,
459 },
460 serial_test::serial,
461 solana_gossip::cluster_info::{ClusterInfo, Node},
462 solana_keypair::Keypair,
463 solana_ledger::{
464 blockstore::BlockstoreSignals,
465 blockstore_options::BlockstoreOptions,
466 create_new_tmp_ledger,
467 genesis_utils::{create_genesis_config, GenesisConfigInfo},
468 },
469 solana_poh::poh_recorder::create_test_recorder,
470 solana_rpc::optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
471 solana_runtime::bank::Bank,
472 solana_signer::Signer,
473 solana_streamer::socket::SocketAddrSpace,
474 solana_tpu_client::tpu_client::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_VOTE_USE_QUIC},
475 std::sync::atomic::{AtomicU64, Ordering},
476 };
477
    /// Starts a `Tvu` against a temporary ledger, checks whether a replay
    /// stage was created, then sets the exit flag and joins everything.
    ///
    /// With `enable_wen_restart` set, `wen_restart_repair_slots` is passed as
    /// `Some(..)` and the replay stage is expected to be absent; otherwise it
    /// is expected to be present.
    fn test_tvu_exit(enable_wen_restart: bool) {
        solana_logger::setup();
        let leader = Node::new_localhost();
        let target1_keypair = Keypair::new();
        let target1 = Node::new_localhost_with_pubkey(&target1_keypair.pubkey());

        let starting_balance = 10_000;
        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(starting_balance);

        let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));

        // Only the channel ends that `Tvu::new` takes are used; the peer ends
        // are either parked in `_`-prefixed bindings or discarded with `_`.
        let (turbine_quic_endpoint_sender, _turbine_quic_endpoint_receiver) =
            tokio::sync::mpsc::channel(128);
        let (_turbine_quic_endpoint_sender, turbine_quic_endpoint_receiver) = unbounded();
        let (_, repair_response_quic_receiver) = unbounded();
        let repair_quic_async_senders = RepairQuicAsyncSenders::new_dummy();
        let (_, ancestor_hashes_response_quic_receiver) = unbounded();
        let cluster_info1 = ClusterInfo::new(
            target1.info.clone(),
            target1_keypair.into(),
            SocketAddrSpace::Unspecified,
        );
        cluster_info1.insert_info(leader.info);
        let cref1 = Arc::new(cluster_info1);

        let (blockstore_path, _) = create_new_tmp_ledger!(&genesis_config);
        let BlockstoreSignals {
            blockstore,
            ledger_signal_receiver,
            ..
        } = Blockstore::open_with_signal(&blockstore_path, BlockstoreOptions::default())
            .expect("Expected to successfully open ledger");
        let blockstore = Arc::new(blockstore);
        let bank = bank_forks.read().unwrap().working_bank();
        // `exit` returned here is the flag shared with the Tvu below.
        let (exit, poh_recorder, _transaction_recorder, poh_service, _entry_receiver) =
            create_test_recorder(bank.clone(), blockstore.clone(), None, None);
        let vote_keypair = Keypair::new();
        let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
        let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
        let (retransmit_slots_sender, _retransmit_slots_receiver) = unbounded();
        let (_gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
        let (_verified_vote_sender, verified_vote_receiver) = unbounded();
        let (replay_vote_sender, _replay_vote_receiver) = unbounded();
        let (_, gossip_confirmed_slots_receiver) = unbounded();
        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
        let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
        let outstanding_repair_requests = Arc::<RwLock<OutstandingShredRepairs>>::default();
        let cluster_slots = Arc::new(ClusterSlots::default());
        // `Some(..)` here is what puts `Tvu::new` into wen-restart mode.
        let wen_restart_repair_slots = if enable_wen_restart {
            Some(Arc::new(RwLock::new(vec![])))
        } else {
            None
        };
        // Passed to `Tvu::new` as `vote_connection_cache` (last argument).
        let connection_cache = if DEFAULT_VOTE_USE_QUIC {
            ConnectionCache::new_quic(
                "connection_cache_vote_quic",
                DEFAULT_TPU_CONNECTION_POOL_SIZE,
            )
        } else {
            ConnectionCache::with_udp(
                "connection_cache_vote_udp",
                DEFAULT_TPU_CONNECTION_POOL_SIZE,
            )
        };

        let tvu = Tvu::new(
            &vote_keypair.pubkey(),
            Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])),
            &bank_forks,
            &cref1,
            {
                TvuSockets {
                    repair: target1.sockets.repair,
                    retransmit: target1.sockets.retransmit_sockets,
                    fetch: target1.sockets.tvu,
                    ancestor_hashes_requests: target1.sockets.ancestor_hashes_requests,
                }
            },
            blockstore,
            ledger_signal_receiver,
            &Arc::new(RpcSubscriptions::new_for_tests(
                exit.clone(),
                max_complete_transaction_status_slot,
                bank_forks.clone(),
                block_commitment_cache.clone(),
                OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
            )),
            &poh_recorder,
            Tower::default(),
            Arc::new(FileTowerStorage::default()),
            &leader_schedule_cache,
            exit.clone(),
            block_commitment_cache,
            Arc::<AtomicBool>::default(), // turbine_disabled
            None,                         // transaction_status_sender
            None,                         // entry_notification_sender
            Arc::<VoteTracker>::default(),
            retransmit_slots_sender,
            gossip_verified_vote_hash_receiver,
            verified_vote_receiver,
            replay_vote_sender,
            None, // completed_data_sets_sender
            None, // bank_notification_sender
            gossip_confirmed_slots_receiver,
            TvuConfig::default(),
            &Arc::new(MaxSlots::default()),
            None, // block_metadata_notifier
            None, // wait_to_vote_slot
            None, // snapshot_controller
            None, // log_messages_bytes_limit
            Some(&Arc::new(ConnectionCache::new("connection_cache_test"))),
            &ignored_prioritization_fee_cache,
            BankingTracer::new_disabled(),
            turbine_quic_endpoint_sender,
            turbine_quic_endpoint_receiver,
            repair_response_quic_receiver,
            repair_quic_async_senders.repair_request_quic_sender,
            repair_quic_async_senders.ancestor_hashes_request_quic_sender,
            ancestor_hashes_response_quic_receiver,
            outstanding_repair_requests,
            cluster_slots,
            wen_restart_repair_slots,
            None, // slot_status_notifier
            Arc::new(connection_cache),
        )
        .expect("assume success");
        // The replay stage must be absent exactly when wen-restart is enabled.
        if enable_wen_restart {
            assert!(tvu.replay_stage.is_none())
        } else {
            assert!(tvu.replay_stage.is_some())
        }
        // Set the shared exit flag, then wait for everything to finish.
        exit.store(true, Ordering::Relaxed);
        tvu.join().unwrap();
        poh_service.join().unwrap();
    }
614
615 #[test]
616 #[serial]
617 fn test_tvu_exit_no_wen_restart() {
618 test_tvu_exit(false);
619 }
620
621 #[test]
622 #[serial]
623 fn test_tvu_exit_with_wen_restart() {
624 test_tvu_exit(true);
625 }
626}