1use {
7 log::*,
8 rayon::iter::{IntoParallelIterator, ParallelIterator},
9 solana_account::Account,
10 solana_client_traits::{AsyncClient, Client, SyncClient},
11 solana_clock::MAX_PROCESSING_AGE,
12 solana_commitment_config::CommitmentConfig,
13 solana_connection_cache::{
14 client_connection::ClientConnection,
15 connection_cache::{
16 ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig,
17 },
18 },
19 solana_epoch_info::EpochInfo,
20 solana_hash::Hash,
21 solana_instruction::Instruction,
22 solana_keypair::Keypair,
23 solana_message::Message,
24 solana_pubkey::Pubkey,
25 solana_rpc_client::rpc_client::RpcClient,
26 solana_rpc_client_api::config::RpcProgramAccountsConfig,
27 solana_signature::Signature,
28 solana_signer::{signers::Signers, Signer},
29 solana_system_interface::instruction::transfer,
30 solana_transaction::{versioned::VersionedTransaction, Transaction},
31 solana_transaction_error::{TransactionResult, TransportResult},
32 std::{
33 io,
34 net::SocketAddr,
35 sync::{
36 atomic::{AtomicBool, AtomicUsize, Ordering},
37 Arc, RwLock,
38 },
39 time::{Duration, Instant},
40 },
41};
42
43struct ClientOptimizer {
44 cur_index: AtomicUsize,
45 experiment_index: AtomicUsize,
46 experiment_done: AtomicBool,
47 times: RwLock<Vec<u64>>,
48 num_clients: usize,
49}
50
51impl ClientOptimizer {
52 fn new(num_clients: usize) -> Self {
53 Self {
54 cur_index: AtomicUsize::new(0),
55 experiment_index: AtomicUsize::new(0),
56 experiment_done: AtomicBool::new(false),
57 times: RwLock::new(vec![u64::MAX; num_clients]),
58 num_clients,
59 }
60 }
61
62 fn experiment(&self) -> usize {
63 if self.experiment_index.load(Ordering::Relaxed) < self.num_clients {
64 let old = self.experiment_index.fetch_add(1, Ordering::Relaxed);
65 if old < self.num_clients {
66 old
67 } else {
68 self.best()
69 }
70 } else {
71 self.best()
72 }
73 }
74
75 fn report(&self, index: usize, time_ms: u64) {
76 if self.num_clients > 1
77 && (!self.experiment_done.load(Ordering::Relaxed) || time_ms == u64::MAX)
78 {
79 trace!(
80 "report {} with {} exp: {}",
81 index,
82 time_ms,
83 self.experiment_index.load(Ordering::Relaxed)
84 );
85
86 self.times.write().unwrap()[index] = time_ms;
87
88 if index == (self.num_clients - 1) || time_ms == u64::MAX {
89 let times = self.times.read().unwrap();
90 let (min_time, min_index) = min_index(×);
91 trace!(
92 "done experimenting min: {} time: {} times: {:?}",
93 min_index,
94 min_time,
95 times
96 );
97
98 self.cur_index.store(min_index, Ordering::Relaxed);
100 self.experiment_done.store(true, Ordering::Relaxed);
101 }
102 }
103 }
104
105 fn best(&self) -> usize {
106 self.cur_index.load(Ordering::Relaxed)
107 }
108}
109
110#[deprecated(since = "2.0.0", note = "Use [RpcClient] or [TpuClient] instead.")]
112pub struct ThinClient<
113 P, M, C, > {
117 rpc_clients: Vec<RpcClient>,
118 tpu_addrs: Vec<SocketAddr>,
119 optimizer: ClientOptimizer,
120 connection_cache: Arc<ConnectionCache<P, M, C>>,
121}
122
123#[allow(deprecated)]
124impl<P, M, C> ThinClient<P, M, C>
125where
126 P: ConnectionPool<NewConnectionConfig = C>,
127 M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
128 C: NewConnectionConfig,
129{
130 pub fn new(
134 rpc_addr: SocketAddr,
135 tpu_addr: SocketAddr,
136 connection_cache: Arc<ConnectionCache<P, M, C>>,
137 ) -> Self {
138 Self::new_from_client(RpcClient::new_socket(rpc_addr), tpu_addr, connection_cache)
139 }
140
141 pub fn new_socket_with_timeout(
142 rpc_addr: SocketAddr,
143 tpu_addr: SocketAddr,
144 timeout: Duration,
145 connection_cache: Arc<ConnectionCache<P, M, C>>,
146 ) -> Self {
147 let rpc_client = RpcClient::new_socket_with_timeout(rpc_addr, timeout);
148 Self::new_from_client(rpc_client, tpu_addr, connection_cache)
149 }
150
151 fn new_from_client(
152 rpc_client: RpcClient,
153 tpu_addr: SocketAddr,
154 connection_cache: Arc<ConnectionCache<P, M, C>>,
155 ) -> Self {
156 Self {
157 rpc_clients: vec![rpc_client],
158 tpu_addrs: vec![tpu_addr],
159 optimizer: ClientOptimizer::new(0),
160 connection_cache,
161 }
162 }
163
164 pub fn new_from_addrs(
165 rpc_addrs: Vec<SocketAddr>,
166 tpu_addrs: Vec<SocketAddr>,
167 connection_cache: Arc<ConnectionCache<P, M, C>>,
168 ) -> Self {
169 assert!(!rpc_addrs.is_empty());
170 assert_eq!(rpc_addrs.len(), tpu_addrs.len());
171
172 let rpc_clients: Vec<_> = rpc_addrs.into_iter().map(RpcClient::new_socket).collect();
173 let optimizer = ClientOptimizer::new(rpc_clients.len());
174 Self {
175 rpc_clients,
176 tpu_addrs,
177 optimizer,
178 connection_cache,
179 }
180 }
181
182 fn tpu_addr(&self) -> &SocketAddr {
183 &self.tpu_addrs[self.optimizer.best()]
184 }
185
186 pub fn rpc_client(&self) -> &RpcClient {
187 &self.rpc_clients[self.optimizer.best()]
188 }
189
190 pub fn retry_transfer_until_confirmed(
192 &self,
193 keypair: &Keypair,
194 transaction: &mut Transaction,
195 tries: usize,
196 min_confirmed_blocks: usize,
197 ) -> TransportResult<Signature> {
198 self.send_and_confirm_transaction(&[keypair], transaction, tries, min_confirmed_blocks)
199 }
200
201 pub fn retry_transfer(
203 &self,
204 keypair: &Keypair,
205 transaction: &mut Transaction,
206 tries: usize,
207 ) -> TransportResult<Signature> {
208 self.send_and_confirm_transaction(&[keypair], transaction, tries, 0)
209 }
210
211 pub fn send_and_confirm_transaction<T: Signers + ?Sized>(
212 &self,
213 keypairs: &T,
214 transaction: &mut Transaction,
215 tries: usize,
216 pending_confirmations: usize,
217 ) -> TransportResult<Signature> {
218 for x in 0..tries {
219 let now = Instant::now();
220 let mut num_confirmed = 0;
221 let mut wait_time = MAX_PROCESSING_AGE;
222 let wire_transaction =
224 bincode::serialize(&transaction).expect("transaction serialization failed");
225 while now.elapsed().as_secs() < wait_time as u64 {
226 if num_confirmed == 0 {
227 let conn = self.connection_cache.get_connection(self.tpu_addr());
228 #[allow(clippy::needless_borrow)]
230 conn.send_data(&wire_transaction)?;
231 }
232
233 if let Ok(confirmed_blocks) = self.poll_for_signature_confirmation(
234 &transaction.signatures[0],
235 pending_confirmations,
236 ) {
237 num_confirmed = confirmed_blocks;
238 if confirmed_blocks >= pending_confirmations {
239 return Ok(transaction.signatures[0]);
240 }
241 wait_time = wait_time.max(
245 MAX_PROCESSING_AGE * pending_confirmations.saturating_sub(num_confirmed),
246 );
247 }
248 }
249 info!("{} tries failed transfer to {}", x, self.tpu_addr());
250 let blockhash = self.get_latest_blockhash()?;
251 transaction.sign(keypairs, blockhash);
252 }
253 Err(io::Error::other(format!("retry_transfer failed in {tries} retries")).into())
254 }
255
256 pub fn poll_get_balance(&self, pubkey: &Pubkey) -> TransportResult<u64> {
257 self.poll_get_balance_with_commitment(pubkey, CommitmentConfig::default())
258 }
259
260 pub fn poll_get_balance_with_commitment(
261 &self,
262 pubkey: &Pubkey,
263 commitment_config: CommitmentConfig,
264 ) -> TransportResult<u64> {
265 self.rpc_client()
266 .poll_get_balance_with_commitment(pubkey, commitment_config)
267 .map_err(|e| e.into())
268 }
269
270 pub fn wait_for_balance(&self, pubkey: &Pubkey, expected_balance: Option<u64>) -> Option<u64> {
271 self.rpc_client().wait_for_balance_with_commitment(
272 pubkey,
273 expected_balance,
274 CommitmentConfig::default(),
275 )
276 }
277
278 pub fn get_program_accounts_with_config(
279 &self,
280 pubkey: &Pubkey,
281 config: RpcProgramAccountsConfig,
282 ) -> TransportResult<Vec<(Pubkey, Account)>> {
283 self.rpc_client()
284 .get_program_accounts_with_config(pubkey, config)
285 .map_err(|e| e.into())
286 }
287
288 pub fn wait_for_balance_with_commitment(
289 &self,
290 pubkey: &Pubkey,
291 expected_balance: Option<u64>,
292 commitment_config: CommitmentConfig,
293 ) -> Option<u64> {
294 self.rpc_client().wait_for_balance_with_commitment(
295 pubkey,
296 expected_balance,
297 commitment_config,
298 )
299 }
300
301 pub fn poll_for_signature_with_commitment(
302 &self,
303 signature: &Signature,
304 commitment_config: CommitmentConfig,
305 ) -> TransportResult<()> {
306 self.rpc_client()
307 .poll_for_signature_with_commitment(signature, commitment_config)
308 .map_err(|e| e.into())
309 }
310
311 pub fn get_num_blocks_since_signature_confirmation(
312 &mut self,
313 sig: &Signature,
314 ) -> TransportResult<usize> {
315 self.rpc_client()
316 .get_num_blocks_since_signature_confirmation(sig)
317 .map_err(|e| e.into())
318 }
319}
320
321#[allow(deprecated)]
322impl<P, M, C> Client for ThinClient<P, M, C>
323where
324 P: ConnectionPool<NewConnectionConfig = C>,
325 M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
326 C: NewConnectionConfig,
327{
328 fn tpu_addr(&self) -> String {
329 self.tpu_addr().to_string()
330 }
331}
332
333#[allow(deprecated)]
334impl<P, M, C> SyncClient for ThinClient<P, M, C>
335where
336 P: ConnectionPool<NewConnectionConfig = C>,
337 M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
338 C: NewConnectionConfig,
339{
340 fn send_and_confirm_message<T: Signers + ?Sized>(
341 &self,
342 keypairs: &T,
343 message: Message,
344 ) -> TransportResult<Signature> {
345 let blockhash = self.get_latest_blockhash()?;
346 let mut transaction = Transaction::new(keypairs, message, blockhash);
347 let signature = self.send_and_confirm_transaction(keypairs, &mut transaction, 5, 0)?;
348 Ok(signature)
349 }
350
351 fn send_and_confirm_instruction(
352 &self,
353 keypair: &Keypair,
354 instruction: Instruction,
355 ) -> TransportResult<Signature> {
356 let message = Message::new(&[instruction], Some(&keypair.pubkey()));
357 self.send_and_confirm_message(&[keypair], message)
358 }
359
360 fn transfer_and_confirm(
361 &self,
362 lamports: u64,
363 keypair: &Keypair,
364 pubkey: &Pubkey,
365 ) -> TransportResult<Signature> {
366 let transfer_instruction = transfer(&keypair.pubkey(), pubkey, lamports);
367 self.send_and_confirm_instruction(keypair, transfer_instruction)
368 }
369
370 fn get_account_data(&self, pubkey: &Pubkey) -> TransportResult<Option<Vec<u8>>> {
371 Ok(self.rpc_client().get_account_data(pubkey).ok())
372 }
373
374 fn get_account(&self, pubkey: &Pubkey) -> TransportResult<Option<Account>> {
375 let account = self.rpc_client().get_account(pubkey);
376 match account {
377 Ok(value) => Ok(Some(value)),
378 Err(_) => Ok(None),
379 }
380 }
381
382 fn get_account_with_commitment(
383 &self,
384 pubkey: &Pubkey,
385 commitment_config: CommitmentConfig,
386 ) -> TransportResult<Option<Account>> {
387 self.rpc_client()
388 .get_account_with_commitment(pubkey, commitment_config)
389 .map_err(|e| e.into())
390 .map(|r| r.value)
391 }
392
393 fn get_balance(&self, pubkey: &Pubkey) -> TransportResult<u64> {
394 self.rpc_client().get_balance(pubkey).map_err(|e| e.into())
395 }
396
397 fn get_balance_with_commitment(
398 &self,
399 pubkey: &Pubkey,
400 commitment_config: CommitmentConfig,
401 ) -> TransportResult<u64> {
402 self.rpc_client()
403 .get_balance_with_commitment(pubkey, commitment_config)
404 .map_err(|e| e.into())
405 .map(|r| r.value)
406 }
407
408 fn get_minimum_balance_for_rent_exemption(&self, data_len: usize) -> TransportResult<u64> {
409 self.rpc_client()
410 .get_minimum_balance_for_rent_exemption(data_len)
411 .map_err(|e| e.into())
412 }
413
414 fn get_signature_status(
415 &self,
416 signature: &Signature,
417 ) -> TransportResult<Option<TransactionResult<()>>> {
418 let status = self
419 .rpc_client()
420 .get_signature_status(signature)
421 .map_err(|err| {
422 io::Error::other(format!("send_transaction failed with error {err:?}"))
423 })?;
424 Ok(status)
425 }
426
427 fn get_signature_status_with_commitment(
428 &self,
429 signature: &Signature,
430 commitment_config: CommitmentConfig,
431 ) -> TransportResult<Option<TransactionResult<()>>> {
432 let status = self
433 .rpc_client()
434 .get_signature_status_with_commitment(signature, commitment_config)
435 .map_err(|err| {
436 io::Error::other(format!("send_transaction failed with error {err:?}"))
437 })?;
438 Ok(status)
439 }
440
441 fn get_slot(&self) -> TransportResult<u64> {
442 self.get_slot_with_commitment(CommitmentConfig::default())
443 }
444
445 fn get_slot_with_commitment(
446 &self,
447 commitment_config: CommitmentConfig,
448 ) -> TransportResult<u64> {
449 let slot = self
450 .rpc_client()
451 .get_slot_with_commitment(commitment_config)
452 .map_err(|err| {
453 io::Error::other(format!("send_transaction failed with error {err:?}"))
454 })?;
455 Ok(slot)
456 }
457
458 fn get_epoch_info(&self) -> TransportResult<EpochInfo> {
459 self.rpc_client().get_epoch_info().map_err(|e| e.into())
460 }
461
462 fn get_transaction_count(&self) -> TransportResult<u64> {
463 let index = self.optimizer.experiment();
464 let now = Instant::now();
465 match self.rpc_client().get_transaction_count() {
466 Ok(transaction_count) => {
467 self.optimizer
468 .report(index, now.elapsed().as_millis() as u64);
469 Ok(transaction_count)
470 }
471 Err(e) => {
472 self.optimizer.report(index, u64::MAX);
473 Err(e.into())
474 }
475 }
476 }
477
478 fn get_transaction_count_with_commitment(
479 &self,
480 commitment_config: CommitmentConfig,
481 ) -> TransportResult<u64> {
482 let index = self.optimizer.experiment();
483 let now = Instant::now();
484 match self
485 .rpc_client()
486 .get_transaction_count_with_commitment(commitment_config)
487 {
488 Ok(transaction_count) => {
489 self.optimizer
490 .report(index, now.elapsed().as_millis() as u64);
491 Ok(transaction_count)
492 }
493 Err(e) => {
494 self.optimizer.report(index, u64::MAX);
495 Err(e.into())
496 }
497 }
498 }
499
500 fn poll_for_signature_confirmation(
502 &self,
503 signature: &Signature,
504 min_confirmed_blocks: usize,
505 ) -> TransportResult<usize> {
506 self.rpc_client()
507 .poll_for_signature_confirmation(signature, min_confirmed_blocks)
508 .map_err(|e| e.into())
509 }
510
511 fn poll_for_signature(&self, signature: &Signature) -> TransportResult<()> {
512 self.rpc_client()
513 .poll_for_signature(signature)
514 .map_err(|e| e.into())
515 }
516
517 fn get_latest_blockhash(&self) -> TransportResult<Hash> {
518 let (blockhash, _) =
519 self.get_latest_blockhash_with_commitment(CommitmentConfig::default())?;
520 Ok(blockhash)
521 }
522
523 fn get_latest_blockhash_with_commitment(
524 &self,
525 commitment_config: CommitmentConfig,
526 ) -> TransportResult<(Hash, u64)> {
527 let index = self.optimizer.experiment();
528 let now = Instant::now();
529 match self.rpc_clients[index].get_latest_blockhash_with_commitment(commitment_config) {
530 Ok((blockhash, last_valid_block_height)) => {
531 self.optimizer
532 .report(index, now.elapsed().as_millis() as u64);
533 Ok((blockhash, last_valid_block_height))
534 }
535 Err(e) => {
536 self.optimizer.report(index, u64::MAX);
537 Err(e.into())
538 }
539 }
540 }
541
542 fn is_blockhash_valid(
543 &self,
544 blockhash: &Hash,
545 commitment_config: CommitmentConfig,
546 ) -> TransportResult<bool> {
547 self.rpc_client()
548 .is_blockhash_valid(blockhash, commitment_config)
549 .map_err(|e| e.into())
550 }
551
552 fn get_fee_for_message(&self, message: &Message) -> TransportResult<u64> {
553 self.rpc_client()
554 .get_fee_for_message(message)
555 .map_err(|e| e.into())
556 }
557}
558
559#[allow(deprecated)]
560impl<P, M, C> AsyncClient for ThinClient<P, M, C>
561where
562 P: ConnectionPool<NewConnectionConfig = C>,
563 M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
564 C: NewConnectionConfig,
565{
566 fn async_send_versioned_transaction(
567 &self,
568 transaction: VersionedTransaction,
569 ) -> TransportResult<Signature> {
570 let conn = self.connection_cache.get_connection(self.tpu_addr());
571 let wire_transaction =
572 bincode::serialize(&transaction).expect("serialize Transaction in send_batch");
573 conn.send_data(&wire_transaction)?;
574 Ok(transaction.signatures[0])
575 }
576
577 fn async_send_versioned_transaction_batch(
578 &self,
579 batch: Vec<VersionedTransaction>,
580 ) -> TransportResult<()> {
581 let conn = self.connection_cache.get_connection(self.tpu_addr());
582 let buffers = batch
583 .into_par_iter()
584 .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
585 .collect::<Vec<_>>();
586 conn.send_data_batch(&buffers)?;
587 Ok(())
588 }
589}
590
/// Returns `(smallest_value, index_of_smallest_value)` for `array`.
///
/// On ties the earliest index wins. If `array` is empty or holds no value
/// below `u64::MAX`, the result is `(u64::MAX, 0)`.
fn min_index(array: &[u64]) -> (u64, usize) {
    array
        .iter()
        .copied()
        .enumerate()
        .fold((u64::MAX, 0), |best, (position, time)| {
            // Strict comparison keeps the first occurrence of the minimum.
            if time < best.0 {
                (time, position)
            } else {
                best
            }
        })
}
602
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs the optimizer through its experiment phase, then checks that it
    /// sticks with the fastest slot and falls back to the runner-up once the
    /// fastest slot reports a failure.
    #[test]
    fn test_client_optimizer() {
        solana_logger::setup();

        const NUM_CLIENTS: usize = 5;
        let optimizer = ClientOptimizer::new(NUM_CLIENTS);

        // Claim every slot from parallel workers; higher slots report lower
        // times, so the last slot is the fastest.
        (0..NUM_CLIENTS).into_par_iter().for_each(|_| {
            let slot = optimizer.experiment();
            let time_ms = (NUM_CLIENTS - slot) as u64;
            optimizer.report(slot, time_ms);
        });

        // A non-failure report after the experiment is done must not change
        // the selection.
        let slot = optimizer.experiment();
        optimizer.report(slot, 50);
        assert_eq!(optimizer.best(), NUM_CLIENTS - 1);

        // A failure on the current best slot moves the selection to the next
        // fastest slot.
        // NOTE(review): this appears to rely on slot NUM_CLIENTS - 2 having
        // recorded its time before the last slot's report ended the
        // experiment; confirm it cannot flake under parallel scheduling.
        let current_best = optimizer.best();
        optimizer.report(current_best, u64::MAX);
        assert_eq!(optimizer.best(), NUM_CLIENTS - 2);
    }
}
625}