Skip to main content

replay_core/
rpc.rs

1//! Helius RPC client. A trait + concrete impl so tests can mock.
2//!
3//! The real implementation uses reqwest directly instead of `solana-client`
4//! because we need JSON-RPC extras (Helius Enhanced APIs) and finer control
5//! over `minContextSlot` / encoding options than the SDK exposes cleanly.
6
7use crate::error::ReplayError;
8use crate::types::{FetchedTx, FetchedTxMeta};
9use async_trait::async_trait;
10use serde_json::{json, Value};
11use solana_sdk::{account::Account, pubkey::Pubkey, signature::Signature};
12use std::sync::Arc;
13use std::time::Duration;
14use tracing::{debug, warn};
15
16#[async_trait]
17pub trait HeliusClient: Send + Sync {
18    /// Fetch a full transaction by signature. Uses `encoding: "base64"` so
19    /// we can faithfully deserialize the versioned message.
20    async fn get_transaction(&self, sig: &Signature) -> Result<Option<FetchedTx>, ReplayError>;
21
22    /// Fetch account info at a specific slot, with `commitment: "confirmed"`
23    /// and `minContextSlot: slot`. Returns `None` if the account does not exist.
24    async fn get_account_info_at_slot(
25        &self,
26        pubkey: &Pubkey,
27        slot: u64,
28    ) -> Result<Option<Account>, ReplayError>;
29
30    /// Fetch current account info (no slot constraint). Used for IDL accounts
31    /// and similar cases where current state is what we want.
32    async fn get_account_info(&self, pubkey: &Pubkey) -> Result<Option<Account>, ReplayError>;
33}
34
35/// Blanket impl so `Arc<dyn HeliusClient>` can be passed wherever `&C: HeliusClient`.
36#[async_trait]
37impl HeliusClient for Arc<dyn HeliusClient> {
38    async fn get_transaction(&self, sig: &Signature) -> Result<Option<FetchedTx>, ReplayError> {
39        self.as_ref().get_transaction(sig).await
40    }
41    async fn get_account_info_at_slot(
42        &self,
43        pubkey: &Pubkey,
44        slot: u64,
45    ) -> Result<Option<Account>, ReplayError> {
46        self.as_ref().get_account_info_at_slot(pubkey, slot).await
47    }
48    async fn get_account_info(&self, pubkey: &Pubkey) -> Result<Option<Account>, ReplayError> {
49        self.as_ref().get_account_info(pubkey).await
50    }
51}
52
53#[derive(Clone)]
54pub struct HeliusRpcClient {
55    http: reqwest::Client,
56    rpc_url: String,
57}
58
59impl HeliusRpcClient {
60    /// Construct from a Helius API key. Uses the mainnet endpoint.
61    pub fn from_api_key(api_key: &str) -> Result<Self, ReplayError> {
62        Self::from_url(format!(
63            "https://mainnet.helius-rpc.com/?api-key={}",
64            api_key
65        ))
66    }
67
68    /// Construct from an explicit RPC URL (any JSON-RPC-compatible Solana
69    /// endpoint will work for the basic methods; historical queries are
70    /// Helius-dependent).
71    pub fn from_url(rpc_url: impl Into<String>) -> Result<Self, ReplayError> {
72        let http = reqwest::Client::builder()
73            .timeout(Duration::from_secs(30))
74            .build()?;
75        Ok(Self {
76            http,
77            rpc_url: rpc_url.into(),
78        })
79    }
80
81    async fn json_rpc(&self, method: &str, params: Value) -> Result<Value, ReplayError> {
82        let body = json!({
83            "jsonrpc": "2.0",
84            "id": 1,
85            "method": method,
86            "params": params,
87        });
88
89        for attempt in 0..3 {
90            let res = self.http.post(&self.rpc_url).json(&body).send().await?;
91            let status = res.status();
92
93            if status.as_u16() == 429 {
94                let wait = Duration::from_millis(500 * (1 << attempt));
95                warn!(?wait, "helius rate limited, backing off");
96                tokio::time::sleep(wait).await;
97                continue;
98            }
99
100            if !status.is_success() {
101                let text = res.text().await.unwrap_or_default();
102                return Err(ReplayError::Rpc(format!(
103                    "http {}: {}",
104                    status.as_u16(),
105                    text
106                )));
107            }
108
109            let value: Value = res.json().await?;
110            if let Some(err) = value.get("error") {
111                return Err(ReplayError::Rpc(err.to_string()));
112            }
113            return Ok(value
114                .get("result")
115                .cloned()
116                .unwrap_or(Value::Null));
117        }
118
119        Err(ReplayError::Rpc("exhausted retries on 429".to_string()))
120    }
121}
122
123#[async_trait]
124impl HeliusClient for HeliusRpcClient {
125    #[tracing::instrument(skip(self))]
126    async fn get_transaction(&self, sig: &Signature) -> Result<Option<FetchedTx>, ReplayError> {
127        let params = json!([
128            sig.to_string(),
129            {
130                "encoding": "base64",
131                "commitment": "confirmed",
132                "maxSupportedTransactionVersion": 0
133            }
134        ]);
135
136        let raw = self.json_rpc("getTransaction", params).await?;
137        if raw.is_null() {
138            return Ok(None);
139        }
140
141        let slot = raw
142            .get("slot")
143            .and_then(|v| v.as_u64())
144            .ok_or_else(|| ReplayError::Rpc("getTransaction response missing slot".into()))?;
145
146        let block_time = raw.get("blockTime").and_then(|v| v.as_i64());
147
148        let transaction_base64 = raw
149            .get("transaction")
150            .and_then(|v| v.get(0))
151            .and_then(|v| v.as_str())
152            .ok_or_else(|| ReplayError::Rpc("getTransaction response missing transaction[0]".into()))?
153            .to_string();
154
155        let meta_value = raw
156            .get("meta")
157            .cloned()
158            .ok_or_else(|| ReplayError::Rpc("getTransaction response missing meta".into()))?;
159        let meta: FetchedTxMeta = serde_json::from_value(meta_value)?;
160
161        Ok(Some(FetchedTx {
162            slot,
163            block_time,
164            transaction_base64,
165            meta,
166        }))
167    }
168
169    #[tracing::instrument(skip(self))]
170    async fn get_account_info_at_slot(
171        &self,
172        pubkey: &Pubkey,
173        slot: u64,
174    ) -> Result<Option<Account>, ReplayError> {
175        // Honors the Day-1 contract: pass `minContextSlot: slot` to the RPC.
176        // CAVEAT: `minContextSlot` does NOT actually return historical state
177        // — it asserts the node has observed the given slot before answering.
178        // For TRUE historical state we rely on Helius enhanced APIs (Day 14)
179        // or on the fact that for recent txs (<~few hours) current state is
180        // close enough. See docs/04-solana-gotchas.md §12.
181        self.get_account_info_inner(pubkey, Some(slot)).await
182    }
183
184    #[tracing::instrument(skip(self))]
185    async fn get_account_info(&self, pubkey: &Pubkey) -> Result<Option<Account>, ReplayError> {
186        self.get_account_info_inner(pubkey, None).await
187    }
188}
189
190impl HeliusRpcClient {
191    async fn get_account_info_inner(
192        &self,
193        pubkey: &Pubkey,
194        min_context_slot: Option<u64>,
195    ) -> Result<Option<Account>, ReplayError> {
196        let mut config = serde_json::Map::new();
197        config.insert("encoding".into(), Value::String("base64".into()));
198        config.insert("commitment".into(), Value::String("confirmed".into()));
199        if let Some(slot) = min_context_slot {
200            config.insert(
201                "minContextSlot".into(),
202                Value::Number(serde_json::Number::from(slot)),
203            );
204        }
205
206        let params = json!([pubkey.to_string(), Value::Object(config)]);
207
208        let raw = self.json_rpc("getAccountInfo", params).await?;
209
210        let v = match raw.get("value") {
211            Some(v) if !v.is_null() => v,
212            _ => {
213                debug!(?pubkey, "account not found");
214                return Ok(None);
215            }
216        };
217
218        let lamports = v
219            .get("lamports")
220            .and_then(|x| x.as_u64())
221            .ok_or_else(|| ReplayError::Rpc("account missing lamports".into()))?;
222
223        let owner_str = v
224            .get("owner")
225            .and_then(|x| x.as_str())
226            .ok_or_else(|| ReplayError::Rpc("account missing owner".into()))?;
227        let owner = owner_str
228            .parse::<Pubkey>()
229            .map_err(|e| ReplayError::Rpc(format!("invalid owner pubkey: {e}")))?;
230
231        let data_b64 = v
232            .get("data")
233            .and_then(|x| x.get(0))
234            .and_then(|x| x.as_str())
235            .unwrap_or("");
236        let data = if data_b64.is_empty() {
237            Vec::new()
238        } else {
239            use base64::Engine;
240            base64::engine::general_purpose::STANDARD
241                .decode(data_b64)
242                .map_err(|e| ReplayError::Rpc(format!("base64 decode failed: {e}")))?
243        };
244
245        let executable = v.get("executable").and_then(|x| x.as_bool()).unwrap_or(false);
246        let rent_epoch = v.get("rentEpoch").and_then(|x| x.as_u64()).unwrap_or(0);
247
248        Ok(Some(Account {
249            lamports,
250            data,
251            owner,
252            executable,
253            rent_epoch,
254        }))
255    }
256}
257
258#[cfg(test)]
259mod tests {
260    use super::*;
261    use crate::test_support::MockHeliusClient;
262    use crate::types::FetchedTxMeta;
263
264    #[tokio::test]
265    async fn mock_client_returns_canned_tx() {
266        let sig = Signature::new_unique();
267        let mut mock = MockHeliusClient::default();
268        mock.txs.insert(
269            sig.to_string(),
270            FetchedTx {
271                slot: 12345,
272                block_time: Some(1_700_000_000),
273                transaction_base64: String::new(),
274                meta: FetchedTxMeta {
275                    err: None,
276                    log_messages: vec!["Program log: hi".to_string()],
277                    pre_balances: vec![1_000_000_000],
278                    post_balances: vec![999_995_000],
279                    loaded_addresses: None,
280                    compute_units_consumed: Some(1234),
281                    inner_instructions: None,
282                },
283            },
284        );
285
286        let result = mock.get_transaction(&sig).await.unwrap().unwrap();
287        assert_eq!(result.slot, 12345);
288        assert_eq!(result.meta.log_messages.len(), 1);
289    }
290}