solana_tpu_client/tpu_client.rs

pub use crate::nonblocking::tpu_client::TpuSenderError;
use {
    crate::nonblocking::tpu_client::TpuClient as NonblockingTpuClient,
    rayon::iter::{IntoParallelIterator, ParallelIterator},
    solana_client_traits::AsyncClient,
    solana_clock::Slot,
    solana_connection_cache::{
        client_connection::ClientConnection,
        connection_cache::{
            ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig,
        },
    },
    solana_net_utils::bind_to_unspecified,
    solana_rpc_client::rpc_client::RpcClient,
    solana_signature::Signature,
    solana_transaction::{versioned::VersionedTransaction, Transaction},
    solana_transaction_error::{TransportError, TransportResult},
    std::{
        collections::VecDeque,
        net::UdpSocket,
        sync::{Arc, RwLock},
    },
};
#[cfg(feature = "spinner")]
use {
    solana_message::Message, solana_signer::signers::Signers,
    solana_transaction_error::TransactionError, tokio::time::Duration,
};

pub const DEFAULT_TPU_ENABLE_UDP: bool = false;
pub const DEFAULT_TPU_USE_QUIC: bool = true;
pub const DEFAULT_VOTE_USE_QUIC: bool = false;

/// The default connection count is set to 1 -- it should
/// be sufficient for most use cases. Validators can use
/// --tpu-connection-pool-size to override this default value.
pub const DEFAULT_TPU_CONNECTION_POOL_SIZE: usize = 1;

pub type Result<T> = std::result::Result<T, TpuSenderError>;

/// Send at ~100 TPS
#[cfg(feature = "spinner")]
pub(crate) const SEND_TRANSACTION_INTERVAL: Duration = Duration::from_millis(10);
/// Retry batch send after 4 seconds
#[cfg(feature = "spinner")]
pub(crate) const TRANSACTION_RESEND_INTERVAL: Duration = Duration::from_secs(4);

/// Default number of slots used to build TPU socket fanout set
pub const DEFAULT_FANOUT_SLOTS: u64 = 12;

/// Maximum number of slots used to build TPU socket fanout set
pub const MAX_FANOUT_SLOTS: u64 = 100;

/// Config params for `TpuClient`
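///
/// # Example
///
/// A minimal sketch (doctest marked `ignore`); the import path is assumed
/// from this file's location in the `solana_tpu_client` crate:
///
/// ```ignore
/// use solana_tpu_client::tpu_client::{TpuClientConfig, DEFAULT_FANOUT_SLOTS};
///
/// // Use the default fanout of DEFAULT_FANOUT_SLOTS upcoming slots...
/// let config = TpuClientConfig::default();
/// assert_eq!(config.fanout_slots, DEFAULT_FANOUT_SLOTS);
///
/// // ...or fan out further, up to MAX_FANOUT_SLOTS.
/// let custom = TpuClientConfig { fanout_slots: 24 };
/// ```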
#[derive(Clone, Debug)]
pub struct TpuClientConfig {
    /// The range of upcoming slots to include when determining which
    /// leaders to send transactions to (min: 1, max: `MAX_FANOUT_SLOTS`)
    pub fanout_slots: u64,
}

impl Default for TpuClientConfig {
    fn default() -> Self {
        Self {
            fanout_slots: DEFAULT_FANOUT_SLOTS,
        }
    }
}

/// Client which sends transactions directly to the current leader's TPU port; the transport
/// (UDP or QUIC) is determined by the connection cache. The client uses RPC to determine the
/// current leader and fetch node contact info.
pub struct TpuClient<
    P, // ConnectionPool
    M, // ConnectionManager
    C, // NewConnectionConfig
> {
    _deprecated: UdpSocket, // TpuClient now uses the connection_cache to choose a send_socket
    // TODO: get rid of this field
    rpc_client: Arc<RpcClient>,
    tpu_client: Arc<NonblockingTpuClient<P, M, C>>,
}

impl<P, M, C> TpuClient<P, M, C>
where
    P: ConnectionPool<NewConnectionConfig = C>,
    M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
    C: NewConnectionConfig,
{
    /// Serialize and send a transaction to the current and upcoming leader TPUs according to
    /// fanout size.
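    ///
    /// ```ignore
    /// // Sketch only: assumes `tpu_client` was built with `TpuClient::new(..)`
    /// // and `transaction` is fully signed; `true` means the send succeeded.
    /// if tpu_client.send_transaction(&transaction) {
    ///     println!("transaction sent");
    /// }
    /// ```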
    pub fn send_transaction(&self, transaction: &Transaction) -> bool {
        self.invoke(self.tpu_client.send_transaction(transaction))
    }

    /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
    pub fn send_wire_transaction(&self, wire_transaction: Vec<u8>) -> bool {
        self.invoke(self.tpu_client.send_wire_transaction(wire_transaction))
    }

    /// Serialize and send a transaction to the current and upcoming leader TPUs according to
    /// fanout size.
    /// Returns the last error if all sends fail.
    pub fn try_send_transaction(&self, transaction: &Transaction) -> TransportResult<()> {
        self.invoke(self.tpu_client.try_send_transaction(transaction))
    }

    /// Serialize and send a transaction to the current and upcoming leader TPUs according to
    /// fanout.
    ///
    /// Returns an error if:
    /// 1. there are no known TPU sockets to send to
    /// 2. any of the sends fail, even if other sends succeed
    pub fn send_transaction_to_upcoming_leaders(
        &self,
        transaction: &Transaction,
    ) -> TransportResult<()> {
        let wire_transaction =
            bincode::serialize(&transaction).expect("should serialize transaction");

        let leaders = self
            .tpu_client
            .get_leader_tpu_service()
            .unique_leader_tpu_sockets(self.tpu_client.get_fanout_slots());

        let mut last_error: Option<TransportError> = None;
        let mut some_success = false;
        for tpu_address in &leaders {
            let cache = self.tpu_client.get_connection_cache();
            let conn = cache.get_connection(tpu_address);
            if let Err(err) = conn.send_data_async(wire_transaction.clone()) {
                last_error = Some(err);
            } else {
                some_success = true;
            }
        }

        if let Some(err) = last_error {
            Err(err)
        } else if !some_success {
            Err(std::io::Error::other("No sends attempted").into())
        } else {
            Ok(())
        }
    }

    /// Serialize and send a batch of transactions to the current and upcoming leader TPUs
    /// according to fanout size.
    /// Returns the last error if all sends fail.
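    ///
    /// ```ignore
    /// // Sketch only: `tpu_client` built elsewhere; both transactions fully signed.
    /// let transactions = vec![transfer_tx, memo_tx];
    /// tpu_client.try_send_transaction_batch(&transactions)?;
    /// ```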
    pub fn try_send_transaction_batch(&self, transactions: &[Transaction]) -> TransportResult<()> {
        let wire_transactions = transactions
            .into_par_iter()
            .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
            .collect::<Vec<_>>();
        self.invoke(
            self.tpu_client
                .try_send_wire_transaction_batch(wire_transactions),
        )
    }

    /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size.
    /// Returns the last error if all sends fail.
    pub fn try_send_wire_transaction(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
        self.invoke(self.tpu_client.try_send_wire_transaction(wire_transaction))
    }

    pub fn try_send_wire_transaction_batch(
        &self,
        wire_transactions: Vec<Vec<u8>>,
    ) -> TransportResult<()> {
        self.invoke(
            self.tpu_client
                .try_send_wire_transaction_batch(wire_transactions),
        )
    }

    /// Create a new client that disconnects when dropped
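    ///
    /// # Example
    ///
    /// A sketch, assuming the QUIC connection manager from the `solana-quic-client`
    /// crate; the constructor name is an assumption, not verified against that crate:
    ///
    /// ```ignore
    /// let rpc_client = Arc::new(RpcClient::new("http://127.0.0.1:8899".to_string()));
    /// // Hypothetical: however the QUIC connection manager is built in your setup.
    /// let connection_manager = QuicConnectionManager::new_with_connection_config(quic_config);
    /// let tpu_client = TpuClient::new(
    ///     "my-client",
    ///     rpc_client,
    ///     "ws://127.0.0.1:8900",
    ///     TpuClientConfig::default(),
    ///     connection_manager,
    /// )?;
    /// ```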
    pub fn new(
        name: &'static str,
        rpc_client: Arc<RpcClient>,
        websocket_url: &str,
        config: TpuClientConfig,
        connection_manager: M,
    ) -> Result<Self> {
        let create_tpu_client = NonblockingTpuClient::new(
            name,
            rpc_client.get_inner_client().clone(),
            websocket_url,
            config,
            connection_manager,
        );
        let tpu_client =
            tokio::task::block_in_place(|| rpc_client.runtime().block_on(create_tpu_client))?;

        Ok(Self {
            _deprecated: bind_to_unspecified().unwrap(),
            rpc_client,
            tpu_client: Arc::new(tpu_client),
        })
    }

    /// Create a new client that disconnects when dropped
    pub fn new_with_connection_cache(
        rpc_client: Arc<RpcClient>,
        websocket_url: &str,
        config: TpuClientConfig,
        connection_cache: Arc<ConnectionCache<P, M, C>>,
    ) -> Result<Self> {
        let create_tpu_client = NonblockingTpuClient::new_with_connection_cache(
            rpc_client.get_inner_client().clone(),
            websocket_url,
            config,
            connection_cache,
        );
        let tpu_client =
            tokio::task::block_in_place(|| rpc_client.runtime().block_on(create_tpu_client))?;

        Ok(Self {
            _deprecated: bind_to_unspecified().unwrap(),
            rpc_client,
            tpu_client: Arc::new(tpu_client),
        })
    }

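    /// Send `messages` (signed by `signers`) to upcoming leader TPUs, waiting for
    /// confirmation and showing spinner-style progress. Returns one entry per message:
    /// `None` if the transaction was confirmed, `Some(error)` otherwise.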
    #[cfg(feature = "spinner")]
    pub fn send_and_confirm_messages_with_spinner<T: Signers + ?Sized>(
        &self,
        messages: &[Message],
        signers: &T,
    ) -> Result<Vec<Option<TransactionError>>> {
        self.invoke(
            self.tpu_client
                .send_and_confirm_messages_with_spinner(messages, signers),
        )
    }

    pub fn rpc_client(&self) -> &RpcClient {
        &self.rpc_client
    }

    fn invoke<T, F: std::future::Future<Output = T>>(&self, f: F) -> T {
        // `block_on()` panics if called within an asynchronous execution context, whereas
        // `block_in_place()` only panics if called from a current_thread runtime, which is
        // the lesser evil.
        tokio::task::block_in_place(move || self.rpc_client.runtime().block_on(f))
    }
}

// Methods below are required for calls to client.async_transfer()
// where client is of type TpuClient<P, M, C>
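//
// Sketch (hypothetical values; the exact `async_transfer` parameter order should be
// double-checked against the `AsyncClient` trait in `solana_client_traits`):
//
//     let signature = client.async_transfer(lamports, &from_keypair, recent_blockhash, &to_pubkey)?;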
impl<P, M, C> AsyncClient for TpuClient<P, M, C>
where
    P: ConnectionPool<NewConnectionConfig = C>,
    M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
    C: NewConnectionConfig,
{
    fn async_send_versioned_transaction(
        &self,
        transaction: VersionedTransaction,
    ) -> TransportResult<Signature> {
        let wire_transaction =
            bincode::serialize(&transaction).expect("serialize Transaction in send_batch");
        self.send_wire_transaction(wire_transaction);
        Ok(transaction.signatures[0])
    }

    fn async_send_versioned_transaction_batch(
        &self,
        batch: Vec<VersionedTransaction>,
    ) -> TransportResult<()> {
        let buffers = batch
            .into_par_iter()
            .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
            .collect::<Vec<_>>();
        self.try_send_wire_transaction_batch(buffers)?;
        Ok(())
    }
}

// 48 chosen because it's unlikely that 12 leaders in a row will miss their slots
// (each leader is scheduled for 4 consecutive slots, so 12 leaders span 48 slots)
const MAX_SLOT_SKIP_DISTANCE: u64 = 48;

#[derive(Clone, Debug)]
pub(crate) struct RecentLeaderSlots(Arc<RwLock<VecDeque<Slot>>>);
impl RecentLeaderSlots {
    pub(crate) fn new(current_slot: Slot) -> Self {
        let mut recent_slots = VecDeque::new();
        recent_slots.push_back(current_slot);
        Self(Arc::new(RwLock::new(recent_slots)))
    }

    pub(crate) fn record_slot(&self, current_slot: Slot) {
        let mut recent_slots = self.0.write().unwrap();
        recent_slots.push_back(current_slot);
        // 12 recent slots should be large enough to keep a misbehaving
        // validator from affecting the median recent slot
        while recent_slots.len() > 12 {
            recent_slots.pop_front();
        }
    }

    // Estimate the current slot from recent slot notifications.
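    //
    // Worked example (mirrors the unit test below): given recent slots
    // [1, 2, 3, 99, 100], the median is 3 at index 2 of indices 0..=4, so the
    // expected current slot is 3 + (4 - 2) = 5 and the max reasonable slot is
    // 5 + MAX_SLOT_SKIP_DISTANCE = 53. The highest recent slot not exceeding
    // 53 is 3, so the outliers 99 and 100 are ignored.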
    pub(crate) fn estimated_current_slot(&self) -> Slot {
        let mut recent_slots: Vec<Slot> = self.0.read().unwrap().iter().cloned().collect();
        assert!(!recent_slots.is_empty());
        recent_slots.sort_unstable();

        // Validators can broadcast invalid blocks that are far in the future,
        // so check if the current slot is in line with the recent progression.
        let max_index = recent_slots.len() - 1;
        let median_index = max_index / 2;
        let median_recent_slot = recent_slots[median_index];
        let expected_current_slot = median_recent_slot + (max_index - median_index) as u64;
        let max_reasonable_current_slot = expected_current_slot + MAX_SLOT_SKIP_DISTANCE;

        // Return the highest slot that doesn't exceed what we believe is a
        // reasonable slot.
        recent_slots
            .into_iter()
            .rev()
            .find(|slot| *slot <= max_reasonable_current_slot)
            .unwrap()
    }
}

#[cfg(test)]
impl From<Vec<Slot>> for RecentLeaderSlots {
    fn from(recent_slots: Vec<Slot>) -> Self {
        assert!(!recent_slots.is_empty());
        Self(Arc::new(RwLock::new(recent_slots.into_iter().collect())))
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn assert_slot(recent_slots: RecentLeaderSlots, expected_slot: Slot) {
        assert_eq!(recent_slots.estimated_current_slot(), expected_slot);
    }

    #[test]
    fn test_recent_leader_slots() {
        assert_slot(RecentLeaderSlots::new(0), 0);

        let mut recent_slots: Vec<Slot> = (1..=12).collect();
        assert_slot(RecentLeaderSlots::from(recent_slots.clone()), 12);

        recent_slots.reverse();
        assert_slot(RecentLeaderSlots::from(recent_slots), 12);

        assert_slot(
            RecentLeaderSlots::from(vec![0, 1 + MAX_SLOT_SKIP_DISTANCE]),
            1 + MAX_SLOT_SKIP_DISTANCE,
        );
        assert_slot(
            RecentLeaderSlots::from(vec![0, 2 + MAX_SLOT_SKIP_DISTANCE]),
            0,
        );

        assert_slot(RecentLeaderSlots::from(vec![1]), 1);
        assert_slot(RecentLeaderSlots::from(vec![1, 100]), 1);
        assert_slot(RecentLeaderSlots::from(vec![1, 2, 100]), 2);
        assert_slot(RecentLeaderSlots::from(vec![1, 2, 3, 100]), 3);
        assert_slot(RecentLeaderSlots::from(vec![1, 2, 3, 99, 100]), 3);
    }
}