de_solana_client/
lib.rs

1use std::{
2    collections::BTreeSet,
3    fmt::Debug,
4    future::Future,
5    time::{Duration, Instant},
6};
7
8use async_trait::async_trait;
9use base58::ToBase58;
10use itertools::{FoldWhile, Itertools};
11pub use solana_client::nonblocking::rpc_client::RpcClient;
12use solana_client::{
13    client_error::ClientError,
14    rpc_client::GetConfirmedSignaturesForAddress2Config,
15    rpc_filter::{Memcmp, RpcFilterType},
16    rpc_request::RpcError,
17};
18pub use solana_sdk::{
19    self,
20    account::Account,
21    commitment_config::CommitmentConfig,
22    pubkey::Pubkey,
23    signature::Signature,
24    transaction::{Transaction, TransactionError},
25};
26use solana_sdk::{clock::UnixTimestamp, hash::Hash};
27use tokio::time;
28use tracing::{instrument, Level};
29
/// Tunables that control how a transaction is sent and how its status is polled.
#[derive(Debug, Clone)]
pub struct SendContext {
    /// Upper bound on the time spent polling for a status after the transaction was sent.
    pub confirm_duration: Duration,
    /// Pause inserted between two consecutive status requests.
    pub confirm_request_pause: Duration,
    /// Whether the transaction's recent blockhash is checked before the transaction is sent.
    pub blockhash_validation: bool,
    /// How many failed status requests may be skipped before the error is returned.
    pub ignorable_errors_count: usize,
}

