1use {
7 crate::{rpc_client::RpcClient, rpc_config::RpcProgramAccountsConfig, rpc_response::Response},
8 bincode::{serialize_into, serialized_size},
9 log::*,
10 gemachain_sdk::{
11 account::Account,
12 client::{AsyncClient, Client, SyncClient},
13 clock::{Slot, MAX_PROCESSING_AGE},
14 commitment_config::CommitmentConfig,
15 epoch_info::EpochInfo,
16 fee_calculator::{FeeCalculator, FeeRateGovernor},
17 hash::Hash,
18 instruction::Instruction,
19 message::Message,
20 packet::PACKET_DATA_SIZE,
21 pubkey::Pubkey,
22 signature::{Keypair, Signature, Signer},
23 signers::Signers,
24 system_instruction,
25 timing::duration_as_ms,
26 transaction::{self, Transaction},
27 transport::Result as TransportResult,
28 },
29 std::{
30 io,
31 net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
32 sync::{
33 atomic::{AtomicBool, AtomicUsize, Ordering},
34 RwLock,
35 },
36 time::{Duration, Instant},
37 },
38};
39
/// Bookkeeping used to pick the fastest of several clients.
///
/// Client indices are handed out by `experiment`, their timings are recorded
/// through `report`, and the index with the smallest recorded time is served
/// by `best`.
struct ClientOptimizer {
    /// Index of the client currently considered the fastest.
    cur_index: AtomicUsize,
    /// Next client index to hand out while the experiment is still running.
    experiment_index: AtomicUsize,
    /// Set once a best client has been chosen from the recorded times.
    experiment_done: AtomicBool,
    /// Last recorded time per client in milliseconds; `u64::MAX` marks a slot
    /// with no sample or a failed request.
    times: RwLock<Vec<u64>>,
    /// Number of clients taking part in the experiment.
    num_clients: usize,
}
47
/// Returns the smallest value in `array` together with the position of its
/// first occurrence, as `(value, index)`.
///
/// An empty slice, or one that holds only `u64::MAX`, yields `(u64::MAX, 0)`.
fn min_index(array: &[u64]) -> (u64, usize) {
    array
        .iter()
        .enumerate()
        .fold((u64::MAX, 0), |best, (position, &candidate)| {
            // Strict comparison keeps the earliest index when values tie.
            if candidate < best.0 {
                (candidate, position)
            } else {
                best
            }
        })
}
59
60impl ClientOptimizer {
61 fn new(num_clients: usize) -> Self {
62 Self {
63 cur_index: AtomicUsize::new(0),
64 experiment_index: AtomicUsize::new(0),
65 experiment_done: AtomicBool::new(false),
66 times: RwLock::new(vec![std::u64::MAX; num_clients]),
67 num_clients,
68 }
69 }
70
71 fn experiment(&self) -> usize {
72 if self.experiment_index.load(Ordering::Relaxed) < self.num_clients {
73 let old = self.experiment_index.fetch_add(1, Ordering::Relaxed);
74 if old < self.num_clients {
75 old
76 } else {
77 self.best()
78 }
79 } else {
80 self.best()
81 }
82 }
83
84 fn report(&self, index: usize, time_ms: u64) {
85 if self.num_clients > 1
86 && (!self.experiment_done.load(Ordering::Relaxed) || time_ms == std::u64::MAX)
87 {
88 trace!(
89 "report {} with {} exp: {}",
90 index,
91 time_ms,
92 self.experiment_index.load(Ordering::Relaxed)
93 );
94
95 self.times.write().unwrap()[index] = time_ms;
96
97 if index == (self.num_clients - 1) || time_ms == std::u64::MAX {
98 let times = self.times.read().unwrap();
99 let (min_time, min_index) = min_index(×);
100 trace!(
101 "done experimenting min: {} time: {} times: {:?}",
102 min_index,
103 min_time,
104 times
105 );
106
107 self.cur_index.store(min_index, Ordering::Relaxed);
109 self.experiment_done.store(true, Ordering::Relaxed);
110 }
111 }
112 }
113
114 fn best(&self) -> usize {
115 self.cur_index.load(Ordering::Relaxed)
116 }
117}
118
119pub struct ThinClient {
121 transactions_socket: UdpSocket,
122 tpu_addrs: Vec<SocketAddr>,
123 rpc_clients: Vec<RpcClient>,
124 optimizer: ClientOptimizer,
125}
126
127impl ThinClient {
128 pub fn new(rpc_addr: SocketAddr, tpu_addr: SocketAddr, transactions_socket: UdpSocket) -> Self {
131 Self::new_from_client(
132 tpu_addr,
133 transactions_socket,
134 RpcClient::new_socket(rpc_addr),
135 )
136 }
137
138 pub fn new_socket_with_timeout(
139 rpc_addr: SocketAddr,
140 tpu_addr: SocketAddr,
141 transactions_socket: UdpSocket,
142 timeout: Duration,
143 ) -> Self {
144 let rpc_client = RpcClient::new_socket_with_timeout(rpc_addr, timeout);
145 Self::new_from_client(tpu_addr, transactions_socket, rpc_client)
146 }
147
148 fn new_from_client(
149 tpu_addr: SocketAddr,
150 transactions_socket: UdpSocket,
151 rpc_client: RpcClient,
152 ) -> Self {
153 Self {
154 transactions_socket,
155 tpu_addrs: vec![tpu_addr],
156 rpc_clients: vec![rpc_client],
157 optimizer: ClientOptimizer::new(0),
158 }
159 }
160
161 pub fn new_from_addrs(
162 rpc_addrs: Vec<SocketAddr>,
163 tpu_addrs: Vec<SocketAddr>,
164 transactions_socket: UdpSocket,
165 ) -> Self {
166 assert!(!rpc_addrs.is_empty());
167 assert_eq!(rpc_addrs.len(), tpu_addrs.len());
168
169 let rpc_clients: Vec<_> = rpc_addrs.into_iter().map(RpcClient::new_socket).collect();
170 let optimizer = ClientOptimizer::new(rpc_clients.len());
171 Self {
172 transactions_socket,
173 tpu_addrs,
174 rpc_clients,
175 optimizer,
176 }
177 }
178
179 fn tpu_addr(&self) -> &SocketAddr {
180 &self.tpu_addrs[self.optimizer.best()]
181 }
182
183 fn rpc_client(&self) -> &RpcClient {
184 &self.rpc_clients[self.optimizer.best()]
185 }
186
187 pub fn retry_transfer_until_confirmed(
189 &self,
190 keypair: &Keypair,
191 transaction: &mut Transaction,
192 tries: usize,
193 min_confirmed_blocks: usize,
194 ) -> TransportResult<Signature> {
195 self.send_and_confirm_transaction(&[keypair], transaction, tries, min_confirmed_blocks)
196 }
197
198 pub fn retry_transfer(
200 &self,
201 keypair: &Keypair,
202 transaction: &mut Transaction,
203 tries: usize,
204 ) -> TransportResult<Signature> {
205 self.send_and_confirm_transaction(&[keypair], transaction, tries, 0)
206 }
207
208 pub fn send_and_confirm_transaction<T: Signers>(
210 &self,
211 keypairs: &T,
212 transaction: &mut Transaction,
213 tries: usize,
214 pending_confirmations: usize,
215 ) -> TransportResult<Signature> {
216 for x in 0..tries {
217 let now = Instant::now();
218 let mut buf = vec![0; serialized_size(&transaction).unwrap() as usize];
219 let mut wr = std::io::Cursor::new(&mut buf[..]);
220 let mut num_confirmed = 0;
221 let mut wait_time = MAX_PROCESSING_AGE;
222 serialize_into(&mut wr, &transaction)
223 .expect("serialize Transaction in pub fn transfer_signed");
224 while now.elapsed().as_secs() < wait_time as u64 {
226 if num_confirmed == 0 {
227 self.transactions_socket
229 .send_to(&buf[..], &self.tpu_addr())?;
230 }
231
232 if let Ok(confirmed_blocks) = self.poll_for_signature_confirmation(
233 &transaction.signatures[0],
234 pending_confirmations,
235 ) {
236 num_confirmed = confirmed_blocks;
237 if confirmed_blocks >= pending_confirmations {
238 return Ok(transaction.signatures[0]);
239 }
240 wait_time = wait_time.max(
244 MAX_PROCESSING_AGE * pending_confirmations.saturating_sub(num_confirmed),
245 );
246 }
247 }
248 info!("{} tries failed transfer to {}", x, self.tpu_addr());
249 let blockhash = self.get_latest_blockhash()?;
250 transaction.sign(keypairs, blockhash);
251 }
252 Err(io::Error::new(
253 io::ErrorKind::Other,
254 format!("retry_transfer failed in {} retries", tries),
255 )
256 .into())
257 }
258
259 pub fn poll_get_balance(&self, pubkey: &Pubkey) -> TransportResult<u64> {
260 self.poll_get_balance_with_commitment(pubkey, CommitmentConfig::default())
261 }
262
263 pub fn poll_get_balance_with_commitment(
264 &self,
265 pubkey: &Pubkey,
266 commitment_config: CommitmentConfig,
267 ) -> TransportResult<u64> {
268 self.rpc_client()
269 .poll_get_balance_with_commitment(pubkey, commitment_config)
270 .map_err(|e| e.into())
271 }
272
273 pub fn wait_for_balance(&self, pubkey: &Pubkey, expected_balance: Option<u64>) -> Option<u64> {
274 self.rpc_client().wait_for_balance_with_commitment(
275 pubkey,
276 expected_balance,
277 CommitmentConfig::default(),
278 )
279 }
280
281 pub fn get_program_accounts_with_config(
282 &self,
283 pubkey: &Pubkey,
284 config: RpcProgramAccountsConfig,
285 ) -> TransportResult<Vec<(Pubkey, Account)>> {
286 self.rpc_client()
287 .get_program_accounts_with_config(pubkey, config)
288 .map_err(|e| e.into())
289 }
290
291 pub fn wait_for_balance_with_commitment(
292 &self,
293 pubkey: &Pubkey,
294 expected_balance: Option<u64>,
295 commitment_config: CommitmentConfig,
296 ) -> Option<u64> {
297 self.rpc_client().wait_for_balance_with_commitment(
298 pubkey,
299 expected_balance,
300 commitment_config,
301 )
302 }
303
304 pub fn poll_for_signature_with_commitment(
305 &self,
306 signature: &Signature,
307 commitment_config: CommitmentConfig,
308 ) -> TransportResult<()> {
309 self.rpc_client()
310 .poll_for_signature_with_commitment(signature, commitment_config)
311 .map_err(|e| e.into())
312 }
313
314 pub fn get_num_blocks_since_signature_confirmation(
315 &mut self,
316 sig: &Signature,
317 ) -> TransportResult<usize> {
318 self.rpc_client()
319 .get_num_blocks_since_signature_confirmation(sig)
320 .map_err(|e| e.into())
321 }
322}
323
324impl Client for ThinClient {
325 fn tpu_addr(&self) -> String {
326 self.tpu_addr().to_string()
327 }
328}
329
330impl SyncClient for ThinClient {
331 fn send_and_confirm_message<T: Signers>(
332 &self,
333 keypairs: &T,
334 message: Message,
335 ) -> TransportResult<Signature> {
336 let blockhash = self.get_latest_blockhash()?;
337 let mut transaction = Transaction::new(keypairs, message, blockhash);
338 let signature = self.send_and_confirm_transaction(keypairs, &mut transaction, 5, 0)?;
339 Ok(signature)
340 }
341
342 fn send_and_confirm_instruction(
343 &self,
344 keypair: &Keypair,
345 instruction: Instruction,
346 ) -> TransportResult<Signature> {
347 let message = Message::new(&[instruction], Some(&keypair.pubkey()));
348 self.send_and_confirm_message(&[keypair], message)
349 }
350
351 fn transfer_and_confirm(
352 &self,
353 carats: u64,
354 keypair: &Keypair,
355 pubkey: &Pubkey,
356 ) -> TransportResult<Signature> {
357 let transfer_instruction =
358 system_instruction::transfer(&keypair.pubkey(), pubkey, carats);
359 self.send_and_confirm_instruction(keypair, transfer_instruction)
360 }
361
362 fn get_account_data(&self, pubkey: &Pubkey) -> TransportResult<Option<Vec<u8>>> {
363 Ok(self.rpc_client().get_account_data(pubkey).ok())
364 }
365
366 fn get_account(&self, pubkey: &Pubkey) -> TransportResult<Option<Account>> {
367 let account = self.rpc_client().get_account(pubkey);
368 match account {
369 Ok(value) => Ok(Some(value)),
370 Err(_) => Ok(None),
371 }
372 }
373
374 fn get_account_with_commitment(
375 &self,
376 pubkey: &Pubkey,
377 commitment_config: CommitmentConfig,
378 ) -> TransportResult<Option<Account>> {
379 self.rpc_client()
380 .get_account_with_commitment(pubkey, commitment_config)
381 .map_err(|e| e.into())
382 .map(|r| r.value)
383 }
384
385 fn get_balance(&self, pubkey: &Pubkey) -> TransportResult<u64> {
386 self.rpc_client().get_balance(pubkey).map_err(|e| e.into())
387 }
388
389 fn get_balance_with_commitment(
390 &self,
391 pubkey: &Pubkey,
392 commitment_config: CommitmentConfig,
393 ) -> TransportResult<u64> {
394 self.rpc_client()
395 .get_balance_with_commitment(pubkey, commitment_config)
396 .map_err(|e| e.into())
397 .map(|r| r.value)
398 }
399
400 fn get_minimum_balance_for_rent_exemption(&self, data_len: usize) -> TransportResult<u64> {
401 self.rpc_client()
402 .get_minimum_balance_for_rent_exemption(data_len)
403 .map_err(|e| e.into())
404 }
405
406 fn get_recent_blockhash(&self) -> TransportResult<(Hash, FeeCalculator)> {
407 #[allow(deprecated)]
408 let (blockhash, fee_calculator, _last_valid_slot) =
409 self.get_recent_blockhash_with_commitment(CommitmentConfig::default())?;
410 Ok((blockhash, fee_calculator))
411 }
412
413 fn get_recent_blockhash_with_commitment(
414 &self,
415 commitment_config: CommitmentConfig,
416 ) -> TransportResult<(Hash, FeeCalculator, Slot)> {
417 let index = self.optimizer.experiment();
418 let now = Instant::now();
419 #[allow(deprecated)]
420 let recent_blockhash =
421 self.rpc_clients[index].get_recent_blockhash_with_commitment(commitment_config);
422 match recent_blockhash {
423 Ok(Response { value, .. }) => {
424 self.optimizer.report(index, duration_as_ms(&now.elapsed()));
425 Ok((value.0, value.1, value.2))
426 }
427 Err(e) => {
428 self.optimizer.report(index, std::u64::MAX);
429 Err(e.into())
430 }
431 }
432 }
433
434 fn get_fee_calculator_for_blockhash(
435 &self,
436 blockhash: &Hash,
437 ) -> TransportResult<Option<FeeCalculator>> {
438 #[allow(deprecated)]
439 self.rpc_client()
440 .get_fee_calculator_for_blockhash(blockhash)
441 .map_err(|e| e.into())
442 }
443
444 fn get_fee_rate_governor(&self) -> TransportResult<FeeRateGovernor> {
445 #[allow(deprecated)]
446 self.rpc_client()
447 .get_fee_rate_governor()
448 .map_err(|e| e.into())
449 .map(|r| r.value)
450 }
451
452 fn get_signature_status(
453 &self,
454 signature: &Signature,
455 ) -> TransportResult<Option<transaction::Result<()>>> {
456 let status = self
457 .rpc_client()
458 .get_signature_status(signature)
459 .map_err(|err| {
460 io::Error::new(
461 io::ErrorKind::Other,
462 format!("send_transaction failed with error {:?}", err),
463 )
464 })?;
465 Ok(status)
466 }
467
468 fn get_signature_status_with_commitment(
469 &self,
470 signature: &Signature,
471 commitment_config: CommitmentConfig,
472 ) -> TransportResult<Option<transaction::Result<()>>> {
473 let status = self
474 .rpc_client()
475 .get_signature_status_with_commitment(signature, commitment_config)
476 .map_err(|err| {
477 io::Error::new(
478 io::ErrorKind::Other,
479 format!("send_transaction failed with error {:?}", err),
480 )
481 })?;
482 Ok(status)
483 }
484
485 fn get_slot(&self) -> TransportResult<u64> {
486 self.get_slot_with_commitment(CommitmentConfig::default())
487 }
488
489 fn get_slot_with_commitment(
490 &self,
491 commitment_config: CommitmentConfig,
492 ) -> TransportResult<u64> {
493 let slot = self
494 .rpc_client()
495 .get_slot_with_commitment(commitment_config)
496 .map_err(|err| {
497 io::Error::new(
498 io::ErrorKind::Other,
499 format!("send_transaction failed with error {:?}", err),
500 )
501 })?;
502 Ok(slot)
503 }
504
505 fn get_epoch_info(&self) -> TransportResult<EpochInfo> {
506 self.rpc_client().get_epoch_info().map_err(|e| e.into())
507 }
508
509 fn get_transaction_count(&self) -> TransportResult<u64> {
510 let index = self.optimizer.experiment();
511 let now = Instant::now();
512 match self.rpc_client().get_transaction_count() {
513 Ok(transaction_count) => {
514 self.optimizer.report(index, duration_as_ms(&now.elapsed()));
515 Ok(transaction_count)
516 }
517 Err(e) => {
518 self.optimizer.report(index, std::u64::MAX);
519 Err(e.into())
520 }
521 }
522 }
523
524 fn get_transaction_count_with_commitment(
525 &self,
526 commitment_config: CommitmentConfig,
527 ) -> TransportResult<u64> {
528 let index = self.optimizer.experiment();
529 let now = Instant::now();
530 match self
531 .rpc_client()
532 .get_transaction_count_with_commitment(commitment_config)
533 {
534 Ok(transaction_count) => {
535 self.optimizer.report(index, duration_as_ms(&now.elapsed()));
536 Ok(transaction_count)
537 }
538 Err(e) => {
539 self.optimizer.report(index, std::u64::MAX);
540 Err(e.into())
541 }
542 }
543 }
544
545 fn poll_for_signature_confirmation(
547 &self,
548 signature: &Signature,
549 min_confirmed_blocks: usize,
550 ) -> TransportResult<usize> {
551 self.rpc_client()
552 .poll_for_signature_confirmation(signature, min_confirmed_blocks)
553 .map_err(|e| e.into())
554 }
555
556 fn poll_for_signature(&self, signature: &Signature) -> TransportResult<()> {
557 self.rpc_client()
558 .poll_for_signature(signature)
559 .map_err(|e| e.into())
560 }
561
562 fn get_new_blockhash(&self, blockhash: &Hash) -> TransportResult<(Hash, FeeCalculator)> {
563 #[allow(deprecated)]
564 self.rpc_client()
565 .get_new_blockhash(blockhash)
566 .map_err(|e| e.into())
567 }
568
569 fn get_latest_blockhash(&self) -> TransportResult<Hash> {
570 let (blockhash, _) =
571 self.get_latest_blockhash_with_commitment(CommitmentConfig::default())?;
572 Ok(blockhash)
573 }
574
575 fn get_latest_blockhash_with_commitment(
576 &self,
577 commitment_config: CommitmentConfig,
578 ) -> TransportResult<(Hash, u64)> {
579 let index = self.optimizer.experiment();
580 let now = Instant::now();
581 match self.rpc_clients[index].get_latest_blockhash_with_commitment(commitment_config) {
582 Ok((blockhash, last_valid_block_height)) => {
583 self.optimizer.report(index, duration_as_ms(&now.elapsed()));
584 Ok((blockhash, last_valid_block_height))
585 }
586 Err(e) => {
587 self.optimizer.report(index, std::u64::MAX);
588 Err(e.into())
589 }
590 }
591 }
592
593 fn is_blockhash_valid(
594 &self,
595 blockhash: &Hash,
596 commitment_config: CommitmentConfig,
597 ) -> TransportResult<bool> {
598 self.rpc_client()
599 .is_blockhash_valid(blockhash, commitment_config)
600 .map_err(|e| e.into())
601 }
602
603 fn get_fee_for_message(&self, blockhash: &Hash, message: &Message) -> TransportResult<u64> {
604 self.rpc_client()
605 .get_fee_for_message(blockhash, message)
606 .map_err(|e| e.into())
607 }
608
609 fn get_new_latest_blockhash(&self, blockhash: &Hash) -> TransportResult<Hash> {
610 self.rpc_client()
611 .get_new_latest_blockhash(blockhash)
612 .map_err(|e| e.into())
613 }
614}
615
616impl AsyncClient for ThinClient {
617 fn async_send_transaction(&self, transaction: Transaction) -> TransportResult<Signature> {
618 let mut buf = vec![0; serialized_size(&transaction).unwrap() as usize];
619 let mut wr = std::io::Cursor::new(&mut buf[..]);
620 serialize_into(&mut wr, &transaction)
621 .expect("serialize Transaction in pub fn transfer_signed");
622 assert!(buf.len() < PACKET_DATA_SIZE);
623 self.transactions_socket
624 .send_to(&buf[..], &self.tpu_addr())?;
625 Ok(transaction.signatures[0])
626 }
627 fn async_send_message<T: Signers>(
628 &self,
629 keypairs: &T,
630 message: Message,
631 recent_blockhash: Hash,
632 ) -> TransportResult<Signature> {
633 let transaction = Transaction::new(keypairs, message, recent_blockhash);
634 self.async_send_transaction(transaction)
635 }
636 fn async_send_instruction(
637 &self,
638 keypair: &Keypair,
639 instruction: Instruction,
640 recent_blockhash: Hash,
641 ) -> TransportResult<Signature> {
642 let message = Message::new(&[instruction], Some(&keypair.pubkey()));
643 self.async_send_message(&[keypair], message, recent_blockhash)
644 }
645 fn async_transfer(
646 &self,
647 carats: u64,
648 keypair: &Keypair,
649 pubkey: &Pubkey,
650 recent_blockhash: Hash,
651 ) -> TransportResult<Signature> {
652 let transfer_instruction =
653 system_instruction::transfer(&keypair.pubkey(), pubkey, carats);
654 self.async_send_instruction(keypair, transfer_instruction, recent_blockhash)
655 }
656}
657
658pub fn create_client((rpc, tpu): (SocketAddr, SocketAddr), range: (u16, u16)) -> ThinClient {
659 let (_, transactions_socket) =
660 gemachain_net_utils::bind_in_range(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), range).unwrap();
661 ThinClient::new(rpc, tpu, transactions_socket)
662}
663
664pub fn create_client_with_timeout(
665 (rpc, tpu): (SocketAddr, SocketAddr),
666 range: (u16, u16),
667 timeout: Duration,
668) -> ThinClient {
669 let (_, transactions_socket) =
670 gemachain_net_utils::bind_in_range(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), range).unwrap();
671 ThinClient::new_socket_with_timeout(rpc, tpu, transactions_socket, timeout)
672}
673
#[cfg(test)]
mod tests {
    use super::*;
    use rayon::prelude::*;

    /// Exercises the optimizer's experiment, late-report and failure paths.
    #[test]
    fn test_client_optimizer() {
        gemachain_logger::setup();

        const NUM_CLIENTS: usize = 5;
        let optimizer = ClientOptimizer::new(NUM_CLIENTS);

        // Experiment phase, run in parallel: higher indices report shorter times.
        (0..NUM_CLIENTS).into_par_iter().for_each(|_| {
            let claimed = optimizer.experiment();
            let time_ms = (NUM_CLIENTS - claimed) as u64;
            optimizer.report(claimed, time_ms);
        });

        // A report arriving after the experiment must not displace the best client.
        let index = optimizer.experiment();
        optimizer.report(index, 50);
        assert_eq!(optimizer.best(), NUM_CLIENTS - 1);

        // Reporting a failure for the best client selects the next fastest one.
        let current_best = optimizer.best();
        optimizer.report(current_best, std::u64::MAX);
        assert_eq!(optimizer.best(), NUM_CLIENTS - 2);
    }
}