1use crate::config::AptosConfig;
33use crate::error::{AptosError, AptosResult};
34use crate::retry::{RetryConfig, RetryExecutor};
35use crate::types::AccountAddress;
36use reqwest::Client;
37use serde::{Deserialize, Serialize};
38use std::sync::Arc;
39use url::Url;
40
41#[derive(Debug, Clone)]
62pub struct IndexerClient {
63 indexer_url: Url,
64 client: Client,
65 retry_config: Arc<RetryConfig>,
66}
67
68#[derive(Debug, Serialize)]
70struct GraphQLRequest {
71 query: String,
72 #[serde(skip_serializing_if = "Option::is_none")]
73 variables: Option<serde_json::Value>,
74}
75
76#[derive(Debug, Deserialize)]
78struct GraphQLResponse<T> {
79 data: Option<T>,
80 errors: Option<Vec<GraphQLError>>,
81}
82
83#[derive(Debug, Deserialize)]
85struct GraphQLError {
86 message: String,
87}
88
89impl IndexerClient {
90 pub fn new(config: &AptosConfig) -> AptosResult<Self> {
103 let indexer_url = config
104 .indexer_url()
105 .cloned()
106 .ok_or_else(|| AptosError::Config("indexer URL not configured".into()))?;
107
108 let pool = config.pool_config();
109
110 let mut builder = Client::builder()
112 .timeout(config.timeout)
113 .pool_max_idle_per_host(pool.max_idle_per_host.unwrap_or(usize::MAX))
114 .pool_idle_timeout(pool.idle_timeout)
115 .tcp_nodelay(pool.tcp_nodelay);
116
117 if let Some(keepalive) = pool.tcp_keepalive {
118 builder = builder.tcp_keepalive(keepalive);
119 }
120
121 let client = builder.build().map_err(AptosError::Http)?;
122
123 let retry_config = Arc::new(config.retry_config().clone());
124
125 Ok(Self {
126 indexer_url,
127 client,
128 retry_config,
129 })
130 }
131
132 pub fn with_url(url: &str) -> AptosResult<Self> {
138 let indexer_url = Url::parse(url)?;
139 let client = Client::new();
140 Ok(Self {
141 indexer_url,
142 client,
143 retry_config: Arc::new(RetryConfig::default()),
144 })
145 }
146
147 pub async fn query<T: for<'de> Deserialize<'de> + Send + 'static>(
155 &self,
156 query: &str,
157 variables: Option<serde_json::Value>,
158 ) -> AptosResult<T> {
159 let request = GraphQLRequest {
160 query: query.to_string(),
161 variables,
162 };
163
164 let client = self.client.clone();
165 let url = self.indexer_url.clone();
166 let retry_config = self.retry_config.clone();
167
168 let executor = RetryExecutor::new((*retry_config).clone());
169 executor
170 .execute(|| {
171 let client = client.clone();
172 let url = url.clone();
173 let request = GraphQLRequest {
174 query: request.query.clone(),
175 variables: request.variables.clone(),
176 };
177 async move {
178 let response = client.post(url.as_str()).json(&request).send().await?;
179
180 if response.status().is_success() {
181 let graphql_response: GraphQLResponse<T> = response.json().await?;
182
183 if let Some(errors) = graphql_response.errors {
184 let messages: Vec<String> =
185 errors.iter().map(|e| e.message.clone()).collect();
186 return Err(AptosError::Api {
187 status_code: 400,
188 message: messages.join("; "),
189 error_code: Some("GRAPHQL_ERROR".to_string()),
190 vm_error_code: None,
191 });
192 }
193
194 graphql_response.data.ok_or_else(|| {
195 AptosError::Internal("no data in GraphQL response".into())
196 })
197 } else {
198 let status = response.status();
199 let body = response.text().await.unwrap_or_default();
200 Err(AptosError::api(status.as_u16(), body))
201 }
202 }
203 })
204 .await
205 }
206
207 pub async fn get_fungible_asset_balances(
213 &self,
214 address: AccountAddress,
215 ) -> AptosResult<Vec<FungibleAssetBalance>> {
216 #[derive(Deserialize)]
217 struct Response {
218 current_fungible_asset_balances: Vec<FungibleAssetBalance>,
219 }
220
221 let query = r"
222 query GetFungibleAssetBalances($address: String!) {
223 current_fungible_asset_balances(
224 where: { owner_address: { _eq: $address } }
225 ) {
226 asset_type
227 amount
228 metadata {
229 name
230 symbol
231 decimals
232 }
233 }
234 }
235 ";
236
237 let variables = serde_json::json!({
238 "address": address.to_string()
239 });
240
241 let response: Response = self.query(query, Some(variables)).await?;
242 Ok(response.current_fungible_asset_balances)
243 }
244
245 pub async fn get_account_tokens(
251 &self,
252 address: AccountAddress,
253 ) -> AptosResult<Vec<TokenBalance>> {
254 #[derive(Deserialize)]
255 struct Response {
256 current_token_ownerships_v2: Vec<TokenBalance>,
257 }
258
259 let query = r"
260 query GetAccountTokens($address: String!) {
261 current_token_ownerships_v2(
262 where: { owner_address: { _eq: $address }, amount: { _gt: 0 } }
263 ) {
264 token_data_id
265 amount
266 current_token_data {
267 token_name
268 description
269 token_uri
270 current_collection {
271 collection_name
272 }
273 }
274 }
275 }
276 ";
277
278 let variables = serde_json::json!({
279 "address": address.to_string()
280 });
281
282 let response: Response = self.query(query, Some(variables)).await?;
283 Ok(response.current_token_ownerships_v2)
284 }
285
286 pub async fn get_account_transactions(
292 &self,
293 address: AccountAddress,
294 limit: Option<u32>,
295 ) -> AptosResult<Vec<Transaction>> {
296 #[derive(Deserialize)]
297 struct Response {
298 account_transactions: Vec<Transaction>,
299 }
300
301 let query = r"
302 query GetAccountTransactions($address: String!, $limit: Int!) {
303 account_transactions(
304 where: { account_address: { _eq: $address } }
305 order_by: { transaction_version: desc }
306 limit: $limit
307 ) {
308 transaction_version
309 coin_activities {
310 activity_type
311 amount
312 coin_type
313 }
314 }
315 }
316 ";
317
318 let variables = serde_json::json!({
319 "address": address.to_string(),
320 "limit": limit.unwrap_or(25)
321 });
322
323 let response: Response = self.query(query, Some(variables)).await?;
324 Ok(response.account_transactions)
325 }
326}
327
328#[derive(Debug, Clone, Deserialize)]
330pub struct FungibleAssetBalance {
331 pub asset_type: String,
333 pub amount: String,
335 pub metadata: Option<FungibleAssetMetadata>,
337}
338
339#[derive(Debug, Clone, Deserialize)]
341pub struct FungibleAssetMetadata {
342 pub name: String,
344 pub symbol: String,
346 pub decimals: u8,
348}
349
350#[derive(Debug, Clone, Deserialize)]
352pub struct TokenBalance {
353 pub token_data_id: String,
355 pub amount: String,
357 pub current_token_data: Option<TokenData>,
359}
360
361#[derive(Debug, Clone, Deserialize)]
363pub struct TokenData {
364 pub token_name: String,
366 pub description: String,
368 pub token_uri: String,
370 pub current_collection: Option<CollectionData>,
372}
373
374#[derive(Debug, Clone, Deserialize)]
376pub struct CollectionData {
377 pub collection_name: String,
379}
380
381#[derive(Debug, Clone, Deserialize)]
383pub struct Transaction {
384 pub transaction_version: String,
386 pub coin_activities: Vec<CoinActivity>,
388}
389
390#[derive(Debug, Clone, Deserialize)]
392pub struct CoinActivity {
393 pub activity_type: String,
395 pub amount: Option<String>,
397 pub coin_type: String,
399}
400
401#[derive(Debug, Clone, Default)]
403pub struct PaginationParams {
404 pub limit: u32,
406 pub offset: u32,
408}
409
410impl PaginationParams {
411 pub fn new(limit: u32, offset: u32) -> Self {
413 Self { limit, offset }
414 }
415
416 pub fn first(limit: u32) -> Self {
418 Self { limit, offset: 0 }
419 }
420}
421
422#[derive(Debug, Clone)]
424pub struct Page<T> {
425 pub items: Vec<T>,
427 pub has_more: bool,
429 pub total_count: Option<u64>,
431}
432
433#[derive(Debug, Clone, Deserialize)]
435pub struct Event {
436 pub sequence_number: String,
438 #[serde(rename = "type")]
440 pub event_type: String,
441 pub data: serde_json::Value,
443 pub transaction_version: Option<String>,
445 pub account_address: Option<String>,
447 pub creation_number: Option<String>,
449}
450
451#[derive(Debug, Clone, Deserialize)]
453pub struct Collection {
454 pub collection_id: String,
456 pub collection_name: String,
458 pub creator_address: String,
460 pub current_supply: String,
462 pub max_supply: Option<String>,
464 pub uri: String,
466 pub description: String,
468}
469
470#[derive(Debug, Clone, Deserialize)]
472pub struct CoinBalance {
473 pub coin_type: String,
475 pub amount: String,
477}
478
479#[derive(Debug, Clone, Deserialize)]
481pub struct ProcessorStatus {
482 pub processor: String,
484 pub last_success_version: u64,
486 pub last_updated: Option<String>,
488}
489
490impl IndexerClient {
491 pub async fn get_account_tokens_paginated(
499 &self,
500 address: AccountAddress,
501 pagination: Option<PaginationParams>,
502 ) -> AptosResult<Page<TokenBalance>> {
503 #[derive(Deserialize)]
504 struct AggregateCount {
505 count: u64,
506 }
507
508 #[derive(Deserialize)]
509 struct Aggregate {
510 aggregate: Option<AggregateCount>,
511 }
512
513 #[derive(Deserialize)]
514 struct Response {
515 current_token_ownerships_v2: Vec<TokenBalance>,
516 current_token_ownerships_v2_aggregate: Aggregate,
517 }
518
519 let pagination = pagination.unwrap_or(PaginationParams {
520 limit: 25,
521 offset: 0,
522 });
523
524 let query = r"
525 query GetAccountTokens($address: String!, $limit: Int!, $offset: Int!) {
526 current_token_ownerships_v2(
527 where: { owner_address: { _eq: $address }, amount: { _gt: 0 } }
528 limit: $limit
529 offset: $offset
530 ) {
531 token_data_id
532 amount
533 current_token_data {
534 token_name
535 description
536 token_uri
537 current_collection {
538 collection_name
539 }
540 }
541 }
542 current_token_ownerships_v2_aggregate(
543 where: { owner_address: { _eq: $address }, amount: { _gt: 0 } }
544 ) {
545 aggregate {
546 count
547 }
548 }
549 }
550 ";
551
552 let variables = serde_json::json!({
553 "address": address.to_string(),
554 "limit": pagination.limit,
555 "offset": pagination.offset
556 });
557
558 let response: Response = self.query(query, Some(variables)).await?;
559 let total_count = response
560 .current_token_ownerships_v2_aggregate
561 .aggregate
562 .map(|a| a.count);
563 let has_more = total_count.is_some_and(|total| {
564 (u64::from(pagination.offset) + response.current_token_ownerships_v2.len() as u64)
565 < total
566 });
567
568 Ok(Page {
569 items: response.current_token_ownerships_v2,
570 has_more,
571 total_count,
572 })
573 }
574
575 pub async fn get_account_transactions_paginated(
581 &self,
582 address: AccountAddress,
583 pagination: Option<PaginationParams>,
584 ) -> AptosResult<Page<Transaction>> {
585 #[derive(Deserialize)]
586 struct AggregateCount {
587 count: u64,
588 }
589
590 #[derive(Deserialize)]
591 struct Aggregate {
592 aggregate: Option<AggregateCount>,
593 }
594
595 #[derive(Deserialize)]
596 struct Response {
597 account_transactions: Vec<Transaction>,
598 account_transactions_aggregate: Aggregate,
599 }
600
601 let pagination = pagination.unwrap_or(PaginationParams {
602 limit: 25,
603 offset: 0,
604 });
605
606 let query = r"
607 query GetAccountTransactions($address: String!, $limit: Int!, $offset: Int!) {
608 account_transactions(
609 where: { account_address: { _eq: $address } }
610 order_by: { transaction_version: desc }
611 limit: $limit
612 offset: $offset
613 ) {
614 transaction_version
615 coin_activities {
616 activity_type
617 amount
618 coin_type
619 }
620 }
621 account_transactions_aggregate(
622 where: { account_address: { _eq: $address } }
623 ) {
624 aggregate {
625 count
626 }
627 }
628 }
629 ";
630
631 let variables = serde_json::json!({
632 "address": address.to_string(),
633 "limit": pagination.limit,
634 "offset": pagination.offset
635 });
636
637 let response: Response = self.query(query, Some(variables)).await?;
638 let total_count = response
639 .account_transactions_aggregate
640 .aggregate
641 .map(|a| a.count);
642 let has_more = total_count.is_some_and(|total| {
643 (u64::from(pagination.offset) + response.account_transactions.len() as u64) < total
644 });
645
646 Ok(Page {
647 items: response.account_transactions,
648 has_more,
649 total_count,
650 })
651 }
652
653 pub async fn get_events_by_type(
659 &self,
660 event_type: &str,
661 limit: Option<u32>,
662 ) -> AptosResult<Vec<Event>> {
663 #[derive(Deserialize)]
664 struct Response {
665 events: Vec<Event>,
666 }
667
668 let query = r"
669 query GetEventsByType($type: String!, $limit: Int!) {
670 events(
671 where: { type: { _eq: $type } }
672 order_by: { transaction_version: desc }
673 limit: $limit
674 ) {
675 sequence_number
676 type
677 data
678 transaction_version
679 account_address
680 creation_number
681 }
682 }
683 ";
684
685 let variables = serde_json::json!({
686 "type": event_type,
687 "limit": limit.unwrap_or(25)
688 });
689
690 let response: Response = self.query(query, Some(variables)).await?;
691 Ok(response.events)
692 }
693
694 pub async fn get_events_by_account(
700 &self,
701 address: AccountAddress,
702 limit: Option<u32>,
703 ) -> AptosResult<Vec<Event>> {
704 #[derive(Deserialize)]
705 struct Response {
706 events: Vec<Event>,
707 }
708
709 let query = r"
710 query GetEventsByAccount($address: String!, $limit: Int!) {
711 events(
712 where: { account_address: { _eq: $address } }
713 order_by: { transaction_version: desc }
714 limit: $limit
715 ) {
716 sequence_number
717 type
718 data
719 transaction_version
720 account_address
721 creation_number
722 }
723 }
724 ";
725
726 let variables = serde_json::json!({
727 "address": address.to_string(),
728 "limit": limit.unwrap_or(25)
729 });
730
731 let response: Response = self.query(query, Some(variables)).await?;
732 Ok(response.events)
733 }
734
735 pub async fn get_collection(
742 &self,
743 collection_address: AccountAddress,
744 ) -> AptosResult<Collection> {
745 #[derive(Deserialize)]
746 struct Response {
747 current_collections_v2: Vec<Collection>,
748 }
749
750 let query = r"
751 query GetCollection($address: String!) {
752 current_collections_v2(
753 where: { collection_id: { _eq: $address } }
754 limit: 1
755 ) {
756 collection_id
757 collection_name
758 creator_address
759 current_supply
760 max_supply
761 uri
762 description
763 }
764 }
765 ";
766
767 let variables = serde_json::json!({
768 "address": collection_address.to_string()
769 });
770
771 let response: Response = self.query(query, Some(variables)).await?;
772 response
773 .current_collections_v2
774 .into_iter()
775 .next()
776 .ok_or_else(|| {
777 AptosError::NotFound(format!("Collection not found: {collection_address}"))
778 })
779 }
780
781 pub async fn get_collection_tokens(
787 &self,
788 collection_address: AccountAddress,
789 pagination: Option<PaginationParams>,
790 ) -> AptosResult<Page<TokenBalance>> {
791 #[derive(Deserialize)]
792 struct Response {
793 current_token_ownerships_v2: Vec<TokenBalance>,
794 }
795
796 let pagination = pagination.unwrap_or(PaginationParams {
797 limit: 25,
798 offset: 0,
799 });
800
801 let query = r"
802 query GetCollectionTokens($address: String!, $limit: Int!, $offset: Int!) {
803 current_token_ownerships_v2(
804 where: {
805 current_token_data: {
806 current_collection: {
807 collection_id: { _eq: $address }
808 }
809 }
810 amount: { _gt: 0 }
811 }
812 limit: $limit
813 offset: $offset
814 ) {
815 token_data_id
816 amount
817 current_token_data {
818 token_name
819 description
820 token_uri
821 current_collection {
822 collection_name
823 }
824 }
825 }
826 }
827 ";
828
829 let variables = serde_json::json!({
830 "address": collection_address.to_string(),
831 "limit": pagination.limit,
832 "offset": pagination.offset
833 });
834
835 let response: Response = self.query(query, Some(variables)).await?;
836 let items_count = response.current_token_ownerships_v2.len();
837
838 Ok(Page {
839 items: response.current_token_ownerships_v2,
840 has_more: items_count == pagination.limit as usize,
841 total_count: None,
842 })
843 }
844
845 pub async fn get_coin_balances(
851 &self,
852 address: AccountAddress,
853 ) -> AptosResult<Vec<CoinBalance>> {
854 #[derive(Deserialize)]
855 struct Response {
856 current_coin_balances: Vec<CoinBalance>,
857 }
858
859 let query = r"
860 query GetCoinBalances($address: String!) {
861 current_coin_balances(
862 where: { owner_address: { _eq: $address } }
863 ) {
864 coin_type
865 amount
866 }
867 }
868 ";
869
870 let variables = serde_json::json!({
871 "address": address.to_string()
872 });
873
874 let response: Response = self.query(query, Some(variables)).await?;
875 Ok(response.current_coin_balances)
876 }
877
878 pub async fn get_coin_activities(
884 &self,
885 address: AccountAddress,
886 limit: Option<u32>,
887 ) -> AptosResult<Vec<CoinActivity>> {
888 #[derive(Deserialize)]
889 struct Response {
890 coin_activities: Vec<CoinActivity>,
891 }
892
893 let query = r"
894 query GetCoinActivities($address: String!, $limit: Int!) {
895 coin_activities(
896 where: { owner_address: { _eq: $address } }
897 order_by: { transaction_version: desc }
898 limit: $limit
899 ) {
900 activity_type
901 amount
902 coin_type
903 }
904 }
905 ";
906
907 let variables = serde_json::json!({
908 "address": address.to_string(),
909 "limit": limit.unwrap_or(25)
910 });
911
912 let response: Response = self.query(query, Some(variables)).await?;
913 Ok(response.coin_activities)
914 }
915
916 pub async fn get_processor_status(&self) -> AptosResult<Vec<ProcessorStatus>> {
922 #[derive(Deserialize)]
923 struct Response {
924 processor_status: Vec<ProcessorStatus>,
925 }
926
927 let query = r"
928 query GetProcessorStatus {
929 processor_status {
930 processor
931 last_success_version
932 last_updated
933 }
934 }
935 ";
936
937 let response: Response = self.query(query, None).await?;
938 Ok(response.processor_status)
939 }
940
941 pub async fn get_indexer_version(&self) -> AptosResult<u64> {
948 let statuses = self.get_processor_status().await?;
949 statuses
950 .into_iter()
951 .map(|s| s.last_success_version)
952 .max()
953 .ok_or_else(|| AptosError::Internal("No processor status available".into()))
954 }
955
956 pub async fn check_indexer_lag(
962 &self,
963 reference_version: u64,
964 max_lag: u64,
965 ) -> AptosResult<bool> {
966 let indexer_version = self.get_indexer_version().await?;
967 Ok(reference_version.saturating_sub(indexer_version) <= max_lag)
968 }
969}
970
971#[cfg(test)]
972mod tests {
973 use super::*;
974
975 #[test]
976 fn test_indexer_client_creation() {
977 let client = IndexerClient::new(&AptosConfig::testnet());
978 assert!(client.is_ok());
979 }
980
981 #[test]
982 fn test_pagination_params() {
983 let params = PaginationParams::new(10, 20);
984 assert_eq!(params.limit, 10);
985 assert_eq!(params.offset, 20);
986
987 let first_page = PaginationParams::first(50);
988 assert_eq!(first_page.limit, 50);
989 assert_eq!(first_page.offset, 0);
990 }
991
992 #[test]
993 fn test_page_has_more() {
994 let page: Page<u32> = Page {
995 items: vec![1, 2, 3],
996 has_more: true,
997 total_count: Some(100),
998 };
999 assert!(page.has_more);
1000 assert_eq!(page.items.len(), 3);
1001 assert_eq!(page.total_count, Some(100));
1002 }
1003
1004 #[test]
1005 fn test_custom_url() {
1006 let client = IndexerClient::with_url("https://custom-indexer.example.com/v1/graphql");
1007 assert!(client.is_ok());
1008 }
1009}