solana_client/
send_and_confirm_transactions_in_parallel.rs

1use {
2    crate::{
3        nonblocking::{rpc_client::RpcClient, tpu_client::TpuClient},
4        rpc_client::RpcClient as BlockingRpcClient,
5    },
6    bincode::serialize,
7    dashmap::DashMap,
8    futures_util::future::join_all,
9    solana_hash::Hash,
10    solana_message::Message,
11    solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
12    solana_rpc_client::spinner::{self, SendTransactionProgress},
13    solana_rpc_client_api::{
14        client_error::ErrorKind,
15        config::RpcSendTransactionConfig,
16        request::{RpcError, RpcResponseErrorData, MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS},
17        response::RpcSimulateTransactionResult,
18    },
19    solana_signature::Signature,
20    solana_signer::{signers::Signers, SignerError},
21    solana_tpu_client::tpu_client::{Result, TpuSenderError},
22    solana_transaction::Transaction,
23    solana_transaction_error::TransactionError,
24    std::{
25        sync::{
26            atomic::{AtomicU64, AtomicUsize, Ordering},
27            Arc,
28        },
29        time::Duration,
30    },
31    tokio::{sync::RwLock, task::JoinHandle},
32};
33
34const BLOCKHASH_REFRESH_RATE: Duration = Duration::from_secs(5);
35const SEND_INTERVAL: Duration = Duration::from_millis(10);
36// This is a "reasonable" constant for how long it should
37// take to fan the transactions out, taken from
38// `solana_tpu_client::nonblocking::tpu_client::send_wire_transaction_futures`
39const SEND_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);
40
41type QuicTpuClient = TpuClient<QuicPool, QuicConnectionManager, QuicConfig>;
42
43#[derive(Clone, Debug)]
44struct TransactionData {
45    last_valid_block_height: u64,
46    message: Message,
47    index: usize,
48    serialized_transaction: Vec<u8>,
49}
50
51#[derive(Clone, Debug, Copy)]
52struct BlockHashData {
53    pub blockhash: Hash,
54    pub last_valid_block_height: u64,
55}
56
57// New struct with RpcSendTransactionConfig for non-breaking change
58#[derive(Clone, Debug, Copy)]
59pub struct SendAndConfirmConfigV2 {
60    pub with_spinner: bool,
61    pub resign_txs_count: Option<usize>,
62    pub rpc_send_transaction_config: RpcSendTransactionConfig,
63}
64
65/// Sends and confirms transactions concurrently in a sync context
66pub fn send_and_confirm_transactions_in_parallel_blocking_v2<T: Signers + ?Sized>(
67    rpc_client: Arc<BlockingRpcClient>,
68    tpu_client: Option<QuicTpuClient>,
69    messages: &[Message],
70    signers: &T,
71    config: SendAndConfirmConfigV2,
72) -> Result<Vec<Option<TransactionError>>> {
73    let fut = send_and_confirm_transactions_in_parallel_v2(
74        rpc_client.get_inner_client().clone(),
75        tpu_client,
76        messages,
77        signers,
78        config,
79    );
80    tokio::task::block_in_place(|| rpc_client.runtime().block_on(fut))
81}
82
83fn create_blockhash_data_updating_task(
84    rpc_client: Arc<RpcClient>,
85    blockhash_data_rw: Arc<RwLock<BlockHashData>>,
86    current_block_height: Arc<AtomicU64>,
87) -> JoinHandle<()> {
88    tokio::spawn(async move {
89        loop {
90            if let Ok((blockhash, last_valid_block_height)) = rpc_client
91                .get_latest_blockhash_with_commitment(rpc_client.commitment())
92                .await
93            {
94                *blockhash_data_rw.write().await = BlockHashData {
95                    blockhash,
96                    last_valid_block_height,
97                };
98            }
99
100            if let Ok(block_height) = rpc_client.get_block_height().await {
101                current_block_height.store(block_height, Ordering::Relaxed);
102            }
103            tokio::time::sleep(BLOCKHASH_REFRESH_RATE).await;
104        }
105    })
106}
107
108fn create_transaction_confirmation_task(
109    rpc_client: Arc<RpcClient>,
110    current_block_height: Arc<AtomicU64>,
111    unconfirmed_transaction_map: Arc<DashMap<Signature, TransactionData>>,
112    errors_map: Arc<DashMap<usize, TransactionError>>,
113    num_confirmed_transactions: Arc<AtomicUsize>,
114) -> JoinHandle<()> {
115    tokio::spawn(async move {
116        // check transactions that are not expired or have just expired between two checks
117        let mut last_block_height = current_block_height.load(Ordering::Relaxed);
118
119        loop {
120            if !unconfirmed_transaction_map.is_empty() {
121                let current_block_height = current_block_height.load(Ordering::Relaxed);
122                let transactions_to_verify: Vec<Signature> = unconfirmed_transaction_map
123                    .iter()
124                    .filter(|x| {
125                        let is_not_expired = current_block_height <= x.last_valid_block_height;
126                        // transaction expired between last and current check
127                        let is_recently_expired = last_block_height <= x.last_valid_block_height
128                            && current_block_height > x.last_valid_block_height;
129                        is_not_expired || is_recently_expired
130                    })
131                    .map(|x| *x.key())
132                    .collect();
133                for signatures in
134                    transactions_to_verify.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS)
135                {
136                    if let Ok(result) = rpc_client.get_signature_statuses(signatures).await {
137                        let statuses = result.value;
138                        for (signature, status) in signatures.iter().zip(statuses.into_iter()) {
139                            if let Some((status, data)) = status
140                                .filter(|status| {
141                                    status.satisfies_commitment(rpc_client.commitment())
142                                })
143                                .and_then(|status| {
144                                    unconfirmed_transaction_map
145                                        .remove(signature)
146                                        .map(|(_, data)| (status, data))
147                                })
148                            {
149                                num_confirmed_transactions.fetch_add(1, Ordering::Relaxed);
150                                match status.err {
151                                    Some(TransactionError::AlreadyProcessed) | None => {}
152                                    Some(error) => {
153                                        errors_map.insert(data.index, error);
154                                    }
155                                }
156                            };
157                        }
158                    }
159                }
160
161                last_block_height = current_block_height;
162            }
163            tokio::time::sleep(Duration::from_secs(1)).await;
164        }
165    })
166}
167
168#[derive(Clone, Debug)]
169struct SendingContext {
170    unconfirmed_transaction_map: Arc<DashMap<Signature, TransactionData>>,
171    error_map: Arc<DashMap<usize, TransactionError>>,
172    blockhash_data_rw: Arc<RwLock<BlockHashData>>,
173    num_confirmed_transactions: Arc<AtomicUsize>,
174    total_transactions: usize,
175    current_block_height: Arc<AtomicU64>,
176}
177fn progress_from_context_and_block_height(
178    context: &SendingContext,
179    last_valid_block_height: u64,
180) -> SendTransactionProgress {
181    SendTransactionProgress {
182        confirmed_transactions: context
183            .num_confirmed_transactions
184            .load(std::sync::atomic::Ordering::Relaxed),
185        total_transactions: context.total_transactions,
186        block_height: context
187            .current_block_height
188            .load(std::sync::atomic::Ordering::Relaxed),
189        last_valid_block_height,
190    }
191}
192
193async fn send_transaction_with_rpc_fallback(
194    rpc_client: &RpcClient,
195    tpu_client: &Option<QuicTpuClient>,
196    transaction: Transaction,
197    serialized_transaction: Vec<u8>,
198    context: &SendingContext,
199    index: usize,
200    rpc_send_transaction_config: RpcSendTransactionConfig,
201) -> Result<()> {
202    let send_over_rpc = if let Some(tpu_client) = tpu_client {
203        !tokio::time::timeout(
204            SEND_TIMEOUT_INTERVAL,
205            tpu_client.send_wire_transaction(serialized_transaction.clone()),
206        )
207        .await
208        .unwrap_or(false)
209    } else {
210        true
211    };
212    if send_over_rpc {
213        if let Err(e) = rpc_client
214            .send_transaction_with_config(
215                &transaction,
216                RpcSendTransactionConfig {
217                    preflight_commitment: Some(rpc_client.commitment().commitment),
218                    ..rpc_send_transaction_config
219                },
220            )
221            .await
222        {
223            match e.kind() {
224                ErrorKind::Io(_) | ErrorKind::Reqwest(_) => {
225                    // fall through on io error, we will retry the transaction
226                }
227                ErrorKind::TransactionError(TransactionError::BlockhashNotFound) => {
228                    // fall through so that we will resend with another blockhash
229                }
230                ErrorKind::TransactionError(transaction_error) => {
231                    // if we get other than blockhash not found error the transaction is invalid
232                    context.error_map.insert(index, transaction_error.clone());
233                }
234                ErrorKind::RpcError(RpcError::RpcResponseError {
235                    data:
236                        RpcResponseErrorData::SendTransactionPreflightFailure(
237                            RpcSimulateTransactionResult {
238                                err: Some(ui_transaction_error),
239                                ..
240                            },
241                        ),
242                    ..
243                }) => {
244                    match TransactionError::from(ui_transaction_error.clone()) {
245                        TransactionError::BlockhashNotFound => {
246                            // fall through so that we will resend with another blockhash
247                        }
248                        err => {
249                            // if we get other than blockhash not found error the transaction is invalid
250                            context.error_map.insert(index, err);
251                        }
252                    }
253                }
254                _ => {
255                    return Err(TpuSenderError::from(e));
256                }
257            }
258        }
259    }
260    Ok(())
261}
262
263async fn sign_all_messages_and_send<T: Signers + ?Sized>(
264    progress_bar: &Option<indicatif::ProgressBar>,
265    rpc_client: &RpcClient,
266    tpu_client: &Option<QuicTpuClient>,
267    messages_with_index: Vec<(usize, Message)>,
268    signers: &T,
269    context: &SendingContext,
270    rpc_send_transaction_config: RpcSendTransactionConfig,
271) -> Result<()> {
272    let current_transaction_count = messages_with_index.len();
273    let mut futures = vec![];
274    // send all the transaction messages
275    for (counter, (index, message)) in messages_with_index.iter().enumerate() {
276        let mut transaction = Transaction::new_unsigned(message.clone());
277        futures.push(async move {
278            tokio::time::sleep(SEND_INTERVAL.saturating_mul(counter as u32)).await;
279            let blockhashdata = *context.blockhash_data_rw.read().await;
280
281            // we have already checked if all transactions are signable.
282            transaction
283                .try_sign(signers, blockhashdata.blockhash)
284                .expect("Transaction should be signable");
285            let serialized_transaction =
286                serialize(&transaction).expect("Transaction should serialize");
287            let signature = transaction.signatures[0];
288
289            // send to confirm the transaction
290            context.unconfirmed_transaction_map.insert(
291                signature,
292                TransactionData {
293                    index: *index,
294                    serialized_transaction: serialized_transaction.clone(),
295                    last_valid_block_height: blockhashdata.last_valid_block_height,
296                    message: message.clone(),
297                },
298            );
299            if let Some(progress_bar) = progress_bar {
300                let progress = progress_from_context_and_block_height(
301                    context,
302                    blockhashdata.last_valid_block_height,
303                );
304                progress.set_message_for_confirmed_transactions(
305                    progress_bar,
306                    &format!(
307                        "Sending {}/{} transactions",
308                        counter + 1,
309                        current_transaction_count,
310                    ),
311                );
312            }
313            send_transaction_with_rpc_fallback(
314                rpc_client,
315                tpu_client,
316                transaction,
317                serialized_transaction,
318                context,
319                *index,
320                rpc_send_transaction_config,
321            )
322            .await
323        });
324    }
325    // collect to convert Vec<Result<_>> to Result<Vec<_>>
326    join_all(futures)
327        .await
328        .into_iter()
329        .collect::<Result<Vec<()>>>()?;
330    Ok(())
331}
332
333async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction_over_tpu(
334    progress_bar: &Option<indicatif::ProgressBar>,
335    tpu_client: &Option<QuicTpuClient>,
336    context: &SendingContext,
337) {
338    let unconfirmed_transaction_map = context.unconfirmed_transaction_map.clone();
339    let current_block_height = context.current_block_height.clone();
340
341    let transactions_to_confirm = unconfirmed_transaction_map.len();
342    let max_valid_block_height = unconfirmed_transaction_map
343        .iter()
344        .map(|x| x.last_valid_block_height)
345        .max();
346
347    if let Some(mut max_valid_block_height) = max_valid_block_height {
348        if let Some(progress_bar) = progress_bar {
349            let progress = progress_from_context_and_block_height(context, max_valid_block_height);
350            progress.set_message_for_confirmed_transactions(
351                progress_bar,
352                &format!(
353                    "Waiting for next block, {transactions_to_confirm} transactions pending..."
354                ),
355            );
356        }
357
358        // wait till all transactions are confirmed or we have surpassed max processing age for the last sent transaction
359        while !unconfirmed_transaction_map.is_empty()
360            && current_block_height.load(Ordering::Relaxed) <= max_valid_block_height
361        {
362            let block_height = current_block_height.load(Ordering::Relaxed);
363
364            if let Some(tpu_client) = tpu_client {
365                // retry sending transaction only over TPU port
366                // any transactions sent over RPC will be automatically rebroadcast by the RPC server
367                let txs_to_resend_over_tpu = unconfirmed_transaction_map
368                    .iter()
369                    .filter(|x| block_height < x.last_valid_block_height)
370                    .map(|x| x.serialized_transaction.clone())
371                    .collect::<Vec<_>>();
372                send_staggered_transactions(
373                    progress_bar,
374                    tpu_client,
375                    txs_to_resend_over_tpu,
376                    max_valid_block_height,
377                    context,
378                )
379                .await;
380            } else {
381                tokio::time::sleep(Duration::from_millis(100)).await;
382            }
383            if let Some(max_valid_block_height_in_remaining_transaction) =
384                unconfirmed_transaction_map
385                    .iter()
386                    .map(|x| x.last_valid_block_height)
387                    .max()
388            {
389                max_valid_block_height = max_valid_block_height_in_remaining_transaction;
390            }
391        }
392    }
393}
394
395async fn send_staggered_transactions(
396    progress_bar: &Option<indicatif::ProgressBar>,
397    tpu_client: &QuicTpuClient,
398    wire_transactions: Vec<Vec<u8>>,
399    last_valid_block_height: u64,
400    context: &SendingContext,
401) {
402    let current_transaction_count = wire_transactions.len();
403    let futures = wire_transactions
404        .into_iter()
405        .enumerate()
406        .map(|(counter, transaction)| async move {
407            tokio::time::sleep(SEND_INTERVAL.saturating_mul(counter as u32)).await;
408            if let Some(progress_bar) = progress_bar {
409                let progress =
410                    progress_from_context_and_block_height(context, last_valid_block_height);
411                progress.set_message_for_confirmed_transactions(
412                    progress_bar,
413                    &format!(
414                        "Resending {}/{} transactions",
415                        counter + 1,
416                        current_transaction_count,
417                    ),
418                );
419            }
420            tokio::time::timeout(
421                SEND_TIMEOUT_INTERVAL,
422                tpu_client.send_wire_transaction(transaction),
423            )
424            .await
425        })
426        .collect::<Vec<_>>();
427    join_all(futures).await;
428}
429
430/// Sends and confirms transactions concurrently
431///
432/// The sending and confirmation of transactions is done in parallel tasks
433/// The method signs transactions just before sending so that blockhash does not
434/// expire.
435pub async fn send_and_confirm_transactions_in_parallel_v2<T: Signers + ?Sized>(
436    rpc_client: Arc<RpcClient>,
437    tpu_client: Option<QuicTpuClient>,
438    messages: &[Message],
439    signers: &T,
440    config: SendAndConfirmConfigV2,
441) -> Result<Vec<Option<TransactionError>>> {
442    // get current blockhash and corresponding last valid block height
443    let (blockhash, last_valid_block_height) = rpc_client
444        .get_latest_blockhash_with_commitment(rpc_client.commitment())
445        .await?;
446    let blockhash_data_rw = Arc::new(RwLock::new(BlockHashData {
447        blockhash,
448        last_valid_block_height,
449    }));
450
451    // check if all the messages are signable by the signers
452    messages
453        .iter()
454        .map(|x| {
455            let mut transaction = Transaction::new_unsigned(x.clone());
456            transaction.try_sign(signers, blockhash)
457        })
458        .collect::<std::result::Result<Vec<()>, SignerError>>()?;
459
460    // get current block height
461    let block_height = rpc_client.get_block_height().await?;
462    let current_block_height = Arc::new(AtomicU64::new(block_height));
463
464    let progress_bar = config.with_spinner.then(|| {
465        let progress_bar = spinner::new_progress_bar();
466        progress_bar.set_message("Setting up...");
467        progress_bar
468    });
469
470    // blockhash and block height update task
471    let block_data_task = create_blockhash_data_updating_task(
472        rpc_client.clone(),
473        blockhash_data_rw.clone(),
474        current_block_height.clone(),
475    );
476
477    let unconfirmed_transasction_map = Arc::new(DashMap::<Signature, TransactionData>::new());
478    let error_map = Arc::new(DashMap::new());
479    let num_confirmed_transactions = Arc::new(AtomicUsize::new(0));
480    // tasks which confirms the transactions that were sent
481    let transaction_confirming_task = create_transaction_confirmation_task(
482        rpc_client.clone(),
483        current_block_height.clone(),
484        unconfirmed_transasction_map.clone(),
485        error_map.clone(),
486        num_confirmed_transactions.clone(),
487    );
488
489    // transaction sender task
490    let total_transactions = messages.len();
491    let mut initial = true;
492    let signing_count = config.resign_txs_count.unwrap_or(1);
493    let context = SendingContext {
494        unconfirmed_transaction_map: unconfirmed_transasction_map.clone(),
495        blockhash_data_rw: blockhash_data_rw.clone(),
496        num_confirmed_transactions: num_confirmed_transactions.clone(),
497        current_block_height: current_block_height.clone(),
498        error_map: error_map.clone(),
499        total_transactions,
500    };
501
502    for expired_blockhash_retries in (0..signing_count).rev() {
503        // only send messages which have not been confirmed
504        let messages_with_index: Vec<(usize, Message)> = if initial {
505            initial = false;
506            messages.iter().cloned().enumerate().collect()
507        } else {
508            // remove all the confirmed transactions
509            unconfirmed_transasction_map
510                .iter()
511                .map(|x| (x.index, x.message.clone()))
512                .collect()
513        };
514
515        if messages_with_index.is_empty() {
516            break;
517        }
518
519        // clear the map so that we can start resending
520        unconfirmed_transasction_map.clear();
521
522        sign_all_messages_and_send(
523            &progress_bar,
524            &rpc_client,
525            &tpu_client,
526            messages_with_index,
527            signers,
528            &context,
529            config.rpc_send_transaction_config,
530        )
531        .await?;
532        confirm_transactions_till_block_height_and_resend_unexpired_transaction_over_tpu(
533            &progress_bar,
534            &tpu_client,
535            &context,
536        )
537        .await;
538
539        if unconfirmed_transasction_map.is_empty() {
540            break;
541        }
542
543        if let Some(progress_bar) = &progress_bar {
544            progress_bar.println(format!(
545                "Blockhash expired. {expired_blockhash_retries} retries remaining"
546            ));
547        }
548    }
549
550    block_data_task.abort();
551    transaction_confirming_task.abort();
552    if unconfirmed_transasction_map.is_empty() {
553        let mut transaction_errors = vec![None; messages.len()];
554        for iterator in error_map.iter() {
555            transaction_errors[*iterator.key()] = Some(iterator.value().clone());
556        }
557        Ok(transaction_errors)
558    } else {
559        Err(TpuSenderError::Custom("Max retries exceeded".into()))
560    }
561}