use std::sync::Arc;
use serde::de::DeserializeOwned;
use serde_json::{Value, json};
use crate::{
config::chain::{Env, SupportedChainId},
error::CowError,
order_book::{RateLimiter, RetryPolicy},
};
use super::{
queries,
types::{
Bundle, DailyTotal, DailyVolume, HourlyTotal, HourlyVolume, Order, Pair, PairDaily,
PairHourly, Settlement, Token, TokenDailyTotal, TokenHourlyTotal, TokenTradingEvent,
Totals, Trade, User,
},
};
/// Looks up the subgraph endpoint URL for `chain`.
///
/// Returns `None` for chains that have no URL listed here. The `_env` argument is accepted
/// but not consulted: the same URL is returned regardless of environment.
const fn subgraph_url(chain: SupportedChainId, _env: Env) -> Option<&'static str> {
    let endpoint = match chain {
        SupportedChainId::Mainnet => "https://api.thegraph.com/subgraphs/name/cowprotocol/cow",
        SupportedChainId::GnosisChain => {
            "https://api.thegraph.com/subgraphs/name/cowprotocol/cow-gc"
        }
        SupportedChainId::ArbitrumOne => {
            "https://api.thegraph.com/subgraphs/name/cowprotocol/cow-arbitrum-one"
        }
        SupportedChainId::Base => {
            "https://api.thegraph.com/subgraphs/name/cowprotocol/cow-base"
        }
        SupportedChainId::Sepolia => {
            "https://api.thegraph.com/subgraphs/name/cowprotocol/cow-sepolia"
        }
        // Spelled out without a wildcard arm so that a newly added chain variant fails to
        // compile here until someone decides whether it has a subgraph.
        SupportedChainId::Polygon
        | SupportedChainId::Avalanche
        | SupportedChainId::BnbChain
        | SupportedChainId::Linea
        | SupportedChainId::Lens
        | SupportedChainId::Plasma
        | SupportedChainId::Ink => return None,
    };
    Some(endpoint)
}
/// HTTP client for a single subgraph GraphQL endpoint.
///
/// Holds the endpoint URL together with the rate limiter and retry policy that
/// `run_query` applies to every request it sends.
#[derive(Debug, Clone)]
pub struct SubgraphApi {
/// Underlying HTTP client used to POST queries.
client: reqwest::Client,
/// Endpoint every query is POSTed to.
base_url: String,
/// Limiter acquired before each request attempt; held in an `Arc` so it can be
/// passed in via `with_rate_limiter` and is not deep-copied when this struct is cloned.
rate_limiter: Arc<RateLimiter>,
/// Policy deciding whether a failed attempt is retried and how long to wait first.
retry_policy: RetryPolicy,
}
impl SubgraphApi {
#[allow(clippy::shadow_reuse, reason = "builder pattern chains naturally shadow")]
fn build_client() -> reqwest::Client {
let builder = reqwest::Client::builder();
#[cfg(not(target_arch = "wasm32"))]
let builder = builder.timeout(std::time::Duration::from_secs(30));
builder.build().unwrap_or_default()
}
pub fn new(chain: SupportedChainId, env: Env) -> Result<Self, CowError> {
let base_url = subgraph_url(chain, env)
.ok_or_else(|| CowError::UnknownAsset(format!("no subgraph for chain {chain}")))?;
Ok(Self {
client: Self::build_client(),
base_url: base_url.to_owned(),
rate_limiter: Arc::new(RateLimiter::default_orderbook()),
retry_policy: RetryPolicy::default_orderbook(),
})
}
#[must_use]
pub fn new_with_url(url: impl Into<String>) -> Self {
Self {
client: Self::build_client(),
base_url: url.into(),
rate_limiter: Arc::new(RateLimiter::default_orderbook()),
retry_policy: RetryPolicy::default_orderbook(),
}
}
#[must_use]
pub fn with_rate_limiter(mut self, limiter: Arc<RateLimiter>) -> Self {
self.rate_limiter = limiter;
self
}
#[must_use]
#[allow(
clippy::missing_const_for_fn,
reason = "RetryPolicy contains a Duration whose Drop is non-const"
)]
pub fn with_retry_policy(mut self, policy: RetryPolicy) -> Self {
self.retry_policy = policy;
self
}
pub async fn get_totals(&self) -> Result<Vec<Totals>, CowError> {
let data = self.run_query(queries::GET_TOTALS, None).await?;
parse_field(&data, "totals")
}
pub async fn get_last_days_volume(&self, days: u32) -> Result<Vec<DailyVolume>, CowError> {
let data =
self.run_query(queries::GET_LAST_DAYS_VOLUME, Some(json!({ "days": days }))).await?;
parse_field(&data, "dailyTotals")
}
pub async fn get_last_hours_volume(&self, hours: u32) -> Result<Vec<HourlyVolume>, CowError> {
let data =
self.run_query(queries::GET_LAST_HOURS_VOLUME, Some(json!({ "hours": hours }))).await?;
parse_field(&data, "hourlyTotals")
}
pub async fn get_daily_totals(&self, days: u32) -> Result<Vec<DailyTotal>, CowError> {
let data = self.run_query(queries::GET_DAILY_TOTALS, Some(json!({ "n": days }))).await?;
parse_field(&data, "dailyTotals")
}
pub async fn get_hourly_totals(&self, hours: u32) -> Result<Vec<HourlyTotal>, CowError> {
let data = self.run_query(queries::GET_HOURLY_TOTALS, Some(json!({ "n": hours }))).await?;
parse_field(&data, "hourlyTotals")
}
pub async fn get_orders_for_owner(
&self,
owner: &str,
limit: u32,
) -> Result<Vec<Order>, CowError> {
let data = self
.run_query(queries::GET_ORDERS_FOR_OWNER, Some(json!({ "owner": owner, "n": limit })))
.await?;
parse_field(&data, "orders")
}
pub async fn get_eth_price(&self) -> Result<Bundle, CowError> {
let data = self.run_query(queries::GET_ETH_PRICE, None).await?;
parse_field(&data, "bundle")
}
pub async fn get_trades(&self, limit: u32) -> Result<Vec<Trade>, CowError> {
let data = self.run_query(queries::GET_TRADES, Some(json!({ "n": limit }))).await?;
parse_field(&data, "trades")
}
pub async fn get_settlements(&self, limit: u32) -> Result<Vec<Settlement>, CowError> {
let data = self.run_query(queries::GET_SETTLEMENTS, Some(json!({ "n": limit }))).await?;
parse_field(&data, "settlements")
}
pub async fn get_user(&self, address: &str) -> Result<User, CowError> {
let data = self.run_query(queries::GET_USER, Some(json!({ "id": address }))).await?;
parse_field(&data, "user")
}
pub async fn get_tokens(&self, limit: u32) -> Result<Vec<Token>, CowError> {
let data = self.run_query(queries::GET_TOKENS, Some(json!({ "n": limit }))).await?;
parse_field(&data, "tokens")
}
pub async fn get_token(&self, address: &str) -> Result<Token, CowError> {
let data = self.run_query(queries::GET_TOKEN, Some(json!({ "id": address }))).await?;
parse_field(&data, "token")
}
pub async fn get_token_daily_totals(
&self,
token_address: &str,
days: u32,
) -> Result<Vec<TokenDailyTotal>, CowError> {
let data = self
.run_query(
queries::GET_TOKEN_DAILY_TOTALS,
Some(json!({ "token": token_address, "n": days })),
)
.await?;
parse_field(&data, "tokenDailyTotals")
}
pub async fn get_token_hourly_totals(
&self,
token_address: &str,
hours: u32,
) -> Result<Vec<TokenHourlyTotal>, CowError> {
let data = self
.run_query(
queries::GET_TOKEN_HOURLY_TOTALS,
Some(json!({ "token": token_address, "n": hours })),
)
.await?;
parse_field(&data, "tokenHourlyTotals")
}
pub async fn get_token_trading_events(
&self,
token_address: &str,
limit: u32,
) -> Result<Vec<TokenTradingEvent>, CowError> {
let data = self
.run_query(
queries::GET_TOKEN_TRADING_EVENTS,
Some(json!({ "token": token_address, "n": limit })),
)
.await?;
parse_field(&data, "tokenTradingEvents")
}
pub async fn get_pairs(&self, limit: u32) -> Result<Vec<Pair>, CowError> {
let data = self.run_query(queries::GET_PAIRS, Some(json!({ "n": limit }))).await?;
parse_field(&data, "pairs")
}
pub async fn get_pair(&self, id: &str) -> Result<Pair, CowError> {
let data = self.run_query(queries::GET_PAIR, Some(json!({ "id": id }))).await?;
parse_field(&data, "pair")
}
pub async fn get_pair_daily_totals(
&self,
pair_id: &str,
days: u32,
) -> Result<Vec<PairDaily>, CowError> {
let data = self
.run_query(queries::GET_PAIR_DAILY_TOTALS, Some(json!({ "pair": pair_id, "n": days })))
.await?;
parse_field(&data, "pairDailies")
}
pub async fn get_pair_hourly_totals(
&self,
pair_id: &str,
hours: u32,
) -> Result<Vec<PairHourly>, CowError> {
let data = self
.run_query(
queries::GET_PAIR_HOURLY_TOTALS,
Some(json!({ "pair": pair_id, "n": hours })),
)
.await?;
parse_field(&data, "pairHourlies")
}
pub async fn run_query(
&self,
query: &str,
variables: Option<Value>,
) -> Result<Value, CowError> {
let mut body = json!({ "query": query });
if let Some(vars) = variables {
body["variables"] = vars;
}
let max = self.retry_policy.max_attempts.max(1);
let mut attempt: u32 = 0;
let resp = loop {
self.rate_limiter.acquire().await;
let result = self.client.post(&self.base_url).json(&body).send().await;
let last_attempt = attempt + 1 >= max;
match result {
Ok(resp) => {
let status = resp.status().as_u16();
if !self.retry_policy.should_retry_status(status) || last_attempt {
break resp;
}
}
Err(e) => {
if !self.retry_policy.should_retry_error(&e) || last_attempt {
return Err(e.into());
}
}
}
self.retry_policy.wait(self.retry_policy.delay_for_attempt(attempt)).await;
attempt += 1;
};
let status = resp.status().as_u16();
let text = resp.text().await?;
if status != 200 {
return Err(CowError::Api { status, body: text });
}
let parsed: Value = serde_json::from_str(&text)
.map_err(|e| CowError::Parse { field: "response", reason: e.to_string() })?;
if let Some(errors) = parsed.get("errors") {
return Err(CowError::Api { status: 200, body: errors.to_string() });
}
parsed
.get("data")
.cloned()
.ok_or_else(|| CowError::Parse { field: "data", reason: "missing data field".into() })
}
}
/// Extracts `field` from the JSON object `data` and deserialises it into `T`.
///
/// # Errors
///
/// Returns [`CowError::Parse`] tagged with `field` when the key is absent
/// (`reason: "field missing"`) or when its value cannot be deserialised into `T`
/// (`reason` carries the `serde_json` error text).
fn parse_field<T: DeserializeOwned>(data: &Value, field: &'static str) -> Result<T, CowError> {
    let val =
        data.get(field).ok_or_else(|| CowError::Parse { field, reason: "field missing".into() })?;
    // `&Value` is itself a serde `Deserializer`, so deserialise straight from the borrow
    // rather than deep-cloning the subtree just to hand it to `from_value`.
    T::deserialize(val).map_err(|e| CowError::Parse { field, reason: e.to_string() })
}