1use crate::rpc::errors::RpcError;
2use crate::rpc::rpc_connection::RpcConnection;
3use crate::transaction_params::TransactionParams;
4use async_trait::async_trait;
5use borsh::BorshDeserialize;
6use log::warn;
7use solana_client::rpc_client::RpcClient;
8use solana_client::rpc_config::{RpcSendTransactionConfig, RpcTransactionConfig};
9use solana_program::clock::Slot;
10use solana_program::hash::Hash;
11use solana_program::pubkey::Pubkey;
12use solana_sdk::account::{Account, AccountSharedData};
13use solana_sdk::bs58;
14use solana_sdk::clock::UnixTimestamp;
15use solana_sdk::commitment_config::CommitmentConfig;
16use solana_sdk::epoch_info::EpochInfo;
17use solana_sdk::instruction::Instruction;
18use solana_sdk::signature::{Keypair, Signature};
19use solana_sdk::transaction::Transaction;
20use solana_transaction_status::option_serializer::OptionSerializer;
21use solana_transaction_status::{UiInstruction, UiTransactionEncoding};
22use std::fmt::{Debug, Display, Formatter};
23use std::time::Duration;
24use tokio::time::{sleep, Instant};
25
/// Well-known Solana RPC endpoints, plus an escape hatch for arbitrary URLs.
///
/// The [`Display`] implementation renders the endpoint URL, so `to_string()`
/// yields a value that can be handed to anything expecting a URL string.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SolanaRpcUrl {
    /// Rendered as `https://api.testnet.solana.com`.
    Testnet,
    /// Rendered as `https://api.devnet.solana.com`.
    Devnet,
    /// Rendered as `http://localhost:8899`.
    Localnet,
    /// Rendered as `https://zk-testnet.helius.dev:8899`.
    ZKTestnet,
    /// A caller-supplied URL, rendered verbatim.
    Custom(String),
}

impl Display for SolanaRpcUrl {
    /// Writes the endpoint URL of this variant to `f`.
    ///
    /// Width, fill and precision flags of the formatter are not applied; the
    /// URL is always written as-is.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Borrow the URL rather than allocating a fresh `String` per call.
        let url: &str = match self {
            SolanaRpcUrl::Testnet => "https://api.testnet.solana.com",
            SolanaRpcUrl::Devnet => "https://api.devnet.solana.com",
            SolanaRpcUrl::Localnet => "http://localhost:8899",
            SolanaRpcUrl::ZKTestnet => "https://zk-testnet.helius.dev:8899",
            SolanaRpcUrl::Custom(url) => url.as_str(),
        };
        f.write_str(url)
    }
}
46
/// Tunables for the retry loop that wraps RPC calls.
#[derive(Debug, Clone, Copy)]
pub struct RetryConfig {
    /// Number of failed attempts after which the last error is returned.
    pub max_retries: u32,
    /// Pause inserted between a failed attempt and the next one.
    pub retry_delay: Duration,
    /// Time budget measured from the first attempt; a failure observed after
    /// it has elapsed is returned instead of retried.
    pub timeout: Duration,
}

