1use async_trait::async_trait;
14use serde::{Deserialize, Serialize};
15
16use chainindex_core::error::IndexerError;
17use chainindex_core::handler::DecodedEvent;
18use chainindex_core::indexer::IndexerConfig;
19use chainindex_core::types::{BlockSummary, EventFilter};
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct AptosBlock {
28 pub height: u64,
30 pub hash: String,
32 pub timestamp: i64,
34 pub first_version: u64,
36 pub last_version: u64,
38 pub tx_count: u32,
40 pub epoch: u64,
42 pub round: u64,
44}
45
46impl AptosBlock {
47 pub fn to_block_summary(&self) -> BlockSummary {
48 BlockSummary {
49 number: self.height,
50 hash: self.hash.clone(),
51 parent_hash: format!("version:{}", self.first_version),
52 timestamp: self.timestamp,
53 tx_count: self.tx_count,
54 }
55 }
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct AptosEvent {
65 pub type_tag: String,
67 pub sequence_number: u64,
69 pub data: serde_json::Value,
71 pub version: u64,
73 pub height: u64,
75 pub tx_hash: String,
77 pub account_address: String,
79 pub creation_number: u64,
81}
82
83impl AptosEvent {
84 pub fn module_name(&self) -> &str {
88 self.type_tag
89 .split("::")
90 .nth(1)
91 .unwrap_or("unknown")
92 }
93
94 pub fn event_name(&self) -> &str {
98 self.type_tag
99 .split("::")
100 .nth(2)
101 .unwrap_or("unknown")
102 }
103
104 pub fn type_address(&self) -> &str {
108 self.type_tag
109 .split("::")
110 .next()
111 .unwrap_or("0x0")
112 }
113
114 pub fn to_decoded_event(&self, chain: &str) -> DecodedEvent {
115 let schema = format!("{}::{}", self.module_name(), self.event_name());
116
117 DecodedEvent {
118 chain: chain.to_string(),
119 schema,
120 address: self.account_address.clone(),
121 tx_hash: self.tx_hash.clone(),
122 block_number: self.height,
123 log_index: self.sequence_number as u32,
124 fields_json: self.data.clone(),
125 }
126 }
127}
128
129#[async_trait]
134pub trait AptosRpcClient: Send + Sync {
135 async fn get_ledger_info(&self) -> Result<u64, IndexerError>;
137
138 async fn get_block_by_height(
140 &self,
141 height: u64,
142 ) -> Result<Option<AptosBlock>, IndexerError>;
143
144 async fn get_events(
146 &self,
147 account: &str,
148 event_handle: &str,
149 field_name: &str,
150 start: u64,
151 limit: u64,
152 ) -> Result<Vec<AptosEvent>, IndexerError>;
153
154 async fn get_transaction_events(
156 &self,
157 version: u64,
158 ) -> Result<Vec<AptosEvent>, IndexerError>;
159}
160
161#[derive(Debug, Clone, Default)]
167pub struct AptosEventFilter {
168 pub type_prefixes: Vec<String>,
170 pub modules: Vec<String>,
172 pub accounts: Vec<String>,
174}
175
176impl AptosEventFilter {
177 pub fn matches(&self, event: &AptosEvent) -> bool {
178 if !self.type_prefixes.is_empty()
179 && !self.type_prefixes.iter().any(|p| event.type_tag.starts_with(p))
180 {
181 return false;
182 }
183
184 if !self.modules.is_empty()
185 && !self.modules.iter().any(|m| m == event.module_name())
186 {
187 return false;
188 }
189
190 if !self.accounts.is_empty()
191 && !self.accounts.iter().any(|a| a == &event.account_address)
192 {
193 return false;
194 }
195
196 true
197 }
198}
199
200pub struct AptosResponseParser;
206
207impl AptosResponseParser {
208 pub fn parse_block(json: &serde_json::Value) -> Option<AptosBlock> {
210 let height_str = json["block_height"].as_str()?;
211 let height = height_str.parse::<u64>().ok()?;
212
213 let hash = json["block_hash"].as_str().unwrap_or_default().to_string();
214 let timestamp_us = json["block_timestamp"]
215 .as_str()
216 .and_then(|s| s.parse::<u64>().ok())
217 .unwrap_or(0);
218 let timestamp = (timestamp_us / 1_000_000) as i64;
219
220 let first_version = json["first_version"]
221 .as_str()
222 .and_then(|s| s.parse().ok())
223 .unwrap_or(0);
224 let last_version = json["last_version"]
225 .as_str()
226 .and_then(|s| s.parse().ok())
227 .unwrap_or(0);
228
229 let tx_count = json["transactions"]
230 .as_array()
231 .map(|a| a.len() as u32)
232 .unwrap_or_else(|| {
233 if last_version >= first_version {
234 (last_version - first_version + 1) as u32
235 } else {
236 0
237 }
238 });
239
240 Some(AptosBlock {
241 height,
242 hash,
243 timestamp,
244 first_version,
245 last_version,
246 tx_count,
247 epoch: json["epoch"].as_str().and_then(|s| s.parse().ok()).unwrap_or(0),
248 round: json["round"].as_str().and_then(|s| s.parse().ok()).unwrap_or(0),
249 })
250 }
251
252 pub fn parse_events(
254 json: &serde_json::Value,
255 height: u64,
256 ) -> Vec<AptosEvent> {
257 let events_array = json.as_array();
258 let Some(events) = events_array else {
259 return Vec::new();
260 };
261
262 events
263 .iter()
264 .filter_map(|ev| {
265 let type_tag = ev["type"].as_str()?.to_string();
266 let sequence_number = ev["sequence_number"]
267 .as_str()
268 .and_then(|s| s.parse().ok())
269 .unwrap_or(0);
270 let data = ev.get("data").cloned().unwrap_or(serde_json::Value::Null);
271 let version = ev["version"]
272 .as_str()
273 .and_then(|s| s.parse().ok())
274 .unwrap_or(0);
275
276 Some(AptosEvent {
277 type_tag,
278 sequence_number,
279 data,
280 version,
281 height,
282 tx_hash: ev["transaction_hash"]
283 .as_str()
284 .unwrap_or_default()
285 .to_string(),
286 account_address: ev["guid"]["account_address"]
287 .as_str()
288 .unwrap_or_default()
289 .to_string(),
290 creation_number: ev["guid"]["creation_number"]
291 .as_str()
292 .and_then(|s| s.parse().ok())
293 .unwrap_or(0),
294 })
295 })
296 .collect()
297 }
298}
299
300pub struct AptosIndexerBuilder {
305 from_height: u64,
306 to_height: Option<u64>,
307 type_prefixes: Vec<String>,
308 modules: Vec<String>,
309 accounts: Vec<String>,
310 batch_size: u64,
311 poll_interval_ms: u64,
312 checkpoint_interval: u64,
313 confirmation_depth: u64,
314 id: String,
315 chain: String,
316}
317
318impl AptosIndexerBuilder {
319 pub fn new() -> Self {
320 Self {
321 from_height: 0,
322 to_height: None,
323 type_prefixes: Vec::new(),
324 modules: Vec::new(),
325 accounts: Vec::new(),
326 batch_size: 100,
327 poll_interval_ms: 4000, checkpoint_interval: 100,
329 confirmation_depth: 1, id: "aptos-indexer".into(),
331 chain: "aptos".into(),
332 }
333 }
334
335 pub fn id(mut self, id: impl Into<String>) -> Self {
336 self.id = id.into();
337 self
338 }
339
340 pub fn chain(mut self, chain: impl Into<String>) -> Self {
341 self.chain = chain.into();
342 self
343 }
344
345 pub fn from_height(mut self, height: u64) -> Self {
346 self.from_height = height;
347 self
348 }
349
350 pub fn to_height(mut self, height: u64) -> Self {
351 self.to_height = Some(height);
352 self
353 }
354
355 pub fn type_prefix(mut self, prefix: impl Into<String>) -> Self {
356 self.type_prefixes.push(prefix.into());
357 self
358 }
359
360 pub fn module(mut self, module: impl Into<String>) -> Self {
361 self.modules.push(module.into());
362 self
363 }
364
365 pub fn account(mut self, account: impl Into<String>) -> Self {
366 self.accounts.push(account.into());
367 self
368 }
369
370 pub fn batch_size(mut self, size: u64) -> Self {
371 self.batch_size = size;
372 self
373 }
374
375 pub fn poll_interval_ms(mut self, ms: u64) -> Self {
376 self.poll_interval_ms = ms;
377 self
378 }
379
380 pub fn build_config(&self) -> IndexerConfig {
381 IndexerConfig {
382 id: self.id.clone(),
383 chain: self.chain.clone(),
384 from_block: self.from_height,
385 to_block: self.to_height,
386 confirmation_depth: self.confirmation_depth,
387 batch_size: self.batch_size,
388 checkpoint_interval: self.checkpoint_interval,
389 poll_interval_ms: self.poll_interval_ms,
390 filter: EventFilter {
391 addresses: self.accounts.clone(),
392 topic0_values: self.type_prefixes.clone(),
393 from_block: Some(self.from_height),
394 to_block: self.to_height,
395 },
396 }
397 }
398
399 pub fn build_filter(&self) -> AptosEventFilter {
400 AptosEventFilter {
401 type_prefixes: self.type_prefixes.clone(),
402 modules: self.modules.clone(),
403 accounts: self.accounts.clone(),
404 }
405 }
406}
407
408impl Default for AptosIndexerBuilder {
409 fn default() -> Self {
410 Self::new()
411 }
412}
413
#[cfg(test)]
mod tests {
    use super::*;

    /// Event carrying only a type tag; every other field is zero or empty.
    fn bare_event(type_tag: &str) -> AptosEvent {
        AptosEvent {
            type_tag: type_tag.into(),
            sequence_number: 0,
            data: serde_json::Value::Null,
            version: 0,
            height: 0,
            tx_hash: String::new(),
            account_address: String::new(),
            creation_number: 0,
        }
    }

    /// Fully populated `0x1::coin::DepositEvent` fixture.
    fn deposit_event() -> AptosEvent {
        AptosEvent {
            type_tag: "0x1::coin::DepositEvent".into(),
            sequence_number: 42,
            data: serde_json::json!({"amount": "1000"}),
            version: 500_000_000,
            height: 150_000_000,
            tx_hash: "tx_hash_abc".into(),
            account_address: "0xaccount".into(),
            creation_number: 1,
        }
    }

    #[test]
    fn block_to_summary() {
        let summary = AptosBlock {
            height: 150_000_000,
            hash: "0xabc".into(),
            timestamp: 1700000000,
            first_version: 500_000_000,
            last_version: 500_000_050,
            tx_count: 51,
            epoch: 100,
            round: 5,
        }
        .to_block_summary();

        assert_eq!(summary.number, 150_000_000);
        assert_eq!(summary.hash, "0xabc");
        assert_eq!(summary.tx_count, 51);
        assert_eq!(summary.parent_hash, "version:500000000");
    }

    #[test]
    fn event_type_parsing() {
        let event = deposit_event();
        assert_eq!(event.module_name(), "coin");
        assert_eq!(event.event_name(), "DepositEvent");
        assert_eq!(event.type_address(), "0x1");
    }

    #[test]
    fn event_to_decoded() {
        let decoded = deposit_event().to_decoded_event("aptos");
        assert_eq!(decoded.chain, "aptos");
        assert_eq!(decoded.schema, "coin::DepositEvent");
        assert_eq!(decoded.address, "0xaccount");
        assert_eq!(decoded.tx_hash, "tx_hash_abc");
        assert_eq!(decoded.log_index, 42);
        assert_eq!(decoded.fields_json["amount"], "1000");
    }

    #[test]
    fn filter_type_prefix() {
        let filter = AptosEventFilter {
            type_prefixes: vec!["0x1::coin".into()],
            ..Default::default()
        };

        assert!(filter.matches(&bare_event("0x1::coin::DepositEvent")));
        assert!(!filter.matches(&bare_event("0x1::staking::StakeEvent")));
    }

    #[test]
    fn filter_module() {
        let filter = AptosEventFilter {
            modules: vec!["coin".into()],
            ..Default::default()
        };

        assert!(filter.matches(&bare_event("0x1::coin::DepositEvent")));
    }

    #[test]
    fn filter_empty_matches_all() {
        let filter = AptosEventFilter::default();
        assert!(filter.matches(&bare_event("anything")));
    }

    #[test]
    fn parse_block_json() {
        let payload = serde_json::json!({
            "block_height": "150000000",
            "block_hash": "0xblock_hash_abc",
            "block_timestamp": "1700000000000000",
            "first_version": "500000000",
            "last_version": "500000050",
            "epoch": "100",
            "round": "5",
            "transactions": [{"type": "user"}, {"type": "user"}]
        });

        let block = AptosResponseParser::parse_block(&payload).unwrap();
        assert_eq!(block.height, 150_000_000);
        assert_eq!(block.hash, "0xblock_hash_abc");
        assert_eq!(block.timestamp, 1700000000);
        assert_eq!(block.first_version, 500_000_000);
        assert_eq!(block.last_version, 500_000_050);
        assert_eq!(block.tx_count, 2);
        assert_eq!(block.epoch, 100);
    }

    #[test]
    fn parse_events_json() {
        let payload = serde_json::json!([
            {
                "type": "0x1::coin::DepositEvent",
                "sequence_number": "42",
                "data": { "amount": "1000" },
                "version": "500000000",
                "transaction_hash": "tx_abc",
                "guid": {
                    "account_address": "0xaccount",
                    "creation_number": "1"
                }
            }
        ]);

        let events = AptosResponseParser::parse_events(&payload, 150_000_000);
        assert_eq!(events.len(), 1);

        let first = &events[0];
        assert_eq!(first.type_tag, "0x1::coin::DepositEvent");
        assert_eq!(first.sequence_number, 42);
        assert_eq!(first.account_address, "0xaccount");
    }

    #[test]
    fn builder_defaults() {
        let config = AptosIndexerBuilder::new().build_config();
        assert_eq!(config.chain, "aptos");
        assert_eq!(config.confirmation_depth, 1);
        assert_eq!(config.poll_interval_ms, 4000);
    }

    #[test]
    fn builder_custom() {
        let builder = AptosIndexerBuilder::new()
            .id("apt-idx")
            .from_height(100_000_000)
            .to_height(200_000_000)
            .type_prefix("0x1::coin")
            .module("coin")
            .account("0xaccount1")
            .batch_size(50);

        let config = builder.build_config();
        assert_eq!(config.id, "apt-idx");
        assert_eq!(config.from_block, 100_000_000);
        assert_eq!(config.to_block, Some(200_000_000));

        let filter = builder.build_filter();
        assert_eq!(filter.type_prefixes, vec!["0x1::coin"]);
        assert_eq!(filter.modules, vec!["coin"]);
        assert_eq!(filter.accounts, vec!["0xaccount1"]);
    }

    #[test]
    fn block_serializable() {
        let original = AptosBlock {
            height: 100,
            hash: "h".into(),
            timestamp: 1000,
            first_version: 500,
            last_version: 550,
            tx_count: 51,
            epoch: 10,
            round: 3,
        };

        // Round-trip through JSON text.
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: AptosBlock = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded.height, 100);
        assert_eq!(decoded.epoch, 10);
    }

    #[test]
    fn event_serializable() {
        let original = AptosEvent {
            type_tag: "0x1::coin::DepositEvent".into(),
            sequence_number: 0,
            data: serde_json::json!({}),
            version: 0,
            height: 0,
            tx_hash: "tx".into(),
            account_address: "0x1".into(),
            creation_number: 1,
        };

        // Round-trip through JSON text.
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: AptosEvent = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded.type_tag, "0x1::coin::DepositEvent");
    }
}