1pub use crate::nonblocking::tpu_client::TpuSenderError;
2use {
3 crate::nonblocking::tpu_client::TpuClient as NonblockingTpuClient,
4 rayon::iter::{IntoParallelIterator, ParallelIterator},
5 solana_client_traits::AsyncClient,
6 solana_clock::Slot,
7 solana_connection_cache::{
8 client_connection::ClientConnection,
9 connection_cache::{
10 ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig,
11 },
12 },
13 solana_net_utils::bind_to_unspecified,
14 solana_rpc_client::rpc_client::RpcClient,
15 solana_signature::Signature,
16 solana_transaction::{versioned::VersionedTransaction, Transaction},
17 solana_transaction_error::{TransportError, TransportResult},
18 std::{
19 collections::VecDeque,
20 net::UdpSocket,
21 sync::{Arc, RwLock},
22 },
23};
24#[cfg(feature = "spinner")]
25use {
26 solana_message::Message, solana_signer::signers::Signers,
27 solana_transaction_error::TransactionError, tokio::time::Duration,
28};
29
30pub const DEFAULT_TPU_ENABLE_UDP: bool = false;
31pub const DEFAULT_TPU_USE_QUIC: bool = true;
32pub const DEFAULT_VOTE_USE_QUIC: bool = false;
33
34pub const DEFAULT_TPU_CONNECTION_POOL_SIZE: usize = 1;
38
39pub type Result<T> = std::result::Result<T, TpuSenderError>;
40
41#[cfg(feature = "spinner")]
43pub(crate) const SEND_TRANSACTION_INTERVAL: Duration = Duration::from_millis(10);
44#[cfg(feature = "spinner")]
46pub(crate) const TRANSACTION_RESEND_INTERVAL: Duration = Duration::from_secs(4);
47
48pub const DEFAULT_FANOUT_SLOTS: u64 = 12;
50
51pub const MAX_FANOUT_SLOTS: u64 = 100;
53
54#[derive(Clone, Debug)]
56pub struct TpuClientConfig {
57 pub fanout_slots: u64,
60}
61
62impl Default for TpuClientConfig {
63 fn default() -> Self {
64 Self {
65 fanout_slots: DEFAULT_FANOUT_SLOTS,
66 }
67 }
68}
69
70pub struct TpuClient<
73 P, M, C, > {
77 _deprecated: UdpSocket, rpc_client: Arc<RpcClient>,
80 tpu_client: Arc<NonblockingTpuClient<P, M, C>>,
81}
82
83impl<P, M, C> TpuClient<P, M, C>
84where
85 P: ConnectionPool<NewConnectionConfig = C>,
86 M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
87 C: NewConnectionConfig,
88{
89 pub fn send_transaction(&self, transaction: &Transaction) -> bool {
92 self.invoke(self.tpu_client.send_transaction(transaction))
93 }
94
95 pub fn send_wire_transaction(&self, wire_transaction: Vec<u8>) -> bool {
97 self.invoke(self.tpu_client.send_wire_transaction(wire_transaction))
98 }
99
100 pub fn try_send_transaction(&self, transaction: &Transaction) -> TransportResult<()> {
104 self.invoke(self.tpu_client.try_send_transaction(transaction))
105 }
106
107 pub fn send_transaction_to_upcoming_leaders(
113 &self,
114 transaction: &Transaction,
115 ) -> TransportResult<()> {
116 let wire_transaction =
117 bincode::serialize(&transaction).expect("should serialize transaction");
118
119 let leaders = self
120 .tpu_client
121 .get_leader_tpu_service()
122 .unique_leader_tpu_sockets(self.tpu_client.get_fanout_slots());
123
124 let mut last_error: Option<TransportError> = None;
125 let mut some_success = false;
126 for tpu_address in &leaders {
127 let cache = self.tpu_client.get_connection_cache();
128 let conn = cache.get_connection(tpu_address);
129 if let Err(err) = conn.send_data_async(wire_transaction.clone()) {
130 last_error = Some(err);
131 } else {
132 some_success = true;
133 }
134 }
135
136 if let Some(err) = last_error {
137 Err(err)
138 } else if !some_success {
139 Err(std::io::Error::other("No sends attempted").into())
140 } else {
141 Ok(())
142 }
143 }
144
145 pub fn try_send_transaction_batch(&self, transactions: &[Transaction]) -> TransportResult<()> {
149 let wire_transactions = transactions
150 .into_par_iter()
151 .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
152 .collect::<Vec<_>>();
153 self.invoke(
154 self.tpu_client
155 .try_send_wire_transaction_batch(wire_transactions),
156 )
157 }
158
159 pub fn try_send_wire_transaction(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
162 self.invoke(self.tpu_client.try_send_wire_transaction(wire_transaction))
163 }
164
165 pub fn try_send_wire_transaction_batch(
166 &self,
167 wire_transactions: Vec<Vec<u8>>,
168 ) -> TransportResult<()> {
169 self.invoke(
170 self.tpu_client
171 .try_send_wire_transaction_batch(wire_transactions),
172 )
173 }
174
175 pub fn new(
177 name: &'static str,
178 rpc_client: Arc<RpcClient>,
179 websocket_url: &str,
180 config: TpuClientConfig,
181 connection_manager: M,
182 ) -> Result<Self> {
183 let create_tpu_client = NonblockingTpuClient::new(
184 name,
185 rpc_client.get_inner_client().clone(),
186 websocket_url,
187 config,
188 connection_manager,
189 );
190 let tpu_client =
191 tokio::task::block_in_place(|| rpc_client.runtime().block_on(create_tpu_client))?;
192
193 Ok(Self {
194 _deprecated: bind_to_unspecified().unwrap(),
195 rpc_client,
196 tpu_client: Arc::new(tpu_client),
197 })
198 }
199
200 pub fn new_with_connection_cache(
202 rpc_client: Arc<RpcClient>,
203 websocket_url: &str,
204 config: TpuClientConfig,
205 connection_cache: Arc<ConnectionCache<P, M, C>>,
206 ) -> Result<Self> {
207 let create_tpu_client = NonblockingTpuClient::new_with_connection_cache(
208 rpc_client.get_inner_client().clone(),
209 websocket_url,
210 config,
211 connection_cache,
212 );
213 let tpu_client =
214 tokio::task::block_in_place(|| rpc_client.runtime().block_on(create_tpu_client))?;
215
216 Ok(Self {
217 _deprecated: bind_to_unspecified().unwrap(),
218 rpc_client,
219 tpu_client: Arc::new(tpu_client),
220 })
221 }
222
223 #[cfg(feature = "spinner")]
224 pub fn send_and_confirm_messages_with_spinner<T: Signers + ?Sized>(
225 &self,
226 messages: &[Message],
227 signers: &T,
228 ) -> Result<Vec<Option<TransactionError>>> {
229 self.invoke(
230 self.tpu_client
231 .send_and_confirm_messages_with_spinner(messages, signers),
232 )
233 }
234
235 pub fn rpc_client(&self) -> &RpcClient {
236 &self.rpc_client
237 }
238
239 fn invoke<T, F: std::future::Future<Output = T>>(&self, f: F) -> T {
240 tokio::task::block_in_place(move || self.rpc_client.runtime().block_on(f))
244 }
245}
246
247impl<P, M, C> AsyncClient for TpuClient<P, M, C>
250where
251 P: ConnectionPool<NewConnectionConfig = C>,
252 M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
253 C: NewConnectionConfig,
254{
255 fn async_send_versioned_transaction(
256 &self,
257 transaction: VersionedTransaction,
258 ) -> TransportResult<Signature> {
259 let wire_transaction =
260 bincode::serialize(&transaction).expect("serialize Transaction in send_batch");
261 self.send_wire_transaction(wire_transaction);
262 Ok(transaction.signatures[0])
263 }
264
265 fn async_send_versioned_transaction_batch(
266 &self,
267 batch: Vec<VersionedTransaction>,
268 ) -> TransportResult<()> {
269 let buffers = batch
270 .into_par_iter()
271 .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
272 .collect::<Vec<_>>();
273 self.try_send_wire_transaction_batch(buffers)?;
274 Ok(())
275 }
276}
277
278const MAX_SLOT_SKIP_DISTANCE: u64 = 48;
280
281#[derive(Clone, Debug)]
282pub(crate) struct RecentLeaderSlots(Arc<RwLock<VecDeque<Slot>>>);
283impl RecentLeaderSlots {
284 pub(crate) fn new(current_slot: Slot) -> Self {
285 let mut recent_slots = VecDeque::new();
286 recent_slots.push_back(current_slot);
287 Self(Arc::new(RwLock::new(recent_slots)))
288 }
289
290 pub(crate) fn record_slot(&self, current_slot: Slot) {
291 let mut recent_slots = self.0.write().unwrap();
292 recent_slots.push_back(current_slot);
293 while recent_slots.len() > 12 {
296 recent_slots.pop_front();
297 }
298 }
299
300 pub(crate) fn estimated_current_slot(&self) -> Slot {
302 let mut recent_slots: Vec<Slot> = self.0.read().unwrap().iter().cloned().collect();
303 assert!(!recent_slots.is_empty());
304 recent_slots.sort_unstable();
305
306 let max_index = recent_slots.len() - 1;
309 let median_index = max_index / 2;
310 let median_recent_slot = recent_slots[median_index];
311 let expected_current_slot = median_recent_slot + (max_index - median_index) as u64;
312 let max_reasonable_current_slot = expected_current_slot + MAX_SLOT_SKIP_DISTANCE;
313
314 recent_slots
317 .into_iter()
318 .rev()
319 .find(|slot| *slot <= max_reasonable_current_slot)
320 .unwrap()
321 }
322}
323
324#[cfg(test)]
325impl From<Vec<Slot>> for RecentLeaderSlots {
326 fn from(recent_slots: Vec<Slot>) -> Self {
327 assert!(!recent_slots.is_empty());
328 Self(Arc::new(RwLock::new(recent_slots.into_iter().collect())))
329 }
330}
331
332#[cfg(test)]
333mod tests {
334 use super::*;
335
336 fn assert_slot(recent_slots: RecentLeaderSlots, expected_slot: Slot) {
337 assert_eq!(recent_slots.estimated_current_slot(), expected_slot);
338 }
339
340 #[test]
341 fn test_recent_leader_slots() {
342 assert_slot(RecentLeaderSlots::new(0), 0);
343
344 let mut recent_slots: Vec<Slot> = (1..=12).collect();
345 assert_slot(RecentLeaderSlots::from(recent_slots.clone()), 12);
346
347 recent_slots.reverse();
348 assert_slot(RecentLeaderSlots::from(recent_slots), 12);
349
350 assert_slot(
351 RecentLeaderSlots::from(vec![0, 1 + MAX_SLOT_SKIP_DISTANCE]),
352 1 + MAX_SLOT_SKIP_DISTANCE,
353 );
354 assert_slot(
355 RecentLeaderSlots::from(vec![0, 2 + MAX_SLOT_SKIP_DISTANCE]),
356 0,
357 );
358
359 assert_slot(RecentLeaderSlots::from(vec![1]), 1);
360 assert_slot(RecentLeaderSlots::from(vec![1, 100]), 1);
361 assert_slot(RecentLeaderSlots::from(vec![1, 2, 100]), 2);
362 assert_slot(RecentLeaderSlots::from(vec![1, 2, 3, 100]), 3);
363 assert_slot(RecentLeaderSlots::from(vec![1, 2, 3, 99, 100]), 3);
364 }
365}