1use {
2 crate::{
3 nonblocking::{rpc_client::RpcClient, tpu_client::TpuClient},
4 rpc_client::RpcClient as BlockingRpcClient,
5 },
6 bincode::serialize,
7 dashmap::DashMap,
8 futures_util::future::join_all,
9 solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
10 solana_rpc_client::spinner::{self, SendTransactionProgress},
11 solana_rpc_client_api::{
12 client_error::ErrorKind,
13 request::{RpcError, RpcResponseErrorData, MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS},
14 response::RpcSimulateTransactionResult,
15 },
16 solana_sdk::{
17 hash::Hash,
18 message::Message,
19 signature::{Signature, SignerError},
20 signers::Signers,
21 transaction::{Transaction, TransactionError},
22 },
23 solana_tpu_client::tpu_client::{Result, TpuSenderError},
24 std::{
25 sync::{
26 atomic::{AtomicU64, AtomicUsize, Ordering},
27 Arc,
28 },
29 time::Duration,
30 },
31 tokio::{sync::RwLock, task::JoinHandle},
32};
33
34const BLOCKHASH_REFRESH_RATE: Duration = Duration::from_secs(5);
35const SEND_INTERVAL: Duration = Duration::from_millis(10);
36const SEND_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);
40
41type QuicTpuClient = TpuClient<QuicPool, QuicConnectionManager, QuicConfig>;
42
43#[derive(Clone, Debug)]
44struct TransactionData {
45 last_valid_block_height: u64,
46 message: Message,
47 index: usize,
48 serialized_transaction: Vec<u8>,
49}
50
51#[derive(Clone, Debug, Copy)]
52struct BlockHashData {
53 pub blockhash: Hash,
54 pub last_valid_block_height: u64,
55}
56
57#[derive(Clone, Debug, Copy)]
58pub struct SendAndConfirmConfig {
59 pub with_spinner: bool,
60 pub resign_txs_count: Option<usize>,
61}
62
63pub fn send_and_confirm_transactions_in_parallel_blocking<T: Signers + ?Sized>(
65 rpc_client: Arc<BlockingRpcClient>,
66 tpu_client: Option<QuicTpuClient>,
67 messages: &[Message],
68 signers: &T,
69 config: SendAndConfirmConfig,
70) -> Result<Vec<Option<TransactionError>>> {
71 let fut = send_and_confirm_transactions_in_parallel(
72 rpc_client.get_inner_client().clone(),
73 tpu_client,
74 messages,
75 signers,
76 config,
77 );
78 tokio::task::block_in_place(|| rpc_client.runtime().block_on(fut))
79}
80
81fn create_blockhash_data_updating_task(
82 rpc_client: Arc<RpcClient>,
83 blockhash_data_rw: Arc<RwLock<BlockHashData>>,
84 current_block_height: Arc<AtomicU64>,
85) -> JoinHandle<()> {
86 tokio::spawn(async move {
87 loop {
88 if let Ok((blockhash, last_valid_block_height)) = rpc_client
89 .get_latest_blockhash_with_commitment(rpc_client.commitment())
90 .await
91 {
92 *blockhash_data_rw.write().await = BlockHashData {
93 blockhash,
94 last_valid_block_height,
95 };
96 }
97
98 if let Ok(block_height) = rpc_client.get_block_height().await {
99 current_block_height.store(block_height, Ordering::Relaxed);
100 }
101 tokio::time::sleep(BLOCKHASH_REFRESH_RATE).await;
102 }
103 })
104}
105
106fn create_transaction_confirmation_task(
107 rpc_client: Arc<RpcClient>,
108 current_block_height: Arc<AtomicU64>,
109 unconfirmed_transaction_map: Arc<DashMap<Signature, TransactionData>>,
110 errors_map: Arc<DashMap<usize, TransactionError>>,
111 num_confirmed_transactions: Arc<AtomicUsize>,
112) -> JoinHandle<()> {
113 tokio::spawn(async move {
114 let mut last_block_height = current_block_height.load(Ordering::Relaxed);
116
117 loop {
118 if !unconfirmed_transaction_map.is_empty() {
119 let current_block_height = current_block_height.load(Ordering::Relaxed);
120 let transactions_to_verify: Vec<Signature> = unconfirmed_transaction_map
121 .iter()
122 .filter(|x| {
123 let is_not_expired = current_block_height <= x.last_valid_block_height;
124 let is_recently_expired = last_block_height <= x.last_valid_block_height
126 && current_block_height > x.last_valid_block_height;
127 is_not_expired || is_recently_expired
128 })
129 .map(|x| *x.key())
130 .collect();
131 for signatures in
132 transactions_to_verify.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS)
133 {
134 if let Ok(result) = rpc_client.get_signature_statuses(signatures).await {
135 let statuses = result.value;
136 for (signature, status) in signatures.iter().zip(statuses.into_iter()) {
137 if let Some((status, data)) = status
138 .filter(|status| {
139 status.satisfies_commitment(rpc_client.commitment())
140 })
141 .and_then(|status| {
142 unconfirmed_transaction_map
143 .remove(signature)
144 .map(|(_, data)| (status, data))
145 })
146 {
147 num_confirmed_transactions.fetch_add(1, Ordering::Relaxed);
148 match status.err {
149 Some(TransactionError::AlreadyProcessed) | None => {}
150 Some(error) => {
151 errors_map.insert(data.index, error);
152 }
153 }
154 };
155 }
156 }
157 }
158
159 last_block_height = current_block_height;
160 }
161 tokio::time::sleep(Duration::from_secs(1)).await;
162 }
163 })
164}
165
166#[derive(Clone, Debug)]
167struct SendingContext {
168 unconfirmed_transaction_map: Arc<DashMap<Signature, TransactionData>>,
169 error_map: Arc<DashMap<usize, TransactionError>>,
170 blockhash_data_rw: Arc<RwLock<BlockHashData>>,
171 num_confirmed_transactions: Arc<AtomicUsize>,
172 total_transactions: usize,
173 current_block_height: Arc<AtomicU64>,
174}
175fn progress_from_context_and_block_height(
176 context: &SendingContext,
177 last_valid_block_height: u64,
178) -> SendTransactionProgress {
179 SendTransactionProgress {
180 confirmed_transactions: context
181 .num_confirmed_transactions
182 .load(std::sync::atomic::Ordering::Relaxed),
183 total_transactions: context.total_transactions,
184 block_height: context
185 .current_block_height
186 .load(std::sync::atomic::Ordering::Relaxed),
187 last_valid_block_height,
188 }
189}
190
191async fn send_transaction_with_rpc_fallback(
192 rpc_client: &RpcClient,
193 tpu_client: &Option<QuicTpuClient>,
194 transaction: Transaction,
195 serialized_transaction: Vec<u8>,
196 context: &SendingContext,
197 index: usize,
198) -> Result<()> {
199 let send_over_rpc = if let Some(tpu_client) = tpu_client {
200 !tokio::time::timeout(
201 SEND_TIMEOUT_INTERVAL,
202 tpu_client.send_wire_transaction(serialized_transaction.clone()),
203 )
204 .await
205 .unwrap_or(false)
206 } else {
207 true
208 };
209 if send_over_rpc {
210 if let Err(e) = rpc_client.send_transaction(&transaction).await {
211 match &e.kind {
212 ErrorKind::Io(_) | ErrorKind::Reqwest(_) => {
213 }
215 ErrorKind::TransactionError(TransactionError::BlockhashNotFound)
216 | ErrorKind::RpcError(RpcError::RpcResponseError {
217 data:
218 RpcResponseErrorData::SendTransactionPreflightFailure(
219 RpcSimulateTransactionResult {
220 err: Some(TransactionError::BlockhashNotFound),
221 ..
222 },
223 ),
224 ..
225 }) => {
226 }
228 ErrorKind::TransactionError(transaction_error)
229 | ErrorKind::RpcError(RpcError::RpcResponseError {
230 data:
231 RpcResponseErrorData::SendTransactionPreflightFailure(
232 RpcSimulateTransactionResult {
233 err: Some(transaction_error),
234 ..
235 },
236 ),
237 ..
238 }) => {
239 context.error_map.insert(index, transaction_error.clone());
241 }
242 _ => {
243 return Err(TpuSenderError::from(e));
244 }
245 }
246 }
247 }
248 Ok(())
249}
250
251async fn sign_all_messages_and_send<T: Signers + ?Sized>(
252 progress_bar: &Option<indicatif::ProgressBar>,
253 rpc_client: &RpcClient,
254 tpu_client: &Option<QuicTpuClient>,
255 messages_with_index: Vec<(usize, Message)>,
256 signers: &T,
257 context: &SendingContext,
258) -> Result<()> {
259 let current_transaction_count = messages_with_index.len();
260 let mut futures = vec![];
261 for (counter, (index, message)) in messages_with_index.iter().enumerate() {
263 let mut transaction = Transaction::new_unsigned(message.clone());
264 futures.push(async move {
265 tokio::time::sleep(SEND_INTERVAL.saturating_mul(counter as u32)).await;
266 let blockhashdata = *context.blockhash_data_rw.read().await;
267
268 transaction
270 .try_sign(signers, blockhashdata.blockhash)
271 .expect("Transaction should be signable");
272 let serialized_transaction =
273 serialize(&transaction).expect("Transaction should serialize");
274 let signature = transaction.signatures[0];
275
276 context.unconfirmed_transaction_map.insert(
278 signature,
279 TransactionData {
280 index: *index,
281 serialized_transaction: serialized_transaction.clone(),
282 last_valid_block_height: blockhashdata.last_valid_block_height,
283 message: message.clone(),
284 },
285 );
286 if let Some(progress_bar) = progress_bar {
287 let progress = progress_from_context_and_block_height(
288 context,
289 blockhashdata.last_valid_block_height,
290 );
291 progress.set_message_for_confirmed_transactions(
292 progress_bar,
293 &format!(
294 "Sending {}/{} transactions",
295 counter + 1,
296 current_transaction_count,
297 ),
298 );
299 }
300 send_transaction_with_rpc_fallback(
301 rpc_client,
302 tpu_client,
303 transaction,
304 serialized_transaction,
305 context,
306 *index,
307 )
308 .await
309 });
310 }
311 join_all(futures)
313 .await
314 .into_iter()
315 .collect::<Result<Vec<()>>>()?;
316 Ok(())
317}
318
319async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction_over_tpu(
320 progress_bar: &Option<indicatif::ProgressBar>,
321 tpu_client: &Option<QuicTpuClient>,
322 context: &SendingContext,
323) {
324 let unconfirmed_transaction_map = context.unconfirmed_transaction_map.clone();
325 let current_block_height = context.current_block_height.clone();
326
327 let transactions_to_confirm = unconfirmed_transaction_map.len();
328 let max_valid_block_height = unconfirmed_transaction_map
329 .iter()
330 .map(|x| x.last_valid_block_height)
331 .max();
332
333 if let Some(mut max_valid_block_height) = max_valid_block_height {
334 if let Some(progress_bar) = progress_bar {
335 let progress = progress_from_context_and_block_height(context, max_valid_block_height);
336 progress.set_message_for_confirmed_transactions(
337 progress_bar,
338 &format!(
339 "Waiting for next block, {transactions_to_confirm} transactions pending..."
340 ),
341 );
342 }
343
344 while !unconfirmed_transaction_map.is_empty()
346 && current_block_height.load(Ordering::Relaxed) <= max_valid_block_height
347 {
348 let block_height = current_block_height.load(Ordering::Relaxed);
349
350 if let Some(tpu_client) = tpu_client {
351 let txs_to_resend_over_tpu = unconfirmed_transaction_map
354 .iter()
355 .filter(|x| block_height < x.last_valid_block_height)
356 .map(|x| x.serialized_transaction.clone())
357 .collect::<Vec<_>>();
358 send_staggered_transactions(
359 progress_bar,
360 tpu_client,
361 txs_to_resend_over_tpu,
362 max_valid_block_height,
363 context,
364 )
365 .await;
366 } else {
367 tokio::time::sleep(Duration::from_millis(100)).await;
368 }
369 if let Some(max_valid_block_height_in_remaining_transaction) =
370 unconfirmed_transaction_map
371 .iter()
372 .map(|x| x.last_valid_block_height)
373 .max()
374 {
375 max_valid_block_height = max_valid_block_height_in_remaining_transaction;
376 }
377 }
378 }
379}
380
381async fn send_staggered_transactions(
382 progress_bar: &Option<indicatif::ProgressBar>,
383 tpu_client: &QuicTpuClient,
384 wire_transactions: Vec<Vec<u8>>,
385 last_valid_block_height: u64,
386 context: &SendingContext,
387) {
388 let current_transaction_count = wire_transactions.len();
389 let futures = wire_transactions
390 .into_iter()
391 .enumerate()
392 .map(|(counter, transaction)| async move {
393 tokio::time::sleep(SEND_INTERVAL.saturating_mul(counter as u32)).await;
394 if let Some(progress_bar) = progress_bar {
395 let progress =
396 progress_from_context_and_block_height(context, last_valid_block_height);
397 progress.set_message_for_confirmed_transactions(
398 progress_bar,
399 &format!(
400 "Resending {}/{} transactions",
401 counter + 1,
402 current_transaction_count,
403 ),
404 );
405 }
406 tokio::time::timeout(
407 SEND_TIMEOUT_INTERVAL,
408 tpu_client.send_wire_transaction(transaction),
409 )
410 .await
411 })
412 .collect::<Vec<_>>();
413 join_all(futures).await;
414}
415
416pub async fn send_and_confirm_transactions_in_parallel<T: Signers + ?Sized>(
422 rpc_client: Arc<RpcClient>,
423 tpu_client: Option<QuicTpuClient>,
424 messages: &[Message],
425 signers: &T,
426 config: SendAndConfirmConfig,
427) -> Result<Vec<Option<TransactionError>>> {
428 let (blockhash, last_valid_block_height) = rpc_client
430 .get_latest_blockhash_with_commitment(rpc_client.commitment())
431 .await?;
432 let blockhash_data_rw = Arc::new(RwLock::new(BlockHashData {
433 blockhash,
434 last_valid_block_height,
435 }));
436
437 messages
439 .iter()
440 .map(|x| {
441 let mut transaction = Transaction::new_unsigned(x.clone());
442 transaction.try_sign(signers, blockhash)
443 })
444 .collect::<std::result::Result<Vec<()>, SignerError>>()?;
445
446 let block_height = rpc_client.get_block_height().await?;
448 let current_block_height = Arc::new(AtomicU64::new(block_height));
449
450 let progress_bar = config.with_spinner.then(|| {
451 let progress_bar = spinner::new_progress_bar();
452 progress_bar.set_message("Setting up...");
453 progress_bar
454 });
455
456 let block_data_task = create_blockhash_data_updating_task(
458 rpc_client.clone(),
459 blockhash_data_rw.clone(),
460 current_block_height.clone(),
461 );
462
463 let unconfirmed_transasction_map = Arc::new(DashMap::<Signature, TransactionData>::new());
464 let error_map = Arc::new(DashMap::new());
465 let num_confirmed_transactions = Arc::new(AtomicUsize::new(0));
466 let transaction_confirming_task = create_transaction_confirmation_task(
468 rpc_client.clone(),
469 current_block_height.clone(),
470 unconfirmed_transasction_map.clone(),
471 error_map.clone(),
472 num_confirmed_transactions.clone(),
473 );
474
475 let total_transactions = messages.len();
477 let mut initial = true;
478 let signing_count = config.resign_txs_count.unwrap_or(1);
479 let context = SendingContext {
480 unconfirmed_transaction_map: unconfirmed_transasction_map.clone(),
481 blockhash_data_rw: blockhash_data_rw.clone(),
482 num_confirmed_transactions: num_confirmed_transactions.clone(),
483 current_block_height: current_block_height.clone(),
484 error_map: error_map.clone(),
485 total_transactions,
486 };
487
488 for expired_blockhash_retries in (0..signing_count).rev() {
489 let messages_with_index: Vec<(usize, Message)> = if initial {
491 initial = false;
492 messages.iter().cloned().enumerate().collect()
493 } else {
494 unconfirmed_transasction_map
496 .iter()
497 .map(|x| (x.index, x.message.clone()))
498 .collect()
499 };
500
501 if messages_with_index.is_empty() {
502 break;
503 }
504
505 unconfirmed_transasction_map.clear();
507
508 sign_all_messages_and_send(
509 &progress_bar,
510 &rpc_client,
511 &tpu_client,
512 messages_with_index,
513 signers,
514 &context,
515 )
516 .await?;
517 confirm_transactions_till_block_height_and_resend_unexpired_transaction_over_tpu(
518 &progress_bar,
519 &tpu_client,
520 &context,
521 )
522 .await;
523
524 if unconfirmed_transasction_map.is_empty() {
525 break;
526 }
527
528 if let Some(progress_bar) = &progress_bar {
529 progress_bar.println(format!(
530 "Blockhash expired. {expired_blockhash_retries} retries remaining"
531 ));
532 }
533 }
534
535 block_data_task.abort();
536 transaction_confirming_task.abort();
537 if unconfirmed_transasction_map.is_empty() {
538 let mut transaction_errors = vec![None; messages.len()];
539 for iterator in error_map.iter() {
540 transaction_errors[*iterator.key()] = Some(iterator.value().clone());
541 }
542 Ok(transaction_errors)
543 } else {
544 Err(TpuSenderError::Custom("Max retries exceeded".into()))
545 }
546}