solana_core/
tpu.rs

1//! The `tpu` module implements the Transaction Processing Unit, a
2//! multi-stage transaction processing pipeline in software.
3
4pub use solana_sdk::net::DEFAULT_TPU_COALESCE;
5// allow multiple connections for NAT and any open/close overlap
6#[deprecated(
7    since = "2.2.0",
8    note = "Use solana_streamer::quic::DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER instead"
9)]
10pub use solana_streamer::quic::DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER as MAX_QUIC_CONNECTIONS_PER_PEER;
11use {
12    crate::{
13        banking_stage::BankingStage,
14        banking_trace::{Channels, TracerThread},
15        cluster_info_vote_listener::{
16            ClusterInfoVoteListener, DuplicateConfirmedSlotsSender, GossipVerifiedVoteHashSender,
17            VerifiedVoteSender, VoteTracker,
18        },
19        fetch_stage::FetchStage,
20        sigverify::TransactionSigVerifier,
21        sigverify_stage::SigVerifyStage,
22        staked_nodes_updater_service::StakedNodesUpdaterService,
23        tpu_entry_notifier::TpuEntryNotifier,
24        validator::{BlockProductionMethod, GeneratorConfig, TransactionStructure},
25    },
26    bytes::Bytes,
27    crossbeam_channel::{unbounded, Receiver},
28    solana_client::connection_cache::ConnectionCache,
29    solana_gossip::cluster_info::ClusterInfo,
30    solana_ledger::{
31        blockstore::Blockstore, blockstore_processor::TransactionStatusSender,
32        entry_notifier_service::EntryNotifierSender,
33    },
34    solana_poh::poh_recorder::{PohRecorder, WorkingBankEntry},
35    solana_rpc::{
36        optimistically_confirmed_bank_tracker::BankNotificationSender,
37        rpc_subscriptions::RpcSubscriptions,
38    },
39    solana_runtime::{
40        bank_forks::BankForks,
41        prioritization_fee_cache::PrioritizationFeeCache,
42        vote_sender_types::{ReplayVoteReceiver, ReplayVoteSender},
43    },
44    solana_sdk::{clock::Slot, pubkey::Pubkey, quic::NotifyKeyUpdate, signature::Keypair},
45    solana_streamer::{
46        quic::{spawn_server_multi, QuicServerParams, SpawnServerResult},
47        streamer::StakedNodes,
48    },
49    solana_turbine::broadcast_stage::{BroadcastStage, BroadcastStageType},
50    std::{
51        collections::HashMap,
52        net::{SocketAddr, UdpSocket},
53        sync::{atomic::AtomicBool, Arc, RwLock},
54        thread,
55        time::Duration,
56    },
57    tokio::sync::mpsc::Sender as AsyncSender,
58};
59
60pub struct TpuSockets {
61    pub transactions: Vec<UdpSocket>,
62    pub transaction_forwards: Vec<UdpSocket>,
63    pub vote: Vec<UdpSocket>,
64    pub broadcast: Vec<UdpSocket>,
65    pub transactions_quic: Vec<UdpSocket>,
66    pub transactions_forwards_quic: Vec<UdpSocket>,
67    pub vote_quic: Vec<UdpSocket>,
68}
69
70pub struct Tpu {
71    fetch_stage: FetchStage,
72    sigverify_stage: SigVerifyStage,
73    vote_sigverify_stage: SigVerifyStage,
74    banking_stage: BankingStage,
75    cluster_info_vote_listener: ClusterInfoVoteListener,
76    broadcast_stage: BroadcastStage,
77    tpu_quic_t: thread::JoinHandle<()>,
78    tpu_forwards_quic_t: thread::JoinHandle<()>,
79    tpu_entry_notifier: Option<TpuEntryNotifier>,
80    staked_nodes_updater_service: StakedNodesUpdaterService,
81    tracer_thread_hdl: TracerThread,
82    tpu_vote_quic_t: thread::JoinHandle<()>,
83}
84
85impl Tpu {
86    #[allow(clippy::too_many_arguments)]
87    pub fn new(
88        cluster_info: &Arc<ClusterInfo>,
89        poh_recorder: &Arc<RwLock<PohRecorder>>,
90        entry_receiver: Receiver<WorkingBankEntry>,
91        retransmit_slots_receiver: Receiver<Slot>,
92        sockets: TpuSockets,
93        subscriptions: &Arc<RpcSubscriptions>,
94        transaction_status_sender: Option<TransactionStatusSender>,
95        entry_notification_sender: Option<EntryNotifierSender>,
96        blockstore: Arc<Blockstore>,
97        broadcast_type: &BroadcastStageType,
98        exit: Arc<AtomicBool>,
99        shred_version: u16,
100        vote_tracker: Arc<VoteTracker>,
101        bank_forks: Arc<RwLock<BankForks>>,
102        verified_vote_sender: VerifiedVoteSender,
103        gossip_verified_vote_hash_sender: GossipVerifiedVoteHashSender,
104        replay_vote_receiver: ReplayVoteReceiver,
105        replay_vote_sender: ReplayVoteSender,
106        bank_notification_sender: Option<BankNotificationSender>,
107        tpu_coalesce: Duration,
108        duplicate_confirmed_slot_sender: DuplicateConfirmedSlotsSender,
109        connection_cache: &Arc<ConnectionCache>,
110        turbine_quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>,
111        keypair: &Keypair,
112        log_messages_bytes_limit: Option<usize>,
113        staked_nodes: &Arc<RwLock<StakedNodes>>,
114        shared_staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
115        banking_tracer_channels: Channels,
116        tracer_thread_hdl: TracerThread,
117        tpu_enable_udp: bool,
118        tpu_quic_server_config: QuicServerParams,
119        tpu_fwd_quic_server_config: QuicServerParams,
120        vote_quic_server_config: QuicServerParams,
121        prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
122        block_production_method: BlockProductionMethod,
123        transaction_struct: TransactionStructure,
124        enable_block_production_forwarding: bool,
125        _generator_config: Option<GeneratorConfig>, /* vestigial code for replay invalidator */
126    ) -> (Self, Vec<Arc<dyn NotifyKeyUpdate + Sync + Send>>) {
127        let TpuSockets {
128            transactions: transactions_sockets,
129            transaction_forwards: tpu_forwards_sockets,
130            vote: tpu_vote_sockets,
131            broadcast: broadcast_sockets,
132            transactions_quic: transactions_quic_sockets,
133            transactions_forwards_quic: transactions_forwards_quic_sockets,
134            vote_quic: tpu_vote_quic_sockets,
135        } = sockets;
136
137        let (packet_sender, packet_receiver) = unbounded();
138        let (vote_packet_sender, vote_packet_receiver) = unbounded();
139        let (forwarded_packet_sender, forwarded_packet_receiver) = unbounded();
140        let fetch_stage = FetchStage::new_with_sender(
141            transactions_sockets,
142            tpu_forwards_sockets,
143            tpu_vote_sockets,
144            exit.clone(),
145            &packet_sender,
146            &vote_packet_sender,
147            &forwarded_packet_sender,
148            forwarded_packet_receiver,
149            poh_recorder,
150            tpu_coalesce,
151            Some(bank_forks.read().unwrap().get_vote_only_mode_signal()),
152            tpu_enable_udp,
153        );
154
155        let staked_nodes_updater_service = StakedNodesUpdaterService::new(
156            exit.clone(),
157            bank_forks.clone(),
158            staked_nodes.clone(),
159            shared_staked_nodes_overrides,
160        );
161
162        let Channels {
163            non_vote_sender,
164            non_vote_receiver,
165            tpu_vote_sender,
166            tpu_vote_receiver,
167            gossip_vote_sender,
168            gossip_vote_receiver,
169        } = banking_tracer_channels;
170
171        // Streamer for Votes:
172        let SpawnServerResult {
173            endpoints: _,
174            thread: tpu_vote_quic_t,
175            key_updater: vote_streamer_key_updater,
176        } = spawn_server_multi(
177            "solQuicTVo",
178            "quic_streamer_tpu_vote",
179            tpu_vote_quic_sockets,
180            keypair,
181            vote_packet_sender.clone(),
182            exit.clone(),
183            staked_nodes.clone(),
184            vote_quic_server_config,
185        )
186        .unwrap();
187
188        // Streamer for TPU
189        let SpawnServerResult {
190            endpoints: _,
191            thread: tpu_quic_t,
192            key_updater,
193        } = spawn_server_multi(
194            "solQuicTpu",
195            "quic_streamer_tpu",
196            transactions_quic_sockets,
197            keypair,
198            packet_sender,
199            exit.clone(),
200            staked_nodes.clone(),
201            tpu_quic_server_config,
202        )
203        .unwrap();
204
205        // Streamer for TPU forward
206        let SpawnServerResult {
207            endpoints: _,
208            thread: tpu_forwards_quic_t,
209            key_updater: forwards_key_updater,
210        } = spawn_server_multi(
211            "solQuicTpuFwd",
212            "quic_streamer_tpu_forwards",
213            transactions_forwards_quic_sockets,
214            keypair,
215            forwarded_packet_sender,
216            exit.clone(),
217            staked_nodes.clone(),
218            tpu_fwd_quic_server_config,
219        )
220        .unwrap();
221
222        let sigverify_stage = {
223            let verifier = TransactionSigVerifier::new(non_vote_sender);
224            SigVerifyStage::new(packet_receiver, verifier, "solSigVerTpu", "tpu-verifier")
225        };
226
227        let vote_sigverify_stage = {
228            let verifier = TransactionSigVerifier::new_reject_non_vote(tpu_vote_sender);
229            SigVerifyStage::new(
230                vote_packet_receiver,
231                verifier,
232                "solSigVerTpuVot",
233                "tpu-vote-verifier",
234            )
235        };
236
237        let cluster_info_vote_listener = ClusterInfoVoteListener::new(
238            exit.clone(),
239            cluster_info.clone(),
240            gossip_vote_sender,
241            vote_tracker,
242            bank_forks.clone(),
243            subscriptions.clone(),
244            verified_vote_sender,
245            gossip_verified_vote_hash_sender,
246            replay_vote_receiver,
247            blockstore.clone(),
248            bank_notification_sender,
249            duplicate_confirmed_slot_sender,
250        );
251
252        let banking_stage = BankingStage::new(
253            block_production_method,
254            transaction_struct,
255            cluster_info,
256            poh_recorder,
257            non_vote_receiver,
258            tpu_vote_receiver,
259            gossip_vote_receiver,
260            transaction_status_sender,
261            replay_vote_sender,
262            log_messages_bytes_limit,
263            connection_cache.clone(),
264            bank_forks.clone(),
265            prioritization_fee_cache,
266            enable_block_production_forwarding,
267        );
268
269        let (entry_receiver, tpu_entry_notifier) =
270            if let Some(entry_notification_sender) = entry_notification_sender {
271                let (broadcast_entry_sender, broadcast_entry_receiver) = unbounded();
272                let tpu_entry_notifier = TpuEntryNotifier::new(
273                    entry_receiver,
274                    entry_notification_sender,
275                    broadcast_entry_sender,
276                    exit.clone(),
277                );
278                (broadcast_entry_receiver, Some(tpu_entry_notifier))
279            } else {
280                (entry_receiver, None)
281            };
282
283        let broadcast_stage = broadcast_type.new_broadcast_stage(
284            broadcast_sockets,
285            cluster_info.clone(),
286            entry_receiver,
287            retransmit_slots_receiver,
288            exit,
289            blockstore,
290            bank_forks,
291            shred_version,
292            turbine_quic_endpoint_sender,
293        );
294
295        (
296            Self {
297                fetch_stage,
298                sigverify_stage,
299                vote_sigverify_stage,
300                banking_stage,
301                cluster_info_vote_listener,
302                broadcast_stage,
303                tpu_quic_t,
304                tpu_forwards_quic_t,
305                tpu_entry_notifier,
306                staked_nodes_updater_service,
307                tracer_thread_hdl,
308                tpu_vote_quic_t,
309            },
310            vec![key_updater, forwards_key_updater, vote_streamer_key_updater],
311        )
312    }
313
314    pub fn join(self) -> thread::Result<()> {
315        let results = vec![
316            self.fetch_stage.join(),
317            self.sigverify_stage.join(),
318            self.vote_sigverify_stage.join(),
319            self.cluster_info_vote_listener.join(),
320            self.banking_stage.join(),
321            self.staked_nodes_updater_service.join(),
322            self.tpu_quic_t.join(),
323            self.tpu_forwards_quic_t.join(),
324            self.tpu_vote_quic_t.join(),
325        ];
326        let broadcast_result = self.broadcast_stage.join();
327        for result in results {
328            result?;
329        }
330        if let Some(tpu_entry_notifier) = self.tpu_entry_notifier {
331            tpu_entry_notifier.join()?;
332        }
333        let _ = broadcast_result?;
334        if let Some(tracer_thread_hdl) = self.tracer_thread_hdl {
335            if let Err(tracer_result) = tracer_thread_hdl.join()? {
336                error!(
337                    "banking tracer thread returned error after successful thread join: {:?}",
338                    tracer_result
339                );
340            }
341        }
342        Ok(())
343    }
344}