// solana_core/banking_stage.rs
1//! The `banking_stage` processes Transaction messages. It is intended to be used
2//! to construct a software pipeline. The stage uses all available CPU cores and
3//! can do its processing in parallel with signature verification on the GPU.
4
5#[cfg(feature = "dev-context-only-utils")]
6use qualifier_attr::qualifiers;
7use {
8    self::{
9        committer::Committer, consumer::Consumer, decision_maker::DecisionMaker,
10        qos_service::QosService, vote_packet_receiver::VotePacketReceiver,
11        vote_storage::VoteStorage,
12    },
13    crate::{
14        banking_stage::{
15            consume_worker::ConsumeWorker,
16            transaction_scheduler::{
17                prio_graph_scheduler::PrioGraphScheduler,
18                scheduler_controller::{
19                    SchedulerConfig, SchedulerController, DEFAULT_SCHEDULER_PACING_FILL_TIME_MILLIS,
20                },
21                scheduler_error::SchedulerError,
22            },
23        },
24        validator::BlockProductionMethod,
25    },
26    agave_banking_stage_ingress_types::BankingPacketReceiver,
27    crossbeam_channel::{unbounded, Receiver, Sender},
28    histogram::Histogram,
29    solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfoQuery},
30    solana_ledger::blockstore_processor::TransactionStatusSender,
31    solana_perf::packet::PACKETS_PER_BATCH,
32    solana_poh::{
33        poh_controller::PohController, poh_recorder::PohRecorder,
34        transaction_recorder::TransactionRecorder,
35    },
36    solana_pubkey::Pubkey,
37    solana_runtime::{
38        bank::Bank, bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
39        vote_sender_types::ReplayVoteSender,
40    },
41    solana_time_utils::AtomicInterval,
42    std::{
43        num::{NonZeroU64, NonZeroUsize, Saturating},
44        ops::Deref,
45        sync::{
46            atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
47            Arc, RwLock,
48        },
49        thread::{self, Builder, JoinHandle},
50        time::Duration,
51    },
52    transaction_scheduler::{
53        greedy_scheduler::{GreedyScheduler, GreedySchedulerConfig},
54        prio_graph_scheduler::PrioGraphSchedulerConfig,
55        receive_and_buffer::{ReceiveAndBuffer, TransactionViewReceiveAndBuffer},
56    },
57    vote_worker::VoteWorker,
58};
59
// Below modules are pub to allow use by banking_stage bench
pub mod committer;
pub mod consumer;
pub mod leader_slot_metrics;
pub mod qos_service;
pub mod vote_storage;

mod consume_worker;
mod vote_worker;

// `pub` only under dev-context-only-utils so tests/benches can reach the
// decision-making internals; private in production builds.
#[cfg(feature = "dev-context-only-utils")]
pub mod decision_maker;
#[cfg(not(feature = "dev-context-only-utils"))]
mod decision_maker;

mod latest_validator_vote_packet;
mod leader_slot_timing_metrics;
mod read_write_account_set;
mod vote_packet_receiver;

// Same dev-only visibility widening as `decision_maker` above.
#[cfg(feature = "dev-context-only-utils")]
pub mod scheduler_messages;
#[cfg(not(feature = "dev-context-only-utils"))]
mod scheduler_messages;

pub mod transaction_scheduler;

#[cfg(feature = "dev-context-only-utils")]
pub mod unified_scheduler;
#[cfg(not(feature = "dev-context-only-utils"))]
pub(crate) mod unified_scheduler;

// Unix-only: external-scheduler support (socket-based handshake lives here).
#[cfg(unix)]
mod progress_tracker;
#[cfg(unix)]
mod tpu_to_pack;
96
/// The maximum number of worker threads that can be spawned by banking stage.
/// 64 because `ThreadAwareAccountLocks` uses a `u64` as a bitmask to
/// track thread placement.
const MAX_NUM_WORKERS: NonZeroUsize = NonZeroUsize::new(64).unwrap();
/// Worker-thread count used when the caller does not specify one
/// (see [`BankingStage::default_num_workers`]).
const DEFAULT_NUM_WORKERS: NonZeroUsize = NonZeroUsize::new(4).unwrap();

/// Total packet-buffering budget for the stage; widened to `pub` for
/// dev/bench builds via `qualifier_attr`.
#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
const TOTAL_BUFFERED_PACKETS: usize = 100_000;
// NOTE(review): presumably the polling cadence for leader-slot boundary
// checks — the consuming code is not visible in this chunk; confirm at use site.
const SLOT_BOUNDARY_CHECK_PERIOD: Duration = Duration::from_millis(10);
106
/// Counters and timings for banking-stage vote-loop activity, reported
/// periodically as a single `banking_stage-vote_loop_stats` datapoint
/// (see [`BankingStageStats::report`]).
#[derive(Debug, Default)]
pub struct BankingStageStats {
    // Rate-limits datapoint emission; consulted in `report`.
    last_report: AtomicInterval,
    // Per-source packet counters: votes arriving over TPU vs. gossip.
    tpu_counts: VoteSourceCounts,
    gossip_counts: VoteSourceCounts,
    pub(crate) dropped_duplicated_packets_count: AtomicUsize,
    dropped_forward_packets_count: AtomicUsize,
    current_buffered_packets_count: AtomicUsize,
    rebuffered_packets_count: AtomicUsize,
    consumed_buffered_packets_count: AtomicUsize,
    // Distribution of packet-index counts per batch; bounded by
    // PACKETS_PER_BATCH (see `new`).
    batch_packet_indexes_len: Histogram,

    // Timing
    consume_buffered_packets_elapsed: AtomicU64,
    receive_and_buffer_packets_elapsed: AtomicU64,
    filter_pending_packets_elapsed: AtomicU64,
    pub(crate) packet_conversion_elapsed: AtomicU64,
    transaction_processing_elapsed: AtomicU64,
}
126
/// Packet counters tracked separately for each vote ingress source
/// (TPU and gossip); see the `tpu_counts`/`gossip_counts` fields above.
#[derive(Debug, Default)]
struct VoteSourceCounts {
    receive_and_buffer_packets_count: AtomicUsize,
    dropped_packets_count: AtomicUsize,
    newly_buffered_packets_count: AtomicUsize,
    newly_buffered_forwarded_packets_count: AtomicUsize,
}
134
135impl VoteSourceCounts {
136    fn is_empty(&self) -> bool {
137        0 == self
138            .receive_and_buffer_packets_count
139            .load(Ordering::Relaxed)
140            + self.dropped_packets_count.load(Ordering::Relaxed)
141            + self.newly_buffered_packets_count.load(Ordering::Relaxed)
142            + self
143                .newly_buffered_forwarded_packets_count
144                .load(Ordering::Relaxed)
145    }
146}
147
impl BankingStageStats {
    /// Builds a zeroed stats bundle. The histogram is explicitly configured
    /// with `PACKETS_PER_BATCH` as its maximum value since a batch can never
    /// contain more packet indexes than that; everything else is `Default`.
    pub fn new() -> Self {
        BankingStageStats {
            batch_packet_indexes_len: Histogram::configure()
                .max_value(PACKETS_PER_BATCH as u64)
                .build()
                .unwrap(),
            ..BankingStageStats::default()
        }
    }