impl Default for SendContext {
    /// Builds the default context: a 60 second confirmation window, a 1 second pause between
    /// status requests, blockhash validation enabled and no ignorable errors.
    fn default() -> Self {
        let confirm_duration = Duration::from_secs(60);
        let confirm_request_pause = Duration::from_secs(1);

        SendContext {
            confirm_duration,
            confirm_request_pause,
            blockhash_validation: true,
            ignorable_errors_count: 0,
        }
    }
}
47
48#[async_trait]
49pub trait AsyncSendTransaction {
50    async fn get_latest_blockhash(&self) -> Result<Hash, ClientError>;
51
52    async fn send_transaction_with_custom_expectant<Expecter, Fut, TxStatus>(
53        &self,
54        transaction: Transaction,
55        expectant: &Expecter,
56        send_ctx: SendContext,
57    ) -> Result<(Signature, TxStatus), ClientError>
58    where
59        Expecter: Send + Sync + Fn(Signature) -> Fut,
60        TxStatus: Debug + Send,
61        Fut: Send + Future<Output = Result<Option<TxStatus>, ClientError>>;
62
63    async fn resend_transaction_with_custom_expectant<TransactionBuilder, Expecter, Fut, TxStatus>(
64        &self,
65        transaction_builder: TransactionBuilder,
66        expectant: &Expecter,
67        send_ctx: SendContext,
68        mut resend_count: usize,
69    ) -> Result<(Signature, TxStatus), ClientError>
70    where
71        Expecter: Send + Sync + Fn(Signature) -> Fut,
72        TransactionBuilder: Send + Sync + Fn(Hash) -> Transaction,
73        TxStatus: Debug + Send,
74        Fut: Send + Future<Output = Result<Option<TxStatus>, ClientError>>,
75    {
76        loop {
77            let tx = transaction_builder(self.get_latest_blockhash().await?);
78
79            match self
80                .send_transaction_with_custom_expectant::<Expecter, Fut, TxStatus>(
81                    tx,
82                    expectant,
83                    send_ctx.clone(),
84                )
85                .await
86            {
87                Ok(result) => break Ok(result),
88                Err(err) if resend_count != 0 => {
89                    resend_count -= 1;
90                    tracing::warn!(
91                        "Error while send transaction: {:?}. Start resend. Resends left: {}",
92                        err,
93                        resend_count
94                    );
95                    continue;
96                }
97                Err(err) => break Err(err),
98            }
99        }
100    }
101}
102
103#[async_trait]
104impl AsyncSendTransaction for RpcClient {
105    async fn get_latest_blockhash(&self) -> Result<Hash, ClientError> {
106        self.get_latest_blockhash().await
107    }
108
109    async fn send_transaction_with_custom_expectant<Expecter, Fut, TxStatus>(
110        &self,
111        transaction: Transaction,
112        expectant: &Expecter,
113        mut send_ctx: SendContext,
114    ) -> Result<(Signature, TxStatus), ClientError>
115    where
116        Expecter: Send + Sync + Fn(Signature) -> Fut,
117        TxStatus: Debug + Send,
118        Fut: Send + Future<Output = Result<Option<TxStatus>, ClientError>>,
119    {
120        let span = tracing::span!(
121            Level::TRACE,
122            "send ",
123            tx = format!("{:?}", transaction.signatures.first()).as_str()
124        );
125        let _guard = span.enter();
126        if send_ctx.blockhash_validation {
127            tracing::trace!(
128                "Blockhash {} validation of transaction {:?} started",
129                transaction.message.recent_blockhash,
130                transaction
131            );
132            match self
133                .is_blockhash_valid(
134                    &transaction.message.recent_blockhash,
135                    CommitmentConfig::processed(),
136                )
137                .await
138            {
139                Ok(true) => {}
140                Ok(false) => {
141                    return Err(RpcError::ForUser(format!(
142                        "Transaction {transaction:?} blockhash not found by rpc",
143                    ))
144                    .into())
145                }
146                Err(err) => {
147                    tracing::error!(
148                        "Ignore error via blockhash request of {:?} transaction: {:?}. Error ignores left: {}",
149                        transaction,
150                        err,
151                        send_ctx.ignorable_errors_count
152                    );
153                    return Err(RpcError::ForUser(format!(
154                        "Error via transaction {transaction:?} blockhash requesting",
155                    ))
156                    .into());
157                }
158            }
159        }
160        let signature = self.send_transaction(&transaction).await?;
161
162        let instant = Instant::now();
163        loop {
164            match expectant(signature).await {
165                Ok(None) => {
166                    tracing::trace!(
167                        "No status via sending {} transaction. Continue waiting",
168                        signature
169                    );
170                }
171                Ok(Some(status)) => {
172                    tracing::trace!(
173                        "Status of {} transaction, received: {:?}",
174                        signature,
175                        status
176                    );
177                    break Ok((signature, status));
178                }
179                Err(err) if send_ctx.ignorable_errors_count == 0 => {
180                    tracing::error!(
181                        "Error via status request of {} transaction: {:?}",
182                        signature,
183                        err,
184                    );
185                    break Err(err);
186                }
187                Err(err) => {
188                    send_ctx.ignorable_errors_count -= 1;
189                    tracing::error!(
190                        "Ignore error via status request of {} transaction: {:?}. Error ignores left: {}",
191                        signature,
192                        err,
193                        send_ctx.ignorable_errors_count
194                    );
195                }
196            }
197
198            if send_ctx.confirm_duration < instant.elapsed() {
199                break Err(RpcError::ForUser(format!(
200                    "Unable to confirm transaction {signature}.",
201                ))
202                .into());
203            }
204            time::sleep(send_ctx.confirm_request_pause).await;
205        }
206    }
207}
208
/// Extension of [`AsyncSendTransaction`] that sends a transaction without requiring the caller
/// to supply a status poller.
#[async_trait]
pub trait AsyncSendTransactionWithSimpleStatus: AsyncSendTransaction {
    /// Sends `transaction` using the settings in `send_ctx`.
    ///
    /// On success returns the transaction signature and an optional [`TransactionError`].
    /// NOTE(review): in the `RpcClient` implementation in this file the `Option` is the error
    /// side of the reported signature status, so `None` presumably means the transaction
    /// succeeded — confirm for other implementors.
    async fn send_transaction_with_simple_status(
        &self,
        transaction: Transaction,
        send_ctx: SendContext,
    ) -> Result<(Signature, Option<TransactionError>), ClientError>;
}
217
218#[async_trait]
219impl AsyncSendTransactionWithSimpleStatus for RpcClient {
220    async fn send_transaction_with_simple_status(
221        &self,
222        transaction: Transaction,
223        send_ctx: SendContext,
224    ) -> Result<(Signature, Option<TransactionError>), ClientError> {
225        self.send_transaction_with_custom_expectant(
226            transaction,
227            &|signature: Signature| async move {
228                self.get_signature_status(&signature.clone()).await
229            },
230            send_ctx,
231        )
232        .await
233        .map(|(signature, result_with_status)| (signature, result_with_status.err()))
234    }
235}
/// Extension of [`AsyncSendTransaction`] that sends a transaction with retries, without
/// requiring the caller to supply a status poller.
#[async_trait]
pub trait AsyncResendTransactionWithSimpleStatus: AsyncSendTransaction {
    /// Sends the transaction produced by `transaction_builder`, allowing `resend_count` resends.
    ///
    /// `transaction_builder` receives a blockhash and returns the transaction to send.
    /// On success returns the transaction signature and an optional [`TransactionError`].
    /// NOTE(review): in the `RpcClient` implementation in this file the `Option` is the error
    /// side of the reported signature status, so `None` presumably means the transaction
    /// succeeded — confirm for other implementors.
    async fn resend_transaction_with_simple_status<TransactionBuilder>(
        &self,
        transaction_builder: TransactionBuilder,
        send_ctx: SendContext,
        resend_count: usize,
    ) -> Result<(Signature, Option<TransactionError>), ClientError>
    where
        TransactionBuilder: Send + Sync + Fn(Hash) -> Transaction;
}
247
248#[async_trait]
249impl AsyncResendTransactionWithSimpleStatus for RpcClient {
250    async fn resend_transaction_with_simple_status<TransactionBuilder>(
251        &self,
252        transaction_builder: TransactionBuilder,
253        send_ctx: SendContext,
254        resend_count: usize,
255    ) -> Result<(Signature, Option<TransactionError>), ClientError>
256    where
257        TransactionBuilder: Send + Sync + Fn(Hash) -> Transaction,
258    {
259        self.resend_transaction_with_custom_expectant(
260            transaction_builder,
261            &|signature: Signature| async move {
262                self.get_signature_status(&signature.clone()).await
263            },
264            send_ctx,
265            resend_count,
266        )
267        .await
268        .map(|(signature, result_with_status)| (signature, result_with_status.err()))
269    }
270}
271
/// A byte pattern expected at a given offset of an account's data.
///
/// Converted into an `RpcFilterType::Memcmp` filter via `From<Memory>`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Memory {
    /// Offset at which `bytes` is compared.
    pub offset: usize,
    /// Raw bytes to compare against.
    pub bytes: Vec<u8>,
}
276impl From<Memory> for RpcFilterType {
277    fn from(mem: Memory) -> RpcFilterType {
278        #[allow(deprecated)]
279        RpcFilterType::Memcmp(Memcmp {
280            offset: mem.offset,
281            bytes: solana_client::rpc_filter::MemcmpEncodedBytes::Base58(mem.bytes.to_base58()),
282            encoding: None,
283        })
284    }
285}
286
/// Lookup of a program's accounts filtered by byte patterns.
#[async_trait]
pub trait GetProgramAccountsWithBytes {
    /// Returns the accounts of `program` selected by the [`Memory`] patterns in `bytes`,
    /// as `(address, account)` pairs.
    async fn get_program_accounts_with_bytes(
        &self,
        program: &Pubkey,
        bytes: Vec<Memory>,
    ) -> Result<Vec<(Pubkey, Account)>, ClientError>;
}
295
296use solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig};
297
298#[async_trait]
299impl GetProgramAccountsWithBytes for RpcClient {
300    async fn get_program_accounts_with_bytes(
301        &self,
302        program_id: &Pubkey,
303        bytes: Vec<Memory>,
304    ) -> Result<Vec<(Pubkey, Account)>, ClientError> {
305        use solana_account_decoder::*;
306        Ok(self
307            .get_program_accounts_with_config(
308                program_id,
309                RpcProgramAccountsConfig {
310                    filters: Some(bytes.into_iter().map(RpcFilterType::from).collect()),
311                    account_config: RpcAccountInfoConfig {
312                        encoding: Some(UiAccountEncoding::Base64),
313                        ..Default::default()
314                    },
315                    with_context: None,
316                },
317            )
318            .await?)
319    }
320}
321
/// Errors returned by the signature-listing helpers in this file.
///
/// Both variants are transparent wrappers: their `Display` output and source are those of
/// the wrapped error, and `#[from]` lets `?` convert into them.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// A failure reported by the RPC client.
    #[error(transparent)]
    ClientError(#[from] ClientError),
    /// A signature string could not be parsed into a `Signature`.
    #[error(transparent)]
    SignatureParseError(#[from] solana_sdk::signature::ParseSignatureError),
}
329
330#[derive(Debug, Clone, PartialEq, Eq)]
331pub struct SignaturesData {
332    pub signature: Signature,
333    pub slot: u64,
334    pub block_time: Option<UnixTimestamp>,
335    pub err: Option<TransactionError>,
336}
337impl PartialOrd for SignaturesData {
338    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
339        match self.slot.partial_cmp(&other.slot) {
340            Some(core::cmp::Ordering::Equal) => {}
341            ord => return ord,
342        }
343        match self.block_time.partial_cmp(&other.block_time) {
344            Some(core::cmp::Ordering::Equal) => {}
345            ord => return ord,
346        }
347        self.signature.partial_cmp(&other.signature)
348    }
349}
350impl Ord for SignaturesData {
351    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
352        self.partial_cmp(other).unwrap_or(std::cmp::Ordering::Equal)
353    }
354}
355
356#[async_trait]
357pub trait GetTransactionsSignaturesForAddress {
358    async fn get_signatures_for_address_with_config(
359        &self,
360        address: &Pubkey,
361        commitment_config: CommitmentConfig,
362        until: Option<Signature>,
363    ) -> Result<Vec<Signature>, Error> {
364        Ok(self
365            .get_signatures_data_for_address_with_config(address, commitment_config, until)
366            .await?
367            .into_iter()
368            .filter(|data| data.err.is_none())
369            .map(|data| data.signature)
370            .collect())
371    }
372    async fn get_signatures_data_for_address_with_config(
373        &self,
374        address: &Pubkey,
375        commitment_config: CommitmentConfig,
376        until: Option<Signature>,
377    ) -> Result<BTreeSet<SignaturesData>, Error>;
378}
379
380#[async_trait]
381impl GetTransactionsSignaturesForAddress for RpcClient {
382    #[instrument(skip(self))]
383    async fn get_signatures_data_for_address_with_config(
384        &self,
385        address: &Pubkey,
386        commitment_config: CommitmentConfig,
387        until: Option<Signature>,
388    ) -> Result<BTreeSet<SignaturesData>, Error> {
389        let mut all_signatures = BTreeSet::new();
390        let mut before = None;
391
392        loop {
393            tracing::trace!(
394                "Request signature batch, before: {:?}, until: {:?}",
395                before,
396                until
397            );
398
399            let signatures_batch = self
400                .get_signatures_for_address_with_config(
401                    address,
402                    GetConfirmedSignaturesForAddress2Config {
403                        before,
404                        until,
405                        limit: Some(1000),
406                        commitment: Some(commitment_config),
407                    },
408                )
409                .await
410                .map_err(|err| {
411                    tracing::error!(
412                        "Error while get signature for address with config: {:?}",
413                        err
414                    );
415                    err
416                })?
417                .into_iter()
418                .map(|tx| {
419                    Ok(SignaturesData {
420                        signature: tx.signature.parse()?,
421                        slot: tx.slot,
422                        block_time: tx.block_time,
423                        err: tx.err,
424                    })
425                })
426                .collect::<Result<Vec<_>, Error>>()?;
427
428            if signatures_batch.is_empty() {
429                break;
430            }
431            tracing::trace!("Batch received: {}", signatures_batch.len());
432
433            before = signatures_batch
434                .iter()
435                .rev()
436                .fold_while(
437                    None,
438                    |resync_border_tx, signature_data| match resync_border_tx {
439                        None => FoldWhile::Continue(Some(signature_data)),
440                        Some(resync_border) => {
441                            if resync_border.slot != signature_data.slot {
442                                FoldWhile::Done(Some(signature_data))
443                            } else {
444                                FoldWhile::Continue(Some(resync_border))
445                            }
446                        }
447                    },
448                )
449                .into_inner()
450                .map(|d| d.signature);
451
452            let batch_len_before = signatures_batch
453                .iter()
454                .map(|b| b.slot)
455                .all_equal()
456                .then_some(all_signatures.len());
457
458            signatures_batch.into_iter().for_each(|s| {
459                all_signatures.insert(s);
460            });
461
462            if matches!(
463                batch_len_before,
464                Some(before_len) if before_len == all_signatures.len()
465            ) {
466                break;
467            }
468
469            tracing::trace!("All signatures: {}", all_signatures.len());
470        }
471
472        Ok(all_signatures)
473    }
474}