nitro_da_client/batch_client/
client.rs

1use std::sync::Arc;
2
3use itertools::Itertools;
4use solana_client::{client_error::ClientError as Error, nonblocking::rpc_client::RpcClient};
5use solana_sdk::{message::Message, signer::keypair::Keypair, transaction::Transaction};
6use tokio::{
7    sync::mpsc,
8    time::{sleep, timeout_at, Duration, Instant},
9};
10use tracing::{info, warn, Span};
11
12use super::{
13    channels::Channels,
14    messages::{self, SendTransactionMessage, StatusMessage},
15    tasks::{
16        block_watcher::spawn_block_watcher, transaction_confirmer::spawn_transaction_confirmer,
17        transaction_sender::spawn_transaction_sender,
18    },
19    transaction::{TransactionOutcome, TransactionProgress, TransactionStatus},
20};
21
22/// Send at ~333 TPS
23pub const SEND_TRANSACTION_INTERVAL: Duration = Duration::from_millis(1);
24
25/// A client that wraps an [`RpcClient`] and uses it to submit batches of transactions.
26pub struct BatchClient {
27    transaction_sender_tx: Arc<mpsc::UnboundedSender<SendTransactionMessage>>,
28}
29
30// Clone can't be derived because of the phantom references to the TPU implementation details.
31impl Clone for BatchClient {
32    fn clone(&self) -> Self {
33        Self {
34            transaction_sender_tx: self.transaction_sender_tx.clone(),
35        }
36    }
37}
38
39impl BatchClient {
40    /// Creates a new [`BatchClient`], and spawns the associated background tasks. The background
41    /// tasks will run until the [`BatchClient`] is dropped.
42    pub async fn new(
43        rpc_client: Arc<RpcClient>,
44        signers: Vec<Arc<Keypair>>,
45    ) -> Result<Self, Error> {
46        let Channels {
47            blockdata_tx,
48            mut blockdata_rx,
49            transaction_confirmer_tx,
50            transaction_confirmer_rx,
51            transaction_sender_tx,
52            transaction_sender_rx,
53        } = Channels::new();
54
55        spawn_block_watcher(blockdata_tx, rpc_client.clone());
56        // Wait for the first update so the default value is never visible.
57        let _ = blockdata_rx.changed().await;
58
59        spawn_transaction_confirmer(
60            rpc_client.clone(),
61            blockdata_rx.clone(),
62            transaction_sender_tx.downgrade(),
63            transaction_confirmer_tx.downgrade(),
64            transaction_confirmer_rx,
65        );
66
67        spawn_transaction_sender(
68            rpc_client.clone(),
69            signers.clone(),
70            blockdata_rx.clone(),
71            transaction_confirmer_tx.clone(),
72            transaction_sender_tx.downgrade(),
73            transaction_sender_rx,
74        );
75
76        Ok(Self {
77            transaction_sender_tx,
78        })
79    }
80
81    /// Queue a batch of transactions to be sent to the network. An attempt will be made to submit
82    /// the transactions in the provided order, they can be reordered, especially in case of
83    /// re-submissions. The client will re-submit the transactions until they are successfully
84    /// confirmed or the timeout is reached, if one is provided.
85    ///
86    /// Cancel safety: Dropping the future returned by this method will stop any further
87    /// re-submissions of the provided transactions, but makes no guarantees about the number of
88    /// transactions that have already been submitted or confirmed.
89    pub async fn send<T>(
90        &self,
91        messages: Vec<(T, Message)>,
92        timeout: Option<std::time::Duration>,
93    ) -> Vec<TransactionOutcome<T>> {
94        let (data, messages): (Vec<_>, Vec<_>) = messages.into_iter().unzip();
95        let response_rx = self.queue_messages(messages);
96        wait_for_responses(data, response_rx, timeout, log_progress_bar).await
97    }
98
99    fn queue_messages(&self, messages: Vec<Message>) -> mpsc::UnboundedReceiver<StatusMessage> {
100        let (response_tx, response_rx) = mpsc::unbounded_channel();
101
102        for (index, message) in messages.into_iter().enumerate() {
103            let transaction = Transaction::new_unsigned(message);
104            let res = self
105                .transaction_sender_tx
106                .send(messages::SendTransactionMessage {
107                    span: Span::current(),
108                    index,
109                    transaction,
110                    // This will trigger a "re"-sign, keeping signing logic in one place.
111                    last_valid_block_height: 0,
112                    response_tx: response_tx.clone(),
113                });
114            if res.is_err() {
115                warn!("transaction_sender_rx dropped, can't queue new messages");
116                break;
117            }
118        }
119
120        response_rx
121    }
122}
123
124/// Wait for the submitted transactions to be confirmed, or for a timeout to be reached.
125/// This function will also report the progress of the transactions using the provided closure.
126///
127/// Progress will be checked every second, and any updates in that time will be merged together.
128pub async fn wait_for_responses<T>(
129    data: Vec<T>,
130    mut response_rx: mpsc::UnboundedReceiver<StatusMessage>,
131    timeout: Option<Duration>,
132    report: impl Fn(&[TransactionProgress<T>]),
133) -> Vec<TransactionOutcome<T>> {
134    let num_messages = data.len();
135    // Start with all messages as pending.
136    let mut progress: Vec<_> = data.into_iter().map(TransactionProgress::new).collect();
137    let deadline = optional_timeout_to_deadline(timeout);
138
139    loop {
140        sleep(Duration::from_millis(100)).await;
141
142        // The deadline has to be checked separately because the response_rx could be receiving
143        // messages faster than they're being processed, which means recv_many returns instantly
144        // and never triggers the timeout.
145        if deadline < Instant::now() {
146            break;
147        }
148
149        let mut buffer = Vec::new();
150        match timeout_at(deadline, response_rx.recv_many(&mut buffer, num_messages)).await {
151            Ok(0) => {
152                // If this is ever zero, that means the channel was closed.
153                // This will return the received transactions even if not all of them landed.
154                break;
155            }
156            Err(_) => {
157                // Timeout reached, break out and return what has already been received.
158                break;
159            }
160            _ => {}
161        }
162
163        let mut changed = false;
164        for msg in buffer {
165            if progress[msg.index].landed_as != msg.landed_as {
166                progress[msg.index].landed_as = msg.landed_as;
167                changed = true;
168            }
169            if progress[msg.index].status != msg.status {
170                progress[msg.index].status = msg.status;
171                changed = true;
172            }
173        }
174        if changed {
175            report(&progress);
176        }
177    }
178
179    progress.into_iter().map(Into::into).collect()
180}
181
182/// Converts an optional timeout to a conditionless deadline.
183/// If the timeout is not set, the deadline will be set 30 years in the future.
184fn optional_timeout_to_deadline(timeout: Option<Duration>) -> Instant {
185    timeout
186        .map(|timeout| Instant::now() + timeout)
187        // 30 years in the future is far ahead to be effectively infinite,
188        // but low enough to not overflow on some OSes.
189        .unwrap_or(Instant::now() + Duration::from_secs(60 * 24 * 365 * 30))
190}
191
192fn log_progress_bar<T>(progress: &[TransactionProgress<T>]) {
193    let dots: String = progress
194        .iter()
195        .map(|progress| match progress.status {
196            TransactionStatus::Pending => ' ',
197            TransactionStatus::Processing => '.',
198            TransactionStatus::Committed => 'x',
199            TransactionStatus::Failed(..) => '!',
200        })
201        .join("");
202    info!("[{dots}]");
203}