1use std::{
2 fmt::{Debug, Display, Formatter},
3 time::Duration,
4};
5
6use async_trait::async_trait;
7use borsh::BorshDeserialize;
8use bs58;
9use light_compressed_account::{
10 indexer_event::{
11 event::{BatchPublicTransactionEvent, PublicTransactionEvent},
12 parse::event_from_light_transaction,
13 },
14 TreeType,
15};
16use solana_account::Account;
17use solana_clock::Slot;
18use solana_commitment_config::CommitmentConfig;
19use solana_hash::Hash;
20use solana_instruction::Instruction;
21use solana_keypair::Keypair;
22use solana_pubkey::{pubkey, Pubkey};
23use solana_rpc_client::rpc_client::RpcClient;
24use solana_rpc_client_api::config::{RpcSendTransactionConfig, RpcTransactionConfig};
25use solana_signature::Signature;
26use solana_transaction::Transaction;
27use solana_transaction_status_client_types::{
28 option_serializer::OptionSerializer, TransactionStatus, UiInstruction, UiTransactionEncoding,
29};
30use tokio::time::{sleep, Instant};
31use tracing::warn;
32
33use super::LightClientConfig;
34use crate::{
35 indexer::{photon_indexer::PhotonIndexer, Indexer, TreeInfo},
36 rpc::{
37 errors::RpcError,
38 get_light_state_tree_infos::{
39 default_state_tree_lookup_tables, get_light_state_tree_infos,
40 },
41 merkle_tree::MerkleTreeExt,
42 Rpc,
43 },
44};
45
46pub enum RpcUrl {
47 Testnet,
48 Devnet,
49 Localnet,
50 ZKTestnet,
51 Custom(String),
52}
53
54impl Display for RpcUrl {
55 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
56 let str = match self {
57 RpcUrl::Testnet => "https://api.testnet.solana.com".to_string(),
58 RpcUrl::Devnet => "https://api.devnet.solana.com".to_string(),
59 RpcUrl::Localnet => "http://localhost:8899".to_string(),
60 RpcUrl::ZKTestnet => "https://zk-testnet.helius.dev:8899".to_string(),
61 RpcUrl::Custom(url) => url.clone(),
62 };
63 write!(f, "{}", str)
64 }
65}
66
67#[derive(Clone, Debug, Copy)]
68pub struct RetryConfig {
69 pub max_retries: u32,
70 pub retry_delay: Duration,
71 pub timeout: Duration,
74}
75
76impl Default for RetryConfig {
77 fn default() -> Self {
78 RetryConfig {
79 max_retries: 30,
80 retry_delay: Duration::from_secs(1),
81 timeout: Duration::from_secs(60),
82 }
83 }
84}
85
86#[allow(dead_code)]
87pub struct LightClient {
88 pub client: RpcClient,
89 pub payer: Keypair,
90 pub retry_config: RetryConfig,
91 pub indexer: Option<PhotonIndexer>,
92 pub state_merkle_trees: Vec<TreeInfo>,
93}
94
95impl Debug for LightClient {
96 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
97 write!(f, "LightClient {{ client: {:?} }}", self.client.url())
98 }
99}
100
101impl LightClient {
102 pub async fn new_with_retry(
103 config: LightClientConfig,
104 retry_config: Option<RetryConfig>,
105 ) -> Result<Self, RpcError> {
106 let payer = Keypair::new();
107 let commitment_config = config
108 .commitment_config
109 .unwrap_or(CommitmentConfig::confirmed());
110 let client = RpcClient::new_with_commitment(config.url.to_string(), commitment_config);
111 let retry_config = retry_config.unwrap_or_default();
112
113 let indexer = if config.with_indexer {
114 if config.url == RpcUrl::Localnet.to_string() {
115 Some(PhotonIndexer::new(
116 "http://127.0.0.1:8784".to_string(),
117 None,
118 ))
119 } else {
120 Some(PhotonIndexer::new(
121 config.url.to_string(),
122 None, ))
124 }
125 } else {
126 None
127 };
128 let mut new = Self {
129 client,
130 payer,
131 retry_config,
132 indexer,
133 state_merkle_trees: Vec::new(),
134 };
135 if config.fetch_active_tree {
136 new.get_latest_active_state_trees().await?;
137 }
138 Ok(new)
139 }
140
141 pub fn add_indexer(&mut self, path: String, api_key: Option<String>) {
142 self.indexer = Some(PhotonIndexer::new(path, api_key));
143 }
144
145 async fn retry<F, Fut, T>(&self, operation: F) -> Result<T, RpcError>
146 where
147 F: Fn() -> Fut,
148 Fut: std::future::Future<Output = Result<T, RpcError>>,
149 {
150 let mut attempts = 0;
151 let start_time = Instant::now();
152 loop {
153 match operation().await {
154 Ok(result) => return Ok(result),
155 Err(e) => {
156 let retry = self.should_retry(&e);
157 if retry {
158 attempts += 1;
159 if attempts >= self.retry_config.max_retries
160 || start_time.elapsed() >= self.retry_config.timeout
161 {
162 return Err(e);
163 }
164 warn!(
165 "Operation failed, retrying in {:?} (attempt {}/{}): {:?}",
166 self.retry_config.retry_delay,
167 attempts,
168 self.retry_config.max_retries,
169 e
170 );
171 tokio::task::yield_now().await;
172 sleep(self.retry_config.retry_delay).await;
173 } else {
174 return Err(e);
175 }
176 }
177 }
178 }
179 }
180
181 async fn _create_and_send_transaction_with_batched_event(
182 &mut self,
183 instructions: &[Instruction],
184 payer: &Pubkey,
185 signers: &[&Keypair],
186 ) -> Result<Option<(Vec<BatchPublicTransactionEvent>, Signature, Slot)>, RpcError> {
187 let latest_blockhash = self.client.get_latest_blockhash()?;
188
189 let mut instructions_vec = vec![
190 solana_compute_budget_interface::ComputeBudgetInstruction::set_compute_unit_limit(
191 1_000_000,
192 ),
193 ];
194 instructions_vec.extend_from_slice(instructions);
195
196 let transaction = Transaction::new_signed_with_payer(
197 instructions_vec.as_slice(),
198 Some(payer),
199 signers,
200 latest_blockhash,
201 );
202
203 let (signature, slot) = self
204 .process_transaction_with_context(transaction.clone())
205 .await?;
206
207 let mut vec = Vec::new();
208 let mut vec_accounts = Vec::new();
209 let mut program_ids = Vec::new();
210 instructions_vec.iter().for_each(|x| {
211 program_ids.push(light_compressed_account::Pubkey::new_from_array(
212 x.program_id.to_bytes(),
213 ));
214 vec.push(x.data.clone());
215 vec_accounts.push(
216 x.accounts
217 .iter()
218 .map(|x| light_compressed_account::Pubkey::new_from_array(x.pubkey.to_bytes()))
219 .collect(),
220 );
221 });
222 {
223 let rpc_transaction_config = RpcTransactionConfig {
224 encoding: Some(UiTransactionEncoding::Base64),
225 commitment: Some(self.client.commitment()),
226 ..Default::default()
227 };
228 let transaction = self
229 .client
230 .get_transaction_with_config(&signature, rpc_transaction_config)
231 .map_err(|e| RpcError::CustomError(e.to_string()))?;
232 let decoded_transaction = transaction
233 .transaction
234 .transaction
235 .decode()
236 .clone()
237 .unwrap();
238 let account_keys = decoded_transaction.message.static_account_keys();
239 let meta = transaction.transaction.meta.as_ref().ok_or_else(|| {
240 RpcError::CustomError("Transaction missing metadata information".to_string())
241 })?;
242 if meta.status.is_err() {
243 return Err(RpcError::CustomError(
244 "Transaction status indicates an error".to_string(),
245 ));
246 }
247
248 let inner_instructions = match &meta.inner_instructions {
249 OptionSerializer::Some(i) => i,
250 OptionSerializer::None => {
251 return Err(RpcError::CustomError(
252 "No inner instructions found".to_string(),
253 ));
254 }
255 OptionSerializer::Skip => {
256 return Err(RpcError::CustomError(
257 "No inner instructions found".to_string(),
258 ));
259 }
260 };
261
262 for ix in inner_instructions.iter() {
263 for ui_instruction in ix.instructions.iter() {
264 match ui_instruction {
265 UiInstruction::Compiled(ui_compiled_instruction) => {
266 let accounts = &ui_compiled_instruction.accounts;
267 let data = bs58::decode(&ui_compiled_instruction.data)
268 .into_vec()
269 .map_err(|_| {
270 RpcError::CustomError(
271 "Failed to decode instruction data".to_string(),
272 )
273 })?;
274 vec.push(data);
275 program_ids.push(light_compressed_account::Pubkey::new_from_array(
276 account_keys[ui_compiled_instruction.program_id_index as usize]
277 .to_bytes(),
278 ));
279 vec_accounts.push(
280 accounts
281 .iter()
282 .map(|x| {
283 light_compressed_account::Pubkey::new_from_array(
284 account_keys[(*x) as usize].to_bytes(),
285 )
286 })
287 .collect(),
288 );
289 }
290 UiInstruction::Parsed(_) => {
291 println!("Parsed instructions are not implemented yet");
292 }
293 }
294 }
295 }
296 }
297 let parsed_event =
298 event_from_light_transaction(program_ids.as_slice(), vec.as_slice(), vec_accounts)
299 .unwrap();
300 let event = parsed_event.map(|e| (e, signature, slot));
301 Ok(event)
302 }
303
304 async fn _create_and_send_transaction_with_event<T>(
305 &mut self,
306 instructions: &[Instruction],
307 payer: &Pubkey,
308 signers: &[&Keypair],
309 ) -> Result<Option<(T, Signature, u64)>, RpcError>
310 where
311 T: BorshDeserialize + Send + Debug,
312 {
313 let latest_blockhash = self.client.get_latest_blockhash()?;
314
315 let mut instructions_vec = vec![
316 solana_compute_budget_interface::ComputeBudgetInstruction::set_compute_unit_limit(
317 1_000_000,
318 ),
319 ];
320 instructions_vec.extend_from_slice(instructions);
321
322 let transaction = Transaction::new_signed_with_payer(
323 instructions_vec.as_slice(),
324 Some(payer),
325 signers,
326 latest_blockhash,
327 );
328
329 let (signature, slot) = self
330 .process_transaction_with_context(transaction.clone())
331 .await?;
332
333 let mut parsed_event = None;
334 for instruction in &transaction.message.instructions {
335 let ix_data = instruction.data.clone();
336 match T::deserialize(&mut &instruction.data[..]) {
337 Ok(e) => {
338 parsed_event = Some(e);
339 break;
340 }
341 Err(e) => {
342 warn!(
343 "Failed to parse event: {:?}, type: {:?}, ix data: {:?}",
344 e,
345 std::any::type_name::<T>(),
346 ix_data
347 );
348 }
349 }
350 }
351
352 if parsed_event.is_none() {
353 parsed_event = self.parse_inner_instructions::<T>(signature).ok();
354 }
355
356 let result = parsed_event.map(|e| (e, signature, slot));
357 Ok(result)
358 }
359}
360
361impl LightClient {
362 #[allow(clippy::result_large_err)]
363 fn parse_inner_instructions<T: BorshDeserialize>(
364 &self,
365 signature: Signature,
366 ) -> Result<T, RpcError> {
367 let rpc_transaction_config = RpcTransactionConfig {
368 encoding: Some(UiTransactionEncoding::Base64),
369 commitment: Some(self.client.commitment()),
370 ..Default::default()
371 };
372 let transaction = self
373 .client
374 .get_transaction_with_config(&signature, rpc_transaction_config)
375 .map_err(|e| RpcError::CustomError(e.to_string()))?;
376 let meta = transaction.transaction.meta.as_ref().ok_or_else(|| {
377 RpcError::CustomError("Transaction missing metadata information".to_string())
378 })?;
379 if meta.status.is_err() {
380 return Err(RpcError::CustomError(
381 "Transaction status indicates an error".to_string(),
382 ));
383 }
384
385 let inner_instructions = match &meta.inner_instructions {
386 OptionSerializer::Some(i) => i,
387 OptionSerializer::None => {
388 return Err(RpcError::CustomError(
389 "No inner instructions found".to_string(),
390 ));
391 }
392 OptionSerializer::Skip => {
393 return Err(RpcError::CustomError(
394 "No inner instructions found".to_string(),
395 ));
396 }
397 };
398
399 for ix in inner_instructions.iter() {
400 for ui_instruction in ix.instructions.iter() {
401 match ui_instruction {
402 UiInstruction::Compiled(ui_compiled_instruction) => {
403 let data = bs58::decode(&ui_compiled_instruction.data)
404 .into_vec()
405 .map_err(|_| {
406 RpcError::CustomError(
407 "Failed to decode instruction data".to_string(),
408 )
409 })?;
410
411 match T::try_from_slice(data.as_slice()) {
412 Ok(parsed_data) => return Ok(parsed_data),
413 Err(e) => {
414 warn!("Failed to parse inner instruction: {:?}", e);
415 }
416 }
417 }
418 UiInstruction::Parsed(_) => {
419 println!("Parsed instructions are not implemented yet");
420 }
421 }
422 }
423 }
424 Err(RpcError::CustomError(
425 "Failed to find any parseable inner instructions".to_string(),
426 ))
427 }
428}
429
430#[async_trait]
431impl Rpc for LightClient {
432 async fn new(config: LightClientConfig) -> Result<Self, RpcError>
433 where
434 Self: Sized,
435 {
436 Self::new_with_retry(config, None).await
437 }
438
439 fn get_payer(&self) -> &Keypair {
440 &self.payer
441 }
442
443 fn get_url(&self) -> String {
444 self.client.url()
445 }
446
447 async fn health(&self) -> Result<(), RpcError> {
448 self.retry(|| async { self.client.get_health().map_err(RpcError::from) })
449 .await
450 }
451
452 async fn get_program_accounts(
453 &self,
454 program_id: &Pubkey,
455 ) -> Result<Vec<(Pubkey, Account)>, RpcError> {
456 self.retry(|| async {
457 self.client
458 .get_program_accounts(program_id)
459 .map_err(RpcError::from)
460 })
461 .await
462 }
463
464 async fn process_transaction(
465 &mut self,
466 transaction: Transaction,
467 ) -> Result<Signature, RpcError> {
468 self.retry(|| async {
469 self.client
470 .send_and_confirm_transaction(&transaction)
471 .map_err(RpcError::from)
472 })
473 .await
474 }
475
476 async fn process_transaction_with_context(
477 &mut self,
478 transaction: Transaction,
479 ) -> Result<(Signature, Slot), RpcError> {
480 self.retry(|| async {
481 let signature = self.client.send_and_confirm_transaction(&transaction)?;
482 let sig_info = self.client.get_signature_statuses(&[signature])?;
483 let slot = sig_info
484 .value
485 .first()
486 .and_then(|s| s.as_ref())
487 .map(|s| s.slot)
488 .ok_or_else(|| RpcError::CustomError("Failed to get slot".into()))?;
489 Ok((signature, slot))
490 })
491 .await
492 }
493
494 async fn confirm_transaction(&self, signature: Signature) -> Result<bool, RpcError> {
495 self.retry(|| async {
496 self.client
497 .confirm_transaction(&signature)
498 .map_err(RpcError::from)
499 })
500 .await
501 }
502
503 async fn get_account(&self, address: Pubkey) -> Result<Option<Account>, RpcError> {
504 self.retry(|| async {
505 self.client
506 .get_account_with_commitment(&address, self.client.commitment())
507 .map(|response| response.value)
508 .map_err(RpcError::from)
509 })
510 .await
511 }
512
513 async fn get_minimum_balance_for_rent_exemption(
514 &self,
515 data_len: usize,
516 ) -> Result<u64, RpcError> {
517 self.retry(|| async {
518 self.client
519 .get_minimum_balance_for_rent_exemption(data_len)
520 .map_err(RpcError::from)
521 })
522 .await
523 }
524
525 async fn airdrop_lamports(
526 &mut self,
527 to: &Pubkey,
528 lamports: u64,
529 ) -> Result<Signature, RpcError> {
530 self.retry(|| async {
531 let signature = self
532 .client
533 .request_airdrop(to, lamports)
534 .map_err(RpcError::ClientError)?;
535 self.retry(|| async {
536 if self
537 .client
538 .confirm_transaction_with_commitment(&signature, self.client.commitment())?
539 .value
540 {
541 Ok(())
542 } else {
543 Err(RpcError::CustomError("Airdrop not confirmed".into()))
544 }
545 })
546 .await?;
547
548 Ok(signature)
549 })
550 .await
551 }
552
553 async fn get_balance(&self, pubkey: &Pubkey) -> Result<u64, RpcError> {
554 self.retry(|| async { self.client.get_balance(pubkey).map_err(RpcError::from) })
555 .await
556 }
557
558 async fn get_latest_blockhash(&mut self) -> Result<(Hash, u64), RpcError> {
559 self.retry(|| async {
560 self.client
561 .get_latest_blockhash_with_commitment(CommitmentConfig::confirmed())
564 .map_err(RpcError::from)
565 })
566 .await
567 }
568
569 async fn get_slot(&self) -> Result<u64, RpcError> {
570 self.retry(|| async { self.client.get_slot().map_err(RpcError::from) })
571 .await
572 }
573
574 async fn send_transaction(&self, transaction: &Transaction) -> Result<Signature, RpcError> {
575 self.retry(|| async {
576 self.client
577 .send_transaction_with_config(
578 transaction,
579 RpcSendTransactionConfig {
580 skip_preflight: true,
581 max_retries: Some(self.retry_config.max_retries as usize),
582 ..Default::default()
583 },
584 )
585 .map_err(RpcError::from)
586 })
587 .await
588 }
589
590 async fn send_transaction_with_config(
591 &self,
592 transaction: &Transaction,
593 config: RpcSendTransactionConfig,
594 ) -> Result<Signature, RpcError> {
595 self.retry(|| async {
596 self.client
597 .send_transaction_with_config(transaction, config)
598 .map_err(RpcError::from)
599 })
600 .await
601 }
602
603 async fn get_transaction_slot(&self, signature: &Signature) -> Result<u64, RpcError> {
604 self.retry(|| async {
605 Ok(self
606 .client
607 .get_transaction_with_config(
608 signature,
609 RpcTransactionConfig {
610 encoding: Some(UiTransactionEncoding::Base64),
611 commitment: Some(self.client.commitment()),
612 ..Default::default()
613 },
614 )
615 .map_err(RpcError::from)?
616 .slot)
617 })
618 .await
619 }
620
621 async fn get_signature_statuses(
622 &self,
623 signatures: &[Signature],
624 ) -> Result<Vec<Option<TransactionStatus>>, RpcError> {
625 self.client
626 .get_signature_statuses(signatures)
627 .map(|response| response.value)
628 .map_err(RpcError::from)
629 }
630
631 async fn create_and_send_transaction_with_event<T>(
632 &mut self,
633 instructions: &[Instruction],
634 payer: &Pubkey,
635 signers: &[&Keypair],
636 ) -> Result<Option<(T, Signature, u64)>, RpcError>
637 where
638 T: BorshDeserialize + Send + Debug,
639 {
640 self._create_and_send_transaction_with_event::<T>(instructions, payer, signers)
641 .await
642 }
643
644 async fn create_and_send_transaction_with_public_event(
645 &mut self,
646 instructions: &[Instruction],
647 payer: &Pubkey,
648 signers: &[&Keypair],
649 ) -> Result<Option<(PublicTransactionEvent, Signature, Slot)>, RpcError> {
650 let parsed_event = self
651 ._create_and_send_transaction_with_batched_event(instructions, payer, signers)
652 .await?;
653
654 let event = parsed_event.map(|(e, signature, slot)| (e[0].event.clone(), signature, slot));
655 Ok(event)
656 }
657
658 async fn create_and_send_transaction_with_batched_event(
659 &mut self,
660 instructions: &[Instruction],
661 payer: &Pubkey,
662 signers: &[&Keypair],
663 ) -> Result<Option<(Vec<BatchPublicTransactionEvent>, Signature, Slot)>, RpcError> {
664 self._create_and_send_transaction_with_batched_event(instructions, payer, signers)
665 .await
666 }
667
668 fn indexer(&self) -> Result<&impl Indexer, RpcError> {
669 self.indexer.as_ref().ok_or(RpcError::IndexerNotInitialized)
670 }
671
672 fn indexer_mut(&mut self) -> Result<&mut impl Indexer, RpcError> {
673 self.indexer.as_mut().ok_or(RpcError::IndexerNotInitialized)
674 }
675
676 async fn get_latest_active_state_trees(&mut self) -> Result<Vec<TreeInfo>, RpcError> {
678 let res = default_state_tree_lookup_tables().0;
679 let res = get_light_state_tree_infos(
680 self,
681 &res[0].state_tree_lookup_table,
682 &res[0].nullify_table,
683 )
684 .await?;
685 self.state_merkle_trees = res.clone();
686 Ok(res)
687 }
688
689 fn get_state_tree_infos(&self) -> Vec<TreeInfo> {
691 self.state_merkle_trees.to_vec()
692 }
693
694 fn get_random_state_tree_info(&self) -> Result<TreeInfo, RpcError> {
697 let mut rng = rand::thread_rng();
698 select_state_tree_info(&mut rng, &self.state_merkle_trees)
699 }
700
701 fn get_address_tree_v1(&self) -> TreeInfo {
702 TreeInfo {
703 tree: pubkey!("amt1Ayt45jfbdw5YSo7iz6WZxUmnZsQTYXy82hVwyC2"),
704 queue: pubkey!("aq1S9z4reTSQAdgWHGD2zDaS39sjGrAxbR31vxJ2F4F"),
705 cpi_context: None,
706 next_tree_info: None,
707 tree_type: TreeType::AddressV1,
708 }
709 }
710}
711
712impl MerkleTreeExt for LightClient {}
713
714pub fn select_state_tree_info<R: rand::Rng>(
737 rng: &mut R,
738 state_trees: &[TreeInfo],
739) -> Result<TreeInfo, RpcError> {
740 if state_trees.is_empty() {
741 return Err(RpcError::NoStateTreesAvailable);
742 }
743
744 Ok(state_trees[rng.gen_range(0..state_trees.len())])
745}