data_anchor_client/batch_client/
client.rs

1use std::sync::Arc;
2
3use anchor_lang::solana_program::message::Message;
4use itertools::Itertools;
5use solana_client::{client_error::ClientError as Error, nonblocking::rpc_client::RpcClient};
6use solana_keypair::Keypair;
7use solana_transaction::Transaction;
8use tokio::{
9    sync::mpsc,
10    time::{Duration, Instant, sleep, timeout_at},
11};
12use tracing::{Span, info, warn};
13
14use super::{
15    channels::Channels,
16    messages::{self, SendTransactionMessage, StatusMessage},
17    tasks::{
18        block_watcher::spawn_block_watcher, transaction_confirmer::spawn_transaction_confirmer,
19        transaction_sender::spawn_transaction_sender,
20    },
21    transaction::{TransactionOutcome, TransactionProgress, TransactionStatus},
22};
23
24/// Send at ~333 TPS
25pub const SEND_TRANSACTION_INTERVAL: Duration = Duration::from_millis(1);
26
27/// A client that wraps an [`RpcClient`] and uses it to submit batches of transactions.
28pub struct BatchClient {
29    transaction_sender_tx: Arc<mpsc::UnboundedSender<SendTransactionMessage>>,
30}
31
32// Clone can't be derived because of the phantom references to the TPU implementation details.
33impl Clone for BatchClient {
34    fn clone(&self) -> Self {
35        Self {
36            transaction_sender_tx: self.transaction_sender_tx.clone(),
37        }
38    }
39}
40
41impl BatchClient {
42    /// Creates a new [`BatchClient`], and spawns the associated background tasks. The background
43    /// tasks will run until the [`BatchClient`] is dropped.
44    pub async fn new(
45        rpc_client: Arc<RpcClient>,
46        signers: Vec<Arc<Keypair>>,
47    ) -> Result<Self, Error> {
48        let Channels {
49            blockdata_tx,
50            mut blockdata_rx,
51            transaction_confirmer_tx,
52            transaction_confirmer_rx,
53            transaction_sender_tx,
54            transaction_sender_rx,
55        } = Channels::new();
56
57        spawn_block_watcher(blockdata_tx, rpc_client.clone());
58        // Wait for the first update so the default value is never visible.
59        let _ = blockdata_rx.changed().await;
60
61        spawn_transaction_confirmer(
62            rpc_client.clone(),
63            blockdata_rx.clone(),
64            transaction_sender_tx.downgrade(),
65            transaction_confirmer_tx.downgrade(),
66            transaction_confirmer_rx,
67        );
68
69        spawn_transaction_sender(
70            rpc_client.clone(),
71            signers.clone(),
72            blockdata_rx.clone(),
73            transaction_confirmer_tx.clone(),
74            transaction_sender_tx.downgrade(),
75            transaction_sender_rx,
76        );
77
78        Ok(Self {
79            transaction_sender_tx,
80        })
81    }
82
83    /// Queue a batch of transactions to be sent to the network. An attempt will be made to submit
84    /// the transactions in the provided order, they can be reordered, especially in case of
85    /// re-submissions. The client will re-submit the transactions until they are successfully
86    /// confirmed or the timeout is reached, if one is provided.
87    ///
88    /// Cancel safety: Dropping the future returned by this method will stop any further
89    /// re-submissions of the provided transactions, but makes no guarantees about the number of
90    /// transactions that have already been submitted or confirmed.
91    pub async fn send<T>(
92        &self,
93        messages: Vec<(T, Message)>,
94        timeout: Option<std::time::Duration>,
95    ) -> Vec<TransactionOutcome<T>> {
96        let (data, messages): (Vec<_>, Vec<_>) = messages.into_iter().unzip();
97        let response_rx = self.queue_messages(messages);
98        wait_for_responses(data, response_rx, timeout, log_progress_bar).await
99    }
100
101    fn queue_messages(&self, messages: Vec<Message>) -> mpsc::UnboundedReceiver<StatusMessage> {
102        let (response_tx, response_rx) = mpsc::unbounded_channel();
103
104        for (index, message) in messages.into_iter().enumerate() {
105            let transaction = Transaction::new_unsigned(message);
106            let res = self
107                .transaction_sender_tx
108                .send(messages::SendTransactionMessage {
109                    span: Span::current(),
110                    index,
111                    transaction,
112                    // This will trigger a "re"-sign, keeping signing logic in one place.
113                    last_valid_block_height: 0,
114                    response_tx: response_tx.clone(),
115                });
116            if res.is_err() {
117                warn!("transaction_sender_rx dropped, can't queue new messages");
118                break;
119            }
120        }
121
122        response_rx
123    }
124}
125
126/// Wait for the submitted transactions to be confirmed, or for a timeout to be reached.
127/// This function will also report the progress of the transactions using the provided closure.
128///
129/// Progress will be checked every second, and any updates in that time will be merged together.
130pub async fn wait_for_responses<T>(
131    data: Vec<T>,
132    mut response_rx: mpsc::UnboundedReceiver<StatusMessage>,
133    timeout: Option<Duration>,
134    report: impl Fn(&[TransactionProgress<T>]),
135) -> Vec<TransactionOutcome<T>> {
136    let num_messages = data.len();
137    // Start with all messages as pending.
138    let mut progress: Vec<_> = data.into_iter().map(TransactionProgress::new).collect();
139    let deadline = optional_timeout_to_deadline(timeout);
140
141    loop {
142        sleep(Duration::from_millis(100)).await;
143
144        // The deadline has to be checked separately because the response_rx could be receiving
145        // messages faster than they're being processed, which means recv_many returns instantly
146        // and never triggers the timeout.
147        if deadline < Instant::now() {
148            break;
149        }
150
151        let mut buffer = Vec::new();
152        match timeout_at(deadline, response_rx.recv_many(&mut buffer, num_messages)).await {
153            Ok(0) => {
154                // If this is ever zero, that means the channel was closed.
155                // This will return the received transactions even if not all of them landed.
156                break;
157            }
158            Err(_) => {
159                // Timeout reached, break out and return what has already been received.
160                break;
161            }
162            _ => {}
163        }
164
165        let mut changed = false;
166        for msg in buffer {
167            if progress[msg.index].landed_as != msg.landed_as {
168                progress[msg.index].landed_as = msg.landed_as;
169                changed = true;
170            }
171            if progress[msg.index].status != msg.status {
172                progress[msg.index].status = msg.status;
173                changed = true;
174            }
175        }
176        if changed {
177            report(&progress);
178        }
179    }
180
181    progress.into_iter().map(Into::into).collect()
182}
183
184/// Converts an optional timeout to a conditionless deadline.
185/// If the timeout is not set, the deadline will be set 30 years in the future.
186fn optional_timeout_to_deadline(timeout: Option<Duration>) -> Instant {
187    timeout
188        .map(|timeout| Instant::now() + timeout)
189        // 30 years in the future is far ahead to be effectively infinite,
190        // but low enough to not overflow on some OSes.
191        .unwrap_or(Instant::now() + Duration::from_secs(60 * 24 * 365 * 30))
192}
193
194fn log_progress_bar<T>(progress: &[TransactionProgress<T>]) {
195    let dots: String = progress
196        .iter()
197        .map(|progress| match progress.status {
198            TransactionStatus::Pending => ' ',
199            TransactionStatus::Processing => '.',
200            TransactionStatus::Committed => 'x',
201            TransactionStatus::Failed(..) => '!',
202        })
203        .join("");
204    info!("[{dots}]");
205}