solana_send_transaction_service/
transaction_client.rs1use {
2 crate::{send_transaction_service_stats::SendTransactionServiceStats, tpu_info::TpuInfo},
3 async_trait::async_trait,
4 log::warn,
5 solana_client::connection_cache::{ConnectionCache, Protocol},
6 solana_connection_cache::client_connection::ClientConnection as TpuConnection,
7 solana_keypair::Keypair,
8 solana_measure::measure::Measure,
9 solana_quic_definitions::NotifyKeyUpdate,
10 solana_tpu_client_next::{
11 connection_workers_scheduler::{
12 BindTarget, ConnectionWorkersSchedulerConfig, Fanout, StakeIdentity,
13 },
14 leader_updater::LeaderUpdater,
15 transaction_batch::TransactionBatch,
16 ConnectionWorkersScheduler,
17 },
18 std::{
19 net::{SocketAddr, UdpSocket},
20 sync::{atomic::Ordering, Arc, Mutex},
21 time::{Duration, Instant},
22 },
23 tokio::{
24 runtime::Handle,
25 sync::{
26 mpsc::{self},
27 watch,
28 },
29 },
30 tokio_util::sync::CancellationToken,
31};
32
33const MAX_CONNECTIONS: usize = 1024;
36
37pub trait TpuInfoWithSendStatic: TpuInfo + std::marker::Send + 'static {}
39impl<T> TpuInfoWithSendStatic for T where T: TpuInfo + std::marker::Send + 'static {}
40
41pub trait TransactionClient {
42 fn send_transactions_in_batch(
43 &self,
44 wire_transactions: Vec<Vec<u8>>,
45 stats: &SendTransactionServiceStats,
46 );
47
48 #[cfg(any(test, feature = "dev-context-only-utils"))]
49 fn protocol(&self) -> Protocol;
50}
51
52pub struct ConnectionCacheClient<T: TpuInfoWithSendStatic> {
53 connection_cache: Arc<ConnectionCache>,
54 tpu_address: SocketAddr,
55 tpu_peers: Option<Vec<SocketAddr>>,
56 leader_info_provider: Arc<Mutex<CurrentLeaderInfo<T>>>,
57 leader_forward_count: u64,
58}
59
60impl<T> Clone for ConnectionCacheClient<T>
62where
63 T: TpuInfoWithSendStatic,
64{
65 fn clone(&self) -> Self {
66 Self {
67 connection_cache: Arc::clone(&self.connection_cache),
68 tpu_address: self.tpu_address,
69 tpu_peers: self.tpu_peers.clone(),
70 leader_info_provider: Arc::clone(&self.leader_info_provider),
71 leader_forward_count: self.leader_forward_count,
72 }
73 }
74}
75
76impl<T> ConnectionCacheClient<T>
77where
78 T: TpuInfoWithSendStatic,
79{
80 pub fn new(
81 connection_cache: Arc<ConnectionCache>,
82 tpu_address: SocketAddr,
83 tpu_peers: Option<Vec<SocketAddr>>,
84 leader_info: Option<T>,
85 leader_forward_count: u64,
86 ) -> Self {
87 let leader_info_provider = Arc::new(Mutex::new(CurrentLeaderInfo::new(leader_info)));
88 Self {
89 connection_cache,
90 tpu_address,
91 tpu_peers,
92 leader_info_provider,
93 leader_forward_count,
94 }
95 }
96
97 fn get_tpu_addresses<'a>(&'a self, leader_info: Option<&'a T>) -> Vec<&'a SocketAddr> {
98 leader_info
99 .map(|leader_info| {
100 leader_info
101 .get_leader_tpus(self.leader_forward_count, self.connection_cache.protocol())
102 })
103 .filter(|addresses| !addresses.is_empty())
104 .unwrap_or_else(|| vec![&self.tpu_address])
105 }
106
107 fn send_transactions(
108 &self,
109 peer: &SocketAddr,
110 wire_transactions: Vec<Vec<u8>>,
111 stats: &SendTransactionServiceStats,
112 ) {
113 let mut measure = Measure::start("send-us");
114 let conn = self.connection_cache.get_connection(peer);
115 let result = conn.send_data_batch_async(wire_transactions);
116
117 if let Err(err) = result {
118 warn!(
119 "Failed to send transaction transaction to {}: {:?}",
120 self.tpu_address, err
121 );
122 stats.send_failure_count.fetch_add(1, Ordering::Relaxed);
123 }
124
125 measure.stop();
126 stats.send_us.fetch_add(measure.as_us(), Ordering::Relaxed);
127 stats.send_attempt_count.fetch_add(1, Ordering::Relaxed);
128 }
129}
130
131impl<T> TransactionClient for ConnectionCacheClient<T>
132where
133 T: TpuInfoWithSendStatic,
134{
135 fn send_transactions_in_batch(
136 &self,
137 wire_transactions: Vec<Vec<u8>>,
138 stats: &SendTransactionServiceStats,
139 ) {
140 let mut addresses = self
142 .tpu_peers
143 .as_ref()
144 .map(|addrs| addrs.iter().collect::<Vec<_>>())
145 .unwrap_or_default();
146 let mut leader_info_provider = self.leader_info_provider.lock().unwrap();
147 let leader_info = leader_info_provider.get_leader_info();
148 let leader_addresses = self.get_tpu_addresses(leader_info);
149 addresses.extend(leader_addresses);
150
151 for address in &addresses {
152 self.send_transactions(address, wire_transactions.clone(), stats);
153 }
154 }
155
156 #[cfg(any(test, feature = "dev-context-only-utils"))]
157 fn protocol(&self) -> Protocol {
158 self.connection_cache.protocol()
159 }
160}
161
162impl<T> NotifyKeyUpdate for ConnectionCacheClient<T>
163where
164 T: TpuInfoWithSendStatic,
165{
166 fn update_key(&self, identity: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
167 self.connection_cache.update_key(identity)
168 }
169}
170
171pub const LEADER_INFO_REFRESH_RATE_MS: u64 = 1000;
173
174#[derive(Clone)]
177pub struct CurrentLeaderInfo<T>
178where
179 T: TpuInfoWithSendStatic,
180{
181 last_leader_refresh: Option<Instant>,
183
184 leader_info: Option<T>,
186
187 refresh_rate: Duration,
189}
190
191impl<T> CurrentLeaderInfo<T>
192where
193 T: TpuInfoWithSendStatic,
194{
195 pub fn get_leader_info(&mut self) -> Option<&T> {
197 if let Some(leader_info) = self.leader_info.as_mut() {
198 let now = Instant::now();
199 let need_refresh = self
200 .last_leader_refresh
201 .map(|last| now.duration_since(last) >= self.refresh_rate)
202 .unwrap_or(true);
203
204 if need_refresh {
205 leader_info.refresh_recent_peers();
206 self.last_leader_refresh = Some(now);
207 }
208 }
209 self.leader_info.as_ref()
210 }
211
212 pub fn new(leader_info: Option<T>) -> Self {
213 Self {
214 last_leader_refresh: None,
215 leader_info,
216 refresh_rate: Duration::from_millis(LEADER_INFO_REFRESH_RATE_MS),
217 }
218 }
219}
220
221#[derive(Clone)]
231pub struct TpuClientNextClient {
232 runtime_handle: Handle,
233 sender: mpsc::Sender<TransactionBatch>,
234 update_certificate_sender: watch::Sender<Option<StakeIdentity>>,
235 #[cfg(any(test, feature = "dev-context-only-utils"))]
236 cancel: CancellationToken,
237}
238
239const METRICS_REPORTING_INTERVAL: Duration = Duration::from_secs(3);
240impl TpuClientNextClient {
241 pub fn new<T>(
242 runtime_handle: Handle,
243 my_tpu_address: SocketAddr,
244 tpu_peers: Option<Vec<SocketAddr>>,
245 leader_info: Option<T>,
246 leader_forward_count: u64,
247 identity: Option<&Keypair>,
248 bind_socket: UdpSocket,
249 cancel: CancellationToken,
250 ) -> Self
251 where
252 T: TpuInfoWithSendStatic + Clone,
253 {
254 let (sender, receiver) = mpsc::channel(128);
257
258 let (update_certificate_sender, update_certificate_receiver) = watch::channel(None);
259
260 let leader_info_provider = CurrentLeaderInfo::new(leader_info);
261 let leader_updater: SendTransactionServiceLeaderUpdater<T> =
262 SendTransactionServiceLeaderUpdater {
263 leader_info_provider,
264 my_tpu_address,
265 tpu_peers,
266 };
267 let config = Self::create_config(bind_socket, identity, leader_forward_count as usize);
268
269 let scheduler = ConnectionWorkersScheduler::new(
270 Box::new(leader_updater),
271 receiver,
272 update_certificate_receiver,
273 cancel.clone(),
274 );
275 runtime_handle.spawn(scheduler.get_stats().report_to_influxdb(
277 "send-transaction-service-TPU-client",
278 METRICS_REPORTING_INTERVAL,
279 cancel.clone(),
280 ));
281 let _handle = runtime_handle.spawn(scheduler.run(config));
282 Self {
283 runtime_handle,
284 sender,
285 update_certificate_sender,
286 #[cfg(any(test, feature = "dev-context-only-utils"))]
287 cancel,
288 }
289 }
290
291 fn create_config(
292 bind_socket: UdpSocket,
293 stake_identity: Option<&Keypair>,
294 leader_forward_count: usize,
295 ) -> ConnectionWorkersSchedulerConfig {
296 ConnectionWorkersSchedulerConfig {
297 bind: BindTarget::Socket(bind_socket),
298 stake_identity: stake_identity.map(StakeIdentity::new),
299 num_connections: MAX_CONNECTIONS,
300 skip_check_transaction_age: true,
301 worker_channel_size: 64,
303 max_reconnect_attempts: 4,
304 leaders_fanout: Fanout {
306 connect: leader_forward_count + 1,
307 send: leader_forward_count,
308 },
309 }
310 }
311
312 #[cfg(any(test, feature = "dev-context-only-utils"))]
313 pub fn cancel(&self) {
314 self.cancel.cancel();
315 }
316}
317
318impl NotifyKeyUpdate for TpuClientNextClient {
319 fn update_key(&self, identity: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
320 let stake_identity = StakeIdentity::new(identity);
321 self.update_certificate_sender
322 .send(Some(stake_identity))
323 .map_err(|e| Box::new(e) as Box<dyn std::error::Error>)
324 }
325}
326
327impl TransactionClient for TpuClientNextClient {
328 fn send_transactions_in_batch(
329 &self,
330 wire_transactions: Vec<Vec<u8>>,
331 stats: &SendTransactionServiceStats,
332 ) {
333 let mut measure = Measure::start("send-us");
334 self.runtime_handle.spawn({
335 let sender = self.sender.clone();
336 async move {
337 let res = sender.send(TransactionBatch::new(wire_transactions)).await;
338 if res.is_err() {
339 warn!("Failed to send transaction to channel: it is closed.");
340 }
341 }
342 });
343
344 measure.stop();
345 stats.send_us.fetch_add(measure.as_us(), Ordering::Relaxed);
346 stats.send_attempt_count.fetch_add(1, Ordering::Relaxed);
347 }
348
349 #[cfg(any(test, feature = "dev-context-only-utils"))]
350 fn protocol(&self) -> Protocol {
351 Protocol::QUIC
352 }
353}
354
355#[derive(Clone)]
356pub struct SendTransactionServiceLeaderUpdater<T: TpuInfoWithSendStatic> {
357 leader_info_provider: CurrentLeaderInfo<T>,
358 my_tpu_address: SocketAddr,
359 tpu_peers: Option<Vec<SocketAddr>>,
360}
361
362#[async_trait]
363impl<T> LeaderUpdater for SendTransactionServiceLeaderUpdater<T>
364where
365 T: TpuInfoWithSendStatic,
366{
367 fn next_leaders(&mut self, lookahead_leaders: usize) -> Vec<SocketAddr> {
368 let discovered_peers = self
369 .leader_info_provider
370 .get_leader_info()
371 .map(|leader_info| {
372 leader_info.get_not_unique_leader_tpus(lookahead_leaders as u64, Protocol::QUIC)
373 })
374 .filter(|addresses| !addresses.is_empty())
375 .unwrap_or_else(|| vec![&self.my_tpu_address]);
376 let mut all_peers = self.tpu_peers.clone().unwrap_or_default();
377 all_peers.extend(discovered_peers.into_iter().cloned());
378 all_peers
379 }
380 async fn stop(&mut self) {}
381}