impl Default for RetryConfig {
    /// Ten attempts, 100 ms apart, within a 60 second budget.
    fn default() -> Self {
        Self {
            timeout: Duration::from_secs(60),
            retry_delay: Duration::from_millis(100),
            max_retries: 10,
        }
    }
}
63
64#[allow(dead_code)]
65pub struct SolanaRpcConnection {
66 pub client: RpcClient,
67 pub payer: Keypair,
68 retry_config: RetryConfig,
69}
70
71impl Debug for SolanaRpcConnection {
72 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
73 write!(
74 f,
75 "SolanaRpcConnection {{ client: {:?} }}",
76 self.client.url()
77 )
78 }
79}
80
81impl SolanaRpcConnection {
82 pub fn new_with_retry<U: ToString>(
83 url: U,
84 commitment_config: Option<CommitmentConfig>,
85 retry_config: Option<RetryConfig>,
86 ) -> Self {
87 let payer = Keypair::new();
88 let commitment_config = commitment_config.unwrap_or(CommitmentConfig::confirmed());
89 let client = RpcClient::new_with_commitment(url.to_string(), commitment_config);
90 let retry_config = retry_config.unwrap_or_default();
91 Self {
92 client,
93 payer,
94 retry_config,
95 }
96 }
97
98 async fn retry<F, Fut, T>(&self, operation: F) -> Result<T, RpcError>
99 where
100 F: Fn() -> Fut,
101 Fut: std::future::Future<Output = Result<T, RpcError>>,
102 {
103 let mut attempts = 0;
104 let start_time = Instant::now();
105 loop {
106 match operation().await {
107 Ok(result) => return Ok(result),
108 Err(e) => {
109 attempts += 1;
110 if attempts >= self.retry_config.max_retries
111 || start_time.elapsed() >= self.retry_config.timeout
112 {
113 return Err(e);
114 }
115 warn!(
116 "Operation failed, retrying in {:?} (attempt {}/{}): {:?}",
117 self.retry_config.retry_delay, attempts, self.retry_config.max_retries, e
118 );
119 sleep(self.retry_config.retry_delay).await;
120 }
121 }
122 }
123 }
124}
125
126impl SolanaRpcConnection {
127 fn parse_inner_instructions<T: BorshDeserialize>(
128 &self,
129 signature: Signature,
130 ) -> Result<T, RpcError> {
131 let rpc_transaction_config = RpcTransactionConfig {
132 encoding: Some(UiTransactionEncoding::Base64),
133 commitment: Some(self.client.commitment()),
134 ..Default::default()
135 };
136 let transaction = self
137 .client
138 .get_transaction_with_config(&signature, rpc_transaction_config)
139 .map_err(|e| RpcError::CustomError(e.to_string()))?;
140 let meta = transaction.transaction.meta.as_ref().ok_or_else(|| {
141 RpcError::CustomError("Transaction missing metadata information".to_string())
142 })?;
143 if meta.status.is_err() {
144 return Err(RpcError::CustomError(
145 "Transaction status indicates an error".to_string(),
146 ));
147 }
148
149 let inner_instructions = match &meta.inner_instructions {
150 OptionSerializer::Some(i) => i,
151 OptionSerializer::None => {
152 return Err(RpcError::CustomError(
153 "No inner instructions found".to_string(),
154 ));
155 }
156 OptionSerializer::Skip => {
157 return Err(RpcError::CustomError(
158 "No inner instructions found".to_string(),
159 ));
160 }
161 };
162
163 for ix in inner_instructions.iter() {
164 for ui_instruction in ix.instructions.iter() {
165 match ui_instruction {
166 UiInstruction::Compiled(ui_compiled_instruction) => {
167 let data = bs58::decode(&ui_compiled_instruction.data)
168 .into_vec()
169 .map_err(|_| {
170 RpcError::CustomError(
171 "Failed to decode instruction data".to_string(),
172 )
173 })?;
174
175 if let Ok(parsed_data) = T::try_from_slice(data.as_slice()) {
176 return Ok(parsed_data);
177 }
178 }
179 UiInstruction::Parsed(_) => {
180 println!("Parsed instructions are not implemented yet");
181 }
182 }
183 }
184 }
185 Err(RpcError::CustomError(
186 "Failed to find any parseable inner instructions".to_string(),
187 ))
188 }
189}
190
191#[async_trait]
192impl RpcConnection for SolanaRpcConnection {
193 fn new<U: ToString>(url: U, commitment_config: Option<CommitmentConfig>) -> Self
194 where
195 Self: Sized,
196 {
197 Self::new_with_retry(url, commitment_config, None)
198 }
199
200 fn get_payer(&self) -> &Keypair {
201 &self.payer
202 }
203
204 fn get_url(&self) -> String {
205 self.client.url()
206 }
207
208 async fn health(&self) -> Result<(), RpcError> {
209 self.retry(|| async { self.client.get_health().map_err(RpcError::from) })
210 .await
211 }
212
213 async fn get_block_time(&self, slot: u64) -> Result<UnixTimestamp, RpcError> {
214 self.retry(|| async { self.client.get_block_time(slot).map_err(RpcError::from) })
215 .await
216 }
217
218 async fn get_epoch_info(&self) -> Result<EpochInfo, RpcError> {
219 self.retry(|| async { self.client.get_epoch_info().map_err(RpcError::from) })
220 .await
221 }
222
223 async fn get_program_accounts(
224 &self,
225 program_id: &Pubkey,
226 ) -> Result<Vec<(Pubkey, Account)>, RpcError> {
227 self.retry(|| async {
228 self.client
229 .get_program_accounts(program_id)
230 .map_err(RpcError::from)
231 })
232 .await
233 }
234
235 async fn process_transaction(
236 &mut self,
237 transaction: Transaction,
238 ) -> Result<Signature, RpcError> {
239 self.retry(|| async {
240 self.client
241 .send_and_confirm_transaction(&transaction)
242 .map_err(RpcError::from)
243 })
244 .await
245 }
246
247 async fn process_transaction_with_context(
248 &mut self,
249 transaction: Transaction,
250 ) -> Result<(Signature, Slot), RpcError> {
251 self.retry(|| async {
252 let signature = self.client.send_and_confirm_transaction(&transaction)?;
253 let sig_info = self.client.get_signature_statuses(&[signature])?;
254 let slot = sig_info
255 .value
256 .first()
257 .and_then(|s| s.as_ref())
258 .map(|s| s.slot)
259 .ok_or_else(|| RpcError::CustomError("Failed to get slot".into()))?;
260 Ok((signature, slot))
261 })
262 .await
263 }
264
265 async fn create_and_send_transaction_with_event<T>(
266 &mut self,
267 instructions: &[Instruction],
268 payer: &Pubkey,
269 signers: &[&Keypair],
270 transaction_params: Option<TransactionParams>,
271 ) -> Result<Option<(T, Signature, u64)>, RpcError>
272 where
273 T: BorshDeserialize + Send + Debug,
274 {
275 let pre_balance = self.client.get_balance(payer)?;
276 let latest_blockhash = self.client.get_latest_blockhash()?;
277
278 let mut instructions_vec = vec![
279 solana_sdk::compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(1_000_000),
280 ];
281 instructions_vec.extend_from_slice(instructions);
282
283 let transaction = Transaction::new_signed_with_payer(
284 instructions_vec.as_slice(),
285 Some(payer),
286 signers,
287 latest_blockhash,
288 );
289
290 let (signature, slot) = self
291 .process_transaction_with_context(transaction.clone())
292 .await?;
293
294 let mut parsed_event = None;
295 for instruction in &transaction.message.instructions {
296 if let Ok(e) = T::deserialize(&mut &instruction.data[..]) {
297 parsed_event = Some(e);
298 break;
299 }
300 }
301
302 if parsed_event.is_none() {
303 parsed_event = self.parse_inner_instructions::<T>(signature).ok();
304 }
305
306 if let Some(transaction_params) = transaction_params {
307 let mut deduped_signers = signers.to_vec();
308 deduped_signers.dedup();
309 let post_balance = self.get_account(*payer).await?.unwrap().lamports;
310
311 let mut network_fee: i64 = 0;
313 if transaction_params.num_input_compressed_accounts != 0 {
314 network_fee += transaction_params.fee_config.network_fee as i64;
315 }
316 if transaction_params.num_new_addresses != 0 {
317 network_fee += transaction_params.fee_config.address_network_fee as i64;
318 }
319
320 let expected_post_balance = pre_balance as i64
321 - i64::from(transaction_params.num_new_addresses)
322 * transaction_params.fee_config.address_queue_rollover as i64
323 - i64::from(transaction_params.num_output_compressed_accounts)
324 * transaction_params.fee_config.state_merkle_tree_rollover as i64
325 - transaction_params.compress
326 - 5000 * deduped_signers.len() as i64
327 - network_fee;
328 if post_balance as i64 != expected_post_balance {
329 return Err(RpcError::AssertRpcError(format!("unexpected balance after transaction: expected {expected_post_balance}, got {post_balance}")));
330 }
331 }
332
333 let result = parsed_event.map(|e| (e, signature, slot));
334 Ok(result)
335 }
336
337 async fn confirm_transaction(&self, signature: Signature) -> Result<bool, RpcError> {
338 self.retry(|| async {
339 self.client
340 .confirm_transaction(&signature)
341 .map_err(RpcError::from)
342 })
343 .await
344 }
345
346 async fn get_account(&mut self, address: Pubkey) -> Result<Option<Account>, RpcError> {
347 self.retry(|| async {
348 self.client
349 .get_account_with_commitment(&address, self.client.commitment())
350 .map(|response| response.value)
351 .map_err(RpcError::from)
352 })
353 .await
354 }
355
356 fn set_account(&mut self, _address: &Pubkey, _account: &AccountSharedData) {
357 unimplemented!()
358 }
359
360 async fn get_minimum_balance_for_rent_exemption(
361 &mut self,
362 data_len: usize,
363 ) -> Result<u64, RpcError> {
364 self.retry(|| async {
365 self.client
366 .get_minimum_balance_for_rent_exemption(data_len)
367 .map_err(RpcError::from)
368 })
369 .await
370 }
371
372 async fn airdrop_lamports(
373 &mut self,
374 to: &Pubkey,
375 lamports: u64,
376 ) -> Result<Signature, RpcError> {
377 self.retry(|| async {
378 let signature = self
379 .client
380 .request_airdrop(to, lamports)
381 .map_err(RpcError::ClientError)?;
382 println!("Airdrop signature: {:?}", signature);
383 self.retry(|| async {
384 if self
385 .client
386 .confirm_transaction_with_commitment(&signature, self.client.commitment())?
387 .value
388 {
389 Ok(())
390 } else {
391 Err(RpcError::CustomError("Airdrop not confirmed".into()))
392 }
393 })
394 .await?;
395
396 Ok(signature)
397 })
398 .await
399 }
400
401 async fn get_balance(&mut self, pubkey: &Pubkey) -> Result<u64, RpcError> {
402 self.retry(|| async { self.client.get_balance(pubkey).map_err(RpcError::from) })
403 .await
404 }
405
406 async fn get_latest_blockhash(&mut self) -> Result<Hash, RpcError> {
407 self.retry(|| async { self.client.get_latest_blockhash().map_err(RpcError::from) })
408 .await
409 }
410
411 async fn get_slot(&mut self) -> Result<u64, RpcError> {
412 self.retry(|| async { self.client.get_slot().map_err(RpcError::from) })
413 .await
414 }
415
416 async fn warp_to_slot(&mut self, _slot: Slot) -> Result<(), RpcError> {
417 Err(RpcError::CustomError(
418 "Warp to slot is not supported in SolanaRpcConnection".to_string(),
419 ))
420 }
421
422 async fn send_transaction(&self, transaction: &Transaction) -> Result<Signature, RpcError> {
423 self.retry(|| async {
424 self.client
425 .send_transaction_with_config(
426 transaction,
427 RpcSendTransactionConfig {
428 skip_preflight: true,
429 max_retries: Some(self.retry_config.max_retries as usize),
430 ..Default::default()
431 },
432 )
433 .map_err(RpcError::from)
434 })
435 .await
436 }
437}