solana_thin_client/
thin_client.rs

//! The `thin_client` module is a client-side object that interfaces with
//! a server-side TPU.  Client code should use this object instead of writing
//! messages to the network directly. The binary encoding of its messages is
//! unstable and may change in future releases.
5
6use {
7    log::*,
8    rayon::iter::{IntoParallelIterator, ParallelIterator},
9    solana_account::Account,
10    solana_client_traits::{AsyncClient, Client, SyncClient},
11    solana_clock::MAX_PROCESSING_AGE,
12    solana_commitment_config::CommitmentConfig,
13    solana_connection_cache::{
14        client_connection::ClientConnection,
15        connection_cache::{
16            ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig,
17        },
18    },
19    solana_epoch_info::EpochInfo,
20    solana_hash::Hash,
21    solana_instruction::Instruction,
22    solana_keypair::Keypair,
23    solana_message::Message,
24    solana_pubkey::Pubkey,
25    solana_rpc_client::rpc_client::RpcClient,
26    solana_rpc_client_api::config::RpcProgramAccountsConfig,
27    solana_signature::Signature,
28    solana_signer::{signers::Signers, Signer},
29    solana_system_interface::instruction::transfer,
30    solana_transaction::{versioned::VersionedTransaction, Transaction},
31    solana_transaction_error::{TransactionResult, TransportResult},
32    std::{
33        io,
34        net::SocketAddr,
35        sync::{
36            atomic::{AtomicBool, AtomicUsize, Ordering},
37            Arc, RwLock,
38        },
39        time::{Duration, Instant},
40    },
41};
42
43struct ClientOptimizer {
44    cur_index: AtomicUsize,
45    experiment_index: AtomicUsize,
46    experiment_done: AtomicBool,
47    times: RwLock<Vec<u64>>,
48    num_clients: usize,
49}
50
51impl ClientOptimizer {
52    fn new(num_clients: usize) -> Self {
53        Self {
54            cur_index: AtomicUsize::new(0),
55            experiment_index: AtomicUsize::new(0),
56            experiment_done: AtomicBool::new(false),
57            times: RwLock::new(vec![u64::MAX; num_clients]),
58            num_clients,
59        }
60    }
61
62    fn experiment(&self) -> usize {
63        if self.experiment_index.load(Ordering::Relaxed) < self.num_clients {
64            let old = self.experiment_index.fetch_add(1, Ordering::Relaxed);
65            if old < self.num_clients {
66                old
67            } else {
68                self.best()
69            }
70        } else {
71            self.best()
72        }
73    }
74
75    fn report(&self, index: usize, time_ms: u64) {
76        if self.num_clients > 1
77            && (!self.experiment_done.load(Ordering::Relaxed) || time_ms == u64::MAX)
78        {
79            trace!(
80                "report {} with {} exp: {}",
81                index,
82                time_ms,
83                self.experiment_index.load(Ordering::Relaxed)
84            );
85
86            self.times.write().unwrap()[index] = time_ms;
87
88            if index == (self.num_clients - 1) || time_ms == u64::MAX {
89                let times = self.times.read().unwrap();
90                let (min_time, min_index) = min_index(&times);
91                trace!(
92                    "done experimenting min: {} time: {} times: {:?}",
93                    min_index,
94                    min_time,
95                    times
96                );
97
98                // Only 1 thread should grab the num_clients-1 index, so this should be ok.
99                self.cur_index.store(min_index, Ordering::Relaxed);
100                self.experiment_done.store(true, Ordering::Relaxed);
101            }
102        }
103    }
104
105    fn best(&self) -> usize {
106        self.cur_index.load(Ordering::Relaxed)
107    }
108}
109
110/// An object for querying and sending transactions to the network.
111#[deprecated(since = "2.0.0", note = "Use [RpcClient] or [TpuClient] instead.")]
112pub struct ThinClient<
113    P, // ConnectionPool
114    M, // ConnectionManager
115    C, // NewConnectionConfig
116> {
117    rpc_clients: Vec<RpcClient>,
118    tpu_addrs: Vec<SocketAddr>,
119    optimizer: ClientOptimizer,
120    connection_cache: Arc<ConnectionCache<P, M, C>>,
121}
122
123#[allow(deprecated)]
124impl<P, M, C> ThinClient<P, M, C>
125where
126    P: ConnectionPool<NewConnectionConfig = C>,
127    M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
128    C: NewConnectionConfig,
129{
130    /// Create a new ThinClient that will interface with the Rpc at `rpc_addr` using TCP
131    /// and the Tpu at `tpu_addr` over `transactions_socket` using Quic or UDP
132    /// (currently hardcoded to UDP)
133    pub fn new(
134        rpc_addr: SocketAddr,
135        tpu_addr: SocketAddr,
136        connection_cache: Arc<ConnectionCache<P, M, C>>,
137    ) -> Self {
138        Self::new_from_client(RpcClient::new_socket(rpc_addr), tpu_addr, connection_cache)
139    }
140
141    pub fn new_socket_with_timeout(
142        rpc_addr: SocketAddr,
143        tpu_addr: SocketAddr,
144        timeout: Duration,
145        connection_cache: Arc<ConnectionCache<P, M, C>>,
146    ) -> Self {
147        let rpc_client = RpcClient::new_socket_with_timeout(rpc_addr, timeout);
148        Self::new_from_client(rpc_client, tpu_addr, connection_cache)
149    }
150
151    fn new_from_client(
152        rpc_client: RpcClient,
153        tpu_addr: SocketAddr,
154        connection_cache: Arc<ConnectionCache<P, M, C>>,
155    ) -> Self {
156        Self {
157            rpc_clients: vec![rpc_client],
158            tpu_addrs: vec![tpu_addr],
159            optimizer: ClientOptimizer::new(0),
160            connection_cache,
161        }
162    }
163
164    pub fn new_from_addrs(
165        rpc_addrs: Vec<SocketAddr>,
166        tpu_addrs: Vec<SocketAddr>,
167        connection_cache: Arc<ConnectionCache<P, M, C>>,
168    ) -> Self {
169        assert!(!rpc_addrs.is_empty());
170        assert_eq!(rpc_addrs.len(), tpu_addrs.len());
171
172        let rpc_clients: Vec<_> = rpc_addrs.into_iter().map(RpcClient::new_socket).collect();
173        let optimizer = ClientOptimizer::new(rpc_clients.len());
174        Self {
175            rpc_clients,
176            tpu_addrs,
177            optimizer,
178            connection_cache,
179        }
180    }
181
182    fn tpu_addr(&self) -> &SocketAddr {
183        &self.tpu_addrs[self.optimizer.best()]
184    }
185
186    pub fn rpc_client(&self) -> &RpcClient {
187        &self.rpc_clients[self.optimizer.best()]
188    }
189
190    /// Retry a sending a signed Transaction to the server for processing.
191    pub fn retry_transfer_until_confirmed(
192        &self,
193        keypair: &Keypair,
194        transaction: &mut Transaction,
195        tries: usize,
196        min_confirmed_blocks: usize,
197    ) -> TransportResult<Signature> {
198        self.send_and_confirm_transaction(&[keypair], transaction, tries, min_confirmed_blocks)
199    }
200
201    /// Retry sending a signed Transaction with one signing Keypair to the server for processing.
202    pub fn retry_transfer(
203        &self,
204        keypair: &Keypair,
205        transaction: &mut Transaction,
206        tries: usize,
207    ) -> TransportResult<Signature> {
208        self.send_and_confirm_transaction(&[keypair], transaction, tries, 0)
209    }
210
211    pub fn send_and_confirm_transaction<T: Signers + ?Sized>(
212        &self,
213        keypairs: &T,
214        transaction: &mut Transaction,
215        tries: usize,
216        pending_confirmations: usize,
217    ) -> TransportResult<Signature> {
218        for x in 0..tries {
219            let now = Instant::now();
220            let mut num_confirmed = 0;
221            let mut wait_time = MAX_PROCESSING_AGE;
222            // resend the same transaction until the transaction has no chance of succeeding
223            let wire_transaction =
224                bincode::serialize(&transaction).expect("transaction serialization failed");
225            while now.elapsed().as_secs() < wait_time as u64 {
226                if num_confirmed == 0 {
227                    let conn = self.connection_cache.get_connection(self.tpu_addr());
228                    // Send the transaction if there has been no confirmation (e.g. the first time)
229                    #[allow(clippy::needless_borrow)]
230                    conn.send_data(&wire_transaction)?;
231                }
232
233                if let Ok(confirmed_blocks) = self.poll_for_signature_confirmation(
234                    &transaction.signatures[0],
235                    pending_confirmations,
236                ) {
237                    num_confirmed = confirmed_blocks;
238                    if confirmed_blocks >= pending_confirmations {
239                        return Ok(transaction.signatures[0]);
240                    }
241                    // Since network has seen the transaction, wait longer to receive
242                    // all pending confirmations. Resending the transaction could result into
243                    // extra transaction fees
244                    wait_time = wait_time.max(
245                        MAX_PROCESSING_AGE * pending_confirmations.saturating_sub(num_confirmed),
246                    );
247                }
248            }
249            info!("{} tries failed transfer to {}", x, self.tpu_addr());
250            let blockhash = self.get_latest_blockhash()?;
251            transaction.sign(keypairs, blockhash);
252        }
253        Err(io::Error::other(format!("retry_transfer failed in {tries} retries")).into())
254    }
255
256    pub fn poll_get_balance(&self, pubkey: &Pubkey) -> TransportResult<u64> {
257        self.poll_get_balance_with_commitment(pubkey, CommitmentConfig::default())
258    }
259
260    pub fn poll_get_balance_with_commitment(
261        &self,
262        pubkey: &Pubkey,
263        commitment_config: CommitmentConfig,
264    ) -> TransportResult<u64> {
265        self.rpc_client()
266            .poll_get_balance_with_commitment(pubkey, commitment_config)
267            .map_err(|e| e.into())
268    }
269
270    pub fn wait_for_balance(&self, pubkey: &Pubkey, expected_balance: Option<u64>) -> Option<u64> {
271        self.rpc_client().wait_for_balance_with_commitment(
272            pubkey,
273            expected_balance,
274            CommitmentConfig::default(),
275        )
276    }
277
278    pub fn get_program_accounts_with_config(
279        &self,
280        pubkey: &Pubkey,
281        config: RpcProgramAccountsConfig,
282    ) -> TransportResult<Vec<(Pubkey, Account)>> {
283        self.rpc_client()
284            .get_program_accounts_with_config(pubkey, config)
285            .map_err(|e| e.into())
286    }
287
288    pub fn wait_for_balance_with_commitment(
289        &self,
290        pubkey: &Pubkey,
291        expected_balance: Option<u64>,
292        commitment_config: CommitmentConfig,
293    ) -> Option<u64> {
294        self.rpc_client().wait_for_balance_with_commitment(
295            pubkey,
296            expected_balance,
297            commitment_config,
298        )
299    }
300
301    pub fn poll_for_signature_with_commitment(
302        &self,
303        signature: &Signature,
304        commitment_config: CommitmentConfig,
305    ) -> TransportResult<()> {
306        self.rpc_client()
307            .poll_for_signature_with_commitment(signature, commitment_config)
308            .map_err(|e| e.into())
309    }
310
311    pub fn get_num_blocks_since_signature_confirmation(
312        &mut self,
313        sig: &Signature,
314    ) -> TransportResult<usize> {
315        self.rpc_client()
316            .get_num_blocks_since_signature_confirmation(sig)
317            .map_err(|e| e.into())
318    }
319}
320
321#[allow(deprecated)]
322impl<P, M, C> Client for ThinClient<P, M, C>
323where
324    P: ConnectionPool<NewConnectionConfig = C>,
325    M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
326    C: NewConnectionConfig,
327{
328    fn tpu_addr(&self) -> String {
329        self.tpu_addr().to_string()
330    }
331}
332
333#[allow(deprecated)]
334impl<P, M, C> SyncClient for ThinClient<P, M, C>
335where
336    P: ConnectionPool<NewConnectionConfig = C>,
337    M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
338    C: NewConnectionConfig,
339{
340    fn send_and_confirm_message<T: Signers + ?Sized>(
341        &self,
342        keypairs: &T,
343        message: Message,
344    ) -> TransportResult<Signature> {
345        let blockhash = self.get_latest_blockhash()?;
346        let mut transaction = Transaction::new(keypairs, message, blockhash);
347        let signature = self.send_and_confirm_transaction(keypairs, &mut transaction, 5, 0)?;
348        Ok(signature)
349    }
350
351    fn send_and_confirm_instruction(
352        &self,
353        keypair: &Keypair,
354        instruction: Instruction,
355    ) -> TransportResult<Signature> {
356        let message = Message::new(&[instruction], Some(&keypair.pubkey()));
357        self.send_and_confirm_message(&[keypair], message)
358    }
359
360    fn transfer_and_confirm(
361        &self,
362        lamports: u64,
363        keypair: &Keypair,
364        pubkey: &Pubkey,
365    ) -> TransportResult<Signature> {
366        let transfer_instruction = transfer(&keypair.pubkey(), pubkey, lamports);
367        self.send_and_confirm_instruction(keypair, transfer_instruction)
368    }
369
370    fn get_account_data(&self, pubkey: &Pubkey) -> TransportResult<Option<Vec<u8>>> {
371        Ok(self.rpc_client().get_account_data(pubkey).ok())
372    }
373
374    fn get_account(&self, pubkey: &Pubkey) -> TransportResult<Option<Account>> {
375        let account = self.rpc_client().get_account(pubkey);
376        match account {
377            Ok(value) => Ok(Some(value)),
378            Err(_) => Ok(None),
379        }
380    }
381
382    fn get_account_with_commitment(
383        &self,
384        pubkey: &Pubkey,
385        commitment_config: CommitmentConfig,
386    ) -> TransportResult<Option<Account>> {
387        self.rpc_client()
388            .get_account_with_commitment(pubkey, commitment_config)
389            .map_err(|e| e.into())
390            .map(|r| r.value)
391    }
392
393    fn get_balance(&self, pubkey: &Pubkey) -> TransportResult<u64> {
394        self.rpc_client().get_balance(pubkey).map_err(|e| e.into())
395    }
396
397    fn get_balance_with_commitment(
398        &self,
399        pubkey: &Pubkey,
400        commitment_config: CommitmentConfig,
401    ) -> TransportResult<u64> {
402        self.rpc_client()
403            .get_balance_with_commitment(pubkey, commitment_config)
404            .map_err(|e| e.into())
405            .map(|r| r.value)
406    }
407
408    fn get_minimum_balance_for_rent_exemption(&self, data_len: usize) -> TransportResult<u64> {
409        self.rpc_client()
410            .get_minimum_balance_for_rent_exemption(data_len)
411            .map_err(|e| e.into())
412    }
413
414    fn get_signature_status(
415        &self,
416        signature: &Signature,
417    ) -> TransportResult<Option<TransactionResult<()>>> {
418        let status = self
419            .rpc_client()
420            .get_signature_status(signature)
421            .map_err(|err| {
422                io::Error::other(format!("send_transaction failed with error {err:?}"))
423            })?;
424        Ok(status)
425    }
426
427    fn get_signature_status_with_commitment(
428        &self,
429        signature: &Signature,
430        commitment_config: CommitmentConfig,
431    ) -> TransportResult<Option<TransactionResult<()>>> {
432        let status = self
433            .rpc_client()
434            .get_signature_status_with_commitment(signature, commitment_config)
435            .map_err(|err| {
436                io::Error::other(format!("send_transaction failed with error {err:?}"))
437            })?;
438        Ok(status)
439    }
440
441    fn get_slot(&self) -> TransportResult<u64> {
442        self.get_slot_with_commitment(CommitmentConfig::default())
443    }
444
445    fn get_slot_with_commitment(
446        &self,
447        commitment_config: CommitmentConfig,
448    ) -> TransportResult<u64> {
449        let slot = self
450            .rpc_client()
451            .get_slot_with_commitment(commitment_config)
452            .map_err(|err| {
453                io::Error::other(format!("send_transaction failed with error {err:?}"))
454            })?;
455        Ok(slot)
456    }
457
458    fn get_epoch_info(&self) -> TransportResult<EpochInfo> {
459        self.rpc_client().get_epoch_info().map_err(|e| e.into())
460    }
461
462    fn get_transaction_count(&self) -> TransportResult<u64> {
463        let index = self.optimizer.experiment();
464        let now = Instant::now();
465        match self.rpc_client().get_transaction_count() {
466            Ok(transaction_count) => {
467                self.optimizer
468                    .report(index, now.elapsed().as_millis() as u64);
469                Ok(transaction_count)
470            }
471            Err(e) => {
472                self.optimizer.report(index, u64::MAX);
473                Err(e.into())
474            }
475        }
476    }
477
478    fn get_transaction_count_with_commitment(
479        &self,
480        commitment_config: CommitmentConfig,
481    ) -> TransportResult<u64> {
482        let index = self.optimizer.experiment();
483        let now = Instant::now();
484        match self
485            .rpc_client()
486            .get_transaction_count_with_commitment(commitment_config)
487        {
488            Ok(transaction_count) => {
489                self.optimizer
490                    .report(index, now.elapsed().as_millis() as u64);
491                Ok(transaction_count)
492            }
493            Err(e) => {
494                self.optimizer.report(index, u64::MAX);
495                Err(e.into())
496            }
497        }
498    }
499
500    /// Poll the server until the signature has been confirmed by at least `min_confirmed_blocks`
501    fn poll_for_signature_confirmation(
502        &self,
503        signature: &Signature,
504        min_confirmed_blocks: usize,
505    ) -> TransportResult<usize> {
506        self.rpc_client()
507            .poll_for_signature_confirmation(signature, min_confirmed_blocks)
508            .map_err(|e| e.into())
509    }
510
511    fn poll_for_signature(&self, signature: &Signature) -> TransportResult<()> {
512        self.rpc_client()
513            .poll_for_signature(signature)
514            .map_err(|e| e.into())
515    }
516
517    fn get_latest_blockhash(&self) -> TransportResult<Hash> {
518        let (blockhash, _) =
519            self.get_latest_blockhash_with_commitment(CommitmentConfig::default())?;
520        Ok(blockhash)
521    }
522
523    fn get_latest_blockhash_with_commitment(
524        &self,
525        commitment_config: CommitmentConfig,
526    ) -> TransportResult<(Hash, u64)> {
527        let index = self.optimizer.experiment();
528        let now = Instant::now();
529        match self.rpc_clients[index].get_latest_blockhash_with_commitment(commitment_config) {
530            Ok((blockhash, last_valid_block_height)) => {
531                self.optimizer
532                    .report(index, now.elapsed().as_millis() as u64);
533                Ok((blockhash, last_valid_block_height))
534            }
535            Err(e) => {
536                self.optimizer.report(index, u64::MAX);
537                Err(e.into())
538            }
539        }
540    }
541
542    fn is_blockhash_valid(
543        &self,
544        blockhash: &Hash,
545        commitment_config: CommitmentConfig,
546    ) -> TransportResult<bool> {
547        self.rpc_client()
548            .is_blockhash_valid(blockhash, commitment_config)
549            .map_err(|e| e.into())
550    }
551
552    fn get_fee_for_message(&self, message: &Message) -> TransportResult<u64> {
553        self.rpc_client()
554            .get_fee_for_message(message)
555            .map_err(|e| e.into())
556    }
557}
558
559#[allow(deprecated)]
560impl<P, M, C> AsyncClient for ThinClient<P, M, C>
561where
562    P: ConnectionPool<NewConnectionConfig = C>,
563    M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
564    C: NewConnectionConfig,
565{
566    fn async_send_versioned_transaction(
567        &self,
568        transaction: VersionedTransaction,
569    ) -> TransportResult<Signature> {
570        let conn = self.connection_cache.get_connection(self.tpu_addr());
571        let wire_transaction =
572            bincode::serialize(&transaction).expect("serialize Transaction in send_batch");
573        conn.send_data(&wire_transaction)?;
574        Ok(transaction.signatures[0])
575    }
576
577    fn async_send_versioned_transaction_batch(
578        &self,
579        batch: Vec<VersionedTransaction>,
580    ) -> TransportResult<()> {
581        let conn = self.connection_cache.get_connection(self.tpu_addr());
582        let buffers = batch
583            .into_par_iter()
584            .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
585            .collect::<Vec<_>>();
586        conn.send_data_batch(&buffers)?;
587        Ok(())
588    }
589}
590
/// Returns `(smallest value, index of its first occurrence)` for `array`.
///
/// When `array` is empty, or no element is below `u64::MAX`, the result is
/// `(u64::MAX, 0)`.
fn min_index(array: &[u64]) -> (u64, usize) {
    array
        .iter()
        .copied()
        .enumerate()
        .fold((u64::MAX, 0), |(best_time, best_pos), (pos, time)| {
            // Strict comparison keeps the earliest position on ties.
            if time < best_time {
                (time, pos)
            } else {
                (best_time, best_pos)
            }
        })
}
602
#[cfg(test)]
mod tests {
    use super::*;

