1#[deprecated(
6 since = "2.2.0",
7 note = "Use solana_streamer::quic::DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER instead"
8)]
9pub use solana_streamer::quic::DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER as MAX_QUIC_CONNECTIONS_PER_PEER;
10pub use {
11 crate::forwarding_stage::ForwardingClientOption, solana_streamer::quic::DEFAULT_TPU_COALESCE,
12};
13use {
14 crate::{
15 admin_rpc_post_init::{KeyUpdaterType, KeyUpdaters},
16 banking_stage::BankingStage,
17 banking_trace::{Channels, TracerThread},
18 cluster_info_vote_listener::{
19 ClusterInfoVoteListener, DuplicateConfirmedSlotsSender, GossipVerifiedVoteHashSender,
20 VerifiedVoteSender, VoteTracker,
21 },
22 fetch_stage::FetchStage,
23 forwarding_stage::{
24 spawn_forwarding_stage, ForwardAddressGetter, SpawnForwardingStageResult,
25 },
26 sigverify::TransactionSigVerifier,
27 sigverify_stage::SigVerifyStage,
28 staked_nodes_updater_service::StakedNodesUpdaterService,
29 tpu_entry_notifier::TpuEntryNotifier,
30 validator::{BlockProductionMethod, GeneratorConfig, TransactionStructure},
31 },
32 bytes::Bytes,
33 crossbeam_channel::{bounded, unbounded, Receiver},
34 solana_clock::Slot,
35 solana_gossip::cluster_info::ClusterInfo,
36 solana_keypair::Keypair,
37 solana_ledger::{
38 blockstore::Blockstore, blockstore_processor::TransactionStatusSender,
39 entry_notifier_service::EntryNotifierSender,
40 },
41 solana_perf::data_budget::DataBudget,
42 solana_poh::{
43 poh_recorder::{PohRecorder, WorkingBankEntry},
44 transaction_recorder::TransactionRecorder,
45 },
46 solana_pubkey::Pubkey,
47 solana_rpc::{
48 optimistically_confirmed_bank_tracker::BankNotificationSender,
49 rpc_subscriptions::RpcSubscriptions,
50 },
51 solana_runtime::{
52 bank_forks::BankForks,
53 prioritization_fee_cache::PrioritizationFeeCache,
54 root_bank_cache::RootBankCache,
55 vote_sender_types::{ReplayVoteReceiver, ReplayVoteSender},
56 },
57 solana_streamer::{
58 quic::{spawn_server_multi, QuicServerParams, SpawnServerResult},
59 streamer::StakedNodes,
60 },
61 solana_turbine::broadcast_stage::{BroadcastStage, BroadcastStageType},
62 std::{
63 collections::HashMap,
64 net::{SocketAddr, UdpSocket},
65 sync::{atomic::AtomicBool, Arc, RwLock},
66 thread::{self, JoinHandle},
67 time::Duration,
68 },
69 tokio::sync::mpsc::Sender as AsyncSender,
70};
71
/// The UDP sockets consumed by [`Tpu::new_with_client`].
///
/// The constructor destructures this value and hands each socket (or group of
/// sockets) to exactly one component; the per-field notes name that consumer.
pub struct TpuSockets {
    /// Passed to `FetchStage::new_with_sender` as the transaction sockets.
    pub transactions: Vec<UdpSocket>,
    /// Passed to `FetchStage::new_with_sender` as the forwarded-transaction sockets.
    pub transaction_forwards: Vec<UdpSocket>,
    /// Passed to `FetchStage::new_with_sender` as the vote sockets.
    pub vote: Vec<UdpSocket>,
    /// Passed to the broadcast stage created from the `BroadcastStageType`.
    pub broadcast: Vec<UdpSocket>,
    /// Sockets for the QUIC server registered as `quic_streamer_tpu`.
    pub transactions_quic: Vec<UdpSocket>,
    /// Sockets for the QUIC server registered as `quic_streamer_tpu_forwards`.
    pub transactions_forwards_quic: Vec<UdpSocket>,
    /// Sockets for the QUIC server registered as `quic_streamer_tpu_vote`.
    pub vote_quic: Vec<UdpSocket>,
    /// Single socket handed to `spawn_forwarding_stage`.
    pub vote_forwarding_client: UdpSocket,
}
83
84pub struct Tpu {
85 fetch_stage: FetchStage,
86 sigverify_stage: SigVerifyStage,
87 vote_sigverify_stage: SigVerifyStage,
88 banking_stage: BankingStage,
89 forwarding_stage: JoinHandle<()>,
90 cluster_info_vote_listener: ClusterInfoVoteListener,
91 broadcast_stage: BroadcastStage,
92 tpu_quic_t: thread::JoinHandle<()>,
93 tpu_forwards_quic_t: thread::JoinHandle<()>,
94 tpu_entry_notifier: Option<TpuEntryNotifier>,
95 staked_nodes_updater_service: StakedNodesUpdaterService,
96 tracer_thread_hdl: TracerThread,
97 tpu_vote_quic_t: thread::JoinHandle<()>,
98}
99
100impl Tpu {
101 #[allow(clippy::too_many_arguments)]
102 pub fn new_with_client(
103 cluster_info: &Arc<ClusterInfo>,
104 poh_recorder: &Arc<RwLock<PohRecorder>>,
105 transaction_recorder: TransactionRecorder,
106 entry_receiver: Receiver<WorkingBankEntry>,
107 retransmit_slots_receiver: Receiver<Slot>,
108 sockets: TpuSockets,
109 subscriptions: &Arc<RpcSubscriptions>,
110 transaction_status_sender: Option<TransactionStatusSender>,
111 entry_notification_sender: Option<EntryNotifierSender>,
112 blockstore: Arc<Blockstore>,
113 broadcast_type: &BroadcastStageType,
114 exit: Arc<AtomicBool>,
115 shred_version: u16,
116 vote_tracker: Arc<VoteTracker>,
117 bank_forks: Arc<RwLock<BankForks>>,
118 verified_vote_sender: VerifiedVoteSender,
119 gossip_verified_vote_hash_sender: GossipVerifiedVoteHashSender,
120 replay_vote_receiver: ReplayVoteReceiver,
121 replay_vote_sender: ReplayVoteSender,
122 bank_notification_sender: Option<BankNotificationSender>,
123 tpu_coalesce: Duration,
124 duplicate_confirmed_slot_sender: DuplicateConfirmedSlotsSender,
125 client: ForwardingClientOption,
126 turbine_quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>,
127 keypair: &Keypair,
128 log_messages_bytes_limit: Option<usize>,
129 staked_nodes: &Arc<RwLock<StakedNodes>>,
130 shared_staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
131 banking_tracer_channels: Channels,
132 tracer_thread_hdl: TracerThread,
133 tpu_enable_udp: bool,
134 tpu_quic_server_config: QuicServerParams,
135 tpu_fwd_quic_server_config: QuicServerParams,
136 vote_quic_server_config: QuicServerParams,
137 prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
138 block_production_method: BlockProductionMethod,
139 transaction_struct: TransactionStructure,
140 enable_block_production_forwarding: bool,
141 _generator_config: Option<GeneratorConfig>, key_notifiers: Arc<RwLock<KeyUpdaters>>,
143 ) -> Self {
144 let TpuSockets {
145 transactions: transactions_sockets,
146 transaction_forwards: tpu_forwards_sockets,
147 vote: tpu_vote_sockets,
148 broadcast: broadcast_sockets,
149 transactions_quic: transactions_quic_sockets,
150 transactions_forwards_quic: transactions_forwards_quic_sockets,
151 vote_quic: tpu_vote_quic_sockets,
152 vote_forwarding_client: vote_forwarding_client_socket,
153 } = sockets;
154
155 let (packet_sender, packet_receiver) = unbounded();
156 let (vote_packet_sender, vote_packet_receiver) = unbounded();
157 let (forwarded_packet_sender, forwarded_packet_receiver) = unbounded();
158 let fetch_stage = FetchStage::new_with_sender(
159 transactions_sockets,
160 tpu_forwards_sockets,
161 tpu_vote_sockets,
162 exit.clone(),
163 &packet_sender,
164 &vote_packet_sender,
165 &forwarded_packet_sender,
166 forwarded_packet_receiver,
167 poh_recorder,
168 Some(tpu_coalesce),
169 Some(bank_forks.read().unwrap().get_vote_only_mode_signal()),
170 tpu_enable_udp,
171 );
172
173 let staked_nodes_updater_service = StakedNodesUpdaterService::new(
174 exit.clone(),
175 bank_forks.clone(),
176 staked_nodes.clone(),
177 shared_staked_nodes_overrides,
178 );
179
180 let Channels {
181 non_vote_sender,
182 non_vote_receiver,
183 tpu_vote_sender,
184 tpu_vote_receiver,
185 gossip_vote_sender,
186 gossip_vote_receiver,
187 } = banking_tracer_channels;
188
189 let SpawnServerResult {
191 endpoints: _,
192 thread: tpu_vote_quic_t,
193 key_updater: vote_streamer_key_updater,
194 } = spawn_server_multi(
195 "solQuicTVo",
196 "quic_streamer_tpu_vote",
197 tpu_vote_quic_sockets,
198 keypair,
199 vote_packet_sender.clone(),
200 exit.clone(),
201 staked_nodes.clone(),
202 vote_quic_server_config,
203 )
204 .unwrap();
205
206 let SpawnServerResult {
208 endpoints: _,
209 thread: tpu_quic_t,
210 key_updater,
211 } = spawn_server_multi(
212 "solQuicTpu",
213 "quic_streamer_tpu",
214 transactions_quic_sockets,
215 keypair,
216 packet_sender,
217 exit.clone(),
218 staked_nodes.clone(),
219 tpu_quic_server_config,
220 )
221 .unwrap();
222
223 let SpawnServerResult {
225 endpoints: _,
226 thread: tpu_forwards_quic_t,
227 key_updater: forwards_key_updater,
228 } = spawn_server_multi(
229 "solQuicTpuFwd",
230 "quic_streamer_tpu_forwards",
231 transactions_forwards_quic_sockets,
232 keypair,
233 forwarded_packet_sender,
234 exit.clone(),
235 staked_nodes.clone(),
236 tpu_fwd_quic_server_config,
237 )
238 .unwrap();
239
240 let (forward_stage_sender, forward_stage_receiver) = bounded(1024);
241 let sigverify_stage = {
242 let verifier = TransactionSigVerifier::new(
243 non_vote_sender,
244 enable_block_production_forwarding.then(|| forward_stage_sender.clone()),
245 );
246 SigVerifyStage::new(packet_receiver, verifier, "solSigVerTpu", "tpu-verifier")
247 };
248
249 let vote_sigverify_stage = {
250 let verifier = TransactionSigVerifier::new_reject_non_vote(
251 tpu_vote_sender,
252 Some(forward_stage_sender),
253 );
254 SigVerifyStage::new(
255 vote_packet_receiver,
256 verifier,
257 "solSigVerTpuVot",
258 "tpu-vote-verifier",
259 )
260 };
261
262 let cluster_info_vote_listener = ClusterInfoVoteListener::new(
263 exit.clone(),
264 cluster_info.clone(),
265 gossip_vote_sender,
266 vote_tracker,
267 bank_forks.clone(),
268 subscriptions.clone(),
269 verified_vote_sender,
270 gossip_verified_vote_hash_sender,
271 replay_vote_receiver,
272 blockstore.clone(),
273 bank_notification_sender,
274 duplicate_confirmed_slot_sender,
275 );
276
277 let banking_stage = BankingStage::new(
278 block_production_method,
279 transaction_struct,
280 cluster_info,
281 poh_recorder,
282 transaction_recorder,
283 non_vote_receiver,
284 tpu_vote_receiver,
285 gossip_vote_receiver,
286 transaction_status_sender,
287 replay_vote_sender,
288 log_messages_bytes_limit,
289 bank_forks.clone(),
290 prioritization_fee_cache,
291 );
292
293 let SpawnForwardingStageResult {
294 join_handle: forwarding_stage,
295 client_updater,
296 } = spawn_forwarding_stage(
297 forward_stage_receiver,
298 client,
299 vote_forwarding_client_socket,
300 RootBankCache::new(bank_forks.clone()),
301 ForwardAddressGetter::new(cluster_info.clone(), poh_recorder.clone()),
302 DataBudget::default(),
303 );
304
305 let (entry_receiver, tpu_entry_notifier) =
306 if let Some(entry_notification_sender) = entry_notification_sender {
307 let (broadcast_entry_sender, broadcast_entry_receiver) = unbounded();
308 let tpu_entry_notifier = TpuEntryNotifier::new(
309 entry_receiver,
310 entry_notification_sender,
311 broadcast_entry_sender,
312 exit.clone(),
313 );
314 (broadcast_entry_receiver, Some(tpu_entry_notifier))
315 } else {
316 (entry_receiver, None)
317 };
318
319 let broadcast_stage = broadcast_type.new_broadcast_stage(
320 broadcast_sockets,
321 cluster_info.clone(),
322 entry_receiver,
323 retransmit_slots_receiver,
324 exit,
325 blockstore,
326 bank_forks,
327 shred_version,
328 turbine_quic_endpoint_sender,
329 );
330
331 let mut key_notifiers = key_notifiers.write().unwrap();
332 key_notifiers.add(KeyUpdaterType::Tpu, key_updater);
333 key_notifiers.add(KeyUpdaterType::TpuForwards, forwards_key_updater);
334 key_notifiers.add(KeyUpdaterType::TpuVote, vote_streamer_key_updater);
335 key_notifiers.add(KeyUpdaterType::Forward, client_updater);
336
337 Self {
338 fetch_stage,
339 sigverify_stage,
340 vote_sigverify_stage,
341 banking_stage,
342 forwarding_stage,
343 cluster_info_vote_listener,
344 broadcast_stage,
345 tpu_quic_t,
346 tpu_forwards_quic_t,
347 tpu_entry_notifier,
348 staked_nodes_updater_service,
349 tracer_thread_hdl,
350 tpu_vote_quic_t,
351 }
352 }
353
354 pub fn join(self) -> thread::Result<()> {
355 let results = vec![
356 self.fetch_stage.join(),
357 self.sigverify_stage.join(),
358 self.vote_sigverify_stage.join(),
359 self.cluster_info_vote_listener.join(),
360 self.banking_stage.join(),
361 self.forwarding_stage.join(),
362 self.staked_nodes_updater_service.join(),
363 self.tpu_quic_t.join(),
364 self.tpu_forwards_quic_t.join(),
365 self.tpu_vote_quic_t.join(),
366 ];
367 let broadcast_result = self.broadcast_stage.join();
368 for result in results {
369 result?;
370 }
371 if let Some(tpu_entry_notifier) = self.tpu_entry_notifier {
372 tpu_entry_notifier.join()?;
373 }
374 let _ = broadcast_result?;
375 if let Some(tracer_thread_hdl) = self.tracer_thread_hdl {
376 if let Err(tracer_result) = tracer_thread_hdl.join()? {
377 error!(
378 "banking tracer thread returned error after successful thread join: {:?}",
379 tracer_result
380 );
381 }
382 }
383 Ok(())
384 }
385}