1pub use solana_sdk::net::DEFAULT_TPU_COALESCE;
5#[deprecated(
7 since = "2.2.0",
8 note = "Use solana_streamer::quic::DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER instead"
9)]
10pub use solana_streamer::quic::DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER as MAX_QUIC_CONNECTIONS_PER_PEER;
11use {
12 crate::{
13 banking_stage::BankingStage,
14 banking_trace::{Channels, TracerThread},
15 cluster_info_vote_listener::{
16 ClusterInfoVoteListener, DuplicateConfirmedSlotsSender, GossipVerifiedVoteHashSender,
17 VerifiedVoteSender, VoteTracker,
18 },
19 fetch_stage::FetchStage,
20 sigverify::TransactionSigVerifier,
21 sigverify_stage::SigVerifyStage,
22 staked_nodes_updater_service::StakedNodesUpdaterService,
23 tpu_entry_notifier::TpuEntryNotifier,
24 validator::{BlockProductionMethod, GeneratorConfig, TransactionStructure},
25 },
26 bytes::Bytes,
27 crossbeam_channel::{unbounded, Receiver},
28 solana_client::connection_cache::ConnectionCache,
29 solana_gossip::cluster_info::ClusterInfo,
30 solana_ledger::{
31 blockstore::Blockstore, blockstore_processor::TransactionStatusSender,
32 entry_notifier_service::EntryNotifierSender,
33 },
34 solana_poh::poh_recorder::{PohRecorder, WorkingBankEntry},
35 solana_rpc::{
36 optimistically_confirmed_bank_tracker::BankNotificationSender,
37 rpc_subscriptions::RpcSubscriptions,
38 },
39 solana_runtime::{
40 bank_forks::BankForks,
41 prioritization_fee_cache::PrioritizationFeeCache,
42 vote_sender_types::{ReplayVoteReceiver, ReplayVoteSender},
43 },
44 solana_sdk::{clock::Slot, pubkey::Pubkey, quic::NotifyKeyUpdate, signature::Keypair},
45 solana_streamer::{
46 quic::{spawn_server_multi, QuicServerParams, SpawnServerResult},
47 streamer::StakedNodes,
48 },
49 solana_turbine::broadcast_stage::{BroadcastStage, BroadcastStageType},
50 std::{
51 collections::HashMap,
52 net::{SocketAddr, UdpSocket},
53 sync::{atomic::AtomicBool, Arc, RwLock},
54 thread,
55 time::Duration,
56 },
57 tokio::sync::mpsc::Sender as AsyncSender,
58};
59
60pub struct TpuSockets {
61 pub transactions: Vec<UdpSocket>,
62 pub transaction_forwards: Vec<UdpSocket>,
63 pub vote: Vec<UdpSocket>,
64 pub broadcast: Vec<UdpSocket>,
65 pub transactions_quic: Vec<UdpSocket>,
66 pub transactions_forwards_quic: Vec<UdpSocket>,
67 pub vote_quic: Vec<UdpSocket>,
68}
69
70pub struct Tpu {
71 fetch_stage: FetchStage,
72 sigverify_stage: SigVerifyStage,
73 vote_sigverify_stage: SigVerifyStage,
74 banking_stage: BankingStage,
75 cluster_info_vote_listener: ClusterInfoVoteListener,
76 broadcast_stage: BroadcastStage,
77 tpu_quic_t: thread::JoinHandle<()>,
78 tpu_forwards_quic_t: thread::JoinHandle<()>,
79 tpu_entry_notifier: Option<TpuEntryNotifier>,
80 staked_nodes_updater_service: StakedNodesUpdaterService,
81 tracer_thread_hdl: TracerThread,
82 tpu_vote_quic_t: thread::JoinHandle<()>,
83}
84
85impl Tpu {
86 #[allow(clippy::too_many_arguments)]
87 pub fn new(
88 cluster_info: &Arc<ClusterInfo>,
89 poh_recorder: &Arc<RwLock<PohRecorder>>,
90 entry_receiver: Receiver<WorkingBankEntry>,
91 retransmit_slots_receiver: Receiver<Slot>,
92 sockets: TpuSockets,
93 subscriptions: &Arc<RpcSubscriptions>,
94 transaction_status_sender: Option<TransactionStatusSender>,
95 entry_notification_sender: Option<EntryNotifierSender>,
96 blockstore: Arc<Blockstore>,
97 broadcast_type: &BroadcastStageType,
98 exit: Arc<AtomicBool>,
99 shred_version: u16,
100 vote_tracker: Arc<VoteTracker>,
101 bank_forks: Arc<RwLock<BankForks>>,
102 verified_vote_sender: VerifiedVoteSender,
103 gossip_verified_vote_hash_sender: GossipVerifiedVoteHashSender,
104 replay_vote_receiver: ReplayVoteReceiver,
105 replay_vote_sender: ReplayVoteSender,
106 bank_notification_sender: Option<BankNotificationSender>,
107 tpu_coalesce: Duration,
108 duplicate_confirmed_slot_sender: DuplicateConfirmedSlotsSender,
109 connection_cache: &Arc<ConnectionCache>,
110 turbine_quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>,
111 keypair: &Keypair,
112 log_messages_bytes_limit: Option<usize>,
113 staked_nodes: &Arc<RwLock<StakedNodes>>,
114 shared_staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
115 banking_tracer_channels: Channels,
116 tracer_thread_hdl: TracerThread,
117 tpu_enable_udp: bool,
118 tpu_quic_server_config: QuicServerParams,
119 tpu_fwd_quic_server_config: QuicServerParams,
120 vote_quic_server_config: QuicServerParams,
121 prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
122 block_production_method: BlockProductionMethod,
123 transaction_struct: TransactionStructure,
124 enable_block_production_forwarding: bool,
125 _generator_config: Option<GeneratorConfig>, ) -> (Self, Vec<Arc<dyn NotifyKeyUpdate + Sync + Send>>) {
127 let TpuSockets {
128 transactions: transactions_sockets,
129 transaction_forwards: tpu_forwards_sockets,
130 vote: tpu_vote_sockets,
131 broadcast: broadcast_sockets,
132 transactions_quic: transactions_quic_sockets,
133 transactions_forwards_quic: transactions_forwards_quic_sockets,
134 vote_quic: tpu_vote_quic_sockets,
135 } = sockets;
136
137 let (packet_sender, packet_receiver) = unbounded();
138 let (vote_packet_sender, vote_packet_receiver) = unbounded();
139 let (forwarded_packet_sender, forwarded_packet_receiver) = unbounded();
140 let fetch_stage = FetchStage::new_with_sender(
141 transactions_sockets,
142 tpu_forwards_sockets,
143 tpu_vote_sockets,
144 exit.clone(),
145 &packet_sender,
146 &vote_packet_sender,
147 &forwarded_packet_sender,
148 forwarded_packet_receiver,
149 poh_recorder,
150 tpu_coalesce,
151 Some(bank_forks.read().unwrap().get_vote_only_mode_signal()),
152 tpu_enable_udp,
153 );
154
155 let staked_nodes_updater_service = StakedNodesUpdaterService::new(
156 exit.clone(),
157 bank_forks.clone(),
158 staked_nodes.clone(),
159 shared_staked_nodes_overrides,
160 );
161
162 let Channels {
163 non_vote_sender,
164 non_vote_receiver,
165 tpu_vote_sender,
166 tpu_vote_receiver,
167 gossip_vote_sender,
168 gossip_vote_receiver,
169 } = banking_tracer_channels;
170
171 let SpawnServerResult {
173 endpoints: _,
174 thread: tpu_vote_quic_t,
175 key_updater: vote_streamer_key_updater,
176 } = spawn_server_multi(
177 "solQuicTVo",
178 "quic_streamer_tpu_vote",
179 tpu_vote_quic_sockets,
180 keypair,
181 vote_packet_sender.clone(),
182 exit.clone(),
183 staked_nodes.clone(),
184 vote_quic_server_config,
185 )
186 .unwrap();
187
188 let SpawnServerResult {
190 endpoints: _,
191 thread: tpu_quic_t,
192 key_updater,
193 } = spawn_server_multi(
194 "solQuicTpu",
195 "quic_streamer_tpu",
196 transactions_quic_sockets,
197 keypair,
198 packet_sender,
199 exit.clone(),
200 staked_nodes.clone(),
201 tpu_quic_server_config,
202 )
203 .unwrap();
204
205 let SpawnServerResult {
207 endpoints: _,
208 thread: tpu_forwards_quic_t,
209 key_updater: forwards_key_updater,
210 } = spawn_server_multi(
211 "solQuicTpuFwd",
212 "quic_streamer_tpu_forwards",
213 transactions_forwards_quic_sockets,
214 keypair,
215 forwarded_packet_sender,
216 exit.clone(),
217 staked_nodes.clone(),
218 tpu_fwd_quic_server_config,
219 )
220 .unwrap();
221
222 let sigverify_stage = {
223 let verifier = TransactionSigVerifier::new(non_vote_sender);
224 SigVerifyStage::new(packet_receiver, verifier, "solSigVerTpu", "tpu-verifier")
225 };
226
227 let vote_sigverify_stage = {
228 let verifier = TransactionSigVerifier::new_reject_non_vote(tpu_vote_sender);
229 SigVerifyStage::new(
230 vote_packet_receiver,
231 verifier,
232 "solSigVerTpuVot",
233 "tpu-vote-verifier",
234 )
235 };
236
237 let cluster_info_vote_listener = ClusterInfoVoteListener::new(
238 exit.clone(),
239 cluster_info.clone(),
240 gossip_vote_sender,
241 vote_tracker,
242 bank_forks.clone(),
243 subscriptions.clone(),
244 verified_vote_sender,
245 gossip_verified_vote_hash_sender,
246 replay_vote_receiver,
247 blockstore.clone(),
248 bank_notification_sender,
249 duplicate_confirmed_slot_sender,
250 );
251
252 let banking_stage = BankingStage::new(
253 block_production_method,
254 transaction_struct,
255 cluster_info,
256 poh_recorder,
257 non_vote_receiver,
258 tpu_vote_receiver,
259 gossip_vote_receiver,
260 transaction_status_sender,
261 replay_vote_sender,
262 log_messages_bytes_limit,
263 connection_cache.clone(),
264 bank_forks.clone(),
265 prioritization_fee_cache,
266 enable_block_production_forwarding,
267 );
268
269 let (entry_receiver, tpu_entry_notifier) =
270 if let Some(entry_notification_sender) = entry_notification_sender {
271 let (broadcast_entry_sender, broadcast_entry_receiver) = unbounded();
272 let tpu_entry_notifier = TpuEntryNotifier::new(
273 entry_receiver,
274 entry_notification_sender,
275 broadcast_entry_sender,
276 exit.clone(),
277 );
278 (broadcast_entry_receiver, Some(tpu_entry_notifier))
279 } else {
280 (entry_receiver, None)
281 };
282
283 let broadcast_stage = broadcast_type.new_broadcast_stage(
284 broadcast_sockets,
285 cluster_info.clone(),
286 entry_receiver,
287 retransmit_slots_receiver,
288 exit,
289 blockstore,
290 bank_forks,
291 shred_version,
292 turbine_quic_endpoint_sender,
293 );
294
295 (
296 Self {
297 fetch_stage,
298 sigverify_stage,
299 vote_sigverify_stage,
300 banking_stage,
301 cluster_info_vote_listener,
302 broadcast_stage,
303 tpu_quic_t,
304 tpu_forwards_quic_t,
305 tpu_entry_notifier,
306 staked_nodes_updater_service,
307 tracer_thread_hdl,
308 tpu_vote_quic_t,
309 },
310 vec![key_updater, forwards_key_updater, vote_streamer_key_updater],
311 )
312 }
313
314 pub fn join(self) -> thread::Result<()> {
315 let results = vec![
316 self.fetch_stage.join(),
317 self.sigverify_stage.join(),
318 self.vote_sigverify_stage.join(),
319 self.cluster_info_vote_listener.join(),
320 self.banking_stage.join(),
321 self.staked_nodes_updater_service.join(),
322 self.tpu_quic_t.join(),
323 self.tpu_forwards_quic_t.join(),
324 self.tpu_vote_quic_t.join(),
325 ];
326 let broadcast_result = self.broadcast_stage.join();
327 for result in results {
328 result?;
329 }
330 if let Some(tpu_entry_notifier) = self.tpu_entry_notifier {
331 tpu_entry_notifier.join()?;
332 }
333 let _ = broadcast_result?;
334 if let Some(tracer_thread_hdl) = self.tracer_thread_hdl {
335 if let Err(tracer_result) = tracer_thread_hdl.join()? {
336 error!(
337 "banking tracer thread returned error after successful thread join: {:?}",
338 tracer_result
339 );
340 }
341 }
342 Ok(())
343 }
344}