    /// Exercises the optimizer: experiment slots are handed out exactly once
    /// under concurrency, the fastest endpoint wins, and a failure on the
    /// best endpoint moves the choice to the next fastest.
    #[test]
    fn test_client_optimizer() {
        solana_logger::setup();

        const NUM_CLIENTS: usize = 5;
        let optimizer = ClientOptimizer::new(NUM_CLIENTS);

        // Hand out the experiment slots concurrently; every index must be
        // issued exactly once.
        let mut indices: Vec<usize> = (0..NUM_CLIENTS)
            .into_par_iter()
            .map(|_| optimizer.experiment())
            .collect();
        indices.sort_unstable();
        assert_eq!(indices, (0..NUM_CLIENTS).collect::<Vec<_>>());

        // Report in a fixed order. Reporting from the parallel loop is racy:
        // if the thread holding the last index reports first, the experiment
        // is closed and the remaining timings are dropped, which makes the
        // assertions below fail intermittently.
        for index in indices {
            optimizer.report(index, (NUM_CLIENTS - index) as u64);
        }

        // The experiment is over, so this report is ignored and the fastest
        // endpoint (the last one) stays selected.
        let index = optimizer.experiment();
        optimizer.report(index, 50);
        assert_eq!(optimizer.best(), NUM_CLIENTS - 1);

        // A failure on the best endpoint selects the next fastest one.
        optimizer.report(optimizer.best(), u64::MAX);
        assert_eq!(optimizer.best(), NUM_CLIENTS - 2);
    }
}