light_client/rpc/
client.rs

1use std::{
2    fmt::{Debug, Display, Formatter},
3    time::Duration,
4};
5
6use async_trait::async_trait;
7use borsh::BorshDeserialize;
8use bs58;
9use light_compressed_account::{
10    indexer_event::{
11        event::{BatchPublicTransactionEvent, PublicTransactionEvent},
12        parse::event_from_light_transaction,
13    },
14    TreeType,
15};
16use solana_account::Account;
17use solana_clock::Slot;
18use solana_commitment_config::CommitmentConfig;
19use solana_hash::Hash;
20use solana_instruction::Instruction;
21use solana_keypair::Keypair;
22use solana_pubkey::{pubkey, Pubkey};
23use solana_rpc_client::rpc_client::RpcClient;
24use solana_rpc_client_api::config::{RpcSendTransactionConfig, RpcTransactionConfig};
25use solana_signature::Signature;
26use solana_transaction::Transaction;
27use solana_transaction_status_client_types::{
28    option_serializer::OptionSerializer, TransactionStatus, UiInstruction, UiTransactionEncoding,
29};
30use tokio::time::{sleep, Instant};
31use tracing::warn;
32
33use super::LightClientConfig;
34use crate::{
35    indexer::{photon_indexer::PhotonIndexer, Indexer, TreeInfo},
36    rpc::{
37        errors::RpcError,
38        get_light_state_tree_infos::{
39            default_state_tree_lookup_tables, get_light_state_tree_infos,
40        },
41        merkle_tree::MerkleTreeExt,
42        Rpc,
43    },
44};
45
/// RPC endpoint selector: a handful of well-known clusters plus a free-form
/// URL.
///
/// The [`Display`] implementation renders the URL each variant stands for.
pub enum RpcUrl {
    /// Renders as `https://api.testnet.solana.com`.
    Testnet,
    /// Renders as `https://api.devnet.solana.com`.
    Devnet,
    /// Renders as `http://localhost:8899`.
    Localnet,
    /// Renders as `https://zk-testnet.helius.dev:8899`.
    ZKTestnet,
    /// Renders as the wrapped string, unchanged.
    Custom(String),
}

impl Display for RpcUrl {
    /// Writes the endpoint URL for this variant to `f`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let url: &str = match self {
            Self::Testnet => "https://api.testnet.solana.com",
            Self::Devnet => "https://api.devnet.solana.com",
            Self::Localnet => "http://localhost:8899",
            Self::ZKTestnet => "https://zk-testnet.helius.dev:8899",
            Self::Custom(custom) => custom.as_str(),
        };
        f.write_str(url)
    }
}
66
/// Limits applied when a failed RPC operation is retried.
#[derive(Clone, Debug, Copy)]
pub struct RetryConfig {
    /// Number of failed attempts after which the operation gives up.
    pub max_retries: u32,
    /// Pause inserted between two consecutive attempts.
    pub retry_delay: Duration,
    /// Max Light slot timeout in time based on solana slot length and light
    /// slot length.
    pub timeout: Duration,
}

