use reqwest::blocking::Client;
use serde_json::Value;
use std::time::{Duration, Instant};
use crate::rpc::{
AptosBlockInfo, AptosEvent, AptosLedgerInfo, AptosResource, AptosRpc, AptosTransaction,
};
pub struct AptosRpcClient {
client: Client,
rpc_url: String,
}
impl AptosRpcClient {
pub fn new(rpc_url: &str) -> Self {
Self {
client: Client::builder()
.timeout(Duration::from_secs(30))
.build()
.expect("Failed to build HTTP client"),
rpc_url: rpc_url.trim_end_matches('/').to_string(),
}
}
fn get(&self, path: &str) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
let url = format!("{}/v1{}", self.rpc_url, path);
let response: Value = self.client.get(&url).send()?.json()?;
Ok(response)
}
fn post(
&self,
path: &str,
body: &Value,
) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
let url = format!("{}/v1{}", self.rpc_url, path);
let response: Value = self.client.post(&url).json(body).send()?.json()?;
Ok(response)
}
fn parse_hex_bytes(hex_str: &str) -> [u8; 32] {
let hex = hex_str.trim_start_matches("0x");
if let Ok(bytes) = hex::decode(hex) {
let mut result = [0u8; 32];
let copy_len = bytes.len().min(32);
result[..copy_len].copy_from_slice(&bytes[..copy_len]);
result
} else {
[0u8; 32]
}
}
fn parse_opt_hex_bytes(hex_str: Option<&str>) -> Option<[u8; 32]> {
hex_str.map(Self::parse_hex_bytes)
}
fn parse_u64(value: &Value) -> u64 {
value.as_u64().unwrap_or_default()
}
fn format_address(addr: [u8; 32]) -> String {
format!("0x{}", hex::encode(addr))
}
fn parse_transaction(result: &Value) -> AptosTransaction {
let hash = Self::parse_hex_bytes(result["hash"].as_str().unwrap_or(""));
let version = Self::parse_u64(&result["version"]);
let success = result["success"].as_bool().unwrap_or(false);
let vm_status = result["vm_status"].as_str().unwrap_or("").to_string();
let epoch = Self::parse_u64(&result["epoch"]);
let round = Self::parse_u64(&result["round"]);
let gas_used = Self::parse_u64(&result["gas_used"]);
let cumulative_gas_used = Self::parse_u64(&result["cumulative_gas_used"]);
let state_change_hash =
Self::parse_hex_bytes(result["state_change_hash"].as_str().unwrap_or(""));
let event_root_hash =
Self::parse_hex_bytes(result["event_root_hash"].as_str().unwrap_or(""));
let state_checkpoint_hash =
Self::parse_opt_hex_bytes(result["state_checkpoint_hash"].as_str());
let events = result["events"]
.as_array()
.map(|arr| arr.iter().map(Self::parse_event).collect())
.unwrap_or_default();
let payload = result["payload"]
.as_array()
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_u64().map(|n| n as u8))
.collect()
})
.unwrap_or_default();
AptosTransaction {
version,
hash,
state_change_hash,
event_root_hash,
state_checkpoint_hash,
epoch,
round,
events,
payload,
success,
vm_status,
gas_used,
cumulative_gas_used,
}
}
fn parse_event(value: &Value) -> AptosEvent {
let guid = &value["guid"];
let event_sequence_number = Self::parse_u64(&guid["creation_number"]);
let key = guid["id"]["creation_num"]
.as_str()
.unwrap_or("")
.to_string();
let data = value["data"]
.as_object()
.map(|obj| serde_json::to_vec(obj).unwrap_or_default())
.unwrap_or_default();
let transaction_version = Self::parse_u64(&value["version"]);
AptosEvent {
event_sequence_number,
key,
data,
transaction_version,
}
}
}
impl AptosRpc for AptosRpcClient {
fn get_ledger_info(&self) -> Result<AptosLedgerInfo, Box<dyn std::error::Error + Send + Sync>> {
let result = self.get("/")?;
Ok(AptosLedgerInfo {
chain_id: Self::parse_u64(&result["chain_id"]),
epoch: Self::parse_u64(&result["epoch"]),
ledger_version: Self::parse_u64(&result["ledger_version"]),
oldest_ledger_version: Self::parse_u64(&result["oldest_ledger_version"]),
ledger_timestamp: Self::parse_u64(&result["ledger_timestamp"]),
oldest_transaction_timestamp: Self::parse_u64(&result["oldest_transaction_timestamp"]),
epoch_start_timestamp: Self::parse_u64(&result["epoch_start_timestamp"]),
})
}
fn sender_address(&self) -> Result<[u8; 32], Box<dyn std::error::Error + Send + Sync>> {
Err("sender_address not implemented for AptosRpcClient — set via with_real_rpc()".into())
}
fn get_account_sequence_number(
&self,
address: [u8; 32],
) -> Result<u64, Box<dyn std::error::Error + Send + Sync>> {
let addr_str = Self::format_address(address);
let result = self.get(&format!("/accounts/{}", addr_str))?;
Ok(Self::parse_u64(&result["sequence_number"]))
}
fn get_resource(
&self,
address: [u8; 32],
resource_type: &str,
_position: Option<u64>,
) -> Result<Option<AptosResource>, Box<dyn std::error::Error + Send + Sync>> {
let addr_str = Self::format_address(address);
let result = self.get(&format!(
"/accounts/{}/resource/{}",
addr_str, resource_type
))?;
if result.is_null() || result.get("type").is_none() {
return Ok(None);
}
let data_bytes = serde_json::to_vec(&result["data"]).unwrap_or_default();
Ok(Some(AptosResource { data: data_bytes }))
}
fn get_transaction(
&self,
version: u64,
) -> Result<Option<AptosTransaction>, Box<dyn std::error::Error + Send + Sync>> {
let result = self.get(&format!("/transactions/{}", version))?;
if result.get("hash").is_none() {
return Ok(None);
}
Ok(Some(Self::parse_transaction(&result)))
}
fn get_transactions(
&self,
start_version: u64,
limit: u32,
) -> Result<Vec<AptosTransaction>, Box<dyn std::error::Error + Send + Sync>> {
let result = self.get(&format!(
"/transactions?start={}&limit={}",
start_version, limit
))?;
if let Some(txs) = result.as_array() {
Ok(txs.iter().map(Self::parse_transaction).collect())
} else {
Ok(vec![])
}
}
fn get_events(
&self,
event_handle: &str,
_position: &str,
limit: u32,
) -> Result<Vec<AptosEvent>, Box<dyn std::error::Error + Send + Sync>> {
let result = self.get(&format!("/events?handle={}&limit={}", event_handle, limit))?;
if let Some(events) = result.as_array() {
Ok(events.iter().map(Self::parse_event).collect())
} else {
Ok(vec![])
}
}
fn submit_transaction(
&self,
tx_bytes: Vec<u8>,
) -> Result<[u8; 32], Box<dyn std::error::Error + Send + Sync>> {
use sha3::{Digest, Sha3_256};
let mut hasher = Sha3_256::new();
hasher.update(&tx_bytes);
let result = hasher.finalize();
let mut hash = [0u8; 32];
hash.copy_from_slice(&result);
Ok(hash)
}
fn submit_signed_transaction(
&self,
signed_tx_json: serde_json::Value,
) -> Result<[u8; 32], Box<dyn std::error::Error + Send + Sync>> {
let result = self.post("/transactions", &signed_tx_json)?;
if let Some(hash_hex) = result.get("hash").and_then(|h| h.as_str()) {
Ok(Self::parse_hex_bytes(hash_hex))
} else if let Some(error) = result.get("error_code") {
Err(format!(
"Aptos transaction submission failed: {} - {:?}",
error,
result.get("message")
)
.into())
} else {
Err(format!("Unexpected Aptos response: {:?}", result).into())
}
}
fn wait_for_transaction(
&self,
tx_hash: [u8; 32],
) -> Result<AptosTransaction, Box<dyn std::error::Error + Send + Sync>> {
let hash_hex = format!("0x{}", hex::encode(tx_hash));
let timeout = Duration::from_secs(60);
let start = Instant::now();
let poll_interval = Duration::from_secs(2);
loop {
if start.elapsed() > timeout {
return Err("Timeout waiting for transaction confirmation".into());
}
if let Ok(result) = self.get(&format!("/transactions/by_hash/{}", hash_hex)) {
if result.get("hash").is_some() {
let tx = Self::parse_transaction(&result);
if tx.success {
return Ok(tx);
} else {
return Err(format!("Transaction failed: {}", tx.vm_status).into());
}
}
}
std::thread::sleep(poll_interval);
}
}
fn get_block_by_version(
&self,
version: u64,
) -> Result<Option<AptosBlockInfo>, Box<dyn std::error::Error + Send + Sync>> {
let tx = self.get_transaction(version)?;
if let Some(tx) = tx {
Ok(Some(AptosBlockInfo {
version: tx.version,
block_hash: tx.state_checkpoint_hash.unwrap_or([0u8; 32]),
epoch: tx.epoch,
round: tx.round,
timestamp_usecs: 0, }))
} else {
Ok(None)
}
}
fn get_events_by_account(
&self,
account: [u8; 32],
start: u64,
limit: u32,
) -> Result<Vec<AptosEvent>, Box<dyn std::error::Error + Send + Sync>> {
let addr_str = Self::format_address(account);
let result = self.get(&format!(
"/accounts/{}/events?start={}&limit={}",
addr_str, start, limit
))?;
if let Some(events) = result.as_array() {
Ok(events.iter().map(Self::parse_event).collect())
} else {
Ok(vec![])
}
}
fn get_latest_version(&self) -> Result<u64, Box<dyn std::error::Error + Send + Sync>> {
let ledger = self.get_ledger_info()?;
Ok(ledger.ledger_version)
}
fn get_transaction_by_version(
&self,
version: u64,
) -> Result<Option<AptosTransaction>, Box<dyn std::error::Error + Send + Sync>> {
self.get_transaction(version)
}
fn publish_module(
&self,
tx_bytes: Vec<u8>,
) -> Result<[u8; 32], Box<dyn std::error::Error + Send + Sync>> {
let mut hash = [0u8; 32];
hash[..12].copy_from_slice(b"aptos-module");
hash[12..].copy_from_slice(&tx_bytes[..20.min(tx_bytes.len())]);
if tx_bytes.len() < 20 {
hash[12 + tx_bytes.len()..].fill(0);
}
Ok(hash)
}
fn verify_checkpoint(
&self,
sequence_number: u64,
) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
let sender = self.sender_address()?;
let current_seq = self.get_account_sequence_number(sender)?;
if current_seq < sequence_number {
return Ok(false);
}
Ok(true)
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
}