light_client/rpc/
solana_rpc.rs

1use crate::rpc::errors::RpcError;
2use crate::rpc::rpc_connection::RpcConnection;
3use crate::transaction_params::TransactionParams;
4use async_trait::async_trait;
5use borsh::BorshDeserialize;
6use log::warn;
7use solana_client::rpc_client::RpcClient;
8use solana_client::rpc_config::{RpcSendTransactionConfig, RpcTransactionConfig};
9use solana_program::clock::Slot;
10use solana_program::hash::Hash;
11use solana_program::pubkey::Pubkey;
12use solana_sdk::account::{Account, AccountSharedData};
13use solana_sdk::bs58;
14use solana_sdk::clock::UnixTimestamp;
15use solana_sdk::commitment_config::CommitmentConfig;
16use solana_sdk::epoch_info::EpochInfo;
17use solana_sdk::instruction::Instruction;
18use solana_sdk::signature::{Keypair, Signature};
19use solana_sdk::transaction::Transaction;
20use solana_transaction_status::option_serializer::OptionSerializer;
21use solana_transaction_status::{UiInstruction, UiTransactionEncoding};
22use std::fmt::{Debug, Display, Formatter};
23use std::time::Duration;
24use tokio::time::{sleep, Instant};
25
26pub enum SolanaRpcUrl {
27    Testnet,
28    Devnet,
29    Localnet,
30    ZKTestnet,
31    Custom(String),
32}
33
34impl Display for SolanaRpcUrl {
35    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
36        let str = match self {
37            SolanaRpcUrl::Testnet => "https://api.testnet.solana.com".to_string(),
38            SolanaRpcUrl::Devnet => "https://api.devnet.solana.com".to_string(),
39            SolanaRpcUrl::Localnet => "http://localhost:8899".to_string(),
40            SolanaRpcUrl::ZKTestnet => "https://zk-testnet.helius.dev:8899".to_string(),
41            SolanaRpcUrl::Custom(url) => url.clone(),
42        };
43        write!(f, "{}", str)
44    }
45}
46
47#[derive(Clone, Debug, Copy)]
48pub struct RetryConfig {
49    pub max_retries: u32,
50    pub retry_delay: Duration,
51    pub timeout: Duration,
52}
53
54impl Default for RetryConfig {
55    fn default() -> Self {
56        RetryConfig {
57            max_retries: 10,
58            retry_delay: Duration::from_millis(100),
59            timeout: Duration::from_secs(60),
60        }
61    }
62}
63
64#[allow(dead_code)]
65pub struct SolanaRpcConnection {
66    pub client: RpcClient,
67    pub payer: Keypair,
68    retry_config: RetryConfig,
69}
70
71impl Debug for SolanaRpcConnection {
72    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
73        write!(
74            f,
75            "SolanaRpcConnection {{ client: {:?} }}",
76            self.client.url()
77        )
78    }
79}
80
81impl SolanaRpcConnection {
82    pub fn new_with_retry<U: ToString>(
83        url: U,
84        commitment_config: Option<CommitmentConfig>,
85        retry_config: Option<RetryConfig>,
86    ) -> Self {
87        let payer = Keypair::new();
88        let commitment_config = commitment_config.unwrap_or(CommitmentConfig::confirmed());
89        let client = RpcClient::new_with_commitment(url.to_string(), commitment_config);
90        let retry_config = retry_config.unwrap_or_default();
91        Self {
92            client,
93            payer,
94            retry_config,
95        }
96    }
97
98    async fn retry<F, Fut, T>(&self, operation: F) -> Result<T, RpcError>
99    where
100        F: Fn() -> Fut,
101        Fut: std::future::Future<Output = Result<T, RpcError>>,
102    {
103        let mut attempts = 0;
104        let start_time = Instant::now();
105        loop {
106            match operation().await {
107                Ok(result) => return Ok(result),
108                Err(e) => {
109                    attempts += 1;
110                    if attempts >= self.retry_config.max_retries
111                        || start_time.elapsed() >= self.retry_config.timeout
112                    {
113                        return Err(e);
114                    }
115                    warn!(
116                        "Operation failed, retrying in {:?} (attempt {}/{}): {:?}",
117                        self.retry_config.retry_delay, attempts, self.retry_config.max_retries, e
118                    );
119                    sleep(self.retry_config.retry_delay).await;
120                }
121            }
122        }
123    }
124}
125
126impl SolanaRpcConnection {
127    fn parse_inner_instructions<T: BorshDeserialize>(
128        &self,
129        signature: Signature,
130    ) -> Result<T, RpcError> {
131        let rpc_transaction_config = RpcTransactionConfig {
132            encoding: Some(UiTransactionEncoding::Base64),
133            commitment: Some(self.client.commitment()),
134            ..Default::default()
135        };
136        let transaction = self
137            .client
138            .get_transaction_with_config(&signature, rpc_transaction_config)
139            .map_err(|e| RpcError::CustomError(e.to_string()))?;
140        let meta = transaction.transaction.meta.as_ref().ok_or_else(|| {
141            RpcError::CustomError("Transaction missing metadata information".to_string())
142        })?;
143        if meta.status.is_err() {
144            return Err(RpcError::CustomError(
145                "Transaction status indicates an error".to_string(),
146            ));
147        }
148
149        let inner_instructions = match &meta.inner_instructions {
150            OptionSerializer::Some(i) => i,
151            OptionSerializer::None => {
152                return Err(RpcError::CustomError(
153                    "No inner instructions found".to_string(),
154                ));
155            }
156            OptionSerializer::Skip => {
157                return Err(RpcError::CustomError(
158                    "No inner instructions found".to_string(),
159                ));
160            }
161        };
162
163        for ix in inner_instructions.iter() {
164            for ui_instruction in ix.instructions.iter() {
165                match ui_instruction {
166                    UiInstruction::Compiled(ui_compiled_instruction) => {
167                        let data = bs58::decode(&ui_compiled_instruction.data)
168                            .into_vec()
169                            .map_err(|_| {
170                                RpcError::CustomError(
171                                    "Failed to decode instruction data".to_string(),
172                                )
173                            })?;
174
175                        if let Ok(parsed_data) = T::try_from_slice(data.as_slice()) {
176                            return Ok(parsed_data);
177                        }
178                    }
179                    UiInstruction::Parsed(_) => {
180                        println!("Parsed instructions are not implemented yet");
181                    }
182                }
183            }
184        }
185        Err(RpcError::CustomError(
186            "Failed to find any parseable inner instructions".to_string(),
187        ))
188    }
189}
190
191#[async_trait]
192impl RpcConnection for SolanaRpcConnection {
193    fn new<U: ToString>(url: U, commitment_config: Option<CommitmentConfig>) -> Self
194    where
195        Self: Sized,
196    {
197        Self::new_with_retry(url, commitment_config, None)
198    }
199
200    fn get_payer(&self) -> &Keypair {
201        &self.payer
202    }
203
204    fn get_url(&self) -> String {
205        self.client.url()
206    }
207
208    async fn health(&self) -> Result<(), RpcError> {
209        self.retry(|| async { self.client.get_health().map_err(RpcError::from) })
210            .await
211    }
212
213    async fn get_block_time(&self, slot: u64) -> Result<UnixTimestamp, RpcError> {
214        self.retry(|| async { self.client.get_block_time(slot).map_err(RpcError::from) })
215            .await
216    }
217
218    async fn get_epoch_info(&self) -> Result<EpochInfo, RpcError> {
219        self.retry(|| async { self.client.get_epoch_info().map_err(RpcError::from) })
220            .await
221    }
222
223    async fn get_program_accounts(
224        &self,
225        program_id: &Pubkey,
226    ) -> Result<Vec<(Pubkey, Account)>, RpcError> {
227        self.retry(|| async {
228            self.client
229                .get_program_accounts(program_id)
230                .map_err(RpcError::from)
231        })
232        .await
233    }
234
235    async fn process_transaction(
236        &mut self,
237        transaction: Transaction,
238    ) -> Result<Signature, RpcError> {
239        self.retry(|| async {
240            self.client
241                .send_and_confirm_transaction(&transaction)
242                .map_err(RpcError::from)
243        })
244        .await
245    }
246
247    async fn process_transaction_with_context(
248        &mut self,
249        transaction: Transaction,
250    ) -> Result<(Signature, Slot), RpcError> {
251        self.retry(|| async {
252            let signature = self.client.send_and_confirm_transaction(&transaction)?;
253            let sig_info = self.client.get_signature_statuses(&[signature])?;
254            let slot = sig_info
255                .value
256                .first()
257                .and_then(|s| s.as_ref())
258                .map(|s| s.slot)
259                .ok_or_else(|| RpcError::CustomError("Failed to get slot".into()))?;
260            Ok((signature, slot))
261        })
262        .await
263    }
264
265    async fn create_and_send_transaction_with_event<T>(
266        &mut self,
267        instructions: &[Instruction],
268        payer: &Pubkey,
269        signers: &[&Keypair],
270        transaction_params: Option<TransactionParams>,
271    ) -> Result<Option<(T, Signature, u64)>, RpcError>
272    where
273        T: BorshDeserialize + Send + Debug,
274    {
275        let pre_balance = self.client.get_balance(payer)?;
276        let latest_blockhash = self.client.get_latest_blockhash()?;
277
278        let mut instructions_vec = vec![
279            solana_sdk::compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(1_000_000),
280        ];
281        instructions_vec.extend_from_slice(instructions);
282
283        let transaction = Transaction::new_signed_with_payer(
284            instructions_vec.as_slice(),
285            Some(payer),
286            signers,
287            latest_blockhash,
288        );
289
290        let (signature, slot) = self
291            .process_transaction_with_context(transaction.clone())
292            .await?;
293
294        let mut parsed_event = None;
295        for instruction in &transaction.message.instructions {
296            if let Ok(e) = T::deserialize(&mut &instruction.data[..]) {
297                parsed_event = Some(e);
298                break;
299            }
300        }
301
302        if parsed_event.is_none() {
303            parsed_event = self.parse_inner_instructions::<T>(signature).ok();
304        }
305
306        if let Some(transaction_params) = transaction_params {
307            let mut deduped_signers = signers.to_vec();
308            deduped_signers.dedup();
309            let post_balance = self.get_account(*payer).await?.unwrap().lamports;
310
311            // a network_fee is charged if there are input compressed accounts or new addresses
312            let mut network_fee: i64 = 0;
313            if transaction_params.num_input_compressed_accounts != 0 {
314                network_fee += transaction_params.fee_config.network_fee as i64;
315            }
316            if transaction_params.num_new_addresses != 0 {
317                network_fee += transaction_params.fee_config.address_network_fee as i64;
318            }
319
320            let expected_post_balance = pre_balance as i64
321                - i64::from(transaction_params.num_new_addresses)
322                    * transaction_params.fee_config.address_queue_rollover as i64
323                - i64::from(transaction_params.num_output_compressed_accounts)
324                    * transaction_params.fee_config.state_merkle_tree_rollover as i64
325                - transaction_params.compress
326                - 5000 * deduped_signers.len() as i64
327                - network_fee;
328            if post_balance as i64 != expected_post_balance {
329                return Err(RpcError::AssertRpcError(format!("unexpected balance after transaction: expected {expected_post_balance}, got {post_balance}")));
330            }
331        }
332
333        let result = parsed_event.map(|e| (e, signature, slot));
334        Ok(result)
335    }
336
337    async fn confirm_transaction(&self, signature: Signature) -> Result<bool, RpcError> {
338        self.retry(|| async {
339            self.client
340                .confirm_transaction(&signature)
341                .map_err(RpcError::from)
342        })
343        .await
344    }
345
346    async fn get_account(&mut self, address: Pubkey) -> Result<Option<Account>, RpcError> {
347        self.retry(|| async {
348            self.client
349                .get_account_with_commitment(&address, self.client.commitment())
350                .map(|response| response.value)
351                .map_err(RpcError::from)
352        })
353        .await
354    }
355
356    fn set_account(&mut self, _address: &Pubkey, _account: &AccountSharedData) {
357        unimplemented!()
358    }
359
360    async fn get_minimum_balance_for_rent_exemption(
361        &mut self,
362        data_len: usize,
363    ) -> Result<u64, RpcError> {
364        self.retry(|| async {
365            self.client
366                .get_minimum_balance_for_rent_exemption(data_len)
367                .map_err(RpcError::from)
368        })
369        .await
370    }
371
372    async fn airdrop_lamports(
373        &mut self,
374        to: &Pubkey,
375        lamports: u64,
376    ) -> Result<Signature, RpcError> {
377        self.retry(|| async {
378            let signature = self
379                .client
380                .request_airdrop(to, lamports)
381                .map_err(RpcError::ClientError)?;
382            println!("Airdrop signature: {:?}", signature);
383            self.retry(|| async {
384                if self
385                    .client
386                    .confirm_transaction_with_commitment(&signature, self.client.commitment())?
387                    .value
388                {
389                    Ok(())
390                } else {
391                    Err(RpcError::CustomError("Airdrop not confirmed".into()))
392                }
393            })
394            .await?;
395
396            Ok(signature)
397        })
398        .await
399    }
400
401    async fn get_balance(&mut self, pubkey: &Pubkey) -> Result<u64, RpcError> {
402        self.retry(|| async { self.client.get_balance(pubkey).map_err(RpcError::from) })
403            .await
404    }
405
406    async fn get_latest_blockhash(&mut self) -> Result<Hash, RpcError> {
407        self.retry(|| async { self.client.get_latest_blockhash().map_err(RpcError::from) })
408            .await
409    }
410
411    async fn get_slot(&mut self) -> Result<u64, RpcError> {
412        self.retry(|| async { self.client.get_slot().map_err(RpcError::from) })
413            .await
414    }
415
416    async fn warp_to_slot(&mut self, _slot: Slot) -> Result<(), RpcError> {
417        Err(RpcError::CustomError(
418            "Warp to slot is not supported in SolanaRpcConnection".to_string(),
419        ))
420    }
421
422    async fn send_transaction(&self, transaction: &Transaction) -> Result<Signature, RpcError> {
423        self.retry(|| async {
424            self.client
425                .send_transaction_with_config(
426                    transaction,
427                    RpcSendTransactionConfig {
428                        skip_preflight: true,
429                        max_retries: Some(self.retry_config.max_retries as usize),
430                        ..Default::default()
431                    },
432                )
433                .map_err(RpcError::from)
434        })
435        .await
436    }
437}