use crate::config::AptosConfig;
use crate::error::{AptosError, AptosResult};
use crate::retry::{RetryConfig, RetryExecutor};
use crate::types::AccountAddress;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use url::Url;
/// Size limit in bytes (10 MiB) handed to `crate::config::read_response_bounded`
/// when an indexer response body is read.
const MAX_INDEXER_RESPONSE_SIZE: usize = 10 * 1024 * 1024;
/// Client that sends GraphQL queries to an indexer endpoint over HTTP.
///
/// Bundles the endpoint URL, the `reqwest` client used to send requests, and
/// the retry configuration applied to each query.
#[derive(Debug, Clone)]
pub struct IndexerClient {
/// URL that GraphQL requests are POSTed to.
indexer_url: Url,
/// HTTP client used to send the requests.
client: Client,
/// Retry settings, shared via `Arc` with the `RetryExecutor` created per query.
retry_config: Arc<RetryConfig>,
}
/// JSON body serialized for each GraphQL request.
#[derive(Debug, Serialize)]
struct GraphQLRequest {
/// Text of the GraphQL query document.
query: String,
/// Optional query variables; the key is omitted from the JSON when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
variables: Option<serde_json::Value>,
}
/// Top-level GraphQL response envelope, generic over the type of `data`.
#[derive(Debug, Deserialize)]
struct GraphQLResponse<T> {
/// Query result payload, if the server returned one.
data: Option<T>,
/// Errors reported by the server, if any.
errors: Option<Vec<GraphQLError>>,
}
/// A single entry of the `errors` array in a GraphQL response.
///
/// Only the `message` field is captured here.
#[derive(Debug, Deserialize)]
struct GraphQLError {
/// Error text supplied by the server.
message: String,
}
impl IndexerClient {
/// Builds an indexer client from an [`AptosConfig`].
///
/// The HTTP client takes its timeout and connection-pool settings from the
/// config, and a copy of the config's retry settings is stored for use by
/// every query.
///
/// # Errors
///
/// Returns [`AptosError::Config`] when the config has no indexer URL, and
/// [`AptosError::Http`] when the HTTP client cannot be built.
pub fn new(config: &AptosConfig) -> AptosResult<Self> {
    let Some(indexer_url) = config.indexer_url().cloned() else {
        return Err(AptosError::Config("indexer URL not configured".into()));
    };

    let pool = config.pool_config();
    let base = Client::builder()
        .timeout(config.timeout)
        .pool_max_idle_per_host(pool.max_idle_per_host.unwrap_or(usize::MAX))
        .pool_idle_timeout(pool.idle_timeout)
        .tcp_nodelay(pool.tcp_nodelay);
    // TCP keepalive is only applied when the pool config specifies a value.
    let builder = match pool.tcp_keepalive {
        Some(keepalive) => base.tcp_keepalive(keepalive),
        None => base,
    };

    Ok(Self {
        indexer_url,
        client: builder.build().map_err(AptosError::Http)?,
        retry_config: Arc::new(config.retry_config().clone()),
    })
}
pub fn with_url(url: &str) -> AptosResult<Self> {
let indexer_url = Url::parse(url)?;
crate::config::validate_url_scheme(&indexer_url)?;
let client = Client::new();
Ok(Self {
indexer_url,
client,
retry_config: Arc::new(RetryConfig::default()),
})
}
pub async fn query<T: for<'de> Deserialize<'de> + Send + 'static>(
&self,
query: &str,
variables: Option<serde_json::Value>,
) -> AptosResult<T> {
let request = GraphQLRequest {
query: query.to_string(),
variables,
};
let client = self.client.clone();
let url = self.indexer_url.clone();
let retry_config = self.retry_config.clone();
let executor = RetryExecutor::from_shared(retry_config);
executor
.execute(|| {
let client = client.clone();
let url = url.clone();
let request = GraphQLRequest {
query: request.query.clone(),
variables: request.variables.clone(),
};
async move {
let response = client.post(url.as_str()).json(&request).send().await?;
if response.status().is_success() {
let bytes = crate::config::read_response_bounded(
response,
MAX_INDEXER_RESPONSE_SIZE,
)
.await?;
let graphql_response: GraphQLResponse<T> = serde_json::from_slice(&bytes)?;
if let Some(errors) = graphql_response.errors {
let mut message = String::new();
for (i, e) in errors.iter().enumerate() {
if i > 0 {
message.push_str("; ");
}
message.push_str(&e.message);
}
return Err(AptosError::Api {
status_code: 400,
message,
error_code: Some("GRAPHQL_ERROR".into()),
vm_error_code: None,
});
}
graphql_response.data.ok_or_else(|| {
AptosError::Internal("no data in GraphQL response".into())
})
} else {
let status = response.status();
let body = response.text().await.unwrap_or_default();
Err(AptosError::api(status.as_u16(), body))
}
}
})
.await
}
/// Fetches the `current_fungible_asset_balances` rows whose `owner_address`
/// equals `address`.
///
/// # Errors
///
/// Propagates any error returned by [`IndexerClient::query`].
pub async fn get_fungible_asset_balances(
    &self,
    address: AccountAddress,
) -> AptosResult<Vec<FungibleAssetBalance>> {
    // Shape of the `data` object returned for the query below.
    #[derive(Deserialize)]
    struct Response {
        current_fungible_asset_balances: Vec<FungibleAssetBalance>,
    }

    const QUERY: &str = r"
query GetFungibleAssetBalances($address: String!) {
current_fungible_asset_balances(
where: { owner_address: { _eq: $address } }
) {
asset_type
amount
metadata {
name
symbol
decimals
}
}
}
";

    let variables = serde_json::json!({
        "address": address.to_string()
    });
    self.query::<Response>(QUERY, Some(variables))
        .await
        .map(|data| data.current_fungible_asset_balances)
}
/// Fetches the `current_token_ownerships_v2` rows owned by `address` whose
/// `amount` is greater than zero.
///
/// # Errors
///
/// Propagates any error returned by [`IndexerClient::query`].
pub async fn get_account_tokens(
    &self,
    address: AccountAddress,
) -> AptosResult<Vec<TokenBalance>> {
    // Shape of the `data` object returned for the query below.
    #[derive(Deserialize)]
    struct Response {
        current_token_ownerships_v2: Vec<TokenBalance>,
    }

    const QUERY: &str = r"
query GetAccountTokens($address: String!) {
current_token_ownerships_v2(
where: { owner_address: { _eq: $address }, amount: { _gt: 0 } }
) {
token_data_id
amount
current_token_data {
token_name
description
token_uri
current_collection {
collection_name
}
}
}
}
";

    let variables = serde_json::json!({
        "address": address.to_string()
    });
    self.query::<Response>(QUERY, Some(variables))
        .await
        .map(|data| data.current_token_ownerships_v2)
}
/// Fetches `account_transactions` rows for `address`, ordered by
/// `transaction_version` descending.
///
/// `limit` caps the number of rows and defaults to 25 when `None`.
///
/// # Errors
///
/// Propagates any error returned by [`IndexerClient::query`].
pub async fn get_account_transactions(
    &self,
    address: AccountAddress,
    limit: Option<u32>,
) -> AptosResult<Vec<Transaction>> {
    // Shape of the `data` object returned for the query below.
    #[derive(Deserialize)]
    struct Response {
        account_transactions: Vec<Transaction>,
    }

    const QUERY: &str = r"
query GetAccountTransactions($address: String!, $limit: Int!) {
account_transactions(
where: { account_address: { _eq: $address } }
order_by: { transaction_version: desc }
limit: $limit
) {
transaction_version
coin_activities {
activity_type
amount
coin_type
}
}
}
";

    let row_limit = match limit {
        Some(requested) => requested,
        None => 25,
    };
    let variables = serde_json::json!({
        "address": address.to_string(),
        "limit": row_limit
    });
    self.query::<Response>(QUERY, Some(variables))
        .await
        .map(|data| data.account_transactions)
}
}
/// One `current_fungible_asset_balances` row, as selected by
/// `IndexerClient::get_fungible_asset_balances`.
#[derive(Debug, Clone, Deserialize)]
pub struct FungibleAssetBalance {
/// Value of the row's `asset_type` field.
pub asset_type: String,
/// Value of the row's `amount` field, kept as a string.
pub amount: String,
/// Nested `metadata` object, if the row has one.
pub metadata: Option<FungibleAssetMetadata>,
}
/// The `metadata` object nested inside a [`FungibleAssetBalance`].
#[derive(Debug, Clone, Deserialize)]
pub struct FungibleAssetMetadata {
/// Value of the `name` field.
pub name: String,
/// Value of the `symbol` field.
pub symbol: String,
/// Value of the `decimals` field.
pub decimals: u8,
}
/// One `current_token_ownerships_v2` row, as selected by the token queries of
/// `IndexerClient`.
#[derive(Debug, Clone, Deserialize)]
pub struct TokenBalance {
/// Value of the row's `token_data_id` field.
pub token_data_id: String,
/// Value of the row's `amount` field, kept as a string.
pub amount: String,
/// Nested `current_token_data` object, if the row has one.
pub current_token_data: Option<TokenData>,
}
/// The `current_token_data` object nested inside a [`TokenBalance`].
#[derive(Debug, Clone, Deserialize)]
pub struct TokenData {
/// Value of the `token_name` field.
pub token_name: String,
/// Value of the `description` field.
pub description: String,
/// Value of the `token_uri` field.
pub token_uri: String,
/// Nested `current_collection` object, if present.
pub current_collection: Option<CollectionData>,
}
/// The `current_collection` object nested inside a [`TokenData`].
#[derive(Debug, Clone, Deserialize)]
pub struct CollectionData {
/// Value of the `collection_name` field.
pub collection_name: String,
}
/// One `account_transactions` row, as selected by the account-transaction
/// queries of `IndexerClient`.
#[derive(Debug, Clone, Deserialize)]
pub struct Transaction {
/// Value of the row's `transaction_version` field.
pub transaction_version: String,
/// Entries of the row's nested `coin_activities` list.
pub coin_activities: Vec<CoinActivity>,
}
/// One `coin_activities` entry, returned either nested in a [`Transaction`]
/// or directly by `IndexerClient::get_coin_activities`.
#[derive(Debug, Clone, Deserialize)]
pub struct CoinActivity {
/// Value of the `activity_type` field.
pub activity_type: String,
/// Value of the `amount` field as a string, if present.
pub amount: Option<String>,
/// Value of the `coin_type` field.
pub coin_type: String,
}
/// Limit/offset pair used by the paginated indexer queries.
///
/// Note that the derived `Default` yields `limit: 0, offset: 0`, whereas the
/// paginated client methods substitute `limit: 25, offset: 0` when they are
/// given `None`.
#[derive(Debug, Clone, Default)]
pub struct PaginationParams {
    /// Value sent as the query's `$limit` variable.
    pub limit: u32,
    /// Value sent as the query's `$offset` variable.
    pub offset: u32,
}