impl Default for RetryConfig {
    /// Defaults: 30 attempts, 1 second between attempts, 60 second timeout.
    fn default() -> Self {
        let retry_delay = Duration::from_secs(1);
        let timeout = Duration::from_secs(60);
        Self {
            max_retries: 30,
            retry_delay,
            timeout,
        }
    }
}
85
86#[allow(dead_code)]
87pub struct LightClient {
88    pub client: RpcClient,
89    pub payer: Keypair,
90    pub retry_config: RetryConfig,
91    pub indexer: Option<PhotonIndexer>,
92    pub state_merkle_trees: Vec<TreeInfo>,
93}
94
95impl Debug for LightClient {
96    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
97        write!(f, "LightClient {{ client: {:?} }}", self.client.url())
98    }
99}
100
101impl LightClient {
102    pub async fn new_with_retry(
103        config: LightClientConfig,
104        retry_config: Option<RetryConfig>,
105    ) -> Result<Self, RpcError> {
106        let payer = Keypair::new();
107        let commitment_config = config
108            .commitment_config
109            .unwrap_or(CommitmentConfig::confirmed());
110        let client = RpcClient::new_with_commitment(config.url.to_string(), commitment_config);
111        let retry_config = retry_config.unwrap_or_default();
112
113        let indexer = if config.with_indexer {
114            if config.url == RpcUrl::Localnet.to_string() {
115                Some(PhotonIndexer::new(
116                    "http://127.0.0.1:8784".to_string(),
117                    None,
118                ))
119            } else {
120                Some(PhotonIndexer::new(
121                    config.url.to_string(),
122                    None, // TODO: test that this is not required
123                ))
124            }
125        } else {
126            None
127        };
128        let mut new = Self {
129            client,
130            payer,
131            retry_config,
132            indexer,
133            state_merkle_trees: Vec::new(),
134        };
135        if config.fetch_active_tree {
136            new.get_latest_active_state_trees().await?;
137        }
138        Ok(new)
139    }
140
141    pub fn add_indexer(&mut self, path: String, api_key: Option<String>) {
142        self.indexer = Some(PhotonIndexer::new(path, api_key));
143    }
144
145    async fn retry<F, Fut, T>(&self, operation: F) -> Result<T, RpcError>
146    where
147        F: Fn() -> Fut,
148        Fut: std::future::Future<Output = Result<T, RpcError>>,
149    {
150        let mut attempts = 0;
151        let start_time = Instant::now();
152        loop {
153            match operation().await {
154                Ok(result) => return Ok(result),
155                Err(e) => {
156                    let retry = self.should_retry(&e);
157                    if retry {
158                        attempts += 1;
159                        if attempts >= self.retry_config.max_retries
160                            || start_time.elapsed() >= self.retry_config.timeout
161                        {
162                            return Err(e);
163                        }
164                        warn!(
165                            "Operation failed, retrying in {:?} (attempt {}/{}): {:?}",
166                            self.retry_config.retry_delay,
167                            attempts,
168                            self.retry_config.max_retries,
169                            e
170                        );
171                        tokio::task::yield_now().await;
172                        sleep(self.retry_config.retry_delay).await;
173                    } else {
174                        return Err(e);
175                    }
176                }
177            }
178        }
179    }
180
181    async fn _create_and_send_transaction_with_batched_event(
182        &mut self,
183        instructions: &[Instruction],
184        payer: &Pubkey,
185        signers: &[&Keypair],
186    ) -> Result<Option<(Vec<BatchPublicTransactionEvent>, Signature, Slot)>, RpcError> {
187        let latest_blockhash = self.client.get_latest_blockhash()?;
188
189        let mut instructions_vec = vec![
190            solana_compute_budget_interface::ComputeBudgetInstruction::set_compute_unit_limit(
191                1_000_000,
192            ),
193        ];
194        instructions_vec.extend_from_slice(instructions);
195
196        let transaction = Transaction::new_signed_with_payer(
197            instructions_vec.as_slice(),
198            Some(payer),
199            signers,
200            latest_blockhash,
201        );
202
203        let (signature, slot) = self
204            .process_transaction_with_context(transaction.clone())
205            .await?;
206
207        let mut vec = Vec::new();
208        let mut vec_accounts = Vec::new();
209        let mut program_ids = Vec::new();
210        instructions_vec.iter().for_each(|x| {
211            program_ids.push(light_compressed_account::Pubkey::new_from_array(
212                x.program_id.to_bytes(),
213            ));
214            vec.push(x.data.clone());
215            vec_accounts.push(
216                x.accounts
217                    .iter()
218                    .map(|x| light_compressed_account::Pubkey::new_from_array(x.pubkey.to_bytes()))
219                    .collect(),
220            );
221        });
222        {
223            let rpc_transaction_config = RpcTransactionConfig {
224                encoding: Some(UiTransactionEncoding::Base64),
225                commitment: Some(self.client.commitment()),
226                ..Default::default()
227            };
228            let transaction = self
229                .client
230                .get_transaction_with_config(&signature, rpc_transaction_config)
231                .map_err(|e| RpcError::CustomError(e.to_string()))?;
232            let decoded_transaction = transaction
233                .transaction
234                .transaction
235                .decode()
236                .clone()
237                .unwrap();
238            let account_keys = decoded_transaction.message.static_account_keys();
239            let meta = transaction.transaction.meta.as_ref().ok_or_else(|| {
240                RpcError::CustomError("Transaction missing metadata information".to_string())
241            })?;
242            if meta.status.is_err() {
243                return Err(RpcError::CustomError(
244                    "Transaction status indicates an error".to_string(),
245                ));
246            }
247
248            let inner_instructions = match &meta.inner_instructions {
249                OptionSerializer::Some(i) => i,
250                OptionSerializer::None => {
251                    return Err(RpcError::CustomError(
252                        "No inner instructions found".to_string(),
253                    ));
254                }
255                OptionSerializer::Skip => {
256                    return Err(RpcError::CustomError(
257                        "No inner instructions found".to_string(),
258                    ));
259                }
260            };
261
262            for ix in inner_instructions.iter() {
263                for ui_instruction in ix.instructions.iter() {
264                    match ui_instruction {
265                        UiInstruction::Compiled(ui_compiled_instruction) => {
266                            let accounts = &ui_compiled_instruction.accounts;
267                            let data = bs58::decode(&ui_compiled_instruction.data)
268                                .into_vec()
269                                .map_err(|_| {
270                                    RpcError::CustomError(
271                                        "Failed to decode instruction data".to_string(),
272                                    )
273                                })?;
274                            vec.push(data);
275                            program_ids.push(light_compressed_account::Pubkey::new_from_array(
276                                account_keys[ui_compiled_instruction.program_id_index as usize]
277                                    .to_bytes(),
278                            ));
279                            vec_accounts.push(
280                                accounts
281                                    .iter()
282                                    .map(|x| {
283                                        light_compressed_account::Pubkey::new_from_array(
284                                            account_keys[(*x) as usize].to_bytes(),
285                                        )
286                                    })
287                                    .collect(),
288                            );
289                        }
290                        UiInstruction::Parsed(_) => {
291                            println!("Parsed instructions are not implemented yet");
292                        }
293                    }
294                }
295            }
296        }
297        let parsed_event =
298            event_from_light_transaction(program_ids.as_slice(), vec.as_slice(), vec_accounts)
299                .unwrap();
300        let event = parsed_event.map(|e| (e, signature, slot));
301        Ok(event)
302    }
303
304    async fn _create_and_send_transaction_with_event<T>(
305        &mut self,
306        instructions: &[Instruction],
307        payer: &Pubkey,
308        signers: &[&Keypair],
309    ) -> Result<Option<(T, Signature, u64)>, RpcError>
310    where
311        T: BorshDeserialize + Send + Debug,
312    {
313        let latest_blockhash = self.client.get_latest_blockhash()?;
314
315        let mut instructions_vec = vec![
316            solana_compute_budget_interface::ComputeBudgetInstruction::set_compute_unit_limit(
317                1_000_000,
318            ),
319        ];
320        instructions_vec.extend_from_slice(instructions);
321
322        let transaction = Transaction::new_signed_with_payer(
323            instructions_vec.as_slice(),
324            Some(payer),
325            signers,
326            latest_blockhash,
327        );
328
329        let (signature, slot) = self
330            .process_transaction_with_context(transaction.clone())
331            .await?;
332
333        let mut parsed_event = None;
334        for instruction in &transaction.message.instructions {
335            let ix_data = instruction.data.clone();
336            match T::deserialize(&mut &instruction.data[..]) {
337                Ok(e) => {
338                    parsed_event = Some(e);
339                    break;
340                }
341                Err(e) => {
342                    warn!(
343                        "Failed to parse event: {:?}, type: {:?}, ix data: {:?}",
344                        e,
345                        std::any::type_name::<T>(),
346                        ix_data
347                    );
348                }
349            }
350        }
351
352        if parsed_event.is_none() {
353            parsed_event = self.parse_inner_instructions::<T>(signature).ok();
354        }
355
356        let result = parsed_event.map(|e| (e, signature, slot));
357        Ok(result)
358    }
359}
360
361impl LightClient {
362    #[allow(clippy::result_large_err)]
363    fn parse_inner_instructions<T: BorshDeserialize>(
364        &self,
365        signature: Signature,
366    ) -> Result<T, RpcError> {
367        let rpc_transaction_config = RpcTransactionConfig {
368            encoding: Some(UiTransactionEncoding::Base64),
369            commitment: Some(self.client.commitment()),
370            ..Default::default()
371        };
372        let transaction = self
373            .client
374            .get_transaction_with_config(&signature, rpc_transaction_config)
375            .map_err(|e| RpcError::CustomError(e.to_string()))?;
376        let meta = transaction.transaction.meta.as_ref().ok_or_else(|| {
377            RpcError::CustomError("Transaction missing metadata information".to_string())
378        })?;
379        if meta.status.is_err() {
380            return Err(RpcError::CustomError(
381                "Transaction status indicates an error".to_string(),
382            ));
383        }
384
385        let inner_instructions = match &meta.inner_instructions {
386            OptionSerializer::Some(i) => i,
387            OptionSerializer::None => {
388                return Err(RpcError::CustomError(
389                    "No inner instructions found".to_string(),
390                ));
391            }
392            OptionSerializer::Skip => {
393                return Err(RpcError::CustomError(
394                    "No inner instructions found".to_string(),
395                ));
396            }
397        };
398
399        for ix in inner_instructions.iter() {
400            for ui_instruction in ix.instructions.iter() {
401                match ui_instruction {
402                    UiInstruction::Compiled(ui_compiled_instruction) => {
403                        let data = bs58::decode(&ui_compiled_instruction.data)
404                            .into_vec()
405                            .map_err(|_| {
406                                RpcError::CustomError(
407                                    "Failed to decode instruction data".to_string(),
408                                )
409                            })?;
410
411                        match T::try_from_slice(data.as_slice()) {
412                            Ok(parsed_data) => return Ok(parsed_data),
413                            Err(e) => {
414                                warn!("Failed to parse inner instruction: {:?}", e);
415                            }
416                        }
417                    }
418                    UiInstruction::Parsed(_) => {
419                        println!("Parsed instructions are not implemented yet");
420                    }
421                }
422            }
423        }
424        Err(RpcError::CustomError(
425            "Failed to find any parseable inner instructions".to_string(),
426        ))
427    }
428}
429
430#[async_trait]
431impl Rpc for LightClient {
432    async fn new(config: LightClientConfig) -> Result<Self, RpcError>
433    where
434        Self: Sized,
435    {
436        Self::new_with_retry(config, None).await
437    }
438
439    fn get_payer(&self) -> &Keypair {
440        &self.payer
441    }
442
443    fn get_url(&self) -> String {
444        self.client.url()
445    }
446
447    async fn health(&self) -> Result<(), RpcError> {
448        self.retry(|| async { self.client.get_health().map_err(RpcError::from) })
449            .await
450    }
451
452    async fn get_program_accounts(
453        &self,
454        program_id: &Pubkey,
455    ) -> Result<Vec<(Pubkey, Account)>, RpcError> {
456        self.retry(|| async {
457            self.client
458                .get_program_accounts(program_id)
459                .map_err(RpcError::from)
460        })
461        .await
462    }
463
464    async fn process_transaction(
465        &mut self,
466        transaction: Transaction,
467    ) -> Result<Signature, RpcError> {
468        self.retry(|| async {
469            self.client
470                .send_and_confirm_transaction(&transaction)
471                .map_err(RpcError::from)
472        })
473        .await
474    }
475
476    async fn process_transaction_with_context(
477        &mut self,
478        transaction: Transaction,
479    ) -> Result<(Signature, Slot), RpcError> {
480        self.retry(|| async {
481            let signature = self.client.send_and_confirm_transaction(&transaction)?;
482            let sig_info = self.client.get_signature_statuses(&[signature])?;
483            let slot = sig_info
484                .value
485                .first()
486                .and_then(|s| s.as_ref())
487                .map(|s| s.slot)
488                .ok_or_else(|| RpcError::CustomError("Failed to get slot".into()))?;
489            Ok((signature, slot))
490        })
491        .await
492    }
493
494    async fn confirm_transaction(&self, signature: Signature) -> Result<bool, RpcError> {
495        self.retry(|| async {
496            self.client
497                .confirm_transaction(&signature)
498                .map_err(RpcError::from)
499        })
500        .await
501    }
502
503    async fn get_account(&self, address: Pubkey) -> Result<Option<Account>, RpcError> {
504        self.retry(|| async {
505            self.client
506                .get_account_with_commitment(&address, self.client.commitment())
507                .map(|response| response.value)
508                .map_err(RpcError::from)
509        })
510        .await
511    }
512
513    async fn get_minimum_balance_for_rent_exemption(
514        &self,
515        data_len: usize,
516    ) -> Result<u64, RpcError> {
517        self.retry(|| async {
518            self.client
519                .get_minimum_balance_for_rent_exemption(data_len)
520                .map_err(RpcError::from)
521        })
522        .await
523    }
524
525    async fn airdrop_lamports(
526        &mut self,
527        to: &Pubkey,
528        lamports: u64,
529    ) -> Result<Signature, RpcError> {
530        self.retry(|| async {
531            let signature = self
532                .client
533                .request_airdrop(to, lamports)
534                .map_err(RpcError::ClientError)?;
535            self.retry(|| async {
536                if self
537                    .client
538                    .confirm_transaction_with_commitment(&signature, self.client.commitment())?
539                    .value
540                {
541                    Ok(())
542                } else {
543                    Err(RpcError::CustomError("Airdrop not confirmed".into()))
544                }
545            })
546            .await?;
547
548            Ok(signature)
549        })
550        .await
551    }
552
553    async fn get_balance(&self, pubkey: &Pubkey) -> Result<u64, RpcError> {
554        self.retry(|| async { self.client.get_balance(pubkey).map_err(RpcError::from) })
555            .await
556    }
557
558    async fn get_latest_blockhash(&mut self) -> Result<(Hash, u64), RpcError> {
559        self.retry(|| async {
560            self.client
561                // Confirmed commitments land more reliably than finalized
562                // https://www.helius.dev/blog/how-to-deal-with-blockhash-errors-on-solana#how-to-deal-with-blockhash-errors
563                .get_latest_blockhash_with_commitment(CommitmentConfig::confirmed())
564                .map_err(RpcError::from)
565        })
566        .await
567    }
568
569    async fn get_slot(&self) -> Result<u64, RpcError> {
570        self.retry(|| async { self.client.get_slot().map_err(RpcError::from) })
571            .await
572    }
573
574    async fn send_transaction(&self, transaction: &Transaction) -> Result<Signature, RpcError> {
575        self.retry(|| async {
576            self.client
577                .send_transaction_with_config(
578                    transaction,
579                    RpcSendTransactionConfig {
580                        skip_preflight: true,
581                        max_retries: Some(self.retry_config.max_retries as usize),
582                        ..Default::default()
583                    },
584                )
585                .map_err(RpcError::from)
586        })
587        .await
588    }
589
590    async fn send_transaction_with_config(
591        &self,
592        transaction: &Transaction,
593        config: RpcSendTransactionConfig,
594    ) -> Result<Signature, RpcError> {
595        self.retry(|| async {
596            self.client
597                .send_transaction_with_config(transaction, config)
598                .map_err(RpcError::from)
599        })
600        .await
601    }
602
603    async fn get_transaction_slot(&self, signature: &Signature) -> Result<u64, RpcError> {
604        self.retry(|| async {
605            Ok(self
606                .client
607                .get_transaction_with_config(
608                    signature,
609                    RpcTransactionConfig {
610                        encoding: Some(UiTransactionEncoding::Base64),
611                        commitment: Some(self.client.commitment()),
612                        ..Default::default()
613                    },
614                )
615                .map_err(RpcError::from)?
616                .slot)
617        })
618        .await
619    }
620
621    async fn get_signature_statuses(
622        &self,
623        signatures: &[Signature],
624    ) -> Result<Vec<Option<TransactionStatus>>, RpcError> {
625        self.client
626            .get_signature_statuses(signatures)
627            .map(|response| response.value)
628            .map_err(RpcError::from)
629    }
630
631    async fn create_and_send_transaction_with_event<T>(
632        &mut self,
633        instructions: &[Instruction],
634        payer: &Pubkey,
635        signers: &[&Keypair],
636    ) -> Result<Option<(T, Signature, u64)>, RpcError>
637    where
638        T: BorshDeserialize + Send + Debug,
639    {
640        self._create_and_send_transaction_with_event::<T>(instructions, payer, signers)
641            .await
642    }
643
644    async fn create_and_send_transaction_with_public_event(
645        &mut self,
646        instructions: &[Instruction],
647        payer: &Pubkey,
648        signers: &[&Keypair],
649    ) -> Result<Option<(PublicTransactionEvent, Signature, Slot)>, RpcError> {
650        let parsed_event = self
651            ._create_and_send_transaction_with_batched_event(instructions, payer, signers)
652            .await?;
653
654        let event = parsed_event.map(|(e, signature, slot)| (e[0].event.clone(), signature, slot));
655        Ok(event)
656    }
657
658    async fn create_and_send_transaction_with_batched_event(
659        &mut self,
660        instructions: &[Instruction],
661        payer: &Pubkey,
662        signers: &[&Keypair],
663    ) -> Result<Option<(Vec<BatchPublicTransactionEvent>, Signature, Slot)>, RpcError> {
664        self._create_and_send_transaction_with_batched_event(instructions, payer, signers)
665            .await
666    }
667
668    fn indexer(&self) -> Result<&impl Indexer, RpcError> {
669        self.indexer.as_ref().ok_or(RpcError::IndexerNotInitialized)
670    }
671
672    fn indexer_mut(&mut self) -> Result<&mut impl Indexer, RpcError> {
673        self.indexer.as_mut().ok_or(RpcError::IndexerNotInitialized)
674    }
675
676    /// Fetch the latest state tree addresses from the cluster.
677    async fn get_latest_active_state_trees(&mut self) -> Result<Vec<TreeInfo>, RpcError> {
678        let res = default_state_tree_lookup_tables().0;
679        let res = get_light_state_tree_infos(
680            self,
681            &res[0].state_tree_lookup_table,
682            &res[0].nullify_table,
683        )
684        .await?;
685        self.state_merkle_trees = res.clone();
686        Ok(res)
687    }
688
689    /// Fetch the latest state tree addresses from the cluster.
690    fn get_state_tree_infos(&self) -> Vec<TreeInfo> {
691        self.state_merkle_trees.to_vec()
692    }
693
694    /// Gets a random active state tree.
695    /// State trees are cached and have to be fetched or set.
696    fn get_random_state_tree_info(&self) -> Result<TreeInfo, RpcError> {
697        let mut rng = rand::thread_rng();
698        select_state_tree_info(&mut rng, &self.state_merkle_trees)
699    }
700
701    fn get_address_tree_v1(&self) -> TreeInfo {
702        TreeInfo {
703            tree: pubkey!("amt1Ayt45jfbdw5YSo7iz6WZxUmnZsQTYXy82hVwyC2"),
704            queue: pubkey!("aq1S9z4reTSQAdgWHGD2zDaS39sjGrAxbR31vxJ2F4F"),
705            cpi_context: None,
706            next_tree_info: None,
707            tree_type: TreeType::AddressV1,
708        }
709    }
710}
711
// Opts `LightClient` into `MerkleTreeExt`; the empty body overrides none of
// the trait's items.
impl MerkleTreeExt for LightClient {}
713
714/// Selects a random state tree from the provided list.
715///
716/// This function should be used together with `get_state_tree_infos()` to first
717/// retrieve the list of state trees, then select one randomly.
718///
719/// # Arguments
720/// * `rng` - A mutable reference to a random number generator
721/// * `state_trees` - A slice of `TreeInfo` representing state trees
722///
723/// # Returns
724/// A randomly selected `TreeInfo` from the provided list, or an error if the list is empty
725///
726/// # Errors
727/// Returns `RpcError::NoStateTreesAvailable` if the provided slice is empty
728///
729/// # Example
730/// ```ignore
731/// use rand::thread_rng;
732/// let tree_infos = client.get_state_tree_infos();
733/// let mut rng = thread_rng();
734/// let selected_tree = select_state_tree_info(&mut rng, &tree_infos)?;
735/// ```
736pub fn select_state_tree_info<R: rand::Rng>(
737    rng: &mut R,
738    state_trees: &[TreeInfo],
739) -> Result<TreeInfo, RpcError> {
740    if state_trees.is_empty() {
741        return Err(RpcError::NoStateTreesAvailable);
742    }
743
744    Ok(state_trees[rng.gen_range(0..state_trees.len())])
745}