gemachain_client/tpu_client.rs

use crate::{
    pubsub_client::{PubsubClient, PubsubClientError, PubsubClientSubscription},
    rpc_client::RpcClient,
    rpc_response::SlotUpdate,
};
use bincode::serialize;
use log::*;
use gemachain_sdk::{
    clock::Slot, commitment_config::CommitmentConfig, pubkey::Pubkey, transaction::Transaction,
};
use std::{
    collections::{HashMap, HashSet, VecDeque},
    net::{SocketAddr, UdpSocket},
    str::FromStr,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc, RwLock,
    },
    thread::JoinHandle,
    time::{Duration, Instant},
};
use thiserror::Error;

#[derive(Error, Debug)]
pub enum TpuSenderError {
    #[error("Pubsub error: {0:?}")]
    PubsubError(#[from] PubsubClientError),
    #[error("RPC error: {0:?}")]
    RpcError(#[from] crate::client_error::ClientError),
    #[error("IO error: {0:?}")]
    IoError(#[from] std::io::Error),
}

type Result<T> = std::result::Result<T, TpuSenderError>;

/// Default number of slots used to build TPU socket fanout set
pub const DEFAULT_FANOUT_SLOTS: u64 = 12;

/// Maximum number of slots used to build TPU socket fanout set
pub const MAX_FANOUT_SLOTS: u64 = 100;

/// Config params for `TpuClient`
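///
/// # Example
///
/// An illustrative sketch; the `gemachain_client::tpu_client` import path is
/// assumed from this crate's layout.
///
/// ```
/// use gemachain_client::tpu_client::TpuClientConfig;
///
/// // Fan out to the leaders of the next 24 slots instead of the default 12
/// let config = TpuClientConfig { fanout_slots: 24 };
/// assert_eq!(config.fanout_slots, 24);
/// ```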
#[derive(Clone, Debug)]
pub struct TpuClientConfig {
    /// The range of upcoming slots to include when determining which
    /// leaders to send transactions to (min: 1, max: `MAX_FANOUT_SLOTS`)
    pub fanout_slots: u64,
}

impl Default for TpuClientConfig {
    fn default() -> Self {
        Self {
            fanout_slots: DEFAULT_FANOUT_SLOTS,
        }
    }
}

/// Client which sends transactions directly to the current leader's TPU port over UDP.
/// The client uses RPC to determine the current leader and to fetch node contact info.
pub struct TpuClient {
    send_socket: UdpSocket,
    fanout_slots: u64,
    leader_tpu_service: LeaderTpuService,
    exit: Arc<AtomicBool>,
}

impl TpuClient {
    /// Serialize and send a transaction to the current and upcoming leader TPUs according to the
    /// fanout size. Returns `true` if the transaction was sent to at least one TPU address.
    pub fn send_transaction(&self, transaction: &Transaction) -> bool {
        let wire_transaction = serialize(transaction).expect("serialization should succeed");
        self.send_wire_transaction(&wire_transaction)
    }

    /// Send a wire transaction to the current and upcoming leader TPUs according to the fanout
    /// size. Returns `true` if the transaction was sent to at least one TPU address.
    pub fn send_wire_transaction(&self, wire_transaction: &[u8]) -> bool {
        let mut sent = false;
        for tpu_address in self
            .leader_tpu_service
            .leader_tpu_sockets(self.fanout_slots)
        {
            if self
                .send_socket
                .send_to(wire_transaction, tpu_address)
                .is_ok()
            {
                sent = true;
            }
        }
        sent
    }

    /// Create a new client that disconnects when dropped
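    ///
    /// # Example
    ///
    /// An illustrative sketch; the import paths and URLs below are placeholders
    /// chosen for this example, not endpoints defined by the crate.
    ///
    /// ```no_run
    /// use std::sync::Arc;
    /// use gemachain_client::{
    ///     rpc_client::RpcClient,
    ///     tpu_client::{TpuClient, TpuClientConfig},
    /// };
    ///
    /// let rpc_client = Arc::new(RpcClient::new("http://127.0.0.1:8899".to_string()));
    /// let tpu_client = TpuClient::new(
    ///     rpc_client,
    ///     "ws://127.0.0.1:8900",
    ///     TpuClientConfig::default(),
    /// )
    /// .unwrap();
    /// // The client can now forward signed transactions straight to the
    /// // upcoming leaders with `tpu_client.send_transaction(&transaction)`.
    /// ```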
    pub fn new(
        rpc_client: Arc<RpcClient>,
        websocket_url: &str,
        config: TpuClientConfig,
    ) -> Result<Self> {
        let exit = Arc::new(AtomicBool::new(false));
        let leader_tpu_service = LeaderTpuService::new(rpc_client, websocket_url, exit.clone())?;

        Ok(Self {
            send_socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
            fanout_slots: config.fanout_slots.min(MAX_FANOUT_SLOTS).max(1),
            leader_tpu_service,
            exit,
        })
    }
}

impl Drop for TpuClient {
    fn drop(&mut self) {
        self.exit.store(true, Ordering::Relaxed);
        self.leader_tpu_service.join();
    }
}

struct LeaderTpuCache {
    first_slot: Slot,
    leaders: Vec<Pubkey>,
    leader_tpu_map: HashMap<Pubkey, SocketAddr>,
    slots_in_epoch: Slot,
    last_epoch_info_slot: Slot,
}

impl LeaderTpuCache {
    fn new(rpc_client: &RpcClient, first_slot: Slot) -> Result<Self> {
        let slots_in_epoch = rpc_client.get_epoch_info()?.slots_in_epoch;
        let leaders = Self::fetch_slot_leaders(rpc_client, first_slot, slots_in_epoch)?;
        let leader_tpu_map = Self::fetch_cluster_tpu_sockets(rpc_client)?;
        Ok(Self {
            first_slot,
            leaders,
            leader_tpu_map,
            slots_in_epoch,
            last_epoch_info_slot: first_slot,
        })
    }

    // Last slot that has a cached leader pubkey
    fn last_slot(&self) -> Slot {
        self.first_slot + self.leaders.len().saturating_sub(1) as u64
    }

    // Get the TPU sockets for the current leader and upcoming leaders according to fanout size
    fn get_leader_sockets(&self, current_slot: Slot, fanout_slots: u64) -> Vec<SocketAddr> {
        let mut leader_set = HashSet::new();
        let mut leader_sockets = Vec::new();
        for leader_slot in current_slot..current_slot + fanout_slots {
            if let Some(leader) = self.get_slot_leader(leader_slot) {
                if let Some(tpu_socket) = self.leader_tpu_map.get(leader) {
                    if leader_set.insert(*leader) {
                        leader_sockets.push(*tpu_socket);
                    }
                } else {
                    // The leader is probably delinquent
                    trace!("TPU not available for leader {}", leader);
                }
            } else {
                // Overran the local leader schedule cache
                warn!(
                    "Leader not known for slot {}; cache holds slots [{},{}]",
                    leader_slot,
                    self.first_slot,
                    self.last_slot()
                );
            }
        }
        leader_sockets
    }

    fn get_slot_leader(&self, slot: Slot) -> Option<&Pubkey> {
        if slot >= self.first_slot {
            let index = slot - self.first_slot;
            self.leaders.get(index as usize)
        } else {
            None
        }
    }

    fn fetch_cluster_tpu_sockets(rpc_client: &RpcClient) -> Result<HashMap<Pubkey, SocketAddr>> {
        let cluster_contact_info = rpc_client.get_cluster_nodes()?;
        Ok(cluster_contact_info
            .into_iter()
            .filter_map(|contact_info| {
                Some((
                    Pubkey::from_str(&contact_info.pubkey).ok()?,
                    contact_info.tpu?,
                ))
            })
            .collect())
    }

    fn fetch_slot_leaders(
        rpc_client: &RpcClient,
        start_slot: Slot,
        slots_in_epoch: Slot,
    ) -> Result<Vec<Pubkey>> {
        // Fetch up to twice the maximum fanout worth of upcoming leaders, capped at the epoch length
        let fanout = (2 * MAX_FANOUT_SLOTS).min(slots_in_epoch);
        Ok(rpc_client.get_slot_leaders(start_slot, fanout)?)
    }
}

// 48 chosen because it's unlikely that 12 leaders in a row (4 consecutive slots each) will miss
// their slots
const MAX_SLOT_SKIP_DISTANCE: u64 = 48;

#[derive(Clone, Debug)]
struct RecentLeaderSlots(Arc<RwLock<VecDeque<Slot>>>);
impl RecentLeaderSlots {
    fn new(current_slot: Slot) -> Self {
        let mut recent_slots = VecDeque::new();
        recent_slots.push_back(current_slot);
        Self(Arc::new(RwLock::new(recent_slots)))
    }

    fn record_slot(&self, current_slot: Slot) {
        let mut recent_slots = self.0.write().unwrap();
        recent_slots.push_back(current_slot);
        // 12 recent slots should be large enough to keep a misbehaving
        // validator from affecting the median recent slot
        while recent_slots.len() > 12 {
            recent_slots.pop_front();
        }
    }

    // Estimate the current slot from recent slot notifications.
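    //
    // Illustrative example (mirrors the test at the bottom of this file): with
    // recent slots [1, 2, 3, 99, 100], the median is 3 at index 2 of 0..=4, so
    // the expected current slot is 3 + (4 - 2) = 5; slots above
    // 5 + MAX_SLOT_SKIP_DISTANCE = 53 are treated as unreasonable, and the
    // highest remaining slot, 3, becomes the estimate.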
    fn estimated_current_slot(&self) -> Slot {
        let mut recent_slots: Vec<Slot> = self.0.read().unwrap().iter().cloned().collect();
        assert!(!recent_slots.is_empty());
        recent_slots.sort_unstable();

        // Validators can broadcast invalid blocks that are far in the future
        // so check if the current slot is in line with the recent progression.
        let max_index = recent_slots.len() - 1;
        let median_index = max_index / 2;
        let median_recent_slot = recent_slots[median_index];
        let expected_current_slot = median_recent_slot + (max_index - median_index) as u64;
        let max_reasonable_current_slot = expected_current_slot + MAX_SLOT_SKIP_DISTANCE;

        // Return the highest slot that doesn't exceed what we believe is a
        // reasonable slot.
        recent_slots
            .into_iter()
            .rev()
            .find(|slot| *slot <= max_reasonable_current_slot)
            .unwrap()
    }
}

#[cfg(test)]
impl From<Vec<Slot>> for RecentLeaderSlots {
    fn from(recent_slots: Vec<Slot>) -> Self {
        assert!(!recent_slots.is_empty());
        Self(Arc::new(RwLock::new(recent_slots.into_iter().collect())))
    }
}

/// Service that tracks upcoming leaders and maintains an up-to-date mapping
/// of leader id to TPU socket address.
struct LeaderTpuService {
    recent_slots: RecentLeaderSlots,
    leader_tpu_cache: Arc<RwLock<LeaderTpuCache>>,
    subscription: Option<PubsubClientSubscription<SlotUpdate>>,
    t_leader_tpu_service: Option<JoinHandle<()>>,
}

impl LeaderTpuService {
    fn new(rpc_client: Arc<RpcClient>, websocket_url: &str, exit: Arc<AtomicBool>) -> Result<Self> {
        let start_slot = rpc_client.get_slot_with_commitment(CommitmentConfig::processed())?;

        let recent_slots = RecentLeaderSlots::new(start_slot);
        let leader_tpu_cache = Arc::new(RwLock::new(LeaderTpuCache::new(&rpc_client, start_slot)?));

        let subscription = if !websocket_url.is_empty() {
            let recent_slots = recent_slots.clone();
            Some(PubsubClient::slot_updates_subscribe(
                websocket_url,
                move |update| {
                    let current_slot = match update {
                        // This update indicates that a full slot was received by the connected
                        // node so we can stop sending transactions to the leader for that slot
                        SlotUpdate::Completed { slot, .. } => slot.saturating_add(1),
                        // This update indicates that we have just received the first shred from
                        // the leader for this slot and they are probably still accepting transactions.
                        SlotUpdate::FirstShredReceived { slot, .. } => slot,
                        _ => return,
                    };
                    recent_slots.record_slot(current_slot);
                },
            )?)
        } else {
            None
        };

        let t_leader_tpu_service = Some({
            let recent_slots = recent_slots.clone();
            let leader_tpu_cache = leader_tpu_cache.clone();
            std::thread::Builder::new()
                .name("ldr-tpu-srv".to_string())
                .spawn(move || Self::run(rpc_client, recent_slots, leader_tpu_cache, exit))
                .unwrap()
        });

        Ok(LeaderTpuService {
            recent_slots,
            leader_tpu_cache,
            subscription,
            t_leader_tpu_service,
        })
    }

    fn join(&mut self) {
        if let Some(mut subscription) = self.subscription.take() {
            let _ = subscription.send_unsubscribe();
            let _ = subscription.shutdown();
        }
        if let Some(t_handle) = self.t_leader_tpu_service.take() {
            t_handle.join().unwrap();
        }
    }

    fn leader_tpu_sockets(&self, fanout_slots: u64) -> Vec<SocketAddr> {
        let current_slot = self.recent_slots.estimated_current_slot();
        self.leader_tpu_cache
            .read()
            .unwrap()
            .get_leader_sockets(current_slot, fanout_slots)
    }

    fn run(
        rpc_client: Arc<RpcClient>,
        recent_slots: RecentLeaderSlots,
        leader_tpu_cache: Arc<RwLock<LeaderTpuCache>>,
        exit: Arc<AtomicBool>,
    ) {
        let mut last_cluster_refresh = Instant::now();
        let mut sleep_ms = 1000;
        loop {
            if exit.load(Ordering::Relaxed) {
                break;
            }

            // Sleep a few slots before checking if leader cache needs to be refreshed again
            std::thread::sleep(Duration::from_millis(sleep_ms));
            sleep_ms = 1000;

            // Refresh cluster TPU ports every 5min in case validators restart with new port configuration
            // or new validators come online
            if last_cluster_refresh.elapsed() > Duration::from_secs(5 * 60) {
                match LeaderTpuCache::fetch_cluster_tpu_sockets(&rpc_client) {
                    Ok(leader_tpu_map) => {
                        leader_tpu_cache.write().unwrap().leader_tpu_map = leader_tpu_map;
                        last_cluster_refresh = Instant::now();
                    }
                    Err(err) => {
                        warn!("Failed to fetch cluster tpu sockets: {}", err);
                        sleep_ms = 100;
                    }
                }
            }

            let estimated_current_slot = recent_slots.estimated_current_slot();
            let (last_slot, last_epoch_info_slot, mut slots_in_epoch) = {
                let leader_tpu_cache = leader_tpu_cache.read().unwrap();
                (
                    leader_tpu_cache.last_slot(),
                    leader_tpu_cache.last_epoch_info_slot,
                    leader_tpu_cache.slots_in_epoch,
                )
            };
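            // Refresh the cached epoch info so that `slots_in_epoch` stays accurate as the
            // cluster approaches and crosses an epoch boundary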
            if estimated_current_slot >= last_epoch_info_slot.saturating_sub(slots_in_epoch) {
                if let Ok(epoch_info) = rpc_client.get_epoch_info() {
                    slots_in_epoch = epoch_info.slots_in_epoch;
                    let mut leader_tpu_cache = leader_tpu_cache.write().unwrap();
                    leader_tpu_cache.slots_in_epoch = slots_in_epoch;
                    leader_tpu_cache.last_epoch_info_slot = estimated_current_slot;
                }
            }
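            // Refresh the cached leader schedule once the estimated current slot approaches the
            // end of the cached range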
            if estimated_current_slot >= last_slot.saturating_sub(MAX_FANOUT_SLOTS) {
                match LeaderTpuCache::fetch_slot_leaders(
                    &rpc_client,
                    estimated_current_slot,
                    slots_in_epoch,
                ) {
                    Ok(slot_leaders) => {
                        let mut leader_tpu_cache = leader_tpu_cache.write().unwrap();
                        leader_tpu_cache.first_slot = estimated_current_slot;
                        leader_tpu_cache.leaders = slot_leaders;
                    }
                    Err(err) => {
                        warn!(
                            "Failed to fetch slot leaders (current estimated slot: {}): {}",
                            estimated_current_slot, err
                        );
                        sleep_ms = 100;
                    }
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn assert_slot(recent_slots: RecentLeaderSlots, expected_slot: Slot) {
        assert_eq!(recent_slots.estimated_current_slot(), expected_slot);
    }

    #[test]
    fn test_recent_leader_slots() {
        assert_slot(RecentLeaderSlots::new(0), 0);

        let mut recent_slots: Vec<Slot> = (1..=12).collect();
        assert_slot(RecentLeaderSlots::from(recent_slots.clone()), 12);

        recent_slots.reverse();
        assert_slot(RecentLeaderSlots::from(recent_slots), 12);

        assert_slot(
            RecentLeaderSlots::from(vec![0, 1 + MAX_SLOT_SKIP_DISTANCE]),
            1 + MAX_SLOT_SKIP_DISTANCE,
        );
        assert_slot(
            RecentLeaderSlots::from(vec![0, 2 + MAX_SLOT_SKIP_DISTANCE]),
            0,
        );

        assert_slot(RecentLeaderSlots::from(vec![1]), 1);
        assert_slot(RecentLeaderSlots::from(vec![1, 100]), 1);
        assert_slot(RecentLeaderSlots::from(vec![1, 2, 100]), 2);
        assert_slot(RecentLeaderSlots::from(vec![1, 2, 3, 100]), 3);
        assert_slot(RecentLeaderSlots::from(vec![1, 2, 3, 99, 100]), 3);
    }
}