light_client/rpc/
client.rs

1use std::{
2    fmt::{Debug, Display, Formatter},
3    time::Duration,
4};
5
6use async_trait::async_trait;
7use borsh::BorshDeserialize;
8use bs58;
9use light_compressed_account::{
10    indexer_event::{
11        event::{BatchPublicTransactionEvent, PublicTransactionEvent},
12        parse::event_from_light_transaction,
13    },
14    TreeType,
15};
16use solana_account::Account;
17use solana_clock::Slot;
18use solana_commitment_config::CommitmentConfig;
19use solana_hash::Hash;
20use solana_instruction::Instruction;
21use solana_keypair::Keypair;
22use solana_pubkey::{pubkey, Pubkey};
23use solana_rpc_client::rpc_client::RpcClient;
24use solana_rpc_client_api::config::{RpcSendTransactionConfig, RpcTransactionConfig};
25use solana_signature::Signature;
26use solana_transaction::Transaction;
27use solana_transaction_status_client_types::{
28    option_serializer::OptionSerializer, TransactionStatus, UiInstruction, UiTransactionEncoding,
29};
30use tokio::time::{sleep, Instant};
31use tracing::warn;
32
33use super::LightClientConfig;
34use crate::{
35    indexer::{photon_indexer::PhotonIndexer, Indexer, TreeInfo},
36    rpc::{
37        errors::RpcError,
38        get_light_state_tree_infos::{
39            default_state_tree_lookup_tables, get_light_state_tree_infos,
40        },
41        merkle_tree::MerkleTreeExt,
42        Rpc,
43    },
44};
45
46pub enum RpcUrl {
47    Testnet,
48    Devnet,
49    Localnet,
50    ZKTestnet,
51    Custom(String),
52}
53
54impl Display for RpcUrl {
55    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
56        let str = match self {
57            RpcUrl::Testnet => "https://api.testnet.solana.com".to_string(),
58            RpcUrl::Devnet => "https://api.devnet.solana.com".to_string(),
59            RpcUrl::Localnet => "http://localhost:8899".to_string(),
60            RpcUrl::ZKTestnet => "https://zk-testnet.helius.dev:8899".to_string(),
61            RpcUrl::Custom(url) => url.clone(),
62        };
63        write!(f, "{}", str)
64    }
65}
66
67#[derive(Clone, Debug, Copy)]
68pub struct RetryConfig {
69    pub max_retries: u32,
70    pub retry_delay: Duration,
71    /// Max Light slot timeout in time based on solana slot length and light
72    /// slot length.
73    pub timeout: Duration,
74}
75
76impl Default for RetryConfig {
77    fn default() -> Self {
78        RetryConfig {
79            max_retries: 30,
80            retry_delay: Duration::from_secs(1),
81            timeout: Duration::from_secs(60),
82        }
83    }
84}
85
86#[allow(dead_code)]
87pub struct LightClient {
88    pub client: RpcClient,
89    pub payer: Keypair,
90    pub retry_config: RetryConfig,
91    pub indexer: Option<PhotonIndexer>,
92    pub state_merkle_trees: Vec<TreeInfo>,
93}
94
95impl Debug for LightClient {
96    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
97        write!(f, "LightClient {{ client: {:?} }}", self.client.url())
98    }
99}
100
101impl LightClient {
102    pub async fn new_with_retry(
103        config: LightClientConfig,
104        retry_config: Option<RetryConfig>,
105    ) -> Result<Self, RpcError> {
106        let payer = Keypair::new();
107        let commitment_config = config
108            .commitment_config
109            .unwrap_or(CommitmentConfig::confirmed());
110        let client = RpcClient::new_with_commitment(config.url.to_string(), commitment_config);
111        let retry_config = retry_config.unwrap_or_default();
112
113        let indexer = config.photon_url.map(|path| PhotonIndexer::new(path, None));
114
115        let mut new = Self {
116            client,
117            payer,
118            retry_config,
119            indexer,
120            state_merkle_trees: Vec::new(),
121        };
122        if config.fetch_active_tree {
123            new.get_latest_active_state_trees().await?;
124        }
125        Ok(new)
126    }
127
128    pub fn add_indexer(&mut self, path: String, api_key: Option<String>) {
129        self.indexer = Some(PhotonIndexer::new(path, api_key));
130    }
131
132    async fn retry<F, Fut, T>(&self, operation: F) -> Result<T, RpcError>
133    where
134        F: Fn() -> Fut,
135        Fut: std::future::Future<Output = Result<T, RpcError>>,
136    {
137        let mut attempts = 0;
138        let start_time = Instant::now();
139        loop {
140            match operation().await {
141                Ok(result) => return Ok(result),
142                Err(e) => {
143                    let retry = self.should_retry(&e);
144                    if retry {
145                        attempts += 1;
146                        if attempts >= self.retry_config.max_retries
147                            || start_time.elapsed() >= self.retry_config.timeout
148                        {
149                            return Err(e);
150                        }
151                        warn!(
152                            "Operation failed, retrying in {:?} (attempt {}/{}): {:?}",
153                            self.retry_config.retry_delay,
154                            attempts,
155                            self.retry_config.max_retries,
156                            e
157                        );
158                        tokio::task::yield_now().await;
159                        sleep(self.retry_config.retry_delay).await;
160                    } else {
161                        return Err(e);
162                    }
163                }
164            }
165        }
166    }
167
168    async fn _create_and_send_transaction_with_batched_event(
169        &mut self,
170        instructions: &[Instruction],
171        payer: &Pubkey,
172        signers: &[&Keypair],
173    ) -> Result<Option<(Vec<BatchPublicTransactionEvent>, Signature, Slot)>, RpcError> {
174        let latest_blockhash = self.client.get_latest_blockhash()?;
175
176        let mut instructions_vec = vec![
177            solana_compute_budget_interface::ComputeBudgetInstruction::set_compute_unit_limit(
178                1_000_000,
179            ),
180        ];
181        instructions_vec.extend_from_slice(instructions);
182
183        let transaction = Transaction::new_signed_with_payer(
184            instructions_vec.as_slice(),
185            Some(payer),
186            signers,
187            latest_blockhash,
188        );
189
190        let (signature, slot) = self
191            .process_transaction_with_context(transaction.clone())
192            .await?;
193
194        let mut vec = Vec::new();
195        let mut vec_accounts = Vec::new();
196        let mut program_ids = Vec::new();
197        instructions_vec.iter().for_each(|x| {
198            program_ids.push(light_compressed_account::Pubkey::new_from_array(
199                x.program_id.to_bytes(),
200            ));
201            vec.push(x.data.clone());
202            vec_accounts.push(
203                x.accounts
204                    .iter()
205                    .map(|x| light_compressed_account::Pubkey::new_from_array(x.pubkey.to_bytes()))
206                    .collect(),
207            );
208        });
209        {
210            let rpc_transaction_config = RpcTransactionConfig {
211                encoding: Some(UiTransactionEncoding::Base64),
212                commitment: Some(self.client.commitment()),
213                ..Default::default()
214            };
215            let transaction = self
216                .client
217                .get_transaction_with_config(&signature, rpc_transaction_config)
218                .map_err(|e| RpcError::CustomError(e.to_string()))?;
219            let decoded_transaction = transaction
220                .transaction
221                .transaction
222                .decode()
223                .clone()
224                .unwrap();
225            let account_keys = decoded_transaction.message.static_account_keys();
226            let meta = transaction.transaction.meta.as_ref().ok_or_else(|| {
227                RpcError::CustomError("Transaction missing metadata information".to_string())
228            })?;
229            if meta.status.is_err() {
230                return Err(RpcError::CustomError(
231                    "Transaction status indicates an error".to_string(),
232                ));
233            }
234
235            let inner_instructions = match &meta.inner_instructions {
236                OptionSerializer::Some(i) => i,
237                OptionSerializer::None => {
238                    return Err(RpcError::CustomError(
239                        "No inner instructions found".to_string(),
240                    ));
241                }
242                OptionSerializer::Skip => {
243                    return Err(RpcError::CustomError(
244                        "No inner instructions found".to_string(),
245                    ));
246                }
247            };
248
249            for ix in inner_instructions.iter() {
250                for ui_instruction in ix.instructions.iter() {
251                    match ui_instruction {
252                        UiInstruction::Compiled(ui_compiled_instruction) => {
253                            let accounts = &ui_compiled_instruction.accounts;
254                            let data = bs58::decode(&ui_compiled_instruction.data)
255                                .into_vec()
256                                .map_err(|_| {
257                                    RpcError::CustomError(
258                                        "Failed to decode instruction data".to_string(),
259                                    )
260                                })?;
261                            vec.push(data);
262                            program_ids.push(light_compressed_account::Pubkey::new_from_array(
263                                account_keys[ui_compiled_instruction.program_id_index as usize]
264                                    .to_bytes(),
265                            ));
266                            vec_accounts.push(
267                                accounts
268                                    .iter()
269                                    .map(|x| {
270                                        light_compressed_account::Pubkey::new_from_array(
271                                            account_keys[(*x) as usize].to_bytes(),
272                                        )
273                                    })
274                                    .collect(),
275                            );
276                        }
277                        UiInstruction::Parsed(_) => {
278                            println!("Parsed instructions are not implemented yet");
279                        }
280                    }
281                }
282            }
283        }
284        let parsed_event =
285            event_from_light_transaction(program_ids.as_slice(), vec.as_slice(), vec_accounts)
286                .unwrap();
287        let event = parsed_event.map(|e| (e, signature, slot));
288        Ok(event)
289    }
290
291    async fn _create_and_send_transaction_with_event<T>(
292        &mut self,
293        instructions: &[Instruction],
294        payer: &Pubkey,
295        signers: &[&Keypair],
296    ) -> Result<Option<(T, Signature, u64)>, RpcError>
297    where
298        T: BorshDeserialize + Send + Debug,
299    {
300        let latest_blockhash = self.client.get_latest_blockhash()?;
301
302        let mut instructions_vec = vec![
303            solana_compute_budget_interface::ComputeBudgetInstruction::set_compute_unit_limit(
304                1_000_000,
305            ),
306        ];
307        instructions_vec.extend_from_slice(instructions);
308
309        let transaction = Transaction::new_signed_with_payer(
310            instructions_vec.as_slice(),
311            Some(payer),
312            signers,
313            latest_blockhash,
314        );
315
316        let (signature, slot) = self
317            .process_transaction_with_context(transaction.clone())
318            .await?;
319
320        let mut parsed_event = None;
321        for instruction in &transaction.message.instructions {
322            let ix_data = instruction.data.clone();
323            match T::deserialize(&mut &instruction.data[..]) {
324                Ok(e) => {
325                    parsed_event = Some(e);
326                    break;
327                }
328                Err(e) => {
329                    warn!(
330                        "Failed to parse event: {:?}, type: {:?}, ix data: {:?}",
331                        e,
332                        std::any::type_name::<T>(),
333                        ix_data
334                    );
335                }
336            }
337        }
338
339        if parsed_event.is_none() {
340            parsed_event = self.parse_inner_instructions::<T>(signature).ok();
341        }
342
343        let result = parsed_event.map(|e| (e, signature, slot));
344        Ok(result)
345    }
346}
347
348impl LightClient {
349    #[allow(clippy::result_large_err)]
350    fn parse_inner_instructions<T: BorshDeserialize>(
351        &self,
352        signature: Signature,
353    ) -> Result<T, RpcError> {
354        let rpc_transaction_config = RpcTransactionConfig {
355            encoding: Some(UiTransactionEncoding::Base64),
356            commitment: Some(self.client.commitment()),
357            ..Default::default()
358        };
359        let transaction = self
360            .client
361            .get_transaction_with_config(&signature, rpc_transaction_config)
362            .map_err(|e| RpcError::CustomError(e.to_string()))?;
363        let meta = transaction.transaction.meta.as_ref().ok_or_else(|| {
364            RpcError::CustomError("Transaction missing metadata information".to_string())
365        })?;
366        if meta.status.is_err() {
367            return Err(RpcError::CustomError(
368                "Transaction status indicates an error".to_string(),
369            ));
370        }
371
372        let inner_instructions = match &meta.inner_instructions {
373            OptionSerializer::Some(i) => i,
374            OptionSerializer::None => {
375                return Err(RpcError::CustomError(
376                    "No inner instructions found".to_string(),
377                ));
378            }
379            OptionSerializer::Skip => {
380                return Err(RpcError::CustomError(
381                    "No inner instructions found".to_string(),
382                ));
383            }
384        };
385
386        for ix in inner_instructions.iter() {
387            for ui_instruction in ix.instructions.iter() {
388                match ui_instruction {
389                    UiInstruction::Compiled(ui_compiled_instruction) => {
390                        let data = bs58::decode(&ui_compiled_instruction.data)
391                            .into_vec()
392                            .map_err(|_| {
393                                RpcError::CustomError(
394                                    "Failed to decode instruction data".to_string(),
395                                )
396                            })?;
397
398                        match T::try_from_slice(data.as_slice()) {
399                            Ok(parsed_data) => return Ok(parsed_data),
400                            Err(e) => {
401                                warn!("Failed to parse inner instruction: {:?}", e);
402                            }
403                        }
404                    }
405                    UiInstruction::Parsed(_) => {
406                        println!("Parsed instructions are not implemented yet");
407                    }
408                }
409            }
410        }
411        Err(RpcError::CustomError(
412            "Failed to find any parseable inner instructions".to_string(),
413        ))
414    }
415}
416
417#[async_trait]
418impl Rpc for LightClient {
419    async fn new(config: LightClientConfig) -> Result<Self, RpcError>
420    where
421        Self: Sized,
422    {
423        Self::new_with_retry(config, None).await
424    }
425
426    fn get_payer(&self) -> &Keypair {
427        &self.payer
428    }
429
430    fn get_url(&self) -> String {
431        self.client.url()
432    }
433
434    async fn health(&self) -> Result<(), RpcError> {
435        self.retry(|| async { self.client.get_health().map_err(RpcError::from) })
436            .await
437    }
438
439    async fn get_program_accounts(
440        &self,
441        program_id: &Pubkey,
442    ) -> Result<Vec<(Pubkey, Account)>, RpcError> {
443        self.retry(|| async {
444            self.client
445                .get_program_accounts(program_id)
446                .map_err(RpcError::from)
447        })
448        .await
449    }
450
451    async fn process_transaction(
452        &mut self,
453        transaction: Transaction,
454    ) -> Result<Signature, RpcError> {
455        self.retry(|| async {
456            self.client
457                .send_and_confirm_transaction(&transaction)
458                .map_err(RpcError::from)
459        })
460        .await
461    }
462
463    async fn process_transaction_with_context(
464        &mut self,
465        transaction: Transaction,
466    ) -> Result<(Signature, Slot), RpcError> {
467        self.retry(|| async {
468            let signature = self.client.send_and_confirm_transaction(&transaction)?;
469            let sig_info = self.client.get_signature_statuses(&[signature])?;
470            let slot = sig_info
471                .value
472                .first()
473                .and_then(|s| s.as_ref())
474                .map(|s| s.slot)
475                .ok_or_else(|| RpcError::CustomError("Failed to get slot".into()))?;
476            Ok((signature, slot))
477        })
478        .await
479    }
480
481    async fn confirm_transaction(&self, signature: Signature) -> Result<bool, RpcError> {
482        self.retry(|| async {
483            self.client
484                .confirm_transaction(&signature)
485                .map_err(RpcError::from)
486        })
487        .await
488    }
489
490    async fn get_account(&self, address: Pubkey) -> Result<Option<Account>, RpcError> {
491        self.retry(|| async {
492            self.client
493                .get_account_with_commitment(&address, self.client.commitment())
494                .map(|response| response.value)
495                .map_err(RpcError::from)
496        })
497        .await
498    }
499
500    async fn get_minimum_balance_for_rent_exemption(
501        &self,
502        data_len: usize,
503    ) -> Result<u64, RpcError> {
504        self.retry(|| async {
505            self.client
506                .get_minimum_balance_for_rent_exemption(data_len)
507                .map_err(RpcError::from)
508        })
509        .await
510    }
511
512    async fn airdrop_lamports(
513        &mut self,
514        to: &Pubkey,
515        lamports: u64,
516    ) -> Result<Signature, RpcError> {
517        self.retry(|| async {
518            let signature = self
519                .client
520                .request_airdrop(to, lamports)
521                .map_err(RpcError::ClientError)?;
522            self.retry(|| async {
523                if self
524                    .client
525                    .confirm_transaction_with_commitment(&signature, self.client.commitment())?
526                    .value
527                {
528                    Ok(())
529                } else {
530                    Err(RpcError::CustomError("Airdrop not confirmed".into()))
531                }
532            })
533            .await?;
534
535            Ok(signature)
536        })
537        .await
538    }
539
540    async fn get_balance(&self, pubkey: &Pubkey) -> Result<u64, RpcError> {
541        self.retry(|| async { self.client.get_balance(pubkey).map_err(RpcError::from) })
542            .await
543    }
544
545    async fn get_latest_blockhash(&mut self) -> Result<(Hash, u64), RpcError> {
546        self.retry(|| async {
547            self.client
548                // Confirmed commitments land more reliably than finalized
549                // https://www.helius.dev/blog/how-to-deal-with-blockhash-errors-on-solana#how-to-deal-with-blockhash-errors
550                .get_latest_blockhash_with_commitment(CommitmentConfig::confirmed())
551                .map_err(RpcError::from)
552        })
553        .await
554    }
555
556    async fn get_slot(&self) -> Result<u64, RpcError> {
557        self.retry(|| async { self.client.get_slot().map_err(RpcError::from) })
558            .await
559    }
560
561    async fn send_transaction(&self, transaction: &Transaction) -> Result<Signature, RpcError> {
562        self.retry(|| async {
563            self.client
564                .send_transaction_with_config(
565                    transaction,
566                    RpcSendTransactionConfig {
567                        skip_preflight: true,
568                        max_retries: Some(self.retry_config.max_retries as usize),
569                        ..Default::default()
570                    },
571                )
572                .map_err(RpcError::from)
573        })
574        .await
575    }
576
577    async fn send_transaction_with_config(
578        &self,
579        transaction: &Transaction,
580        config: RpcSendTransactionConfig,
581    ) -> Result<Signature, RpcError> {
582        self.retry(|| async {
583            self.client
584                .send_transaction_with_config(transaction, config)
585                .map_err(RpcError::from)
586        })
587        .await
588    }
589
590    async fn get_transaction_slot(&self, signature: &Signature) -> Result<u64, RpcError> {
591        self.retry(|| async {
592            Ok(self
593                .client
594                .get_transaction_with_config(
595                    signature,
596                    RpcTransactionConfig {
597                        encoding: Some(UiTransactionEncoding::Base64),
598                        commitment: Some(self.client.commitment()),
599                        ..Default::default()
600                    },
601                )
602                .map_err(RpcError::from)?
603                .slot)
604        })
605        .await
606    }
607
608    async fn get_signature_statuses(
609        &self,
610        signatures: &[Signature],
611    ) -> Result<Vec<Option<TransactionStatus>>, RpcError> {
612        self.client
613            .get_signature_statuses(signatures)
614            .map(|response| response.value)
615            .map_err(RpcError::from)
616    }
617
618    async fn create_and_send_transaction_with_event<T>(
619        &mut self,
620        instructions: &[Instruction],
621        payer: &Pubkey,
622        signers: &[&Keypair],
623    ) -> Result<Option<(T, Signature, u64)>, RpcError>
624    where
625        T: BorshDeserialize + Send + Debug,
626    {
627        self._create_and_send_transaction_with_event::<T>(instructions, payer, signers)
628            .await
629    }
630
631    async fn create_and_send_transaction_with_public_event(
632        &mut self,
633        instructions: &[Instruction],
634        payer: &Pubkey,
635        signers: &[&Keypair],
636    ) -> Result<Option<(PublicTransactionEvent, Signature, Slot)>, RpcError> {
637        let parsed_event = self
638            ._create_and_send_transaction_with_batched_event(instructions, payer, signers)
639            .await?;
640
641        let event = parsed_event.map(|(e, signature, slot)| (e[0].event.clone(), signature, slot));
642        Ok(event)
643    }
644
645    async fn create_and_send_transaction_with_batched_event(
646        &mut self,
647        instructions: &[Instruction],
648        payer: &Pubkey,
649        signers: &[&Keypair],
650    ) -> Result<Option<(Vec<BatchPublicTransactionEvent>, Signature, Slot)>, RpcError> {
651        self._create_and_send_transaction_with_batched_event(instructions, payer, signers)
652            .await
653    }
654
655    fn indexer(&self) -> Result<&impl Indexer, RpcError> {
656        self.indexer.as_ref().ok_or(RpcError::IndexerNotInitialized)
657    }
658
659    fn indexer_mut(&mut self) -> Result<&mut impl Indexer, RpcError> {
660        self.indexer.as_mut().ok_or(RpcError::IndexerNotInitialized)
661    }
662
663    /// Fetch the latest state tree addresses from the cluster.
664    async fn get_latest_active_state_trees(&mut self) -> Result<Vec<TreeInfo>, RpcError> {
665        let res = default_state_tree_lookup_tables().0;
666        let res = get_light_state_tree_infos(
667            self,
668            &res[0].state_tree_lookup_table,
669            &res[0].nullify_table,
670        )
671        .await?;
672        self.state_merkle_trees = res.clone();
673        Ok(res)
674    }
675
676    /// Fetch the latest state tree addresses from the cluster.
677    fn get_state_tree_infos(&self) -> Vec<TreeInfo> {
678        self.state_merkle_trees.to_vec()
679    }
680
681    /// Gets a random active state tree.
682    /// State trees are cached and have to be fetched or set.
683    fn get_random_state_tree_info(&self) -> Result<TreeInfo, RpcError> {
684        let mut rng = rand::thread_rng();
685        select_state_tree_info(&mut rng, &self.state_merkle_trees)
686    }
687
688    fn get_address_tree_v1(&self) -> TreeInfo {
689        TreeInfo {
690            tree: pubkey!("amt1Ayt45jfbdw5YSo7iz6WZxUmnZsQTYXy82hVwyC2"),
691            queue: pubkey!("aq1S9z4reTSQAdgWHGD2zDaS39sjGrAxbR31vxJ2F4F"),
692            cpi_context: None,
693            next_tree_info: None,
694            tree_type: TreeType::AddressV1,
695        }
696    }
697}
698
699impl MerkleTreeExt for LightClient {}
700
701/// Selects a random state tree from the provided list.
702///
703/// This function should be used together with `get_state_tree_infos()` to first
704/// retrieve the list of state trees, then select one randomly.
705///
706/// # Arguments
707/// * `rng` - A mutable reference to a random number generator
708/// * `state_trees` - A slice of `TreeInfo` representing state trees
709///
710/// # Returns
711/// A randomly selected `TreeInfo` from the provided list, or an error if the list is empty
712///
713/// # Errors
714/// Returns `RpcError::NoStateTreesAvailable` if the provided slice is empty
715///
716/// # Example
717/// ```ignore
718/// use rand::thread_rng;
719/// let tree_infos = client.get_state_tree_infos();
720/// let mut rng = thread_rng();
721/// let selected_tree = select_state_tree_info(&mut rng, &tree_infos)?;
722/// ```
723pub fn select_state_tree_info<R: rand::Rng>(
724    rng: &mut R,
725    state_trees: &[TreeInfo],
726) -> Result<TreeInfo, RpcError> {
727    if state_trees.is_empty() {
728        return Err(RpcError::NoStateTreesAvailable);
729    }
730
731    Ok(state_trees[rng.gen_range(0..state_trees.len())])
732}