1use std::{
2 collections::BTreeSet,
3 fmt::Debug,
4 future::Future,
5 time::{Duration, Instant},
6};
7
8use async_trait::async_trait;
9use base58::ToBase58;
10use itertools::{FoldWhile, Itertools};
11pub use solana_client::nonblocking::rpc_client::RpcClient;
12use solana_client::{
13 client_error::ClientError,
14 rpc_client::GetConfirmedSignaturesForAddress2Config,
15 rpc_filter::{Memcmp, RpcFilterType},
16 rpc_request::RpcError,
17};
18pub use solana_sdk::{
19 self,
20 account::Account,
21 commitment_config::CommitmentConfig,
22 pubkey::Pubkey,
23 signature::Signature,
24 transaction::{Transaction, TransactionError},
25};
26use solana_sdk::{clock::UnixTimestamp, hash::Hash};
27use tokio::time;
28use tracing::{instrument, Level};
29
/// Tunables controlling how a sent transaction is validated and confirmed.
#[derive(Debug, Clone)]
pub struct SendContext {
    /// Upper bound on the time spent polling for a transaction status.
    pub confirm_duration: Duration,
    /// Delay between two consecutive status polls.
    pub confirm_request_pause: Duration,
    /// Whether the transaction's recent blockhash is checked before sending.
    pub blockhash_validation: bool,
    /// Number of failed status polls that are tolerated before giving up.
    pub ignorable_errors_count: usize,
}

