solana_core/
tpu.rs

1//! The `tpu` module implements the Transaction Processing Unit, a
2//! multi-stage transaction processing pipeline in software.
3
4pub use {
5    crate::forwarding_stage::ForwardingClientOption, solana_streamer::quic::DEFAULT_TPU_COALESCE,
6};
7use {
8    crate::{
9        admin_rpc_post_init::{KeyUpdaterType, KeyUpdaters},
10        banking_stage::BankingStage,
11        banking_trace::{Channels, TracerThread},
12        cluster_info_vote_listener::{
13            ClusterInfoVoteListener, DuplicateConfirmedSlotsSender, GossipVerifiedVoteHashSender,
14            VerifiedVoteSender, VoteTracker,
15        },
16        fetch_stage::FetchStage,
17        forwarding_stage::{
18            spawn_forwarding_stage, ForwardAddressGetter, SpawnForwardingStageResult,
19        },
20        sigverify::TransactionSigVerifier,
21        sigverify_stage::SigVerifyStage,
22        staked_nodes_updater_service::StakedNodesUpdaterService,
23        tpu_entry_notifier::TpuEntryNotifier,
24        validator::{BlockProductionMethod, GeneratorConfig, TransactionStructure},
25        vortexor_receiver_adapter::VortexorReceiverAdapter,
26    },
27    bytes::Bytes,
28    crossbeam_channel::{bounded, unbounded, Receiver},
29    solana_clock::Slot,
30    solana_gossip::cluster_info::ClusterInfo,
31    solana_keypair::Keypair,
32    solana_ledger::{
33        blockstore::Blockstore, blockstore_processor::TransactionStatusSender,
34        entry_notifier_service::EntryNotifierSender,
35    },
36    solana_perf::data_budget::DataBudget,
37    solana_poh::{
38        poh_recorder::{PohRecorder, WorkingBankEntry},
39        transaction_recorder::TransactionRecorder,
40    },
41    solana_pubkey::Pubkey,
42    solana_rpc::{
43        optimistically_confirmed_bank_tracker::BankNotificationSenderConfig,
44        rpc_subscriptions::RpcSubscriptions,
45    },
46    solana_runtime::{
47        bank_forks::BankForks,
48        prioritization_fee_cache::PrioritizationFeeCache,
49        vote_sender_types::{ReplayVoteReceiver, ReplayVoteSender},
50    },
51    solana_streamer::{
52        quic::{spawn_server, QuicServerParams, SpawnServerResult},
53        streamer::StakedNodes,
54    },
55    solana_turbine::{
56        broadcast_stage::{BroadcastStage, BroadcastStageType},
57        xdp::XdpSender,
58    },
59    std::{
60        collections::HashMap,
61        net::{SocketAddr, UdpSocket},
62        num::NonZeroUsize,
63        sync::{atomic::AtomicBool, Arc, RwLock},
64        thread::{self, JoinHandle},
65        time::Duration,
66    },
67    tokio::sync::mpsc::Sender as AsyncSender,
68};
69
70pub struct TpuSockets {
71    pub transactions: Vec<UdpSocket>,
72    pub transaction_forwards: Vec<UdpSocket>,
73    pub vote: Vec<UdpSocket>,
74    pub broadcast: Vec<UdpSocket>,
75    pub transactions_quic: Vec<UdpSocket>,
76    pub transactions_forwards_quic: Vec<UdpSocket>,
77    pub vote_quic: Vec<UdpSocket>,
78    /// Client-side socket for the forwarding votes.
79    pub vote_forwarding_client: UdpSocket,
80    pub vortexor_receivers: Option<Vec<UdpSocket>>,
81}
82
83/// The `SigVerifier` enum is used to determine whether to use a local or remote signature verifier.
84enum SigVerifier {
85    Local(SigVerifyStage),
86    Remote(VortexorReceiverAdapter),
87}
88
89impl SigVerifier {
90    fn join(self) -> thread::Result<()> {
91        match self {
92            SigVerifier::Local(sig_verify_stage) => sig_verify_stage.join(),
93            SigVerifier::Remote(vortexor_receiver_adapter) => vortexor_receiver_adapter.join(),
94        }
95    }
96}
97
98pub struct Tpu {
99    fetch_stage: FetchStage,
100    sig_verifier: SigVerifier,
101    vote_sigverify_stage: SigVerifyStage,
102    banking_stage: BankingStage,
103    forwarding_stage: JoinHandle<()>,
104    cluster_info_vote_listener: ClusterInfoVoteListener,
105    broadcast_stage: BroadcastStage,
106    tpu_quic_t: Option<thread::JoinHandle<()>>,
107    tpu_forwards_quic_t: Option<thread::JoinHandle<()>>,
108    tpu_entry_notifier: Option<TpuEntryNotifier>,
109    staked_nodes_updater_service: StakedNodesUpdaterService,
110    tracer_thread_hdl: TracerThread,
111    tpu_vote_quic_t: thread::JoinHandle<()>,
112}
113
114impl Tpu {
115    #[allow(clippy::too_many_arguments)]
116    pub fn new_with_client(
117        cluster_info: &Arc<ClusterInfo>,
118        poh_recorder: &Arc<RwLock<PohRecorder>>,
119        transaction_recorder: TransactionRecorder,
120        entry_receiver: Receiver<WorkingBankEntry>,
121        retransmit_slots_receiver: Receiver<Slot>,
122        sockets: TpuSockets,
123        subscriptions: Option<Arc<RpcSubscriptions>>,
124        transaction_status_sender: Option<TransactionStatusSender>,
125        entry_notification_sender: Option<EntryNotifierSender>,
126        blockstore: Arc<Blockstore>,
127        broadcast_type: &BroadcastStageType,
128        xdp_sender: Option<XdpSender>,
129        exit: Arc<AtomicBool>,
130        shred_version: u16,
131        vote_tracker: Arc<VoteTracker>,
132        bank_forks: Arc<RwLock<BankForks>>,
133        verified_vote_sender: VerifiedVoteSender,
134        gossip_verified_vote_hash_sender: GossipVerifiedVoteHashSender,
135        replay_vote_receiver: ReplayVoteReceiver,
136        replay_vote_sender: ReplayVoteSender,
137        bank_notification_sender: Option<BankNotificationSenderConfig>,
138        tpu_coalesce: Duration,
139        duplicate_confirmed_slot_sender: DuplicateConfirmedSlotsSender,
140        client: ForwardingClientOption,
141        turbine_quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>,
142        keypair: &Keypair,
143        log_messages_bytes_limit: Option<usize>,
144        staked_nodes: &Arc<RwLock<StakedNodes>>,
145        shared_staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
146        banking_tracer_channels: Channels,
147        tracer_thread_hdl: TracerThread,
148        tpu_enable_udp: bool,
149        tpu_quic_server_config: QuicServerParams,
150        tpu_fwd_quic_server_config: QuicServerParams,
151        vote_quic_server_config: QuicServerParams,
152        prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
153        block_production_method: BlockProductionMethod,
154        block_production_num_workers: NonZeroUsize,
155        transaction_struct: TransactionStructure,
156        enable_block_production_forwarding: bool,
157        _generator_config: Option<GeneratorConfig>, /* vestigial code for replay invalidator */
158        key_notifiers: Arc<RwLock<KeyUpdaters>>,
159    ) -> Self {
160        let TpuSockets {
161            transactions: transactions_sockets,
162            transaction_forwards: tpu_forwards_sockets,
163            vote: tpu_vote_sockets,
164            broadcast: broadcast_sockets,
165            transactions_quic: transactions_quic_sockets,
166            transactions_forwards_quic: transactions_forwards_quic_sockets,
167            vote_quic: tpu_vote_quic_sockets,
168            vote_forwarding_client: vote_forwarding_client_socket,
169            vortexor_receivers,
170        } = sockets;
171
172        let (packet_sender, packet_receiver) = unbounded();
173        let (vote_packet_sender, vote_packet_receiver) = unbounded();
174        let (forwarded_packet_sender, forwarded_packet_receiver) = unbounded();
175        let fetch_stage = FetchStage::new_with_sender(
176            transactions_sockets,
177            tpu_forwards_sockets,
178            tpu_vote_sockets,
179            exit.clone(),
180            &packet_sender,
181            &vote_packet_sender,
182            &forwarded_packet_sender,
183            forwarded_packet_receiver,
184            poh_recorder,
185            Some(tpu_coalesce),
186            Some(bank_forks.read().unwrap().get_vote_only_mode_signal()),
187            tpu_enable_udp,
188        );
189
190        let staked_nodes_updater_service = StakedNodesUpdaterService::new(
191            exit.clone(),
192            bank_forks.clone(),
193            staked_nodes.clone(),
194            shared_staked_nodes_overrides,
195        );
196
197        let Channels {
198            non_vote_sender,
199            non_vote_receiver,
200            tpu_vote_sender,
201            tpu_vote_receiver,
202            gossip_vote_sender,
203            gossip_vote_receiver,
204        } = banking_tracer_channels;
205
206        // Streamer for Votes:
207        let SpawnServerResult {
208            endpoints: _,
209            thread: tpu_vote_quic_t,
210            key_updater: vote_streamer_key_updater,
211        } = spawn_server(
212            "solQuicTVo",
213            "quic_streamer_tpu_vote",
214            tpu_vote_quic_sockets,
215            keypair,
216            vote_packet_sender.clone(),
217            exit.clone(),
218            staked_nodes.clone(),
219            vote_quic_server_config,
220        )
221        .unwrap();
222
223        let (tpu_quic_t, key_updater) = if vortexor_receivers.is_none() {
224            // Streamer for TPU
225            let SpawnServerResult {
226                endpoints: _,
227                thread: tpu_quic_t,
228                key_updater,
229            } = spawn_server(
230                "solQuicTpu",
231                "quic_streamer_tpu",
232                transactions_quic_sockets,
233                keypair,
234                packet_sender,
235                exit.clone(),
236                staked_nodes.clone(),
237                tpu_quic_server_config,
238            )
239            .unwrap();
240            (Some(tpu_quic_t), Some(key_updater))
241        } else {
242            (None, None)
243        };
244
245        let (tpu_forwards_quic_t, forwards_key_updater) = if vortexor_receivers.is_none() {
246            // Streamer for TPU forward
247            let SpawnServerResult {
248                endpoints: _,
249                thread: tpu_forwards_quic_t,
250                key_updater: forwards_key_updater,
251            } = spawn_server(
252                "solQuicTpuFwd",
253                "quic_streamer_tpu_forwards",
254                transactions_forwards_quic_sockets,
255                keypair,
256                forwarded_packet_sender,
257                exit.clone(),
258                staked_nodes.clone(),
259                tpu_fwd_quic_server_config,
260            )
261            .unwrap();
262            (Some(tpu_forwards_quic_t), Some(forwards_key_updater))
263        } else {
264            (None, None)
265        };
266
267        let (forward_stage_sender, forward_stage_receiver) = bounded(1024);
268        let sig_verifier = if let Some(vortexor_receivers) = vortexor_receivers {
269            info!("starting vortexor adapter");
270            let sockets = vortexor_receivers.into_iter().map(Arc::new).collect();
271            let adapter = VortexorReceiverAdapter::new(
272                sockets,
273                Duration::from_millis(5),
274                tpu_coalesce,
275                non_vote_sender,
276                enable_block_production_forwarding.then(|| forward_stage_sender.clone()),
277                exit.clone(),
278            );
279            SigVerifier::Remote(adapter)
280        } else {
281            info!("starting regular sigverify stage");
282            let verifier = TransactionSigVerifier::new(
283                non_vote_sender,
284                enable_block_production_forwarding.then(|| forward_stage_sender.clone()),
285            );
286            SigVerifier::Local(SigVerifyStage::new(
287                packet_receiver,
288                verifier,
289                "solSigVerTpu",
290                "tpu-verifier",
291            ))
292        };
293
294        let vote_sigverify_stage = {
295            let verifier = TransactionSigVerifier::new_reject_non_vote(
296                tpu_vote_sender,
297                Some(forward_stage_sender),
298            );
299            SigVerifyStage::new(
300                vote_packet_receiver,
301                verifier,
302                "solSigVerTpuVot",
303                "tpu-vote-verifier",
304            )
305        };
306
307        let cluster_info_vote_listener = ClusterInfoVoteListener::new(
308            exit.clone(),
309            cluster_info.clone(),
310            gossip_vote_sender,
311            vote_tracker,
312            bank_forks.clone(),
313            subscriptions,
314            verified_vote_sender,
315            gossip_verified_vote_hash_sender,
316            replay_vote_receiver,
317            blockstore.clone(),
318            bank_notification_sender,
319            duplicate_confirmed_slot_sender,
320        );
321
322        let banking_stage = BankingStage::new_num_threads(
323            block_production_method,
324            transaction_struct,
325            poh_recorder.clone(),
326            transaction_recorder,
327            non_vote_receiver,
328            tpu_vote_receiver,
329            gossip_vote_receiver,
330            block_production_num_workers,
331            transaction_status_sender,
332            replay_vote_sender,
333            log_messages_bytes_limit,
334            bank_forks.clone(),
335            prioritization_fee_cache.clone(),
336        );
337
338        let SpawnForwardingStageResult {
339            join_handle: forwarding_stage,
340            client_updater,
341        } = spawn_forwarding_stage(
342            forward_stage_receiver,
343            client,
344            vote_forwarding_client_socket,
345            bank_forks.read().unwrap().sharable_root_bank(),
346            ForwardAddressGetter::new(cluster_info.clone(), poh_recorder.clone()),
347            DataBudget::default(),
348        );
349
350        let (entry_receiver, tpu_entry_notifier) =
351            if let Some(entry_notification_sender) = entry_notification_sender {
352                let (broadcast_entry_sender, broadcast_entry_receiver) = unbounded();
353                let tpu_entry_notifier = TpuEntryNotifier::new(
354                    entry_receiver,
355                    entry_notification_sender,
356                    broadcast_entry_sender,
357                    exit.clone(),
358                );
359                (broadcast_entry_receiver, Some(tpu_entry_notifier))
360            } else {
361                (entry_receiver, None)
362            };
363
364        let broadcast_stage = broadcast_type.new_broadcast_stage(
365            broadcast_sockets,
366            cluster_info.clone(),
367            entry_receiver,
368            retransmit_slots_receiver,
369            exit,
370            blockstore,
371            bank_forks,
372            shred_version,
373            turbine_quic_endpoint_sender,
374            xdp_sender,
375        );
376
377        let mut key_notifiers = key_notifiers.write().unwrap();
378        if let Some(key_updater) = key_updater {
379            key_notifiers.add(KeyUpdaterType::Tpu, key_updater);
380        }
381        if let Some(forwards_key_updater) = forwards_key_updater {
382            key_notifiers.add(KeyUpdaterType::TpuForwards, forwards_key_updater);
383        }
384        key_notifiers.add(KeyUpdaterType::TpuVote, vote_streamer_key_updater);
385
386        key_notifiers.add(KeyUpdaterType::Forward, client_updater);
387
388        Self {
389            fetch_stage,
390            sig_verifier,
391            vote_sigverify_stage,
392            banking_stage,
393            forwarding_stage,
394            cluster_info_vote_listener,
395            broadcast_stage,
396            tpu_quic_t,
397            tpu_forwards_quic_t,
398            tpu_entry_notifier,
399            staked_nodes_updater_service,
400            tracer_thread_hdl,
401            tpu_vote_quic_t,
402        }
403    }
404
405    pub fn join(self) -> thread::Result<()> {
406        let results = vec![
407            self.fetch_stage.join(),
408            self.sig_verifier.join(),
409            self.vote_sigverify_stage.join(),
410            self.cluster_info_vote_listener.join(),
411            self.banking_stage.join(),
412            self.forwarding_stage.join(),
413            self.staked_nodes_updater_service.join(),
414            self.tpu_quic_t.map_or(Ok(()), |t| t.join()),
415            self.tpu_forwards_quic_t.map_or(Ok(()), |t| t.join()),
416            self.tpu_vote_quic_t.join(),
417        ];
418        let broadcast_result = self.broadcast_stage.join();
419        for result in results {
420            result?;
421        }
422        if let Some(tpu_entry_notifier) = self.tpu_entry_notifier {
423            tpu_entry_notifier.join()?;
424        }
425        let _ = broadcast_result?;
426        if let Some(tracer_thread_hdl) = self.tracer_thread_hdl {
427            if let Err(tracer_result) = tracer_thread_hdl.join()? {
428                error!(
429                    "banking tracer thread returned error after successful thread join: \
430                     {tracer_result:?}"
431                );
432            }
433        }
434        Ok(())
435    }
436}