solana_core/
tpu.rs

1//! The `tpu` module implements the Transaction Processing Unit, a
2//! multi-stage transaction processing pipeline in software.
3
4// allow multiple connections for NAT and any open/close overlap
5#[deprecated(
6    since = "2.2.0",
7    note = "Use solana_streamer::quic::DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER instead"
8)]
9pub use solana_streamer::quic::DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER as MAX_QUIC_CONNECTIONS_PER_PEER;
10pub use {
11    crate::forwarding_stage::ForwardingClientOption, solana_streamer::quic::DEFAULT_TPU_COALESCE,
12};
13use {
14    crate::{
15        admin_rpc_post_init::{KeyUpdaterType, KeyUpdaters},
16        banking_stage::BankingStage,
17        banking_trace::{Channels, TracerThread},
18        cluster_info_vote_listener::{
19            ClusterInfoVoteListener, DuplicateConfirmedSlotsSender, GossipVerifiedVoteHashSender,
20            VerifiedVoteSender, VoteTracker,
21        },
22        fetch_stage::FetchStage,
23        forwarding_stage::{
24            spawn_forwarding_stage, ForwardAddressGetter, SpawnForwardingStageResult,
25        },
26        sigverify::TransactionSigVerifier,
27        sigverify_stage::SigVerifyStage,
28        staked_nodes_updater_service::StakedNodesUpdaterService,
29        tpu_entry_notifier::TpuEntryNotifier,
30        validator::{BlockProductionMethod, GeneratorConfig, TransactionStructure},
31    },
32    bytes::Bytes,
33    crossbeam_channel::{bounded, unbounded, Receiver},
34    solana_clock::Slot,
35    solana_gossip::cluster_info::ClusterInfo,
36    solana_keypair::Keypair,
37    solana_ledger::{
38        blockstore::Blockstore, blockstore_processor::TransactionStatusSender,
39        entry_notifier_service::EntryNotifierSender,
40    },
41    solana_perf::data_budget::DataBudget,
42    solana_poh::{
43        poh_recorder::{PohRecorder, WorkingBankEntry},
44        transaction_recorder::TransactionRecorder,
45    },
46    solana_pubkey::Pubkey,
47    solana_rpc::{
48        optimistically_confirmed_bank_tracker::BankNotificationSender,
49        rpc_subscriptions::RpcSubscriptions,
50    },
51    solana_runtime::{
52        bank_forks::BankForks,
53        prioritization_fee_cache::PrioritizationFeeCache,
54        root_bank_cache::RootBankCache,
55        vote_sender_types::{ReplayVoteReceiver, ReplayVoteSender},
56    },
57    solana_streamer::{
58        quic::{spawn_server_multi, QuicServerParams, SpawnServerResult},
59        streamer::StakedNodes,
60    },
61    solana_turbine::broadcast_stage::{BroadcastStage, BroadcastStageType},
62    std::{
63        collections::HashMap,
64        net::{SocketAddr, UdpSocket},
65        sync::{atomic::AtomicBool, Arc, RwLock},
66        thread::{self, JoinHandle},
67        time::Duration,
68    },
69    tokio::sync::mpsc::Sender as AsyncSender,
70};
71
/// Every socket the TPU needs, handed to [`Tpu::new_with_client`] as one bundle.
pub struct TpuSockets {
    /// UDP sockets passed to the fetch stage for incoming transactions.
    pub transactions: Vec<UdpSocket>,
    /// UDP sockets passed to the fetch stage for forwarded transactions.
    pub transaction_forwards: Vec<UdpSocket>,
    /// UDP sockets passed to the fetch stage for votes.
    pub vote: Vec<UdpSocket>,
    /// Sockets handed to the broadcast stage.
    pub broadcast: Vec<UdpSocket>,
    /// Sockets backing the `quic_streamer_tpu` QUIC server.
    pub transactions_quic: Vec<UdpSocket>,
    /// Sockets backing the `quic_streamer_tpu_forwards` QUIC server.
    pub transactions_forwards_quic: Vec<UdpSocket>,
    /// Sockets backing the `quic_streamer_tpu_vote` QUIC server.
    pub vote_quic: Vec<UdpSocket>,
    /// Client-side socket the forwarding stage uses when forwarding votes.
    pub vote_forwarding_client: UdpSocket,
}
83
84pub struct Tpu {
85    fetch_stage: FetchStage,
86    sigverify_stage: SigVerifyStage,
87    vote_sigverify_stage: SigVerifyStage,
88    banking_stage: BankingStage,
89    forwarding_stage: JoinHandle<()>,
90    cluster_info_vote_listener: ClusterInfoVoteListener,
91    broadcast_stage: BroadcastStage,
92    tpu_quic_t: thread::JoinHandle<()>,
93    tpu_forwards_quic_t: thread::JoinHandle<()>,
94    tpu_entry_notifier: Option<TpuEntryNotifier>,
95    staked_nodes_updater_service: StakedNodesUpdaterService,
96    tracer_thread_hdl: TracerThread,
97    tpu_vote_quic_t: thread::JoinHandle<()>,
98}
99
100impl Tpu {
101    #[allow(clippy::too_many_arguments)]
102    pub fn new_with_client(
103        cluster_info: &Arc<ClusterInfo>,
104        poh_recorder: &Arc<RwLock<PohRecorder>>,
105        transaction_recorder: TransactionRecorder,
106        entry_receiver: Receiver<WorkingBankEntry>,
107        retransmit_slots_receiver: Receiver<Slot>,
108        sockets: TpuSockets,
109        subscriptions: &Arc<RpcSubscriptions>,
110        transaction_status_sender: Option<TransactionStatusSender>,
111        entry_notification_sender: Option<EntryNotifierSender>,
112        blockstore: Arc<Blockstore>,
113        broadcast_type: &BroadcastStageType,
114        exit: Arc<AtomicBool>,
115        shred_version: u16,
116        vote_tracker: Arc<VoteTracker>,
117        bank_forks: Arc<RwLock<BankForks>>,
118        verified_vote_sender: VerifiedVoteSender,
119        gossip_verified_vote_hash_sender: GossipVerifiedVoteHashSender,
120        replay_vote_receiver: ReplayVoteReceiver,
121        replay_vote_sender: ReplayVoteSender,
122        bank_notification_sender: Option<BankNotificationSender>,
123        tpu_coalesce: Duration,
124        duplicate_confirmed_slot_sender: DuplicateConfirmedSlotsSender,
125        client: ForwardingClientOption,
126        turbine_quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>,
127        keypair: &Keypair,
128        log_messages_bytes_limit: Option<usize>,
129        staked_nodes: &Arc<RwLock<StakedNodes>>,
130        shared_staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
131        banking_tracer_channels: Channels,
132        tracer_thread_hdl: TracerThread,
133        tpu_enable_udp: bool,
134        tpu_quic_server_config: QuicServerParams,
135        tpu_fwd_quic_server_config: QuicServerParams,
136        vote_quic_server_config: QuicServerParams,
137        prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
138        block_production_method: BlockProductionMethod,
139        transaction_struct: TransactionStructure,
140        enable_block_production_forwarding: bool,
141        _generator_config: Option<GeneratorConfig>, /* vestigial code for replay invalidator */
142        key_notifiers: Arc<RwLock<KeyUpdaters>>,
143    ) -> Self {
144        let TpuSockets {
145            transactions: transactions_sockets,
146            transaction_forwards: tpu_forwards_sockets,
147            vote: tpu_vote_sockets,
148            broadcast: broadcast_sockets,
149            transactions_quic: transactions_quic_sockets,
150            transactions_forwards_quic: transactions_forwards_quic_sockets,
151            vote_quic: tpu_vote_quic_sockets,
152            vote_forwarding_client: vote_forwarding_client_socket,
153        } = sockets;
154
155        let (packet_sender, packet_receiver) = unbounded();
156        let (vote_packet_sender, vote_packet_receiver) = unbounded();
157        let (forwarded_packet_sender, forwarded_packet_receiver) = unbounded();
158        let fetch_stage = FetchStage::new_with_sender(
159            transactions_sockets,
160            tpu_forwards_sockets,
161            tpu_vote_sockets,
162            exit.clone(),
163            &packet_sender,
164            &vote_packet_sender,
165            &forwarded_packet_sender,
166            forwarded_packet_receiver,
167            poh_recorder,
168            Some(tpu_coalesce),
169            Some(bank_forks.read().unwrap().get_vote_only_mode_signal()),
170            tpu_enable_udp,
171        );
172
173        let staked_nodes_updater_service = StakedNodesUpdaterService::new(
174            exit.clone(),
175            bank_forks.clone(),
176            staked_nodes.clone(),
177            shared_staked_nodes_overrides,
178        );
179
180        let Channels {
181            non_vote_sender,
182            non_vote_receiver,
183            tpu_vote_sender,
184            tpu_vote_receiver,
185            gossip_vote_sender,
186            gossip_vote_receiver,
187        } = banking_tracer_channels;
188
189        // Streamer for Votes:
190        let SpawnServerResult {
191            endpoints: _,
192            thread: tpu_vote_quic_t,
193            key_updater: vote_streamer_key_updater,
194        } = spawn_server_multi(
195            "solQuicTVo",
196            "quic_streamer_tpu_vote",
197            tpu_vote_quic_sockets,
198            keypair,
199            vote_packet_sender.clone(),
200            exit.clone(),
201            staked_nodes.clone(),
202            vote_quic_server_config,
203        )
204        .unwrap();
205
206        // Streamer for TPU
207        let SpawnServerResult {
208            endpoints: _,
209            thread: tpu_quic_t,
210            key_updater,
211        } = spawn_server_multi(
212            "solQuicTpu",
213            "quic_streamer_tpu",
214            transactions_quic_sockets,
215            keypair,
216            packet_sender,
217            exit.clone(),
218            staked_nodes.clone(),
219            tpu_quic_server_config,
220        )
221        .unwrap();
222
223        // Streamer for TPU forward
224        let SpawnServerResult {
225            endpoints: _,
226            thread: tpu_forwards_quic_t,
227            key_updater: forwards_key_updater,
228        } = spawn_server_multi(
229            "solQuicTpuFwd",
230            "quic_streamer_tpu_forwards",
231            transactions_forwards_quic_sockets,
232            keypair,
233            forwarded_packet_sender,
234            exit.clone(),
235            staked_nodes.clone(),
236            tpu_fwd_quic_server_config,
237        )
238        .unwrap();
239
240        let (forward_stage_sender, forward_stage_receiver) = bounded(1024);
241        let sigverify_stage = {
242            let verifier = TransactionSigVerifier::new(
243                non_vote_sender,
244                enable_block_production_forwarding.then(|| forward_stage_sender.clone()),
245            );
246            SigVerifyStage::new(packet_receiver, verifier, "solSigVerTpu", "tpu-verifier")
247        };
248
249        let vote_sigverify_stage = {
250            let verifier = TransactionSigVerifier::new_reject_non_vote(
251                tpu_vote_sender,
252                Some(forward_stage_sender),
253            );
254            SigVerifyStage::new(
255                vote_packet_receiver,
256                verifier,
257                "solSigVerTpuVot",
258                "tpu-vote-verifier",
259            )
260        };
261
262        let cluster_info_vote_listener = ClusterInfoVoteListener::new(
263            exit.clone(),
264            cluster_info.clone(),
265            gossip_vote_sender,
266            vote_tracker,
267            bank_forks.clone(),
268            subscriptions.clone(),
269            verified_vote_sender,
270            gossip_verified_vote_hash_sender,
271            replay_vote_receiver,
272            blockstore.clone(),
273            bank_notification_sender,
274            duplicate_confirmed_slot_sender,
275        );
276
277        let banking_stage = BankingStage::new(
278            block_production_method,
279            transaction_struct,
280            cluster_info,
281            poh_recorder,
282            transaction_recorder,
283            non_vote_receiver,
284            tpu_vote_receiver,
285            gossip_vote_receiver,
286            transaction_status_sender,
287            replay_vote_sender,
288            log_messages_bytes_limit,
289            bank_forks.clone(),
290            prioritization_fee_cache,
291        );
292
293        let SpawnForwardingStageResult {
294            join_handle: forwarding_stage,
295            client_updater,
296        } = spawn_forwarding_stage(
297            forward_stage_receiver,
298            client,
299            vote_forwarding_client_socket,
300            RootBankCache::new(bank_forks.clone()),
301            ForwardAddressGetter::new(cluster_info.clone(), poh_recorder.clone()),
302            DataBudget::default(),
303        );
304
305        let (entry_receiver, tpu_entry_notifier) =
306            if let Some(entry_notification_sender) = entry_notification_sender {
307                let (broadcast_entry_sender, broadcast_entry_receiver) = unbounded();
308                let tpu_entry_notifier = TpuEntryNotifier::new(
309                    entry_receiver,
310                    entry_notification_sender,
311                    broadcast_entry_sender,
312                    exit.clone(),
313                );
314                (broadcast_entry_receiver, Some(tpu_entry_notifier))
315            } else {
316                (entry_receiver, None)
317            };
318
319        let broadcast_stage = broadcast_type.new_broadcast_stage(
320            broadcast_sockets,
321            cluster_info.clone(),
322            entry_receiver,
323            retransmit_slots_receiver,
324            exit,
325            blockstore,
326            bank_forks,
327            shred_version,
328            turbine_quic_endpoint_sender,
329        );
330
331        let mut key_notifiers = key_notifiers.write().unwrap();
332        key_notifiers.add(KeyUpdaterType::Tpu, key_updater);
333        key_notifiers.add(KeyUpdaterType::TpuForwards, forwards_key_updater);
334        key_notifiers.add(KeyUpdaterType::TpuVote, vote_streamer_key_updater);
335        key_notifiers.add(KeyUpdaterType::Forward, client_updater);
336
337        Self {
338            fetch_stage,
339            sigverify_stage,
340            vote_sigverify_stage,
341            banking_stage,
342            forwarding_stage,
343            cluster_info_vote_listener,
344            broadcast_stage,
345            tpu_quic_t,
346            tpu_forwards_quic_t,
347            tpu_entry_notifier,
348            staked_nodes_updater_service,
349            tracer_thread_hdl,
350            tpu_vote_quic_t,
351        }
352    }
353
354    pub fn join(self) -> thread::Result<()> {
355        let results = vec![
356            self.fetch_stage.join(),
357            self.sigverify_stage.join(),
358            self.vote_sigverify_stage.join(),
359            self.cluster_info_vote_listener.join(),
360            self.banking_stage.join(),
361            self.forwarding_stage.join(),
362            self.staked_nodes_updater_service.join(),
363            self.tpu_quic_t.join(),
364            self.tpu_forwards_quic_t.join(),
365            self.tpu_vote_quic_t.join(),
366        ];
367        let broadcast_result = self.broadcast_stage.join();
368        for result in results {
369            result?;
370        }
371        if let Some(tpu_entry_notifier) = self.tpu_entry_notifier {
372            tpu_entry_notifier.join()?;
373        }
374        let _ = broadcast_result?;
375        if let Some(tracer_thread_hdl) = self.tracer_thread_hdl {
376            if let Err(tracer_result) = tracer_thread_hdl.join()? {
377                error!(
378                    "banking tracer thread returned error after successful thread join: {:?}",
379                    tracer_result
380                );
381            }
382        }
383        Ok(())
384    }
385}