nitro_da_client/batch_client/
client.rs1use std::sync::Arc;
2
3use itertools::Itertools;
4use solana_client::{client_error::ClientError as Error, nonblocking::rpc_client::RpcClient};
5use solana_sdk::{message::Message, signer::keypair::Keypair, transaction::Transaction};
6use tokio::{
7 sync::mpsc,
8 time::{sleep, timeout_at, Duration, Instant},
9};
10use tracing::{info, warn, Span};
11
12use super::{
13 channels::Channels,
14 messages::{self, SendTransactionMessage, StatusMessage},
15 tasks::{
16 block_watcher::spawn_block_watcher, transaction_confirmer::spawn_transaction_confirmer,
17 transaction_sender::spawn_transaction_sender,
18 },
19 transaction::{TransactionOutcome, TransactionProgress, TransactionStatus},
20};
21
22pub const SEND_TRANSACTION_INTERVAL: Duration = Duration::from_millis(1);
24
25pub struct BatchClient {
27 transaction_sender_tx: Arc<mpsc::UnboundedSender<SendTransactionMessage>>,
28}
29
30impl Clone for BatchClient {
32 fn clone(&self) -> Self {
33 Self {
34 transaction_sender_tx: self.transaction_sender_tx.clone(),
35 }
36 }
37}
38
39impl BatchClient {
40 pub async fn new(
43 rpc_client: Arc<RpcClient>,
44 signers: Vec<Arc<Keypair>>,
45 ) -> Result<Self, Error> {
46 let Channels {
47 blockdata_tx,
48 mut blockdata_rx,
49 transaction_confirmer_tx,
50 transaction_confirmer_rx,
51 transaction_sender_tx,
52 transaction_sender_rx,
53 } = Channels::new();
54
55 spawn_block_watcher(blockdata_tx, rpc_client.clone());
56 let _ = blockdata_rx.changed().await;
58
59 spawn_transaction_confirmer(
60 rpc_client.clone(),
61 blockdata_rx.clone(),
62 transaction_sender_tx.downgrade(),
63 transaction_confirmer_tx.downgrade(),
64 transaction_confirmer_rx,
65 );
66
67 spawn_transaction_sender(
68 rpc_client.clone(),
69 signers.clone(),
70 blockdata_rx.clone(),
71 transaction_confirmer_tx.clone(),
72 transaction_sender_tx.downgrade(),
73 transaction_sender_rx,
74 );
75
76 Ok(Self {
77 transaction_sender_tx,
78 })
79 }
80
81 pub async fn send<T>(
90 &self,
91 messages: Vec<(T, Message)>,
92 timeout: Option<std::time::Duration>,
93 ) -> Vec<TransactionOutcome<T>> {
94 let (data, messages): (Vec<_>, Vec<_>) = messages.into_iter().unzip();
95 let response_rx = self.queue_messages(messages);
96 wait_for_responses(data, response_rx, timeout, log_progress_bar).await
97 }
98
99 fn queue_messages(&self, messages: Vec<Message>) -> mpsc::UnboundedReceiver<StatusMessage> {
100 let (response_tx, response_rx) = mpsc::unbounded_channel();
101
102 for (index, message) in messages.into_iter().enumerate() {
103 let transaction = Transaction::new_unsigned(message);
104 let res = self
105 .transaction_sender_tx
106 .send(messages::SendTransactionMessage {
107 span: Span::current(),
108 index,
109 transaction,
110 last_valid_block_height: 0,
112 response_tx: response_tx.clone(),
113 });
114 if res.is_err() {
115 warn!("transaction_sender_rx dropped, can't queue new messages");
116 break;
117 }
118 }
119
120 response_rx
121 }
122}
123
124pub async fn wait_for_responses<T>(
129 data: Vec<T>,
130 mut response_rx: mpsc::UnboundedReceiver<StatusMessage>,
131 timeout: Option<Duration>,
132 report: impl Fn(&[TransactionProgress<T>]),
133) -> Vec<TransactionOutcome<T>> {
134 let num_messages = data.len();
135 let mut progress: Vec<_> = data.into_iter().map(TransactionProgress::new).collect();
137 let deadline = optional_timeout_to_deadline(timeout);
138
139 loop {
140 sleep(Duration::from_millis(100)).await;
141
142 if deadline < Instant::now() {
146 break;
147 }
148
149 let mut buffer = Vec::new();
150 match timeout_at(deadline, response_rx.recv_many(&mut buffer, num_messages)).await {
151 Ok(0) => {
152 break;
155 }
156 Err(_) => {
157 break;
159 }
160 _ => {}
161 }
162
163 let mut changed = false;
164 for msg in buffer {
165 if progress[msg.index].landed_as != msg.landed_as {
166 progress[msg.index].landed_as = msg.landed_as;
167 changed = true;
168 }
169 if progress[msg.index].status != msg.status {
170 progress[msg.index].status = msg.status;
171 changed = true;
172 }
173 }
174 if changed {
175 report(&progress);
176 }
177 }
178
179 progress.into_iter().map(Into::into).collect()
180}
181
182fn optional_timeout_to_deadline(timeout: Option<Duration>) -> Instant {
185 timeout
186 .map(|timeout| Instant::now() + timeout)
187 .unwrap_or(Instant::now() + Duration::from_secs(60 * 24 * 365 * 30))
190}
191
192fn log_progress_bar<T>(progress: &[TransactionProgress<T>]) {
193 let dots: String = progress
194 .iter()
195 .map(|progress| match progress.status {
196 TransactionStatus::Pending => ' ',
197 TransactionStatus::Processing => '.',
198 TransactionStatus::Committed => 'x',
199 TransactionStatus::Failed(..) => '!',
200 })
201 .join("");
202 info!("[{dots}]");
203}