impl Default for SendContext {
    /// Polls once per second for up to a minute, validates the blockhash
    /// and does not tolerate any status-polling error.
    fn default() -> Self {
        let confirm_duration = Duration::from_secs(60);
        let confirm_request_pause = Duration::from_secs(1);

        SendContext {
            confirm_duration,
            confirm_request_pause,
            blockhash_validation: true,
            ignorable_errors_count: 0,
        }
    }
}
47
48#[async_trait]
49pub trait AsyncSendTransaction {
50 async fn get_latest_blockhash(&self) -> Result<Hash, ClientError>;
51
52 async fn send_transaction_with_custom_expectant<Expecter, Fut, TxStatus>(
53 &self,
54 transaction: Transaction,
55 expectant: &Expecter,
56 send_ctx: SendContext,
57 ) -> Result<(Signature, TxStatus), ClientError>
58 where
59 Expecter: Send + Sync + Fn(Signature) -> Fut,
60 TxStatus: Debug + Send,
61 Fut: Send + Future<Output = Result<Option<TxStatus>, ClientError>>;
62
63 async fn resend_transaction_with_custom_expectant<TransactionBuilder, Expecter, Fut, TxStatus>(
64 &self,
65 transaction_builder: TransactionBuilder,
66 expectant: &Expecter,
67 send_ctx: SendContext,
68 mut resend_count: usize,
69 ) -> Result<(Signature, TxStatus), ClientError>
70 where
71 Expecter: Send + Sync + Fn(Signature) -> Fut,
72 TransactionBuilder: Send + Sync + Fn(Hash) -> Transaction,
73 TxStatus: Debug + Send,
74 Fut: Send + Future<Output = Result<Option<TxStatus>, ClientError>>,
75 {
76 loop {
77 let tx = transaction_builder(self.get_latest_blockhash().await?);
78
79 match self
80 .send_transaction_with_custom_expectant::<Expecter, Fut, TxStatus>(
81 tx,
82 expectant,
83 send_ctx.clone(),
84 )
85 .await
86 {
87 Ok(result) => break Ok(result),
88 Err(err) if resend_count != 0 => {
89 resend_count -= 1;
90 tracing::warn!(
91 "Error while send transaction: {:?}. Start resend. Resends left: {}",
92 err,
93 resend_count
94 );
95 continue;
96 }
97 Err(err) => break Err(err),
98 }
99 }
100 }
101}
102
103#[async_trait]
104impl AsyncSendTransaction for RpcClient {
105 async fn get_latest_blockhash(&self) -> Result<Hash, ClientError> {
106 self.get_latest_blockhash().await
107 }
108
109 async fn send_transaction_with_custom_expectant<Expecter, Fut, TxStatus>(
110 &self,
111 transaction: Transaction,
112 expectant: &Expecter,
113 mut send_ctx: SendContext,
114 ) -> Result<(Signature, TxStatus), ClientError>
115 where
116 Expecter: Send + Sync + Fn(Signature) -> Fut,
117 TxStatus: Debug + Send,
118 Fut: Send + Future<Output = Result<Option<TxStatus>, ClientError>>,
119 {
120 let span = tracing::span!(
121 Level::TRACE,
122 "send ",
123 tx = format!("{:?}", transaction.signatures.first()).as_str()
124 );
125 let _guard = span.enter();
126 if send_ctx.blockhash_validation {
127 tracing::trace!(
128 "Blockhash {} validation of transaction {:?} started",
129 transaction.message.recent_blockhash,
130 transaction
131 );
132 match self
133 .is_blockhash_valid(
134 &transaction.message.recent_blockhash,
135 CommitmentConfig::processed(),
136 )
137 .await
138 {
139 Ok(true) => {}
140 Ok(false) => {
141 return Err(RpcError::ForUser(format!(
142 "Transaction {transaction:?} blockhash not found by rpc",
143 ))
144 .into())
145 }
146 Err(err) => {
147 tracing::error!(
148 "Ignore error via blockhash request of {:?} transaction: {:?}. Error ignores left: {}",
149 transaction,
150 err,
151 send_ctx.ignorable_errors_count
152 );
153 return Err(RpcError::ForUser(format!(
154 "Error via transaction {transaction:?} blockhash requesting",
155 ))
156 .into());
157 }
158 }
159 }
160 let signature = self.send_transaction(&transaction).await?;
161
162 let instant = Instant::now();
163 loop {
164 match expectant(signature).await {
165 Ok(None) => {
166 tracing::trace!(
167 "No status via sending {} transaction. Continue waiting",
168 signature
169 );
170 }
171 Ok(Some(status)) => {
172 tracing::trace!(
173 "Status of {} transaction, received: {:?}",
174 signature,
175 status
176 );
177 break Ok((signature, status));
178 }
179 Err(err) if send_ctx.ignorable_errors_count == 0 => {
180 tracing::error!(
181 "Error via status request of {} transaction: {:?}",
182 signature,
183 err,
184 );
185 break Err(err);
186 }
187 Err(err) => {
188 send_ctx.ignorable_errors_count -= 1;
189 tracing::error!(
190 "Ignore error via status request of {} transaction: {:?}. Error ignores left: {}",
191 signature,
192 err,
193 send_ctx.ignorable_errors_count
194 );
195 }
196 }
197
198 if send_ctx.confirm_duration < instant.elapsed() {
199 break Err(RpcError::ForUser(format!(
200 "Unable to confirm transaction {signature}.",
201 ))
202 .into());
203 }
204 time::sleep(send_ctx.confirm_request_pause).await;
205 }
206 }
207}
208
209#[async_trait]
210pub trait AsyncSendTransactionWithSimpleStatus: AsyncSendTransaction {
211 async fn send_transaction_with_simple_status(
212 &self,
213 transaction: Transaction,
214 send_ctx: SendContext,
215 ) -> Result<(Signature, Option<TransactionError>), ClientError>;
216}
217
218#[async_trait]
219impl AsyncSendTransactionWithSimpleStatus for RpcClient {
220 async fn send_transaction_with_simple_status(
221 &self,
222 transaction: Transaction,
223 send_ctx: SendContext,
224 ) -> Result<(Signature, Option<TransactionError>), ClientError> {
225 self.send_transaction_with_custom_expectant(
226 transaction,
227 &|signature: Signature| async move {
228 self.get_signature_status(&signature.clone()).await
229 },
230 send_ctx,
231 )
232 .await
233 .map(|(signature, result_with_status)| (signature, result_with_status.err()))
234 }
235}
236#[async_trait]
237pub trait AsyncResendTransactionWithSimpleStatus: AsyncSendTransaction {
238 async fn resend_transaction_with_simple_status<TransactionBuilder>(
239 &self,
240 transaction_builder: TransactionBuilder,
241 send_ctx: SendContext,
242 resend_count: usize,
243 ) -> Result<(Signature, Option<TransactionError>), ClientError>
244 where
245 TransactionBuilder: Send + Sync + Fn(Hash) -> Transaction;
246}
247
248#[async_trait]
249impl AsyncResendTransactionWithSimpleStatus for RpcClient {
250 async fn resend_transaction_with_simple_status<TransactionBuilder>(
251 &self,
252 transaction_builder: TransactionBuilder,
253 send_ctx: SendContext,
254 resend_count: usize,
255 ) -> Result<(Signature, Option<TransactionError>), ClientError>
256 where
257 TransactionBuilder: Send + Sync + Fn(Hash) -> Transaction,
258 {
259 self.resend_transaction_with_custom_expectant(
260 transaction_builder,
261 &|signature: Signature| async move {
262 self.get_signature_status(&signature.clone()).await
263 },
264 send_ctx,
265 resend_count,
266 )
267 .await
268 .map(|(signature, result_with_status)| (signature, result_with_status.err()))
269 }
270}
271
272pub struct Memory {
273 pub offset: usize,
274 pub bytes: Vec<u8>,
275}
276impl From<Memory> for RpcFilterType {
277 fn from(mem: Memory) -> RpcFilterType {
278 #[allow(deprecated)]
279 RpcFilterType::Memcmp(Memcmp {
280 offset: mem.offset,
281 bytes: solana_client::rpc_filter::MemcmpEncodedBytes::Base58(mem.bytes.to_base58()),
282 encoding: None,
283 })
284 }
285}
286
287#[async_trait]
288pub trait GetProgramAccountsWithBytes {
289 async fn get_program_accounts_with_bytes(
290 &self,
291 program: &Pubkey,
292 bytes: Vec<Memory>,
293 ) -> Result<Vec<(Pubkey, Account)>, ClientError>;
294}
295
296use solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig};
297
298#[async_trait]
299impl GetProgramAccountsWithBytes for RpcClient {
300 async fn get_program_accounts_with_bytes(
301 &self,
302 program_id: &Pubkey,
303 bytes: Vec<Memory>,
304 ) -> Result<Vec<(Pubkey, Account)>, ClientError> {
305 use solana_account_decoder::*;
306 Ok(self
307 .get_program_accounts_with_config(
308 program_id,
309 RpcProgramAccountsConfig {
310 filters: Some(bytes.into_iter().map(RpcFilterType::from).collect()),
311 account_config: RpcAccountInfoConfig {
312 encoding: Some(UiAccountEncoding::Base64),
313 ..Default::default()
314 },
315 with_context: None,
316 },
317 )
318 .await?)
319 }
320}
321
322#[derive(Debug, thiserror::Error)]
323pub enum Error {
324 #[error(transparent)]
325 ClientError(#[from] ClientError),
326 #[error(transparent)]
327 SignatureParseError(#[from] solana_sdk::signature::ParseSignatureError),
328}
329
330#[derive(Debug, Clone, PartialEq, Eq)]
331pub struct SignaturesData {
332 pub signature: Signature,
333 pub slot: u64,
334 pub block_time: Option<UnixTimestamp>,
335 pub err: Option<TransactionError>,
336}
337impl PartialOrd for SignaturesData {
338 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
339 match self.slot.partial_cmp(&other.slot) {
340 Some(core::cmp::Ordering::Equal) => {}
341 ord => return ord,
342 }
343 match self.block_time.partial_cmp(&other.block_time) {
344 Some(core::cmp::Ordering::Equal) => {}
345 ord => return ord,
346 }
347 self.signature.partial_cmp(&other.signature)
348 }
349}
350impl Ord for SignaturesData {
351 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
352 self.partial_cmp(other).unwrap_or(std::cmp::Ordering::Equal)
353 }
354}
355
356#[async_trait]
357pub trait GetTransactionsSignaturesForAddress {
358 async fn get_signatures_for_address_with_config(
359 &self,
360 address: &Pubkey,
361 commitment_config: CommitmentConfig,
362 until: Option<Signature>,
363 ) -> Result<Vec<Signature>, Error> {
364 Ok(self
365 .get_signatures_data_for_address_with_config(address, commitment_config, until)
366 .await?
367 .into_iter()
368 .filter(|data| data.err.is_none())
369 .map(|data| data.signature)
370 .collect())
371 }
372 async fn get_signatures_data_for_address_with_config(
373 &self,
374 address: &Pubkey,
375 commitment_config: CommitmentConfig,
376 until: Option<Signature>,
377 ) -> Result<BTreeSet<SignaturesData>, Error>;
378}
379
380#[async_trait]
381impl GetTransactionsSignaturesForAddress for RpcClient {
382 #[instrument(skip(self))]
383 async fn get_signatures_data_for_address_with_config(
384 &self,
385 address: &Pubkey,
386 commitment_config: CommitmentConfig,
387 until: Option<Signature>,
388 ) -> Result<BTreeSet<SignaturesData>, Error> {
389 let mut all_signatures = BTreeSet::new();
390 let mut before = None;
391
392 loop {
393 tracing::trace!(
394 "Request signature batch, before: {:?}, until: {:?}",
395 before,
396 until
397 );
398
399 let signatures_batch = self
400 .get_signatures_for_address_with_config(
401 address,
402 GetConfirmedSignaturesForAddress2Config {
403 before,
404 until,
405 limit: Some(1000),
406 commitment: Some(commitment_config),
407 },
408 )
409 .await
410 .map_err(|err| {
411 tracing::error!(
412 "Error while get signature for address with config: {:?}",
413 err
414 );
415 err
416 })?
417 .into_iter()
418 .map(|tx| {
419 Ok(SignaturesData {
420 signature: tx.signature.parse()?,
421 slot: tx.slot,
422 block_time: tx.block_time,
423 err: tx.err,
424 })
425 })
426 .collect::<Result<Vec<_>, Error>>()?;
427
428 if signatures_batch.is_empty() {
429 break;
430 }
431 tracing::trace!("Batch received: {}", signatures_batch.len());
432
433 before = signatures_batch
434 .iter()
435 .rev()
436 .fold_while(
437 None,
438 |resync_border_tx, signature_data| match resync_border_tx {
439 None => FoldWhile::Continue(Some(signature_data)),
440 Some(resync_border) => {
441 if resync_border.slot != signature_data.slot {
442 FoldWhile::Done(Some(signature_data))
443 } else {
444 FoldWhile::Continue(Some(resync_border))
445 }
446 }
447 },
448 )
449 .into_inner()
450 .map(|d| d.signature);
451
452 let batch_len_before = signatures_batch
453 .iter()
454 .map(|b| b.slot)
455 .all_equal()
456 .then_some(all_signatures.len());
457
458 signatures_batch.into_iter().for_each(|s| {
459 all_signatures.insert(s);
460 });
461
462 if matches!(
463 batch_len_before,
464 Some(before_len) if before_len == all_signatures.len()
465 ) {
466 break;
467 }
468
469 tracing::trace!("All signatures: {}", all_signatures.len());
470 }
471
472 Ok(all_signatures)
473 }
474}