    /// True when no counter, timer, or histogram entry has been recorded
    /// since the counters were last drained; used by `report` to avoid
    /// emitting an all-zero datapoint.
    fn is_empty(&self) -> bool {
        self.gossip_counts.is_empty()
            && self.tpu_counts.is_empty()
            && 0 == self
                .dropped_duplicated_packets_count
                .load(Ordering::Relaxed) as u64
                + self.dropped_forward_packets_count.load(Ordering::Relaxed) as u64
                + self.current_buffered_packets_count.load(Ordering::Relaxed) as u64
                + self.rebuffered_packets_count.load(Ordering::Relaxed) as u64
                + self.consumed_buffered_packets_count.load(Ordering::Relaxed) as u64
                + self
                    .consume_buffered_packets_elapsed
                    .load(Ordering::Relaxed)
                + self
                    .receive_and_buffer_packets_elapsed
                    .load(Ordering::Relaxed)
                + self.filter_pending_packets_elapsed.load(Ordering::Relaxed)
                + self.packet_conversion_elapsed.load(Ordering::Relaxed)
                + self.transaction_processing_elapsed.load(Ordering::Relaxed)
                + self.batch_packet_indexes_len.entries()
    }

    /// Emits the `banking_stage-vote_loop_stats` datapoint at most once per
    /// `report_interval_ms`. Counters are drained with `swap(0, ..)` so that
    /// activity is never double-counted across reports; the histogram is
    /// cleared explicitly at the end.
    fn report(&mut self, report_interval_ms: u64) {
        // skip reporting metrics if stats is empty
        if self.is_empty() {
            return;
        }
        if self.last_report.should_update(report_interval_ms) {
            datapoint_info!(
                "banking_stage-vote_loop_stats",
                (
                    "tpu_receive_and_buffer_packets_count",
                    self.tpu_counts
                        .receive_and_buffer_packets_count
                        .swap(0, Ordering::Relaxed),
                    i64
                ),
                (
                    "tpu_dropped_packets_count",
                    self.tpu_counts
                        .dropped_packets_count
                        .swap(0, Ordering::Relaxed),
                    i64
                ),
                (
                    "tpu_newly_buffered_packets_count",
                    self.tpu_counts
                        .newly_buffered_packets_count
                        .swap(0, Ordering::Relaxed),
                    i64
                ),
                (
                    "tpu_newly_buffered_forwarded_packets_count",
                    self.tpu_counts
                        .newly_buffered_forwarded_packets_count
                        .swap(0, Ordering::Relaxed),
                    i64
                ),
                (
                    "gossip_receive_and_buffer_packets_count",
                    self.gossip_counts
                        .receive_and_buffer_packets_count
                        .swap(0, Ordering::Relaxed),
                    i64
                ),
                (
                    "gossip_dropped_packets_count",
                    self.gossip_counts
                        .dropped_packets_count
                        .swap(0, Ordering::Relaxed),
                    i64
                ),
                (
                    "gossip_newly_buffered_packets_count",
                    self.gossip_counts
                        .newly_buffered_packets_count
                        .swap(0, Ordering::Relaxed),
                    i64
                ),
                (
                    "gossip_newly_buffered_forwarded_packets_count",
                    self.gossip_counts
                        .newly_buffered_forwarded_packets_count
                        .swap(0, Ordering::Relaxed),
                    i64
                ),
                (
                    "dropped_duplicated_packets_count",
                    self.dropped_duplicated_packets_count
                        .swap(0, Ordering::Relaxed),
                    i64
                ),
                (
                    "dropped_forward_packets_count",
                    self.dropped_forward_packets_count
                        .swap(0, Ordering::Relaxed),
                    i64
                ),
                (
                    "current_buffered_packets_count",
                    self.current_buffered_packets_count
                        .swap(0, Ordering::Relaxed),
                    i64
                ),
                (
                    "rebuffered_packets_count",
                    self.rebuffered_packets_count.swap(0, Ordering::Relaxed),
                    i64
                ),
                (
                    "consumed_buffered_packets_count",
                    self.consumed_buffered_packets_count
                        .swap(0, Ordering::Relaxed),
                    i64
                ),
                (
                    "consume_buffered_packets_elapsed",
                    self.consume_buffered_packets_elapsed
                        .swap(0, Ordering::Relaxed),
                    i64
                ),
                (
                    "receive_and_buffer_packets_elapsed",
                    self.receive_and_buffer_packets_elapsed
                        .swap(0, Ordering::Relaxed),
                    i64
                ),
                (
                    "filter_pending_packets_elapsed",
                    self.filter_pending_packets_elapsed
                        .swap(0, Ordering::Relaxed),
                    i64
                ),
                (
                    "packet_conversion_elapsed",
                    self.packet_conversion_elapsed.swap(0, Ordering::Relaxed),
                    i64
                ),
                (
                    "transaction_processing_elapsed",
                    self.transaction_processing_elapsed
                        .swap(0, Ordering::Relaxed),
                    i64
                ),
                (
                    "packet_batch_indices_len_min",
                    self.batch_packet_indexes_len.minimum().unwrap_or(0),
                    i64
                ),
                (
                    "packet_batch_indices_len_max",
                    self.batch_packet_indexes_len.maximum().unwrap_or(0),
                    i64
                ),
                (
                    "packet_batch_indices_len_mean",
                    self.batch_packet_indexes_len.mean().unwrap_or(0),
                    i64
                ),
                (
                    "packet_batch_indices_len_90pct",
                    self.batch_packet_indexes_len.percentile(90.0).unwrap_or(0),
                    i64
                )
            );
            self.batch_packet_indexes_len.clear();
        }
    }
}
328
/// Aggregated per-batch transaction accounting: cost totals plus
/// error/retry tallies.
#[derive(Debug, Default)]
pub struct BatchedTransactionDetails {
    pub costs: BatchedTransactionCostDetails,
    pub errors: BatchedTransactionErrorDetails,
}
334
/// Summed cost components for a batch of transactions. `Saturating<u64>`
/// makes the accumulation clamp at `u64::MAX` instead of wrapping.
#[derive(Debug, Default)]
pub struct BatchedTransactionCostDetails {
    pub batched_signature_cost: Saturating<u64>,
    pub batched_write_lock_cost: Saturating<u64>,
    pub batched_data_bytes_cost: Saturating<u64>,
    pub batched_loaded_accounts_data_size_cost: Saturating<u64>,
    pub batched_programs_execute_cost: Saturating<u64>,
}
343
/// Per-batch counts of transactions that were retried or dropped for
/// exceeding block/vote/account cost limits; saturating to avoid wrap.
#[derive(Debug, Default)]
pub struct BatchedTransactionErrorDetails {
    pub batched_retried_txs_per_block_limit_count: Saturating<u64>,
    pub batched_retried_txs_per_vote_limit_count: Saturating<u64>,
    pub batched_retried_txs_per_account_limit_count: Saturating<u64>,
    pub batched_retried_txs_per_account_data_block_limit_count: Saturating<u64>,
    pub batched_dropped_txs_per_account_data_total_limit_count: Saturating<u64>,
}
352
/// Handle owning all banking-stage threads. Threads are joined and may be
/// re-spawned (with a different configuration) via the retained context.
pub struct BankingStage {
    // Only None during final join of BankingStage.
    context: Option<BankingStageContext>,
    thread_hdls: Vec<JoinHandle<()>>,
}
358
/// Abstraction over the cluster-info lookups banking stage needs; the
/// production implementation is `Arc<ClusterInfo>` (see impl below).
pub trait LikeClusterInfo: Send + Sync + 'static + Clone {
    /// This node's identity pubkey.
    fn id(&self) -> Pubkey;

    /// Looks up contact info for `id` and applies `query` to it, if found.
    fn lookup_contact_info<R>(&self, id: &Pubkey, query: impl ContactInfoQuery<R>) -> Option<R>;
}
364
365impl LikeClusterInfo for Arc<ClusterInfo> {
366    fn id(&self) -> Pubkey {
367        self.deref().id()
368    }
369
370    fn lookup_contact_info<R>(&self, id: &Pubkey, query: impl ContactInfoQuery<R>) -> Option<R> {
371        self.deref().lookup_contact_info(id, query)
372    }
373}
374
impl BankingStage {
    /// Creates the banking stage: one vote worker thread, `num_workers`
    /// consume-worker threads, and one central scheduler thread feeding them.
    ///
    /// The three receivers carry packets from the distinct ingress paths
    /// (TPU non-vote, TPU vote, gossip vote). `block_production_method`
    /// selects between the greedy and prio-graph central schedulers.
    #[allow(clippy::too_many_arguments)]
    pub fn new_num_threads(
        block_production_method: BlockProductionMethod,
        poh_recorder: Arc<RwLock<PohRecorder>>,
        transaction_recorder: TransactionRecorder,
        non_vote_receiver: BankingPacketReceiver,
        tpu_vote_receiver: BankingPacketReceiver,
        gossip_vote_receiver: BankingPacketReceiver,
        num_workers: NonZeroUsize,
        scheduler_config: SchedulerConfig,
        transaction_status_sender: Option<TransactionStatusSender>,
        replay_vote_sender: ReplayVoteSender,
        log_messages_bytes_limit: Option<usize>,
        bank_forks: Arc<RwLock<BankForks>>,
        prioritization_fee_cache: Arc<PrioritizationFeeCache>,
    ) -> Self {
        let committer = Committer::new(
            transaction_status_sender,
            replay_vote_sender,
            prioritization_fee_cache,
        );

        // Bundle everything thread spawning needs so later re-spawns
        // (`spawn_internal_threads`, unix `spawn_external_threads`) can
        // reuse the same context.
        let context = BankingStageContext {
            exit_signal: Arc::new(AtomicBool::new(false)),
            tpu_vote_receiver,
            gossip_vote_receiver,
            non_vote_receiver,
            transaction_recorder,
            poh_recorder,
            bank_forks,
            committer,
            log_messages_bytes_limit,
        };
        // + 1 for vote worker
        // + 1 for the scheduler thread
        let mut thread_hdls = Vec::with_capacity(num_workers.get() + 2);
        thread_hdls.push(Self::spawn_vote_worker(&context));

        let receive_and_buffer = TransactionViewReceiveAndBuffer {
            receiver: context.non_vote_receiver.clone(),
            bank_forks: context.bank_forks.clone(),
        };
        Self::spawn_scheduler_and_workers(
            &mut thread_hdls,
            receive_and_buffer,
            matches!(
                block_production_method,
                BlockProductionMethod::CentralSchedulerGreedy
            ),
            num_workers,
            scheduler_config,
            &context,
        );

        Self {
            context: Some(context),
            thread_hdls,
        }
    }

