1pub use {
5 crate::forwarding_stage::ForwardingClientOption, solana_streamer::quic::DEFAULT_TPU_COALESCE,
6};
7use {
8 crate::{
9 admin_rpc_post_init::{KeyUpdaterType, KeyUpdaters},
10 banking_stage::BankingStage,
11 banking_trace::{Channels, TracerThread},
12 cluster_info_vote_listener::{
13 ClusterInfoVoteListener, DuplicateConfirmedSlotsSender, GossipVerifiedVoteHashSender,
14 VerifiedVoteSender, VoteTracker,
15 },
16 fetch_stage::FetchStage,
17 forwarding_stage::{
18 spawn_forwarding_stage, ForwardAddressGetter, SpawnForwardingStageResult,
19 },
20 sigverify::TransactionSigVerifier,
21 sigverify_stage::SigVerifyStage,
22 staked_nodes_updater_service::StakedNodesUpdaterService,
23 tpu_entry_notifier::TpuEntryNotifier,
24 validator::{BlockProductionMethod, GeneratorConfig, TransactionStructure},
25 vortexor_receiver_adapter::VortexorReceiverAdapter,
26 },
27 bytes::Bytes,
28 crossbeam_channel::{bounded, unbounded, Receiver},
29 solana_clock::Slot,
30 solana_gossip::cluster_info::ClusterInfo,
31 solana_keypair::Keypair,
32 solana_ledger::{
33 blockstore::Blockstore, blockstore_processor::TransactionStatusSender,
34 entry_notifier_service::EntryNotifierSender,
35 },
36 solana_perf::data_budget::DataBudget,
37 solana_poh::{
38 poh_recorder::{PohRecorder, WorkingBankEntry},
39 transaction_recorder::TransactionRecorder,
40 },
41 solana_pubkey::Pubkey,
42 solana_rpc::{
43 optimistically_confirmed_bank_tracker::BankNotificationSenderConfig,
44 rpc_subscriptions::RpcSubscriptions,
45 },
46 solana_runtime::{
47 bank_forks::BankForks,
48 prioritization_fee_cache::PrioritizationFeeCache,
49 vote_sender_types::{ReplayVoteReceiver, ReplayVoteSender},
50 },
51 solana_streamer::{
52 quic::{spawn_server, QuicServerParams, SpawnServerResult},
53 streamer::StakedNodes,
54 },
55 solana_turbine::{
56 broadcast_stage::{BroadcastStage, BroadcastStageType},
57 xdp::XdpSender,
58 },
59 std::{
60 collections::HashMap,
61 net::{SocketAddr, UdpSocket},
62 num::NonZeroUsize,
63 sync::{atomic::AtomicBool, Arc, RwLock},
64 thread::{self, JoinHandle},
65 time::Duration,
66 },
67 tokio::sync::mpsc::Sender as AsyncSender,
68};
69
70pub struct TpuSockets {
71 pub transactions: Vec<UdpSocket>,
72 pub transaction_forwards: Vec<UdpSocket>,
73 pub vote: Vec<UdpSocket>,
74 pub broadcast: Vec<UdpSocket>,
75 pub transactions_quic: Vec<UdpSocket>,
76 pub transactions_forwards_quic: Vec<UdpSocket>,
77 pub vote_quic: Vec<UdpSocket>,
78 pub vote_forwarding_client: UdpSocket,
80 pub vortexor_receivers: Option<Vec<UdpSocket>>,
81}
82
83enum SigVerifier {
85 Local(SigVerifyStage),
86 Remote(VortexorReceiverAdapter),
87}
88
89impl SigVerifier {
90 fn join(self) -> thread::Result<()> {
91 match self {
92 SigVerifier::Local(sig_verify_stage) => sig_verify_stage.join(),
93 SigVerifier::Remote(vortexor_receiver_adapter) => vortexor_receiver_adapter.join(),
94 }
95 }
96}
97
98pub struct Tpu {
99 fetch_stage: FetchStage,
100 sig_verifier: SigVerifier,
101 vote_sigverify_stage: SigVerifyStage,
102 banking_stage: BankingStage,
103 forwarding_stage: JoinHandle<()>,
104 cluster_info_vote_listener: ClusterInfoVoteListener,
105 broadcast_stage: BroadcastStage,
106 tpu_quic_t: Option<thread::JoinHandle<()>>,
107 tpu_forwards_quic_t: Option<thread::JoinHandle<()>>,
108 tpu_entry_notifier: Option<TpuEntryNotifier>,
109 staked_nodes_updater_service: StakedNodesUpdaterService,
110 tracer_thread_hdl: TracerThread,
111 tpu_vote_quic_t: thread::JoinHandle<()>,
112}
113
114impl Tpu {
115 #[allow(clippy::too_many_arguments)]
116 pub fn new_with_client(
117 cluster_info: &Arc<ClusterInfo>,
118 poh_recorder: &Arc<RwLock<PohRecorder>>,
119 transaction_recorder: TransactionRecorder,
120 entry_receiver: Receiver<WorkingBankEntry>,
121 retransmit_slots_receiver: Receiver<Slot>,
122 sockets: TpuSockets,
123 subscriptions: Option<Arc<RpcSubscriptions>>,
124 transaction_status_sender: Option<TransactionStatusSender>,
125 entry_notification_sender: Option<EntryNotifierSender>,
126 blockstore: Arc<Blockstore>,
127 broadcast_type: &BroadcastStageType,
128 xdp_sender: Option<XdpSender>,
129 exit: Arc<AtomicBool>,
130 shred_version: u16,
131 vote_tracker: Arc<VoteTracker>,
132 bank_forks: Arc<RwLock<BankForks>>,
133 verified_vote_sender: VerifiedVoteSender,
134 gossip_verified_vote_hash_sender: GossipVerifiedVoteHashSender,
135 replay_vote_receiver: ReplayVoteReceiver,
136 replay_vote_sender: ReplayVoteSender,
137 bank_notification_sender: Option<BankNotificationSenderConfig>,
138 tpu_coalesce: Duration,
139 duplicate_confirmed_slot_sender: DuplicateConfirmedSlotsSender,
140 client: ForwardingClientOption,
141 turbine_quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>,
142 keypair: &Keypair,
143 log_messages_bytes_limit: Option<usize>,
144 staked_nodes: &Arc<RwLock<StakedNodes>>,
145 shared_staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
146 banking_tracer_channels: Channels,
147 tracer_thread_hdl: TracerThread,
148 tpu_enable_udp: bool,
149 tpu_quic_server_config: QuicServerParams,
150 tpu_fwd_quic_server_config: QuicServerParams,
151 vote_quic_server_config: QuicServerParams,
152 prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
153 block_production_method: BlockProductionMethod,
154 block_production_num_workers: NonZeroUsize,
155 transaction_struct: TransactionStructure,
156 enable_block_production_forwarding: bool,
157 _generator_config: Option<GeneratorConfig>, key_notifiers: Arc<RwLock<KeyUpdaters>>,
159 ) -> Self {
160 let TpuSockets {
161 transactions: transactions_sockets,
162 transaction_forwards: tpu_forwards_sockets,
163 vote: tpu_vote_sockets,
164 broadcast: broadcast_sockets,
165 transactions_quic: transactions_quic_sockets,
166 transactions_forwards_quic: transactions_forwards_quic_sockets,
167 vote_quic: tpu_vote_quic_sockets,
168 vote_forwarding_client: vote_forwarding_client_socket,
169 vortexor_receivers,
170 } = sockets;
171
172 let (packet_sender, packet_receiver) = unbounded();
173 let (vote_packet_sender, vote_packet_receiver) = unbounded();
174 let (forwarded_packet_sender, forwarded_packet_receiver) = unbounded();
175 let fetch_stage = FetchStage::new_with_sender(
176 transactions_sockets,
177 tpu_forwards_sockets,
178 tpu_vote_sockets,
179 exit.clone(),
180 &packet_sender,
181 &vote_packet_sender,
182 &forwarded_packet_sender,
183 forwarded_packet_receiver,
184 poh_recorder,
185 Some(tpu_coalesce),
186 Some(bank_forks.read().unwrap().get_vote_only_mode_signal()),
187 tpu_enable_udp,
188 );
189
190 let staked_nodes_updater_service = StakedNodesUpdaterService::new(
191 exit.clone(),
192 bank_forks.clone(),
193 staked_nodes.clone(),
194 shared_staked_nodes_overrides,
195 );
196
197 let Channels {
198 non_vote_sender,
199 non_vote_receiver,
200 tpu_vote_sender,
201 tpu_vote_receiver,
202 gossip_vote_sender,
203 gossip_vote_receiver,
204 } = banking_tracer_channels;
205
206 let SpawnServerResult {
208 endpoints: _,
209 thread: tpu_vote_quic_t,
210 key_updater: vote_streamer_key_updater,
211 } = spawn_server(
212 "solQuicTVo",
213 "quic_streamer_tpu_vote",
214 tpu_vote_quic_sockets,
215 keypair,
216 vote_packet_sender.clone(),
217 exit.clone(),
218 staked_nodes.clone(),
219 vote_quic_server_config,
220 )
221 .unwrap();
222
223 let (tpu_quic_t, key_updater) = if vortexor_receivers.is_none() {
224 let SpawnServerResult {
226 endpoints: _,
227 thread: tpu_quic_t,
228 key_updater,
229 } = spawn_server(
230 "solQuicTpu",
231 "quic_streamer_tpu",
232 transactions_quic_sockets,
233 keypair,
234 packet_sender,
235 exit.clone(),
236 staked_nodes.clone(),
237 tpu_quic_server_config,
238 )
239 .unwrap();
240 (Some(tpu_quic_t), Some(key_updater))
241 } else {
242 (None, None)
243 };
244
245 let (tpu_forwards_quic_t, forwards_key_updater) = if vortexor_receivers.is_none() {
246 let SpawnServerResult {
248 endpoints: _,
249 thread: tpu_forwards_quic_t,
250 key_updater: forwards_key_updater,
251 } = spawn_server(
252 "solQuicTpuFwd",
253 "quic_streamer_tpu_forwards",
254 transactions_forwards_quic_sockets,
255 keypair,
256 forwarded_packet_sender,
257 exit.clone(),
258 staked_nodes.clone(),
259 tpu_fwd_quic_server_config,
260 )
261 .unwrap();
262 (Some(tpu_forwards_quic_t), Some(forwards_key_updater))
263 } else {
264 (None, None)
265 };
266
267 let (forward_stage_sender, forward_stage_receiver) = bounded(1024);
268 let sig_verifier = if let Some(vortexor_receivers) = vortexor_receivers {
269 info!("starting vortexor adapter");
270 let sockets = vortexor_receivers.into_iter().map(Arc::new).collect();
271 let adapter = VortexorReceiverAdapter::new(
272 sockets,
273 Duration::from_millis(5),
274 tpu_coalesce,
275 non_vote_sender,
276 enable_block_production_forwarding.then(|| forward_stage_sender.clone()),
277 exit.clone(),
278 );
279 SigVerifier::Remote(adapter)
280 } else {
281 info!("starting regular sigverify stage");
282 let verifier = TransactionSigVerifier::new(
283 non_vote_sender,
284 enable_block_production_forwarding.then(|| forward_stage_sender.clone()),
285 );
286 SigVerifier::Local(SigVerifyStage::new(
287 packet_receiver,
288 verifier,
289 "solSigVerTpu",
290 "tpu-verifier",
291 ))
292 };
293
294 let vote_sigverify_stage = {
295 let verifier = TransactionSigVerifier::new_reject_non_vote(
296 tpu_vote_sender,
297 Some(forward_stage_sender),
298 );
299 SigVerifyStage::new(
300 vote_packet_receiver,
301 verifier,
302 "solSigVerTpuVot",
303 "tpu-vote-verifier",
304 )
305 };
306
307 let cluster_info_vote_listener = ClusterInfoVoteListener::new(
308 exit.clone(),
309 cluster_info.clone(),
310 gossip_vote_sender,
311 vote_tracker,
312 bank_forks.clone(),
313 subscriptions,
314 verified_vote_sender,
315 gossip_verified_vote_hash_sender,
316 replay_vote_receiver,
317 blockstore.clone(),
318 bank_notification_sender,
319 duplicate_confirmed_slot_sender,
320 );
321
322 let banking_stage = BankingStage::new_num_threads(
323 block_production_method,
324 transaction_struct,
325 poh_recorder.clone(),
326 transaction_recorder,
327 non_vote_receiver,
328 tpu_vote_receiver,
329 gossip_vote_receiver,
330 block_production_num_workers,
331 transaction_status_sender,
332 replay_vote_sender,
333 log_messages_bytes_limit,
334 bank_forks.clone(),
335 prioritization_fee_cache.clone(),
336 );
337
338 let SpawnForwardingStageResult {
339 join_handle: forwarding_stage,
340 client_updater,
341 } = spawn_forwarding_stage(
342 forward_stage_receiver,
343 client,
344 vote_forwarding_client_socket,
345 bank_forks.read().unwrap().sharable_root_bank(),
346 ForwardAddressGetter::new(cluster_info.clone(), poh_recorder.clone()),
347 DataBudget::default(),
348 );
349
350 let (entry_receiver, tpu_entry_notifier) =
351 if let Some(entry_notification_sender) = entry_notification_sender {
352 let (broadcast_entry_sender, broadcast_entry_receiver) = unbounded();
353 let tpu_entry_notifier = TpuEntryNotifier::new(
354 entry_receiver,
355 entry_notification_sender,
356 broadcast_entry_sender,
357 exit.clone(),
358 );
359 (broadcast_entry_receiver, Some(tpu_entry_notifier))
360 } else {
361 (entry_receiver, None)
362 };
363
364 let broadcast_stage = broadcast_type.new_broadcast_stage(
365 broadcast_sockets,
366 cluster_info.clone(),
367 entry_receiver,
368 retransmit_slots_receiver,
369 exit,
370 blockstore,
371 bank_forks,
372 shred_version,
373 turbine_quic_endpoint_sender,
374 xdp_sender,
375 );
376
377 let mut key_notifiers = key_notifiers.write().unwrap();
378 if let Some(key_updater) = key_updater {
379 key_notifiers.add(KeyUpdaterType::Tpu, key_updater);
380 }
381 if let Some(forwards_key_updater) = forwards_key_updater {
382 key_notifiers.add(KeyUpdaterType::TpuForwards, forwards_key_updater);
383 }
384 key_notifiers.add(KeyUpdaterType::TpuVote, vote_streamer_key_updater);
385
386 key_notifiers.add(KeyUpdaterType::Forward, client_updater);
387
388 Self {
389 fetch_stage,
390 sig_verifier,
391 vote_sigverify_stage,
392 banking_stage,
393 forwarding_stage,
394 cluster_info_vote_listener,
395 broadcast_stage,
396 tpu_quic_t,
397 tpu_forwards_quic_t,
398 tpu_entry_notifier,
399 staked_nodes_updater_service,
400 tracer_thread_hdl,
401 tpu_vote_quic_t,
402 }
403 }
404
405 pub fn join(self) -> thread::Result<()> {
406 let results = vec![
407 self.fetch_stage.join(),
408 self.sig_verifier.join(),
409 self.vote_sigverify_stage.join(),
410 self.cluster_info_vote_listener.join(),
411 self.banking_stage.join(),
412 self.forwarding_stage.join(),
413 self.staked_nodes_updater_service.join(),
414 self.tpu_quic_t.map_or(Ok(()), |t| t.join()),
415 self.tpu_forwards_quic_t.map_or(Ok(()), |t| t.join()),
416 self.tpu_vote_quic_t.join(),
417 ];
418 let broadcast_result = self.broadcast_stage.join();
419 for result in results {
420 result?;
421 }
422 if let Some(tpu_entry_notifier) = self.tpu_entry_notifier {
423 tpu_entry_notifier.join()?;
424 }
425 let _ = broadcast_result?;
426 if let Some(tracer_thread_hdl) = self.tracer_thread_hdl {
427 if let Err(tracer_result) = tracer_thread_hdl.join()? {
428 error!(
429 "banking tracer thread returned error after successful thread join: \
430 {tracer_result:?}"
431 );
432 }
433 }
434 Ok(())
435 }
436}