impl PaginationParams {
    /// Creates parameters with an explicit `limit` and `offset`.
    pub fn new(limit: u32, offset: u32) -> Self {
        PaginationParams { limit, offset }
    }

    /// Creates parameters for the first page: `limit` items at offset 0.
    pub fn first(limit: u32) -> Self {
        Self::new(limit, 0)
    }
}
/// One page of results from a paginated query.
#[derive(Debug, Clone)]
pub struct Page<T> {
/// Items contained in this page.
pub items: Vec<T>,
/// Whether the producing query believes further items exist beyond this page.
pub has_more: bool,
/// Total number of matching items, when the query obtained a count.
pub total_count: Option<u64>,
}
/// One `events` row, as selected by the event queries of `IndexerClient`.
#[derive(Debug, Clone, Deserialize)]
pub struct Event {
/// Value of the row's `sequence_number` field.
pub sequence_number: String,
/// Value of the row's `type` field (renamed because `type` is a Rust keyword).
#[serde(rename = "type")]
pub event_type: String,
/// Value of the row's `data` field, kept as untyped JSON.
pub data: serde_json::Value,
/// Value of the row's `transaction_version` field, if present.
pub transaction_version: Option<String>,
/// Value of the row's `account_address` field, if present.
pub account_address: Option<String>,
/// Value of the row's `creation_number` field, if present.
pub creation_number: Option<String>,
}
/// One `current_collections_v2` row, as selected by
/// `IndexerClient::get_collection`.
#[derive(Debug, Clone, Deserialize)]
pub struct Collection {
/// Value of the row's `collection_id` field.
pub collection_id: String,
/// Value of the row's `collection_name` field.
pub collection_name: String,
/// Value of the row's `creator_address` field.
pub creator_address: String,
/// Value of the row's `current_supply` field, kept as a string.
pub current_supply: String,
/// Value of the row's `max_supply` field as a string, if present.
pub max_supply: Option<String>,
/// Value of the row's `uri` field.
pub uri: String,
/// Value of the row's `description` field.
pub description: String,
}
/// One `current_coin_balances` row, as selected by
/// `IndexerClient::get_coin_balances`.
#[derive(Debug, Clone, Deserialize)]
pub struct CoinBalance {
/// Value of the row's `coin_type` field.
pub coin_type: String,
/// Value of the row's `amount` field, kept as a string.
pub amount: String,
}
/// One `processor_status` row, as selected by
/// `IndexerClient::get_processor_status`.
#[derive(Debug, Clone, Deserialize)]
pub struct ProcessorStatus {
/// Value of the row's `processor` field.
pub processor: String,
/// Value of the row's `last_success_version` field.
pub last_success_version: u64,
/// Value of the row's `last_updated` field, if present.
pub last_updated: Option<String>,
}
impl IndexerClient {
/// Fetches one page of `current_token_ownerships_v2` rows owned by `address`
/// (with `amount` greater than zero) together with the aggregate count.
///
/// `pagination` defaults to limit 25, offset 0 when `None`. `has_more` is
/// `true` only when a count was returned and `offset + items.len()` is still
/// below it.
///
/// # Errors
///
/// Propagates any error returned by [`IndexerClient::query`].
pub async fn get_account_tokens_paginated(
    &self,
    address: AccountAddress,
    pagination: Option<PaginationParams>,
) -> AptosResult<Page<TokenBalance>> {
    // Innermost `{ count }` object of the aggregate selection.
    #[derive(Deserialize)]
    struct AggregateCount {
        count: u64,
    }
    // Wrapper holding the optional `aggregate` object.
    #[derive(Deserialize)]
    struct Aggregate {
        aggregate: Option<AggregateCount>,
    }
    // Shape of the `data` object returned for the query below.
    #[derive(Deserialize)]
    struct Response {
        current_token_ownerships_v2: Vec<TokenBalance>,
        current_token_ownerships_v2_aggregate: Aggregate,
    }

    const QUERY: &str = r"
query GetAccountTokens($address: String!, $limit: Int!, $offset: Int!) {
current_token_ownerships_v2(
where: { owner_address: { _eq: $address }, amount: { _gt: 0 } }
limit: $limit
offset: $offset
) {
token_data_id
amount
current_token_data {
token_name
description
token_uri
current_collection {
collection_name
}
}
}
current_token_ownerships_v2_aggregate(
where: { owner_address: { _eq: $address }, amount: { _gt: 0 } }
) {
aggregate {
count
}
}
}
";

    let PaginationParams { limit, offset } = pagination.unwrap_or(PaginationParams {
        limit: 25,
        offset: 0,
    });
    let variables = serde_json::json!({
        "address": address.to_string(),
        "limit": limit,
        "offset": offset
    });

    let Response {
        current_token_ownerships_v2: items,
        current_token_ownerships_v2_aggregate: totals,
    } = self.query::<Response>(QUERY, Some(variables)).await?;

    let total_count = totals.aggregate.map(|agg| agg.count);
    let has_more = match total_count {
        Some(total) => (u64::from(offset) + items.len() as u64) < total,
        None => false,
    };

    Ok(Page {
        items,
        has_more,
        total_count,
    })
}
/// Fetches one page of `account_transactions` rows for `address`, ordered by
/// `transaction_version` descending, together with the aggregate count.
///
/// `pagination` defaults to limit 25, offset 0 when `None`. `has_more` is
/// `true` only when a count was returned and `offset + items.len()` is still
/// below it.
///
/// # Errors
///
/// Propagates any error returned by [`IndexerClient::query`].
pub async fn get_account_transactions_paginated(
    &self,
    address: AccountAddress,
    pagination: Option<PaginationParams>,
) -> AptosResult<Page<Transaction>> {
    // Innermost `{ count }` object of the aggregate selection.
    #[derive(Deserialize)]
    struct AggregateCount {
        count: u64,
    }
    // Wrapper holding the optional `aggregate` object.
    #[derive(Deserialize)]
    struct Aggregate {
        aggregate: Option<AggregateCount>,
    }
    // Shape of the `data` object returned for the query below.
    #[derive(Deserialize)]
    struct Response {
        account_transactions: Vec<Transaction>,
        account_transactions_aggregate: Aggregate,
    }

    const QUERY: &str = r"
query GetAccountTransactions($address: String!, $limit: Int!, $offset: Int!) {
account_transactions(
where: { account_address: { _eq: $address } }
order_by: { transaction_version: desc }
limit: $limit
offset: $offset
) {
transaction_version
coin_activities {
activity_type
amount
coin_type
}
}
account_transactions_aggregate(
where: { account_address: { _eq: $address } }
) {
aggregate {
count
}
}
}
";

    let PaginationParams { limit, offset } = pagination.unwrap_or(PaginationParams {
        limit: 25,
        offset: 0,
    });
    let variables = serde_json::json!({
        "address": address.to_string(),
        "limit": limit,
        "offset": offset
    });

    let Response {
        account_transactions: items,
        account_transactions_aggregate: totals,
    } = self.query::<Response>(QUERY, Some(variables)).await?;

    let total_count = totals.aggregate.map(|agg| agg.count);
    let has_more = match total_count {
        Some(total) => (u64::from(offset) + items.len() as u64) < total,
        None => false,
    };

    Ok(Page {
        items,
        has_more,
        total_count,
    })
}
/// Fetches `events` rows whose `type` equals `event_type`, ordered by
/// `transaction_version` descending.
///
/// `limit` caps the number of rows and defaults to 25 when `None`.
///
/// # Errors
///
/// Propagates any error returned by [`IndexerClient::query`].
pub async fn get_events_by_type(
    &self,
    event_type: &str,
    limit: Option<u32>,
) -> AptosResult<Vec<Event>> {
    // Shape of the `data` object returned for the query below.
    #[derive(Deserialize)]
    struct Response {
        events: Vec<Event>,
    }

    const QUERY: &str = r"
query GetEventsByType($type: String!, $limit: Int!) {
events(
where: { type: { _eq: $type } }
order_by: { transaction_version: desc }
limit: $limit
) {
sequence_number
type
data
transaction_version
account_address
creation_number
}
}
";

    let row_limit = match limit {
        Some(requested) => requested,
        None => 25,
    };
    let variables = serde_json::json!({
        "type": event_type,
        "limit": row_limit
    });
    self.query::<Response>(QUERY, Some(variables))
        .await
        .map(|data| data.events)
}
/// Fetches `events` rows whose `account_address` equals `address`, ordered by
/// `transaction_version` descending.
///
/// `limit` caps the number of rows and defaults to 25 when `None`.
///
/// # Errors
///
/// Propagates any error returned by [`IndexerClient::query`].
pub async fn get_events_by_account(
    &self,
    address: AccountAddress,
    limit: Option<u32>,
) -> AptosResult<Vec<Event>> {
    // Shape of the `data` object returned for the query below.
    #[derive(Deserialize)]
    struct Response {
        events: Vec<Event>,
    }

    const QUERY: &str = r"
query GetEventsByAccount($address: String!, $limit: Int!) {
events(
where: { account_address: { _eq: $address } }
order_by: { transaction_version: desc }
limit: $limit
) {
sequence_number
type
data
transaction_version
account_address
creation_number
}
}
";

    let row_limit = match limit {
        Some(requested) => requested,
        None => 25,
    };
    let variables = serde_json::json!({
        "address": address.to_string(),
        "limit": row_limit
    });
    self.query::<Response>(QUERY, Some(variables))
        .await
        .map(|data| data.events)
}
/// Fetches the `current_collections_v2` row whose `collection_id` equals
/// `collection_address`.
///
/// # Errors
///
/// Returns [`AptosError::NotFound`] when the query yields no rows, and
/// propagates any error returned by [`IndexerClient::query`].
pub async fn get_collection(
    &self,
    collection_address: AccountAddress,
) -> AptosResult<Collection> {
    // Shape of the `data` object returned for the query below.
    #[derive(Deserialize)]
    struct Response {
        current_collections_v2: Vec<Collection>,
    }

    const QUERY: &str = r"
query GetCollection($address: String!) {
current_collections_v2(
where: { collection_id: { _eq: $address } }
limit: 1
) {
collection_id
collection_name
creator_address
current_supply
max_supply
uri
description
}
}
";

    let variables = serde_json::json!({
        "address": collection_address.to_string()
    });
    let data = self.query::<Response>(QUERY, Some(variables)).await?;

    // The query asks for at most one row, so only the first is considered.
    match data.current_collections_v2.into_iter().next() {
        Some(collection) => Ok(collection),
        None => Err(AptosError::NotFound(format!(
            "Collection not found: {collection_address}"
        ))),
    }
}
/// Fetches one page of `current_token_ownerships_v2` rows (with `amount`
/// greater than zero) whose token belongs to the collection identified by
/// `collection_address`.
///
/// `pagination` defaults to limit 25, offset 0 when `None`. No count is
/// requested, so `total_count` is always `None` and `has_more` is `true`
/// exactly when the page holds `limit` items.
///
/// # Errors
///
/// Propagates any error returned by [`IndexerClient::query`].
pub async fn get_collection_tokens(
    &self,
    collection_address: AccountAddress,
    pagination: Option<PaginationParams>,
) -> AptosResult<Page<TokenBalance>> {
    // Shape of the `data` object returned for the query below.
    #[derive(Deserialize)]
    struct Response {
        current_token_ownerships_v2: Vec<TokenBalance>,
    }

    const QUERY: &str = r"
query GetCollectionTokens($address: String!, $limit: Int!, $offset: Int!) {
current_token_ownerships_v2(
where: {
current_token_data: {
current_collection: {
collection_id: { _eq: $address }
}
}
amount: { _gt: 0 }
}
limit: $limit
offset: $offset
) {
token_data_id
amount
current_token_data {
token_name
description
token_uri
current_collection {
collection_name
}
}
}
}
";

    let PaginationParams { limit, offset } = pagination.unwrap_or(PaginationParams {
        limit: 25,
        offset: 0,
    });
    let variables = serde_json::json!({
        "address": collection_address.to_string(),
        "limit": limit,
        "offset": offset
    });

    let items = self
        .query::<Response>(QUERY, Some(variables))
        .await?
        .current_token_ownerships_v2;
    // A full page is taken as the signal that more rows may follow.
    let has_more = items.len() == limit as usize;

    Ok(Page {
        items,
        has_more,
        total_count: None,
    })
}
/// Fetches the `current_coin_balances` rows whose `owner_address` equals
/// `address`.
///
/// # Errors
///
/// Propagates any error returned by [`IndexerClient::query`].
pub async fn get_coin_balances(
    &self,
    address: AccountAddress,
) -> AptosResult<Vec<CoinBalance>> {
    // Shape of the `data` object returned for the query below.
    #[derive(Deserialize)]
    struct Response {
        current_coin_balances: Vec<CoinBalance>,
    }

    const QUERY: &str = r"
query GetCoinBalances($address: String!) {
current_coin_balances(
where: { owner_address: { _eq: $address } }
) {
coin_type
amount
}
}
";

    let variables = serde_json::json!({
        "address": address.to_string()
    });
    self.query::<Response>(QUERY, Some(variables))
        .await
        .map(|data| data.current_coin_balances)
}
/// Fetches `coin_activities` rows whose `owner_address` equals `address`,
/// ordered by `transaction_version` descending.
///
/// `limit` caps the number of rows and defaults to 25 when `None`.
///
/// # Errors
///
/// Propagates any error returned by [`IndexerClient::query`].
pub async fn get_coin_activities(
    &self,
    address: AccountAddress,
    limit: Option<u32>,
) -> AptosResult<Vec<CoinActivity>> {
    // Shape of the `data` object returned for the query below.
    #[derive(Deserialize)]
    struct Response {
        coin_activities: Vec<CoinActivity>,
    }

    const QUERY: &str = r"
query GetCoinActivities($address: String!, $limit: Int!) {
coin_activities(
where: { owner_address: { _eq: $address } }
order_by: { transaction_version: desc }
limit: $limit
) {
activity_type
amount
coin_type
}
}
";

    let row_limit = match limit {
        Some(requested) => requested,
        None => 25,
    };
    let variables = serde_json::json!({
        "address": address.to_string(),
        "limit": row_limit
    });
    self.query::<Response>(QUERY, Some(variables))
        .await
        .map(|data| data.coin_activities)
}
/// Fetches every `processor_status` row (processor name, last successful
/// version, and last-updated value).
///
/// # Errors
///
/// Propagates any error returned by [`IndexerClient::query`].
pub async fn get_processor_status(&self) -> AptosResult<Vec<ProcessorStatus>> {
    // Shape of the `data` object returned for the query below.
    #[derive(Deserialize)]
    struct Response {
        processor_status: Vec<ProcessorStatus>,
    }

    const QUERY: &str = r"
query GetProcessorStatus {
processor_status {
processor
last_success_version
last_updated
}
}
";

    // This query takes no variables.
    self.query::<Response>(QUERY, None)
        .await
        .map(|data| data.processor_status)
}
/// Returns the largest `last_success_version` reported across all processor
/// status rows.
///
/// # Errors
///
/// Returns [`AptosError::Internal`] when no processor status rows exist, and
/// propagates any error from [`IndexerClient::get_processor_status`].
pub async fn get_indexer_version(&self) -> AptosResult<u64> {
    let statuses = self.get_processor_status().await?;
    let newest = statuses
        .iter()
        .map(|status| status.last_success_version)
        .max();
    match newest {
        Some(version) => Ok(version),
        None => Err(AptosError::Internal(
            "No processor status available".into(),
        )),
    }
}
/// Reports whether the indexer is within `max_lag` versions of
/// `reference_version`.
///
/// The lag is `reference_version` minus the value from
/// [`IndexerClient::get_indexer_version`], saturating at zero, so an indexer
/// that is ahead of the reference counts as having no lag.
///
/// # Errors
///
/// Propagates any error from [`IndexerClient::get_indexer_version`].
pub async fn check_indexer_lag(
    &self,
    reference_version: u64,
    max_lag: u64,
) -> AptosResult<bool> {
    let indexed = self.get_indexer_version().await?;
    let lag = reference_version.saturating_sub(indexed);
    Ok(lag <= max_lag)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A client can be built from the testnet configuration.
    #[test]
    fn test_indexer_client_creation() {
        let config = AptosConfig::testnet();
        assert!(IndexerClient::new(&config).is_ok());
    }

    /// `new` keeps both values; `first` pins the offset to zero.
    #[test]
    fn test_pagination_params() {
        let PaginationParams { limit, offset } = PaginationParams::new(10, 20);
        assert_eq!(limit, 10);
        assert_eq!(offset, 20);

        let PaginationParams { limit, offset } = PaginationParams::first(50);
        assert_eq!(limit, 50);
        assert_eq!(offset, 0);
    }

    /// `Page` stores exactly the values it is constructed with.
    #[test]
    fn test_page_has_more() {
        let Page {
            items,
            has_more,
            total_count,
        } = Page::<u32> {
            items: vec![1, 2, 3],
            has_more: true,
            total_count: Some(100),
        };
        assert!(has_more);
        assert_eq!(items.len(), 3);
        assert_eq!(total_count, Some(100));
    }

    /// A client can be built from an explicit HTTPS endpoint.
    #[test]
    fn test_custom_url() {
        let endpoint = "https://custom-indexer.example.com/v1/graphql";
        assert!(IndexerClient::with_url(endpoint).is_ok());
    }
}