solana_core/
tpu.rs

1//! The `tpu` module implements the Transaction Processing Unit, a
2//! multi-stage transaction processing pipeline in software.
3
4pub use crate::forwarding_stage::ForwardingClientOption;
5use {
6    crate::{
7        admin_rpc_post_init::{KeyUpdaterType, KeyUpdaters},
8        banking_stage::{
9            transaction_scheduler::scheduler_controller::SchedulerConfig, BankingStage,
10        },
11        banking_trace::{Channels, TracerThread},
12        cluster_info_vote_listener::{
13            ClusterInfoVoteListener, DuplicateConfirmedSlotsSender, GossipVerifiedVoteHashSender,
14            VerifiedVoteSender, VoteTracker,
15        },
16        fetch_stage::FetchStage,
17        forwarding_stage::{
18            spawn_forwarding_stage, ForwardAddressGetter, SpawnForwardingStageResult,
19        },
20        sigverify::TransactionSigVerifier,
21        sigverify_stage::SigVerifyStage,
22        staked_nodes_updater_service::StakedNodesUpdaterService,
23        tpu_entry_notifier::TpuEntryNotifier,
24        validator::{BlockProductionMethod, GeneratorConfig},
25        vortexor_receiver_adapter::VortexorReceiverAdapter,
26    },
27    bytes::Bytes,
28    crossbeam_channel::{bounded, unbounded, Receiver},
29    solana_clock::Slot,
30    solana_gossip::cluster_info::ClusterInfo,
31    solana_keypair::Keypair,
32    solana_ledger::{
33        blockstore::Blockstore, blockstore_processor::TransactionStatusSender,
34        entry_notifier_service::EntryNotifierSender,
35    },
36    solana_perf::data_budget::DataBudget,
37    solana_poh::{
38        poh_recorder::{PohRecorder, WorkingBankEntry},
39        transaction_recorder::TransactionRecorder,
40    },
41    solana_pubkey::Pubkey,
42    solana_rpc::{
43        optimistically_confirmed_bank_tracker::BankNotificationSenderConfig,
44        rpc_subscriptions::RpcSubscriptions,
45    },
46    solana_runtime::{
47        bank_forks::BankForks,
48        prioritization_fee_cache::PrioritizationFeeCache,
49        vote_sender_types::{ReplayVoteReceiver, ReplayVoteSender},
50    },
51    solana_streamer::{
52        quic::{
53            spawn_server_with_cancel, spawn_simple_qos_server_with_cancel,
54            SimpleQosQuicStreamerConfig, SpawnServerResult, SwQosQuicStreamerConfig,
55        },
56        streamer::StakedNodes,
57    },
58    solana_turbine::{
59        broadcast_stage::{BroadcastStage, BroadcastStageType},
60        xdp::XdpSender,
61    },
62    std::{
63        collections::HashMap,
64        net::{SocketAddr, UdpSocket},
65        num::NonZeroUsize,
66        sync::{atomic::AtomicBool, Arc, RwLock},
67        thread::{self, JoinHandle},
68        time::Duration,
69    },
70    tokio::sync::mpsc::Sender as AsyncSender,
71    tokio_util::sync::CancellationToken,
72};
73
74pub struct TpuSockets {
75    pub transactions: Vec<UdpSocket>,
76    pub transaction_forwards: Vec<UdpSocket>,
77    pub vote: Vec<UdpSocket>,
78    pub broadcast: Vec<UdpSocket>,
79    pub transactions_quic: Vec<UdpSocket>,
80    pub transactions_forwards_quic: Vec<UdpSocket>,
81    pub vote_quic: Vec<UdpSocket>,
82    /// Client-side socket for the forwarding votes.
83    pub vote_forwarding_client: UdpSocket,
84    pub vortexor_receivers: Option<Vec<UdpSocket>>,
85}
86
87/// The `SigVerifier` enum is used to determine whether to use a local or remote signature verifier.
88enum SigVerifier {
89    Local(SigVerifyStage),
90    Remote(VortexorReceiverAdapter),
91}
92
93impl SigVerifier {
94    fn join(self) -> thread::Result<()> {
95        match self {
96            SigVerifier::Local(sig_verify_stage) => sig_verify_stage.join(),
97            SigVerifier::Remote(vortexor_receiver_adapter) => vortexor_receiver_adapter.join(),
98        }
99    }
100}
101
102// Conservatively allow 20 TPS per validator.
103pub const MAX_VOTES_PER_SECOND: u64 = 20;
104
105pub struct Tpu {
106    fetch_stage: FetchStage,
107    sig_verifier: SigVerifier,
108    vote_sigverify_stage: SigVerifyStage,
109    banking_stage: Arc<RwLock<Option<BankingStage>>>,
110    forwarding_stage: JoinHandle<()>,
111    cluster_info_vote_listener: ClusterInfoVoteListener,
112    broadcast_stage: BroadcastStage,
113    tpu_quic_t: Option<thread::JoinHandle<()>>,
114    tpu_forwards_quic_t: Option<thread::JoinHandle<()>>,
115    tpu_entry_notifier: Option<TpuEntryNotifier>,
116    staked_nodes_updater_service: StakedNodesUpdaterService,
117    tracer_thread_hdl: TracerThread,
118    tpu_vote_quic_t: thread::JoinHandle<()>,
119}
120
121impl Tpu {
122    #[allow(clippy::too_many_arguments)]
123    pub fn new_with_client(
124        cluster_info: &Arc<ClusterInfo>,
125        poh_recorder: &Arc<RwLock<PohRecorder>>,
126        transaction_recorder: TransactionRecorder,
127        entry_receiver: Receiver<WorkingBankEntry>,
128        retransmit_slots_receiver: Receiver<Slot>,
129        sockets: TpuSockets,
130        subscriptions: Option<Arc<RpcSubscriptions>>,
131        transaction_status_sender: Option<TransactionStatusSender>,
132        entry_notification_sender: Option<EntryNotifierSender>,
133        blockstore: Arc<Blockstore>,
134        broadcast_type: &BroadcastStageType,
135        xdp_sender: Option<XdpSender>,
136        exit: Arc<AtomicBool>,
137        shred_version: u16,
138        vote_tracker: Arc<VoteTracker>,
139        bank_forks: Arc<RwLock<BankForks>>,
140        verified_vote_sender: VerifiedVoteSender,
141        gossip_verified_vote_hash_sender: GossipVerifiedVoteHashSender,
142        replay_vote_receiver: ReplayVoteReceiver,
143        replay_vote_sender: ReplayVoteSender,
144        bank_notification_sender: Option<BankNotificationSenderConfig>,
145        duplicate_confirmed_slot_sender: DuplicateConfirmedSlotsSender,
146        client: ForwardingClientOption,
147        turbine_quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>,
148        keypair: &Keypair,
149        log_messages_bytes_limit: Option<usize>,
150        staked_nodes: &Arc<RwLock<StakedNodes>>,
151        shared_staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
152        banking_tracer_channels: Channels,
153        tracer_thread_hdl: TracerThread,
154        tpu_enable_udp: bool,
155        tpu_quic_server_config: SwQosQuicStreamerConfig,
156        tpu_fwd_quic_server_config: SwQosQuicStreamerConfig,
157        vote_quic_server_config: SimpleQosQuicStreamerConfig,
158        prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
159        block_production_method: BlockProductionMethod,
160        block_production_num_workers: NonZeroUsize,
161        block_production_scheduler_config: SchedulerConfig,
162        enable_block_production_forwarding: bool,
163        _generator_config: Option<GeneratorConfig>, /* vestigial code for replay invalidator */
164        key_notifiers: Arc<RwLock<KeyUpdaters>>,
165        cancel: CancellationToken,
166    ) -> Self {
167        let TpuSockets {
168            transactions: transactions_sockets,
169            transaction_forwards: tpu_forwards_sockets,
170            vote: tpu_vote_sockets,
171            broadcast: broadcast_sockets,
172            transactions_quic: transactions_quic_sockets,
173            transactions_forwards_quic: transactions_forwards_quic_sockets,
174            vote_quic: tpu_vote_quic_sockets,
175            vote_forwarding_client: vote_forwarding_client_socket,
176            vortexor_receivers,
177        } = sockets;
178
179        let (packet_sender, packet_receiver) = unbounded();
180        let (vote_packet_sender, vote_packet_receiver) = unbounded();
181        let (forwarded_packet_sender, forwarded_packet_receiver) = unbounded();
182        let fetch_stage = FetchStage::new_with_sender(
183            transactions_sockets,
184            tpu_forwards_sockets,
185            tpu_vote_sockets,
186            exit.clone(),
187            &packet_sender,
188            &vote_packet_sender,
189            &forwarded_packet_sender,
190            forwarded_packet_receiver,
191            poh_recorder,
192            None, // coalesce
193            Some(bank_forks.read().unwrap().get_vote_only_mode_signal()),
194            tpu_enable_udp,
195        );
196
197        let staked_nodes_updater_service = StakedNodesUpdaterService::new(
198            exit.clone(),
199            bank_forks.clone(),
200            staked_nodes.clone(),
201            shared_staked_nodes_overrides,
202        );
203
204        let Channels {
205            non_vote_sender,
206            non_vote_receiver,
207            tpu_vote_sender,
208            tpu_vote_receiver,
209            gossip_vote_sender,
210            gossip_vote_receiver,
211        } = banking_tracer_channels;
212
213        // Streamer for Votes:
214        let SpawnServerResult {
215            endpoints: _,
216            thread: tpu_vote_quic_t,
217            key_updater: vote_streamer_key_updater,
218        } = spawn_simple_qos_server_with_cancel(
219            "solQuicTVo",
220            "quic_streamer_tpu_vote",
221            tpu_vote_quic_sockets,
222            keypair,
223            vote_packet_sender.clone(),
224            staked_nodes.clone(),
225            vote_quic_server_config.quic_streamer_config,
226            vote_quic_server_config.qos_config,
227            cancel.clone(),
228        )
229        .unwrap();
230
231        let (tpu_quic_t, key_updater) = if vortexor_receivers.is_none() {
232            // Streamer for TPU
233            let SpawnServerResult {
234                endpoints: _,
235                thread: tpu_quic_t,
236                key_updater,
237            } = spawn_server_with_cancel(
238                "solQuicTpu",
239                "quic_streamer_tpu",
240                transactions_quic_sockets,
241                keypair,
242                packet_sender,
243                staked_nodes.clone(),
244                tpu_quic_server_config.quic_streamer_config,
245                tpu_quic_server_config.qos_config,
246                cancel.clone(),
247            )
248            .unwrap();
249            (Some(tpu_quic_t), Some(key_updater))
250        } else {
251            (None, None)
252        };
253
254        let (tpu_forwards_quic_t, forwards_key_updater) = if vortexor_receivers.is_none() {
255            // Streamer for TPU forward
256            let SpawnServerResult {
257                endpoints: _,
258                thread: tpu_forwards_quic_t,
259                key_updater: forwards_key_updater,
260            } = spawn_server_with_cancel(
261                "solQuicTpuFwd",
262                "quic_streamer_tpu_forwards",
263                transactions_forwards_quic_sockets,
264                keypair,
265                forwarded_packet_sender,
266                staked_nodes.clone(),
267                tpu_fwd_quic_server_config.quic_streamer_config,
268                tpu_fwd_quic_server_config.qos_config,
269                cancel,
270            )
271            .unwrap();
272            (Some(tpu_forwards_quic_t), Some(forwards_key_updater))
273        } else {
274            (None, None)
275        };
276
277        let (forward_stage_sender, forward_stage_receiver) = bounded(1024);
278        let sig_verifier = if let Some(vortexor_receivers) = vortexor_receivers {
279            info!("starting vortexor adapter");
280            let sockets = vortexor_receivers.into_iter().map(Arc::new).collect();
281            let adapter = VortexorReceiverAdapter::new(
282                sockets,
283                Duration::from_millis(5),
284                non_vote_sender,
285                enable_block_production_forwarding.then(|| forward_stage_sender.clone()),
286                exit.clone(),
287            );
288            SigVerifier::Remote(adapter)
289        } else {
290            info!("starting regular sigverify stage");
291            let verifier = TransactionSigVerifier::new(
292                non_vote_sender,
293                enable_block_production_forwarding.then(|| forward_stage_sender.clone()),
294            );
295            SigVerifier::Local(SigVerifyStage::new(
296                packet_receiver,
297                verifier,
298                "solSigVerTpu",
299                "tpu-verifier",
300            ))
301        };
302
303        let vote_sigverify_stage = {
304            let verifier = TransactionSigVerifier::new_reject_non_vote(
305                tpu_vote_sender,
306                Some(forward_stage_sender),
307            );
308            SigVerifyStage::new(
309                vote_packet_receiver,
310                verifier,
311                "solSigVerTpuVot",
312                "tpu-vote-verifier",
313            )
314        };
315
316        let cluster_info_vote_listener = ClusterInfoVoteListener::new(
317            exit.clone(),
318            cluster_info.clone(),
319            gossip_vote_sender,
320            vote_tracker,
321            bank_forks.clone(),
322            subscriptions,
323            verified_vote_sender,
324            gossip_verified_vote_hash_sender,
325            replay_vote_receiver,
326            blockstore.clone(),
327            bank_notification_sender,
328            duplicate_confirmed_slot_sender,
329        );
330
331        let banking_stage = BankingStage::new_num_threads(
332            block_production_method,
333            poh_recorder.clone(),
334            transaction_recorder,
335            non_vote_receiver,
336            tpu_vote_receiver,
337            gossip_vote_receiver,
338            block_production_num_workers,
339            block_production_scheduler_config,
340            transaction_status_sender,
341            replay_vote_sender,
342            log_messages_bytes_limit,
343            bank_forks.clone(),
344            prioritization_fee_cache.clone(),
345        );
346
347        let SpawnForwardingStageResult {
348            join_handle: forwarding_stage,
349            client_updater,
350        } = spawn_forwarding_stage(
351            forward_stage_receiver,
352            client,
353            vote_forwarding_client_socket,
354            bank_forks.read().unwrap().sharable_banks(),
355            ForwardAddressGetter::new(cluster_info.clone(), poh_recorder.clone()),
356            DataBudget::default(),
357        );
358
359        let (entry_receiver, tpu_entry_notifier) =
360            if let Some(entry_notification_sender) = entry_notification_sender {
361                let (broadcast_entry_sender, broadcast_entry_receiver) = unbounded();
362                let tpu_entry_notifier = TpuEntryNotifier::new(
363                    entry_receiver,
364                    entry_notification_sender,
365                    broadcast_entry_sender,
366                    exit.clone(),
367                );
368                (broadcast_entry_receiver, Some(tpu_entry_notifier))
369            } else {
370                (entry_receiver, None)
371            };
372
373        let broadcast_stage = broadcast_type.new_broadcast_stage(
374            broadcast_sockets,
375            cluster_info.clone(),
376            entry_receiver,
377            retransmit_slots_receiver,
378            exit,
379            blockstore,
380            bank_forks,
381            shred_version,
382            turbine_quic_endpoint_sender,
383            xdp_sender,
384        );
385
386        let mut key_notifiers = key_notifiers.write().unwrap();
387        if let Some(key_updater) = key_updater {
388            key_notifiers.add(KeyUpdaterType::Tpu, key_updater);
389        }
390        if let Some(forwards_key_updater) = forwards_key_updater {
391            key_notifiers.add(KeyUpdaterType::TpuForwards, forwards_key_updater);
392        }
393        key_notifiers.add(KeyUpdaterType::TpuVote, vote_streamer_key_updater);
394
395        key_notifiers.add(KeyUpdaterType::Forward, client_updater);
396
397        Self {
398            fetch_stage,
399            sig_verifier,
400            vote_sigverify_stage,
401            banking_stage: Arc::new(RwLock::new(Some(banking_stage))),
402            forwarding_stage,
403            cluster_info_vote_listener,
404            broadcast_stage,
405            tpu_quic_t,
406            tpu_forwards_quic_t,
407            tpu_entry_notifier,
408            staked_nodes_updater_service,
409            tracer_thread_hdl,
410            tpu_vote_quic_t,
411        }
412    }
413
414    pub fn banking_stage(&self) -> Arc<RwLock<Option<BankingStage>>> {
415        self.banking_stage.clone()
416    }
417
418    pub fn join(self) -> thread::Result<()> {
419        let results = vec![
420            self.fetch_stage.join(),
421            self.sig_verifier.join(),
422            self.vote_sigverify_stage.join(),
423            self.cluster_info_vote_listener.join(),
424            self.banking_stage
425                .write()
426                .unwrap()
427                .take()
428                .expect("banking_stage must be Some")
429                .join(),
430            self.forwarding_stage.join(),
431            self.staked_nodes_updater_service.join(),
432            self.tpu_quic_t.map_or(Ok(()), |t| t.join()),
433            self.tpu_forwards_quic_t.map_or(Ok(()), |t| t.join()),
434            self.tpu_vote_quic_t.join(),
435        ];
436        let broadcast_result = self.broadcast_stage.join();
437        for result in results {
438            result?;
439        }
440        if let Some(tpu_entry_notifier) = self.tpu_entry_notifier {
441            tpu_entry_notifier.join()?;
442        }
443        let _ = broadcast_result?;
444        if let Some(tracer_thread_hdl) = self.tracer_thread_hdl {
445            if let Err(tracer_result) = tracer_thread_hdl.join()? {
446                error!(
447                    "banking tracer thread returned error after successful thread join: \
448                     {tracer_result:?}"
449                );
450            }
451        }
452        Ok(())
453    }
454}