1use std::{
2 fmt::{Debug, Display, Formatter},
3 time::Duration,
4};
5
6use async_trait::async_trait;
7use borsh::BorshDeserialize;
8use bs58;
9use light_compressed_account::{
10 indexer_event::{
11 event::{BatchPublicTransactionEvent, PublicTransactionEvent},
12 parse::event_from_light_transaction,
13 },
14 TreeType,
15};
16use solana_account::Account;
17use solana_clock::Slot;
18use solana_commitment_config::CommitmentConfig;
19use solana_hash::Hash;
20use solana_instruction::Instruction;
21use solana_keypair::Keypair;
22use solana_pubkey::{pubkey, Pubkey};
23use solana_rpc_client::rpc_client::RpcClient;
24use solana_rpc_client_api::config::{RpcSendTransactionConfig, RpcTransactionConfig};
25use solana_signature::Signature;
26use solana_transaction::Transaction;
27use solana_transaction_status_client_types::{
28 option_serializer::OptionSerializer, TransactionStatus, UiInstruction, UiTransactionEncoding,
29};
30use tokio::time::{sleep, Instant};
31use tracing::warn;
32
33use super::LightClientConfig;
34use crate::{
35 indexer::{photon_indexer::PhotonIndexer, Indexer, TreeInfo},
36 rpc::{
37 errors::RpcError,
38 get_light_state_tree_infos::{
39 default_state_tree_lookup_tables, get_light_state_tree_infos,
40 },
41 merkle_tree::MerkleTreeExt,
42 Rpc,
43 },
44};
45
46pub enum RpcUrl {
47 Testnet,
48 Devnet,
49 Localnet,
50 ZKTestnet,
51 Custom(String),
52}
53
54impl Display for RpcUrl {
55 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
56 let str = match self {
57 RpcUrl::Testnet => "https://api.testnet.solana.com".to_string(),
58 RpcUrl::Devnet => "https://api.devnet.solana.com".to_string(),
59 RpcUrl::Localnet => "http://localhost:8899".to_string(),
60 RpcUrl::ZKTestnet => "https://zk-testnet.helius.dev:8899".to_string(),
61 RpcUrl::Custom(url) => url.clone(),
62 };
63 write!(f, "{}", str)
64 }
65}
66
67#[derive(Clone, Debug, Copy)]
68pub struct RetryConfig {
69 pub max_retries: u32,
70 pub retry_delay: Duration,
71 pub timeout: Duration,
74}
75
76impl Default for RetryConfig {
77 fn default() -> Self {
78 RetryConfig {
79 max_retries: 30,
80 retry_delay: Duration::from_secs(1),
81 timeout: Duration::from_secs(60),
82 }
83 }
84}
85
86#[allow(dead_code)]
87pub struct LightClient {
88 pub client: RpcClient,
89 pub payer: Keypair,
90 pub retry_config: RetryConfig,
91 pub indexer: Option<PhotonIndexer>,
92 pub state_merkle_trees: Vec<TreeInfo>,
93}
94
95impl Debug for LightClient {
96 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
97 write!(f, "LightClient {{ client: {:?} }}", self.client.url())
98 }
99}
100
101impl LightClient {
102 pub async fn new_with_retry(
103 config: LightClientConfig,
104 retry_config: Option<RetryConfig>,
105 ) -> Result<Self, RpcError> {
106 let payer = Keypair::new();
107 let commitment_config = config
108 .commitment_config
109 .unwrap_or(CommitmentConfig::confirmed());
110 let client = RpcClient::new_with_commitment(config.url.to_string(), commitment_config);
111 let retry_config = retry_config.unwrap_or_default();
112
113 let indexer = config.photon_url.map(|path| PhotonIndexer::new(path, None));
114
115 let mut new = Self {
116 client,
117 payer,
118 retry_config,
119 indexer,
120 state_merkle_trees: Vec::new(),
121 };
122 if config.fetch_active_tree {
123 new.get_latest_active_state_trees().await?;
124 }
125 Ok(new)
126 }
127
128 pub fn add_indexer(&mut self, path: String, api_key: Option<String>) {
129 self.indexer = Some(PhotonIndexer::new(path, api_key));
130 }
131
132 async fn retry<F, Fut, T>(&self, operation: F) -> Result<T, RpcError>
133 where
134 F: Fn() -> Fut,
135 Fut: std::future::Future<Output = Result<T, RpcError>>,
136 {
137 let mut attempts = 0;
138 let start_time = Instant::now();
139 loop {
140 match operation().await {
141 Ok(result) => return Ok(result),
142 Err(e) => {
143 let retry = self.should_retry(&e);
144 if retry {
145 attempts += 1;
146 if attempts >= self.retry_config.max_retries
147 || start_time.elapsed() >= self.retry_config.timeout
148 {
149 return Err(e);
150 }
151 warn!(
152 "Operation failed, retrying in {:?} (attempt {}/{}): {:?}",
153 self.retry_config.retry_delay,
154 attempts,
155 self.retry_config.max_retries,
156 e
157 );
158 tokio::task::yield_now().await;
159 sleep(self.retry_config.retry_delay).await;
160 } else {
161 return Err(e);
162 }
163 }
164 }
165 }
166 }
167
168 async fn _create_and_send_transaction_with_batched_event(
169 &mut self,
170 instructions: &[Instruction],
171 payer: &Pubkey,
172 signers: &[&Keypair],
173 ) -> Result<Option<(Vec<BatchPublicTransactionEvent>, Signature, Slot)>, RpcError> {
174 let latest_blockhash = self.client.get_latest_blockhash()?;
175
176 let mut instructions_vec = vec![
177 solana_compute_budget_interface::ComputeBudgetInstruction::set_compute_unit_limit(
178 1_000_000,
179 ),
180 ];
181 instructions_vec.extend_from_slice(instructions);
182
183 let transaction = Transaction::new_signed_with_payer(
184 instructions_vec.as_slice(),
185 Some(payer),
186 signers,
187 latest_blockhash,
188 );
189
190 let (signature, slot) = self
191 .process_transaction_with_context(transaction.clone())
192 .await?;
193
194 let mut vec = Vec::new();
195 let mut vec_accounts = Vec::new();
196 let mut program_ids = Vec::new();
197 instructions_vec.iter().for_each(|x| {
198 program_ids.push(light_compressed_account::Pubkey::new_from_array(
199 x.program_id.to_bytes(),
200 ));
201 vec.push(x.data.clone());
202 vec_accounts.push(
203 x.accounts
204 .iter()
205 .map(|x| light_compressed_account::Pubkey::new_from_array(x.pubkey.to_bytes()))
206 .collect(),
207 );
208 });
209 {
210 let rpc_transaction_config = RpcTransactionConfig {
211 encoding: Some(UiTransactionEncoding::Base64),
212 commitment: Some(self.client.commitment()),
213 ..Default::default()
214 };
215 let transaction = self
216 .client
217 .get_transaction_with_config(&signature, rpc_transaction_config)
218 .map_err(|e| RpcError::CustomError(e.to_string()))?;
219 let decoded_transaction = transaction
220 .transaction
221 .transaction
222 .decode()
223 .clone()
224 .unwrap();
225 let account_keys = decoded_transaction.message.static_account_keys();
226 let meta = transaction.transaction.meta.as_ref().ok_or_else(|| {
227 RpcError::CustomError("Transaction missing metadata information".to_string())
228 })?;
229 if meta.status.is_err() {
230 return Err(RpcError::CustomError(
231 "Transaction status indicates an error".to_string(),
232 ));
233 }
234
235 let inner_instructions = match &meta.inner_instructions {
236 OptionSerializer::Some(i) => i,
237 OptionSerializer::None => {
238 return Err(RpcError::CustomError(
239 "No inner instructions found".to_string(),
240 ));
241 }
242 OptionSerializer::Skip => {
243 return Err(RpcError::CustomError(
244 "No inner instructions found".to_string(),
245 ));
246 }
247 };
248
249 for ix in inner_instructions.iter() {
250 for ui_instruction in ix.instructions.iter() {
251 match ui_instruction {
252 UiInstruction::Compiled(ui_compiled_instruction) => {
253 let accounts = &ui_compiled_instruction.accounts;
254 let data = bs58::decode(&ui_compiled_instruction.data)
255 .into_vec()
256 .map_err(|_| {
257 RpcError::CustomError(
258 "Failed to decode instruction data".to_string(),
259 )
260 })?;
261 vec.push(data);
262 program_ids.push(light_compressed_account::Pubkey::new_from_array(
263 account_keys[ui_compiled_instruction.program_id_index as usize]
264 .to_bytes(),
265 ));
266 vec_accounts.push(
267 accounts
268 .iter()
269 .map(|x| {
270 light_compressed_account::Pubkey::new_from_array(
271 account_keys[(*x) as usize].to_bytes(),
272 )
273 })
274 .collect(),
275 );
276 }
277 UiInstruction::Parsed(_) => {
278 println!("Parsed instructions are not implemented yet");
279 }
280 }
281 }
282 }
283 }
284 let parsed_event =
285 event_from_light_transaction(program_ids.as_slice(), vec.as_slice(), vec_accounts)
286 .unwrap();
287 let event = parsed_event.map(|e| (e, signature, slot));
288 Ok(event)
289 }
290
291 async fn _create_and_send_transaction_with_event<T>(
292 &mut self,
293 instructions: &[Instruction],
294 payer: &Pubkey,
295 signers: &[&Keypair],
296 ) -> Result<Option<(T, Signature, u64)>, RpcError>
297 where
298 T: BorshDeserialize + Send + Debug,
299 {
300 let latest_blockhash = self.client.get_latest_blockhash()?;
301
302 let mut instructions_vec = vec![
303 solana_compute_budget_interface::ComputeBudgetInstruction::set_compute_unit_limit(
304 1_000_000,
305 ),
306 ];
307 instructions_vec.extend_from_slice(instructions);
308
309 let transaction = Transaction::new_signed_with_payer(
310 instructions_vec.as_slice(),
311 Some(payer),
312 signers,
313 latest_blockhash,
314 );
315
316 let (signature, slot) = self
317 .process_transaction_with_context(transaction.clone())
318 .await?;
319
320 let mut parsed_event = None;
321 for instruction in &transaction.message.instructions {
322 let ix_data = instruction.data.clone();
323 match T::deserialize(&mut &instruction.data[..]) {
324 Ok(e) => {
325 parsed_event = Some(e);
326 break;
327 }
328 Err(e) => {
329 warn!(
330 "Failed to parse event: {:?}, type: {:?}, ix data: {:?}",
331 e,
332 std::any::type_name::<T>(),
333 ix_data
334 );
335 }
336 }
337 }
338
339 if parsed_event.is_none() {
340 parsed_event = self.parse_inner_instructions::<T>(signature).ok();
341 }
342
343 let result = parsed_event.map(|e| (e, signature, slot));
344 Ok(result)
345 }
346}
347
348impl LightClient {
349 #[allow(clippy::result_large_err)]
350 fn parse_inner_instructions<T: BorshDeserialize>(
351 &self,
352 signature: Signature,
353 ) -> Result<T, RpcError> {
354 let rpc_transaction_config = RpcTransactionConfig {
355 encoding: Some(UiTransactionEncoding::Base64),
356 commitment: Some(self.client.commitment()),
357 ..Default::default()
358 };
359 let transaction = self
360 .client
361 .get_transaction_with_config(&signature, rpc_transaction_config)
362 .map_err(|e| RpcError::CustomError(e.to_string()))?;
363 let meta = transaction.transaction.meta.as_ref().ok_or_else(|| {
364 RpcError::CustomError("Transaction missing metadata information".to_string())
365 })?;
366 if meta.status.is_err() {
367 return Err(RpcError::CustomError(
368 "Transaction status indicates an error".to_string(),
369 ));
370 }
371
372 let inner_instructions = match &meta.inner_instructions {
373 OptionSerializer::Some(i) => i,
374 OptionSerializer::None => {
375 return Err(RpcError::CustomError(
376 "No inner instructions found".to_string(),
377 ));
378 }
379 OptionSerializer::Skip => {
380 return Err(RpcError::CustomError(
381 "No inner instructions found".to_string(),
382 ));
383 }
384 };
385
386 for ix in inner_instructions.iter() {
387 for ui_instruction in ix.instructions.iter() {
388 match ui_instruction {
389 UiInstruction::Compiled(ui_compiled_instruction) => {
390 let data = bs58::decode(&ui_compiled_instruction.data)
391 .into_vec()
392 .map_err(|_| {
393 RpcError::CustomError(
394 "Failed to decode instruction data".to_string(),
395 )
396 })?;
397
398 match T::try_from_slice(data.as_slice()) {
399 Ok(parsed_data) => return Ok(parsed_data),
400 Err(e) => {
401 warn!("Failed to parse inner instruction: {:?}", e);
402 }
403 }
404 }
405 UiInstruction::Parsed(_) => {
406 println!("Parsed instructions are not implemented yet");
407 }
408 }
409 }
410 }
411 Err(RpcError::CustomError(
412 "Failed to find any parseable inner instructions".to_string(),
413 ))
414 }
415}
416
417#[async_trait]
418impl Rpc for LightClient {
419 async fn new(config: LightClientConfig) -> Result<Self, RpcError>
420 where
421 Self: Sized,
422 {
423 Self::new_with_retry(config, None).await
424 }
425
426 fn get_payer(&self) -> &Keypair {
427 &self.payer
428 }
429
430 fn get_url(&self) -> String {
431 self.client.url()
432 }
433
434 async fn health(&self) -> Result<(), RpcError> {
435 self.retry(|| async { self.client.get_health().map_err(RpcError::from) })
436 .await
437 }
438
439 async fn get_program_accounts(
440 &self,
441 program_id: &Pubkey,
442 ) -> Result<Vec<(Pubkey, Account)>, RpcError> {
443 self.retry(|| async {
444 self.client
445 .get_program_accounts(program_id)
446 .map_err(RpcError::from)
447 })
448 .await
449 }
450
451 async fn process_transaction(
452 &mut self,
453 transaction: Transaction,
454 ) -> Result<Signature, RpcError> {
455 self.retry(|| async {
456 self.client
457 .send_and_confirm_transaction(&transaction)
458 .map_err(RpcError::from)
459 })
460 .await
461 }
462
463 async fn process_transaction_with_context(
464 &mut self,
465 transaction: Transaction,
466 ) -> Result<(Signature, Slot), RpcError> {
467 self.retry(|| async {
468 let signature = self.client.send_and_confirm_transaction(&transaction)?;
469 let sig_info = self.client.get_signature_statuses(&[signature])?;
470 let slot = sig_info
471 .value
472 .first()
473 .and_then(|s| s.as_ref())
474 .map(|s| s.slot)
475 .ok_or_else(|| RpcError::CustomError("Failed to get slot".into()))?;
476 Ok((signature, slot))
477 })
478 .await
479 }
480
481 async fn confirm_transaction(&self, signature: Signature) -> Result<bool, RpcError> {
482 self.retry(|| async {
483 self.client
484 .confirm_transaction(&signature)
485 .map_err(RpcError::from)
486 })
487 .await
488 }
489
490 async fn get_account(&self, address: Pubkey) -> Result<Option<Account>, RpcError> {
491 self.retry(|| async {
492 self.client
493 .get_account_with_commitment(&address, self.client.commitment())
494 .map(|response| response.value)
495 .map_err(RpcError::from)
496 })
497 .await
498 }
499
500 async fn get_minimum_balance_for_rent_exemption(
501 &self,
502 data_len: usize,
503 ) -> Result<u64, RpcError> {
504 self.retry(|| async {
505 self.client
506 .get_minimum_balance_for_rent_exemption(data_len)
507 .map_err(RpcError::from)
508 })
509 .await
510 }
511
512 async fn airdrop_lamports(
513 &mut self,
514 to: &Pubkey,
515 lamports: u64,
516 ) -> Result<Signature, RpcError> {
517 self.retry(|| async {
518 let signature = self
519 .client
520 .request_airdrop(to, lamports)
521 .map_err(RpcError::ClientError)?;
522 self.retry(|| async {
523 if self
524 .client
525 .confirm_transaction_with_commitment(&signature, self.client.commitment())?
526 .value
527 {
528 Ok(())
529 } else {
530 Err(RpcError::CustomError("Airdrop not confirmed".into()))
531 }
532 })
533 .await?;
534
535 Ok(signature)
536 })
537 .await
538 }
539
540 async fn get_balance(&self, pubkey: &Pubkey) -> Result<u64, RpcError> {
541 self.retry(|| async { self.client.get_balance(pubkey).map_err(RpcError::from) })
542 .await
543 }
544
545 async fn get_latest_blockhash(&mut self) -> Result<(Hash, u64), RpcError> {
546 self.retry(|| async {
547 self.client
548 .get_latest_blockhash_with_commitment(CommitmentConfig::confirmed())
551 .map_err(RpcError::from)
552 })
553 .await
554 }
555
556 async fn get_slot(&self) -> Result<u64, RpcError> {
557 self.retry(|| async { self.client.get_slot().map_err(RpcError::from) })
558 .await
559 }
560
561 async fn send_transaction(&self, transaction: &Transaction) -> Result<Signature, RpcError> {
562 self.retry(|| async {
563 self.client
564 .send_transaction_with_config(
565 transaction,
566 RpcSendTransactionConfig {
567 skip_preflight: true,
568 max_retries: Some(self.retry_config.max_retries as usize),
569 ..Default::default()
570 },
571 )
572 .map_err(RpcError::from)
573 })
574 .await
575 }
576
577 async fn send_transaction_with_config(
578 &self,
579 transaction: &Transaction,
580 config: RpcSendTransactionConfig,
581 ) -> Result<Signature, RpcError> {
582 self.retry(|| async {
583 self.client
584 .send_transaction_with_config(transaction, config)
585 .map_err(RpcError::from)
586 })
587 .await
588 }
589
590 async fn get_transaction_slot(&self, signature: &Signature) -> Result<u64, RpcError> {
591 self.retry(|| async {
592 Ok(self
593 .client
594 .get_transaction_with_config(
595 signature,
596 RpcTransactionConfig {
597 encoding: Some(UiTransactionEncoding::Base64),
598 commitment: Some(self.client.commitment()),
599 ..Default::default()
600 },
601 )
602 .map_err(RpcError::from)?
603 .slot)
604 })
605 .await
606 }
607
608 async fn get_signature_statuses(
609 &self,
610 signatures: &[Signature],
611 ) -> Result<Vec<Option<TransactionStatus>>, RpcError> {
612 self.client
613 .get_signature_statuses(signatures)
614 .map(|response| response.value)
615 .map_err(RpcError::from)
616 }
617
618 async fn create_and_send_transaction_with_event<T>(
619 &mut self,
620 instructions: &[Instruction],
621 payer: &Pubkey,
622 signers: &[&Keypair],
623 ) -> Result<Option<(T, Signature, u64)>, RpcError>
624 where
625 T: BorshDeserialize + Send + Debug,
626 {
627 self._create_and_send_transaction_with_event::<T>(instructions, payer, signers)
628 .await
629 }
630
631 async fn create_and_send_transaction_with_public_event(
632 &mut self,
633 instructions: &[Instruction],
634 payer: &Pubkey,
635 signers: &[&Keypair],
636 ) -> Result<Option<(PublicTransactionEvent, Signature, Slot)>, RpcError> {
637 let parsed_event = self
638 ._create_and_send_transaction_with_batched_event(instructions, payer, signers)
639 .await?;
640
641 let event = parsed_event.map(|(e, signature, slot)| (e[0].event.clone(), signature, slot));
642 Ok(event)
643 }
644
645 async fn create_and_send_transaction_with_batched_event(
646 &mut self,
647 instructions: &[Instruction],
648 payer: &Pubkey,
649 signers: &[&Keypair],
650 ) -> Result<Option<(Vec<BatchPublicTransactionEvent>, Signature, Slot)>, RpcError> {
651 self._create_and_send_transaction_with_batched_event(instructions, payer, signers)
652 .await
653 }
654
655 fn indexer(&self) -> Result<&impl Indexer, RpcError> {
656 self.indexer.as_ref().ok_or(RpcError::IndexerNotInitialized)
657 }
658
659 fn indexer_mut(&mut self) -> Result<&mut impl Indexer, RpcError> {
660 self.indexer.as_mut().ok_or(RpcError::IndexerNotInitialized)
661 }
662
663 async fn get_latest_active_state_trees(&mut self) -> Result<Vec<TreeInfo>, RpcError> {
665 let res = default_state_tree_lookup_tables().0;
666 let res = get_light_state_tree_infos(
667 self,
668 &res[0].state_tree_lookup_table,
669 &res[0].nullify_table,
670 )
671 .await?;
672 self.state_merkle_trees = res.clone();
673 Ok(res)
674 }
675
676 fn get_state_tree_infos(&self) -> Vec<TreeInfo> {
678 self.state_merkle_trees.to_vec()
679 }
680
681 fn get_random_state_tree_info(&self) -> Result<TreeInfo, RpcError> {
684 let mut rng = rand::thread_rng();
685 select_state_tree_info(&mut rng, &self.state_merkle_trees)
686 }
687
688 fn get_address_tree_v1(&self) -> TreeInfo {
689 TreeInfo {
690 tree: pubkey!("amt1Ayt45jfbdw5YSo7iz6WZxUmnZsQTYXy82hVwyC2"),
691 queue: pubkey!("aq1S9z4reTSQAdgWHGD2zDaS39sjGrAxbR31vxJ2F4F"),
692 cpi_context: None,
693 next_tree_info: None,
694 tree_type: TreeType::AddressV1,
695 }
696 }
697}
698
699impl MerkleTreeExt for LightClient {}
700
701pub fn select_state_tree_info<R: rand::Rng>(
724 rng: &mut R,
725 state_trees: &[TreeInfo],
726) -> Result<TreeInfo, RpcError> {
727 if state_trees.is_empty() {
728 return Err(RpcError::NoStateTreesAvailable);
729 }
730
731 Ok(state_trees[rng.gen_range(0..state_trees.len())])
732}