    /// Spawns the requested internal scheduler & accompanying worker threads.
    ///
    /// First signals all current threads to exit and joins them, then flips
    /// the exit signal back and re-spawns the vote worker, scheduler, and
    /// workers with the new configuration. No-op if the stage has already
    /// been torn down (`context` is `None`).
    pub fn spawn_internal_threads(
        &mut self,
        block_production_method: BlockProductionMethod,
        num_workers: NonZeroUsize,
        scheduler_config: SchedulerConfig,
    ) -> thread::Result<()> {
        if let Some(context) = self.context.as_ref() {
            info!("Shutting down banking stage threads");
            context.exit_signal.store(true, Ordering::Relaxed);
            for bank_thread_hdl in self.thread_hdls.drain(..) {
                bank_thread_hdl.join()?;
            }

            info!(
                "Spawning new banking stage threads with block-production-method: \
                 {block_production_method:?} num-workers: {num_workers}"
            );
            context.exit_signal.store(false, Ordering::Relaxed);
            self.thread_hdls.push(Self::spawn_vote_worker(context));

            let receive_and_buffer = TransactionViewReceiveAndBuffer {
                receiver: context.non_vote_receiver.clone(),
                bank_forks: context.bank_forks.clone(),
            };
            Self::spawn_scheduler_and_workers(
                &mut self.thread_hdls,
                receive_and_buffer,
                matches!(
                    block_production_method,
                    BlockProductionMethod::CentralSchedulerGreedy
                ),
                num_workers,
                scheduler_config,
                context,
            );
        }

        Ok(())
    }

