Skip to main content

csv_adapter_aptos/
rpc.rs

1//! Aptos RPC trait and mock implementation
2
3/// Trait for Aptos RPC operations
4pub trait AptosRpc: Send + Sync {
5    fn get_ledger_info(&self) -> Result<AptosLedgerInfo, Box<dyn std::error::Error + Send + Sync>>;
6
7    /// Get the sender's account address
8    fn sender_address(&self) -> Result<[u8; 32], Box<dyn std::error::Error + Send + Sync>>;
9
10    /// Get the account sequence number (transaction count)
11    fn get_account_sequence_number(
12        &self,
13        address: [u8; 32],
14    ) -> Result<u64, Box<dyn std::error::Error + Send + Sync>>;
15
16    fn get_resource(
17        &self,
18        address: [u8; 32],
19        resource_type: &str,
20        position: Option<u64>,
21    ) -> Result<Option<AptosResource>, Box<dyn std::error::Error + Send + Sync>>;
22
23    fn get_transaction(
24        &self,
25        version: u64,
26    ) -> Result<Option<AptosTransaction>, Box<dyn std::error::Error + Send + Sync>>;
27
28    fn get_transactions(
29        &self,
30        start_version: u64,
31        limit: u32,
32    ) -> Result<Vec<AptosTransaction>, Box<dyn std::error::Error + Send + Sync>>;
33
34    fn get_events(
35        &self,
36        event_handle: &str,
37        position: &str,
38        limit: u32,
39    ) -> Result<Vec<AptosEvent>, Box<dyn std::error::Error + Send + Sync>>;
40
41    fn submit_transaction(
42        &self,
43        tx_bytes: Vec<u8>,
44    ) -> Result<[u8; 32], Box<dyn std::error::Error + Send + Sync>>;
45
46    /// Submit a signed transaction as JSON to the Aptos REST API
47    /// Returns the transaction hash on success
48    fn submit_signed_transaction(
49        &self,
50        signed_tx_json: serde_json::Value,
51    ) -> Result<[u8; 32], Box<dyn std::error::Error + Send + Sync>>;
52
53    fn wait_for_transaction(
54        &self,
55        tx_hash: [u8; 32],
56    ) -> Result<AptosTransaction, Box<dyn std::error::Error + Send + Sync>>;
57
58    fn get_block_by_version(
59        &self,
60        version: u64,
61    ) -> Result<Option<AptosBlockInfo>, Box<dyn std::error::Error + Send + Sync>>;
62
63    fn get_events_by_account(
64        &self,
65        account: [u8; 32],
66        start: u64,
67        limit: u32,
68    ) -> Result<Vec<AptosEvent>, Box<dyn std::error::Error + Send + Sync>>;
69
70    fn get_latest_version(&self) -> Result<u64, Box<dyn std::error::Error + Send + Sync>>;
71
72    fn get_transaction_by_version(
73        &self,
74        version: u64,
75    ) -> Result<Option<AptosTransaction>, Box<dyn std::error::Error + Send + Sync>>;
76
77    fn publish_module(
78        &self,
79        tx_bytes: Vec<u8>,
80    ) -> Result<[u8; 32], Box<dyn std::error::Error + Send + Sync>>;
81
82    fn verify_checkpoint(
83        &self,
84        sequence_number: u64,
85    ) -> Result<bool, Box<dyn std::error::Error + Send + Sync>>;
86
87    fn as_any(&self) -> &dyn std::any::Any {
88        unimplemented!("as_any() must be implemented by concrete types")
89    }
90}
91
92/// Aptos ledger info
93#[derive(Clone, Debug)]
94pub struct AptosLedgerInfo {
95    pub chain_id: u64,
96    pub epoch: u64,
97    pub ledger_version: u64,
98    pub oldest_ledger_version: u64,
99    pub ledger_timestamp: u64,
100    pub oldest_transaction_timestamp: u64,
101    pub epoch_start_timestamp: u64,
102}
103
104/// Aptos resource
105#[derive(Clone, Debug)]
106pub struct AptosResource {
107    pub data: Vec<u8>,
108}
109
110/// Aptos transaction
111#[derive(Clone, Debug)]
112pub struct AptosTransaction {
113    pub version: u64,
114    pub hash: [u8; 32],
115    pub state_change_hash: [u8; 32],
116    pub event_root_hash: [u8; 32],
117    pub state_checkpoint_hash: Option<[u8; 32]>,
118    pub epoch: u64,
119    pub round: u64,
120    pub events: Vec<AptosEvent>,
121    pub payload: Vec<u8>,
122    pub success: bool,
123    pub vm_status: String,
124    pub gas_used: u64,
125    pub cumulative_gas_used: u64,
126}
127
128/// Aptos event
129#[derive(Clone, Debug)]
130pub struct AptosEvent {
131    pub event_sequence_number: u64,
132    pub key: String,
133    pub data: Vec<u8>,
134    pub transaction_version: u64,
135}
136
137/// Aptos block info
138#[derive(Clone, Debug)]
139pub struct AptosBlockInfo {
140    pub version: u64,
141    pub block_hash: [u8; 32],
142    pub epoch: u64,
143    pub round: u64,
144    pub timestamp_usecs: u64,
145}
146
147/// Mock Aptos RPC for testing
148///
149/// This implementation is only compiled in debug builds to prevent
150/// accidental use in production environments.
151#[cfg(debug_assertions)]
152pub struct MockAptosRpc {
153    pub latest_version: u64,
154    pub chain_id: u64,
155    pub mock_address: [u8; 32],
156    pub tx_counter: std::sync::atomic::AtomicU64,
157    pub resources: std::sync::Mutex<std::collections::HashMap<([u8; 32], String), AptosResource>>,
158    pub transactions: std::sync::Mutex<std::collections::HashMap<u64, AptosTransaction>>,
159    pub events: std::sync::Mutex<std::collections::HashMap<String, Vec<AptosEvent>>>,
160    pub blocks: std::sync::Mutex<std::collections::HashMap<u64, AptosBlockInfo>>,
161    pub sent_transactions: std::sync::Mutex<Vec<Vec<u8>>>,
162    pub next_tx_events: std::sync::Mutex<Vec<AptosEvent>>,
163}
164
165#[cfg(debug_assertions)]
166impl MockAptosRpc {
167    pub fn new(latest_version: u64) -> Self {
168        Self {
169            latest_version,
170            chain_id: 1,
171            mock_address: [0x42; 32],
172            tx_counter: std::sync::atomic::AtomicU64::new(0),
173            resources: std::sync::Mutex::new(std::collections::HashMap::new()),
174            transactions: std::sync::Mutex::new(std::collections::HashMap::new()),
175            events: std::sync::Mutex::new(std::collections::HashMap::new()),
176            sent_transactions: std::sync::Mutex::new(Vec::new()),
177            blocks: std::sync::Mutex::new(std::collections::HashMap::new()),
178            next_tx_events: std::sync::Mutex::new(Vec::new()),
179        }
180    }
181
182    pub fn with_chain_id(latest_version: u64, chain_id: u64) -> Self {
183        Self {
184            latest_version,
185            chain_id,
186            ..Self::new(latest_version)
187        }
188    }
189
190    pub fn set_resource(&self, address: [u8; 32], resource_type: &str, resource: AptosResource) {
191        self.resources
192            .lock()
193            .unwrap()
194            .insert((address, resource_type.to_string()), resource);
195    }
196
197    pub fn add_transaction(&self, version: u64, tx: AptosTransaction) {
198        self.transactions.lock().unwrap().insert(version, tx);
199    }
200
201    pub fn add_event(&self, handle: &str, event: AptosEvent) {
202        self.events
203            .lock()
204            .unwrap()
205            .entry(handle.to_string())
206            .or_default()
207            .push(event);
208    }
209
210    pub fn add_events(&self, handle: &str, events: Vec<AptosEvent>) {
211        self.events
212            .lock()
213            .unwrap()
214            .entry(handle.to_string())
215            .or_default()
216            .extend(events);
217    }
218
219    pub fn set_block(&self, version: u64, block: AptosBlockInfo) {
220        self.blocks.lock().unwrap().insert(version, block);
221    }
222}
223
224#[cfg(debug_assertions)]
225impl AptosRpc for MockAptosRpc {
226    fn get_ledger_info(&self) -> Result<AptosLedgerInfo, Box<dyn std::error::Error + Send + Sync>> {
227        Ok(AptosLedgerInfo {
228            chain_id: self.chain_id,
229            epoch: 1,
230            ledger_version: self.latest_version,
231            oldest_ledger_version: 0,
232            ledger_timestamp: 0,
233            oldest_transaction_timestamp: 0,
234            epoch_start_timestamp: 0,
235        })
236    }
237
238    fn sender_address(&self) -> Result<[u8; 32], Box<dyn std::error::Error + Send + Sync>> {
239        Ok(self.mock_address)
240    }
241
242    fn get_account_sequence_number(
243        &self,
244        _address: [u8; 32],
245    ) -> Result<u64, Box<dyn std::error::Error + Send + Sync>> {
246        Ok(self.tx_counter.load(std::sync::atomic::Ordering::SeqCst))
247    }
248
249    fn get_resource(
250        &self,
251        address: [u8; 32],
252        resource_type: &str,
253        _position: Option<u64>,
254    ) -> Result<Option<AptosResource>, Box<dyn std::error::Error + Send + Sync>> {
255        Ok(self
256            .resources
257            .lock()
258            .unwrap()
259            .get(&(address, resource_type.to_string()))
260            .cloned())
261    }
262
263    fn get_transaction(
264        &self,
265        version: u64,
266    ) -> Result<Option<AptosTransaction>, Box<dyn std::error::Error + Send + Sync>> {
267        Ok(self.transactions.lock().unwrap().get(&version).cloned())
268    }
269
270    fn get_transactions(
271        &self,
272        start_version: u64,
273        limit: u32,
274    ) -> Result<Vec<AptosTransaction>, Box<dyn std::error::Error + Send + Sync>> {
275        let transactions = self.transactions.lock().unwrap();
276        Ok(transactions
277            .iter()
278            .filter(|(v, _)| **v >= start_version)
279            .take(limit as usize)
280            .map(|(_, tx)| tx.clone())
281            .collect())
282    }
283
284    fn get_events(
285        &self,
286        event_handle: &str,
287        _position: &str,
288        limit: u32,
289    ) -> Result<Vec<AptosEvent>, Box<dyn std::error::Error + Send + Sync>> {
290        Ok(self
291            .events
292            .lock()
293            .unwrap()
294            .get(event_handle)
295            .cloned()
296            .unwrap_or_default()
297            .into_iter()
298            .take(limit as usize)
299            .collect())
300    }
301
302    fn submit_transaction(
303        &self,
304        tx_bytes: Vec<u8>,
305    ) -> Result<[u8; 32], Box<dyn std::error::Error + Send + Sync>> {
306        self.sent_transactions.lock().unwrap().push(tx_bytes);
307        Ok([0xAB; 32])
308    }
309
310    fn submit_signed_transaction(
311        &self,
312        signed_tx_json: serde_json::Value,
313    ) -> Result<[u8; 32], Box<dyn std::error::Error + Send + Sync>> {
314        // Store the JSON and return a mock hash
315        let tx_bytes = serde_json::to_vec(&signed_tx_json).unwrap_or_default();
316        self.sent_transactions.lock().unwrap().push(tx_bytes);
317
318        // Extract payload arguments to build mock event data
319        // New format: consume_seal only takes commitment (seal is at signer's address)
320        if let Some(payload) = signed_tx_json.get("payload") {
321            if let Some(args) = payload.get("arguments").and_then(|a| a.as_array()) {
322                if !args.is_empty() {
323                    // Parse commitment from argument
324                    let commit_str = args[0].as_str().unwrap_or("");
325
326                    let commitment = if let Some(hex) = commit_str.strip_prefix("0x") {
327                        hex::decode(hex).unwrap_or_default()
328                    } else {
329                        hex::decode(commit_str).unwrap_or_default()
330                    };
331
332                    // Build event data: module_address (32) + seal_address (mock) + commitment (32)
333                    let mut event_data = vec![0u8; 96];
334                    // module_address (0x1 padded to 32)
335                    event_data[31] = 0x01;
336                    // seal_address (use mock_address)
337                    event_data[32..64].copy_from_slice(&self.mock_address);
338                    // commitment
339                    event_data[64..96].copy_from_slice(&commitment[..32.min(commitment.len())]);
340
341                    let mut events = self.next_tx_events.lock().unwrap();
342                    events.push(AptosEvent {
343                        data: event_data,
344                        event_sequence_number: 0,
345                        key: "CSV::AnchorEvent".to_string(),
346                        transaction_version: self.latest_version,
347                    });
348                }
349            }
350        }
351
352        let counter = self
353            .tx_counter
354            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
355        let mut hash = [0u8; 32];
356        hash[..4].copy_from_slice(b"mock");
357        hash[4..12].copy_from_slice(&counter.to_le_bytes());
358        Ok(hash)
359    }
360
361    fn wait_for_transaction(
362        &self,
363        _tx_hash: [u8; 32],
364    ) -> Result<AptosTransaction, Box<dyn std::error::Error + Send + Sync>> {
365        let events = self.next_tx_events.lock().unwrap().drain(..).collect();
366        let tx = AptosTransaction {
367            version: self.latest_version,
368            hash: [0xCD; 32],
369            state_change_hash: [0xEF; 32],
370            event_root_hash: [0x1F; 32],
371            state_checkpoint_hash: None,
372            epoch: 1,
373            round: 0,
374            events,
375            payload: Vec::new(),
376            success: true,
377            vm_status: "Executed".to_string(),
378            gas_used: 0,
379            cumulative_gas_used: 0,
380        };
381        // Add to transactions map so get_transaction_by_version can find it
382        self.transactions
383            .lock()
384            .unwrap()
385            .insert(self.latest_version, tx.clone());
386        Ok(tx)
387    }
388
389    fn get_block_by_version(
390        &self,
391        version: u64,
392    ) -> Result<Option<AptosBlockInfo>, Box<dyn std::error::Error + Send + Sync>> {
393        Ok(self.blocks.lock().unwrap().get(&version).cloned())
394    }
395
396    fn get_events_by_account(
397        &self,
398        _account: [u8; 32],
399        start: u64,
400        limit: u32,
401    ) -> Result<Vec<AptosEvent>, Box<dyn std::error::Error + Send + Sync>> {
402        let events = self.events.lock().unwrap();
403        Ok(events
404            .values()
405            .flatten()
406            .skip(start as usize)
407            .take(limit as usize)
408            .cloned()
409            .collect())
410    }
411
412    fn get_latest_version(&self) -> Result<u64, Box<dyn std::error::Error + Send + Sync>> {
413        Ok(self.latest_version)
414    }
415
416    fn get_transaction_by_version(
417        &self,
418        version: u64,
419    ) -> Result<Option<AptosTransaction>, Box<dyn std::error::Error + Send + Sync>> {
420        Ok(self.transactions.lock().unwrap().get(&version).cloned())
421    }
422
423    fn publish_module(
424        &self,
425        tx_bytes: Vec<u8>,
426    ) -> Result<[u8; 32], Box<dyn std::error::Error + Send + Sync>> {
427        self.sent_transactions.lock().unwrap().push(tx_bytes);
428        Ok([0xAB; 32])
429    }
430
431    fn verify_checkpoint(
432        &self,
433        _sequence_number: u64,
434    ) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
435        Ok(true)
436    }
437
438    fn as_any(&self) -> &dyn std::any::Any {
439        self
440    }
441}
442
443#[cfg(test)]
444mod tests {
445    use super::*;
446
447    #[test]
448    fn test_mock_ledger_info() {
449        let rpc = MockAptosRpc::new(1000);
450        let info = rpc.get_ledger_info().unwrap();
451        assert_eq!(info.chain_id, 1);
452        assert_eq!(info.ledger_version, 1000);
453    }
454
455    #[test]
456    fn test_mock_resource() {
457        let rpc = MockAptosRpc::new(1000);
458        let address = [1u8; 32];
459        let resource = AptosResource {
460            data: vec![0xAB, 0xCD],
461        };
462        rpc.set_resource(address, "CSV::Seal", resource.clone());
463
464        let fetched = rpc.get_resource(address, "CSV::Seal", None).unwrap();
465        assert_eq!(fetched.unwrap().data, vec![0xAB, 0xCD]);
466    }
467
468    #[test]
469    fn test_mock_transaction() {
470        let rpc = MockAptosRpc::new(1000);
471        let tx = AptosTransaction {
472            version: 500,
473            hash: [1u8; 32],
474            state_change_hash: [2u8; 32],
475            event_root_hash: [3u8; 32],
476            state_checkpoint_hash: None,
477            epoch: 1,
478            round: 0,
479            events: vec![],
480            payload: vec![0x01, 0x02],
481            success: true,
482            vm_status: "Executed".to_string(),
483            gas_used: 100,
484            cumulative_gas_used: 100,
485        };
486        rpc.add_transaction(500, tx.clone());
487
488        let fetched = rpc.get_transaction(500).unwrap();
489        assert_eq!(fetched.unwrap().version, 500);
490    }
491
492    #[test]
493    fn test_mock_events() {
494        let rpc = MockAptosRpc::new(1000);
495        let event = AptosEvent {
496            event_sequence_number: 1,
497            key: "CSV::Seal".to_string(),
498            data: vec![0xAB, 0xCD],
499            transaction_version: 500,
500        };
501        rpc.add_event("CSV::Seal", event.clone());
502
503        let fetched = rpc.get_events("CSV::Seal", "0", 10).unwrap();
504        assert_eq!(fetched.len(), 1);
505    }
506
507    #[test]
508    fn test_mock_submit_transaction() {
509        let rpc = MockAptosRpc::new(1000);
510        let tx_hash = rpc.submit_transaction(vec![0x01, 0x02]).unwrap();
511        assert_eq!(tx_hash, [0xAB; 32]);
512    }
513
514    #[test]
515    fn test_mock_wait_for_transaction() {
516        let rpc = MockAptosRpc::new(1000);
517        let tx_hash = [1u8; 32];
518        let tx = rpc.wait_for_transaction(tx_hash).unwrap();
519        assert_eq!(tx.version, 1000);
520        assert!(tx.success);
521    }
522}