data_anchor_client/batch_client/
client.rs1use std::sync::Arc;
2
3use anchor_lang::solana_program::message::Message;
4use itertools::Itertools;
5use solana_client::{client_error::ClientError as Error, nonblocking::rpc_client::RpcClient};
6use solana_keypair::Keypair;
7use solana_transaction::Transaction;
8use tokio::{
9 sync::mpsc,
10 time::{Duration, Instant, sleep, timeout_at},
11};
12use tracing::{Span, info, warn};
13
14use super::{
15 channels::Channels,
16 messages::{self, SendTransactionMessage, StatusMessage},
17 tasks::{
18 block_watcher::spawn_block_watcher, transaction_confirmer::spawn_transaction_confirmer,
19 transaction_sender::spawn_transaction_sender,
20 },
21 transaction::{TransactionOutcome, TransactionProgress, TransactionStatus},
22};
23
24pub const SEND_TRANSACTION_INTERVAL: Duration = Duration::from_millis(1);
26
27pub struct BatchClient {
29 transaction_sender_tx: Arc<mpsc::UnboundedSender<SendTransactionMessage>>,
30}
31
32impl Clone for BatchClient {
34 fn clone(&self) -> Self {
35 Self {
36 transaction_sender_tx: self.transaction_sender_tx.clone(),
37 }
38 }
39}
40
41impl BatchClient {
42 pub async fn new(
45 rpc_client: Arc<RpcClient>,
46 signers: Vec<Arc<Keypair>>,
47 ) -> Result<Self, Error> {
48 let Channels {
49 blockdata_tx,
50 mut blockdata_rx,
51 transaction_confirmer_tx,
52 transaction_confirmer_rx,
53 transaction_sender_tx,
54 transaction_sender_rx,
55 } = Channels::new();
56
57 spawn_block_watcher(blockdata_tx, rpc_client.clone());
58 let _ = blockdata_rx.changed().await;
60
61 spawn_transaction_confirmer(
62 rpc_client.clone(),
63 blockdata_rx.clone(),
64 transaction_sender_tx.downgrade(),
65 transaction_confirmer_tx.downgrade(),
66 transaction_confirmer_rx,
67 );
68
69 spawn_transaction_sender(
70 rpc_client.clone(),
71 signers.clone(),
72 blockdata_rx.clone(),
73 transaction_confirmer_tx.clone(),
74 transaction_sender_tx.downgrade(),
75 transaction_sender_rx,
76 );
77
78 Ok(Self {
79 transaction_sender_tx,
80 })
81 }
82
83 pub async fn send<T>(
92 &self,
93 messages: Vec<(T, Message)>,
94 timeout: Option<std::time::Duration>,
95 ) -> Vec<TransactionOutcome<T>> {
96 let (data, messages): (Vec<_>, Vec<_>) = messages.into_iter().unzip();
97 let response_rx = self.queue_messages(messages);
98 wait_for_responses(data, response_rx, timeout, log_progress_bar).await
99 }
100
101 fn queue_messages(&self, messages: Vec<Message>) -> mpsc::UnboundedReceiver<StatusMessage> {
102 let (response_tx, response_rx) = mpsc::unbounded_channel();
103
104 for (index, message) in messages.into_iter().enumerate() {
105 let transaction = Transaction::new_unsigned(message);
106 let res = self
107 .transaction_sender_tx
108 .send(messages::SendTransactionMessage {
109 span: Span::current(),
110 index,
111 transaction,
112 last_valid_block_height: 0,
114 response_tx: response_tx.clone(),
115 });
116 if res.is_err() {
117 warn!("transaction_sender_rx dropped, can't queue new messages");
118 break;
119 }
120 }
121
122 response_rx
123 }
124}
125
126pub async fn wait_for_responses<T>(
131 data: Vec<T>,
132 mut response_rx: mpsc::UnboundedReceiver<StatusMessage>,
133 timeout: Option<Duration>,
134 report: impl Fn(&[TransactionProgress<T>]),
135) -> Vec<TransactionOutcome<T>> {
136 let num_messages = data.len();
137 let mut progress: Vec<_> = data.into_iter().map(TransactionProgress::new).collect();
139 let deadline = optional_timeout_to_deadline(timeout);
140
141 loop {
142 sleep(Duration::from_millis(100)).await;
143
144 if deadline < Instant::now() {
148 break;
149 }
150
151 let mut buffer = Vec::new();
152 match timeout_at(deadline, response_rx.recv_many(&mut buffer, num_messages)).await {
153 Ok(0) => {
154 break;
157 }
158 Err(_) => {
159 break;
161 }
162 _ => {}
163 }
164
165 let mut changed = false;
166 for msg in buffer {
167 if progress[msg.index].landed_as != msg.landed_as {
168 progress[msg.index].landed_as = msg.landed_as;
169 changed = true;
170 }
171 if progress[msg.index].status != msg.status {
172 progress[msg.index].status = msg.status;
173 changed = true;
174 }
175 }
176 if changed {
177 report(&progress);
178 }
179 }
180
181 progress.into_iter().map(Into::into).collect()
182}
183
184fn optional_timeout_to_deadline(timeout: Option<Duration>) -> Instant {
187 timeout
188 .map(|timeout| Instant::now() + timeout)
189 .unwrap_or(Instant::now() + Duration::from_secs(60 * 24 * 365 * 30))
192}
193
194fn log_progress_bar<T>(progress: &[TransactionProgress<T>]) {
195 let dots: String = progress
196 .iter()
197 .map(|progress| match progress.status {
198 TransactionStatus::Pending => ' ',
199 TransactionStatus::Processing => '.',
200 TransactionStatus::Committed => 'x',
201 TransactionStatus::Failed(..) => '!',
202 })
203 .join("");
204 info!("[{dots}]");
205}