use crate::error::ReplayError;
use crate::types::{FetchedTx, FetchedTxMeta};
use async_trait::async_trait;
use serde_json::{json, Value};
use solana_sdk::{account::Account, pubkey::Pubkey, signature::Signature};
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, warn};
#[async_trait]
pub trait HeliusClient: Send + Sync {
async fn get_transaction(&self, sig: &Signature) -> Result<Option<FetchedTx>, ReplayError>;
async fn get_account_info_at_slot(
&self,
pubkey: &Pubkey,
slot: u64,
) -> Result<Option<Account>, ReplayError>;
async fn get_account_info(&self, pubkey: &Pubkey) -> Result<Option<Account>, ReplayError>;
}
#[async_trait]
impl HeliusClient for Arc<dyn HeliusClient> {
async fn get_transaction(&self, sig: &Signature) -> Result<Option<FetchedTx>, ReplayError> {
self.as_ref().get_transaction(sig).await
}
async fn get_account_info_at_slot(
&self,
pubkey: &Pubkey,
slot: u64,
) -> Result<Option<Account>, ReplayError> {
self.as_ref().get_account_info_at_slot(pubkey, slot).await
}
async fn get_account_info(&self, pubkey: &Pubkey) -> Result<Option<Account>, ReplayError> {
self.as_ref().get_account_info(pubkey).await
}
}
#[derive(Clone)]
pub struct HeliusRpcClient {
http: reqwest::Client,
rpc_url: String,
}
impl HeliusRpcClient {
pub fn from_api_key(api_key: &str) -> Result<Self, ReplayError> {
Self::from_url(format!(
"https://mainnet.helius-rpc.com/?api-key={}",
api_key
))
}
pub fn from_url(rpc_url: impl Into<String>) -> Result<Self, ReplayError> {
let http = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()?;
Ok(Self {
http,
rpc_url: rpc_url.into(),
})
}
async fn json_rpc(&self, method: &str, params: Value) -> Result<Value, ReplayError> {
let body = json!({
"jsonrpc": "2.0",
"id": 1,
"method": method,
"params": params,
});
for attempt in 0..3 {
let res = self.http.post(&self.rpc_url).json(&body).send().await?;
let status = res.status();
if status.as_u16() == 429 {
let wait = Duration::from_millis(500 * (1 << attempt));
warn!(?wait, "helius rate limited, backing off");
tokio::time::sleep(wait).await;
continue;
}
if !status.is_success() {
let text = res.text().await.unwrap_or_default();
return Err(ReplayError::Rpc(format!(
"http {}: {}",
status.as_u16(),
text
)));
}
let value: Value = res.json().await?;
if let Some(err) = value.get("error") {
return Err(ReplayError::Rpc(err.to_string()));
}
return Ok(value
.get("result")
.cloned()
.unwrap_or(Value::Null));
}
Err(ReplayError::Rpc("exhausted retries on 429".to_string()))
}
}
#[async_trait]
impl HeliusClient for HeliusRpcClient {
#[tracing::instrument(skip(self))]
async fn get_transaction(&self, sig: &Signature) -> Result<Option<FetchedTx>, ReplayError> {
let params = json!([
sig.to_string(),
{
"encoding": "base64",
"commitment": "confirmed",
"maxSupportedTransactionVersion": 0
}
]);
let raw = self.json_rpc("getTransaction", params).await?;
if raw.is_null() {
return Ok(None);
}
let slot = raw
.get("slot")
.and_then(|v| v.as_u64())
.ok_or_else(|| ReplayError::Rpc("getTransaction response missing slot".into()))?;
let block_time = raw.get("blockTime").and_then(|v| v.as_i64());
let transaction_base64 = raw
.get("transaction")
.and_then(|v| v.get(0))
.and_then(|v| v.as_str())
.ok_or_else(|| ReplayError::Rpc("getTransaction response missing transaction[0]".into()))?
.to_string();
let meta_value = raw
.get("meta")
.cloned()
.ok_or_else(|| ReplayError::Rpc("getTransaction response missing meta".into()))?;
let meta: FetchedTxMeta = serde_json::from_value(meta_value)?;
Ok(Some(FetchedTx {
slot,
block_time,
transaction_base64,
meta,
}))
}
#[tracing::instrument(skip(self))]
async fn get_account_info_at_slot(
&self,
pubkey: &Pubkey,
slot: u64,
) -> Result<Option<Account>, ReplayError> {
self.get_account_info_inner(pubkey, Some(slot)).await
}
#[tracing::instrument(skip(self))]
async fn get_account_info(&self, pubkey: &Pubkey) -> Result<Option<Account>, ReplayError> {
self.get_account_info_inner(pubkey, None).await
}
}
impl HeliusRpcClient {
async fn get_account_info_inner(
&self,
pubkey: &Pubkey,
min_context_slot: Option<u64>,
) -> Result<Option<Account>, ReplayError> {
let mut config = serde_json::Map::new();
config.insert("encoding".into(), Value::String("base64".into()));
config.insert("commitment".into(), Value::String("confirmed".into()));
if let Some(slot) = min_context_slot {
config.insert(
"minContextSlot".into(),
Value::Number(serde_json::Number::from(slot)),
);
}
let params = json!([pubkey.to_string(), Value::Object(config)]);
let raw = self.json_rpc("getAccountInfo", params).await?;
let v = match raw.get("value") {
Some(v) if !v.is_null() => v,
_ => {
debug!(?pubkey, "account not found");
return Ok(None);
}
};
let lamports = v
.get("lamports")
.and_then(|x| x.as_u64())
.ok_or_else(|| ReplayError::Rpc("account missing lamports".into()))?;
let owner_str = v
.get("owner")
.and_then(|x| x.as_str())
.ok_or_else(|| ReplayError::Rpc("account missing owner".into()))?;
let owner = owner_str
.parse::<Pubkey>()
.map_err(|e| ReplayError::Rpc(format!("invalid owner pubkey: {e}")))?;
let data_b64 = v
.get("data")
.and_then(|x| x.get(0))
.and_then(|x| x.as_str())
.unwrap_or("");
let data = if data_b64.is_empty() {
Vec::new()
} else {
use base64::Engine;
base64::engine::general_purpose::STANDARD
.decode(data_b64)
.map_err(|e| ReplayError::Rpc(format!("base64 decode failed: {e}")))?
};
let executable = v.get("executable").and_then(|x| x.as_bool()).unwrap_or(false);
let rent_epoch = v.get("rentEpoch").and_then(|x| x.as_u64()).unwrap_or(0);
Ok(Some(Account {
lamports,
data,
owner,
executable,
rent_epoch,
}))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_support::MockHeliusClient;
use crate::types::FetchedTxMeta;
#[tokio::test]
async fn mock_client_returns_canned_tx() {
let sig = Signature::new_unique();
let mut mock = MockHeliusClient::default();
mock.txs.insert(
sig.to_string(),
FetchedTx {
slot: 12345,
block_time: Some(1_700_000_000),
transaction_base64: String::new(),
meta: FetchedTxMeta {
err: None,
log_messages: vec!["Program log: hi".to_string()],
pre_balances: vec![1_000_000_000],
post_balances: vec![999_995_000],
loaded_addresses: None,
compute_units_consumed: Some(1234),
inner_instructions: None,
},
},
);
let result = mock.get_transaction(&sig).await.unwrap().unwrap();
assert_eq!(result.slot, 12345);
assert_eq!(result.meta.log_messages.len(), 1);
}
}