1#[cfg(feature = "dev-context-only-utils")]
6use qualifier_attr::qualifiers;
7use {
8 self::{
9 committer::Committer, consumer::Consumer, decision_maker::DecisionMaker,
10 qos_service::QosService, vote_packet_receiver::VotePacketReceiver,
11 vote_storage::VoteStorage,
12 },
13 crate::{
14 banking_stage::{
15 consume_worker::ConsumeWorker,
16 transaction_scheduler::{
17 prio_graph_scheduler::PrioGraphScheduler,
18 scheduler_controller::{
19 SchedulerConfig, SchedulerController, DEFAULT_SCHEDULER_PACING_FILL_TIME_MILLIS,
20 },
21 scheduler_error::SchedulerError,
22 },
23 },
24 validator::BlockProductionMethod,
25 },
26 agave_banking_stage_ingress_types::BankingPacketReceiver,
27 crossbeam_channel::{unbounded, Receiver, Sender},
28 histogram::Histogram,
29 solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfoQuery},
30 solana_ledger::blockstore_processor::TransactionStatusSender,
31 solana_perf::packet::PACKETS_PER_BATCH,
32 solana_poh::{
33 poh_controller::PohController, poh_recorder::PohRecorder,
34 transaction_recorder::TransactionRecorder,
35 },
36 solana_pubkey::Pubkey,
37 solana_runtime::{
38 bank::Bank, bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
39 vote_sender_types::ReplayVoteSender,
40 },
41 solana_time_utils::AtomicInterval,
42 std::{
43 num::{NonZeroU64, NonZeroUsize, Saturating},
44 ops::Deref,
45 sync::{
46 atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
47 Arc, RwLock,
48 },
49 thread::{self, Builder, JoinHandle},
50 time::Duration,
51 },
52 transaction_scheduler::{
53 greedy_scheduler::{GreedyScheduler, GreedySchedulerConfig},
54 prio_graph_scheduler::PrioGraphSchedulerConfig,
55 receive_and_buffer::{ReceiveAndBuffer, TransactionViewReceiveAndBuffer},
56 },
57 vote_worker::VoteWorker,
58};
59
60pub mod committer;
62pub mod consumer;
63pub mod leader_slot_metrics;
64pub mod qos_service;
65pub mod vote_storage;
66
67mod consume_worker;
68mod vote_worker;
69
70#[cfg(feature = "dev-context-only-utils")]
71pub mod decision_maker;
72#[cfg(not(feature = "dev-context-only-utils"))]
73mod decision_maker;
74
75mod latest_validator_vote_packet;
76mod leader_slot_timing_metrics;
77mod read_write_account_set;
78mod vote_packet_receiver;
79
80#[cfg(feature = "dev-context-only-utils")]
81pub mod scheduler_messages;
82#[cfg(not(feature = "dev-context-only-utils"))]
83mod scheduler_messages;
84
85pub mod transaction_scheduler;
86
87#[cfg(feature = "dev-context-only-utils")]
88pub mod unified_scheduler;
89#[cfg(not(feature = "dev-context-only-utils"))]
90pub(crate) mod unified_scheduler;
91
92#[cfg(unix)]
93mod progress_tracker;
94#[cfg(unix)]
95mod tpu_to_pack;
96
97const MAX_NUM_WORKERS: NonZeroUsize = NonZeroUsize::new(64).unwrap();
101const DEFAULT_NUM_WORKERS: NonZeroUsize = NonZeroUsize::new(4).unwrap();
102
103#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
104const TOTAL_BUFFERED_PACKETS: usize = 100_000;
105const SLOT_BOUNDARY_CHECK_PERIOD: Duration = Duration::from_millis(10);
106
107#[derive(Debug, Default)]
108pub struct BankingStageStats {
109 last_report: AtomicInterval,
110 tpu_counts: VoteSourceCounts,
111 gossip_counts: VoteSourceCounts,
112 pub(crate) dropped_duplicated_packets_count: AtomicUsize,
113 dropped_forward_packets_count: AtomicUsize,
114 current_buffered_packets_count: AtomicUsize,
115 rebuffered_packets_count: AtomicUsize,
116 consumed_buffered_packets_count: AtomicUsize,
117 batch_packet_indexes_len: Histogram,
118
119 consume_buffered_packets_elapsed: AtomicU64,
121 receive_and_buffer_packets_elapsed: AtomicU64,
122 filter_pending_packets_elapsed: AtomicU64,
123 pub(crate) packet_conversion_elapsed: AtomicU64,
124 transaction_processing_elapsed: AtomicU64,
125}
126
127#[derive(Debug, Default)]
128struct VoteSourceCounts {
129 receive_and_buffer_packets_count: AtomicUsize,
130 dropped_packets_count: AtomicUsize,
131 newly_buffered_packets_count: AtomicUsize,
132 newly_buffered_forwarded_packets_count: AtomicUsize,
133}
134
135impl VoteSourceCounts {
136 fn is_empty(&self) -> bool {
137 0 == self
138 .receive_and_buffer_packets_count
139 .load(Ordering::Relaxed)
140 + self.dropped_packets_count.load(Ordering::Relaxed)
141 + self.newly_buffered_packets_count.load(Ordering::Relaxed)
142 + self
143 .newly_buffered_forwarded_packets_count
144 .load(Ordering::Relaxed)
145 }
146}
147
148impl BankingStageStats {
149 pub fn new() -> Self {
150 BankingStageStats {
151 batch_packet_indexes_len: Histogram::configure()
152 .max_value(PACKETS_PER_BATCH as u64)
153 .build()
154 .unwrap(),
155 ..BankingStageStats::default()
156 }
157 }
158
159 fn is_empty(&self) -> bool {
160 self.gossip_counts.is_empty()
161 && self.tpu_counts.is_empty()
162 && 0 == self
163 .dropped_duplicated_packets_count
164 .load(Ordering::Relaxed) as u64
165 + self.dropped_forward_packets_count.load(Ordering::Relaxed) as u64
166 + self.current_buffered_packets_count.load(Ordering::Relaxed) as u64
167 + self.rebuffered_packets_count.load(Ordering::Relaxed) as u64
168 + self.consumed_buffered_packets_count.load(Ordering::Relaxed) as u64
169 + self
170 .consume_buffered_packets_elapsed
171 .load(Ordering::Relaxed)
172 + self
173 .receive_and_buffer_packets_elapsed
174 .load(Ordering::Relaxed)
175 + self.filter_pending_packets_elapsed.load(Ordering::Relaxed)
176 + self.packet_conversion_elapsed.load(Ordering::Relaxed)
177 + self.transaction_processing_elapsed.load(Ordering::Relaxed)
178 + self.batch_packet_indexes_len.entries()
179 }
180
181 fn report(&mut self, report_interval_ms: u64) {
182 if self.is_empty() {
184 return;
185 }
186 if self.last_report.should_update(report_interval_ms) {
187 datapoint_info!(
188 "banking_stage-vote_loop_stats",
189 (
190 "tpu_receive_and_buffer_packets_count",
191 self.tpu_counts
192 .receive_and_buffer_packets_count
193 .swap(0, Ordering::Relaxed),
194 i64
195 ),
196 (
197 "tpu_dropped_packets_count",
198 self.tpu_counts
199 .dropped_packets_count
200 .swap(0, Ordering::Relaxed),
201 i64
202 ),
203 (
204 "tpu_newly_buffered_packets_count",
205 self.tpu_counts
206 .newly_buffered_packets_count
207 .swap(0, Ordering::Relaxed),
208 i64
209 ),
210 (
211 "tpu_newly_buffered_forwarded_packets_count",
212 self.tpu_counts
213 .newly_buffered_forwarded_packets_count
214 .swap(0, Ordering::Relaxed),
215 i64
216 ),
217 (
218 "gossip_receive_and_buffer_packets_count",
219 self.gossip_counts
220 .receive_and_buffer_packets_count
221 .swap(0, Ordering::Relaxed),
222 i64
223 ),
224 (
225 "gossip_dropped_packets_count",
226 self.gossip_counts
227 .dropped_packets_count
228 .swap(0, Ordering::Relaxed),
229 i64
230 ),
231 (
232 "gossip_newly_buffered_packets_count",
233 self.gossip_counts
234 .newly_buffered_packets_count
235 .swap(0, Ordering::Relaxed),
236 i64
237 ),
238 (
239 "gossip_newly_buffered_forwarded_packets_count",
240 self.gossip_counts
241 .newly_buffered_forwarded_packets_count
242 .swap(0, Ordering::Relaxed),
243 i64
244 ),
245 (
246 "dropped_duplicated_packets_count",
247 self.dropped_duplicated_packets_count
248 .swap(0, Ordering::Relaxed),
249 i64
250 ),
251 (
252 "dropped_forward_packets_count",
253 self.dropped_forward_packets_count
254 .swap(0, Ordering::Relaxed),
255 i64
256 ),
257 (
258 "current_buffered_packets_count",
259 self.current_buffered_packets_count
260 .swap(0, Ordering::Relaxed),
261 i64
262 ),
263 (
264 "rebuffered_packets_count",
265 self.rebuffered_packets_count.swap(0, Ordering::Relaxed),
266 i64
267 ),
268 (
269 "consumed_buffered_packets_count",
270 self.consumed_buffered_packets_count
271 .swap(0, Ordering::Relaxed),
272 i64
273 ),
274 (
275 "consume_buffered_packets_elapsed",
276 self.consume_buffered_packets_elapsed
277 .swap(0, Ordering::Relaxed),
278 i64
279 ),
280 (
281 "receive_and_buffer_packets_elapsed",
282 self.receive_and_buffer_packets_elapsed
283 .swap(0, Ordering::Relaxed),
284 i64
285 ),
286 (
287 "filter_pending_packets_elapsed",
288 self.filter_pending_packets_elapsed
289 .swap(0, Ordering::Relaxed),
290 i64
291 ),
292 (
293 "packet_conversion_elapsed",
294 self.packet_conversion_elapsed.swap(0, Ordering::Relaxed),
295 i64
296 ),
297 (
298 "transaction_processing_elapsed",
299 self.transaction_processing_elapsed
300 .swap(0, Ordering::Relaxed),
301 i64
302 ),
303 (
304 "packet_batch_indices_len_min",
305 self.batch_packet_indexes_len.minimum().unwrap_or(0),
306 i64
307 ),
308 (
309 "packet_batch_indices_len_max",
310 self.batch_packet_indexes_len.maximum().unwrap_or(0),
311 i64
312 ),
313 (
314 "packet_batch_indices_len_mean",
315 self.batch_packet_indexes_len.mean().unwrap_or(0),
316 i64
317 ),
318 (
319 "packet_batch_indices_len_90pct",
320 self.batch_packet_indexes_len.percentile(90.0).unwrap_or(0),
321 i64
322 )
323 );
324 self.batch_packet_indexes_len.clear();
325 }
326 }
327}
328
329#[derive(Debug, Default)]
330pub struct BatchedTransactionDetails {
331 pub costs: BatchedTransactionCostDetails,
332 pub errors: BatchedTransactionErrorDetails,
333}
334
335#[derive(Debug, Default)]
336pub struct BatchedTransactionCostDetails {
337 pub batched_signature_cost: Saturating<u64>,
338 pub batched_write_lock_cost: Saturating<u64>,
339 pub batched_data_bytes_cost: Saturating<u64>,
340 pub batched_loaded_accounts_data_size_cost: Saturating<u64>,
341 pub batched_programs_execute_cost: Saturating<u64>,
342}
343
344#[derive(Debug, Default)]
345pub struct BatchedTransactionErrorDetails {
346 pub batched_retried_txs_per_block_limit_count: Saturating<u64>,
347 pub batched_retried_txs_per_vote_limit_count: Saturating<u64>,
348 pub batched_retried_txs_per_account_limit_count: Saturating<u64>,
349 pub batched_retried_txs_per_account_data_block_limit_count: Saturating<u64>,
350 pub batched_dropped_txs_per_account_data_total_limit_count: Saturating<u64>,
351}
352
353pub struct BankingStage {
354 context: Option<BankingStageContext>,
356 thread_hdls: Vec<JoinHandle<()>>,
357}
358
359pub trait LikeClusterInfo: Send + Sync + 'static + Clone {
360 fn id(&self) -> Pubkey;
361
362 fn lookup_contact_info<R>(&self, id: &Pubkey, query: impl ContactInfoQuery<R>) -> Option<R>;
363}
364
365impl LikeClusterInfo for Arc<ClusterInfo> {
366 fn id(&self) -> Pubkey {
367 self.deref().id()
368 }
369
370 fn lookup_contact_info<R>(&self, id: &Pubkey, query: impl ContactInfoQuery<R>) -> Option<R> {
371 self.deref().lookup_contact_info(id, query)
372 }
373}
374
375impl BankingStage {
376 #[allow(clippy::too_many_arguments)]
377 pub fn new_num_threads(
378 block_production_method: BlockProductionMethod,
379 poh_recorder: Arc<RwLock<PohRecorder>>,
380 transaction_recorder: TransactionRecorder,
381 non_vote_receiver: BankingPacketReceiver,
382 tpu_vote_receiver: BankingPacketReceiver,
383 gossip_vote_receiver: BankingPacketReceiver,
384 num_workers: NonZeroUsize,
385 scheduler_config: SchedulerConfig,
386 transaction_status_sender: Option<TransactionStatusSender>,
387 replay_vote_sender: ReplayVoteSender,
388 log_messages_bytes_limit: Option<usize>,
389 bank_forks: Arc<RwLock<BankForks>>,
390 prioritization_fee_cache: Arc<PrioritizationFeeCache>,
391 ) -> Self {
392 let committer = Committer::new(
393 transaction_status_sender,
394 replay_vote_sender,
395 prioritization_fee_cache,
396 );
397
398 let context = BankingStageContext {
399 exit_signal: Arc::new(AtomicBool::new(false)),
400 tpu_vote_receiver,
401 gossip_vote_receiver,
402 non_vote_receiver,
403 transaction_recorder,
404 poh_recorder,
405 bank_forks,
406 committer,
407 log_messages_bytes_limit,
408 };
409 let mut thread_hdls = Vec::with_capacity(num_workers.get() + 2);
412 thread_hdls.push(Self::spawn_vote_worker(&context));
413
414 let receive_and_buffer = TransactionViewReceiveAndBuffer {
415 receiver: context.non_vote_receiver.clone(),
416 bank_forks: context.bank_forks.clone(),
417 };
418 Self::spawn_scheduler_and_workers(
419 &mut thread_hdls,
420 receive_and_buffer,
421 matches!(
422 block_production_method,
423 BlockProductionMethod::CentralSchedulerGreedy
424 ),
425 num_workers,
426 scheduler_config,
427 &context,
428 );
429
430 Self {
431 context: Some(context),
432 thread_hdls,
433 }
434 }
435
436 pub fn spawn_internal_threads(
438 &mut self,
439 block_production_method: BlockProductionMethod,
440 num_workers: NonZeroUsize,
441 scheduler_config: SchedulerConfig,
442 ) -> thread::Result<()> {
443 if let Some(context) = self.context.as_ref() {
444 info!("Shutting down banking stage threads");
445 context.exit_signal.store(true, Ordering::Relaxed);
446 for bank_thread_hdl in self.thread_hdls.drain(..) {
447 bank_thread_hdl.join()?;
448 }
449
450 info!(
451 "Spawning new banking stage threads with block-production-method: \
452 {block_production_method:?} num-workers: {num_workers}"
453 );
454 context.exit_signal.store(false, Ordering::Relaxed);
455 self.thread_hdls.push(Self::spawn_vote_worker(context));
456
457 let receive_and_buffer = TransactionViewReceiveAndBuffer {
458 receiver: context.non_vote_receiver.clone(),
459 bank_forks: context.bank_forks.clone(),
460 };
461 Self::spawn_scheduler_and_workers(
462 &mut self.thread_hdls,
463 receive_and_buffer,
464 matches!(
465 block_production_method,
466 BlockProductionMethod::CentralSchedulerGreedy
467 ),
468 num_workers,
469 scheduler_config,
470 context,
471 );
472 }
473
474 Ok(())
475 }
476
477 fn spawn_scheduler_and_workers<R: ReceiveAndBuffer + Send + Sync + 'static>(
478 non_vote_thread_hdls: &mut Vec<JoinHandle<()>>,
479 receive_and_buffer: R,
480 use_greedy_scheduler: bool,
481 num_workers: NonZeroUsize,
482 scheduler_config: SchedulerConfig,
483 context: &BankingStageContext,
484 ) {
485 assert!(num_workers <= BankingStage::max_num_workers());
486 let num_workers = num_workers.get();
487
488 let exit = context.exit_signal.clone();
489
490 let (work_senders, work_receivers): (Vec<Sender<_>>, Vec<Receiver<_>>) =
492 (0..num_workers).map(|_| unbounded()).unzip();
493 let (finished_work_sender, finished_work_receiver) = unbounded();
494
495 let decision_maker = DecisionMaker::from(context.poh_recorder.read().unwrap().deref());
497 let mut worker_metrics = Vec::with_capacity(num_workers);
498 for (index, work_receiver) in work_receivers.into_iter().enumerate() {
499 let id = index as u32;
500 let consume_worker = ConsumeWorker::new(
501 id,
502 exit.clone(),
503 work_receiver,
504 Consumer::new(
505 context.committer.clone(),
506 context.transaction_recorder.clone(),
507 QosService::new(id),
508 context.log_messages_bytes_limit,
509 ),
510 finished_work_sender.clone(),
511 context.poh_recorder.read().unwrap().shared_leader_state(),
512 );
513
514 worker_metrics.push(consume_worker.metrics_handle());
515 non_vote_thread_hdls.push(
516 Builder::new()
517 .name(format!("solCoWorker{id:02}"))
518 .spawn(move || {
519 let _ = consume_worker.run();
520 })
521 .unwrap(),
522 )
523 }
524
525 macro_rules! spawn_scheduler {
529 ($scheduler:ident) => {
530 let exit = exit.clone();
531 let bank_forks = context.bank_forks.clone();
532 non_vote_thread_hdls.push(
533 Builder::new()
534 .name("solBnkTxSched".to_string())
535 .spawn(move || {
536 let scheduler_controller = SchedulerController::new(
537 exit,
538 scheduler_config,
539 decision_maker,
540 receive_and_buffer,
541 bank_forks,
542 $scheduler,
543 worker_metrics,
544 );
545
546 match scheduler_controller.run() {
547 Ok(_) => {}
548 Err(SchedulerError::DisconnectedRecvChannel(_)) => {}
549 Err(SchedulerError::DisconnectedSendChannel(_)) => {
550 warn!("Unexpected worker disconnect from scheduler")
551 }
552 }
553 })
554 .unwrap(),
555 );
556 };
557 }
558
559 if use_greedy_scheduler {
561 let scheduler = GreedyScheduler::new(
562 work_senders,
563 finished_work_receiver,
564 GreedySchedulerConfig::default(),
565 );
566 spawn_scheduler!(scheduler);
567 } else {
568 let scheduler = PrioGraphScheduler::new(
569 work_senders,
570 finished_work_receiver,
571 PrioGraphSchedulerConfig::default(),
572 );
573 spawn_scheduler!(scheduler);
574 }
575 }
576
577 fn spawn_vote_worker(context: &BankingStageContext) -> JoinHandle<()> {
578 let vote_storage = VoteStorage::new(&context.bank_forks.read().unwrap().working_bank());
579 let tpu_receiver = VotePacketReceiver::new(context.tpu_vote_receiver.clone());
580 let gossip_receiver = VotePacketReceiver::new(context.gossip_vote_receiver.clone());
581 let consumer = Consumer::new(
582 context.committer.clone(),
583 context.transaction_recorder.clone(),
584 QosService::new(0),
585 context.log_messages_bytes_limit,
586 );
587 let decision_maker = DecisionMaker::from(context.poh_recorder.read().unwrap().deref());
588
589 let exit_signal = context.exit_signal.clone();
590 let bank_forks = context.bank_forks.clone();
591 Builder::new()
592 .name("solBanknStgVote".to_string())
593 .spawn(move || {
594 VoteWorker::new(
595 exit_signal,
596 decision_maker,
597 tpu_receiver,
598 gossip_receiver,
599 vote_storage,
600 bank_forks,
601 consumer,
602 )
603 .run()
604 })
605 .unwrap()
606 }
607
608 pub fn default_num_workers() -> NonZeroUsize {
609 DEFAULT_NUM_WORKERS
610 }
611
612 pub const fn max_num_workers() -> NonZeroUsize {
613 MAX_NUM_WORKERS
614 }
615
616 pub const fn default_fill_time_millis() -> NonZeroU64 {
617 DEFAULT_SCHEDULER_PACING_FILL_TIME_MILLIS
618 }
619
620 pub fn join(mut self) -> thread::Result<()> {
621 self.context
622 .take()
623 .expect("non-vote context must be Some")
624 .exit_signal
625 .store(true, Ordering::Relaxed);
626 for bank_thread_hdl in self.thread_hdls {
627 bank_thread_hdl.join()?;
628 }
629 Ok(())
630 }
631}
632
633#[cfg(unix)]
634mod external {
635 use {
636 super::*,
637 crate::banking_stage::consume_worker::external::ExternalWorker,
638 agave_scheduling_utils::handshake::server::{AgaveSession, AgaveWorkerSession},
639 tpu_to_pack::BankingPacketReceivers,
640 };
641
642 impl BankingStage {
643 pub fn spawn_external_threads(
645 &mut self,
646 AgaveSession {
647 tpu_to_pack,
648 progress_tracker,
649 workers,
650 }: AgaveSession,
651 ) -> thread::Result<()> {
652 if let Some(context) = self.context.as_ref() {
653 info!("Shutting down banking stage threads");
655 context.exit_signal.store(true, Ordering::Relaxed);
656 for bank_thread_hdl in self.thread_hdls.drain(..) {
657 bank_thread_hdl.join()?;
658 }
659
660 context.exit_signal.store(false, Ordering::Relaxed);
661
662 self.thread_hdls.push(Self::spawn_vote_worker(context));
664 Self::spawn_external_workers(&mut self.thread_hdls, context, workers);
665
666 self.thread_hdls.push(tpu_to_pack::spawn(
668 context.exit_signal.clone(),
669 BankingPacketReceivers {
670 non_vote_receiver: context.non_vote_receiver.clone(),
671 gossip_vote_receiver: None,
672 tpu_vote_receiver: None,
673 },
674 tpu_to_pack,
675 ));
676
677 let (shared_leader_state, ticks_per_slot) = {
679 let poh = context.poh_recorder.read().unwrap();
680
681 (poh.shared_leader_state(), poh.ticks_per_slot())
682 };
683 self.thread_hdls.push(progress_tracker::spawn(
684 context.exit_signal.clone(),
685 progress_tracker,
686 shared_leader_state,
687 ticks_per_slot,
688 ));
689 }
690
691 Ok(())
692 }
693
694 fn spawn_external_workers(
695 non_vote_thread_hdls: &mut Vec<JoinHandle<()>>,
696 context: &BankingStageContext,
697 workers: Vec<AgaveWorkerSession>,
698 ) {
699 static_assertions::const_assert!(
700 agave_scheduling_utils::handshake::MAX_WORKERS
701 == BankingStage::max_num_workers().get()
702 );
703 assert!(workers.len() <= BankingStage::max_num_workers().get());
704
705 let mut worker_metrics = Vec::with_capacity(workers.len());
707 for (
708 index,
709 AgaveWorkerSession {
710 allocator,
711 pack_to_worker,
712 worker_to_pack,
713 },
714 ) in workers.into_iter().enumerate()
715 {
716 let id = index as u32;
717 let consume_worker = ExternalWorker::new(
718 id,
719 context.exit_signal.clone(),
720 pack_to_worker,
721 Consumer::new(
722 context.committer.clone(),
723 context.transaction_recorder.clone(),
724 QosService::new(id),
725 context.log_messages_bytes_limit,
726 ),
727 worker_to_pack,
728 allocator,
729 context.poh_recorder.read().unwrap().shared_leader_state(),
730 context.bank_forks.read().unwrap().sharable_banks(),
731 );
732
733 worker_metrics.push(consume_worker.metrics_handle());
734 non_vote_thread_hdls.push(
735 Builder::new()
736 .name(format!("solECoWorker{id:02}"))
737 .spawn(move || {
738 let _ = consume_worker.run();
739 })
740 .unwrap(),
741 )
742 }
743 }
744 }
745}
746
747#[derive(Clone)]
749struct BankingStageContext {
750 exit_signal: Arc<AtomicBool>,
751 tpu_vote_receiver: BankingPacketReceiver,
752 gossip_vote_receiver: BankingPacketReceiver,
753 non_vote_receiver: BankingPacketReceiver,
754 transaction_recorder: TransactionRecorder,
755 poh_recorder: Arc<RwLock<PohRecorder>>,
756 bank_forks: Arc<RwLock<BankForks>>,
757 committer: Committer,
758 log_messages_bytes_limit: Option<usize>,
759}
760
761#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
762pub(crate) fn update_bank_forks_and_poh_recorder_for_new_tpu_bank(
763 bank_forks: &RwLock<BankForks>,
764 poh_controller: &mut PohController,
765 tpu_bank: Bank,
766) {
767 let tpu_bank = bank_forks.write().unwrap().insert(tpu_bank);
768 if poh_controller.set_bank(tpu_bank).is_err() {
769 warn!("Failed to set poh bank, poh service is disconnected");
770 }
771}
772
773#[cfg(test)]
774mod tests {
775 use {
776 super::*,
777 crate::{
778 banking_trace::{BankingTracer, Channels},
779 validator::SchedulerPacing,
780 },
781 agave_banking_stage_ingress_types::BankingPacketBatch,
782 crossbeam_channel::unbounded,
783 itertools::Itertools,
784 solana_entry::entry::{self, EntrySlice},
785 solana_hash::Hash,
786 solana_keypair::Keypair,
787 solana_ledger::{
788 blockstore::Blockstore,
789 genesis_utils::{
790 create_genesis_config, create_genesis_config_with_leader, GenesisConfigInfo,
791 },
792 get_tmp_ledger_path_auto_delete,
793 },
794 solana_perf::packet::to_packet_batches,
795 solana_poh::{
796 poh_recorder::{create_test_recorder, PohRecorderError},
797 record_channels::record_channels,
798 transaction_recorder::RecordTransactionsSummary,
799 },
800 solana_poh_config::PohConfig,
801 solana_pubkey::Pubkey,
802 solana_runtime::{bank::Bank, genesis_utils::bootstrap_validator_stake_lamports},
803 solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
804 solana_signer::Signer,
805 solana_system_transaction as system_transaction,
806 solana_transaction::{sanitized::SanitizedTransaction, Transaction},
807 solana_vote::vote_transaction::new_tower_sync_transaction,
808 solana_vote_program::vote_state::TowerSync,
809 std::{sync::atomic::Ordering, thread::sleep, time::Instant},
810 };
811
812 pub(crate) fn sanitize_transactions(
813 txs: Vec<Transaction>,
814 ) -> Vec<RuntimeTransaction<SanitizedTransaction>> {
815 txs.into_iter()
816 .map(RuntimeTransaction::from_transaction_for_tests)
817 .collect()
818 }
819
820 #[test]
821 fn test_banking_stage_shutdown1() {
822 let genesis_config = create_genesis_config(2).genesis_config;
823 let (bank, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
824 let banking_tracer = BankingTracer::new_disabled();
825 let Channels {
826 non_vote_sender,
827 non_vote_receiver,
828 tpu_vote_sender,
829 tpu_vote_receiver,
830 gossip_vote_sender,
831 gossip_vote_receiver,
832 } = banking_tracer.create_channels(false);
833 let ledger_path = get_tmp_ledger_path_auto_delete!();
834 let blockstore = Arc::new(
835 Blockstore::open(ledger_path.path())
836 .expect("Expected to be able to open database ledger"),
837 );
838 let (
839 exit,
840 poh_recorder,
841 _poh_controller,
842 transaction_recorder,
843 poh_service,
844 _entry_receiever,
845 ) = create_test_recorder(bank, blockstore, None, None);
846 let (replay_vote_sender, _replay_vote_receiver) = unbounded();
847
848 let banking_stage = BankingStage::new_num_threads(
849 BlockProductionMethod::CentralScheduler,
850 poh_recorder.clone(),
851 transaction_recorder,
852 non_vote_receiver,
853 tpu_vote_receiver,
854 gossip_vote_receiver,
855 DEFAULT_NUM_WORKERS,
856 SchedulerConfig {
857 scheduler_pacing: SchedulerPacing::Disabled,
858 },
859 None,
860 replay_vote_sender,
861 None,
862 bank_forks,
863 Arc::new(PrioritizationFeeCache::new(0u64)),
864 );
865 drop(non_vote_sender);
866 drop(tpu_vote_sender);
867 drop(gossip_vote_sender);
868 exit.store(true, Ordering::Relaxed);
869 banking_stage.join().unwrap();
870 poh_service.join().unwrap();
871 }
872
873 #[test]
874 fn test_banking_stage_tick() {
875 agave_logger::setup();
876 let GenesisConfigInfo {
877 mut genesis_config, ..
878 } = create_genesis_config(2);
879 genesis_config.ticks_per_slot = 4;
880 let num_extra_ticks = 2;
881 let (bank, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
882 let start_hash = bank.last_blockhash();
883 let banking_tracer = BankingTracer::new_disabled();
884 let Channels {
885 non_vote_sender,
886 non_vote_receiver,
887 tpu_vote_sender,
888 tpu_vote_receiver,
889 gossip_vote_sender,
890 gossip_vote_receiver,
891 } = banking_tracer.create_channels(false);
892 let ledger_path = get_tmp_ledger_path_auto_delete!();
893 let blockstore = Arc::new(
894 Blockstore::open(ledger_path.path())
895 .expect("Expected to be able to open database ledger"),
896 );
897 let poh_config = PohConfig {
898 target_tick_count: Some(bank.max_tick_height() + num_extra_ticks),
899 ..PohConfig::default()
900 };
901 let (
902 exit,
903 poh_recorder,
904 _poh_controller,
905 transaction_recorder,
906 poh_service,
907 entry_receiver,
908 ) = create_test_recorder(bank.clone(), blockstore, Some(poh_config), None);
909 let (replay_vote_sender, _replay_vote_receiver) = unbounded();
910
911 let banking_stage = BankingStage::new_num_threads(
912 BlockProductionMethod::CentralScheduler,
913 poh_recorder.clone(),
914 transaction_recorder,
915 non_vote_receiver,
916 tpu_vote_receiver,
917 gossip_vote_receiver,
918 DEFAULT_NUM_WORKERS,
919 SchedulerConfig {
920 scheduler_pacing: SchedulerPacing::Disabled,
921 },
922 None,
923 replay_vote_sender,
924 None,
925 bank_forks,
926 Arc::new(PrioritizationFeeCache::new(0u64)),
927 );
928 trace!("sending bank");
929 drop(non_vote_sender);
930 drop(tpu_vote_sender);
931 drop(gossip_vote_sender);
932 exit.store(true, Ordering::Relaxed);
933 poh_service.join().unwrap();
934 drop(poh_recorder);
935 banking_stage.join().unwrap();
936
937 trace!("getting entries");
938 let entries: Vec<_> = entry_receiver
939 .iter()
940 .map(|(_bank, (entry, _tick_height))| entry)
941 .collect();
942 trace!("done");
943 assert_eq!(entries.len(), genesis_config.ticks_per_slot as usize);
944 assert!(entries.verify(&start_hash, &entry::thread_pool_for_tests()));
945 assert_eq!(entries[entries.len() - 1].hash, bank.last_blockhash());
946 }
947
948 #[test]
949 fn test_banking_stage_entries_only_central_scheduler() {
950 agave_logger::setup();
951 let GenesisConfigInfo {
952 genesis_config,
953 mint_keypair,
954 ..
955 } = create_slow_genesis_config(10);
956 let (bank, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
957 let start_hash = bank.last_blockhash();
958 let banking_tracer = BankingTracer::new_disabled();
959 let Channels {
960 non_vote_sender,
961 non_vote_receiver,
962 tpu_vote_sender,
963 tpu_vote_receiver,
964 gossip_vote_sender,
965 gossip_vote_receiver,
966 } = banking_tracer.create_channels(false);
967 let ledger_path = get_tmp_ledger_path_auto_delete!();
968 let blockstore = Arc::new(
969 Blockstore::open(ledger_path.path())
970 .expect("Expected to be able to open database ledger"),
971 );
972 let (
973 exit,
974 poh_recorder,
975 _poh_controller,
976 transaction_recorder,
977 poh_service,
978 entry_receiver,
979 ) = create_test_recorder(bank.clone(), blockstore, None, None);
980 let (replay_vote_sender, _replay_vote_receiver) = unbounded();
981
982 let banking_stage = BankingStage::new_num_threads(
983 BlockProductionMethod::CentralScheduler,
984 poh_recorder.clone(),
985 transaction_recorder,
986 non_vote_receiver,
987 tpu_vote_receiver,
988 gossip_vote_receiver,
989 DEFAULT_NUM_WORKERS,
990 SchedulerConfig {
991 scheduler_pacing: SchedulerPacing::Disabled,
992 },
993 None,
994 replay_vote_sender,
995 None,
996 bank_forks.clone(), Arc::new(PrioritizationFeeCache::new(0u64)),
998 );
999
1000 let to = solana_pubkey::new_rand();
1002 let tx_no_ver = system_transaction::transfer(&mint_keypair, &to, 2, start_hash);
1003
1004 let to2 = solana_pubkey::new_rand();
1006 let tx = system_transaction::transfer(&mint_keypair, &to2, 1, start_hash);
1007
1008 let keypair = Keypair::new();
1010 let to3 = solana_pubkey::new_rand();
1011 let tx_anf = system_transaction::transfer(&keypair, &to3, 1, start_hash);
1012
1013 let mut packet_batches = to_packet_batches(&[tx_no_ver, tx_anf, tx], 3);
1015 packet_batches[0]
1016 .first_mut()
1017 .unwrap()
1018 .meta_mut()
1019 .set_discard(true); assert_eq!(packet_batches.len(), 1);
1023
1024 non_vote_sender .send(BankingPacketBatch::new(packet_batches))
1026 .unwrap();
1027
1028 let mut entries = Vec::with_capacity(100);
1030 loop {
1031 if let Ok((_bank, (entry, _))) = entry_receiver.try_recv() {
1032 let tx_entry = !entry.transactions.is_empty();
1033 entries.push(entry);
1034 if tx_entry {
1035 break; }
1037 }
1038 sleep(Duration::from_millis(10));
1039 }
1040
1041 drop(non_vote_sender);
1042 drop(tpu_vote_sender);
1043 drop(gossip_vote_sender);
1044 banking_stage.join().unwrap();
1045
1046 exit.store(true, Ordering::Relaxed);
1047 poh_service.join().unwrap();
1048 drop(poh_recorder);
1049
1050 let blockhash = start_hash;
1051 let (bank, _bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
1052
1053 entries.extend(
1056 entry_receiver
1057 .iter()
1058 .map(|(_bank, (entry, _tick_height))| entry),
1059 );
1060
1061 assert!(entries.verify(&blockhash, &entry::thread_pool_for_tests()));
1062 for entry in entries {
1063 bank.process_entry_transactions(entry.transactions)
1064 .iter()
1065 .for_each(|x| assert_eq!(*x, Ok(())));
1066 }
1067
1068 assert_eq!(bank.get_balance(&to2), 1);
1069 assert_eq!(bank.get_balance(&to), 0);
1070
1071 drop(entry_receiver);
1072 }
1073
1074 #[test]
1075 fn test_banking_stage_entryfication() {
1076 agave_logger::setup();
1077 let GenesisConfigInfo {
1081 genesis_config,
1082 mint_keypair,
1083 ..
1084 } = create_slow_genesis_config(2);
1085 let banking_tracer = BankingTracer::new_disabled();
1086 let Channels {
1087 non_vote_sender,
1088 non_vote_receiver,
1089 tpu_vote_sender,
1090 tpu_vote_receiver,
1091 gossip_vote_sender,
1092 gossip_vote_receiver,
1093 } = banking_tracer.create_channels(false);
1094
1095 let alice = Keypair::new();
1097 let tx =
1098 system_transaction::transfer(&mint_keypair, &alice.pubkey(), 2, genesis_config.hash());
1099
1100 let packet_batches = to_packet_batches(&[tx], 1);
1101 non_vote_sender
1102 .send(BankingPacketBatch::new(packet_batches))
1103 .unwrap();
1104
1105 let tx =
1107 system_transaction::transfer(&mint_keypair, &alice.pubkey(), 1, genesis_config.hash());
1108 let packet_batches = to_packet_batches(&[tx], 1);
1109 non_vote_sender
1110 .send(BankingPacketBatch::new(packet_batches))
1111 .unwrap();
1112
1113 let ledger_path = get_tmp_ledger_path_auto_delete!();
1114 let blockstore = Arc::new(
1115 Blockstore::open(ledger_path.path())
1116 .expect("Expected to be able to open database ledger"),
1117 );
1118
1119 let (replay_vote_sender, _replay_vote_receiver) = unbounded();
1120 let entry_receiver = {
1121 let (bank, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
1123 let (
1124 exit,
1125 poh_recorder,
1126 _poh_controller,
1127 transaction_recorder,
1128 poh_service,
1129 entry_receiver,
1130 ) = create_test_recorder(bank.clone(), blockstore, None, None);
1131 let _banking_stage = BankingStage::new_num_threads(
1132 BlockProductionMethod::CentralScheduler,
1133 poh_recorder.clone(),
1134 transaction_recorder,
1135 non_vote_receiver,
1136 tpu_vote_receiver,
1137 gossip_vote_receiver,
1138 DEFAULT_NUM_WORKERS,
1139 SchedulerConfig {
1140 scheduler_pacing: SchedulerPacing::Disabled,
1141 },
1142 None,
1143 replay_vote_sender,
1144 None,
1145 bank_forks,
1146 Arc::new(PrioritizationFeeCache::new(0u64)),
1147 );
1148
1149 const TIMEOUT: Duration = Duration::from_secs(10);
1151 let start = Instant::now();
1152 while bank.get_balance(&alice.pubkey()) < 1 {
1153 if start.elapsed() > TIMEOUT {
1154 panic!("banking stage took too long to process transactions");
1155 }
1156 sleep(Duration::from_millis(10));
1157 }
1158 exit.store(true, Ordering::Relaxed);
1159 poh_service.join().unwrap();
1160 entry_receiver
1161 };
1162 drop(non_vote_sender);
1163 drop(tpu_vote_sender);
1164 drop(gossip_vote_sender);
1165
1166 let entries: Vec<_> = entry_receiver
1169 .iter()
1170 .map(|(_bank, (entry, _tick_height))| entry)
1171 .collect();
1172
1173 let (bank, _bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
1174 for entry in entries {
1175 let _ = bank
1176 .try_process_entry_transactions(entry.transactions)
1177 .expect("All transactions should be processed");
1178 }
1179
1180 assert!(bank.get_balance(&alice.pubkey()) != 3);
1184 }
1185
1186 #[test]
1187 fn test_bank_record_transactions() {
1188 agave_logger::setup();
1189
1190 let GenesisConfigInfo {
1191 genesis_config,
1192 mint_keypair,
1193 ..
1194 } = create_genesis_config(10_000);
1195 let (bank, _bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
1196
1197 let (record_sender, mut record_receiver) = record_channels(false);
1198 let recorder = TransactionRecorder::new(record_sender);
1199 record_receiver.restart(bank.bank_id());
1200
1201 let pubkey = solana_pubkey::new_rand();
1202 let keypair2 = Keypair::new();
1203 let pubkey2 = solana_pubkey::new_rand();
1204
1205 let txs = vec![
1206 system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()).into(),
1207 system_transaction::transfer(&keypair2, &pubkey2, 1, genesis_config.hash()).into(),
1208 ];
1209
1210 let summary = recorder.record_transactions(bank.bank_id(), txs.clone());
1211 assert!(summary.result.is_ok());
1212 assert_eq!(
1213 record_receiver.try_recv().unwrap().transaction_batches,
1214 vec![txs.clone()]
1215 );
1216 assert!(record_receiver.try_recv().is_err());
1217
1218 let next_bank_id = bank.bank_id() + 1;
1221 let RecordTransactionsSummary { result, .. } =
1222 recorder.record_transactions(next_bank_id, txs);
1223 assert_matches!(result, Err(PohRecorderError::MaxHeightReached));
1224 assert!(record_receiver.try_recv().is_err());
1226 }
1227
1228 pub(crate) fn create_slow_genesis_config(lamports: u64) -> GenesisConfigInfo {
1229 create_slow_genesis_config_with_leader(lamports, &solana_pubkey::new_rand())
1230 }
1231
1232 pub(crate) fn create_slow_genesis_config_with_leader(
1233 lamports: u64,
1234 validator_pubkey: &Pubkey,
1235 ) -> GenesisConfigInfo {
1236 let mut config_info = create_genesis_config_with_leader(
1237 lamports,
1238 validator_pubkey,
1239 bootstrap_validator_stake_lamports(),
1241 );
1242
1243 config_info.genesis_config.ticks_per_slot *= 1024;
1245 config_info
1246 }
1247
1248 #[test]
1249 fn test_vote_storage_full_send() {
1250 agave_logger::setup();
1251 let GenesisConfigInfo {
1252 genesis_config,
1253 mint_keypair,
1254 ..
1255 } = create_slow_genesis_config(10000);
1256 let (bank, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
1257 let start_hash = bank.last_blockhash();
1258 let banking_tracer = BankingTracer::new_disabled();
1259 let Channels {
1260 non_vote_sender,
1261 non_vote_receiver,
1262 tpu_vote_sender,
1263 tpu_vote_receiver,
1264 gossip_vote_sender,
1265 gossip_vote_receiver,
1266 } = banking_tracer.create_channels(false);
1267 let ledger_path = get_tmp_ledger_path_auto_delete!();
1268 let blockstore = Arc::new(
1269 Blockstore::open(ledger_path.path())
1270 .expect("Expected to be able to open database ledger"),
1271 );
1272 let (
1273 exit,
1274 poh_recorder,
1275 _poh_controller,
1276 transaction_recorder,
1277 poh_service,
1278 _entry_receiver,
1279 ) = create_test_recorder(bank.clone(), blockstore, None, None);
1280 let (replay_vote_sender, _replay_vote_receiver) = unbounded();
1281
1282 let banking_stage = BankingStage::new_num_threads(
1283 BlockProductionMethod::CentralScheduler,
1284 poh_recorder.clone(),
1285 transaction_recorder,
1286 non_vote_receiver,
1287 tpu_vote_receiver,
1288 gossip_vote_receiver,
1289 DEFAULT_NUM_WORKERS,
1290 SchedulerConfig {
1291 scheduler_pacing: SchedulerPacing::Disabled,
1292 },
1293 None,
1294 replay_vote_sender,
1295 None,
1296 bank_forks,
1297 Arc::new(PrioritizationFeeCache::new(0u64)),
1298 );
1299
1300 let keypairs = (0..100).map(|_| Keypair::new()).collect_vec();
1301 let vote_keypairs = (0..100).map(|_| Keypair::new()).collect_vec();
1302 for keypair in keypairs.iter() {
1303 bank.process_transaction(&system_transaction::transfer(
1304 &mint_keypair,
1305 &keypair.pubkey(),
1306 20,
1307 start_hash,
1308 ))
1309 .unwrap();
1310 }
1311
1312 let tpu_votes = (0..100_usize)
1314 .map(|i| {
1315 new_tower_sync_transaction(
1316 TowerSync::from(vec![(0, 8), (1, 7), (i as u64 + 10, 6), (i as u64 + 11, 1)]),
1317 Hash::new_unique(),
1318 &keypairs[i],
1319 &vote_keypairs[i],
1320 &vote_keypairs[i],
1321 None,
1322 )
1323 })
1324 .collect_vec();
1325 let gossip_votes = (0..100_usize)
1326 .map(|i| {
1327 new_tower_sync_transaction(
1328 TowerSync::from(vec![(0, 9), (1, 8), (i as u64 + 5, 6), (i as u64 + 63, 1)]),
1329 Hash::new_unique(),
1330 &keypairs[i],
1331 &vote_keypairs[i],
1332 &vote_keypairs[i],
1333 None,
1334 )
1335 })
1336 .collect_vec();
1337 let txs = (0..100_usize)
1338 .map(|i| {
1339 system_transaction::transfer(
1340 &keypairs[i],
1341 &keypairs[(i + 1) % 100].pubkey(),
1342 10,
1343 start_hash,
1344 );
1345 })
1346 .collect_vec();
1347
1348 let non_vote_packet_batches = to_packet_batches(&txs, 10);
1349 let tpu_packet_batches = to_packet_batches(&tpu_votes, 10);
1350 let gossip_packet_batches = to_packet_batches(&gossip_votes, 10);
1351
1352 [
1354 (non_vote_packet_batches, non_vote_sender),
1355 (tpu_packet_batches, tpu_vote_sender),
1356 (gossip_packet_batches, gossip_vote_sender),
1357 ]
1358 .into_iter()
1359 .map(|(packet_batches, sender)| {
1360 Builder::new()
1361 .spawn(move || {
1362 sender
1363 .send(BankingPacketBatch::new(packet_batches))
1364 .unwrap()
1365 })
1366 .unwrap()
1367 })
1368 .for_each(|handle| handle.join().unwrap());
1369
1370 banking_stage.join().unwrap();
1371 exit.store(true, Ordering::Relaxed);
1372 poh_service.join().unwrap();
1373 }
1374}