1pub use crate::forwarding_stage::ForwardingClientOption;
5use {
6 crate::{
7 admin_rpc_post_init::{KeyUpdaterType, KeyUpdaters},
8 banking_stage::{
9 transaction_scheduler::scheduler_controller::SchedulerConfig, BankingStage,
10 },
11 banking_trace::{Channels, TracerThread},
12 cluster_info_vote_listener::{
13 ClusterInfoVoteListener, DuplicateConfirmedSlotsSender, GossipVerifiedVoteHashSender,
14 VerifiedVoteSender, VoteTracker,
15 },
16 fetch_stage::FetchStage,
17 forwarding_stage::{
18 spawn_forwarding_stage, ForwardAddressGetter, SpawnForwardingStageResult,
19 },
20 sigverify::TransactionSigVerifier,
21 sigverify_stage::SigVerifyStage,
22 staked_nodes_updater_service::StakedNodesUpdaterService,
23 tpu_entry_notifier::TpuEntryNotifier,
24 validator::{BlockProductionMethod, GeneratorConfig},
25 vortexor_receiver_adapter::VortexorReceiverAdapter,
26 },
27 bytes::Bytes,
28 crossbeam_channel::{bounded, unbounded, Receiver},
29 solana_clock::Slot,
30 solana_gossip::cluster_info::ClusterInfo,
31 solana_keypair::Keypair,
32 solana_ledger::{
33 blockstore::Blockstore, blockstore_processor::TransactionStatusSender,
34 entry_notifier_service::EntryNotifierSender,
35 },
36 solana_perf::data_budget::DataBudget,
37 solana_poh::{
38 poh_recorder::{PohRecorder, WorkingBankEntry},
39 transaction_recorder::TransactionRecorder,
40 },
41 solana_pubkey::Pubkey,
42 solana_rpc::{
43 optimistically_confirmed_bank_tracker::BankNotificationSenderConfig,
44 rpc_subscriptions::RpcSubscriptions,
45 },
46 solana_runtime::{
47 bank_forks::BankForks,
48 prioritization_fee_cache::PrioritizationFeeCache,
49 vote_sender_types::{ReplayVoteReceiver, ReplayVoteSender},
50 },
51 solana_streamer::{
52 quic::{
53 spawn_server_with_cancel, spawn_simple_qos_server_with_cancel,
54 SimpleQosQuicStreamerConfig, SpawnServerResult, SwQosQuicStreamerConfig,
55 },
56 streamer::StakedNodes,
57 },
58 solana_turbine::{
59 broadcast_stage::{BroadcastStage, BroadcastStageType},
60 xdp::XdpSender,
61 },
62 std::{
63 collections::HashMap,
64 net::{SocketAddr, UdpSocket},
65 num::NonZeroUsize,
66 sync::{atomic::AtomicBool, Arc, RwLock},
67 thread::{self, JoinHandle},
68 time::Duration,
69 },
70 tokio::sync::mpsc::Sender as AsyncSender,
71 tokio_util::sync::CancellationToken,
72};
73
74pub struct TpuSockets {
75 pub transactions: Vec<UdpSocket>,
76 pub transaction_forwards: Vec<UdpSocket>,
77 pub vote: Vec<UdpSocket>,
78 pub broadcast: Vec<UdpSocket>,
79 pub transactions_quic: Vec<UdpSocket>,
80 pub transactions_forwards_quic: Vec<UdpSocket>,
81 pub vote_quic: Vec<UdpSocket>,
82 pub vote_forwarding_client: UdpSocket,
84 pub vortexor_receivers: Option<Vec<UdpSocket>>,
85}
86
87enum SigVerifier {
89 Local(SigVerifyStage),
90 Remote(VortexorReceiverAdapter),
91}
92
93impl SigVerifier {
94 fn join(self) -> thread::Result<()> {
95 match self {
96 SigVerifier::Local(sig_verify_stage) => sig_verify_stage.join(),
97 SigVerifier::Remote(vortexor_receiver_adapter) => vortexor_receiver_adapter.join(),
98 }
99 }
100}
101
102pub const MAX_VOTES_PER_SECOND: u64 = 20;
104
105pub struct Tpu {
106 fetch_stage: FetchStage,
107 sig_verifier: SigVerifier,
108 vote_sigverify_stage: SigVerifyStage,
109 banking_stage: Arc<RwLock<Option<BankingStage>>>,
110 forwarding_stage: JoinHandle<()>,
111 cluster_info_vote_listener: ClusterInfoVoteListener,
112 broadcast_stage: BroadcastStage,
113 tpu_quic_t: Option<thread::JoinHandle<()>>,
114 tpu_forwards_quic_t: Option<thread::JoinHandle<()>>,
115 tpu_entry_notifier: Option<TpuEntryNotifier>,
116 staked_nodes_updater_service: StakedNodesUpdaterService,
117 tracer_thread_hdl: TracerThread,
118 tpu_vote_quic_t: thread::JoinHandle<()>,
119}
120
121impl Tpu {
122 #[allow(clippy::too_many_arguments)]
123 pub fn new_with_client(
124 cluster_info: &Arc<ClusterInfo>,
125 poh_recorder: &Arc<RwLock<PohRecorder>>,
126 transaction_recorder: TransactionRecorder,
127 entry_receiver: Receiver<WorkingBankEntry>,
128 retransmit_slots_receiver: Receiver<Slot>,
129 sockets: TpuSockets,
130 subscriptions: Option<Arc<RpcSubscriptions>>,
131 transaction_status_sender: Option<TransactionStatusSender>,
132 entry_notification_sender: Option<EntryNotifierSender>,
133 blockstore: Arc<Blockstore>,
134 broadcast_type: &BroadcastStageType,
135 xdp_sender: Option<XdpSender>,
136 exit: Arc<AtomicBool>,
137 shred_version: u16,
138 vote_tracker: Arc<VoteTracker>,
139 bank_forks: Arc<RwLock<BankForks>>,
140 verified_vote_sender: VerifiedVoteSender,
141 gossip_verified_vote_hash_sender: GossipVerifiedVoteHashSender,
142 replay_vote_receiver: ReplayVoteReceiver,
143 replay_vote_sender: ReplayVoteSender,
144 bank_notification_sender: Option<BankNotificationSenderConfig>,
145 duplicate_confirmed_slot_sender: DuplicateConfirmedSlotsSender,
146 client: ForwardingClientOption,
147 turbine_quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>,
148 keypair: &Keypair,
149 log_messages_bytes_limit: Option<usize>,
150 staked_nodes: &Arc<RwLock<StakedNodes>>,
151 shared_staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
152 banking_tracer_channels: Channels,
153 tracer_thread_hdl: TracerThread,
154 tpu_enable_udp: bool,
155 tpu_quic_server_config: SwQosQuicStreamerConfig,
156 tpu_fwd_quic_server_config: SwQosQuicStreamerConfig,
157 vote_quic_server_config: SimpleQosQuicStreamerConfig,
158 prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
159 block_production_method: BlockProductionMethod,
160 block_production_num_workers: NonZeroUsize,
161 block_production_scheduler_config: SchedulerConfig,
162 enable_block_production_forwarding: bool,
163 _generator_config: Option<GeneratorConfig>, key_notifiers: Arc<RwLock<KeyUpdaters>>,
165 cancel: CancellationToken,
166 ) -> Self {
167 let TpuSockets {
168 transactions: transactions_sockets,
169 transaction_forwards: tpu_forwards_sockets,
170 vote: tpu_vote_sockets,
171 broadcast: broadcast_sockets,
172 transactions_quic: transactions_quic_sockets,
173 transactions_forwards_quic: transactions_forwards_quic_sockets,
174 vote_quic: tpu_vote_quic_sockets,
175 vote_forwarding_client: vote_forwarding_client_socket,
176 vortexor_receivers,
177 } = sockets;
178
179 let (packet_sender, packet_receiver) = unbounded();
180 let (vote_packet_sender, vote_packet_receiver) = unbounded();
181 let (forwarded_packet_sender, forwarded_packet_receiver) = unbounded();
182 let fetch_stage = FetchStage::new_with_sender(
183 transactions_sockets,
184 tpu_forwards_sockets,
185 tpu_vote_sockets,
186 exit.clone(),
187 &packet_sender,
188 &vote_packet_sender,
189 &forwarded_packet_sender,
190 forwarded_packet_receiver,
191 poh_recorder,
192 None, Some(bank_forks.read().unwrap().get_vote_only_mode_signal()),
194 tpu_enable_udp,
195 );
196
197 let staked_nodes_updater_service = StakedNodesUpdaterService::new(
198 exit.clone(),
199 bank_forks.clone(),
200 staked_nodes.clone(),
201 shared_staked_nodes_overrides,
202 );
203
204 let Channels {
205 non_vote_sender,
206 non_vote_receiver,
207 tpu_vote_sender,
208 tpu_vote_receiver,
209 gossip_vote_sender,
210 gossip_vote_receiver,
211 } = banking_tracer_channels;
212
213 let SpawnServerResult {
215 endpoints: _,
216 thread: tpu_vote_quic_t,
217 key_updater: vote_streamer_key_updater,
218 } = spawn_simple_qos_server_with_cancel(
219 "solQuicTVo",
220 "quic_streamer_tpu_vote",
221 tpu_vote_quic_sockets,
222 keypair,
223 vote_packet_sender.clone(),
224 staked_nodes.clone(),
225 vote_quic_server_config.quic_streamer_config,
226 vote_quic_server_config.qos_config,
227 cancel.clone(),
228 )
229 .unwrap();
230
231 let (tpu_quic_t, key_updater) = if vortexor_receivers.is_none() {
232 let SpawnServerResult {
234 endpoints: _,
235 thread: tpu_quic_t,
236 key_updater,
237 } = spawn_server_with_cancel(
238 "solQuicTpu",
239 "quic_streamer_tpu",
240 transactions_quic_sockets,
241 keypair,
242 packet_sender,
243 staked_nodes.clone(),
244 tpu_quic_server_config.quic_streamer_config,
245 tpu_quic_server_config.qos_config,
246 cancel.clone(),
247 )
248 .unwrap();
249 (Some(tpu_quic_t), Some(key_updater))
250 } else {
251 (None, None)
252 };
253
254 let (tpu_forwards_quic_t, forwards_key_updater) = if vortexor_receivers.is_none() {
255 let SpawnServerResult {
257 endpoints: _,
258 thread: tpu_forwards_quic_t,
259 key_updater: forwards_key_updater,
260 } = spawn_server_with_cancel(
261 "solQuicTpuFwd",
262 "quic_streamer_tpu_forwards",
263 transactions_forwards_quic_sockets,
264 keypair,
265 forwarded_packet_sender,
266 staked_nodes.clone(),
267 tpu_fwd_quic_server_config.quic_streamer_config,
268 tpu_fwd_quic_server_config.qos_config,
269 cancel,
270 )
271 .unwrap();
272 (Some(tpu_forwards_quic_t), Some(forwards_key_updater))
273 } else {
274 (None, None)
275 };
276
277 let (forward_stage_sender, forward_stage_receiver) = bounded(1024);
278 let sig_verifier = if let Some(vortexor_receivers) = vortexor_receivers {
279 info!("starting vortexor adapter");
280 let sockets = vortexor_receivers.into_iter().map(Arc::new).collect();
281 let adapter = VortexorReceiverAdapter::new(
282 sockets,
283 Duration::from_millis(5),
284 non_vote_sender,
285 enable_block_production_forwarding.then(|| forward_stage_sender.clone()),
286 exit.clone(),
287 );
288 SigVerifier::Remote(adapter)
289 } else {
290 info!("starting regular sigverify stage");
291 let verifier = TransactionSigVerifier::new(
292 non_vote_sender,
293 enable_block_production_forwarding.then(|| forward_stage_sender.clone()),
294 );
295 SigVerifier::Local(SigVerifyStage::new(
296 packet_receiver,
297 verifier,
298 "solSigVerTpu",
299 "tpu-verifier",
300 ))
301 };
302
303 let vote_sigverify_stage = {
304 let verifier = TransactionSigVerifier::new_reject_non_vote(
305 tpu_vote_sender,
306 Some(forward_stage_sender),
307 );
308 SigVerifyStage::new(
309 vote_packet_receiver,
310 verifier,
311 "solSigVerTpuVot",
312 "tpu-vote-verifier",
313 )
314 };
315
316 let cluster_info_vote_listener = ClusterInfoVoteListener::new(
317 exit.clone(),
318 cluster_info.clone(),
319 gossip_vote_sender,
320 vote_tracker,
321 bank_forks.clone(),
322 subscriptions,
323 verified_vote_sender,
324 gossip_verified_vote_hash_sender,
325 replay_vote_receiver,
326 blockstore.clone(),
327 bank_notification_sender,
328 duplicate_confirmed_slot_sender,
329 );
330
331 let banking_stage = BankingStage::new_num_threads(
332 block_production_method,
333 poh_recorder.clone(),
334 transaction_recorder,
335 non_vote_receiver,
336 tpu_vote_receiver,
337 gossip_vote_receiver,
338 block_production_num_workers,
339 block_production_scheduler_config,
340 transaction_status_sender,
341 replay_vote_sender,
342 log_messages_bytes_limit,
343 bank_forks.clone(),
344 prioritization_fee_cache.clone(),
345 );
346
347 let SpawnForwardingStageResult {
348 join_handle: forwarding_stage,
349 client_updater,
350 } = spawn_forwarding_stage(
351 forward_stage_receiver,
352 client,
353 vote_forwarding_client_socket,
354 bank_forks.read().unwrap().sharable_banks(),
355 ForwardAddressGetter::new(cluster_info.clone(), poh_recorder.clone()),
356 DataBudget::default(),
357 );
358
359 let (entry_receiver, tpu_entry_notifier) =
360 if let Some(entry_notification_sender) = entry_notification_sender {
361 let (broadcast_entry_sender, broadcast_entry_receiver) = unbounded();
362 let tpu_entry_notifier = TpuEntryNotifier::new(
363 entry_receiver,
364 entry_notification_sender,
365 broadcast_entry_sender,
366 exit.clone(),
367 );
368 (broadcast_entry_receiver, Some(tpu_entry_notifier))
369 } else {
370 (entry_receiver, None)
371 };
372
373 let broadcast_stage = broadcast_type.new_broadcast_stage(
374 broadcast_sockets,
375 cluster_info.clone(),
376 entry_receiver,
377 retransmit_slots_receiver,
378 exit,
379 blockstore,
380 bank_forks,
381 shred_version,
382 turbine_quic_endpoint_sender,
383 xdp_sender,
384 );
385
386 let mut key_notifiers = key_notifiers.write().unwrap();
387 if let Some(key_updater) = key_updater {
388 key_notifiers.add(KeyUpdaterType::Tpu, key_updater);
389 }
390 if let Some(forwards_key_updater) = forwards_key_updater {
391 key_notifiers.add(KeyUpdaterType::TpuForwards, forwards_key_updater);
392 }
393 key_notifiers.add(KeyUpdaterType::TpuVote, vote_streamer_key_updater);
394
395 key_notifiers.add(KeyUpdaterType::Forward, client_updater);
396
397 Self {
398 fetch_stage,
399 sig_verifier,
400 vote_sigverify_stage,
401 banking_stage: Arc::new(RwLock::new(Some(banking_stage))),
402 forwarding_stage,
403 cluster_info_vote_listener,
404 broadcast_stage,
405 tpu_quic_t,
406 tpu_forwards_quic_t,
407 tpu_entry_notifier,
408 staked_nodes_updater_service,
409 tracer_thread_hdl,
410 tpu_vote_quic_t,
411 }
412 }
413
414 pub fn banking_stage(&self) -> Arc<RwLock<Option<BankingStage>>> {
415 self.banking_stage.clone()
416 }
417
418 pub fn join(self) -> thread::Result<()> {
419 let results = vec![
420 self.fetch_stage.join(),
421 self.sig_verifier.join(),
422 self.vote_sigverify_stage.join(),
423 self.cluster_info_vote_listener.join(),
424 self.banking_stage
425 .write()
426 .unwrap()
427 .take()
428 .expect("banking_stage must be Some")
429 .join(),
430 self.forwarding_stage.join(),
431 self.staked_nodes_updater_service.join(),
432 self.tpu_quic_t.map_or(Ok(()), |t| t.join()),
433 self.tpu_forwards_quic_t.map_or(Ok(()), |t| t.join()),
434 self.tpu_vote_quic_t.join(),
435 ];
436 let broadcast_result = self.broadcast_stage.join();
437 for result in results {
438 result?;
439 }
440 if let Some(tpu_entry_notifier) = self.tpu_entry_notifier {
441 tpu_entry_notifier.join()?;
442 }
443 let _ = broadcast_result?;
444 if let Some(tracer_thread_hdl) = self.tracer_thread_hdl {
445 if let Err(tracer_result) = tracer_thread_hdl.join()? {
446 error!(
447 "banking tracer thread returned error after successful thread join: \
448 {tracer_result:?}"
449 );
450 }
451 }
452 Ok(())
453 }
454}