    /// Spawns `num_workers` consume workers plus the central scheduler thread
    /// (greedy or prio-graph per `use_greedy_scheduler`), pushing all join
    /// handles onto `non_vote_thread_hdls`.
    ///
    /// Scheduler -> worker communication uses one unbounded channel per
    /// worker; completed work flows back over a single shared channel.
    fn spawn_scheduler_and_workers<R: ReceiveAndBuffer + Send + Sync + 'static>(
        non_vote_thread_hdls: &mut Vec<JoinHandle<()>>,
        receive_and_buffer: R,
        use_greedy_scheduler: bool,
        num_workers: NonZeroUsize,
        scheduler_config: SchedulerConfig,
        context: &BankingStageContext,
    ) {
        // Worker count must fit in the u64 thread-placement bitmask.
        assert!(num_workers <= BankingStage::max_num_workers());
        let num_workers = num_workers.get();

        let exit = context.exit_signal.clone();

        // Create channels for communication between scheduler and workers
        let (work_senders, work_receivers): (Vec<Sender<_>>, Vec<Receiver<_>>) =
            (0..num_workers).map(|_| unbounded()).unzip();
        let (finished_work_sender, finished_work_receiver) = unbounded();

        // Spawn the worker threads
        let decision_maker = DecisionMaker::from(context.poh_recorder.read().unwrap().deref());
        let mut worker_metrics = Vec::with_capacity(num_workers);
        for (index, work_receiver) in work_receivers.into_iter().enumerate() {
            let id = index as u32;
            let consume_worker = ConsumeWorker::new(
                id,
                exit.clone(),
                work_receiver,
                Consumer::new(
                    context.committer.clone(),
                    context.transaction_recorder.clone(),
                    QosService::new(id),
                    context.log_messages_bytes_limit,
                ),
                finished_work_sender.clone(),
                context.poh_recorder.read().unwrap().shared_leader_state(),
            );

            worker_metrics.push(consume_worker.metrics_handle());
            non_vote_thread_hdls.push(
                Builder::new()
                    .name(format!("solCoWorker{id:02}"))
                    .spawn(move || {
                        let _ = consume_worker.run();
                    })
                    .unwrap(),
            )
        }

        // Macro to spawn the scheduler. Different type on `scheduler` and thus
        // scheduler_controller mean we cannot have an easy if for `scheduler`
        // assignment without introducing `dyn`.
        macro_rules! spawn_scheduler {
            ($scheduler:ident) => {
                let exit = exit.clone();
                let bank_forks = context.bank_forks.clone();
                non_vote_thread_hdls.push(
                    Builder::new()
                        .name("solBnkTxSched".to_string())
                        .spawn(move || {
                            let scheduler_controller = SchedulerController::new(
                                exit,
                                scheduler_config,
                                decision_maker,
                                receive_and_buffer,
                                bank_forks,
                                $scheduler,
                                worker_metrics,
                            );

                            // A disconnected receive channel is a normal
                            // shutdown path; a disconnected send channel means
                            // a worker died unexpectedly.
                            match scheduler_controller.run() {
                                Ok(_) => {}
                                Err(SchedulerError::DisconnectedRecvChannel(_)) => {}
                                Err(SchedulerError::DisconnectedSendChannel(_)) => {
                                    warn!("Unexpected worker disconnect from scheduler")
                                }
                            }
                        })
                        .unwrap(),
                );
            };
        }

        // Spawn the central scheduler thread
        if use_greedy_scheduler {
            let scheduler = GreedyScheduler::new(
                work_senders,
                finished_work_receiver,
                GreedySchedulerConfig::default(),
            );
            spawn_scheduler!(scheduler);
        } else {
            let scheduler = PrioGraphScheduler::new(
                work_senders,
                finished_work_receiver,
                PrioGraphSchedulerConfig::default(),
            );
            spawn_scheduler!(scheduler);
        }
    }

    /// Spawns the single thread that consumes vote packets from both the TPU
    /// and gossip receivers and returns its join handle.
    fn spawn_vote_worker(context: &BankingStageContext) -> JoinHandle<()> {
        let vote_storage = VoteStorage::new(&context.bank_forks.read().unwrap().working_bank());
        let tpu_receiver = VotePacketReceiver::new(context.tpu_vote_receiver.clone());
        let gossip_receiver = VotePacketReceiver::new(context.gossip_vote_receiver.clone());
        // Vote worker always uses QoS id 0; consume workers get ids 0..n
        // from their own numbering.
        let consumer = Consumer::new(
            context.committer.clone(),
            context.transaction_recorder.clone(),
            QosService::new(0),
            context.log_messages_bytes_limit,
        );
        let decision_maker = DecisionMaker::from(context.poh_recorder.read().unwrap().deref());

        let exit_signal = context.exit_signal.clone();
        let bank_forks = context.bank_forks.clone();
        Builder::new()
            .name("solBanknStgVote".to_string())
            .spawn(move || {
                VoteWorker::new(
                    exit_signal,
                    decision_maker,
                    tpu_receiver,
                    gossip_receiver,
                    vote_storage,
                    bank_forks,
                    consumer,
                )
                .run()
            })
            .unwrap()
    }

    /// Default number of consume workers (see `DEFAULT_NUM_WORKERS`).
    pub fn default_num_workers() -> NonZeroUsize {
        DEFAULT_NUM_WORKERS
    }

    /// Maximum supported consume workers (bitmask-limited, see
    /// `MAX_NUM_WORKERS`).
    pub const fn max_num_workers() -> NonZeroUsize {
        MAX_NUM_WORKERS
    }

    /// Default scheduler-pacing fill time, re-exported for callers that
    /// construct a `SchedulerConfig`.
    pub const fn default_fill_time_millis() -> NonZeroU64 {
        DEFAULT_SCHEDULER_PACING_FILL_TIME_MILLIS
    }

    /// Signals every thread to exit and joins them all, propagating the first
    /// join error. Consumes `self`; the context is taken so this is the final
    /// shutdown (see the `context` field comment).
    pub fn join(mut self) -> thread::Result<()> {
        self.context
            .take()
            .expect("non-vote context must be Some")
            .exit_signal
            .store(true, Ordering::Relaxed);
        for bank_thread_hdl in self.thread_hdls {
            bank_thread_hdl.join()?;
        }
        Ok(())
    }
}
632
// Unix-only support for running banking-stage workers driven by an external
// scheduler process, negotiated through an `AgaveSession` handshake.
#[cfg(unix)]
mod external {
    use {
        super::*,
        crate::banking_stage::consume_worker::external::ExternalWorker,
        agave_scheduling_utils::handshake::server::{AgaveSession, AgaveWorkerSession},
        tpu_to_pack::BankingPacketReceivers,
    };

