1use {
2 crate::{
3 nonblocking::{rpc_client::RpcClient, tpu_client::TpuClient},
4 rpc_client::RpcClient as BlockingRpcClient,
5 },
6 bincode::serialize,
7 dashmap::DashMap,
8 futures_util::future::join_all,
9 solana_hash::Hash,
10 solana_message::Message,
11 solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
12 solana_rpc_client::spinner::{self, SendTransactionProgress},
13 solana_rpc_client_api::{
14 client_error::ErrorKind,
15 config::RpcSendTransactionConfig,
16 request::{RpcError, RpcResponseErrorData, MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS},
17 response::RpcSimulateTransactionResult,
18 },
19 solana_signature::Signature,
20 solana_signer::{signers::Signers, SignerError},
21 solana_tpu_client::tpu_client::{Result, TpuSenderError},
22 solana_transaction::Transaction,
23 solana_transaction_error::TransactionError,
24 std::{
25 sync::{
26 atomic::{AtomicU64, AtomicUsize, Ordering},
27 Arc,
28 },
29 time::Duration,
30 },
31 tokio::{sync::RwLock, task::JoinHandle},
32};
33
/// Pause between two consecutive polls of the background blockhash / block-height refresher.
const BLOCKHASH_REFRESH_RATE: Duration = Duration::from_secs(5);
/// Stagger unit: the n-th transaction of a batch is delayed by `n * SEND_INTERVAL` before sending.
const SEND_INTERVAL: Duration = Duration::from_millis(10);
/// Upper bound on a single TPU send attempt before it is treated as failed.
const SEND_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);
40
41type QuicTpuClient = TpuClient<QuicPool, QuicConnectionManager, QuicConfig>;
42
43#[derive(Clone, Debug)]
44struct TransactionData {
45 last_valid_block_height: u64,
46 message: Message,
47 index: usize,
48 serialized_transaction: Vec<u8>,
49}
50
51#[derive(Clone, Debug, Copy)]
52struct BlockHashData {
53 pub blockhash: Hash,
54 pub last_valid_block_height: u64,
55}
56
57#[derive(Clone, Debug, Copy)]
59pub struct SendAndConfirmConfigV2 {
60 pub with_spinner: bool,
61 pub resign_txs_count: Option<usize>,
62 pub rpc_send_transaction_config: RpcSendTransactionConfig,
63}
64
65pub fn send_and_confirm_transactions_in_parallel_blocking_v2<T: Signers + ?Sized>(
67 rpc_client: Arc<BlockingRpcClient>,
68 tpu_client: Option<QuicTpuClient>,
69 messages: &[Message],
70 signers: &T,
71 config: SendAndConfirmConfigV2,
72) -> Result<Vec<Option<TransactionError>>> {
73 let fut = send_and_confirm_transactions_in_parallel_v2(
74 rpc_client.get_inner_client().clone(),
75 tpu_client,
76 messages,
77 signers,
78 config,
79 );
80 tokio::task::block_in_place(|| rpc_client.runtime().block_on(fut))
81}
82
83fn create_blockhash_data_updating_task(
84 rpc_client: Arc<RpcClient>,
85 blockhash_data_rw: Arc<RwLock<BlockHashData>>,
86 current_block_height: Arc<AtomicU64>,
87) -> JoinHandle<()> {
88 tokio::spawn(async move {
89 loop {
90 if let Ok((blockhash, last_valid_block_height)) = rpc_client
91 .get_latest_blockhash_with_commitment(rpc_client.commitment())
92 .await
93 {
94 *blockhash_data_rw.write().await = BlockHashData {
95 blockhash,
96 last_valid_block_height,
97 };
98 }
99
100 if let Ok(block_height) = rpc_client.get_block_height().await {
101 current_block_height.store(block_height, Ordering::Relaxed);
102 }
103 tokio::time::sleep(BLOCKHASH_REFRESH_RATE).await;
104 }
105 })
106}
107
108fn create_transaction_confirmation_task(
109 rpc_client: Arc<RpcClient>,
110 current_block_height: Arc<AtomicU64>,
111 unconfirmed_transaction_map: Arc<DashMap<Signature, TransactionData>>,
112 errors_map: Arc<DashMap<usize, TransactionError>>,
113 num_confirmed_transactions: Arc<AtomicUsize>,
114) -> JoinHandle<()> {
115 tokio::spawn(async move {
116 let mut last_block_height = current_block_height.load(Ordering::Relaxed);
118
119 loop {
120 if !unconfirmed_transaction_map.is_empty() {
121 let current_block_height = current_block_height.load(Ordering::Relaxed);
122 let transactions_to_verify: Vec<Signature> = unconfirmed_transaction_map
123 .iter()
124 .filter(|x| {
125 let is_not_expired = current_block_height <= x.last_valid_block_height;
126 let is_recently_expired = last_block_height <= x.last_valid_block_height
128 && current_block_height > x.last_valid_block_height;
129 is_not_expired || is_recently_expired
130 })
131 .map(|x| *x.key())
132 .collect();
133 for signatures in
134 transactions_to_verify.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS)
135 {
136 if let Ok(result) = rpc_client.get_signature_statuses(signatures).await {
137 let statuses = result.value;
138 for (signature, status) in signatures.iter().zip(statuses.into_iter()) {
139 if let Some((status, data)) = status
140 .filter(|status| {
141 status.satisfies_commitment(rpc_client.commitment())
142 })
143 .and_then(|status| {
144 unconfirmed_transaction_map
145 .remove(signature)
146 .map(|(_, data)| (status, data))
147 })
148 {
149 num_confirmed_transactions.fetch_add(1, Ordering::Relaxed);
150 match status.err {
151 Some(TransactionError::AlreadyProcessed) | None => {}
152 Some(error) => {
153 errors_map.insert(data.index, error);
154 }
155 }
156 };
157 }
158 }
159 }
160
161 last_block_height = current_block_height;
162 }
163 tokio::time::sleep(Duration::from_secs(1)).await;
164 }
165 })
166}
167
168#[derive(Clone, Debug)]
169struct SendingContext {
170 unconfirmed_transaction_map: Arc<DashMap<Signature, TransactionData>>,
171 error_map: Arc<DashMap<usize, TransactionError>>,
172 blockhash_data_rw: Arc<RwLock<BlockHashData>>,
173 num_confirmed_transactions: Arc<AtomicUsize>,
174 total_transactions: usize,
175 current_block_height: Arc<AtomicU64>,
176}
177fn progress_from_context_and_block_height(
178 context: &SendingContext,
179 last_valid_block_height: u64,
180) -> SendTransactionProgress {
181 SendTransactionProgress {
182 confirmed_transactions: context
183 .num_confirmed_transactions
184 .load(std::sync::atomic::Ordering::Relaxed),
185 total_transactions: context.total_transactions,
186 block_height: context
187 .current_block_height
188 .load(std::sync::atomic::Ordering::Relaxed),
189 last_valid_block_height,
190 }
191}
192
193async fn send_transaction_with_rpc_fallback(
194 rpc_client: &RpcClient,
195 tpu_client: &Option<QuicTpuClient>,
196 transaction: Transaction,
197 serialized_transaction: Vec<u8>,
198 context: &SendingContext,
199 index: usize,
200 rpc_send_transaction_config: RpcSendTransactionConfig,
201) -> Result<()> {
202 let send_over_rpc = if let Some(tpu_client) = tpu_client {
203 !tokio::time::timeout(
204 SEND_TIMEOUT_INTERVAL,
205 tpu_client.send_wire_transaction(serialized_transaction.clone()),
206 )
207 .await
208 .unwrap_or(false)
209 } else {
210 true
211 };
212 if send_over_rpc {
213 if let Err(e) = rpc_client
214 .send_transaction_with_config(
215 &transaction,
216 RpcSendTransactionConfig {
217 preflight_commitment: Some(rpc_client.commitment().commitment),
218 ..rpc_send_transaction_config
219 },
220 )
221 .await
222 {
223 match e.kind() {
224 ErrorKind::Io(_) | ErrorKind::Reqwest(_) => {
225 }
227 ErrorKind::TransactionError(TransactionError::BlockhashNotFound) => {
228 }
230 ErrorKind::TransactionError(transaction_error) => {
231 context.error_map.insert(index, transaction_error.clone());
233 }
234 ErrorKind::RpcError(RpcError::RpcResponseError {
235 data:
236 RpcResponseErrorData::SendTransactionPreflightFailure(
237 RpcSimulateTransactionResult {
238 err: Some(ui_transaction_error),
239 ..
240 },
241 ),
242 ..
243 }) => {
244 match TransactionError::from(ui_transaction_error.clone()) {
245 TransactionError::BlockhashNotFound => {
246 }
248 err => {
249 context.error_map.insert(index, err);
251 }
252 }
253 }
254 _ => {
255 return Err(TpuSenderError::from(e));
256 }
257 }
258 }
259 }
260 Ok(())
261}
262
263async fn sign_all_messages_and_send<T: Signers + ?Sized>(
264 progress_bar: &Option<indicatif::ProgressBar>,
265 rpc_client: &RpcClient,
266 tpu_client: &Option<QuicTpuClient>,
267 messages_with_index: Vec<(usize, Message)>,
268 signers: &T,
269 context: &SendingContext,
270 rpc_send_transaction_config: RpcSendTransactionConfig,
271) -> Result<()> {
272 let current_transaction_count = messages_with_index.len();
273 let mut futures = vec![];
274 for (counter, (index, message)) in messages_with_index.iter().enumerate() {
276 let mut transaction = Transaction::new_unsigned(message.clone());
277 futures.push(async move {
278 tokio::time::sleep(SEND_INTERVAL.saturating_mul(counter as u32)).await;
279 let blockhashdata = *context.blockhash_data_rw.read().await;
280
281 transaction
283 .try_sign(signers, blockhashdata.blockhash)
284 .expect("Transaction should be signable");
285 let serialized_transaction =
286 serialize(&transaction).expect("Transaction should serialize");
287 let signature = transaction.signatures[0];
288
289 context.unconfirmed_transaction_map.insert(
291 signature,
292 TransactionData {
293 index: *index,
294 serialized_transaction: serialized_transaction.clone(),
295 last_valid_block_height: blockhashdata.last_valid_block_height,
296 message: message.clone(),
297 },
298 );
299 if let Some(progress_bar) = progress_bar {
300 let progress = progress_from_context_and_block_height(
301 context,
302 blockhashdata.last_valid_block_height,
303 );
304 progress.set_message_for_confirmed_transactions(
305 progress_bar,
306 &format!(
307 "Sending {}/{} transactions",
308 counter + 1,
309 current_transaction_count,
310 ),
311 );
312 }
313 send_transaction_with_rpc_fallback(
314 rpc_client,
315 tpu_client,
316 transaction,
317 serialized_transaction,
318 context,
319 *index,
320 rpc_send_transaction_config,
321 )
322 .await
323 });
324 }
325 join_all(futures)
327 .await
328 .into_iter()
329 .collect::<Result<Vec<()>>>()?;
330 Ok(())
331}
332
333async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction_over_tpu(
334 progress_bar: &Option<indicatif::ProgressBar>,
335 tpu_client: &Option<QuicTpuClient>,
336 context: &SendingContext,
337) {
338 let unconfirmed_transaction_map = context.unconfirmed_transaction_map.clone();
339 let current_block_height = context.current_block_height.clone();
340
341 let transactions_to_confirm = unconfirmed_transaction_map.len();
342 let max_valid_block_height = unconfirmed_transaction_map
343 .iter()
344 .map(|x| x.last_valid_block_height)
345 .max();
346
347 if let Some(mut max_valid_block_height) = max_valid_block_height {
348 if let Some(progress_bar) = progress_bar {
349 let progress = progress_from_context_and_block_height(context, max_valid_block_height);
350 progress.set_message_for_confirmed_transactions(
351 progress_bar,
352 &format!(
353 "Waiting for next block, {transactions_to_confirm} transactions pending..."
354 ),
355 );
356 }
357
358 while !unconfirmed_transaction_map.is_empty()
360 && current_block_height.load(Ordering::Relaxed) <= max_valid_block_height
361 {
362 let block_height = current_block_height.load(Ordering::Relaxed);
363
364 if let Some(tpu_client) = tpu_client {
365 let txs_to_resend_over_tpu = unconfirmed_transaction_map
368 .iter()
369 .filter(|x| block_height < x.last_valid_block_height)
370 .map(|x| x.serialized_transaction.clone())
371 .collect::<Vec<_>>();
372 send_staggered_transactions(
373 progress_bar,
374 tpu_client,
375 txs_to_resend_over_tpu,
376 max_valid_block_height,
377 context,
378 )
379 .await;
380 } else {
381 tokio::time::sleep(Duration::from_millis(100)).await;
382 }
383 if let Some(max_valid_block_height_in_remaining_transaction) =
384 unconfirmed_transaction_map
385 .iter()
386 .map(|x| x.last_valid_block_height)
387 .max()
388 {
389 max_valid_block_height = max_valid_block_height_in_remaining_transaction;
390 }
391 }
392 }
393}
394
395async fn send_staggered_transactions(
396 progress_bar: &Option<indicatif::ProgressBar>,
397 tpu_client: &QuicTpuClient,
398 wire_transactions: Vec<Vec<u8>>,
399 last_valid_block_height: u64,
400 context: &SendingContext,
401) {
402 let current_transaction_count = wire_transactions.len();
403 let futures = wire_transactions
404 .into_iter()
405 .enumerate()
406 .map(|(counter, transaction)| async move {
407 tokio::time::sleep(SEND_INTERVAL.saturating_mul(counter as u32)).await;
408 if let Some(progress_bar) = progress_bar {
409 let progress =
410 progress_from_context_and_block_height(context, last_valid_block_height);
411 progress.set_message_for_confirmed_transactions(
412 progress_bar,
413 &format!(
414 "Resending {}/{} transactions",
415 counter + 1,
416 current_transaction_count,
417 ),
418 );
419 }
420 tokio::time::timeout(
421 SEND_TIMEOUT_INTERVAL,
422 tpu_client.send_wire_transaction(transaction),
423 )
424 .await
425 })
426 .collect::<Vec<_>>();
427 join_all(futures).await;
428}
429
430pub async fn send_and_confirm_transactions_in_parallel_v2<T: Signers + ?Sized>(
436 rpc_client: Arc<RpcClient>,
437 tpu_client: Option<QuicTpuClient>,
438 messages: &[Message],
439 signers: &T,
440 config: SendAndConfirmConfigV2,
441) -> Result<Vec<Option<TransactionError>>> {
442 let (blockhash, last_valid_block_height) = rpc_client
444 .get_latest_blockhash_with_commitment(rpc_client.commitment())
445 .await?;
446 let blockhash_data_rw = Arc::new(RwLock::new(BlockHashData {
447 blockhash,
448 last_valid_block_height,
449 }));
450
451 messages
453 .iter()
454 .map(|x| {
455 let mut transaction = Transaction::new_unsigned(x.clone());
456 transaction.try_sign(signers, blockhash)
457 })
458 .collect::<std::result::Result<Vec<()>, SignerError>>()?;
459
460 let block_height = rpc_client.get_block_height().await?;
462 let current_block_height = Arc::new(AtomicU64::new(block_height));
463
464 let progress_bar = config.with_spinner.then(|| {
465 let progress_bar = spinner::new_progress_bar();
466 progress_bar.set_message("Setting up...");
467 progress_bar
468 });
469
470 let block_data_task = create_blockhash_data_updating_task(
472 rpc_client.clone(),
473 blockhash_data_rw.clone(),
474 current_block_height.clone(),
475 );
476
477 let unconfirmed_transasction_map = Arc::new(DashMap::<Signature, TransactionData>::new());
478 let error_map = Arc::new(DashMap::new());
479 let num_confirmed_transactions = Arc::new(AtomicUsize::new(0));
480 let transaction_confirming_task = create_transaction_confirmation_task(
482 rpc_client.clone(),
483 current_block_height.clone(),
484 unconfirmed_transasction_map.clone(),
485 error_map.clone(),
486 num_confirmed_transactions.clone(),
487 );
488
489 let total_transactions = messages.len();
491 let mut initial = true;
492 let signing_count = config.resign_txs_count.unwrap_or(1);
493 let context = SendingContext {
494 unconfirmed_transaction_map: unconfirmed_transasction_map.clone(),
495 blockhash_data_rw: blockhash_data_rw.clone(),
496 num_confirmed_transactions: num_confirmed_transactions.clone(),
497 current_block_height: current_block_height.clone(),
498 error_map: error_map.clone(),
499 total_transactions,
500 };
501
502 for expired_blockhash_retries in (0..signing_count).rev() {
503 let messages_with_index: Vec<(usize, Message)> = if initial {
505 initial = false;
506 messages.iter().cloned().enumerate().collect()
507 } else {
508 unconfirmed_transasction_map
510 .iter()
511 .map(|x| (x.index, x.message.clone()))
512 .collect()
513 };
514
515 if messages_with_index.is_empty() {
516 break;
517 }
518
519 unconfirmed_transasction_map.clear();
521
522 sign_all_messages_and_send(
523 &progress_bar,
524 &rpc_client,
525 &tpu_client,
526 messages_with_index,
527 signers,
528 &context,
529 config.rpc_send_transaction_config,
530 )
531 .await?;
532 confirm_transactions_till_block_height_and_resend_unexpired_transaction_over_tpu(
533 &progress_bar,
534 &tpu_client,
535 &context,
536 )
537 .await;
538
539 if unconfirmed_transasction_map.is_empty() {
540 break;
541 }
542
543 if let Some(progress_bar) = &progress_bar {
544 progress_bar.println(format!(
545 "Blockhash expired. {expired_blockhash_retries} retries remaining"
546 ));
547 }
548 }
549
550 block_data_task.abort();
551 transaction_confirming_task.abort();
552 if unconfirmed_transasction_map.is_empty() {
553 let mut transaction_errors = vec![None; messages.len()];
554 for iterator in error_map.iter() {
555 transaction_errors[*iterator.key()] = Some(iterator.value().clone());
556 }
557 Ok(transaction_errors)
558 } else {
559 Err(TpuSenderError::Custom("Max retries exceeded".into()))
560 }
561}