use std::{collections::HashMap, num::NonZeroU32, str::FromStr};
use alloy::primitives::{Address, U256};
use bytes::Bytes;
use nautilus_core::hex;
use nautilus_model::defi::rpc::{RpcLog, RpcNodeHttpResponse};
use nautilus_network::{
http::{HttpClient, Method},
ratelimiter::quota::Quota,
};
use serde::de::DeserializeOwned;
use crate::rpc::error::BlockchainRpcClientError;
/// Client that issues JSON-RPC requests to a blockchain node over HTTP.
///
/// Every request is sent as an HTTP POST with a JSON body to a single,
/// fixed endpoint URL chosen at construction time.
#[derive(Debug)]
pub struct BlockchainHttpRpcClient {
/// Endpoint URL that every RPC request is POSTed to.
http_rpc_url: String,
/// Underlying HTTP client used to send the requests.
http_client: HttpClient,
}
impl BlockchainHttpRpcClient {
/// Creates a new [`BlockchainHttpRpcClient`] targeting `http_rpc_url`.
///
/// `rpc_request_per_second` is converted into a per-second [`Quota`] that is
/// handed to the HTTP client as its default quota. No default quota is
/// configured when the value is `None`, when it is `Some(0)` (zero cannot be
/// represented as a `NonZeroU32`), or when `Quota::per_second` itself yields
/// `None`.
///
/// # Panics
///
/// Panics if the underlying HTTP client cannot be constructed.
#[must_use]
pub fn new(http_rpc_url: String, rpc_request_per_second: Option<u32>) -> Self {
    // A zero rate is filtered out here, before a quota is ever built.
    let default_quota = match rpc_request_per_second.and_then(NonZeroU32::new) {
        Some(rps) => Quota::per_second(rps),
        None => None,
    };

    let http_client = HttpClient::new(
        HashMap::new(),
        vec![],
        Vec::new(),
        default_quota,
        None,
        None,
    )
    .expect("Failed to create HTTP client");

    Self {
        http_rpc_url,
        http_client,
    }
}
/// Serializes `rpc_request` to JSON and POSTs it to the configured RPC URL.
///
/// The request carries a `Content-Type: application/json` header. On success
/// the raw response body is returned as-is; this method does not parse it.
///
/// # Errors
///
/// Returns [`BlockchainRpcClientError::ClientError`] if the request value
/// cannot be serialized, or if the HTTP client reports a failure.
async fn send_rpc_request(
    &self,
    rpc_request: serde_json::Value,
) -> Result<Bytes, BlockchainRpcClientError> {
    let payload = serde_json::to_vec(&rpc_request).map_err(|e| {
        BlockchainRpcClientError::ClientError(format!("Failed to serialize request: {e}"))
    })?;

    let headers = HashMap::from([(
        "Content-Type".to_string(),
        "application/json".to_string(),
    )]);

    let response = self
        .http_client
        .request(
            Method::POST,
            self.http_rpc_url.clone(),
            None,
            Some(headers),
            Some(payload),
            None,
            None,
        )
        .await
        .map_err(|e| BlockchainRpcClientError::ClientError(e.to_string()))?;

    Ok(response.body)
}
pub async fn execute_rpc_call<T: DeserializeOwned>(
&self,
rpc_request: serde_json::Value,
) -> anyhow::Result<T> {
match self.send_rpc_request(rpc_request).await {
Ok(bytes) => match serde_json::from_slice::<RpcNodeHttpResponse<T>>(bytes.as_ref()) {
Ok(parsed) => {
if parsed.jsonrpc.is_none()
&& let (Some(code), Some(message)) = (parsed.code, parsed.message)
{
anyhow::bail!("RPC provider error {code}: {message}");
}
if let Some(error) = parsed.error {
Err(anyhow::anyhow!(
"RPC error {}: {}",
error.code,
error.message
))
} else if let Some(result) = parsed.result {
Ok(result)
} else {
Err(anyhow::anyhow!(
"Response missing both result and error fields"
))
}
}
Err(e) => {
let raw_response = String::from_utf8_lossy(bytes.as_ref());
let preview = if raw_response.len() > 500 {
format!(
"{}... (truncated, {} bytes total)",
&raw_response[..500],
raw_response.len()
)
} else {
raw_response.to_string()
};
Err(anyhow::anyhow!(
"Failed to parse eth call response: {e}\nRaw response: {preview}"
))
}
},
Err(e) => Err(anyhow::anyhow!(
"Failed to execute eth call RPC request: {e}"
)),
}
}
/// Builds the JSON-RPC 2.0 request payload for an `eth_call`.
///
/// `call_data` is hex-encoded with a prefix and sent as the `data` field of
/// the call object, alongside `to`. The block parameter is the lowercase
/// `0x`-prefixed hex form of `block` when one is given, and `"latest"`
/// otherwise. The request `id` is always `1`.
#[must_use]
pub fn construct_eth_call(
    &self,
    to: &str,
    call_data: &[u8],
    block: Option<u64>,
) -> serde_json::Value {
    let block_tag =
        block.map_or_else(|| "latest".to_string(), |number| format!("0x{number:x}"));

    serde_json::json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "eth_call",
        "params": [
            {
                "to": to,
                "data": hex::encode_prefixed(call_data)
            },
            block_tag
        ]
    })
}
/// Queries `eth_getBalance` for `address` and returns the balance as a [`U256`].
///
/// The balance is requested at `block` (encoded as lowercase `0x`-prefixed
/// hex) when one is given, and at `"latest"` otherwise.
///
/// # Errors
///
/// Returns an error if the RPC call fails, or if the string returned by the
/// node cannot be parsed into a `U256`.
pub async fn get_balance(&self, address: &Address, block: Option<u64>) -> anyhow::Result<U256> {
    let block_tag = match block {
        Some(number) => format!("0x{number:x}"),
        None => "latest".to_string(),
    };

    let request = serde_json::json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "eth_getBalance",
        "params": [address, block_tag]
    });

    // The node answers with the balance as a string; decode it afterwards.
    let hex_string: String = self.execute_rpc_call(request).await?;
    let parsed = U256::from_str(&hex_string);
    parsed.map_err(|e| anyhow::anyhow!("Failed to parse balance hex string '{hex_string}': {e}"))
}
/// Queries `eth_getLogs` for the block range `from_block..=to_block`.
///
/// The filter always contains `fromBlock` and `toBlock`, both encoded as
/// lowercase `0x`-prefixed hex. `address` (formatted with its `Debug`
/// representation) and `topics` (serialized as given) are added to the
/// filter only when they are provided.
///
/// # Errors
///
/// Returns an error if the RPC call fails or the response cannot be
/// deserialized into a list of [`RpcLog`].
pub async fn get_logs(
    &self,
    address: Option<&Address>,
    topics: Option<Vec<Option<String>>>,
    from_block: u64,
    to_block: u64,
) -> anyhow::Result<Vec<RpcLog>> {
    let mut filter = serde_json::Map::new();
    filter.insert("fromBlock".into(), format!("0x{from_block:x}").into());
    filter.insert("toBlock".into(), format!("0x{to_block:x}").into());

    // Optional constraints are left out of the filter entirely when absent.
    if let Some(addr) = address {
        filter.insert("address".into(), format!("{addr:?}").into());
    }
    if let Some(topic_filters) = topics {
        filter.insert("topics".into(), serde_json::json!(topic_filters));
    }

    let request = serde_json::json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "eth_getLogs",
        "params": [serde_json::Value::Object(filter)]
    });

    self.execute_rpc_call(request).await
}
}