    impl BankingStage {
        /// Spawns the external workers as specified by the [`AgaveSession`].
        ///
        /// Like `spawn_internal_threads`, this first joins all currently
        /// running threads via the shared exit signal, then spawns: the vote
        /// worker, one `ExternalWorker` per session worker, the tpu_to_pack
        /// forwarder, and the progress tracker. No-op if the stage has
        /// already been torn down.
        pub fn spawn_external_threads(
            &mut self,
            AgaveSession {
                tpu_to_pack,
                progress_tracker,
                workers,
            }: AgaveSession,
        ) -> thread::Result<()> {
            if let Some(context) = self.context.as_ref() {
                // Shutdown the previous workers.
                info!("Shutting down banking stage threads");
                context.exit_signal.store(true, Ordering::Relaxed);
                for bank_thread_hdl in self.thread_hdls.drain(..) {
                    bank_thread_hdl.join()?;
                }

                context.exit_signal.store(false, Ordering::Relaxed);

                // Spawn the new workers.
                self.thread_hdls.push(Self::spawn_vote_worker(context));
                Self::spawn_external_workers(&mut self.thread_hdls, context, workers);

                // Spawn tpu_to_pack.
                // Only non-vote packets are forwarded to the external
                // scheduler; votes keep flowing to the internal vote worker.
                self.thread_hdls.push(tpu_to_pack::spawn(
                    context.exit_signal.clone(),
                    BankingPacketReceivers {
                        non_vote_receiver: context.non_vote_receiver.clone(),
                        gossip_vote_receiver: None,
                        tpu_vote_receiver: None,
                    },
                    tpu_to_pack,
                ));

                // Spawn progress tracker.
                let (shared_leader_state, ticks_per_slot) = {
                    let poh = context.poh_recorder.read().unwrap();

                    (poh.shared_leader_state(), poh.ticks_per_slot())
                };
                self.thread_hdls.push(progress_tracker::spawn(
                    context.exit_signal.clone(),
                    progress_tracker,
                    shared_leader_state,
                    ticks_per_slot,
                ));
            }

            Ok(())
        }

