1use crate::error::ReplayError;
8use crate::types::{FetchedTx, FetchedTxMeta};
9use async_trait::async_trait;
10use serde_json::{json, Value};
11use solana_sdk::{account::Account, pubkey::Pubkey, signature::Signature};
12use std::sync::Arc;
13use std::time::Duration;
14use tracing::{debug, warn};
15
16#[async_trait]
17pub trait HeliusClient: Send + Sync {
18 async fn get_transaction(&self, sig: &Signature) -> Result<Option<FetchedTx>, ReplayError>;
21
22 async fn get_account_info_at_slot(
25 &self,
26 pubkey: &Pubkey,
27 slot: u64,
28 ) -> Result<Option<Account>, ReplayError>;
29
30 async fn get_account_info(&self, pubkey: &Pubkey) -> Result<Option<Account>, ReplayError>;
33}
34
35#[async_trait]
37impl HeliusClient for Arc<dyn HeliusClient> {
38 async fn get_transaction(&self, sig: &Signature) -> Result<Option<FetchedTx>, ReplayError> {
39 self.as_ref().get_transaction(sig).await
40 }
41 async fn get_account_info_at_slot(
42 &self,
43 pubkey: &Pubkey,
44 slot: u64,
45 ) -> Result<Option<Account>, ReplayError> {
46 self.as_ref().get_account_info_at_slot(pubkey, slot).await
47 }
48 async fn get_account_info(&self, pubkey: &Pubkey) -> Result<Option<Account>, ReplayError> {
49 self.as_ref().get_account_info(pubkey).await
50 }
51}
52
53#[derive(Clone)]
54pub struct HeliusRpcClient {
55 http: reqwest::Client,
56 rpc_url: String,
57}
58
59impl HeliusRpcClient {
60 pub fn from_api_key(api_key: &str) -> Result<Self, ReplayError> {
62 Self::from_url(format!(
63 "https://mainnet.helius-rpc.com/?api-key={}",
64 api_key
65 ))
66 }
67
68 pub fn from_url(rpc_url: impl Into<String>) -> Result<Self, ReplayError> {
72 let http = reqwest::Client::builder()
73 .timeout(Duration::from_secs(30))
74 .build()?;
75 Ok(Self {
76 http,
77 rpc_url: rpc_url.into(),
78 })
79 }
80
81 async fn json_rpc(&self, method: &str, params: Value) -> Result<Value, ReplayError> {
82 let body = json!({
83 "jsonrpc": "2.0",
84 "id": 1,
85 "method": method,
86 "params": params,
87 });
88
89 for attempt in 0..3 {
90 let res = self.http.post(&self.rpc_url).json(&body).send().await?;
91 let status = res.status();
92
93 if status.as_u16() == 429 {
94 let wait = Duration::from_millis(500 * (1 << attempt));
95 warn!(?wait, "helius rate limited, backing off");
96 tokio::time::sleep(wait).await;
97 continue;
98 }
99
100 if !status.is_success() {
101 let text = res.text().await.unwrap_or_default();
102 return Err(ReplayError::Rpc(format!(
103 "http {}: {}",
104 status.as_u16(),
105 text
106 )));
107 }
108
109 let value: Value = res.json().await?;
110 if let Some(err) = value.get("error") {
111 return Err(ReplayError::Rpc(err.to_string()));
112 }
113 return Ok(value
114 .get("result")
115 .cloned()
116 .unwrap_or(Value::Null));
117 }
118
119 Err(ReplayError::Rpc("exhausted retries on 429".to_string()))
120 }
121}
122
123#[async_trait]
124impl HeliusClient for HeliusRpcClient {
125 #[tracing::instrument(skip(self))]
126 async fn get_transaction(&self, sig: &Signature) -> Result<Option<FetchedTx>, ReplayError> {
127 let params = json!([
128 sig.to_string(),
129 {
130 "encoding": "base64",
131 "commitment": "confirmed",
132 "maxSupportedTransactionVersion": 0
133 }
134 ]);
135
136 let raw = self.json_rpc("getTransaction", params).await?;
137 if raw.is_null() {
138 return Ok(None);
139 }
140
141 let slot = raw
142 .get("slot")
143 .and_then(|v| v.as_u64())
144 .ok_or_else(|| ReplayError::Rpc("getTransaction response missing slot".into()))?;
145
146 let block_time = raw.get("blockTime").and_then(|v| v.as_i64());
147
148 let transaction_base64 = raw
149 .get("transaction")
150 .and_then(|v| v.get(0))
151 .and_then(|v| v.as_str())
152 .ok_or_else(|| ReplayError::Rpc("getTransaction response missing transaction[0]".into()))?
153 .to_string();
154
155 let meta_value = raw
156 .get("meta")
157 .cloned()
158 .ok_or_else(|| ReplayError::Rpc("getTransaction response missing meta".into()))?;
159 let meta: FetchedTxMeta = serde_json::from_value(meta_value)?;
160
161 Ok(Some(FetchedTx {
162 slot,
163 block_time,
164 transaction_base64,
165 meta,
166 }))
167 }
168
169 #[tracing::instrument(skip(self))]
170 async fn get_account_info_at_slot(
171 &self,
172 pubkey: &Pubkey,
173 slot: u64,
174 ) -> Result<Option<Account>, ReplayError> {
175 self.get_account_info_inner(pubkey, Some(slot)).await
182 }
183
184 #[tracing::instrument(skip(self))]
185 async fn get_account_info(&self, pubkey: &Pubkey) -> Result<Option<Account>, ReplayError> {
186 self.get_account_info_inner(pubkey, None).await
187 }
188}
189
190impl HeliusRpcClient {
191 async fn get_account_info_inner(
192 &self,
193 pubkey: &Pubkey,
194 min_context_slot: Option<u64>,
195 ) -> Result<Option<Account>, ReplayError> {
196 let mut config = serde_json::Map::new();
197 config.insert("encoding".into(), Value::String("base64".into()));
198 config.insert("commitment".into(), Value::String("confirmed".into()));
199 if let Some(slot) = min_context_slot {
200 config.insert(
201 "minContextSlot".into(),
202 Value::Number(serde_json::Number::from(slot)),
203 );
204 }
205
206 let params = json!([pubkey.to_string(), Value::Object(config)]);
207
208 let raw = self.json_rpc("getAccountInfo", params).await?;
209
210 let v = match raw.get("value") {
211 Some(v) if !v.is_null() => v,
212 _ => {
213 debug!(?pubkey, "account not found");
214 return Ok(None);
215 }
216 };
217
218 let lamports = v
219 .get("lamports")
220 .and_then(|x| x.as_u64())
221 .ok_or_else(|| ReplayError::Rpc("account missing lamports".into()))?;
222
223 let owner_str = v
224 .get("owner")
225 .and_then(|x| x.as_str())
226 .ok_or_else(|| ReplayError::Rpc("account missing owner".into()))?;
227 let owner = owner_str
228 .parse::<Pubkey>()
229 .map_err(|e| ReplayError::Rpc(format!("invalid owner pubkey: {e}")))?;
230
231 let data_b64 = v
232 .get("data")
233 .and_then(|x| x.get(0))
234 .and_then(|x| x.as_str())
235 .unwrap_or("");
236 let data = if data_b64.is_empty() {
237 Vec::new()
238 } else {
239 use base64::Engine;
240 base64::engine::general_purpose::STANDARD
241 .decode(data_b64)
242 .map_err(|e| ReplayError::Rpc(format!("base64 decode failed: {e}")))?
243 };
244
245 let executable = v.get("executable").and_then(|x| x.as_bool()).unwrap_or(false);
246 let rent_epoch = v.get("rentEpoch").and_then(|x| x.as_u64()).unwrap_or(0);
247
248 Ok(Some(Account {
249 lamports,
250 data,
251 owner,
252 executable,
253 rent_epoch,
254 }))
255 }
256}
257
258#[cfg(test)]
259mod tests {
260 use super::*;
261 use crate::test_support::MockHeliusClient;
262 use crate::types::FetchedTxMeta;
263
264 #[tokio::test]
265 async fn mock_client_returns_canned_tx() {
266 let sig = Signature::new_unique();
267 let mut mock = MockHeliusClient::default();
268 mock.txs.insert(
269 sig.to_string(),
270 FetchedTx {
271 slot: 12345,
272 block_time: Some(1_700_000_000),
273 transaction_base64: String::new(),
274 meta: FetchedTxMeta {
275 err: None,
276 log_messages: vec!["Program log: hi".to_string()],
277 pre_balances: vec![1_000_000_000],
278 post_balances: vec![999_995_000],
279 loaded_addresses: None,
280 compute_units_consumed: Some(1234),
281 inner_instructions: None,
282 },
283 },
284 );
285
286 let result = mock.get_transaction(&sig).await.unwrap().unwrap();
287 assert_eq!(result.slot, 12345);
288 assert_eq!(result.meta.log_messages.len(), 1);
289 }
290}