solana_client/
send_and_confirm_transactions_in_parallel.rs

1use {
2    crate::{
3        nonblocking::{rpc_client::RpcClient, tpu_client::TpuClient},
4        rpc_client::RpcClient as BlockingRpcClient,
5    },
6    bincode::serialize,
7    dashmap::DashMap,
8    futures_util::future::join_all,
9    solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
10    solana_rpc_client::spinner::{self, SendTransactionProgress},
11    solana_rpc_client_api::{
12        client_error::ErrorKind,
13        request::{RpcError, RpcResponseErrorData, MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS},
14        response::RpcSimulateTransactionResult,
15    },
16    solana_sdk::{
17        hash::Hash,
18        message::Message,
19        signature::{Signature, SignerError},
20        signers::Signers,
21        transaction::{Transaction, TransactionError},
22    },
23    solana_tpu_client::tpu_client::{Result, TpuSenderError},
24    std::{
25        sync::{
26            atomic::{AtomicU64, AtomicUsize, Ordering},
27            Arc,
28        },
29        time::Duration,
30    },
31    tokio::{sync::RwLock, task::JoinHandle},
32};
33
34const BLOCKHASH_REFRESH_RATE: Duration = Duration::from_secs(5);
35const SEND_INTERVAL: Duration = Duration::from_millis(10);
36// This is a "reasonable" constant for how long it should
37// take to fan the transactions out, taken from
38// `solana_tpu_client::nonblocking::tpu_client::send_wire_transaction_futures`
39const SEND_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);
40
41type QuicTpuClient = TpuClient<QuicPool, QuicConnectionManager, QuicConfig>;
42
43#[derive(Clone, Debug)]
44struct TransactionData {
45    last_valid_block_height: u64,
46    message: Message,
47    index: usize,
48    serialized_transaction: Vec<u8>,
49}
50
51#[derive(Clone, Debug, Copy)]
52struct BlockHashData {
53    pub blockhash: Hash,
54    pub last_valid_block_height: u64,
55}
56
57#[derive(Clone, Debug, Copy)]
58pub struct SendAndConfirmConfig {
59    pub with_spinner: bool,
60    pub resign_txs_count: Option<usize>,
61}
62
63/// Sends and confirms transactions concurrently in a sync context
64pub fn send_and_confirm_transactions_in_parallel_blocking<T: Signers + ?Sized>(
65    rpc_client: Arc<BlockingRpcClient>,
66    tpu_client: Option<QuicTpuClient>,
67    messages: &[Message],
68    signers: &T,
69    config: SendAndConfirmConfig,
70) -> Result<Vec<Option<TransactionError>>> {
71    let fut = send_and_confirm_transactions_in_parallel(
72        rpc_client.get_inner_client().clone(),
73        tpu_client,
74        messages,
75        signers,
76        config,
77    );
78    tokio::task::block_in_place(|| rpc_client.runtime().block_on(fut))
79}
80
81fn create_blockhash_data_updating_task(
82    rpc_client: Arc<RpcClient>,
83    blockhash_data_rw: Arc<RwLock<BlockHashData>>,
84    current_block_height: Arc<AtomicU64>,
85) -> JoinHandle<()> {
86    tokio::spawn(async move {
87        loop {
88            if let Ok((blockhash, last_valid_block_height)) = rpc_client
89                .get_latest_blockhash_with_commitment(rpc_client.commitment())
90                .await
91            {
92                *blockhash_data_rw.write().await = BlockHashData {
93                    blockhash,
94                    last_valid_block_height,
95                };
96            }
97
98            if let Ok(block_height) = rpc_client.get_block_height().await {
99                current_block_height.store(block_height, Ordering::Relaxed);
100            }
101            tokio::time::sleep(BLOCKHASH_REFRESH_RATE).await;
102        }
103    })
104}
105
106fn create_transaction_confirmation_task(
107    rpc_client: Arc<RpcClient>,
108    current_block_height: Arc<AtomicU64>,
109    unconfirmed_transaction_map: Arc<DashMap<Signature, TransactionData>>,
110    errors_map: Arc<DashMap<usize, TransactionError>>,
111    num_confirmed_transactions: Arc<AtomicUsize>,
112) -> JoinHandle<()> {
113    tokio::spawn(async move {
114        // check transactions that are not expired or have just expired between two checks
115        let mut last_block_height = current_block_height.load(Ordering::Relaxed);
116
117        loop {
118            if !unconfirmed_transaction_map.is_empty() {
119                let current_block_height = current_block_height.load(Ordering::Relaxed);
120                let transactions_to_verify: Vec<Signature> = unconfirmed_transaction_map
121                    .iter()
122                    .filter(|x| {
123                        let is_not_expired = current_block_height <= x.last_valid_block_height;
124                        // transaction expired between last and current check
125                        let is_recently_expired = last_block_height <= x.last_valid_block_height
126                            && current_block_height > x.last_valid_block_height;
127                        is_not_expired || is_recently_expired
128                    })
129                    .map(|x| *x.key())
130                    .collect();
131                for signatures in
132                    transactions_to_verify.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS)
133                {
134                    if let Ok(result) = rpc_client.get_signature_statuses(signatures).await {
135                        let statuses = result.value;
136                        for (signature, status) in signatures.iter().zip(statuses.into_iter()) {
137                            if let Some((status, data)) = status
138                                .filter(|status| {
139                                    status.satisfies_commitment(rpc_client.commitment())
140                                })
141                                .and_then(|status| {
142                                    unconfirmed_transaction_map
143                                        .remove(signature)
144                                        .map(|(_, data)| (status, data))
145                                })
146                            {
147                                num_confirmed_transactions.fetch_add(1, Ordering::Relaxed);
148                                match status.err {
149                                    Some(TransactionError::AlreadyProcessed) | None => {}
150                                    Some(error) => {
151                                        errors_map.insert(data.index, error);
152                                    }
153                                }
154                            };
155                        }
156                    }
157                }
158
159                last_block_height = current_block_height;
160            }
161            tokio::time::sleep(Duration::from_secs(1)).await;
162        }
163    })
164}
165
166#[derive(Clone, Debug)]
167struct SendingContext {
168    unconfirmed_transaction_map: Arc<DashMap<Signature, TransactionData>>,
169    error_map: Arc<DashMap<usize, TransactionError>>,
170    blockhash_data_rw: Arc<RwLock<BlockHashData>>,
171    num_confirmed_transactions: Arc<AtomicUsize>,
172    total_transactions: usize,
173    current_block_height: Arc<AtomicU64>,
174}
175fn progress_from_context_and_block_height(
176    context: &SendingContext,
177    last_valid_block_height: u64,
178) -> SendTransactionProgress {
179    SendTransactionProgress {
180        confirmed_transactions: context
181            .num_confirmed_transactions
182            .load(std::sync::atomic::Ordering::Relaxed),
183        total_transactions: context.total_transactions,
184        block_height: context
185            .current_block_height
186            .load(std::sync::atomic::Ordering::Relaxed),
187        last_valid_block_height,
188    }
189}
190
191async fn send_transaction_with_rpc_fallback(
192    rpc_client: &RpcClient,
193    tpu_client: &Option<QuicTpuClient>,
194    transaction: Transaction,
195    serialized_transaction: Vec<u8>,
196    context: &SendingContext,
197    index: usize,
198) -> Result<()> {
199    let send_over_rpc = if let Some(tpu_client) = tpu_client {
200        !tokio::time::timeout(
201            SEND_TIMEOUT_INTERVAL,
202            tpu_client.send_wire_transaction(serialized_transaction.clone()),
203        )
204        .await
205        .unwrap_or(false)
206    } else {
207        true
208    };
209    if send_over_rpc {
210        if let Err(e) = rpc_client.send_transaction(&transaction).await {
211            match &e.kind {
212                ErrorKind::Io(_) | ErrorKind::Reqwest(_) => {
213                    // fall through on io error, we will retry the transaction
214                }
215                ErrorKind::TransactionError(TransactionError::BlockhashNotFound)
216                | ErrorKind::RpcError(RpcError::RpcResponseError {
217                    data:
218                        RpcResponseErrorData::SendTransactionPreflightFailure(
219                            RpcSimulateTransactionResult {
220                                err: Some(TransactionError::BlockhashNotFound),
221                                ..
222                            },
223                        ),
224                    ..
225                }) => {
226                    // fall through so that we will resend with another blockhash
227                }
228                ErrorKind::TransactionError(transaction_error)
229                | ErrorKind::RpcError(RpcError::RpcResponseError {
230                    data:
231                        RpcResponseErrorData::SendTransactionPreflightFailure(
232                            RpcSimulateTransactionResult {
233                                err: Some(transaction_error),
234                                ..
235                            },
236                        ),
237                    ..
238                }) => {
239                    // if we get other than blockhash not found error the transaction is invalid
240                    context.error_map.insert(index, transaction_error.clone());
241                }
242                _ => {
243                    return Err(TpuSenderError::from(e));
244                }
245            }
246        }
247    }
248    Ok(())
249}
250
251async fn sign_all_messages_and_send<T: Signers + ?Sized>(
252    progress_bar: &Option<indicatif::ProgressBar>,
253    rpc_client: &RpcClient,
254    tpu_client: &Option<QuicTpuClient>,
255    messages_with_index: Vec<(usize, Message)>,
256    signers: &T,
257    context: &SendingContext,
258) -> Result<()> {
259    let current_transaction_count = messages_with_index.len();
260    let mut futures = vec![];
261    // send all the transaction messages
262    for (counter, (index, message)) in messages_with_index.iter().enumerate() {
263        let mut transaction = Transaction::new_unsigned(message.clone());
264        futures.push(async move {
265            tokio::time::sleep(SEND_INTERVAL.saturating_mul(counter as u32)).await;
266            let blockhashdata = *context.blockhash_data_rw.read().await;
267
268            // we have already checked if all transactions are signable.
269            transaction
270                .try_sign(signers, blockhashdata.blockhash)
271                .expect("Transaction should be signable");
272            let serialized_transaction =
273                serialize(&transaction).expect("Transaction should serialize");
274            let signature = transaction.signatures[0];
275
276            // send to confirm the transaction
277            context.unconfirmed_transaction_map.insert(
278                signature,
279                TransactionData {
280                    index: *index,
281                    serialized_transaction: serialized_transaction.clone(),
282                    last_valid_block_height: blockhashdata.last_valid_block_height,
283                    message: message.clone(),
284                },
285            );
286            if let Some(progress_bar) = progress_bar {
287                let progress = progress_from_context_and_block_height(
288                    context,
289                    blockhashdata.last_valid_block_height,
290                );
291                progress.set_message_for_confirmed_transactions(
292                    progress_bar,
293                    &format!(
294                        "Sending {}/{} transactions",
295                        counter + 1,
296                        current_transaction_count,
297                    ),
298                );
299            }
300            send_transaction_with_rpc_fallback(
301                rpc_client,
302                tpu_client,
303                transaction,
304                serialized_transaction,
305                context,
306                *index,
307            )
308            .await
309        });
310    }
311    // collect to convert Vec<Result<_>> to Result<Vec<_>>
312    join_all(futures)
313        .await
314        .into_iter()
315        .collect::<Result<Vec<()>>>()?;
316    Ok(())
317}
318
319async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction_over_tpu(
320    progress_bar: &Option<indicatif::ProgressBar>,
321    tpu_client: &Option<QuicTpuClient>,
322    context: &SendingContext,
323) {
324    let unconfirmed_transaction_map = context.unconfirmed_transaction_map.clone();
325    let current_block_height = context.current_block_height.clone();
326
327    let transactions_to_confirm = unconfirmed_transaction_map.len();
328    let max_valid_block_height = unconfirmed_transaction_map
329        .iter()
330        .map(|x| x.last_valid_block_height)
331        .max();
332
333    if let Some(mut max_valid_block_height) = max_valid_block_height {
334        if let Some(progress_bar) = progress_bar {
335            let progress = progress_from_context_and_block_height(context, max_valid_block_height);
336            progress.set_message_for_confirmed_transactions(
337                progress_bar,
338                &format!(
339                    "Waiting for next block, {transactions_to_confirm} transactions pending..."
340                ),
341            );
342        }
343
344        // wait till all transactions are confirmed or we have surpassed max processing age for the last sent transaction
345        while !unconfirmed_transaction_map.is_empty()
346            && current_block_height.load(Ordering::Relaxed) <= max_valid_block_height
347        {
348            let block_height = current_block_height.load(Ordering::Relaxed);
349
350            if let Some(tpu_client) = tpu_client {
351                // retry sending transaction only over TPU port
352                // any transactions sent over RPC will be automatically rebroadcast by the RPC server
353                let txs_to_resend_over_tpu = unconfirmed_transaction_map
354                    .iter()
355                    .filter(|x| block_height < x.last_valid_block_height)
356                    .map(|x| x.serialized_transaction.clone())
357                    .collect::<Vec<_>>();
358                send_staggered_transactions(
359                    progress_bar,
360                    tpu_client,
361                    txs_to_resend_over_tpu,
362                    max_valid_block_height,
363                    context,
364                )
365                .await;
366            } else {
367                tokio::time::sleep(Duration::from_millis(100)).await;
368            }
369            if let Some(max_valid_block_height_in_remaining_transaction) =
370                unconfirmed_transaction_map
371                    .iter()
372                    .map(|x| x.last_valid_block_height)
373                    .max()
374            {
375                max_valid_block_height = max_valid_block_height_in_remaining_transaction;
376            }
377        }
378    }
379}
380
381async fn send_staggered_transactions(
382    progress_bar: &Option<indicatif::ProgressBar>,
383    tpu_client: &QuicTpuClient,
384    wire_transactions: Vec<Vec<u8>>,
385    last_valid_block_height: u64,
386    context: &SendingContext,
387) {
388    let current_transaction_count = wire_transactions.len();
389    let futures = wire_transactions
390        .into_iter()
391        .enumerate()
392        .map(|(counter, transaction)| async move {
393            tokio::time::sleep(SEND_INTERVAL.saturating_mul(counter as u32)).await;
394            if let Some(progress_bar) = progress_bar {
395                let progress =
396                    progress_from_context_and_block_height(context, last_valid_block_height);
397                progress.set_message_for_confirmed_transactions(
398                    progress_bar,
399                    &format!(
400                        "Resending {}/{} transactions",
401                        counter + 1,
402                        current_transaction_count,
403                    ),
404                );
405            }
406            tokio::time::timeout(
407                SEND_TIMEOUT_INTERVAL,
408                tpu_client.send_wire_transaction(transaction),
409            )
410            .await
411        })
412        .collect::<Vec<_>>();
413    join_all(futures).await;
414}
415
416/// Sends and confirms transactions concurrently
417///
418/// The sending and confirmation of transactions is done in parallel tasks
419/// The method signs transactions just before sending so that blockhash does not
420/// expire.
421pub async fn send_and_confirm_transactions_in_parallel<T: Signers + ?Sized>(
422    rpc_client: Arc<RpcClient>,
423    tpu_client: Option<QuicTpuClient>,
424    messages: &[Message],
425    signers: &T,
426    config: SendAndConfirmConfig,
427) -> Result<Vec<Option<TransactionError>>> {
428    // get current blockhash and corresponding last valid block height
429    let (blockhash, last_valid_block_height) = rpc_client
430        .get_latest_blockhash_with_commitment(rpc_client.commitment())
431        .await?;
432    let blockhash_data_rw = Arc::new(RwLock::new(BlockHashData {
433        blockhash,
434        last_valid_block_height,
435    }));
436
437    // check if all the messages are signable by the signers
438    messages
439        .iter()
440        .map(|x| {
441            let mut transaction = Transaction::new_unsigned(x.clone());
442            transaction.try_sign(signers, blockhash)
443        })
444        .collect::<std::result::Result<Vec<()>, SignerError>>()?;
445
446    // get current block height
447    let block_height = rpc_client.get_block_height().await?;
448    let current_block_height = Arc::new(AtomicU64::new(block_height));
449
450    let progress_bar = config.with_spinner.then(|| {
451        let progress_bar = spinner::new_progress_bar();
452        progress_bar.set_message("Setting up...");
453        progress_bar
454    });
455
456    // blockhash and block height update task
457    let block_data_task = create_blockhash_data_updating_task(
458        rpc_client.clone(),
459        blockhash_data_rw.clone(),
460        current_block_height.clone(),
461    );
462
463    let unconfirmed_transasction_map = Arc::new(DashMap::<Signature, TransactionData>::new());
464    let error_map = Arc::new(DashMap::new());
465    let num_confirmed_transactions = Arc::new(AtomicUsize::new(0));
466    // tasks which confirms the transactions that were sent
467    let transaction_confirming_task = create_transaction_confirmation_task(
468        rpc_client.clone(),
469        current_block_height.clone(),
470        unconfirmed_transasction_map.clone(),
471        error_map.clone(),
472        num_confirmed_transactions.clone(),
473    );
474
475    // transaction sender task
476    let total_transactions = messages.len();
477    let mut initial = true;
478    let signing_count = config.resign_txs_count.unwrap_or(1);
479    let context = SendingContext {
480        unconfirmed_transaction_map: unconfirmed_transasction_map.clone(),
481        blockhash_data_rw: blockhash_data_rw.clone(),
482        num_confirmed_transactions: num_confirmed_transactions.clone(),
483        current_block_height: current_block_height.clone(),
484        error_map: error_map.clone(),
485        total_transactions,
486    };
487
488    for expired_blockhash_retries in (0..signing_count).rev() {
489        // only send messages which have not been confirmed
490        let messages_with_index: Vec<(usize, Message)> = if initial {
491            initial = false;
492            messages.iter().cloned().enumerate().collect()
493        } else {
494            // remove all the confirmed transactions
495            unconfirmed_transasction_map
496                .iter()
497                .map(|x| (x.index, x.message.clone()))
498                .collect()
499        };
500
501        if messages_with_index.is_empty() {
502            break;
503        }
504
505        // clear the map so that we can start resending
506        unconfirmed_transasction_map.clear();
507
508        sign_all_messages_and_send(
509            &progress_bar,
510            &rpc_client,
511            &tpu_client,
512            messages_with_index,
513            signers,
514            &context,
515        )
516        .await?;
517        confirm_transactions_till_block_height_and_resend_unexpired_transaction_over_tpu(
518            &progress_bar,
519            &tpu_client,
520            &context,
521        )
522        .await;
523
524        if unconfirmed_transasction_map.is_empty() {
525            break;
526        }
527
528        if let Some(progress_bar) = &progress_bar {
529            progress_bar.println(format!(
530                "Blockhash expired. {expired_blockhash_retries} retries remaining"
531            ));
532        }
533    }
534
535    block_data_task.abort();
536    transaction_confirming_task.abort();
537    if unconfirmed_transasction_map.is_empty() {
538        let mut transaction_errors = vec![None; messages.len()];
539        for iterator in error_map.iter() {
540            transaction_errors[*iterator.key()] = Some(iterator.value().clone());
541        }
542        Ok(transaction_errors)
543    } else {
544        Err(TpuSenderError::Custom("Max retries exceeded".into()))
545    }
546}