solana_tpu_client/
tpu_client.rs

1pub use crate::nonblocking::tpu_client::TpuSenderError;
2use {
3    crate::nonblocking::tpu_client::TpuClient as NonblockingTpuClient,
4    rayon::iter::{IntoParallelIterator, ParallelIterator},
5    solana_client_traits::AsyncClient,
6    solana_clock::Slot,
7    solana_connection_cache::{
8        client_connection::ClientConnection,
9        connection_cache::{
10            ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig,
11        },
12    },
13    solana_rpc_client::rpc_client::RpcClient,
14    solana_signature::Signature,
15    solana_transaction::{versioned::VersionedTransaction, Transaction},
16    solana_transaction_error::{TransportError, TransportResult},
17    std::{
18        collections::VecDeque,
19        sync::{Arc, RwLock},
20    },
21};
22#[cfg(feature = "spinner")]
23use {
24    solana_message::Message, solana_signer::signers::Signers,
25    solana_transaction_error::TransactionError, tokio::time::Duration,
26};
27
28pub const DEFAULT_TPU_ENABLE_UDP: bool = false;
29pub const DEFAULT_TPU_USE_QUIC: bool = true;
30pub const DEFAULT_VOTE_USE_QUIC: bool = false;
31
32/// The default connection count is set to 1 -- it should
33/// be sufficient for most use cases. Validators can use
34/// --tpu-connection-pool-size to override this default value.
35pub const DEFAULT_TPU_CONNECTION_POOL_SIZE: usize = 1;
36
37pub type Result<T> = std::result::Result<T, TpuSenderError>;
38
39/// Send at ~100 TPS
40#[cfg(feature = "spinner")]
41pub(crate) const SEND_TRANSACTION_INTERVAL: Duration = Duration::from_millis(10);
42/// Retry batch send after 4 seconds
43#[cfg(feature = "spinner")]
44pub(crate) const TRANSACTION_RESEND_INTERVAL: Duration = Duration::from_secs(4);
45
46/// Default number of slots used to build TPU socket fanout set
47pub const DEFAULT_FANOUT_SLOTS: u64 = 12;
48
49/// Maximum number of slots used to build TPU socket fanout set
50pub const MAX_FANOUT_SLOTS: u64 = 100;
51
52/// Config params for `TpuClient`
53#[derive(Clone, Debug)]
54pub struct TpuClientConfig {
55    /// The range of upcoming slots to include when determining which
56    /// leaders to send transactions to (min: 1, max: `MAX_FANOUT_SLOTS`)
57    pub fanout_slots: u64,
58}
59
60impl Default for TpuClientConfig {
61    fn default() -> Self {
62        Self {
63            fanout_slots: DEFAULT_FANOUT_SLOTS,
64        }
65    }
66}
67
68/// Client which sends transactions directly to the current leader's TPU port over UDP.
69/// The client uses RPC to determine the current leader and fetch node contact info
70pub struct TpuClient<
71    P, // ConnectionPool
72    M, // ConnectionManager
73    C, // NewConnectionConfig
74> {
75    rpc_client: Arc<RpcClient>,
76    tpu_client: Arc<NonblockingTpuClient<P, M, C>>,
77}
78
79impl<P, M, C> TpuClient<P, M, C>
80where
81    P: ConnectionPool<NewConnectionConfig = C>,
82    M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
83    C: NewConnectionConfig,
84{
85    /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
86    /// size
87    pub fn send_transaction(&self, transaction: &Transaction) -> bool {
88        self.invoke(self.tpu_client.send_transaction(transaction))
89    }
90
91    /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
92    pub fn send_wire_transaction(&self, wire_transaction: Vec<u8>) -> bool {
93        self.invoke(self.tpu_client.send_wire_transaction(wire_transaction))
94    }
95
96    /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
97    /// size
98    /// Returns the last error if all sends fail
99    pub fn try_send_transaction(&self, transaction: &Transaction) -> TransportResult<()> {
100        self.invoke(self.tpu_client.try_send_transaction(transaction))
101    }
102
103    /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout.
104    ///
105    /// Returns an error if:
106    /// 1. there are no known tpu sockets to send to
107    /// 2. any of the sends fail, even if other sends succeeded.
108    pub fn send_transaction_to_upcoming_leaders(
109        &self,
110        transaction: &Transaction,
111    ) -> TransportResult<()> {
112        let wire_transaction =
113            Arc::new(bincode::serialize(&transaction).expect("should serialize transaction"));
114
115        let leaders = self
116            .tpu_client
117            .get_leader_tpu_service()
118            .unique_leader_tpu_sockets(self.tpu_client.get_fanout_slots());
119
120        let mut last_error: Option<TransportError> = None;
121        let mut some_success = false;
122        for tpu_address in &leaders {
123            let cache = self.tpu_client.get_connection_cache();
124            let conn = cache.get_connection(tpu_address);
125            if let Err(err) = conn.send_data_async(wire_transaction.clone()) {
126                last_error = Some(err);
127            } else {
128                some_success = true;
129            }
130        }
131
132        if let Some(err) = last_error {
133            Err(err)
134        } else if !some_success {
135            Err(std::io::Error::other("No sends attempted").into())
136        } else {
137            Ok(())
138        }
139    }
140
141    /// Serialize and send a batch of transactions to the current and upcoming leader TPUs according
142    /// to fanout size
143    /// Returns the last error if all sends fail
144    pub fn try_send_transaction_batch(&self, transactions: &[Transaction]) -> TransportResult<()> {
145        let wire_transactions = transactions
146            .into_par_iter()
147            .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
148            .collect::<Vec<_>>();
149        self.invoke(
150            self.tpu_client
151                .try_send_wire_transaction_batch(wire_transactions),
152        )
153    }
154
155    /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
156    /// Returns the last error if all sends fail
157    pub fn try_send_wire_transaction(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
158        self.invoke(self.tpu_client.try_send_wire_transaction(wire_transaction))
159    }
160
161    pub fn try_send_wire_transaction_batch(
162        &self,
163        wire_transactions: Vec<Vec<u8>>,
164    ) -> TransportResult<()> {
165        self.invoke(
166            self.tpu_client
167                .try_send_wire_transaction_batch(wire_transactions),
168        )
169    }
170
171    /// Create a new client that disconnects when dropped
172    pub fn new(
173        name: &'static str,
174        rpc_client: Arc<RpcClient>,
175        websocket_url: &str,
176        config: TpuClientConfig,
177        connection_manager: M,
178    ) -> Result<Self> {
179        let create_tpu_client = NonblockingTpuClient::new(
180            name,
181            rpc_client.get_inner_client().clone(),
182            websocket_url,
183            config,
184            connection_manager,
185        );
186        let tpu_client =
187            tokio::task::block_in_place(|| rpc_client.runtime().block_on(create_tpu_client))?;
188
189        Ok(Self {
190            rpc_client,
191            tpu_client: Arc::new(tpu_client),
192        })
193    }
194
195    /// Create a new client that disconnects when dropped
196    pub fn new_with_connection_cache(
197        rpc_client: Arc<RpcClient>,
198        websocket_url: &str,
199        config: TpuClientConfig,
200        connection_cache: Arc<ConnectionCache<P, M, C>>,
201    ) -> Result<Self> {
202        let create_tpu_client = NonblockingTpuClient::new_with_connection_cache(
203            rpc_client.get_inner_client().clone(),
204            websocket_url,
205            config,
206            connection_cache,
207        );
208        let tpu_client =
209            tokio::task::block_in_place(|| rpc_client.runtime().block_on(create_tpu_client))?;
210
211        Ok(Self {
212            rpc_client,
213            tpu_client: Arc::new(tpu_client),
214        })
215    }
216
217    #[cfg(feature = "spinner")]
218    pub fn send_and_confirm_messages_with_spinner<T: Signers + ?Sized>(
219        &self,
220        messages: &[Message],
221        signers: &T,
222    ) -> Result<Vec<Option<TransactionError>>> {
223        self.invoke(
224            self.tpu_client
225                .send_and_confirm_messages_with_spinner(messages, signers),
226        )
227    }
228
229    pub fn rpc_client(&self) -> &RpcClient {
230        &self.rpc_client
231    }
232
233    fn invoke<T, F: std::future::Future<Output = T>>(&self, f: F) -> T {
234        // `block_on()` panics if called within an asynchronous execution context. Whereas
235        // `block_in_place()` only panics if called from a current_thread runtime, which is the
236        // lesser evil.
237        tokio::task::block_in_place(move || self.rpc_client.runtime().block_on(f))
238    }
239}
240
241// Methods below are required for calls to client.async_transfer()
242// where client is of type TpuClient<P, M, C>
243impl<P, M, C> AsyncClient for TpuClient<P, M, C>
244where
245    P: ConnectionPool<NewConnectionConfig = C>,
246    M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
247    C: NewConnectionConfig,
248{
249    fn async_send_versioned_transaction(
250        &self,
251        transaction: VersionedTransaction,
252    ) -> TransportResult<Signature> {
253        let wire_transaction =
254            bincode::serialize(&transaction).expect("serialize Transaction in send_batch");
255        self.send_wire_transaction(wire_transaction);
256        Ok(transaction.signatures[0])
257    }
258
259    fn async_send_versioned_transaction_batch(
260        &self,
261        batch: Vec<VersionedTransaction>,
262    ) -> TransportResult<()> {
263        let buffers = batch
264            .into_par_iter()
265            .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
266            .collect::<Vec<_>>();
267        self.try_send_wire_transaction_batch(buffers)?;
268        Ok(())
269    }
270}
271
272// 48 chosen because it's unlikely that 12 leaders in a row will miss their slots
273const MAX_SLOT_SKIP_DISTANCE: u64 = 48;
274
275#[derive(Clone, Debug)]
276pub(crate) struct RecentLeaderSlots(Arc<RwLock<VecDeque<Slot>>>);
277impl RecentLeaderSlots {
278    pub(crate) fn new(current_slot: Slot) -> Self {
279        let mut recent_slots = VecDeque::new();
280        recent_slots.push_back(current_slot);
281        Self(Arc::new(RwLock::new(recent_slots)))
282    }
283
284    pub(crate) fn record_slot(&self, current_slot: Slot) {
285        let mut recent_slots = self.0.write().unwrap();
286        recent_slots.push_back(current_slot);
287        // 12 recent slots should be large enough to avoid a misbehaving
288        // validator from affecting the median recent slot
289        while recent_slots.len() > 12 {
290            recent_slots.pop_front();
291        }
292    }
293
294    // Estimate the current slot from recent slot notifications.
295    pub(crate) fn estimated_current_slot(&self) -> Slot {
296        let mut recent_slots: Vec<Slot> = self.0.read().unwrap().iter().cloned().collect();
297        assert!(!recent_slots.is_empty());
298        recent_slots.sort_unstable();
299
300        // Validators can broadcast invalid blocks that are far in the future
301        // so check if the current slot is in line with the recent progression.
302        let max_index = recent_slots.len() - 1;
303        let median_index = max_index / 2;
304        let median_recent_slot = recent_slots[median_index];
305        let expected_current_slot = median_recent_slot + (max_index - median_index) as u64;
306        let max_reasonable_current_slot = expected_current_slot + MAX_SLOT_SKIP_DISTANCE;
307
308        // Return the highest slot that doesn't exceed what we believe is a
309        // reasonable slot.
310        recent_slots
311            .into_iter()
312            .rev()
313            .find(|slot| *slot <= max_reasonable_current_slot)
314            .unwrap()
315    }
316}
317
318#[cfg(test)]
319impl From<Vec<Slot>> for RecentLeaderSlots {
320    fn from(recent_slots: Vec<Slot>) -> Self {
321        assert!(!recent_slots.is_empty());
322        Self(Arc::new(RwLock::new(recent_slots.into_iter().collect())))
323    }
324}
325
326#[cfg(test)]
327mod tests {
328    use super::*;
329
330    fn assert_slot(recent_slots: RecentLeaderSlots, expected_slot: Slot) {
331        assert_eq!(recent_slots.estimated_current_slot(), expected_slot);
332    }
333
334    #[test]
335    fn test_recent_leader_slots() {
336        assert_slot(RecentLeaderSlots::new(0), 0);
337
338        let mut recent_slots: Vec<Slot> = (1..=12).collect();
339        assert_slot(RecentLeaderSlots::from(recent_slots.clone()), 12);
340
341        recent_slots.reverse();
342        assert_slot(RecentLeaderSlots::from(recent_slots), 12);
343
344        assert_slot(
345            RecentLeaderSlots::from(vec![0, 1 + MAX_SLOT_SKIP_DISTANCE]),
346            1 + MAX_SLOT_SKIP_DISTANCE,
347        );
348        assert_slot(
349            RecentLeaderSlots::from(vec![0, 2 + MAX_SLOT_SKIP_DISTANCE]),
350            0,
351        );
352
353        assert_slot(RecentLeaderSlots::from(vec![1]), 1);
354        assert_slot(RecentLeaderSlots::from(vec![1, 100]), 1);
355        assert_slot(RecentLeaderSlots::from(vec![1, 2, 100]), 2);
356        assert_slot(RecentLeaderSlots::from(vec![1, 2, 3, 100]), 3);
357        assert_slot(RecentLeaderSlots::from(vec![1, 2, 3, 99, 100]), 3);
358    }
359}