1use crate::{
2 pubsub_client::{PubsubClient, PubsubClientError, PubsubClientSubscription},
3 rpc_client::RpcClient,
4 rpc_response::SlotUpdate,
5};
6use bincode::serialize;
7use log::*;
8use gemachain_sdk::{
9 clock::Slot, commitment_config::CommitmentConfig, pubkey::Pubkey, transaction::Transaction,
10};
11use std::{
12 collections::{HashMap, HashSet, VecDeque},
13 net::{SocketAddr, UdpSocket},
14 str::FromStr,
15 sync::{
16 atomic::{AtomicBool, Ordering},
17 Arc, RwLock,
18 },
19 thread::JoinHandle,
20 time::{Duration, Instant},
21};
22use thiserror::Error;
23
24#[derive(Error, Debug)]
25pub enum TpuSenderError {
26 #[error("Pubsub error: {0:?}")]
27 PubsubError(#[from] PubsubClientError),
28 #[error("RPC error: {0:?}")]
29 RpcError(#[from] crate::client_error::ClientError),
30 #[error("IO error: {0:?}")]
31 IoError(#[from] std::io::Error),
32}
33
34type Result<T> = std::result::Result<T, TpuSenderError>;
35
36pub const DEFAULT_FANOUT_SLOTS: u64 = 12;
38
39pub const MAX_FANOUT_SLOTS: u64 = 100;
41
42#[derive(Clone, Debug)]
44pub struct TpuClientConfig {
45 pub fanout_slots: u64,
48}
49
50impl Default for TpuClientConfig {
51 fn default() -> Self {
52 Self {
53 fanout_slots: DEFAULT_FANOUT_SLOTS,
54 }
55 }
56}
57
58pub struct TpuClient {
61 send_socket: UdpSocket,
62 fanout_slots: u64,
63 leader_tpu_service: LeaderTpuService,
64 exit: Arc<AtomicBool>,
65}
66
67impl TpuClient {
68 pub fn send_transaction(&self, transaction: &Transaction) -> bool {
71 let wire_transaction = serialize(transaction).expect("serialization should succeed");
72 self.send_wire_transaction(&wire_transaction)
73 }
74
75 pub fn send_wire_transaction(&self, wire_transaction: &[u8]) -> bool {
77 let mut sent = false;
78 for tpu_address in self
79 .leader_tpu_service
80 .leader_tpu_sockets(self.fanout_slots)
81 {
82 if self
83 .send_socket
84 .send_to(wire_transaction, tpu_address)
85 .is_ok()
86 {
87 sent = true;
88 }
89 }
90 sent
91 }
92
93 pub fn new(
95 rpc_client: Arc<RpcClient>,
96 websocket_url: &str,
97 config: TpuClientConfig,
98 ) -> Result<Self> {
99 let exit = Arc::new(AtomicBool::new(false));
100 let leader_tpu_service = LeaderTpuService::new(rpc_client, websocket_url, exit.clone())?;
101
102 Ok(Self {
103 send_socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
104 fanout_slots: config.fanout_slots.min(MAX_FANOUT_SLOTS).max(1),
105 leader_tpu_service,
106 exit,
107 })
108 }
109}
110
111impl Drop for TpuClient {
112 fn drop(&mut self) {
113 self.exit.store(true, Ordering::Relaxed);
114 self.leader_tpu_service.join();
115 }
116}
117
118struct LeaderTpuCache {
119 first_slot: Slot,
120 leaders: Vec<Pubkey>,
121 leader_tpu_map: HashMap<Pubkey, SocketAddr>,
122 slots_in_epoch: Slot,
123 last_epoch_info_slot: Slot,
124}
125
126impl LeaderTpuCache {
127 fn new(rpc_client: &RpcClient, first_slot: Slot) -> Result<Self> {
128 let slots_in_epoch = rpc_client.get_epoch_info()?.slots_in_epoch;
129 let leaders = Self::fetch_slot_leaders(rpc_client, first_slot, slots_in_epoch)?;
130 let leader_tpu_map = Self::fetch_cluster_tpu_sockets(rpc_client)?;
131 Ok(Self {
132 first_slot,
133 leaders,
134 leader_tpu_map,
135 slots_in_epoch,
136 last_epoch_info_slot: first_slot,
137 })
138 }
139
140 fn last_slot(&self) -> Slot {
142 self.first_slot + self.leaders.len().saturating_sub(1) as u64
143 }
144
145 fn get_leader_sockets(&self, current_slot: Slot, fanout_slots: u64) -> Vec<SocketAddr> {
147 let mut leader_set = HashSet::new();
148 let mut leader_sockets = Vec::new();
149 for leader_slot in current_slot..current_slot + fanout_slots {
150 if let Some(leader) = self.get_slot_leader(leader_slot) {
151 if let Some(tpu_socket) = self.leader_tpu_map.get(leader) {
152 if leader_set.insert(*leader) {
153 leader_sockets.push(*tpu_socket);
154 }
155 } else {
156 trace!("TPU not available for leader {}", leader);
158 }
159 } else {
160 warn!(
162 "Leader not known for slot {}; cache holds slots [{},{}]",
163 leader_slot,
164 self.first_slot,
165 self.last_slot()
166 );
167 }
168 }
169 leader_sockets
170 }
171
172 fn get_slot_leader(&self, slot: Slot) -> Option<&Pubkey> {
173 if slot >= self.first_slot {
174 let index = slot - self.first_slot;
175 self.leaders.get(index as usize)
176 } else {
177 None
178 }
179 }
180
181 fn fetch_cluster_tpu_sockets(rpc_client: &RpcClient) -> Result<HashMap<Pubkey, SocketAddr>> {
182 let cluster_contact_info = rpc_client.get_cluster_nodes()?;
183 Ok(cluster_contact_info
184 .into_iter()
185 .filter_map(|contact_info| {
186 Some((
187 Pubkey::from_str(&contact_info.pubkey).ok()?,
188 contact_info.tpu?,
189 ))
190 })
191 .collect())
192 }
193
194 fn fetch_slot_leaders(
195 rpc_client: &RpcClient,
196 start_slot: Slot,
197 slots_in_epoch: Slot,
198 ) -> Result<Vec<Pubkey>> {
199 let fanout = (2 * MAX_FANOUT_SLOTS).min(slots_in_epoch);
200 Ok(rpc_client.get_slot_leaders(start_slot, fanout)?)
201 }
202}
203
204const MAX_SLOT_SKIP_DISTANCE: u64 = 48;
206
207#[derive(Clone, Debug)]
208struct RecentLeaderSlots(Arc<RwLock<VecDeque<Slot>>>);
209impl RecentLeaderSlots {
210 fn new(current_slot: Slot) -> Self {
211 let mut recent_slots = VecDeque::new();
212 recent_slots.push_back(current_slot);
213 Self(Arc::new(RwLock::new(recent_slots)))
214 }
215
216 fn record_slot(&self, current_slot: Slot) {
217 let mut recent_slots = self.0.write().unwrap();
218 recent_slots.push_back(current_slot);
219 while recent_slots.len() > 12 {
222 recent_slots.pop_front();
223 }
224 }
225
226 fn estimated_current_slot(&self) -> Slot {
228 let mut recent_slots: Vec<Slot> = self.0.read().unwrap().iter().cloned().collect();
229 assert!(!recent_slots.is_empty());
230 recent_slots.sort_unstable();
231
232 let max_index = recent_slots.len() - 1;
235 let median_index = max_index / 2;
236 let median_recent_slot = recent_slots[median_index];
237 let expected_current_slot = median_recent_slot + (max_index - median_index) as u64;
238 let max_reasonable_current_slot = expected_current_slot + MAX_SLOT_SKIP_DISTANCE;
239
240 recent_slots
243 .into_iter()
244 .rev()
245 .find(|slot| *slot <= max_reasonable_current_slot)
246 .unwrap()
247 }
248}
249
250#[cfg(test)]
251impl From<Vec<Slot>> for RecentLeaderSlots {
252 fn from(recent_slots: Vec<Slot>) -> Self {
253 assert!(!recent_slots.is_empty());
254 Self(Arc::new(RwLock::new(recent_slots.into_iter().collect())))
255 }
256}
257
258struct LeaderTpuService {
261 recent_slots: RecentLeaderSlots,
262 leader_tpu_cache: Arc<RwLock<LeaderTpuCache>>,
263 subscription: Option<PubsubClientSubscription<SlotUpdate>>,
264 t_leader_tpu_service: Option<JoinHandle<()>>,
265}
266
267impl LeaderTpuService {
268 fn new(rpc_client: Arc<RpcClient>, websocket_url: &str, exit: Arc<AtomicBool>) -> Result<Self> {
269 let start_slot = rpc_client.get_slot_with_commitment(CommitmentConfig::processed())?;
270
271 let recent_slots = RecentLeaderSlots::new(start_slot);
272 let leader_tpu_cache = Arc::new(RwLock::new(LeaderTpuCache::new(&rpc_client, start_slot)?));
273
274 let subscription = if !websocket_url.is_empty() {
275 let recent_slots = recent_slots.clone();
276 Some(PubsubClient::slot_updates_subscribe(
277 websocket_url,
278 move |update| {
279 let current_slot = match update {
280 SlotUpdate::Completed { slot, .. } => slot.saturating_add(1),
283 SlotUpdate::FirstShredReceived { slot, .. } => slot,
286 _ => return,
287 };
288 recent_slots.record_slot(current_slot);
289 },
290 )?)
291 } else {
292 None
293 };
294
295 let t_leader_tpu_service = Some({
296 let recent_slots = recent_slots.clone();
297 let leader_tpu_cache = leader_tpu_cache.clone();
298 std::thread::Builder::new()
299 .name("ldr-tpu-srv".to_string())
300 .spawn(move || Self::run(rpc_client, recent_slots, leader_tpu_cache, exit))
301 .unwrap()
302 });
303
304 Ok(LeaderTpuService {
305 recent_slots,
306 leader_tpu_cache,
307 subscription,
308 t_leader_tpu_service,
309 })
310 }
311
312 fn join(&mut self) {
313 if let Some(mut subscription) = self.subscription.take() {
314 let _ = subscription.send_unsubscribe();
315 let _ = subscription.shutdown();
316 }
317 if let Some(t_handle) = self.t_leader_tpu_service.take() {
318 t_handle.join().unwrap();
319 }
320 }
321
322 fn leader_tpu_sockets(&self, fanout_slots: u64) -> Vec<SocketAddr> {
323 let current_slot = self.recent_slots.estimated_current_slot();
324 self.leader_tpu_cache
325 .read()
326 .unwrap()
327 .get_leader_sockets(current_slot, fanout_slots)
328 }
329
330 fn run(
331 rpc_client: Arc<RpcClient>,
332 recent_slots: RecentLeaderSlots,
333 leader_tpu_cache: Arc<RwLock<LeaderTpuCache>>,
334 exit: Arc<AtomicBool>,
335 ) {
336 let mut last_cluster_refresh = Instant::now();
337 let mut sleep_ms = 1000;
338 loop {
339 if exit.load(Ordering::Relaxed) {
340 break;
341 }
342
343 std::thread::sleep(Duration::from_millis(sleep_ms));
345 sleep_ms = 1000;
346
347 if last_cluster_refresh.elapsed() > Duration::from_secs(5 * 60) {
350 match LeaderTpuCache::fetch_cluster_tpu_sockets(&rpc_client) {
351 Ok(leader_tpu_map) => {
352 leader_tpu_cache.write().unwrap().leader_tpu_map = leader_tpu_map;
353 last_cluster_refresh = Instant::now();
354 }
355 Err(err) => {
356 warn!("Failed to fetch cluster tpu sockets: {}", err);
357 sleep_ms = 100;
358 }
359 }
360 }
361
362 let estimated_current_slot = recent_slots.estimated_current_slot();
363 let (last_slot, last_epoch_info_slot, mut slots_in_epoch) = {
364 let leader_tpu_cache = leader_tpu_cache.read().unwrap();
365 (
366 leader_tpu_cache.last_slot(),
367 leader_tpu_cache.last_epoch_info_slot,
368 leader_tpu_cache.slots_in_epoch,
369 )
370 };
371 if estimated_current_slot >= last_epoch_info_slot.saturating_sub(slots_in_epoch) {
372 if let Ok(epoch_info) = rpc_client.get_epoch_info() {
373 slots_in_epoch = epoch_info.slots_in_epoch;
374 let mut leader_tpu_cache = leader_tpu_cache.write().unwrap();
375 leader_tpu_cache.slots_in_epoch = slots_in_epoch;
376 leader_tpu_cache.last_epoch_info_slot = estimated_current_slot;
377 }
378 }
379 if estimated_current_slot >= last_slot.saturating_sub(MAX_FANOUT_SLOTS) {
380 match LeaderTpuCache::fetch_slot_leaders(
381 &rpc_client,
382 estimated_current_slot,
383 slots_in_epoch,
384 ) {
385 Ok(slot_leaders) => {
386 let mut leader_tpu_cache = leader_tpu_cache.write().unwrap();
387 leader_tpu_cache.first_slot = estimated_current_slot;
388 leader_tpu_cache.leaders = slot_leaders;
389 }
390 Err(err) => {
391 warn!(
392 "Failed to fetch slot leaders (current estimated slot: {}): {}",
393 estimated_current_slot, err
394 );
395 sleep_ms = 100;
396 }
397 }
398 }
399 }
400 }
401}
402
403#[cfg(test)]
404mod tests {
405 use super::*;
406
407 fn assert_slot(recent_slots: RecentLeaderSlots, expected_slot: Slot) {
408 assert_eq!(recent_slots.estimated_current_slot(), expected_slot);
409 }
410
411 #[test]
412 fn test_recent_leader_slots() {
413 assert_slot(RecentLeaderSlots::new(0), 0);
414
415 let mut recent_slots: Vec<Slot> = (1..=12).collect();
416 assert_slot(RecentLeaderSlots::from(recent_slots.clone()), 12);
417
418 recent_slots.reverse();
419 assert_slot(RecentLeaderSlots::from(recent_slots), 12);
420
421 assert_slot(
422 RecentLeaderSlots::from(vec![0, 1 + MAX_SLOT_SKIP_DISTANCE]),
423 1 + MAX_SLOT_SKIP_DISTANCE,
424 );
425 assert_slot(
426 RecentLeaderSlots::from(vec![0, 2 + MAX_SLOT_SKIP_DISTANCE]),
427 0,
428 );
429
430 assert_slot(RecentLeaderSlots::from(vec![1]), 1);
431 assert_slot(RecentLeaderSlots::from(vec![1, 100]), 1);
432 assert_slot(RecentLeaderSlots::from(vec![1, 2, 100]), 2);
433 assert_slot(RecentLeaderSlots::from(vec![1, 2, 3, 100]), 3);
434 assert_slot(RecentLeaderSlots::from(vec![1, 2, 3, 99, 100]), 3);
435 }
436}