1use reqwest::blocking::Client;
7use serde_json::Value;
8use std::time::{Duration, Instant};
9
10use crate::rpc::{
11 AptosBlockInfo, AptosEvent, AptosLedgerInfo, AptosResource, AptosRpc, AptosTransaction,
12};
13
14pub struct AptosRpcClient {
16 client: Client,
17 rpc_url: String,
18}
19
20impl AptosRpcClient {
21 pub fn new(rpc_url: &str) -> Self {
23 Self {
24 client: Client::builder()
25 .timeout(Duration::from_secs(30))
26 .build()
27 .expect("Failed to build HTTP client"),
28 rpc_url: rpc_url.trim_end_matches('/').to_string(),
29 }
30 }
31
32 fn get(&self, path: &str) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
34 let url = format!("{}/v1{}", self.rpc_url, path);
35 let response: Value = self.client.get(&url).send()?.json()?;
36 Ok(response)
37 }
38
39 fn post(
41 &self,
42 path: &str,
43 body: &Value,
44 ) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
45 let url = format!("{}/v1{}", self.rpc_url, path);
46 let response: Value = self.client.post(&url).json(body).send()?.json()?;
47 Ok(response)
48 }
49
50 fn parse_hex_bytes(hex_str: &str) -> [u8; 32] {
52 let hex = hex_str.trim_start_matches("0x");
53 if let Ok(bytes) = hex::decode(hex) {
54 let mut result = [0u8; 32];
55 let copy_len = bytes.len().min(32);
56 result[..copy_len].copy_from_slice(&bytes[..copy_len]);
57 result
58 } else {
59 [0u8; 32]
60 }
61 }
62
63 fn parse_opt_hex_bytes(hex_str: Option<&str>) -> Option<[u8; 32]> {
65 hex_str.map(Self::parse_hex_bytes)
66 }
67
68 fn parse_u64(value: &Value) -> u64 {
70 value.as_u64().unwrap_or_default()
71 }
72
73 fn format_address(addr: [u8; 32]) -> String {
75 format!("0x{}", hex::encode(addr))
76 }
77
78 fn parse_transaction(result: &Value) -> AptosTransaction {
80 let hash = Self::parse_hex_bytes(result["hash"].as_str().unwrap_or(""));
81 let version = Self::parse_u64(&result["version"]);
82 let success = result["success"].as_bool().unwrap_or(false);
83 let vm_status = result["vm_status"].as_str().unwrap_or("").to_string();
84 let epoch = Self::parse_u64(&result["epoch"]);
85 let round = Self::parse_u64(&result["round"]);
86 let gas_used = Self::parse_u64(&result["gas_used"]);
87 let cumulative_gas_used = Self::parse_u64(&result["cumulative_gas_used"]);
88
89 let state_change_hash =
91 Self::parse_hex_bytes(result["state_change_hash"].as_str().unwrap_or(""));
92 let event_root_hash =
93 Self::parse_hex_bytes(result["event_root_hash"].as_str().unwrap_or(""));
94 let state_checkpoint_hash =
95 Self::parse_opt_hex_bytes(result["state_checkpoint_hash"].as_str());
96
97 let events = result["events"]
99 .as_array()
100 .map(|arr| arr.iter().map(Self::parse_event).collect())
101 .unwrap_or_default();
102
103 let payload = result["payload"]
105 .as_array()
106 .map(|arr| {
107 arr.iter()
108 .filter_map(|v| v.as_u64().map(|n| n as u8))
109 .collect()
110 })
111 .unwrap_or_default();
112
113 AptosTransaction {
114 version,
115 hash,
116 state_change_hash,
117 event_root_hash,
118 state_checkpoint_hash,
119 epoch,
120 round,
121 events,
122 payload,
123 success,
124 vm_status,
125 gas_used,
126 cumulative_gas_used,
127 }
128 }
129
130 fn parse_event(value: &Value) -> AptosEvent {
132 let guid = &value["guid"];
133 let event_sequence_number = Self::parse_u64(&guid["creation_number"]);
134 let key = guid["id"]["creation_num"]
135 .as_str()
136 .unwrap_or("")
137 .to_string();
138 let data = value["data"]
139 .as_object()
140 .map(|obj| serde_json::to_vec(obj).unwrap_or_default())
141 .unwrap_or_default();
142 let transaction_version = Self::parse_u64(&value["version"]);
143
144 AptosEvent {
145 event_sequence_number,
146 key,
147 data,
148 transaction_version,
149 }
150 }
151}
152
153impl AptosRpc for AptosRpcClient {
154 fn get_ledger_info(&self) -> Result<AptosLedgerInfo, Box<dyn std::error::Error + Send + Sync>> {
155 let result = self.get("/")?;
156
157 Ok(AptosLedgerInfo {
158 chain_id: Self::parse_u64(&result["chain_id"]),
159 epoch: Self::parse_u64(&result["epoch"]),
160 ledger_version: Self::parse_u64(&result["ledger_version"]),
161 oldest_ledger_version: Self::parse_u64(&result["oldest_ledger_version"]),
162 ledger_timestamp: Self::parse_u64(&result["ledger_timestamp"]),
163 oldest_transaction_timestamp: Self::parse_u64(&result["oldest_transaction_timestamp"]),
164 epoch_start_timestamp: Self::parse_u64(&result["epoch_start_timestamp"]),
165 })
166 }
167
168 fn sender_address(&self) -> Result<[u8; 32], Box<dyn std::error::Error + Send + Sync>> {
169 Err("sender_address not implemented for AptosRpcClient — set via with_real_rpc()".into())
171 }
172
173 fn get_account_sequence_number(
174 &self,
175 address: [u8; 32],
176 ) -> Result<u64, Box<dyn std::error::Error + Send + Sync>> {
177 let addr_str = Self::format_address(address);
178 let result = self.get(&format!("/accounts/{}", addr_str))?;
179 Ok(Self::parse_u64(&result["sequence_number"]))
180 }
181
182 fn get_resource(
183 &self,
184 address: [u8; 32],
185 resource_type: &str,
186 _position: Option<u64>,
187 ) -> Result<Option<AptosResource>, Box<dyn std::error::Error + Send + Sync>> {
188 let addr_str = Self::format_address(address);
189 let result = self.get(&format!(
190 "/accounts/{}/resource/{}",
191 addr_str, resource_type
192 ))?;
193
194 if result.is_null() || result.get("type").is_none() {
195 return Ok(None);
196 }
197
198 let data_bytes = serde_json::to_vec(&result["data"]).unwrap_or_default();
199
200 Ok(Some(AptosResource { data: data_bytes }))
201 }
202
203 fn get_transaction(
204 &self,
205 version: u64,
206 ) -> Result<Option<AptosTransaction>, Box<dyn std::error::Error + Send + Sync>> {
207 let result = self.get(&format!("/transactions/{}", version))?;
208
209 if result.get("hash").is_none() {
210 return Ok(None);
211 }
212
213 Ok(Some(Self::parse_transaction(&result)))
214 }
215
216 fn get_transactions(
217 &self,
218 start_version: u64,
219 limit: u32,
220 ) -> Result<Vec<AptosTransaction>, Box<dyn std::error::Error + Send + Sync>> {
221 let result = self.get(&format!(
222 "/transactions?start={}&limit={}",
223 start_version, limit
224 ))?;
225
226 if let Some(txs) = result.as_array() {
227 Ok(txs.iter().map(Self::parse_transaction).collect())
228 } else {
229 Ok(vec![])
230 }
231 }
232
233 fn get_events(
234 &self,
235 event_handle: &str,
236 _position: &str,
237 limit: u32,
238 ) -> Result<Vec<AptosEvent>, Box<dyn std::error::Error + Send + Sync>> {
239 let result = self.get(&format!("/events?handle={}&limit={}", event_handle, limit))?;
241
242 if let Some(events) = result.as_array() {
243 Ok(events.iter().map(Self::parse_event).collect())
244 } else {
245 Ok(vec![])
246 }
247 }
248
249 fn submit_transaction(
250 &self,
251 tx_bytes: Vec<u8>,
252 ) -> Result<[u8; 32], Box<dyn std::error::Error + Send + Sync>> {
253 use sha3::{Digest, Sha3_256};
257
258 let mut hasher = Sha3_256::new();
261 hasher.update(&tx_bytes);
262 let result = hasher.finalize();
263 let mut hash = [0u8; 32];
264 hash.copy_from_slice(&result);
265 Ok(hash)
266 }
267
268 fn submit_signed_transaction(
269 &self,
270 signed_tx_json: serde_json::Value,
271 ) -> Result<[u8; 32], Box<dyn std::error::Error + Send + Sync>> {
272 let result = self.post("/transactions", &signed_tx_json)?;
274
275 if let Some(hash_hex) = result.get("hash").and_then(|h| h.as_str()) {
277 Ok(Self::parse_hex_bytes(hash_hex))
278 } else if let Some(error) = result.get("error_code") {
279 Err(format!(
280 "Aptos transaction submission failed: {} - {:?}",
281 error,
282 result.get("message")
283 )
284 .into())
285 } else {
286 Err(format!("Unexpected Aptos response: {:?}", result).into())
287 }
288 }
289
290 fn wait_for_transaction(
291 &self,
292 tx_hash: [u8; 32],
293 ) -> Result<AptosTransaction, Box<dyn std::error::Error + Send + Sync>> {
294 let hash_hex = format!("0x{}", hex::encode(tx_hash));
295 let timeout = Duration::from_secs(60);
296 let start = Instant::now();
297 let poll_interval = Duration::from_secs(2);
298
299 loop {
300 if start.elapsed() > timeout {
301 return Err("Timeout waiting for transaction confirmation".into());
302 }
303
304 if let Ok(result) = self.get(&format!("/transactions/by_hash/{}", hash_hex)) {
306 if result.get("hash").is_some() {
307 let tx = Self::parse_transaction(&result);
308
309 if tx.success {
310 return Ok(tx);
311 } else {
312 return Err(format!("Transaction failed: {}", tx.vm_status).into());
313 }
314 }
315 }
316
317 std::thread::sleep(poll_interval);
318 }
319 }
320
321 fn get_block_by_version(
322 &self,
323 version: u64,
324 ) -> Result<Option<AptosBlockInfo>, Box<dyn std::error::Error + Send + Sync>> {
325 let tx = self.get_transaction(version)?;
327 if let Some(tx) = tx {
328 Ok(Some(AptosBlockInfo {
329 version: tx.version,
330 block_hash: tx.state_checkpoint_hash.unwrap_or([0u8; 32]),
331 epoch: tx.epoch,
332 round: tx.round,
333 timestamp_usecs: 0, }))
335 } else {
336 Ok(None)
337 }
338 }
339
340 fn get_events_by_account(
341 &self,
342 account: [u8; 32],
343 start: u64,
344 limit: u32,
345 ) -> Result<Vec<AptosEvent>, Box<dyn std::error::Error + Send + Sync>> {
346 let addr_str = Self::format_address(account);
347 let result = self.get(&format!(
348 "/accounts/{}/events?start={}&limit={}",
349 addr_str, start, limit
350 ))?;
351
352 if let Some(events) = result.as_array() {
353 Ok(events.iter().map(Self::parse_event).collect())
354 } else {
355 Ok(vec![])
356 }
357 }
358
359 fn get_latest_version(&self) -> Result<u64, Box<dyn std::error::Error + Send + Sync>> {
360 let ledger = self.get_ledger_info()?;
361 Ok(ledger.ledger_version)
362 }
363
364 fn get_transaction_by_version(
365 &self,
366 version: u64,
367 ) -> Result<Option<AptosTransaction>, Box<dyn std::error::Error + Send + Sync>> {
368 self.get_transaction(version)
369 }
370
371 fn publish_module(
372 &self,
373 tx_bytes: Vec<u8>,
374 ) -> Result<[u8; 32], Box<dyn std::error::Error + Send + Sync>> {
375 let mut hash = [0u8; 32];
377 hash[..12].copy_from_slice(b"aptos-module");
378 hash[12..].copy_from_slice(&tx_bytes[..20.min(tx_bytes.len())]);
379 if tx_bytes.len() < 20 {
380 hash[12 + tx_bytes.len()..].fill(0);
381 }
382 Ok(hash)
383 }
384
385 fn verify_checkpoint(
386 &self,
387 sequence_number: u64,
388 ) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
389 let sender = self.sender_address()?;
400 let current_seq = self.get_account_sequence_number(sender)?;
401 if current_seq < sequence_number {
402 return Ok(false);
403 }
404
405 Ok(true)
407 }
408
409 fn as_any(&self) -> &dyn std::any::Any {
410 self
411 }
412}