solana_send_transaction_service/transaction_client.rs

use {
    crate::{send_transaction_service_stats::SendTransactionServiceStats, tpu_info::TpuInfo},
    async_trait::async_trait,
    log::warn,
    solana_client::connection_cache::{ConnectionCache, Protocol},
    solana_connection_cache::client_connection::ClientConnection as TpuConnection,
    solana_keypair::Keypair,
    solana_measure::measure::Measure,
    solana_quic_definitions::NotifyKeyUpdate,
    solana_tpu_client_next::{
        connection_workers_scheduler::{
            BindTarget, ConnectionWorkersSchedulerConfig, Fanout, StakeIdentity,
        },
        leader_updater::LeaderUpdater,
        transaction_batch::TransactionBatch,
        ConnectionWorkersScheduler,
    },
    std::{
        net::{SocketAddr, UdpSocket},
        sync::{atomic::Ordering, Arc, Mutex},
        time::{Duration, Instant},
    },
    tokio::{
        runtime::Handle,
        sync::{
            mpsc::{self},
            watch,
        },
    },
    tokio_util::sync::CancellationToken,
};

/// How many connections to maintain in the tpu-client-next cache. The value
/// is chosen to match `MAX_CONNECTIONS` from `ConnectionCache`.
const MAX_CONNECTIONS: usize = 1024;

// Alias trait to shorten function definitions.
pub trait TpuInfoWithSendStatic: TpuInfo + std::marker::Send + 'static {}
impl<T> TpuInfoWithSendStatic for T where T: TpuInfo + std::marker::Send + 'static {}

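/// Client-side interface used by the send-transaction-service to forward
/// wire-format (already serialized) transactions to TPU endpoints.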
pub trait TransactionClient {
    fn send_transactions_in_batch(
        &self,
        wire_transactions: Vec<Vec<u8>>,
        stats: &SendTransactionServiceStats,
    );

    #[cfg(any(test, feature = "dev-context-only-utils"))]
    fn protocol(&self) -> Protocol;
}

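/// [`TransactionClient`] implementation built on top of the shared
/// [`ConnectionCache`]: every batch is sent to the configured `tpu_peers`
/// (if any) and to the upcoming leaders' TPU addresses.
///
/// A minimal usage sketch (not compiled; `connection_cache`, `tpu_address`,
/// `leader_info`, `wire_transactions`, and `stats` are assumed to be provided
/// by the caller):
///
/// ```ignore
/// let client = ConnectionCacheClient::new(
///     connection_cache,  // Arc<ConnectionCache>
///     tpu_address,       // fallback SocketAddr when no leaders are known
///     None,              // tpu_peers
///     Some(leader_info), // T: TpuInfo + Send + 'static
///     2,                 // leader_forward_count
/// );
/// client.send_transactions_in_batch(wire_transactions, &stats);
/// ```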
pub struct ConnectionCacheClient<T: TpuInfoWithSendStatic> {
    connection_cache: Arc<ConnectionCache>,
    tpu_address: SocketAddr,
    tpu_peers: Option<Vec<SocketAddr>>,
    leader_info_provider: Arc<Mutex<CurrentLeaderInfo<T>>>,
    leader_forward_count: u64,
}

// Manual implementation of Clone without requiring T to be Clone
impl<T> Clone for ConnectionCacheClient<T>
where
    T: TpuInfoWithSendStatic,
{
    fn clone(&self) -> Self {
        Self {
            connection_cache: Arc::clone(&self.connection_cache),
            tpu_address: self.tpu_address,
            tpu_peers: self.tpu_peers.clone(),
            leader_info_provider: Arc::clone(&self.leader_info_provider),
            leader_forward_count: self.leader_forward_count,
        }
    }
}

impl<T> ConnectionCacheClient<T>
where
    T: TpuInfoWithSendStatic,
{
    pub fn new(
        connection_cache: Arc<ConnectionCache>,
        tpu_address: SocketAddr,
        tpu_peers: Option<Vec<SocketAddr>>,
        leader_info: Option<T>,
        leader_forward_count: u64,
    ) -> Self {
        let leader_info_provider = Arc::new(Mutex::new(CurrentLeaderInfo::new(leader_info)));
        Self {
            connection_cache,
            tpu_address,
            tpu_peers,
            leader_info_provider,
            leader_forward_count,
        }
    }

    fn get_tpu_addresses<'a>(&'a self, leader_info: Option<&'a T>) -> Vec<&'a SocketAddr> {
        leader_info
            .map(|leader_info| {
                leader_info
                    .get_leader_tpus(self.leader_forward_count, self.connection_cache.protocol())
            })
            .filter(|addresses| !addresses.is_empty())
            .unwrap_or_else(|| vec![&self.tpu_address])
    }

    fn send_transactions(
        &self,
        peer: &SocketAddr,
        wire_transactions: Vec<Vec<u8>>,
        stats: &SendTransactionServiceStats,
    ) {
        let mut measure = Measure::start("send-us");
        let conn = self.connection_cache.get_connection(peer);
        let result = conn.send_data_batch_async(wire_transactions);

        if let Err(err) = result {
            warn!(
                "Failed to send transaction batch to {}: {:?}",
                peer, err
            );
            stats.send_failure_count.fetch_add(1, Ordering::Relaxed);
        }

        measure.stop();
        stats.send_us.fetch_add(measure.as_us(), Ordering::Relaxed);
        stats.send_attempt_count.fetch_add(1, Ordering::Relaxed);
    }
}

impl<T> TransactionClient for ConnectionCacheClient<T>
where
    T: TpuInfoWithSendStatic,
{
    fn send_transactions_in_batch(
        &self,
        wire_transactions: Vec<Vec<u8>>,
        stats: &SendTransactionServiceStats,
    ) {
        // Process the transactions as a batch: send to any configured
        // tpu_peers plus the upcoming leaders' TPU addresses.
        let mut addresses = self
            .tpu_peers
            .as_ref()
            .map(|addrs| addrs.iter().collect::<Vec<_>>())
            .unwrap_or_default();
        let mut leader_info_provider = self.leader_info_provider.lock().unwrap();
        let leader_info = leader_info_provider.get_leader_info();
        let leader_addresses = self.get_tpu_addresses(leader_info);
        addresses.extend(leader_addresses);

        for address in &addresses {
            self.send_transactions(address, wire_transactions.clone(), stats);
        }
    }

    #[cfg(any(test, feature = "dev-context-only-utils"))]
    fn protocol(&self) -> Protocol {
        self.connection_cache.protocol()
    }
}

impl<T> NotifyKeyUpdate for ConnectionCacheClient<T>
where
    T: TpuInfoWithSendStatic,
{
    fn update_key(&self, identity: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
        self.connection_cache.update_key(identity)
    }
}

/// The leader info refresh rate, in milliseconds.
pub const LEADER_INFO_REFRESH_RATE_MS: u64 = 1000;

/// A struct responsible for holding up-to-date leader information
/// used for sending transactions.
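///
/// A minimal sketch of the refresh pattern (not compiled; `my_tpu_info` is
/// assumed to implement [`TpuInfo`]):
///
/// ```ignore
/// let mut leader_info = CurrentLeaderInfo::new(Some(my_tpu_info));
/// // Refreshes the recent peers at most once per `LEADER_INFO_REFRESH_RATE_MS`.
/// if let Some(info) = leader_info.get_leader_info() {
///     let tpus = info.get_leader_tpus(1, Protocol::QUIC);
/// }
/// ```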
#[derive(Clone)]
pub struct CurrentLeaderInfo<T>
where
    T: TpuInfoWithSendStatic,
{
    /// The last time the leader info was refreshed
    last_leader_refresh: Option<Instant>,

    /// The leader info
    leader_info: Option<T>,

    /// How often to refresh the leader info
    refresh_rate: Duration,
}

impl<T> CurrentLeaderInfo<T>
where
    T: TpuInfoWithSendStatic,
{
    /// Get the leader info, refresh if expired
    pub fn get_leader_info(&mut self) -> Option<&T> {
        if let Some(leader_info) = self.leader_info.as_mut() {
            let now = Instant::now();
            let need_refresh = self
                .last_leader_refresh
                .map(|last| now.duration_since(last) >= self.refresh_rate)
                .unwrap_or(true);

            if need_refresh {
                leader_info.refresh_recent_peers();
                self.last_leader_refresh = Some(now);
            }
        }
        self.leader_info.as_ref()
    }

    pub fn new(leader_info: Option<T>) -> Self {
        Self {
            last_leader_refresh: None,
            leader_info,
            refresh_rate: Duration::from_millis(LEADER_INFO_REFRESH_RATE_MS),
        }
    }
}

/// `TpuClientNextClient` provides an interface for managing the
/// [`ConnectionWorkersScheduler`].
///
/// It allows:
/// * Creating and initializing the scheduler with the runtime configuration,
/// * Sending transactions to the connection scheduler,
/// * Updating the validator identity keypair and propagating the change to
///   the scheduler. Most of the complexity of this structure arises from this
///   functionality.
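///
/// A minimal construction sketch (not compiled; the runtime, socket, leader
/// info, identity keypairs, and transactions are assumed to be provided by
/// the caller):
///
/// ```ignore
/// let client = TpuClientNextClient::new(
///     runtime.handle().clone(),
///     my_tpu_address,
///     None,                    // tpu_peers
///     Some(leader_info),       // T: TpuInfo + Send + 'static + Clone
///     1,                       // leader_forward_count
///     Some(&identity_keypair), // stake identity
///     bind_socket,             // UdpSocket the QUIC endpoint binds to
///     CancellationToken::new(),
/// );
/// client.send_transactions_in_batch(wire_transactions, &stats);
/// // Later, when the validator identity changes:
/// client.update_key(&new_identity_keypair)?;
/// ```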
#[derive(Clone)]
pub struct TpuClientNextClient {
    runtime_handle: Handle,
    sender: mpsc::Sender<TransactionBatch>,
    update_certificate_sender: watch::Sender<Option<StakeIdentity>>,
    #[cfg(any(test, feature = "dev-context-only-utils"))]
    cancel: CancellationToken,
}

const METRICS_REPORTING_INTERVAL: Duration = Duration::from_secs(3);

impl TpuClientNextClient {
    pub fn new<T>(
        runtime_handle: Handle,
        my_tpu_address: SocketAddr,
        tpu_peers: Option<Vec<SocketAddr>>,
        leader_info: Option<T>,
        leader_forward_count: u64,
        identity: Option<&Keypair>,
        bind_socket: UdpSocket,
        cancel: CancellationToken,
    ) -> Self
    where
        T: TpuInfoWithSendStatic + Clone,
    {
        // The channel size of 128 batches represents ~8s worth of
        // transactions at a rate of 1000 tps, assuming a batch size of 64
        // transactions.
        let (sender, receiver) = mpsc::channel(128);

        let (update_certificate_sender, update_certificate_receiver) = watch::channel(None);

        let leader_info_provider = CurrentLeaderInfo::new(leader_info);
        let leader_updater: SendTransactionServiceLeaderUpdater<T> =
            SendTransactionServiceLeaderUpdater {
                leader_info_provider,
                my_tpu_address,
                tpu_peers,
            };
        let config = Self::create_config(bind_socket, identity, leader_forward_count as usize);

        let scheduler = ConnectionWorkersScheduler::new(
            Box::new(leader_updater),
            receiver,
            update_certificate_receiver,
            cancel.clone(),
        );
        // The handle to this task is intentionally dropped; the task runs
        // until the cancel signal is received.
        runtime_handle.spawn(scheduler.get_stats().report_to_influxdb(
            "send-transaction-service-TPU-client",
            METRICS_REPORTING_INTERVAL,
            cancel.clone(),
        ));
        let _handle = runtime_handle.spawn(scheduler.run(config));
        Self {
            runtime_handle,
            sender,
            update_certificate_sender,
            #[cfg(any(test, feature = "dev-context-only-utils"))]
            cancel,
        }
    }

    fn create_config(
        bind_socket: UdpSocket,
        stake_identity: Option<&Keypair>,
        leader_forward_count: usize,
    ) -> ConnectionWorkersSchedulerConfig {
        ConnectionWorkersSchedulerConfig {
            bind: BindTarget::Socket(bind_socket),
            stake_identity: stake_identity.map(StakeIdentity::new),
            num_connections: MAX_CONNECTIONS,
            skip_check_transaction_age: true,
            // experimentally found parameter values
            worker_channel_size: 64,
            max_reconnect_attempts: 4,
            // Open a connection to one more leader in advance; time-wise this
            // means ~1.6s (one leader rotation of four ~400ms slots).
            leaders_fanout: Fanout {
                connect: leader_forward_count + 1,
                send: leader_forward_count,
            },
        }
    }

    #[cfg(any(test, feature = "dev-context-only-utils"))]
    pub fn cancel(&self) {
        self.cancel.cancel();
    }
}

impl NotifyKeyUpdate for TpuClientNextClient {
    fn update_key(&self, identity: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
        let stake_identity = StakeIdentity::new(identity);
        self.update_certificate_sender
            .send(Some(stake_identity))
            .map_err(|e| Box::new(e) as Box<dyn std::error::Error>)
    }
}

impl TransactionClient for TpuClientNextClient {
    fn send_transactions_in_batch(
        &self,
        wire_transactions: Vec<Vec<u8>>,
        stats: &SendTransactionServiceStats,
    ) {
        let mut measure = Measure::start("send-us");
        self.runtime_handle.spawn({
            let sender = self.sender.clone();
            async move {
                let res = sender.send(TransactionBatch::new(wire_transactions)).await;
                if res.is_err() {
                    warn!("Failed to send transaction to channel: it is closed.");
                }
            }
        });

        measure.stop();
        stats.send_us.fetch_add(measure.as_us(), Ordering::Relaxed);
        stats.send_attempt_count.fetch_add(1, Ordering::Relaxed);
    }

    #[cfg(any(test, feature = "dev-context-only-utils"))]
    fn protocol(&self) -> Protocol {
        Protocol::QUIC
    }
}

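/// [`LeaderUpdater`] implementation backing [`TpuClientNextClient`]: it
/// combines the statically configured `tpu_peers` with the dynamically
/// discovered upcoming leader TPU addresses (falling back to
/// `my_tpu_address` when no leaders are known).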
#[derive(Clone)]
pub struct SendTransactionServiceLeaderUpdater<T: TpuInfoWithSendStatic> {
    leader_info_provider: CurrentLeaderInfo<T>,
    my_tpu_address: SocketAddr,
    tpu_peers: Option<Vec<SocketAddr>>,
}

#[async_trait]
impl<T> LeaderUpdater for SendTransactionServiceLeaderUpdater<T>
where
    T: TpuInfoWithSendStatic,
{
    fn next_leaders(&mut self, lookahead_leaders: usize) -> Vec<SocketAddr> {
        let discovered_peers = self
            .leader_info_provider
            .get_leader_info()
            .map(|leader_info| {
                leader_info.get_not_unique_leader_tpus(lookahead_leaders as u64, Protocol::QUIC)
            })
            .filter(|addresses| !addresses.is_empty())
            .unwrap_or_else(|| vec![&self.my_tpu_address]);
        let mut all_peers = self.tpu_peers.clone().unwrap_or_default();
        all_peers.extend(discovered_peers.into_iter().cloned());
        all_peers
    }

    async fn stop(&mut self) {}
}