1pub use crate::nonblocking::tpu_client::TpuSenderError;
2use {
3 crate::nonblocking::tpu_client::TpuClient as NonblockingTpuClient,
4 rayon::iter::{IntoParallelIterator, ParallelIterator},
5 solana_client_traits::AsyncClient,
6 solana_clock::Slot,
7 solana_connection_cache::{
8 client_connection::ClientConnection,
9 connection_cache::{
10 ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig,
11 },
12 },
13 solana_rpc_client::rpc_client::RpcClient,
14 solana_signature::Signature,
15 solana_transaction::{versioned::VersionedTransaction, Transaction},
16 solana_transaction_error::{TransportError, TransportResult},
17 std::{
18 collections::VecDeque,
19 sync::{Arc, RwLock},
20 },
21};
22#[cfg(feature = "spinner")]
23use {
24 solana_message::Message, solana_signer::signers::Signers,
25 solana_transaction_error::TransactionError, tokio::time::Duration,
26};
27
28pub const DEFAULT_TPU_ENABLE_UDP: bool = false;
29pub const DEFAULT_TPU_USE_QUIC: bool = true;
30pub const DEFAULT_VOTE_USE_QUIC: bool = false;
31
32pub const DEFAULT_TPU_CONNECTION_POOL_SIZE: usize = 1;
36
37pub type Result<T> = std::result::Result<T, TpuSenderError>;
38
39#[cfg(feature = "spinner")]
41pub(crate) const SEND_TRANSACTION_INTERVAL: Duration = Duration::from_millis(10);
42#[cfg(feature = "spinner")]
44pub(crate) const TRANSACTION_RESEND_INTERVAL: Duration = Duration::from_secs(4);
45
46pub const DEFAULT_FANOUT_SLOTS: u64 = 12;
48
49pub const MAX_FANOUT_SLOTS: u64 = 100;
51
52#[derive(Clone, Debug)]
54pub struct TpuClientConfig {
55 pub fanout_slots: u64,
58}
59
60impl Default for TpuClientConfig {
61 fn default() -> Self {
62 Self {
63 fanout_slots: DEFAULT_FANOUT_SLOTS,
64 }
65 }
66}
67
68pub struct TpuClient<
71 P, M, C, > {
75 rpc_client: Arc<RpcClient>,
76 tpu_client: Arc<NonblockingTpuClient<P, M, C>>,
77}
78
79impl<P, M, C> TpuClient<P, M, C>
80where
81 P: ConnectionPool<NewConnectionConfig = C>,
82 M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
83 C: NewConnectionConfig,
84{
85 pub fn send_transaction(&self, transaction: &Transaction) -> bool {
88 self.invoke(self.tpu_client.send_transaction(transaction))
89 }
90
91 pub fn send_wire_transaction(&self, wire_transaction: Vec<u8>) -> bool {
93 self.invoke(self.tpu_client.send_wire_transaction(wire_transaction))
94 }
95
96 pub fn try_send_transaction(&self, transaction: &Transaction) -> TransportResult<()> {
100 self.invoke(self.tpu_client.try_send_transaction(transaction))
101 }
102
103 pub fn send_transaction_to_upcoming_leaders(
109 &self,
110 transaction: &Transaction,
111 ) -> TransportResult<()> {
112 let wire_transaction =
113 Arc::new(bincode::serialize(&transaction).expect("should serialize transaction"));
114
115 let leaders = self
116 .tpu_client
117 .get_leader_tpu_service()
118 .unique_leader_tpu_sockets(self.tpu_client.get_fanout_slots());
119
120 let mut last_error: Option<TransportError> = None;
121 let mut some_success = false;
122 for tpu_address in &leaders {
123 let cache = self.tpu_client.get_connection_cache();
124 let conn = cache.get_connection(tpu_address);
125 if let Err(err) = conn.send_data_async(wire_transaction.clone()) {
126 last_error = Some(err);
127 } else {
128 some_success = true;
129 }
130 }
131
132 if let Some(err) = last_error {
133 Err(err)
134 } else if !some_success {
135 Err(std::io::Error::other("No sends attempted").into())
136 } else {
137 Ok(())
138 }
139 }
140
141 pub fn try_send_transaction_batch(&self, transactions: &[Transaction]) -> TransportResult<()> {
145 let wire_transactions = transactions
146 .into_par_iter()
147 .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
148 .collect::<Vec<_>>();
149 self.invoke(
150 self.tpu_client
151 .try_send_wire_transaction_batch(wire_transactions),
152 )
153 }
154
155 pub fn try_send_wire_transaction(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
158 self.invoke(self.tpu_client.try_send_wire_transaction(wire_transaction))
159 }
160
161 pub fn try_send_wire_transaction_batch(
162 &self,
163 wire_transactions: Vec<Vec<u8>>,
164 ) -> TransportResult<()> {
165 self.invoke(
166 self.tpu_client
167 .try_send_wire_transaction_batch(wire_transactions),
168 )
169 }
170
171 pub fn new(
173 name: &'static str,
174 rpc_client: Arc<RpcClient>,
175 websocket_url: &str,
176 config: TpuClientConfig,
177 connection_manager: M,
178 ) -> Result<Self> {
179 let create_tpu_client = NonblockingTpuClient::new(
180 name,
181 rpc_client.get_inner_client().clone(),
182 websocket_url,
183 config,
184 connection_manager,
185 );
186 let tpu_client =
187 tokio::task::block_in_place(|| rpc_client.runtime().block_on(create_tpu_client))?;
188
189 Ok(Self {
190 rpc_client,
191 tpu_client: Arc::new(tpu_client),
192 })
193 }
194
195 pub fn new_with_connection_cache(
197 rpc_client: Arc<RpcClient>,
198 websocket_url: &str,
199 config: TpuClientConfig,
200 connection_cache: Arc<ConnectionCache<P, M, C>>,
201 ) -> Result<Self> {
202 let create_tpu_client = NonblockingTpuClient::new_with_connection_cache(
203 rpc_client.get_inner_client().clone(),
204 websocket_url,
205 config,
206 connection_cache,
207 );
208 let tpu_client =
209 tokio::task::block_in_place(|| rpc_client.runtime().block_on(create_tpu_client))?;
210
211 Ok(Self {
212 rpc_client,
213 tpu_client: Arc::new(tpu_client),
214 })
215 }
216
217 #[cfg(feature = "spinner")]
218 pub fn send_and_confirm_messages_with_spinner<T: Signers + ?Sized>(
219 &self,
220 messages: &[Message],
221 signers: &T,
222 ) -> Result<Vec<Option<TransactionError>>> {
223 self.invoke(
224 self.tpu_client
225 .send_and_confirm_messages_with_spinner(messages, signers),
226 )
227 }
228
229 pub fn rpc_client(&self) -> &RpcClient {
230 &self.rpc_client
231 }
232
233 fn invoke<T, F: std::future::Future<Output = T>>(&self, f: F) -> T {
234 tokio::task::block_in_place(move || self.rpc_client.runtime().block_on(f))
238 }
239}
240
241impl<P, M, C> AsyncClient for TpuClient<P, M, C>
244where
245 P: ConnectionPool<NewConnectionConfig = C>,
246 M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
247 C: NewConnectionConfig,
248{
249 fn async_send_versioned_transaction(
250 &self,
251 transaction: VersionedTransaction,
252 ) -> TransportResult<Signature> {
253 let wire_transaction =
254 bincode::serialize(&transaction).expect("serialize Transaction in send_batch");
255 self.send_wire_transaction(wire_transaction);
256 Ok(transaction.signatures[0])
257 }
258
259 fn async_send_versioned_transaction_batch(
260 &self,
261 batch: Vec<VersionedTransaction>,
262 ) -> TransportResult<()> {
263 let buffers = batch
264 .into_par_iter()
265 .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
266 .collect::<Vec<_>>();
267 self.try_send_wire_transaction_batch(buffers)?;
268 Ok(())
269 }
270}
271
272const MAX_SLOT_SKIP_DISTANCE: u64 = 48;
274
275#[derive(Clone, Debug)]
276pub(crate) struct RecentLeaderSlots(Arc<RwLock<VecDeque<Slot>>>);
277impl RecentLeaderSlots {
278 pub(crate) fn new(current_slot: Slot) -> Self {
279 let mut recent_slots = VecDeque::new();
280 recent_slots.push_back(current_slot);
281 Self(Arc::new(RwLock::new(recent_slots)))
282 }
283
284 pub(crate) fn record_slot(&self, current_slot: Slot) {
285 let mut recent_slots = self.0.write().unwrap();
286 recent_slots.push_back(current_slot);
287 while recent_slots.len() > 12 {
290 recent_slots.pop_front();
291 }
292 }
293
294 pub(crate) fn estimated_current_slot(&self) -> Slot {
296 let mut recent_slots: Vec<Slot> = self.0.read().unwrap().iter().cloned().collect();
297 assert!(!recent_slots.is_empty());
298 recent_slots.sort_unstable();
299
300 let max_index = recent_slots.len() - 1;
303 let median_index = max_index / 2;
304 let median_recent_slot = recent_slots[median_index];
305 let expected_current_slot = median_recent_slot + (max_index - median_index) as u64;
306 let max_reasonable_current_slot = expected_current_slot + MAX_SLOT_SKIP_DISTANCE;
307
308 recent_slots
311 .into_iter()
312 .rev()
313 .find(|slot| *slot <= max_reasonable_current_slot)
314 .unwrap()
315 }
316}
317
318#[cfg(test)]
319impl From<Vec<Slot>> for RecentLeaderSlots {
320 fn from(recent_slots: Vec<Slot>) -> Self {
321 assert!(!recent_slots.is_empty());
322 Self(Arc::new(RwLock::new(recent_slots.into_iter().collect())))
323 }
324}
325
326#[cfg(test)]
327mod tests {
328 use super::*;
329
330 fn assert_slot(recent_slots: RecentLeaderSlots, expected_slot: Slot) {
331 assert_eq!(recent_slots.estimated_current_slot(), expected_slot);
332 }
333
334 #[test]
335 fn test_recent_leader_slots() {
336 assert_slot(RecentLeaderSlots::new(0), 0);
337
338 let mut recent_slots: Vec<Slot> = (1..=12).collect();
339 assert_slot(RecentLeaderSlots::from(recent_slots.clone()), 12);
340
341 recent_slots.reverse();
342 assert_slot(RecentLeaderSlots::from(recent_slots), 12);
343
344 assert_slot(
345 RecentLeaderSlots::from(vec![0, 1 + MAX_SLOT_SKIP_DISTANCE]),
346 1 + MAX_SLOT_SKIP_DISTANCE,
347 );
348 assert_slot(
349 RecentLeaderSlots::from(vec![0, 2 + MAX_SLOT_SKIP_DISTANCE]),
350 0,
351 );
352
353 assert_slot(RecentLeaderSlots::from(vec![1]), 1);
354 assert_slot(RecentLeaderSlots::from(vec![1, 100]), 1);
355 assert_slot(RecentLeaderSlots::from(vec![1, 2, 100]), 2);
356 assert_slot(RecentLeaderSlots::from(vec![1, 2, 3, 100]), 3);
357 assert_slot(RecentLeaderSlots::from(vec![1, 2, 3, 99, 100]), 3);
358 }
359}