1use std::{
2 fmt::{Debug, Display, Formatter},
3 time::Duration,
4};
5
6use async_trait::async_trait;
7use borsh::BorshDeserialize;
8use bs58;
9use light_compressed_account::{
10 indexer_event::{
11 event::{BatchPublicTransactionEvent, PublicTransactionEvent},
12 parse::event_from_light_transaction,
13 },
14 TreeType,
15};
16use solana_account::Account;
17use solana_clock::Slot;
18use solana_commitment_config::CommitmentConfig;
19use solana_hash::Hash;
20use solana_instruction::Instruction;
21use solana_keypair::Keypair;
22use solana_pubkey::{pubkey, Pubkey};
23use solana_rpc_client::rpc_client::RpcClient;
24use solana_rpc_client_api::config::{RpcSendTransactionConfig, RpcTransactionConfig};
25use solana_signature::Signature;
26use solana_transaction::Transaction;
27use solana_transaction_status_client_types::{
28 option_serializer::OptionSerializer, TransactionStatus, UiInstruction, UiTransactionEncoding,
29};
30use tokio::time::{sleep, Instant};
31use tracing::warn;
32
33use super::LightClientConfig;
34use crate::{
35 indexer::{photon_indexer::PhotonIndexer, Indexer, TreeInfo},
36 rpc::{
37 errors::RpcError,
38 get_light_state_tree_infos::{
39 default_state_tree_lookup_tables, get_light_state_tree_infos,
40 },
41 merkle_tree::MerkleTreeExt,
42 Rpc,
43 },
44};
45
/// Identifies the RPC endpoint a client talks to.
///
/// The named variants map to fixed URLs through the `Display` implementation;
/// `Custom` carries an arbitrary URL string.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RpcUrl {
    Testnet,
    Devnet,
    Localnet,
    ZKTestnet,
    /// Any endpoint that is not one of the named networks.
    Custom(String),
}
53
54impl Display for RpcUrl {
55 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
56 let str = match self {
57 RpcUrl::Testnet => "https://api.testnet.solana.com".to_string(),
58 RpcUrl::Devnet => "https://api.devnet.solana.com".to_string(),
59 RpcUrl::Localnet => "http://localhost:8899".to_string(),
60 RpcUrl::ZKTestnet => "https://zk-testnet.helius.dev:8899".to_string(),
61 RpcUrl::Custom(url) => url.clone(),
62 };
63 write!(f, "{}", str)
64 }
65}
66
/// Settings that control how `LightClient::retry` re-runs failed operations.
#[derive(Clone, Debug, Copy)]
pub struct RetryConfig {
    /// Number of failed attempts after which `retry` gives up and returns the
    /// last error. Also forwarded as `max_retries` by `send_transaction`.
    pub max_retries: u32,
    /// Pause between two consecutive attempts.
    pub retry_delay: Duration,
    /// Overall time budget, measured from the start of the first attempt;
    /// once it has elapsed no further attempt is made.
    pub timeout: Duration,
}
75
76impl Default for RetryConfig {
77 fn default() -> Self {
78 RetryConfig {
79 max_retries: 30,
80 retry_delay: Duration::from_secs(1),
81 timeout: Duration::from_secs(60),
82 }
83 }
84}
85
/// RPC client that wraps a Solana `RpcClient` with retry handling, an optional
/// Photon indexer and a cached list of state Merkle trees.
#[allow(dead_code)]
pub struct LightClient {
    /// Underlying Solana RPC client used for every request.
    pub client: RpcClient,
    /// Keypair returned by `get_payer`; `new_with_retry` generates a fresh one.
    pub payer: Keypair,
    /// Retry policy applied by `retry`.
    pub retry_config: RetryConfig,
    /// Indexer handle; `None` until a Photon URL is configured or
    /// `add_indexer` is called.
    pub indexer: Option<PhotonIndexer>,
    /// State trees cached by `get_latest_active_state_trees`.
    pub state_merkle_trees: Vec<TreeInfo>,
}
94
95impl Debug for LightClient {
96 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
97 write!(f, "LightClient {{ client: {:?} }}", self.client.url())
98 }
99}
100
101impl LightClient {
102 pub async fn new_with_retry(
103 config: LightClientConfig,
104 retry_config: Option<RetryConfig>,
105 ) -> Result<Self, RpcError> {
106 let payer = Keypair::new();
107 let commitment_config = config
108 .commitment_config
109 .unwrap_or(CommitmentConfig::confirmed());
110 let rpc_url = if let Some(api_key) = &config.api_key {
111 format!("{}{}", config.url.trim_end_matches('/'), api_key)
112 } else {
113 config.url.clone()
114 };
115 let client = RpcClient::new_with_commitment(rpc_url, commitment_config);
116 let retry_config = retry_config.unwrap_or_default();
117
118 let indexer = config
119 .photon_url
120 .map(|path| PhotonIndexer::new(path, config.api_key));
121
122 let mut new = Self {
123 client,
124 payer,
125 retry_config,
126 indexer,
127 state_merkle_trees: Vec::new(),
128 };
129 if config.fetch_active_tree {
130 new.get_latest_active_state_trees().await?;
131 }
132 Ok(new)
133 }
134
    /// Attaches a Photon indexer built from `path` and `api_key`, replacing
    /// any indexer that was set before.
    pub fn add_indexer(&mut self, path: String, api_key: Option<String>) {
        self.indexer = Some(PhotonIndexer::new(path, api_key));
    }
138
139 fn detect_network(&self) -> RpcUrl {
141 let url = self.client.url();
142
143 if url.contains("devnet") {
144 RpcUrl::Devnet
145 } else if url.contains("testnet") {
146 RpcUrl::Testnet
147 } else if url.contains("localhost") || url.contains("127.0.0.1") {
148 RpcUrl::Localnet
149 } else if url.contains("zk-testnet") {
150 RpcUrl::ZKTestnet
151 } else {
152 RpcUrl::Custom(url.to_string())
154 }
155 }
156
157 async fn retry<F, Fut, T>(&self, operation: F) -> Result<T, RpcError>
158 where
159 F: Fn() -> Fut,
160 Fut: std::future::Future<Output = Result<T, RpcError>>,
161 {
162 let mut attempts = 0;
163 let start_time = Instant::now();
164 loop {
165 match operation().await {
166 Ok(result) => return Ok(result),
167 Err(e) => {
168 let retry = self.should_retry(&e);
169 if retry {
170 attempts += 1;
171 if attempts >= self.retry_config.max_retries
172 || start_time.elapsed() >= self.retry_config.timeout
173 {
174 return Err(e);
175 }
176 warn!(
177 "Operation failed, retrying in {:?} (attempt {}/{}): {:?}",
178 self.retry_config.retry_delay,
179 attempts,
180 self.retry_config.max_retries,
181 e
182 );
183 tokio::task::yield_now().await;
184 sleep(self.retry_config.retry_delay).await;
185 } else {
186 return Err(e);
187 }
188 }
189 }
190 }
191 }
192
193 async fn _create_and_send_transaction_with_batched_event(
194 &mut self,
195 instructions: &[Instruction],
196 payer: &Pubkey,
197 signers: &[&Keypair],
198 ) -> Result<Option<(Vec<BatchPublicTransactionEvent>, Signature, Slot)>, RpcError> {
199 let latest_blockhash = self.client.get_latest_blockhash()?;
200
201 let mut instructions_vec = vec![
202 solana_compute_budget_interface::ComputeBudgetInstruction::set_compute_unit_limit(
203 1_000_000,
204 ),
205 ];
206 instructions_vec.extend_from_slice(instructions);
207
208 let transaction = Transaction::new_signed_with_payer(
209 instructions_vec.as_slice(),
210 Some(payer),
211 signers,
212 latest_blockhash,
213 );
214
215 let (signature, slot) = self
216 .process_transaction_with_context(transaction.clone())
217 .await?;
218
219 let mut vec = Vec::new();
220 let mut vec_accounts = Vec::new();
221 let mut program_ids = Vec::new();
222 instructions_vec.iter().for_each(|x| {
223 program_ids.push(light_compressed_account::Pubkey::new_from_array(
224 x.program_id.to_bytes(),
225 ));
226 vec.push(x.data.clone());
227 vec_accounts.push(
228 x.accounts
229 .iter()
230 .map(|x| light_compressed_account::Pubkey::new_from_array(x.pubkey.to_bytes()))
231 .collect(),
232 );
233 });
234 {
235 let rpc_transaction_config = RpcTransactionConfig {
236 encoding: Some(UiTransactionEncoding::Base64),
237 commitment: Some(self.client.commitment()),
238 ..Default::default()
239 };
240 let transaction = self
241 .client
242 .get_transaction_with_config(&signature, rpc_transaction_config)
243 .map_err(|e| RpcError::CustomError(e.to_string()))?;
244 let decoded_transaction = transaction
245 .transaction
246 .transaction
247 .decode()
248 .clone()
249 .unwrap();
250 let account_keys = decoded_transaction.message.static_account_keys();
251 let meta = transaction.transaction.meta.as_ref().ok_or_else(|| {
252 RpcError::CustomError("Transaction missing metadata information".to_string())
253 })?;
254 if meta.status.is_err() {
255 return Err(RpcError::CustomError(
256 "Transaction status indicates an error".to_string(),
257 ));
258 }
259
260 let inner_instructions = match &meta.inner_instructions {
261 OptionSerializer::Some(i) => i,
262 OptionSerializer::None => {
263 return Err(RpcError::CustomError(
264 "No inner instructions found".to_string(),
265 ));
266 }
267 OptionSerializer::Skip => {
268 return Err(RpcError::CustomError(
269 "No inner instructions found".to_string(),
270 ));
271 }
272 };
273
274 for ix in inner_instructions.iter() {
275 for ui_instruction in ix.instructions.iter() {
276 match ui_instruction {
277 UiInstruction::Compiled(ui_compiled_instruction) => {
278 let accounts = &ui_compiled_instruction.accounts;
279 let data = bs58::decode(&ui_compiled_instruction.data)
280 .into_vec()
281 .map_err(|_| {
282 RpcError::CustomError(
283 "Failed to decode instruction data".to_string(),
284 )
285 })?;
286 vec.push(data);
287 program_ids.push(light_compressed_account::Pubkey::new_from_array(
288 account_keys[ui_compiled_instruction.program_id_index as usize]
289 .to_bytes(),
290 ));
291 vec_accounts.push(
292 accounts
293 .iter()
294 .map(|x| {
295 light_compressed_account::Pubkey::new_from_array(
296 account_keys[(*x) as usize].to_bytes(),
297 )
298 })
299 .collect(),
300 );
301 }
302 UiInstruction::Parsed(_) => {
303 println!("Parsed instructions are not implemented yet");
304 }
305 }
306 }
307 }
308 }
309 let parsed_event =
310 event_from_light_transaction(program_ids.as_slice(), vec.as_slice(), vec_accounts)
311 .unwrap();
312 let event = parsed_event.map(|e| (e, signature, slot));
313 Ok(event)
314 }
315
316 async fn _create_and_send_transaction_with_event<T>(
317 &mut self,
318 instructions: &[Instruction],
319 payer: &Pubkey,
320 signers: &[&Keypair],
321 ) -> Result<Option<(T, Signature, u64)>, RpcError>
322 where
323 T: BorshDeserialize + Send + Debug,
324 {
325 let latest_blockhash = self.client.get_latest_blockhash()?;
326
327 let mut instructions_vec = vec![
328 solana_compute_budget_interface::ComputeBudgetInstruction::set_compute_unit_limit(
329 1_000_000,
330 ),
331 ];
332 instructions_vec.extend_from_slice(instructions);
333
334 let transaction = Transaction::new_signed_with_payer(
335 instructions_vec.as_slice(),
336 Some(payer),
337 signers,
338 latest_blockhash,
339 );
340
341 let (signature, slot) = self
342 .process_transaction_with_context(transaction.clone())
343 .await?;
344
345 let mut parsed_event = None;
346 for instruction in &transaction.message.instructions {
347 let ix_data = instruction.data.clone();
348 match T::deserialize(&mut &instruction.data[..]) {
349 Ok(e) => {
350 parsed_event = Some(e);
351 break;
352 }
353 Err(e) => {
354 warn!(
355 "Failed to parse event: {:?}, type: {:?}, ix data: {:?}",
356 e,
357 std::any::type_name::<T>(),
358 ix_data
359 );
360 }
361 }
362 }
363
364 if parsed_event.is_none() {
365 parsed_event = self.parse_inner_instructions::<T>(signature).ok();
366 }
367
368 let result = parsed_event.map(|e| (e, signature, slot));
369 Ok(result)
370 }
371}
372
373impl LightClient {
374 #[allow(clippy::result_large_err)]
375 fn parse_inner_instructions<T: BorshDeserialize>(
376 &self,
377 signature: Signature,
378 ) -> Result<T, RpcError> {
379 let rpc_transaction_config = RpcTransactionConfig {
380 encoding: Some(UiTransactionEncoding::Base64),
381 commitment: Some(self.client.commitment()),
382 ..Default::default()
383 };
384 let transaction = self
385 .client
386 .get_transaction_with_config(&signature, rpc_transaction_config)
387 .map_err(|e| RpcError::CustomError(e.to_string()))?;
388 let meta = transaction.transaction.meta.as_ref().ok_or_else(|| {
389 RpcError::CustomError("Transaction missing metadata information".to_string())
390 })?;
391 if meta.status.is_err() {
392 return Err(RpcError::CustomError(
393 "Transaction status indicates an error".to_string(),
394 ));
395 }
396
397 let inner_instructions = match &meta.inner_instructions {
398 OptionSerializer::Some(i) => i,
399 OptionSerializer::None => {
400 return Err(RpcError::CustomError(
401 "No inner instructions found".to_string(),
402 ));
403 }
404 OptionSerializer::Skip => {
405 return Err(RpcError::CustomError(
406 "No inner instructions found".to_string(),
407 ));
408 }
409 };
410
411 for ix in inner_instructions.iter() {
412 for ui_instruction in ix.instructions.iter() {
413 match ui_instruction {
414 UiInstruction::Compiled(ui_compiled_instruction) => {
415 let data = bs58::decode(&ui_compiled_instruction.data)
416 .into_vec()
417 .map_err(|_| {
418 RpcError::CustomError(
419 "Failed to decode instruction data".to_string(),
420 )
421 })?;
422
423 match T::try_from_slice(data.as_slice()) {
424 Ok(parsed_data) => return Ok(parsed_data),
425 Err(e) => {
426 warn!("Failed to parse inner instruction: {:?}", e);
427 }
428 }
429 }
430 UiInstruction::Parsed(_) => {
431 println!("Parsed instructions are not implemented yet");
432 }
433 }
434 }
435 }
436 Err(RpcError::CustomError(
437 "Failed to find any parseable inner instructions".to_string(),
438 ))
439 }
440}
441
442#[async_trait]
443impl Rpc for LightClient {
    /// Creates a client from `config` with the default retry policy
    /// (delegates to `new_with_retry` with `None`).
    async fn new(config: LightClientConfig) -> Result<Self, RpcError>
    where
        Self: Sized,
    {
        Self::new_with_retry(config, None).await
    }
450
    /// Returns the payer keypair held by this client.
    fn get_payer(&self) -> &Keypair {
        &self.payer
    }
454
    /// Returns the URL of the underlying RPC client.
    fn get_url(&self) -> String {
        self.client.url()
    }
458
459 async fn health(&self) -> Result<(), RpcError> {
460 self.retry(|| async { self.client.get_health().map_err(RpcError::from) })
461 .await
462 }
463
464 async fn get_program_accounts(
465 &self,
466 program_id: &Pubkey,
467 ) -> Result<Vec<(Pubkey, Account)>, RpcError> {
468 self.retry(|| async {
469 self.client
470 .get_program_accounts(program_id)
471 .map_err(RpcError::from)
472 })
473 .await
474 }
475
476 async fn process_transaction(
477 &mut self,
478 transaction: Transaction,
479 ) -> Result<Signature, RpcError> {
480 self.retry(|| async {
481 self.client
482 .send_and_confirm_transaction(&transaction)
483 .map_err(RpcError::from)
484 })
485 .await
486 }
487
488 async fn process_transaction_with_context(
489 &mut self,
490 transaction: Transaction,
491 ) -> Result<(Signature, Slot), RpcError> {
492 self.retry(|| async {
493 let signature = self.client.send_and_confirm_transaction(&transaction)?;
494 let sig_info = self.client.get_signature_statuses(&[signature])?;
495 let slot = sig_info
496 .value
497 .first()
498 .and_then(|s| s.as_ref())
499 .map(|s| s.slot)
500 .ok_or_else(|| RpcError::CustomError("Failed to get slot".into()))?;
501 Ok((signature, slot))
502 })
503 .await
504 }
505
506 async fn confirm_transaction(&self, signature: Signature) -> Result<bool, RpcError> {
507 self.retry(|| async {
508 self.client
509 .confirm_transaction(&signature)
510 .map_err(RpcError::from)
511 })
512 .await
513 }
514
515 async fn get_account(&self, address: Pubkey) -> Result<Option<Account>, RpcError> {
516 self.retry(|| async {
517 self.client
518 .get_account_with_commitment(&address, self.client.commitment())
519 .map(|response| response.value)
520 .map_err(RpcError::from)
521 })
522 .await
523 }
524
525 async fn get_minimum_balance_for_rent_exemption(
526 &self,
527 data_len: usize,
528 ) -> Result<u64, RpcError> {
529 self.retry(|| async {
530 self.client
531 .get_minimum_balance_for_rent_exemption(data_len)
532 .map_err(RpcError::from)
533 })
534 .await
535 }
536
537 async fn airdrop_lamports(
538 &mut self,
539 to: &Pubkey,
540 lamports: u64,
541 ) -> Result<Signature, RpcError> {
542 self.retry(|| async {
543 let signature = self
544 .client
545 .request_airdrop(to, lamports)
546 .map_err(RpcError::ClientError)?;
547 self.retry(|| async {
548 if self
549 .client
550 .confirm_transaction_with_commitment(&signature, self.client.commitment())?
551 .value
552 {
553 Ok(())
554 } else {
555 Err(RpcError::CustomError("Airdrop not confirmed".into()))
556 }
557 })
558 .await?;
559
560 Ok(signature)
561 })
562 .await
563 }
564
565 async fn get_balance(&self, pubkey: &Pubkey) -> Result<u64, RpcError> {
566 self.retry(|| async { self.client.get_balance(pubkey).map_err(RpcError::from) })
567 .await
568 }
569
570 async fn get_latest_blockhash(&mut self) -> Result<(Hash, u64), RpcError> {
571 self.retry(|| async {
572 self.client
573 .get_latest_blockhash_with_commitment(CommitmentConfig::confirmed())
576 .map_err(RpcError::from)
577 })
578 .await
579 }
580
581 async fn get_slot(&self) -> Result<u64, RpcError> {
582 self.retry(|| async { self.client.get_slot().map_err(RpcError::from) })
583 .await
584 }
585
586 async fn send_transaction(&self, transaction: &Transaction) -> Result<Signature, RpcError> {
587 self.retry(|| async {
588 self.client
589 .send_transaction_with_config(
590 transaction,
591 RpcSendTransactionConfig {
592 skip_preflight: true,
593 max_retries: Some(self.retry_config.max_retries as usize),
594 ..Default::default()
595 },
596 )
597 .map_err(RpcError::from)
598 })
599 .await
600 }
601
602 async fn send_transaction_with_config(
603 &self,
604 transaction: &Transaction,
605 config: RpcSendTransactionConfig,
606 ) -> Result<Signature, RpcError> {
607 self.retry(|| async {
608 self.client
609 .send_transaction_with_config(transaction, config)
610 .map_err(RpcError::from)
611 })
612 .await
613 }
614
615 async fn get_transaction_slot(&self, signature: &Signature) -> Result<u64, RpcError> {
616 self.retry(|| async {
617 Ok(self
618 .client
619 .get_transaction_with_config(
620 signature,
621 RpcTransactionConfig {
622 encoding: Some(UiTransactionEncoding::Base64),
623 commitment: Some(self.client.commitment()),
624 ..Default::default()
625 },
626 )
627 .map_err(RpcError::from)?
628 .slot)
629 })
630 .await
631 }
632
633 async fn get_signature_statuses(
634 &self,
635 signatures: &[Signature],
636 ) -> Result<Vec<Option<TransactionStatus>>, RpcError> {
637 self.client
638 .get_signature_statuses(signatures)
639 .map(|response| response.value)
640 .map_err(RpcError::from)
641 }
642
    /// Sends `instructions` in one transaction and tries to deserialize an
    /// event of type `T` from it; delegates to the inherent
    /// `_create_and_send_transaction_with_event`.
    async fn create_and_send_transaction_with_event<T>(
        &mut self,
        instructions: &[Instruction],
        payer: &Pubkey,
        signers: &[&Keypair],
    ) -> Result<Option<(T, Signature, u64)>, RpcError>
    where
        T: BorshDeserialize + Send + Debug,
    {
        self._create_and_send_transaction_with_event::<T>(instructions, payer, signers)
            .await
    }
655
656 async fn create_and_send_transaction_with_public_event(
657 &mut self,
658 instructions: &[Instruction],
659 payer: &Pubkey,
660 signers: &[&Keypair],
661 ) -> Result<Option<(PublicTransactionEvent, Signature, Slot)>, RpcError> {
662 let parsed_event = self
663 ._create_and_send_transaction_with_batched_event(instructions, payer, signers)
664 .await?;
665
666 let event = parsed_event.map(|(e, signature, slot)| (e[0].event.clone(), signature, slot));
667 Ok(event)
668 }
669
    /// Sends `instructions` in one transaction and returns all batched public
    /// transaction events parsed from it; delegates to the inherent
    /// `_create_and_send_transaction_with_batched_event`.
    async fn create_and_send_transaction_with_batched_event(
        &mut self,
        instructions: &[Instruction],
        payer: &Pubkey,
        signers: &[&Keypair],
    ) -> Result<Option<(Vec<BatchPublicTransactionEvent>, Signature, Slot)>, RpcError> {
        self._create_and_send_transaction_with_batched_event(instructions, payer, signers)
            .await
    }
679
680 fn indexer(&self) -> Result<&impl Indexer, RpcError> {
681 self.indexer.as_ref().ok_or(RpcError::IndexerNotInitialized)
682 }
683
684 fn indexer_mut(&mut self) -> Result<&mut impl Indexer, RpcError> {
685 self.indexer.as_mut().ok_or(RpcError::IndexerNotInitialized)
686 }
687
688 async fn get_latest_active_state_trees(&mut self) -> Result<Vec<TreeInfo>, RpcError> {
690 let network = self.detect_network();
691
692 if matches!(network, RpcUrl::Localnet) {
694 use light_compressed_account::TreeType;
695 use solana_pubkey::pubkey;
696
697 use crate::indexer::TreeInfo;
698
699 #[cfg(feature = "v2")]
700 let default_trees = vec![TreeInfo {
701 tree: pubkey!("HLKs5NJ8FXkJg8BrzJt56adFYYuwg5etzDtBbQYTsixu"),
702 queue: pubkey!("6L7SzhYB3anwEQ9cphpJ1U7Scwj57bx2xueReg7R9cKU"),
703 cpi_context: Some(pubkey!("7Hp52chxaew8bW1ApR4fck2bh6Y8qA1pu3qwH6N9zaLj")),
704 next_tree_info: None,
705 tree_type: TreeType::StateV2,
706 }];
707
708 #[cfg(not(feature = "v2"))]
709 let default_trees = vec![TreeInfo {
710 tree: pubkey!("smt1NamzXdq4AMqS2fS2F1i5KTYPZRhoHgWx38d8WsT"),
711 queue: pubkey!("nfq1NvQDJ2GEgnS8zt9prAe8rjjpAW1zFkrvZoBR148"),
712 cpi_context: Some(pubkey!("cpi1uHzrEhBG733DoEJNgHCyRS3XmmyVNZx5fonubE4")),
713 next_tree_info: None,
714 tree_type: TreeType::StateV1,
715 }];
716
717 self.state_merkle_trees = default_trees.clone();
718 return Ok(default_trees);
719 }
720
721 let (mainnet_tables, devnet_tables) = default_state_tree_lookup_tables();
722
723 let lookup_tables = match network {
724 RpcUrl::Devnet | RpcUrl::Testnet | RpcUrl::ZKTestnet => &devnet_tables,
725 _ => &mainnet_tables, };
727
728 let res = get_light_state_tree_infos(
729 self,
730 &lookup_tables[0].state_tree_lookup_table,
731 &lookup_tables[0].nullify_table,
732 )
733 .await?;
734 self.state_merkle_trees = res.clone();
735 Ok(res)
736 }
737
738 fn get_state_tree_infos(&self) -> Vec<TreeInfo> {
740 self.state_merkle_trees.to_vec()
741 }
742
743 fn get_random_state_tree_info(&self) -> Result<TreeInfo, RpcError> {
746 let mut rng = rand::thread_rng();
747 select_state_tree_info(&mut rng, &self.state_merkle_trees)
748 }
749
750 fn get_address_tree_v1(&self) -> TreeInfo {
751 TreeInfo {
752 tree: pubkey!("amt1Ayt45jfbdw5YSo7iz6WZxUmnZsQTYXy82hVwyC2"),
753 queue: pubkey!("aq1S9z4reTSQAdgWHGD2zDaS39sjGrAxbR31vxJ2F4F"),
754 cpi_context: None,
755 next_tree_info: None,
756 tree_type: TreeType::AddressV1,
757 }
758 }
759}
760
// Empty impl: `LightClient` relies entirely on the items `MerkleTreeExt`
// provides by default.
impl MerkleTreeExt for LightClient {}
762
763pub fn select_state_tree_info<R: rand::Rng>(
786 rng: &mut R,
787 state_trees: &[TreeInfo],
788) -> Result<TreeInfo, RpcError> {
789 if state_trees.is_empty() {
790 return Err(RpcError::NoStateTreesAvailable);
791 }
792
793 Ok(state_trees[rng.gen_range(0..state_trees.len())])
794}