use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use tokio::sync::Mutex;
use super::provider::{ChainFamily, ChainProvider, TxStatus};
use crate::core::types::ExchangeError;
/// Cosmos-specific operations layered on top of [`ChainProvider`].
///
/// The trait groups sequence tracking, transaction simulation/broadcast, and a
/// set of read-only queries. Most queries hand back the node's reply as raw
/// `serde_json::Value` data rather than typed structures. Every method reports
/// failures through `ExchangeError`.
#[async_trait]
pub trait CosmosChain: ChainProvider {
/// Returns the sequence tracked for `address` without advancing it.
///
/// The `CosmosProvider` implementation answers from its in-memory cache and
/// only queries the node on a cache miss.
async fn get_sequence(&self, address: &str) -> Result<u64, ExchangeError>;
/// Returns the sequence to use next for `address` and advances the tracked
/// value by one.
///
/// The `CosmosProvider` implementation seeds the value from the node when the
/// address is not cached yet.
async fn next_sequence(&self, address: &str) -> Result<u64, ExchangeError>;
/// Re-reads the sequence for `address` from the node, overwrites the tracked
/// value with it, and returns it.
async fn refresh_sequence(&self, address: &str) -> Result<u64, ExchangeError>;
/// Returns `(account_number, sequence)` for `address` as reported by the node.
async fn query_account(&self, address: &str) -> Result<(u64, u64), ExchangeError>;
/// Simulates the encoded transaction `tx_bytes` and returns the reported
/// `gas_used`.
async fn simulate(&self, tx_bytes: &[u8]) -> Result<u64, ExchangeError>;
/// Broadcasts the encoded transaction `tx_bytes` and returns its hash.
///
/// The `CosmosProvider` implementation submits with `BROADCAST_MODE_SYNC`.
async fn broadcast_tx_sync(&self, tx_bytes: &[u8]) -> Result<String, ExchangeError>;
/// Issues an `abci_query` request for `path` with payload `data` and returns
/// the decoded JSON reply.
async fn abci_query(
&self,
path: &str,
data: &str,
) -> Result<serde_json::Value, ExchangeError>;
/// Returns the entries of the `balances` array reported for `address`.
async fn get_all_balances(
&self,
address: &str,
) -> Result<Vec<serde_json::Value>, ExchangeError>;
/// Returns the JSON description of the IBC channel identified by
/// `channel_id` and `port_id`.
async fn get_ibc_channel(
&self,
channel_id: &str,
port_id: &str,
) -> Result<serde_json::Value, ExchangeError>;
/// Returns the denom trace registered under `hash`.
///
/// The `CosmosProvider` implementation accepts the hash with or without a
/// leading `ibc/`.
async fn get_denom_trace(
&self,
hash: &str,
) -> Result<serde_json::Value, ExchangeError>;
/// Runs the smart query `query_msg` against `contract` and returns the JSON
/// result.
async fn query_contract_smart(
&self,
contract: &str,
query_msg: serde_json::Value,
) -> Result<serde_json::Value, ExchangeError>;
/// Returns the entries of the `validators` array reported by the node.
async fn get_validators(&self) -> Result<Vec<serde_json::Value>, ExchangeError>;
/// Returns the entries of the `delegation_responses` array reported for
/// `delegator`.
async fn get_delegations(
&self,
delegator: &str,
) -> Result<Vec<serde_json::Value>, ExchangeError>;
/// Returns the entries of the `proposals` array, optionally filtered by
/// `status`.
async fn get_proposals(
&self,
status: Option<&str>,
) -> Result<Vec<serde_json::Value>, ExchangeError>;
/// Returns the node's JSON reply for the transaction with the given `hash`.
async fn get_tx(&self, hash: &str) -> Result<serde_json::Value, ExchangeError>;
/// Searches transactions matching `events` and returns the JSON reply.
///
/// The `CosmosProvider` implementation requests page 1 when `page` is `None`.
async fn search_txs(
&self,
events: &str,
page: Option<u32>,
) -> Result<serde_json::Value, ExchangeError>;
/// Returns the JSON reply for the pool `pool_id`.
///
/// The `CosmosProvider` implementation queries the `osmosis/gamm` REST route.
async fn get_pool(&self, pool_id: &str) -> Result<serde_json::Value, ExchangeError>;
/// Returns the JSON reply listing pools, continuing from `pagination_key`
/// when one is given.
///
/// The `CosmosProvider` implementation queries the `osmosis/gamm` REST route.
async fn get_pools(
&self,
pagination_key: Option<&str>,
) -> Result<serde_json::Value, ExchangeError>;
}
/// REST-backed provider for a Cosmos chain with an in-memory sequence cache.
pub struct CosmosProvider {
/// Base URL of the node's REST API; `new` strips trailing `/` characters.
endpoint: String,
/// Chain identifier reported through `chain_family`.
chain_id: String,
/// HTTP client used for every request; `new` configures a 30-second timeout.
http: reqwest::Client,
/// Sequence number cached per address, shared behind an async mutex.
sequences: Arc<Mutex<HashMap<String, u64>>>,
}
impl CosmosProvider {
/// Creates a provider for the REST API at `endpoint` serving chain `chain_id`.
///
/// Trailing `/` characters are removed from `endpoint` so request paths can be
/// appended directly. The HTTP client uses a 30-second timeout and the
/// sequence cache starts out empty.
///
/// # Panics
///
/// Panics if the `reqwest` client cannot be built.
pub fn new(endpoint: &str, chain_id: &str) -> Self {
    let request_timeout = std::time::Duration::from_secs(30);
    let client = reqwest::Client::builder()
        .timeout(request_timeout)
        .build()
        .expect("reqwest client construction is infallible with valid config");
    let base_url = endpoint.trim_end_matches('/').to_owned();
    let empty_cache = Arc::new(Mutex::new(HashMap::new()));

    Self {
        endpoint: base_url,
        chain_id: chain_id.to_owned(),
        http: client,
        sequences: empty_cache,
    }
}
/// Provider preset for chain id `dydx-mainnet-1` at
/// `https://dydx-rest.publicnode.com`.
pub fn dydx_mainnet() -> Self {
Self::new(
"https://dydx-rest.publicnode.com",
"dydx-mainnet-1",
)
}
/// Provider preset for chain id `dydx-testnet-4` at
/// `https://dydx-testnet-rest.publicnode.com`.
pub fn dydx_testnet() -> Self {
Self::new(
"https://dydx-testnet-rest.publicnode.com",
"dydx-testnet-4",
)
}
/// Provider preset for chain id `osmosis-1` at `https://lcd.osmosis.zone`.
pub fn osmosis_mainnet() -> Self {
Self::new("https://lcd.osmosis.zone", "osmosis-1")
}
/// Shorthand for [`CosmosProvider::osmosis_mainnet`].
#[inline]
pub fn osmosis() -> Self {
Self::osmosis_mainnet()
}
/// Provider preset for chain id `injective-1` at
/// `https://lcd.injective.network`.
pub fn injective() -> Self {
Self::new("https://lcd.injective.network", "injective-1")
}
/// Provider preset for chain id `pacific-1` at `https://rest.sei-apis.com`.
pub fn sei() -> Self {
Self::new("https://rest.sei-apis.com", "pacific-1")
}
/// Provider preset for chain id `neutron-1` at
/// `https://rest.neutron.quasar.fi`.
pub fn neutron() -> Self {
Self::new("https://rest.neutron.quasar.fi", "neutron-1")
}
/// Shorthand for [`CosmosProvider::dydx_mainnet`].
#[inline]
pub fn dydx() -> Self {
Self::dydx_mainnet()
}
/// Provider preset for chain id `celestia` at
/// `https://api.celestia.nodestake.top`.
pub fn celestia() -> Self {
Self::new("https://api.celestia.nodestake.top", "celestia")
}
/// Queries the auth REST route for `address` and returns
/// `(account_number, sequence)`.
///
/// # Errors
///
/// Returns `ExchangeError::Network` when the request cannot be sent or the
/// node replies with a non-success status, and `ExchangeError::Parse` when the
/// body is not valid JSON, lacks an `account` field, or the two numbers cannot
/// be extracted from it.
async fn fetch_account_info(
    &self,
    address: &str,
) -> Result<(u64, u64), ExchangeError> {
    let url = format!("{}/cosmos/auth/v1beta1/accounts/{}", self.endpoint, address);

    let response = match self.http.get(&url).send().await {
        Ok(response) => response,
        Err(e) => {
            return Err(ExchangeError::Network(format!(
                "CosmosProvider: account query failed for {}: {}",
                address, e
            )));
        }
    };

    // Read the status before the body is taken out of the response.
    let status = response.status();
    if !status.is_success() {
        let body = response.text().await.unwrap_or_default();
        return Err(ExchangeError::Network(format!(
            "CosmosProvider: account query HTTP {} for {}: {}",
            status, address, body
        )));
    }

    let payload: serde_json::Value = match response.json().await {
        Ok(value) => value,
        Err(e) => {
            return Err(ExchangeError::Parse(format!(
                "CosmosProvider: account JSON parse error: {}",
                e
            )));
        }
    };

    match payload.get("account") {
        Some(account) => Self::extract_account_fields(account),
        None => Err(ExchangeError::Parse(
            "CosmosProvider: missing 'account' field in response".to_string(),
        )),
    }
}
/// Extracts `(account_number, sequence)` from an account JSON object.
///
/// Each number may be encoded either as a decimal string or as a JSON
/// integer. When the two fields are not both present at the current level,
/// the search descends, in this order, into:
///
/// 1. `base_account` (accounts that wrap a base account),
/// 2. `base_vesting_account` (vesting accounts, whose base account sits one
///    level further down),
/// 3. `value` (preferring `value.base_account` when it exists).
///
/// # Errors
///
/// Returns `ExchangeError::Parse` when no level yields both numbers.
fn extract_account_fields(
    account: &serde_json::Value,
) -> Result<(u64, u64), ExchangeError> {
    /// Reads a `u64` that is encoded as a decimal string or a JSON integer.
    fn read_u64(value: &serde_json::Value) -> Option<u64> {
        value
            .as_str()
            .and_then(|text| text.parse::<u64>().ok())
            .or_else(|| value.as_u64())
    }

    let number = account.get("account_number").and_then(read_u64);
    let sequence = account.get("sequence").and_then(read_u64);
    if let (Some(number), Some(sequence)) = (number, sequence) {
        return Ok((number, sequence));
    }

    if let Some(base) = account.get("base_account") {
        return Self::extract_account_fields(base);
    }

    // Vesting accounts nest the base account inside `base_vesting_account`;
    // recursing here lets the `base_account` branch above pick it up.
    if let Some(vesting) = account.get("base_vesting_account") {
        return Self::extract_account_fields(vesting);
    }

    if let Some(inner) = account.get("value") {
        // Keep the original preference: a wrapped base account wins over any
        // flat fields that might sit next to it.
        if let Some(base) = inner.get("base_account") {
            return Self::extract_account_fields(base);
        }
        return Self::extract_account_fields(inner);
    }

    Err(ExchangeError::Parse(format!(
        "CosmosProvider: cannot extract account_number/sequence from: {}",
        account
    )))
}
/// Returns the height of the latest block reported by the node.
///
/// The height is read from `block.header.height`, which must be a decimal
/// string.
///
/// # Errors
///
/// Returns `ExchangeError::Network` for transport failures and non-success
/// statuses, and `ExchangeError::Parse` when the body is not JSON, the height
/// field is missing or not a string, or it does not parse as `u64`.
async fn fetch_latest_height(&self) -> Result<u64, ExchangeError> {
    let url = format!(
        "{}/cosmos/base/tendermint/v1beta1/blocks/latest",
        self.endpoint
    );

    let response = self.http.get(&url).send().await.map_err(|e| {
        ExchangeError::Network(format!(
            "CosmosProvider: block height query failed: {}",
            e
        ))
    })?;

    let status = response.status();
    if !status.is_success() {
        let body = response.text().await.unwrap_or_default();
        return Err(ExchangeError::Network(format!(
            "CosmosProvider: blocks/latest HTTP {}: {}",
            status, body
        )));
    }

    let payload: serde_json::Value = response.json().await.map_err(|e| {
        ExchangeError::Parse(format!(
            "CosmosProvider: block JSON parse error: {}",
            e
        ))
    })?;

    let raw_height = match payload
        .pointer("/block/header/height")
        .and_then(|v| v.as_str())
    {
        Some(text) => text,
        None => {
            return Err(ExchangeError::Parse(
                "CosmosProvider: missing block.header.height".to_string(),
            ));
        }
    };

    match raw_height.parse::<u64>() {
        Ok(height) => Ok(height),
        Err(e) => Err(ExchangeError::Parse(format!(
            "CosmosProvider: block height parse error: {}",
            e
        ))),
    }
}
/// Returns the `amount` string of the first coin in the bank balances of
/// `address`, or `"0"` when no such amount is present.
///
/// NOTE(review): the first entry of `balances` is used regardless of its
/// denom — confirm that this is the native coin on every supported chain.
///
/// # Errors
///
/// Returns `ExchangeError::Network` for transport failures and non-success
/// statuses, and `ExchangeError::Parse` when the body is not valid JSON.
async fn fetch_native_balance(&self, address: &str) -> Result<String, ExchangeError> {
    let url = format!("{}/cosmos/bank/v1beta1/balances/{}", self.endpoint, address);

    let response = self.http.get(&url).send().await.map_err(|e| {
        ExchangeError::Network(format!(
            "CosmosProvider: balance query failed for {}: {}",
            address, e
        ))
    })?;

    let status = response.status();
    if !status.is_success() {
        let body = response.text().await.unwrap_or_default();
        return Err(ExchangeError::Network(format!(
            "CosmosProvider: balance HTTP {} for {}: {}",
            status, address, body
        )));
    }

    let payload: serde_json::Value = response.json().await.map_err(|e| {
        ExchangeError::Parse(format!(
            "CosmosProvider: balance JSON parse error: {}",
            e
        ))
    })?;

    let first_coin = payload
        .get("balances")
        .and_then(|v| v.as_array())
        .and_then(|coins| coins.first());
    let amount = match first_coin {
        Some(coin) => coin
            .get("amount")
            .and_then(|v| v.as_str())
            .unwrap_or("0"),
        None => "0",
    };
    Ok(amount.to_string())
}
/// Broadcasts the encoded transaction `tx_bytes` in `BROADCAST_MODE_SYNC`
/// and returns the transaction hash from `tx_response.txhash`.
///
/// The bytes are sent base64-encoded in the JSON request body.
///
/// # Errors
///
/// - `ExchangeError::Network` when the request cannot be sent or the node
///   replies with a non-success HTTP status (the response body is included in
///   the message).
/// - `ExchangeError::Parse` when the body is not valid JSON or carries no
///   `tx_response.txhash`.
/// - `ExchangeError::Api` when `tx_response.code` is non-zero; the message
///   carries `tx_response.raw_log`.
async fn broadcast_tx_rest(&self, tx_bytes: &[u8]) -> Result<String, ExchangeError> {
    use base64::Engine as _;

    let url = format!("{}/cosmos/tx/v1beta1/txs", self.endpoint);
    let encoded = base64::engine::general_purpose::STANDARD.encode(tx_bytes);
    let body = serde_json::json!({
        "tx_bytes": encoded,
        "mode": "BROADCAST_MODE_SYNC"
    });

    let resp = self
        .http
        .post(&url)
        .json(&body)
        .send()
        .await
        .map_err(|e| ExchangeError::Network(format!(
            "CosmosProvider: broadcast_tx POST failed: {}",
            e
        )))?;

    // Check the HTTP status like every other request in this provider does.
    // Without this, an error reply either fails JSON decoding or is reported
    // as a missing txhash, hiding the node's actual error message.
    let status = resp.status();
    if !status.is_success() {
        let body = resp.text().await.unwrap_or_default();
        return Err(ExchangeError::Network(format!(
            "CosmosProvider: broadcast_tx HTTP {}: {}",
            status, body
        )));
    }

    let json: serde_json::Value = resp
        .json()
        .await
        .map_err(|e| ExchangeError::Parse(format!(
            "CosmosProvider: broadcast_tx JSON parse error: {}",
            e
        )))?;

    // A missing or non-integer code is treated as success, as before.
    let code = json
        .pointer("/tx_response/code")
        .and_then(|v| v.as_u64())
        .unwrap_or(0);
    if code != 0 {
        let raw_log = json
            .pointer("/tx_response/raw_log")
            .and_then(|v| v.as_str())
            .unwrap_or("unknown error");
        return Err(ExchangeError::Api {
            // Saturate instead of wrapping: a plain `as i32` cast could turn
            // an out-of-range code into an unrelated (even zero) value.
            code: code.min(i32::MAX as u64) as i32,
            message: format!("CosmosProvider broadcast rejected: {}", raw_log),
        });
    }

    let txhash = json
        .pointer("/tx_response/txhash")
        .and_then(|v| v.as_str())
        .ok_or_else(|| ExchangeError::Parse(
            "CosmosProvider: missing tx_response.txhash in broadcast response".to_string()
        ))?;
    Ok(txhash.to_string())
}
/// Looks up `tx_hash` on the node and maps the reply to a [`TxStatus`].
///
/// - HTTP 404 yields `TxStatus::NotFound`.
/// - A `tx_response` with code `0` (or no integer `code`) yields
///   `TxStatus::Confirmed` with the reported height, or block `0` when the
///   height is absent or unparseable.
/// - Any other code yields `TxStatus::Failed` with `raw_log` as the reason.
///
/// # Errors
///
/// Returns `ExchangeError::Network` for transport failures and for
/// non-success statuses other than 404. Returns `ExchangeError::Parse` when
/// the body is not valid JSON or does not contain a `tx_response` object.
async fn fetch_tx_status(&self, tx_hash: &str) -> Result<TxStatus, ExchangeError> {
    let url = format!("{}/cosmos/tx/v1beta1/txs/{}", self.endpoint, tx_hash);
    let resp = self
        .http
        .get(&url)
        .send()
        .await
        .map_err(|e| ExchangeError::Network(format!(
            "CosmosProvider: tx status query failed: {}",
            e
        )))?;

    let status = resp.status();
    if status == reqwest::StatusCode::NOT_FOUND {
        return Ok(TxStatus::NotFound);
    }
    if !status.is_success() {
        let body = resp.text().await.unwrap_or_default();
        return Err(ExchangeError::Network(format!(
            "CosmosProvider: tx status HTTP {}: {}",
            status, body
        )));
    }

    let json: serde_json::Value = resp
        .json()
        .await
        .map_err(|e| ExchangeError::Parse(format!(
            "CosmosProvider: tx status JSON parse error: {}",
            e
        )))?;

    // Require the `tx_response` object. Defaulting the code to 0 on a body
    // that lacks it entirely would report an unknown transaction as
    // confirmed at block 0.
    let tx_response = json
        .get("tx_response")
        .filter(|v| v.is_object())
        .ok_or_else(|| ExchangeError::Parse(
            "CosmosProvider: missing tx_response in tx status response".to_string()
        ))?;

    let code = tx_response
        .get("code")
        .and_then(|v| v.as_u64())
        .unwrap_or(0);
    // The height may be encoded as a decimal string or as a JSON integer.
    let height = tx_response
        .get("height")
        .and_then(|v| {
            v.as_str()
                .and_then(|s| s.parse::<u64>().ok())
                .or_else(|| v.as_u64())
        })
        .unwrap_or(0);

    if code == 0 {
        Ok(TxStatus::Confirmed { block: height })
    } else {
        let reason = tx_response
            .get("raw_log")
            .and_then(|v| v.as_str())
            .unwrap_or("unknown error")
            .to_string();
        Ok(TxStatus::Failed { reason })
    }
}
/// Posts `tx_bytes` (base64-encoded) to the simulate route and returns
/// `gas_info.gas_used`.
///
/// The gas figure is accepted as a decimal string or as a JSON integer.
///
/// # Errors
///
/// Returns `ExchangeError::Network` for transport failures and non-success
/// statuses, and `ExchangeError::Parse` when the body is not valid JSON or
/// has no usable `gas_info.gas_used`.
async fn fetch_simulate(&self, tx_bytes: &[u8]) -> Result<u64, ExchangeError> {
    use base64::Engine as _;

    let url = format!("{}/cosmos/tx/v1beta1/simulate", self.endpoint);
    let request_body = serde_json::json!({
        "tx_bytes": base64::engine::general_purpose::STANDARD.encode(tx_bytes)
    });

    let response = match self.http.post(&url).json(&request_body).send().await {
        Ok(response) => response,
        Err(e) => {
            return Err(ExchangeError::Network(format!(
                "CosmosProvider: simulate POST failed: {}",
                e
            )));
        }
    };

    let status = response.status();
    if !status.is_success() {
        let body = response.text().await.unwrap_or_default();
        return Err(ExchangeError::Network(format!(
            "CosmosProvider: simulate HTTP {}: {}",
            status, body
        )));
    }

    let payload: serde_json::Value = response.json().await.map_err(|e| {
        ExchangeError::Parse(format!(
            "CosmosProvider: simulate JSON parse error: {}",
            e
        ))
    })?;

    let gas_used = payload.pointer("/gas_info/gas_used").and_then(|v| {
        v.as_str()
            .and_then(|s| s.parse::<u64>().ok())
            .or_else(|| v.as_u64())
    });
    match gas_used {
        Some(gas) => Ok(gas),
        None => Err(ExchangeError::Parse(
            "CosmosProvider: missing gas_info.gas_used in simulate response".to_string(),
        )),
    }
}
}
/// [`ChainProvider`] implementation; each method forwards to a REST helper or
/// to the sequence cache of [`CosmosProvider`].
#[async_trait]
impl ChainProvider for CosmosProvider {
/// Reports the Cosmos chain family carrying a clone of this provider's
/// `chain_id`.
fn chain_family(&self) -> ChainFamily {
ChainFamily::Cosmos { chain_id: self.chain_id.clone() }
}
/// Broadcasts `tx_bytes` through `broadcast_tx_rest` and returns the
/// transaction hash.
async fn broadcast_tx(&self, tx_bytes: &[u8]) -> Result<String, ExchangeError> {
self.broadcast_tx_rest(tx_bytes).await
}
/// Returns the latest block height via `fetch_latest_height`.
async fn get_height(&self) -> Result<u64, ExchangeError> {
self.fetch_latest_height().await
}
/// Returns the sequence from `get_sequence`, i.e. the cached value when one
/// exists for `address`.
async fn get_nonce(&self, address: &str) -> Result<u64, ExchangeError> {
self.get_sequence(address).await
}
/// Returns the balance string produced by `fetch_native_balance`.
async fn get_native_balance(&self, address: &str) -> Result<String, ExchangeError> {
self.fetch_native_balance(address).await
}
/// Returns the status produced by `fetch_tx_status` for `tx_hash`.
async fn get_tx_status(&self, tx_hash: &str) -> Result<TxStatus, ExchangeError> {
self.fetch_tx_status(tx_hash).await
}
}
#[async_trait]
impl CosmosChain for CosmosProvider {
/// Returns the sequence for `address` without advancing it.
///
/// A cached value is returned as-is. On a cache miss the account is fetched
/// from the node and the fetched sequence is returned; it is stored in the
/// cache only if no entry has appeared there in the meantime.
///
/// # Errors
///
/// Propagates any error from `fetch_account_info`.
async fn get_sequence(&self, address: &str) -> Result<u64, ExchangeError> {
    // The guard is a temporary, so the lock is released before any request.
    let cached = self.sequences.lock().await.get(address).copied();
    if let Some(known) = cached {
        return Ok(known);
    }

    let (_, on_chain) = self.fetch_account_info(address).await?;
    self.sequences
        .lock()
        .await
        .entry(address.to_string())
        .or_insert(on_chain);
    Ok(on_chain)
}
/// Returns the sequence to use for the next transaction from `address` and
/// bumps the cached value by one.
///
/// When the address is not cached yet, the sequence is first fetched from the
/// node and stored, unless another entry appeared in the meantime. The lock
/// is not held while the request is in flight.
///
/// # Errors
///
/// Propagates any error from `fetch_account_info`.
///
/// # Panics
///
/// Panics if the cache entry is missing at increment time. Nothing in this
/// file removes entries, so this is not expected to happen.
async fn next_sequence(&self, address: &str) -> Result<u64, ExchangeError> {
    let already_cached = self.sequences.lock().await.contains_key(address);
    if !already_cached {
        let (_, on_chain) = self.fetch_account_info(address).await?;
        self.sequences
            .lock()
            .await
            .entry(address.to_string())
            .or_insert(on_chain);
    }

    let mut guard = self.sequences.lock().await;
    let slot = guard
        .get_mut(address)
        .expect("just inserted above; cache entry must exist");
    let issued = *slot;
    *slot = issued + 1;
    Ok(issued)
}
/// Fetches the sequence for `address` from the node, overwrites the cached
/// value with it, and returns it.
///
/// # Errors
///
/// Propagates any error from `fetch_account_info`; the cache is left
/// untouched in that case.
async fn refresh_sequence(&self, address: &str) -> Result<u64, ExchangeError> {
    let (_, on_chain) = self.fetch_account_info(address).await?;
    self.sequences
        .lock()
        .await
        .insert(address.to_string(), on_chain);
    Ok(on_chain)
}
/// Returns `(account_number, sequence)` straight from `fetch_account_info`,
/// bypassing the sequence cache.
async fn query_account(&self, address: &str) -> Result<(u64, u64), ExchangeError> {
self.fetch_account_info(address).await
}
/// Returns the gas figure produced by `fetch_simulate` for `tx_bytes`.
async fn simulate(&self, tx_bytes: &[u8]) -> Result<u64, ExchangeError> {
self.fetch_simulate(tx_bytes).await
}
/// Broadcasts `tx_bytes` through `broadcast_tx_rest` and returns the
/// transaction hash.
async fn broadcast_tx_sync(&self, tx_bytes: &[u8]) -> Result<String, ExchangeError> {
self.broadcast_tx_rest(tx_bytes).await
}
/// Sends an `abci_query` GET request for `path` with payload `data` and
/// returns the decoded JSON reply.
///
/// `path` and `data` are percent-encoded; `height=0` and `prove=false` are
/// always appended.
///
/// NOTE(review): the request goes to `<endpoint>/abci_query`, i.e. the same
/// base URL as the REST routes — confirm the configured endpoint serves it.
///
/// # Errors
///
/// Returns `ExchangeError::Network` for transport failures and non-success
/// statuses, and `ExchangeError::Parse` when the body is not valid JSON.
async fn abci_query(
    &self,
    path: &str,
    data: &str,
) -> Result<serde_json::Value, ExchangeError> {
    let url = format!(
        "{}/abci_query?path={}&data={}&height=0&prove=false",
        self.endpoint,
        urlencoding::encode(path),
        urlencoding::encode(data),
    );

    let response = match self.http.get(&url).send().await {
        Ok(response) => response,
        Err(e) => {
            return Err(ExchangeError::Network(format!(
                "CosmosProvider: abci_query failed for path '{}': {}",
                path, e
            )));
        }
    };

    let status = response.status();
    if !status.is_success() {
        let body = response.text().await.unwrap_or_default();
        return Err(ExchangeError::Network(format!(
            "CosmosProvider: abci_query HTTP {} for '{}': {}",
            status, path, body
        )));
    }

    match response.json().await {
        Ok(payload) => Ok(payload),
        Err(e) => Err(ExchangeError::Parse(format!(
            "CosmosProvider: abci_query JSON parse error: {}",
            e
        ))),
    }
}
/// Returns every entry of the `balances` array reported for `address`.
///
/// An empty vector is returned when the reply has no `balances` array.
///
/// # Errors
///
/// Returns `ExchangeError::Network` for transport failures and non-success
/// statuses, and `ExchangeError::Parse` when the body is not valid JSON.
async fn get_all_balances(
    &self,
    address: &str,
) -> Result<Vec<serde_json::Value>, ExchangeError> {
    let url = format!("{}/cosmos/bank/v1beta1/balances/{}", self.endpoint, address);

    let response = match self.http.get(&url).send().await {
        Ok(response) => response,
        Err(e) => {
            return Err(ExchangeError::Network(format!(
                "CosmosProvider: get_all_balances failed for {}: {}",
                address, e
            )));
        }
    };

    let status = response.status();
    if !status.is_success() {
        let body = response.text().await.unwrap_or_default();
        return Err(ExchangeError::Network(format!(
            "CosmosProvider: get_all_balances HTTP {} for {}: {}",
            status, address, body
        )));
    }

    let payload: serde_json::Value = match response.json().await {
        Ok(value) => value,
        Err(e) => {
            return Err(ExchangeError::Parse(format!(
                "CosmosProvider: get_all_balances JSON parse error: {}",
                e
            )));
        }
    };

    Ok(match payload.get("balances") {
        Some(serde_json::Value::Array(coins)) => coins.clone(),
        _ => Vec::new(),
    })
}
/// Returns the node's JSON description of the IBC channel identified by
/// `channel_id` on port `port_id`.
///
/// # Errors
///
/// Returns `ExchangeError::Network` for transport failures and non-success
/// statuses, and `ExchangeError::Parse` when the body is not valid JSON.
async fn get_ibc_channel(
    &self,
    channel_id: &str,
    port_id: &str,
) -> Result<serde_json::Value, ExchangeError> {
    let url = format!(
        "{}/ibc/core/channel/v1/channels/{}/ports/{}",
        self.endpoint, channel_id, port_id
    );

    let response = self.http.get(&url).send().await.map_err(|e| {
        ExchangeError::Network(format!(
            "CosmosProvider: get_ibc_channel failed for {}/{}: {}",
            channel_id, port_id, e
        ))
    })?;

    let status = response.status();
    if !status.is_success() {
        let body = response.text().await.unwrap_or_default();
        return Err(ExchangeError::Network(format!(
            "CosmosProvider: get_ibc_channel HTTP {} for {}/{}: {}",
            status, channel_id, port_id, body
        )));
    }

    match response.json().await {
        Ok(payload) => Ok(payload),
        Err(e) => Err(ExchangeError::Parse(format!(
            "CosmosProvider: get_ibc_channel JSON parse error: {}",
            e
        ))),
    }
}
/// Returns the node's JSON reply for the IBC denom trace `hash`.
///
/// Any leading `ibc/` prefixes are stripped from `hash` before it is placed
/// in the request path.
///
/// # Errors
///
/// Returns `ExchangeError::Network` for transport failures and non-success
/// statuses, and `ExchangeError::Parse` when the body is not valid JSON.
async fn get_denom_trace(
    &self,
    hash: &str,
) -> Result<serde_json::Value, ExchangeError> {
    let bare_hash = hash.trim_start_matches("ibc/");
    let url = format!(
        "{}/ibc/apps/transfer/v1/denom_traces/{}",
        self.endpoint, bare_hash
    );

    let response = match self.http.get(&url).send().await {
        Ok(response) => response,
        Err(e) => {
            return Err(ExchangeError::Network(format!(
                "CosmosProvider: get_denom_trace failed for {}: {}",
                bare_hash, e
            )));
        }
    };

    let status = response.status();
    if !status.is_success() {
        let body = response.text().await.unwrap_or_default();
        return Err(ExchangeError::Network(format!(
            "CosmosProvider: get_denom_trace HTTP {} for {}: {}",
            status, bare_hash, body
        )));
    }

    response.json().await.map_err(|e| {
        ExchangeError::Parse(format!(
            "CosmosProvider: get_denom_trace JSON parse error: {}",
            e
        ))
    })
}
/// Runs the smart query `query_msg` against `contract` and returns the
/// result.
///
/// The query is serialized to JSON, base64-encoded with the URL-safe
/// alphabet, and placed in the request path. When the reply has a `data`
/// field, only that field is returned; otherwise the whole reply is.
///
/// # Errors
///
/// Returns `ExchangeError::Parse` when `query_msg` cannot be serialized or the
/// reply is not valid JSON, and `ExchangeError::Network` for transport
/// failures and non-success statuses.
async fn query_contract_smart(
    &self,
    contract: &str,
    query_msg: serde_json::Value,
) -> Result<serde_json::Value, ExchangeError> {
    use base64::Engine as _;
    let query_bytes = serde_json::to_vec(&query_msg).map_err(|e| {
        ExchangeError::Parse(format!(
            "CosmosProvider: query_contract_smart serialize error: {}",
            e
        ))
    })?;
    // The encoded query becomes a single URL path segment. The standard
    // base64 alphabet can emit `/`, which would split that segment and make
    // the route unmatchable, so the URL-safe alphabet (`-`/`_`) is used. The
    // output is identical to standard base64 whenever neither `+` nor `/`
    // would occur.
    let encoded = base64::engine::general_purpose::URL_SAFE.encode(&query_bytes);
    let url = format!(
        "{}/cosmwasm/wasm/v1/contract/{}/smart/{}",
        self.endpoint, contract, encoded
    );
    let resp = self
        .http
        .get(&url)
        .send()
        .await
        .map_err(|e| ExchangeError::Network(format!(
            "CosmosProvider: query_contract_smart failed for {}: {}",
            contract, e
        )))?;
    let status = resp.status();
    if !status.is_success() {
        let body = resp.text().await.unwrap_or_default();
        return Err(ExchangeError::Network(format!(
            "CosmosProvider: query_contract_smart HTTP {} for {}: {}",
            status, contract, body
        )));
    }
    let json: serde_json::Value = resp.json().await.map_err(|e| {
        ExchangeError::Parse(format!(
            "CosmosProvider: query_contract_smart JSON parse error: {}",
            e
        ))
    })?;
    // Unwrap the `data` envelope when present; fall back to the full reply.
    Ok(json.get("data").cloned().unwrap_or(json))
}
/// Returns every entry of the `validators` array reported by the staking
/// route.
///
/// An empty vector is returned when the reply has no `validators` array.
///
/// # Errors
///
/// Returns `ExchangeError::Network` for transport failures and non-success
/// statuses, and `ExchangeError::Parse` when the body is not valid JSON.
async fn get_validators(&self) -> Result<Vec<serde_json::Value>, ExchangeError> {
    let url = format!("{}/cosmos/staking/v1beta1/validators", self.endpoint);

    let response = match self.http.get(&url).send().await {
        Ok(response) => response,
        Err(e) => {
            return Err(ExchangeError::Network(format!(
                "CosmosProvider: get_validators failed: {}",
                e
            )));
        }
    };

    let status = response.status();
    if !status.is_success() {
        let body = response.text().await.unwrap_or_default();
        return Err(ExchangeError::Network(format!(
            "CosmosProvider: get_validators HTTP {}: {}",
            status, body
        )));
    }

    let payload: serde_json::Value = match response.json().await {
        Ok(value) => value,
        Err(e) => {
            return Err(ExchangeError::Parse(format!(
                "CosmosProvider: get_validators JSON parse error: {}",
                e
            )));
        }
    };

    Ok(match payload.get("validators") {
        Some(serde_json::Value::Array(entries)) => entries.clone(),
        _ => Vec::new(),
    })
}
/// Returns every entry of the `delegation_responses` array reported for
/// `delegator`.
///
/// An empty vector is returned when the reply has no such array.
///
/// # Errors
///
/// Returns `ExchangeError::Network` for transport failures and non-success
/// statuses, and `ExchangeError::Parse` when the body is not valid JSON.
async fn get_delegations(
    &self,
    delegator: &str,
) -> Result<Vec<serde_json::Value>, ExchangeError> {
    let url = format!(
        "{}/cosmos/staking/v1beta1/delegations/{}",
        self.endpoint, delegator
    );

    let response = self.http.get(&url).send().await.map_err(|e| {
        ExchangeError::Network(format!(
            "CosmosProvider: get_delegations failed for {}: {}",
            delegator, e
        ))
    })?;

    let status = response.status();
    if !status.is_success() {
        let body = response.text().await.unwrap_or_default();
        return Err(ExchangeError::Network(format!(
            "CosmosProvider: get_delegations HTTP {} for {}: {}",
            status, delegator, body
        )));
    }

    let payload: serde_json::Value = response.json().await.map_err(|e| {
        ExchangeError::Parse(format!(
            "CosmosProvider: get_delegations JSON parse error: {}",
            e
        ))
    })?;

    Ok(match payload.get("delegation_responses") {
        Some(serde_json::Value::Array(entries)) => entries.clone(),
        _ => Vec::new(),
    })
}
/// Returns every entry of the `proposals` array reported by the gov route.
///
/// When `status` is given it is percent-encoded and sent as the
/// `proposal_status` query parameter. An empty vector is returned when the
/// reply has no `proposals` array.
///
/// # Errors
///
/// Returns `ExchangeError::Network` for transport failures and non-success
/// statuses, and `ExchangeError::Parse` when the body is not valid JSON.
async fn get_proposals(
    &self,
    status: Option<&str>,
) -> Result<Vec<serde_json::Value>, ExchangeError> {
    let url = match status {
        Some(filter) => format!(
            "{}/cosmos/gov/v1/proposals?proposal_status={}",
            self.endpoint,
            urlencoding::encode(filter)
        ),
        None => format!("{}/cosmos/gov/v1/proposals", self.endpoint),
    };

    let response = match self.http.get(&url).send().await {
        Ok(response) => response,
        Err(e) => {
            return Err(ExchangeError::Network(format!(
                "CosmosProvider: get_proposals failed: {}",
                e
            )));
        }
    };

    let http_status = response.status();
    if !http_status.is_success() {
        let body = response.text().await.unwrap_or_default();
        return Err(ExchangeError::Network(format!(
            "CosmosProvider: get_proposals HTTP {}: {}",
            http_status, body
        )));
    }

    let payload: serde_json::Value = match response.json().await {
        Ok(value) => value,
        Err(e) => {
            return Err(ExchangeError::Parse(format!(
                "CosmosProvider: get_proposals JSON parse error: {}",
                e
            )));
        }
    };

    Ok(match payload.get("proposals") {
        Some(serde_json::Value::Array(entries)) => entries.clone(),
        _ => Vec::new(),
    })
}
/// Returns the node's full JSON reply for the transaction `hash`.
///
/// Unlike `fetch_tx_status`, a 404 is not special-cased here: every
/// non-success status is an error.
///
/// # Errors
///
/// Returns `ExchangeError::Network` for transport failures and non-success
/// statuses, and `ExchangeError::Parse` when the body is not valid JSON.
async fn get_tx(&self, hash: &str) -> Result<serde_json::Value, ExchangeError> {
    let url = format!("{}/cosmos/tx/v1beta1/txs/{}", self.endpoint, hash);

    let response = match self.http.get(&url).send().await {
        Ok(response) => response,
        Err(e) => {
            return Err(ExchangeError::Network(format!(
                "CosmosProvider: get_tx failed for {}: {}",
                hash, e
            )));
        }
    };

    let status = response.status();
    if !status.is_success() {
        let body = response.text().await.unwrap_or_default();
        return Err(ExchangeError::Network(format!(
            "CosmosProvider: get_tx HTTP {} for {}: {}",
            status, hash, body
        )));
    }

    match response.json().await {
        Ok(payload) => Ok(payload),
        Err(e) => Err(ExchangeError::Parse(format!(
            "CosmosProvider: get_tx JSON parse error: {}",
            e
        ))),
    }
}
/// Searches transactions by `events` and returns the node's JSON reply.
///
/// `events` is percent-encoded into the `events` query parameter; `page`
/// defaults to 1 when not given.
///
/// # Errors
///
/// Returns `ExchangeError::Network` for transport failures and non-success
/// statuses, and `ExchangeError::Parse` when the body is not valid JSON.
async fn search_txs(
    &self,
    events: &str,
    page: Option<u32>,
) -> Result<serde_json::Value, ExchangeError> {
    let url = format!(
        "{}/cosmos/tx/v1beta1/txs?events={}&page={}",
        self.endpoint,
        urlencoding::encode(events),
        page.unwrap_or(1),
    );

    let response = self.http.get(&url).send().await.map_err(|e| {
        ExchangeError::Network(format!(
            "CosmosProvider: search_txs failed: {}",
            e
        ))
    })?;

    let status = response.status();
    if !status.is_success() {
        let body = response.text().await.unwrap_or_default();
        return Err(ExchangeError::Network(format!(
            "CosmosProvider: search_txs HTTP {}: {}",
            status, body
        )));
    }

    match response.json().await {
        Ok(payload) => Ok(payload),
        Err(e) => Err(ExchangeError::Parse(format!(
            "CosmosProvider: search_txs JSON parse error: {}",
            e
        ))),
    }
}
/// Returns the node's JSON reply for pool `pool_id` from the `osmosis/gamm`
/// route.
///
/// # Errors
///
/// Returns `ExchangeError::Network` for transport failures and non-success
/// statuses, and `ExchangeError::Parse` when the body is not valid JSON.
async fn get_pool(&self, pool_id: &str) -> Result<serde_json::Value, ExchangeError> {
    let url = format!("{}/osmosis/gamm/v1beta1/pools/{}", self.endpoint, pool_id);

    let response = match self.http.get(&url).send().await {
        Ok(response) => response,
        Err(e) => {
            return Err(ExchangeError::Network(format!(
                "CosmosProvider: get_pool failed for pool {}: {}",
                pool_id, e
            )));
        }
    };

    let status = response.status();
    if !status.is_success() {
        let body = response.text().await.unwrap_or_default();
        return Err(ExchangeError::Network(format!(
            "CosmosProvider: get_pool HTTP {} for pool {}: {}",
            status, pool_id, body
        )));
    }

    response.json().await.map_err(|e| {
        ExchangeError::Parse(format!(
            "CosmosProvider: get_pool JSON parse error: {}",
            e
        ))
    })
}
/// Returns the node's JSON reply listing pools from the `osmosis/gamm` route.
///
/// When `pagination_key` is given it is percent-encoded and sent as the
/// `pagination.key` query parameter.
///
/// # Errors
///
/// Returns `ExchangeError::Network` for transport failures and non-success
/// statuses, and `ExchangeError::Parse` when the body is not valid JSON.
async fn get_pools(
    &self,
    pagination_key: Option<&str>,
) -> Result<serde_json::Value, ExchangeError> {
    let url = match pagination_key {
        Some(key) => format!(
            "{}/osmosis/gamm/v1beta1/pools?pagination.key={}",
            self.endpoint,
            urlencoding::encode(key)
        ),
        None => format!("{}/osmosis/gamm/v1beta1/pools", self.endpoint),
    };

    let response = self.http.get(&url).send().await.map_err(|e| {
        ExchangeError::Network(format!(
            "CosmosProvider: get_pools failed: {}",
            e
        ))
    })?;

    let status = response.status();
    if !status.is_success() {
        let body = response.text().await.unwrap_or_default();
        return Err(ExchangeError::Network(format!(
            "CosmosProvider: get_pools HTTP {}: {}",
            status, body
        )));
    }

    match response.json().await {
        Ok(payload) => Ok(payload),
        Err(e) => Err(ExchangeError::Parse(format!(
            "CosmosProvider: get_pools JSON parse error: {}",
            e
        ))),
    }
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The dYdX mainnet preset reports its chain id through `chain_family`.
    #[test]
    fn test_cosmos_provider_chain_family() {
        let actual = CosmosProvider::dydx_mainnet().chain_family();
        let expected = ChainFamily::Cosmos {
            chain_id: "dydx-mainnet-1".to_string(),
        };
        assert_eq!(actual, expected);
    }

    /// The dYdX testnet preset carries the testnet chain id.
    #[test]
    fn test_cosmos_provider_testnet_chain_id() {
        let family = CosmosProvider::dydx_testnet().chain_family();
        match family {
            ChainFamily::Cosmos { chain_id } => assert_eq!(chain_id, "dydx-testnet-4"),
            _ => panic!("Expected Cosmos chain family"),
        }
    }

    /// Flat accounts with string-encoded numbers are parsed.
    #[test]
    fn test_extract_account_fields_flat_strings() {
        let account = serde_json::json!({
            "account_number": "42",
            "sequence": "7"
        });
        let parsed = CosmosProvider::extract_account_fields(&account).unwrap();
        assert_eq!(parsed.0, 42);
        assert_eq!(parsed.1, 7);
    }

    /// Flat accounts with integer-encoded numbers are parsed.
    #[test]
    fn test_extract_account_fields_flat_integers() {
        let account = serde_json::json!({
            "account_number": 5,
            "sequence": 12
        });
        let parsed = CosmosProvider::extract_account_fields(&account).unwrap();
        assert_eq!(parsed.0, 5);
        assert_eq!(parsed.1, 12);
    }

    /// Numbers nested under `base_account` are found.
    #[test]
    fn test_extract_account_fields_nested_base_account() {
        let account = serde_json::json!({
            "@type": "/cosmos.auth.v1beta1.BaseVestingAccount",
            "base_account": {
                "account_number": "100",
                "sequence": "3"
            }
        });
        let parsed = CosmosProvider::extract_account_fields(&account).unwrap();
        assert_eq!(parsed.0, 100);
        assert_eq!(parsed.1, 3);
    }

    /// Successive `next_sequence` calls hand out consecutive values and leave
    /// the cache one past the last value issued.
    #[tokio::test]
    async fn test_sequence_cache_atomicity() {
        let provider = Arc::new(CosmosProvider::dydx_mainnet());
        // Seed the cache so no network request is made.
        provider
            .sequences
            .lock()
            .await
            .insert("dydx1test".to_string(), 10u64);

        let first = provider.next_sequence("dydx1test").await.unwrap();
        let second = provider.next_sequence("dydx1test").await.unwrap();
        let third = provider.next_sequence("dydx1test").await.unwrap();

        assert_eq!(first, 10, "first call should return the seeded value");
        assert_eq!(second, 11, "second call should return seeded + 1");
        assert_eq!(third, 12, "third call should return seeded + 2");

        let stored = provider.sequences.lock().await.get("dydx1test").copied();
        assert_eq!(stored.unwrap(), 13u64);
    }

    /// `get_sequence` serves a cached value and does not advance it.
    #[tokio::test]
    async fn test_get_sequence_returns_cached() {
        let provider = CosmosProvider::dydx_mainnet();
        // Seed the cache so no network request is made.
        provider
            .sequences
            .lock()
            .await
            .insert("dydx1abc".to_string(), 5u64);

        let first_read = provider.get_sequence("dydx1abc").await.unwrap();
        assert_eq!(first_read, 5);
        let second_read = provider.get_sequence("dydx1abc").await.unwrap();
        assert_eq!(second_read, 5);
    }
}