        /// Spawns one `ExternalWorker` thread per [`AgaveWorkerSession`],
        /// pushing join handles onto `non_vote_thread_hdls`.
        fn spawn_external_workers(
            non_vote_thread_hdls: &mut Vec<JoinHandle<()>>,
            context: &BankingStageContext,
            workers: Vec<AgaveWorkerSession>,
        ) {
            // Handshake protocol and stage must agree on the worker cap.
            static_assertions::const_assert!(
                agave_scheduling_utils::handshake::MAX_WORKERS
                    == BankingStage::max_num_workers().get()
            );
            assert!(workers.len() <= BankingStage::max_num_workers().get());

            // Spawn the worker threads
            let mut worker_metrics = Vec::with_capacity(workers.len());
            for (
                index,
                AgaveWorkerSession {
                    allocator,
                    pack_to_worker,
                    worker_to_pack,
                },
            ) in workers.into_iter().enumerate()
            {
                let id = index as u32;
                let consume_worker = ExternalWorker::new(
                    id,
                    context.exit_signal.clone(),
                    pack_to_worker,
                    Consumer::new(
                        context.committer.clone(),
                        context.transaction_recorder.clone(),
                        QosService::new(id),
                        context.log_messages_bytes_limit,
                    ),
                    worker_to_pack,
                    allocator,
                    context.poh_recorder.read().unwrap().shared_leader_state(),
                    context.bank_forks.read().unwrap().sharable_banks(),
                );

                worker_metrics.push(consume_worker.metrics_handle());
                non_vote_thread_hdls.push(
                    Builder::new()
                        .name(format!("solECoWorker{id:02}"))
                        .spawn(move || {
                            let _ = consume_worker.run();
                        })
                        .unwrap(),
                )
            }
        }
    }
}
746
// Context for spawning threads in the banking stage.
// Cloneable so re-spawn paths can hand owned copies of the shared handles
// to each new thread.
#[derive(Clone)]
struct BankingStageContext {
    // Shared stop flag: set true to wind down all spawned threads, reset to
    // false before re-spawning (see spawn_internal_threads).
    exit_signal: Arc<AtomicBool>,
    tpu_vote_receiver: BankingPacketReceiver,
    gossip_vote_receiver: BankingPacketReceiver,
    non_vote_receiver: BankingPacketReceiver,
    transaction_recorder: TransactionRecorder,
    poh_recorder: Arc<RwLock<PohRecorder>>,
    bank_forks: Arc<RwLock<BankForks>>,
    committer: Committer,
    log_messages_bytes_limit: Option<usize>,
}
760
761#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
762pub(crate) fn update_bank_forks_and_poh_recorder_for_new_tpu_bank(
763    bank_forks: &RwLock<BankForks>,
764    poh_controller: &mut PohController,
765    tpu_bank: Bank,
766) {
767    let tpu_bank = bank_forks.write().unwrap().insert(tpu_bank);
768    if poh_controller.set_bank(tpu_bank).is_err() {
769        warn!("Failed to set poh bank, poh service is disconnected");
770    }
771}
772
773#[cfg(test)]
774mod tests {
775    use {
776        super::*,
777        crate::{
778            banking_trace::{BankingTracer, Channels},
779            validator::SchedulerPacing,
780        },
781        agave_banking_stage_ingress_types::BankingPacketBatch,
782        crossbeam_channel::unbounded,
783        itertools::Itertools,
784        solana_entry::entry::{self, EntrySlice},
785        solana_hash::Hash,
786        solana_keypair::Keypair,
787        solana_ledger::{
788            blockstore::Blockstore,
789            genesis_utils::{
790                create_genesis_config, create_genesis_config_with_leader, GenesisConfigInfo,
791            },
792            get_tmp_ledger_path_auto_delete,
793        },
794        solana_perf::packet::to_packet_batches,
795        solana_poh::{
796            poh_recorder::{create_test_recorder, PohRecorderError},
797            record_channels::record_channels,
798            transaction_recorder::RecordTransactionsSummary,
799        },
800        solana_poh_config::PohConfig,
801        solana_pubkey::Pubkey,
802        solana_runtime::{bank::Bank, genesis_utils::bootstrap_validator_stake_lamports},
803        solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
804        solana_signer::Signer,
805        solana_system_transaction as system_transaction,
806        solana_transaction::{sanitized::SanitizedTransaction, Transaction},
807        solana_vote::vote_transaction::new_tower_sync_transaction,
808        solana_vote_program::vote_state::TowerSync,
809        std::{sync::atomic::Ordering, thread::sleep, time::Instant},
810    };
811
812    pub(crate) fn sanitize_transactions(
813        txs: Vec<Transaction>,
814    ) -> Vec<RuntimeTransaction<SanitizedTransaction>> {
815        txs.into_iter()
816            .map(RuntimeTransaction::from_transaction_for_tests)
817            .collect()
818    }
819
820    #[test]
821    fn test_banking_stage_shutdown1() {
822        let genesis_config = create_genesis_config(2).genesis_config;
823        let (bank, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
824        let banking_tracer = BankingTracer::new_disabled();
825        let Channels {
826            non_vote_sender,
827            non_vote_receiver,
828            tpu_vote_sender,
829            tpu_vote_receiver,
830            gossip_vote_sender,
831            gossip_vote_receiver,
832        } = banking_tracer.create_channels(false);
833        let ledger_path = get_tmp_ledger_path_auto_delete!();
834        let blockstore = Arc::new(
835            Blockstore::open(ledger_path.path())
836                .expect("Expected to be able to open database ledger"),
837        );
838        let (
839            exit,
840            poh_recorder,
841            _poh_controller,
842            transaction_recorder,
843            poh_service,
844            _entry_receiever,
845        ) = create_test_recorder(bank, blockstore, None, None);
846        let (replay_vote_sender, _replay_vote_receiver) = unbounded();
847
848        let banking_stage = BankingStage::new_num_threads(
849            BlockProductionMethod::CentralScheduler,
850            poh_recorder.clone(),
851            transaction_recorder,
852            non_vote_receiver,
853            tpu_vote_receiver,
854            gossip_vote_receiver,
855            DEFAULT_NUM_WORKERS,
856            SchedulerConfig {
857                scheduler_pacing: SchedulerPacing::Disabled,
858            },
859            None,
860            replay_vote_sender,
861            None,
862            bank_forks,
863            Arc::new(PrioritizationFeeCache::new(0u64)),
864        );
865        drop(non_vote_sender);
866        drop(tpu_vote_sender);
867        drop(gossip_vote_sender);
868        exit.store(true, Ordering::Relaxed);
869        banking_stage.join().unwrap();
870        poh_service.join().unwrap();
871    }
872
    #[test]
    fn test_banking_stage_tick() {
        // With no transactions submitted, the stage is started and torn down,
        // and we verify PoH emitted exactly one slot's worth of tick entries
        // that hash-chain from the bank's starting blockhash.
        agave_logger::setup();
        let GenesisConfigInfo {
            mut genesis_config, ..
        } = create_genesis_config(2);
        // Keep the slot short so the PoH service finishes it quickly.
        genesis_config.ticks_per_slot = 4;
        let num_extra_ticks = 2;
        let (bank, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
        let start_hash = bank.last_blockhash();
        let banking_tracer = BankingTracer::new_disabled();
        let Channels {
            non_vote_sender,
            non_vote_receiver,
            tpu_vote_sender,
            tpu_vote_receiver,
            gossip_vote_sender,
            gossip_vote_receiver,
        } = banking_tracer.create_channels(false);
        let ledger_path = get_tmp_ledger_path_auto_delete!();
        let blockstore = Arc::new(
            Blockstore::open(ledger_path.path())
                .expect("Expected to be able to open database ledger"),
        );
        // Ask the test PoH service to tick slightly past the end of the slot;
        // the length assertion below shows only `ticks_per_slot` entries are
        // actually delivered on the entry channel.
        let poh_config = PohConfig {
            target_tick_count: Some(bank.max_tick_height() + num_extra_ticks),
            ..PohConfig::default()
        };
        let (
            exit,
            poh_recorder,
            _poh_controller,
            transaction_recorder,
            poh_service,
            entry_receiver,
        ) = create_test_recorder(bank.clone(), blockstore, Some(poh_config), None);
        let (replay_vote_sender, _replay_vote_receiver) = unbounded();

        let banking_stage = BankingStage::new_num_threads(
            BlockProductionMethod::CentralScheduler,
            poh_recorder.clone(),
            transaction_recorder,
            non_vote_receiver,
            tpu_vote_receiver,
            gossip_vote_receiver,
            DEFAULT_NUM_WORKERS,
            SchedulerConfig {
                scheduler_pacing: SchedulerPacing::Disabled,
            },
            None,
            replay_vote_sender,
            None,
            bank_forks,
            Arc::new(PrioritizationFeeCache::new(0u64)),
        );
        trace!("sending bank");
        // Shutdown ordering matters: close the ingress channels, signal exit,
        // join the PoH service, release the recorder, then join the stage.
        drop(non_vote_sender);
        drop(tpu_vote_sender);
        drop(gossip_vote_sender);
        exit.store(true, Ordering::Relaxed);
        poh_service.join().unwrap();
        drop(poh_recorder);
        banking_stage.join().unwrap();

        trace!("getting entries");
        // All senders are gone, so `iter()` terminates once the channel drains.
        let entries: Vec<_> = entry_receiver
            .iter()
            .map(|(_bank, (entry, _tick_height))| entry)
            .collect();
        trace!("done");
        // Exactly one slot of ticks, forming a valid PoH chain from
        // `start_hash` and ending on the bank's recorded blockhash.
        assert_eq!(entries.len(), genesis_config.ticks_per_slot as usize);
        assert!(entries.verify(&start_hash, &entry::thread_pool_for_tests()));
        assert_eq!(entries[entries.len() - 1].hash, bank.last_blockhash());
    }
947
    #[test]
    fn test_banking_stage_entries_only_central_scheduler() {
        // End-to-end: feed three packets through the non-vote channel — one
        // marked discard, one whose fee payer has no funds, one good — and
        // verify that the resulting PoH entries replay cleanly on a fresh
        // bank with only the good transfer taking effect.
        agave_logger::setup();
        let GenesisConfigInfo {
            genesis_config,
            mint_keypair,
            ..
        } = create_slow_genesis_config(10);
        let (bank, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
        let start_hash = bank.last_blockhash();
        let banking_tracer = BankingTracer::new_disabled();
        let Channels {
            non_vote_sender,
            non_vote_receiver,
            tpu_vote_sender,
            tpu_vote_receiver,
            gossip_vote_sender,
            gossip_vote_receiver,
        } = banking_tracer.create_channels(false);
        let ledger_path = get_tmp_ledger_path_auto_delete!();
        let blockstore = Arc::new(
            Blockstore::open(ledger_path.path())
                .expect("Expected to be able to open database ledger"),
        );
        let (
            exit,
            poh_recorder,
            _poh_controller,
            transaction_recorder,
            poh_service,
            entry_receiver,
        ) = create_test_recorder(bank.clone(), blockstore, None, None);
        let (replay_vote_sender, _replay_vote_receiver) = unbounded();

        let banking_stage = BankingStage::new_num_threads(
            BlockProductionMethod::CentralScheduler,
            poh_recorder.clone(),
            transaction_recorder,
            non_vote_receiver,
            tpu_vote_receiver,
            gossip_vote_receiver,
            DEFAULT_NUM_WORKERS,
            SchedulerConfig {
                scheduler_pacing: SchedulerPacing::Disabled,
            },
            None,
            replay_vote_sender,
            None,
            bank_forks.clone(), // keep a local-copy of bank-forks so worker threads do not lose weak access to bank-forks
            Arc::new(PrioritizationFeeCache::new(0u64)),
        );

        // good tx, and no verify
        let to = solana_pubkey::new_rand();
        let tx_no_ver = system_transaction::transfer(&mint_keypair, &to, 2, start_hash);

        // good tx
        let to2 = solana_pubkey::new_rand();
        let tx = system_transaction::transfer(&mint_keypair, &to2, 1, start_hash);

        // bad tx, AccountNotFound
        let keypair = Keypair::new();
        let to3 = solana_pubkey::new_rand();
        let tx_anf = system_transaction::transfer(&keypair, &to3, 1, start_hash);

        // send 'em over
        let mut packet_batches = to_packet_batches(&[tx_no_ver, tx_anf, tx], 3);
        packet_batches[0]
            .first_mut()
            .unwrap()
            .meta_mut()
            .set_discard(true); // set discard on `tx_no_ver`

        // glad they all fit
        assert_eq!(packet_batches.len(), 1);

        non_vote_sender // no_ver, anf, tx
            .send(BankingPacketBatch::new(packet_batches))
            .unwrap();

        // capture the entry receiver until we've received all our entries.
        // Poll with a short sleep until an entry carrying transactions shows
        // up; tick-only entries are collected along the way.
        let mut entries = Vec::with_capacity(100);
        loop {
            if let Ok((_bank, (entry, _))) = entry_receiver.try_recv() {
                let tx_entry = !entry.transactions.is_empty();
                entries.push(entry);
                if tx_entry {
                    break; // once we have the entry break. don't expect more than one.
                }
            }
            sleep(Duration::from_millis(10));
        }

        // Close the ingress channels so the stage drains and joins, then stop
        // and join the PoH service.
        drop(non_vote_sender);
        drop(tpu_vote_sender);
        drop(gossip_vote_sender);
        banking_stage.join().unwrap();

        exit.store(true, Ordering::Relaxed);
        poh_service.join().unwrap();
        drop(poh_recorder);

        let blockhash = start_hash;
        // Replay the produced entries against a brand-new bank built from the
        // same genesis to validate what was recorded.
        let (bank, _bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);

        // receive entries + ticks. The sender has been dropped, so there
        // are no more entries that will ever come in after the `iter` here.
        entries.extend(
            entry_receiver
                .iter()
                .map(|(_bank, (entry, _tick_height))| entry),
        );

        // The entry chain must verify, and every transaction that made it into
        // an entry must replay successfully.
        assert!(entries.verify(&blockhash, &entry::thread_pool_for_tests()));
        for entry in entries {
            bank.process_entry_transactions(entry.transactions)
                .iter()
                .for_each(|x| assert_eq!(*x, Ok(())));
        }

        // Only the good transfer landed: `to2` received 1 lamport, while the
        // discarded transfer to `to` never executed (balance stays 0).
        assert_eq!(bank.get_balance(&to2), 1);
        assert_eq!(bank.get_balance(&to), 0);

        drop(entry_receiver);
    }
1073
    #[test]
    fn test_banking_stage_entryfication() {
        agave_logger::setup();
        // In this attack we'll demonstrate that a verifier can interpret the ledger
        // differently if either the server doesn't signal the ledger to add an
        // Entry OR if the verifier tries to parallelize across multiple Entries.
        let GenesisConfigInfo {
            genesis_config,
            mint_keypair,
            ..
        } = create_slow_genesis_config(2);
        let banking_tracer = BankingTracer::new_disabled();
        let Channels {
            non_vote_sender,
            non_vote_receiver,
            tpu_vote_sender,
            tpu_vote_receiver,
            gossip_vote_sender,
            gossip_vote_receiver,
        } = banking_tracer.create_channels(false);

        // Process a batch that includes a transaction that receives two lamports.
        let alice = Keypair::new();
        let tx =
            system_transaction::transfer(&mint_keypair, &alice.pubkey(), 2, genesis_config.hash());

        // Both batches are queued on the channel BEFORE the banking stage is
        // started, so the stage picks them up as soon as it spins up.
        let packet_batches = to_packet_batches(&[tx], 1);
        non_vote_sender
            .send(BankingPacketBatch::new(packet_batches))
            .unwrap();

        // Process a second batch that uses the same from account, so conflicts with above TX
        let tx =
            system_transaction::transfer(&mint_keypair, &alice.pubkey(), 1, genesis_config.hash());
        let packet_batches = to_packet_batches(&[tx], 1);
        non_vote_sender
            .send(BankingPacketBatch::new(packet_batches))
            .unwrap();

        let ledger_path = get_tmp_ledger_path_auto_delete!();
        let blockstore = Arc::new(
            Blockstore::open(ledger_path.path())
                .expect("Expected to be able to open database ledger"),
        );

        let (replay_vote_sender, _replay_vote_receiver) = unbounded();
        let entry_receiver = {
            // start a banking_stage to eat verified receiver
            let (bank, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
            let (
                exit,
                poh_recorder,
                _poh_controller,
                transaction_recorder,
                poh_service,
                entry_receiver,
            ) = create_test_recorder(bank.clone(), blockstore, None, None);
            let _banking_stage = BankingStage::new_num_threads(
                BlockProductionMethod::CentralScheduler,
                poh_recorder.clone(),
                transaction_recorder,
                non_vote_receiver,
                tpu_vote_receiver,
                gossip_vote_receiver,
                DEFAULT_NUM_WORKERS,
                SchedulerConfig {
                    scheduler_pacing: SchedulerPacing::Disabled,
                },
                None,
                replay_vote_sender,
                None,
                bank_forks,
                Arc::new(PrioritizationFeeCache::new(0u64)),
            );

            // wait for banking_stage to eat the packets
            // Poll the balance until at least one of the transfers has been
            // committed, with a hard timeout to avoid hanging the test suite.
            const TIMEOUT: Duration = Duration::from_secs(10);
            let start = Instant::now();
            while bank.get_balance(&alice.pubkey()) < 1 {
                if start.elapsed() > TIMEOUT {
                    panic!("banking stage took too long to process transactions");
                }
                sleep(Duration::from_millis(10));
            }
            exit.store(true, Ordering::Relaxed);
            poh_service.join().unwrap();
            entry_receiver
        };
        // Closing the channels lets the (dropped) stage's threads wind down.
        drop(non_vote_sender);
        drop(tpu_vote_sender);
        drop(gossip_vote_sender);

        // consume the entire entry_receiver, feed it into a new bank
        // check that the balance is what we expect.
        let entries: Vec<_> = entry_receiver
            .iter()
            .map(|(_bank, (entry, _tick_height))| entry)
            .collect();

        let (bank, _bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
        for entry in entries {
            let _ = bank
                .try_process_entry_transactions(entry.transactions)
                .expect("All transactions should be processed");
        }

        // Assert the user doesn't hold three lamports. If the stage only outputs one
        // entry, then one of the transactions will be rejected, because it drives
        // the account balance below zero before the credit is added.
        assert!(bank.get_balance(&alice.pubkey()) != 3);
    }
1185
    #[test]
    fn test_bank_record_transactions() {
        // Exercises `TransactionRecorder` directly (no banking stage):
        // recording for the active bank id succeeds and the batch appears on
        // the record channel, while recording against a different bank id
        // fails with `MaxHeightReached` and delivers nothing.
        agave_logger::setup();

        let GenesisConfigInfo {
            genesis_config,
            mint_keypair,
            ..
        } = create_genesis_config(10_000);
        let (bank, _bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);

        let (record_sender, mut record_receiver) = record_channels(false);
        let recorder = TransactionRecorder::new(record_sender);
        // Activate the receiver for this bank's id so records are accepted.
        record_receiver.restart(bank.bank_id());

        let pubkey = solana_pubkey::new_rand();
        let keypair2 = Keypair::new();
        let pubkey2 = solana_pubkey::new_rand();

        // Two unrelated transfers; signature validity doesn't matter for
        // recording, only that the batch round-trips through the channel.
        let txs = vec![
            system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()).into(),
            system_transaction::transfer(&keypair2, &pubkey2, 1, genesis_config.hash()).into(),
        ];

        let summary = recorder.record_transactions(bank.bank_id(), txs.clone());
        assert!(summary.result.is_ok());
        // Exactly one batch was delivered, matching what we recorded.
        assert_eq!(
            record_receiver.try_recv().unwrap().transaction_batches,
            vec![txs.clone()]
        );
        assert!(record_receiver.try_recv().is_err());

        // Once bank is set to a new bank (setting bank id + 1 in record_transactions),
        // record_transactions should throw MaxHeightReached
        let next_bank_id = bank.bank_id() + 1;
        let RecordTransactionsSummary { result, .. } =
            recorder.record_transactions(next_bank_id, txs);
        assert_matches!(result, Err(PohRecorderError::MaxHeightReached));
        // Should receive nothing from PohRecorder b/c record failed
        assert!(record_receiver.try_recv().is_err());
    }
1227
1228    pub(crate) fn create_slow_genesis_config(lamports: u64) -> GenesisConfigInfo {
1229        create_slow_genesis_config_with_leader(lamports, &solana_pubkey::new_rand())
1230    }
1231
1232    pub(crate) fn create_slow_genesis_config_with_leader(
1233        lamports: u64,
1234        validator_pubkey: &Pubkey,
1235    ) -> GenesisConfigInfo {
1236        let mut config_info = create_genesis_config_with_leader(
1237            lamports,
1238            validator_pubkey,
1239            // See solana_ledger::genesis_utils::create_genesis_config.
1240            bootstrap_validator_stake_lamports(),
1241        );
1242
1243        // For these tests there's only 1 slot, don't want to run out of ticks
1244        config_info.genesis_config.ticks_per_slot *= 1024;
1245        config_info
1246    }
1247
1248    #[test]
1249    fn test_vote_storage_full_send() {
1250        agave_logger::setup();
1251        let GenesisConfigInfo {
1252            genesis_config,
1253            mint_keypair,
1254            ..
1255        } = create_slow_genesis_config(10000);
1256        let (bank, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
1257        let start_hash = bank.last_blockhash();
1258        let banking_tracer = BankingTracer::new_disabled();
1259        let Channels {
1260            non_vote_sender,
1261            non_vote_receiver,
1262            tpu_vote_sender,
1263            tpu_vote_receiver,
1264            gossip_vote_sender,
1265            gossip_vote_receiver,
1266        } = banking_tracer.create_channels(false);
1267        let ledger_path = get_tmp_ledger_path_auto_delete!();
1268        let blockstore = Arc::new(
1269            Blockstore::open(ledger_path.path())
1270                .expect("Expected to be able to open database ledger"),
1271        );
1272        let (
1273            exit,
1274            poh_recorder,
1275            _poh_controller,
1276            transaction_recorder,
1277            poh_service,
1278            _entry_receiver,
1279        ) = create_test_recorder(bank.clone(), blockstore, None, None);
1280        let (replay_vote_sender, _replay_vote_receiver) = unbounded();
1281
1282        let banking_stage = BankingStage::new_num_threads(
1283            BlockProductionMethod::CentralScheduler,
1284            poh_recorder.clone(),
1285            transaction_recorder,
1286            non_vote_receiver,
1287            tpu_vote_receiver,
1288            gossip_vote_receiver,
1289            DEFAULT_NUM_WORKERS,
1290            SchedulerConfig {
1291                scheduler_pacing: SchedulerPacing::Disabled,
1292            },
1293            None,
1294            replay_vote_sender,
1295            None,
1296            bank_forks,
1297            Arc::new(PrioritizationFeeCache::new(0u64)),
1298        );
1299
1300        let keypairs = (0..100).map(|_| Keypair::new()).collect_vec();
1301        let vote_keypairs = (0..100).map(|_| Keypair::new()).collect_vec();
1302        for keypair in keypairs.iter() {
1303            bank.process_transaction(&system_transaction::transfer(
1304                &mint_keypair,
1305                &keypair.pubkey(),
1306                20,
1307                start_hash,
1308            ))
1309            .unwrap();
1310        }
1311
1312        // Send a bunch of votes and transfers
1313        let tpu_votes = (0..100_usize)
1314            .map(|i| {
1315                new_tower_sync_transaction(
1316                    TowerSync::from(vec![(0, 8), (1, 7), (i as u64 + 10, 6), (i as u64 + 11, 1)]),
1317                    Hash::new_unique(),
1318                    &keypairs[i],
1319                    &vote_keypairs[i],
1320                    &vote_keypairs[i],
1321                    None,
1322                )
1323            })
1324            .collect_vec();
1325        let gossip_votes = (0..100_usize)
1326            .map(|i| {
1327                new_tower_sync_transaction(
1328                    TowerSync::from(vec![(0, 9), (1, 8), (i as u64 + 5, 6), (i as u64 + 63, 1)]),
1329                    Hash::new_unique(),
1330                    &keypairs[i],
1331                    &vote_keypairs[i],
1332                    &vote_keypairs[i],
1333                    None,
1334                )
1335            })
1336            .collect_vec();
1337        let txs = (0..100_usize)
1338            .map(|i| {
1339                system_transaction::transfer(
1340                    &keypairs[i],
1341                    &keypairs[(i + 1) % 100].pubkey(),
1342                    10,
1343                    start_hash,
1344                );
1345            })
1346            .collect_vec();
1347
1348        let non_vote_packet_batches = to_packet_batches(&txs, 10);
1349        let tpu_packet_batches = to_packet_batches(&tpu_votes, 10);
1350        let gossip_packet_batches = to_packet_batches(&gossip_votes, 10);
1351
1352        // Send em all
1353        [
1354            (non_vote_packet_batches, non_vote_sender),
1355            (tpu_packet_batches, tpu_vote_sender),
1356            (gossip_packet_batches, gossip_vote_sender),
1357        ]
1358        .into_iter()
1359        .map(|(packet_batches, sender)| {
1360            Builder::new()
1361                .spawn(move || {
1362                    sender
1363                        .send(BankingPacketBatch::new(packet_batches))
1364                        .unwrap()
1365                })
1366                .unwrap()
1367        })
1368        .for_each(|handle| handle.join().unwrap());
1369
1370        banking_stage.join().unwrap();
1371        exit.store(true, Ordering::Relaxed);
1372        poh_service.join().unwrap();
1373    }
1374}