gemachain_client/
thin_client.rs

1//! The `thin_client` module is a client-side object that interfaces with
2//! a server-side TPU.  Client code should use this object instead of writing
3//! messages to the network directly. The binary encoding of its messages are
4//! unstable and may change in future releases.
5
6use {
7    crate::{rpc_client::RpcClient, rpc_config::RpcProgramAccountsConfig, rpc_response::Response},
8    bincode::{serialize_into, serialized_size},
9    log::*,
10    gemachain_sdk::{
11        account::Account,
12        client::{AsyncClient, Client, SyncClient},
13        clock::{Slot, MAX_PROCESSING_AGE},
14        commitment_config::CommitmentConfig,
15        epoch_info::EpochInfo,
16        fee_calculator::{FeeCalculator, FeeRateGovernor},
17        hash::Hash,
18        instruction::Instruction,
19        message::Message,
20        packet::PACKET_DATA_SIZE,
21        pubkey::Pubkey,
22        signature::{Keypair, Signature, Signer},
23        signers::Signers,
24        system_instruction,
25        timing::duration_as_ms,
26        transaction::{self, Transaction},
27        transport::Result as TransportResult,
28    },
29    std::{
30        io,
31        net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
32        sync::{
33            atomic::{AtomicBool, AtomicUsize, Ordering},
34            RwLock,
35        },
36        time::{Duration, Instant},
37    },
38};
39
40struct ClientOptimizer {
41    cur_index: AtomicUsize,
42    experiment_index: AtomicUsize,
43    experiment_done: AtomicBool,
44    times: RwLock<Vec<u64>>,
45    num_clients: usize,
46}
47
48fn min_index(array: &[u64]) -> (u64, usize) {
49    let mut min_time = std::u64::MAX;
50    let mut min_index = 0;
51    for (i, time) in array.iter().enumerate() {
52        if *time < min_time {
53            min_time = *time;
54            min_index = i;
55        }
56    }
57    (min_time, min_index)
58}
59
60impl ClientOptimizer {
61    fn new(num_clients: usize) -> Self {
62        Self {
63            cur_index: AtomicUsize::new(0),
64            experiment_index: AtomicUsize::new(0),
65            experiment_done: AtomicBool::new(false),
66            times: RwLock::new(vec![std::u64::MAX; num_clients]),
67            num_clients,
68        }
69    }
70
71    fn experiment(&self) -> usize {
72        if self.experiment_index.load(Ordering::Relaxed) < self.num_clients {
73            let old = self.experiment_index.fetch_add(1, Ordering::Relaxed);
74            if old < self.num_clients {
75                old
76            } else {
77                self.best()
78            }
79        } else {
80            self.best()
81        }
82    }
83
84    fn report(&self, index: usize, time_ms: u64) {
85        if self.num_clients > 1
86            && (!self.experiment_done.load(Ordering::Relaxed) || time_ms == std::u64::MAX)
87        {
88            trace!(
89                "report {} with {} exp: {}",
90                index,
91                time_ms,
92                self.experiment_index.load(Ordering::Relaxed)
93            );
94
95            self.times.write().unwrap()[index] = time_ms;
96
97            if index == (self.num_clients - 1) || time_ms == std::u64::MAX {
98                let times = self.times.read().unwrap();
99                let (min_time, min_index) = min_index(&times);
100                trace!(
101                    "done experimenting min: {} time: {} times: {:?}",
102                    min_index,
103                    min_time,
104                    times
105                );
106
107                // Only 1 thread should grab the num_clients-1 index, so this should be ok.
108                self.cur_index.store(min_index, Ordering::Relaxed);
109                self.experiment_done.store(true, Ordering::Relaxed);
110            }
111        }
112    }
113
114    fn best(&self) -> usize {
115        self.cur_index.load(Ordering::Relaxed)
116    }
117}
118
119/// An object for querying and sending transactions to the network.
120pub struct ThinClient {
121    transactions_socket: UdpSocket,
122    tpu_addrs: Vec<SocketAddr>,
123    rpc_clients: Vec<RpcClient>,
124    optimizer: ClientOptimizer,
125}
126
127impl ThinClient {
128    /// Create a new ThinClient that will interface with the Rpc at `rpc_addr` using TCP
129    /// and the Tpu at `tpu_addr` over `transactions_socket` using UDP.
130    pub fn new(rpc_addr: SocketAddr, tpu_addr: SocketAddr, transactions_socket: UdpSocket) -> Self {
131        Self::new_from_client(
132            tpu_addr,
133            transactions_socket,
134            RpcClient::new_socket(rpc_addr),
135        )
136    }
137
138    pub fn new_socket_with_timeout(
139        rpc_addr: SocketAddr,
140        tpu_addr: SocketAddr,
141        transactions_socket: UdpSocket,
142        timeout: Duration,
143    ) -> Self {
144        let rpc_client = RpcClient::new_socket_with_timeout(rpc_addr, timeout);
145        Self::new_from_client(tpu_addr, transactions_socket, rpc_client)
146    }
147
148    fn new_from_client(
149        tpu_addr: SocketAddr,
150        transactions_socket: UdpSocket,
151        rpc_client: RpcClient,
152    ) -> Self {
153        Self {
154            transactions_socket,
155            tpu_addrs: vec![tpu_addr],
156            rpc_clients: vec![rpc_client],
157            optimizer: ClientOptimizer::new(0),
158        }
159    }
160
161    pub fn new_from_addrs(
162        rpc_addrs: Vec<SocketAddr>,
163        tpu_addrs: Vec<SocketAddr>,
164        transactions_socket: UdpSocket,
165    ) -> Self {
166        assert!(!rpc_addrs.is_empty());
167        assert_eq!(rpc_addrs.len(), tpu_addrs.len());
168
169        let rpc_clients: Vec<_> = rpc_addrs.into_iter().map(RpcClient::new_socket).collect();
170        let optimizer = ClientOptimizer::new(rpc_clients.len());
171        Self {
172            transactions_socket,
173            tpu_addrs,
174            rpc_clients,
175            optimizer,
176        }
177    }
178
179    fn tpu_addr(&self) -> &SocketAddr {
180        &self.tpu_addrs[self.optimizer.best()]
181    }
182
183    fn rpc_client(&self) -> &RpcClient {
184        &self.rpc_clients[self.optimizer.best()]
185    }
186
187    /// Retry a sending a signed Transaction to the server for processing.
188    pub fn retry_transfer_until_confirmed(
189        &self,
190        keypair: &Keypair,
191        transaction: &mut Transaction,
192        tries: usize,
193        min_confirmed_blocks: usize,
194    ) -> TransportResult<Signature> {
195        self.send_and_confirm_transaction(&[keypair], transaction, tries, min_confirmed_blocks)
196    }
197
198    /// Retry sending a signed Transaction with one signing Keypair to the server for processing.
199    pub fn retry_transfer(
200        &self,
201        keypair: &Keypair,
202        transaction: &mut Transaction,
203        tries: usize,
204    ) -> TransportResult<Signature> {
205        self.send_and_confirm_transaction(&[keypair], transaction, tries, 0)
206    }
207
208    /// Retry sending a signed Transaction to the server for processing
209    pub fn send_and_confirm_transaction<T: Signers>(
210        &self,
211        keypairs: &T,
212        transaction: &mut Transaction,
213        tries: usize,
214        pending_confirmations: usize,
215    ) -> TransportResult<Signature> {
216        for x in 0..tries {
217            let now = Instant::now();
218            let mut buf = vec![0; serialized_size(&transaction).unwrap() as usize];
219            let mut wr = std::io::Cursor::new(&mut buf[..]);
220            let mut num_confirmed = 0;
221            let mut wait_time = MAX_PROCESSING_AGE;
222            serialize_into(&mut wr, &transaction)
223                .expect("serialize Transaction in pub fn transfer_signed");
224            // resend the same transaction until the transaction has no chance of succeeding
225            while now.elapsed().as_secs() < wait_time as u64 {
226                if num_confirmed == 0 {
227                    // Send the transaction if there has been no confirmation (e.g. the first time)
228                    self.transactions_socket
229                        .send_to(&buf[..], &self.tpu_addr())?;
230                }
231
232                if let Ok(confirmed_blocks) = self.poll_for_signature_confirmation(
233                    &transaction.signatures[0],
234                    pending_confirmations,
235                ) {
236                    num_confirmed = confirmed_blocks;
237                    if confirmed_blocks >= pending_confirmations {
238                        return Ok(transaction.signatures[0]);
239                    }
240                    // Since network has seen the transaction, wait longer to receive
241                    // all pending confirmations. Resending the transaction could result into
242                    // extra transaction fees
243                    wait_time = wait_time.max(
244                        MAX_PROCESSING_AGE * pending_confirmations.saturating_sub(num_confirmed),
245                    );
246                }
247            }
248            info!("{} tries failed transfer to {}", x, self.tpu_addr());
249            let blockhash = self.get_latest_blockhash()?;
250            transaction.sign(keypairs, blockhash);
251        }
252        Err(io::Error::new(
253            io::ErrorKind::Other,
254            format!("retry_transfer failed in {} retries", tries),
255        )
256        .into())
257    }
258
259    pub fn poll_get_balance(&self, pubkey: &Pubkey) -> TransportResult<u64> {
260        self.poll_get_balance_with_commitment(pubkey, CommitmentConfig::default())
261    }
262
263    pub fn poll_get_balance_with_commitment(
264        &self,
265        pubkey: &Pubkey,
266        commitment_config: CommitmentConfig,
267    ) -> TransportResult<u64> {
268        self.rpc_client()
269            .poll_get_balance_with_commitment(pubkey, commitment_config)
270            .map_err(|e| e.into())
271    }
272
273    pub fn wait_for_balance(&self, pubkey: &Pubkey, expected_balance: Option<u64>) -> Option<u64> {
274        self.rpc_client().wait_for_balance_with_commitment(
275            pubkey,
276            expected_balance,
277            CommitmentConfig::default(),
278        )
279    }
280
281    pub fn get_program_accounts_with_config(
282        &self,
283        pubkey: &Pubkey,
284        config: RpcProgramAccountsConfig,
285    ) -> TransportResult<Vec<(Pubkey, Account)>> {
286        self.rpc_client()
287            .get_program_accounts_with_config(pubkey, config)
288            .map_err(|e| e.into())
289    }
290
291    pub fn wait_for_balance_with_commitment(
292        &self,
293        pubkey: &Pubkey,
294        expected_balance: Option<u64>,
295        commitment_config: CommitmentConfig,
296    ) -> Option<u64> {
297        self.rpc_client().wait_for_balance_with_commitment(
298            pubkey,
299            expected_balance,
300            commitment_config,
301        )
302    }
303
304    pub fn poll_for_signature_with_commitment(
305        &self,
306        signature: &Signature,
307        commitment_config: CommitmentConfig,
308    ) -> TransportResult<()> {
309        self.rpc_client()
310            .poll_for_signature_with_commitment(signature, commitment_config)
311            .map_err(|e| e.into())
312    }
313
314    pub fn get_num_blocks_since_signature_confirmation(
315        &mut self,
316        sig: &Signature,
317    ) -> TransportResult<usize> {
318        self.rpc_client()
319            .get_num_blocks_since_signature_confirmation(sig)
320            .map_err(|e| e.into())
321    }
322}
323
324impl Client for ThinClient {
325    fn tpu_addr(&self) -> String {
326        self.tpu_addr().to_string()
327    }
328}
329
330impl SyncClient for ThinClient {
331    fn send_and_confirm_message<T: Signers>(
332        &self,
333        keypairs: &T,
334        message: Message,
335    ) -> TransportResult<Signature> {
336        let blockhash = self.get_latest_blockhash()?;
337        let mut transaction = Transaction::new(keypairs, message, blockhash);
338        let signature = self.send_and_confirm_transaction(keypairs, &mut transaction, 5, 0)?;
339        Ok(signature)
340    }
341
342    fn send_and_confirm_instruction(
343        &self,
344        keypair: &Keypair,
345        instruction: Instruction,
346    ) -> TransportResult<Signature> {
347        let message = Message::new(&[instruction], Some(&keypair.pubkey()));
348        self.send_and_confirm_message(&[keypair], message)
349    }
350
351    fn transfer_and_confirm(
352        &self,
353        carats: u64,
354        keypair: &Keypair,
355        pubkey: &Pubkey,
356    ) -> TransportResult<Signature> {
357        let transfer_instruction =
358            system_instruction::transfer(&keypair.pubkey(), pubkey, carats);
359        self.send_and_confirm_instruction(keypair, transfer_instruction)
360    }
361
362    fn get_account_data(&self, pubkey: &Pubkey) -> TransportResult<Option<Vec<u8>>> {
363        Ok(self.rpc_client().get_account_data(pubkey).ok())
364    }
365
366    fn get_account(&self, pubkey: &Pubkey) -> TransportResult<Option<Account>> {
367        let account = self.rpc_client().get_account(pubkey);
368        match account {
369            Ok(value) => Ok(Some(value)),
370            Err(_) => Ok(None),
371        }
372    }
373
374    fn get_account_with_commitment(
375        &self,
376        pubkey: &Pubkey,
377        commitment_config: CommitmentConfig,
378    ) -> TransportResult<Option<Account>> {
379        self.rpc_client()
380            .get_account_with_commitment(pubkey, commitment_config)
381            .map_err(|e| e.into())
382            .map(|r| r.value)
383    }
384
385    fn get_balance(&self, pubkey: &Pubkey) -> TransportResult<u64> {
386        self.rpc_client().get_balance(pubkey).map_err(|e| e.into())
387    }
388
389    fn get_balance_with_commitment(
390        &self,
391        pubkey: &Pubkey,
392        commitment_config: CommitmentConfig,
393    ) -> TransportResult<u64> {
394        self.rpc_client()
395            .get_balance_with_commitment(pubkey, commitment_config)
396            .map_err(|e| e.into())
397            .map(|r| r.value)
398    }
399
400    fn get_minimum_balance_for_rent_exemption(&self, data_len: usize) -> TransportResult<u64> {
401        self.rpc_client()
402            .get_minimum_balance_for_rent_exemption(data_len)
403            .map_err(|e| e.into())
404    }
405
406    fn get_recent_blockhash(&self) -> TransportResult<(Hash, FeeCalculator)> {
407        #[allow(deprecated)]
408        let (blockhash, fee_calculator, _last_valid_slot) =
409            self.get_recent_blockhash_with_commitment(CommitmentConfig::default())?;
410        Ok((blockhash, fee_calculator))
411    }
412
413    fn get_recent_blockhash_with_commitment(
414        &self,
415        commitment_config: CommitmentConfig,
416    ) -> TransportResult<(Hash, FeeCalculator, Slot)> {
417        let index = self.optimizer.experiment();
418        let now = Instant::now();
419        #[allow(deprecated)]
420        let recent_blockhash =
421            self.rpc_clients[index].get_recent_blockhash_with_commitment(commitment_config);
422        match recent_blockhash {
423            Ok(Response { value, .. }) => {
424                self.optimizer.report(index, duration_as_ms(&now.elapsed()));
425                Ok((value.0, value.1, value.2))
426            }
427            Err(e) => {
428                self.optimizer.report(index, std::u64::MAX);
429                Err(e.into())
430            }
431        }
432    }
433
434    fn get_fee_calculator_for_blockhash(
435        &self,
436        blockhash: &Hash,
437    ) -> TransportResult<Option<FeeCalculator>> {
438        #[allow(deprecated)]
439        self.rpc_client()
440            .get_fee_calculator_for_blockhash(blockhash)
441            .map_err(|e| e.into())
442    }
443
444    fn get_fee_rate_governor(&self) -> TransportResult<FeeRateGovernor> {
445        #[allow(deprecated)]
446        self.rpc_client()
447            .get_fee_rate_governor()
448            .map_err(|e| e.into())
449            .map(|r| r.value)
450    }
451
452    fn get_signature_status(
453        &self,
454        signature: &Signature,
455    ) -> TransportResult<Option<transaction::Result<()>>> {
456        let status = self
457            .rpc_client()
458            .get_signature_status(signature)
459            .map_err(|err| {
460                io::Error::new(
461                    io::ErrorKind::Other,
462                    format!("send_transaction failed with error {:?}", err),
463                )
464            })?;
465        Ok(status)
466    }
467
468    fn get_signature_status_with_commitment(
469        &self,
470        signature: &Signature,
471        commitment_config: CommitmentConfig,
472    ) -> TransportResult<Option<transaction::Result<()>>> {
473        let status = self
474            .rpc_client()
475            .get_signature_status_with_commitment(signature, commitment_config)
476            .map_err(|err| {
477                io::Error::new(
478                    io::ErrorKind::Other,
479                    format!("send_transaction failed with error {:?}", err),
480                )
481            })?;
482        Ok(status)
483    }
484
485    fn get_slot(&self) -> TransportResult<u64> {
486        self.get_slot_with_commitment(CommitmentConfig::default())
487    }
488
489    fn get_slot_with_commitment(
490        &self,
491        commitment_config: CommitmentConfig,
492    ) -> TransportResult<u64> {
493        let slot = self
494            .rpc_client()
495            .get_slot_with_commitment(commitment_config)
496            .map_err(|err| {
497                io::Error::new(
498                    io::ErrorKind::Other,
499                    format!("send_transaction failed with error {:?}", err),
500                )
501            })?;
502        Ok(slot)
503    }
504
505    fn get_epoch_info(&self) -> TransportResult<EpochInfo> {
506        self.rpc_client().get_epoch_info().map_err(|e| e.into())
507    }
508
509    fn get_transaction_count(&self) -> TransportResult<u64> {
510        let index = self.optimizer.experiment();
511        let now = Instant::now();
512        match self.rpc_client().get_transaction_count() {
513            Ok(transaction_count) => {
514                self.optimizer.report(index, duration_as_ms(&now.elapsed()));
515                Ok(transaction_count)
516            }
517            Err(e) => {
518                self.optimizer.report(index, std::u64::MAX);
519                Err(e.into())
520            }
521        }
522    }
523
524    fn get_transaction_count_with_commitment(
525        &self,
526        commitment_config: CommitmentConfig,
527    ) -> TransportResult<u64> {
528        let index = self.optimizer.experiment();
529        let now = Instant::now();
530        match self
531            .rpc_client()
532            .get_transaction_count_with_commitment(commitment_config)
533        {
534            Ok(transaction_count) => {
535                self.optimizer.report(index, duration_as_ms(&now.elapsed()));
536                Ok(transaction_count)
537            }
538            Err(e) => {
539                self.optimizer.report(index, std::u64::MAX);
540                Err(e.into())
541            }
542        }
543    }
544
545    /// Poll the server until the signature has been confirmed by at least `min_confirmed_blocks`
546    fn poll_for_signature_confirmation(
547        &self,
548        signature: &Signature,
549        min_confirmed_blocks: usize,
550    ) -> TransportResult<usize> {
551        self.rpc_client()
552            .poll_for_signature_confirmation(signature, min_confirmed_blocks)
553            .map_err(|e| e.into())
554    }
555
556    fn poll_for_signature(&self, signature: &Signature) -> TransportResult<()> {
557        self.rpc_client()
558            .poll_for_signature(signature)
559            .map_err(|e| e.into())
560    }
561
562    fn get_new_blockhash(&self, blockhash: &Hash) -> TransportResult<(Hash, FeeCalculator)> {
563        #[allow(deprecated)]
564        self.rpc_client()
565            .get_new_blockhash(blockhash)
566            .map_err(|e| e.into())
567    }
568
569    fn get_latest_blockhash(&self) -> TransportResult<Hash> {
570        let (blockhash, _) =
571            self.get_latest_blockhash_with_commitment(CommitmentConfig::default())?;
572        Ok(blockhash)
573    }
574
575    fn get_latest_blockhash_with_commitment(
576        &self,
577        commitment_config: CommitmentConfig,
578    ) -> TransportResult<(Hash, u64)> {
579        let index = self.optimizer.experiment();
580        let now = Instant::now();
581        match self.rpc_clients[index].get_latest_blockhash_with_commitment(commitment_config) {
582            Ok((blockhash, last_valid_block_height)) => {
583                self.optimizer.report(index, duration_as_ms(&now.elapsed()));
584                Ok((blockhash, last_valid_block_height))
585            }
586            Err(e) => {
587                self.optimizer.report(index, std::u64::MAX);
588                Err(e.into())
589            }
590        }
591    }
592
593    fn is_blockhash_valid(
594        &self,
595        blockhash: &Hash,
596        commitment_config: CommitmentConfig,
597    ) -> TransportResult<bool> {
598        self.rpc_client()
599            .is_blockhash_valid(blockhash, commitment_config)
600            .map_err(|e| e.into())
601    }
602
603    fn get_fee_for_message(&self, blockhash: &Hash, message: &Message) -> TransportResult<u64> {
604        self.rpc_client()
605            .get_fee_for_message(blockhash, message)
606            .map_err(|e| e.into())
607    }
608
609    fn get_new_latest_blockhash(&self, blockhash: &Hash) -> TransportResult<Hash> {
610        self.rpc_client()
611            .get_new_latest_blockhash(blockhash)
612            .map_err(|e| e.into())
613    }
614}
615
616impl AsyncClient for ThinClient {
617    fn async_send_transaction(&self, transaction: Transaction) -> TransportResult<Signature> {
618        let mut buf = vec![0; serialized_size(&transaction).unwrap() as usize];
619        let mut wr = std::io::Cursor::new(&mut buf[..]);
620        serialize_into(&mut wr, &transaction)
621            .expect("serialize Transaction in pub fn transfer_signed");
622        assert!(buf.len() < PACKET_DATA_SIZE);
623        self.transactions_socket
624            .send_to(&buf[..], &self.tpu_addr())?;
625        Ok(transaction.signatures[0])
626    }
627    fn async_send_message<T: Signers>(
628        &self,
629        keypairs: &T,
630        message: Message,
631        recent_blockhash: Hash,
632    ) -> TransportResult<Signature> {
633        let transaction = Transaction::new(keypairs, message, recent_blockhash);
634        self.async_send_transaction(transaction)
635    }
636    fn async_send_instruction(
637        &self,
638        keypair: &Keypair,
639        instruction: Instruction,
640        recent_blockhash: Hash,
641    ) -> TransportResult<Signature> {
642        let message = Message::new(&[instruction], Some(&keypair.pubkey()));
643        self.async_send_message(&[keypair], message, recent_blockhash)
644    }
645    fn async_transfer(
646        &self,
647        carats: u64,
648        keypair: &Keypair,
649        pubkey: &Pubkey,
650        recent_blockhash: Hash,
651    ) -> TransportResult<Signature> {
652        let transfer_instruction =
653            system_instruction::transfer(&keypair.pubkey(), pubkey, carats);
654        self.async_send_instruction(keypair, transfer_instruction, recent_blockhash)
655    }
656}
657
658pub fn create_client((rpc, tpu): (SocketAddr, SocketAddr), range: (u16, u16)) -> ThinClient {
659    let (_, transactions_socket) =
660        gemachain_net_utils::bind_in_range(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), range).unwrap();
661    ThinClient::new(rpc, tpu, transactions_socket)
662}
663
664pub fn create_client_with_timeout(
665    (rpc, tpu): (SocketAddr, SocketAddr),
666    range: (u16, u16),
667    timeout: Duration,
668) -> ThinClient {
669    let (_, transactions_socket) =
670        gemachain_net_utils::bind_in_range(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), range).unwrap();
671    ThinClient::new_socket_with_timeout(rpc, tpu, transactions_socket, timeout)
672}
673
674#[cfg(test)]
675mod tests {
676    use super::*;
677    use rayon::prelude::*;
678
679    #[test]
680    fn test_client_optimizer() {
681        gemachain_logger::setup();
682
683        const NUM_CLIENTS: usize = 5;
684        let optimizer = ClientOptimizer::new(NUM_CLIENTS);
685        (0..NUM_CLIENTS).into_par_iter().for_each(|_| {
686            let index = optimizer.experiment();
687            optimizer.report(index, (NUM_CLIENTS - index) as u64);
688        });
689
690        let index = optimizer.experiment();
691        optimizer.report(index, 50);
692        assert_eq!(optimizer.best(), NUM_CLIENTS - 1);
693
694        optimizer.report(optimizer.best(), std::u64::MAX);
695        assert_eq!(optimizer.best(), NUM_CLIENTS - 2);
696    }
697}