use serde::{Deserialize, Serialize, de::DeserializeOwned};
use crate::{
chain::Chain,
error::{Error, Result},
transport::{DefaultTransport, HttpMethod, HttpRequest, HttpTransport},
};
#[derive(Clone, Copy, Debug, thiserror::Error, Eq, PartialEq)]
#[error(
"chain {0:?} has no published subgraph deployment; pass a gateway URL via SubgraphClient::with_bearer_token"
)]
pub struct ChainSubgraphUnavailable(pub Chain);
#[derive(Debug, thiserror::Error)]
pub enum SubgraphError {
#[error(
"subgraph returned {} graphql error(s); first: {}",
errors.len(),
errors.first().map_or("<no message>", |e| e.message.as_str())
)]
GraphQl {
errors: Vec<GraphQlError>,
},
#[error("subgraph response had neither `data` nor `errors`")]
EmptyResponse,
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct GraphQlError {
pub message: String,
#[serde(default)]
pub locations: Vec<serde_json::Value>,
#[serde(default)]
pub path: Vec<serde_json::Value>,
#[serde(default)]
pub extensions: Option<serde_json::Value>,
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Totals {
pub tokens: String,
pub orders: String,
pub traders: String,
pub settlements: String,
#[serde(default)]
pub volume_usd: Option<String>,
#[serde(default)]
pub volume_eth: Option<String>,
#[serde(default)]
pub fees_usd: Option<String>,
#[serde(default)]
pub fees_eth: Option<String>,
}
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct DailyTotal {
pub timestamp: i64,
#[serde(default)]
pub volume_usd: Option<String>,
}
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct HourlyTotal {
pub timestamp: i64,
#[serde(default)]
pub volume_usd: Option<String>,
}
#[derive(Debug, Deserialize)]
struct Envelope<T> {
#[serde(default = "Option::default")]
data: Option<T>,
#[serde(default)]
errors: Vec<GraphQlError>,
}
#[derive(Debug, Serialize)]
struct Request<'a, V: Serialize> {
query: &'a str,
#[serde(skip_serializing_if = "Option::is_none")]
variables: Option<V>,
}
#[derive(Debug, Deserialize)]
struct TotalsData {
totals: Vec<Totals>,
}
#[derive(Debug, Deserialize)]
struct DailyTotalsData {
#[serde(rename = "dailyTotals")]
daily_totals: Vec<DailyTotal>,
}
#[derive(Debug, Deserialize)]
struct HourlyTotalsData {
#[serde(rename = "hourlyTotals")]
hourly_totals: Vec<HourlyTotal>,
}
#[derive(Debug, Serialize)]
struct FirstVariables {
first: u32,
}
#[derive(Clone)]
pub struct SubgraphClient<T = DefaultTransport> {
url: url::Url,
transport: T,
bearer: Option<String>,
}
impl<T> std::fmt::Debug for SubgraphClient<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let url_view = if self.bearer.is_some() {
format!(
"{}://{}/<redacted>",
self.url.scheme(),
self.url.host_str().unwrap_or("")
)
} else {
self.url.to_string()
};
f.debug_struct("SubgraphClient")
.field("url", &url_view)
.field("bearer", &self.bearer.as_ref().map(|_| "<redacted>"))
.finish_non_exhaustive()
}
}
impl SubgraphClient {
pub fn new(url: url::Url) -> Self {
Self::new_with_transport(url, DefaultTransport::default())
}
pub fn with_bearer_token(url: url::Url, token: impl Into<String>) -> Self {
Self {
url,
transport: DefaultTransport::default(),
bearer: Some(token.into()),
}
}
pub fn for_chain_gateway(
chain: Chain,
api_key: impl Into<String>,
) -> std::result::Result<Self, ChainSubgraphUnavailable> {
let id = chain
.subgraph_gateway_deployment_id()
.ok_or(ChainSubgraphUnavailable(chain))?;
let url = url::Url::parse(&format!(
"https://gateway.thegraph.com/api/subgraphs/id/{id}"
))
.expect("hard-coded gateway URL");
Ok(Self::with_bearer_token(url, api_key))
}
}
impl<T: HttpTransport> SubgraphClient<T> {
pub const fn new_with_transport(url: url::Url, transport: T) -> Self {
Self {
url,
transport,
bearer: None,
}
}
pub const fn url(&self) -> &url::Url {
&self.url
}
pub async fn totals(&self) -> Result<Totals> {
let data: TotalsData = self
.execute_no_vars(
r"query Totals {
totals {
tokens
orders
traders
settlements
volumeUsd
volumeEth
feesUsd
feesEth
}
}",
)
.await?;
data.totals
.into_iter()
.next()
.ok_or(Error::Subgraph(SubgraphError::EmptyResponse))
}
pub async fn last_days_volume(&self, days: u32) -> Result<Vec<DailyTotal>> {
let data: DailyTotalsData = self
.last_volume_rows(
r"query LastDaysVolume($first: Int!) {
dailyTotals(orderBy: timestamp, orderDirection: desc, first: $first) {
timestamp
volumeUsd
}
}",
days,
)
.await?;
Ok(data.daily_totals)
}
pub async fn last_hours_volume(&self, hours: u32) -> Result<Vec<HourlyTotal>> {
let data: HourlyTotalsData = self
.last_volume_rows(
r"query LastHoursVolume($first: Int!) {
hourlyTotals(orderBy: timestamp, orderDirection: desc, first: $first) {
timestamp
volumeUsd
}
}",
hours,
)
.await?;
Ok(data.hourly_totals)
}
async fn last_volume_rows<TData>(&self, query: &str, first: u32) -> Result<TData>
where
TData: DeserializeOwned,
{
self.execute(query, Some(FirstVariables { first })).await
}
pub async fn execute<TVars, TData>(
&self,
query: &str,
variables: Option<TVars>,
) -> Result<TData>
where
TVars: Serialize,
TData: DeserializeOwned,
{
let body = Request { query, variables };
let response = self
.transport
.execute(HttpRequest {
method: HttpMethod::Post,
url: self.url.clone(),
json_body: Some(serde_json::to_vec(&body)?),
bearer: self.bearer.clone(),
})
.await?;
if !response.is_success() {
return Err(Error::UnexpectedStatus {
status: response.status,
body: response.body,
});
}
let envelope: Envelope<TData> = serde_json::from_str(&response.body)?;
if !envelope.errors.is_empty() {
return Err(Error::Subgraph(SubgraphError::GraphQl {
errors: envelope.errors,
}));
}
envelope
.data
.ok_or(Error::Subgraph(SubgraphError::EmptyResponse))
}
pub async fn execute_no_vars<TData>(&self, query: &str) -> Result<TData>
where
TData: DeserializeOwned,
{
self.execute::<(), TData>(query, None).await
}
}
#[cfg(test)]
mod tests {
use super::*;
#[cfg(not(target_arch = "wasm32"))]
use crate::order_book::MAX_RESPONSE_BYTES;
#[test]
fn totals_round_trips_through_serde() {
let raw = serde_json::json!({
"tokens": "1234",
"orders": "987654",
"traders": "12345",
"settlements": "543210",
"volumeUsd": "1234567890.12",
"volumeEth": "12345.67",
"feesUsd": "12345.67",
"feesEth": "1.234"
});
let totals: Totals = serde_json::from_value(raw).unwrap();
assert_eq!(totals.orders, "987654");
assert_eq!(totals.volume_usd.as_deref(), Some("1234567890.12"));
assert_eq!(totals.fees_eth.as_deref(), Some("1.234"));
}
#[test]
fn totals_tolerates_missing_optional_fields() {
let raw = serde_json::json!({
"tokens": "0",
"orders": "0",
"traders": "0",
"settlements": "0"
});
let totals: Totals = serde_json::from_value(raw).unwrap();
assert!(totals.volume_usd.is_none());
assert!(totals.fees_eth.is_none());
}
#[test]
fn daily_total_parses_canonical_response_row() {
let raw = serde_json::json!({
"timestamp": 1_700_000_000_i64,
"volumeUsd": "42000000.42"
});
let row: DailyTotal = serde_json::from_value(raw).unwrap();
assert_eq!(row.timestamp, 1_700_000_000);
assert_eq!(row.volume_usd.as_deref(), Some("42000000.42"));
}
#[test]
fn hourly_total_tolerates_null_volume() {
let raw = serde_json::json!({
"timestamp": 1_700_000_000_i64,
"volumeUsd": null
});
let row: HourlyTotal = serde_json::from_value(raw).unwrap();
assert!(row.volume_usd.is_none());
}
#[test]
fn graphql_error_response_parses() {
let raw = serde_json::json!({
"errors": [{
"message": "Field 'foo' is not defined",
"locations": [{"line": 1, "column": 12}],
"path": ["foo"],
"extensions": {"code": "GRAPHQL_VALIDATION_FAILED"}
}]
});
let envelope: Envelope<TotalsData> = serde_json::from_value(raw).unwrap();
assert!(envelope.data.is_none());
assert_eq!(envelope.errors.len(), 1);
assert!(envelope.errors[0].message.contains("not defined"));
}
#[test]
fn request_body_includes_variables_when_present() {
let body = Request {
query: "query LastDaysVolume($first: Int!) { dailyTotals(first: $first) { timestamp } }",
variables: Some(FirstVariables { first: 7 }),
};
let json = serde_json::to_value(&body).unwrap();
assert_eq!(json["variables"]["first"], 7);
assert!(json["query"].as_str().unwrap().contains("LastDaysVolume"));
}
#[test]
fn request_body_omits_variables_when_absent() {
let body: Request<'_, ()> = Request {
query: "query Totals { totals { orders } }",
variables: None,
};
let json = serde_json::to_value(&body).unwrap();
assert!(json.get("variables").is_none());
}
#[test]
fn client_url_is_preserved() {
let url = url::Url::parse("https://example.test/subgraphs/cow").unwrap();
let client = SubgraphClient::new(url.clone());
assert_eq!(client.url(), &url);
}
#[test]
fn bearer_token_constructor_stores_it() {
let url = url::Url::parse("https://example.test/").unwrap();
let client = SubgraphClient::with_bearer_token(url, "tok_abc");
assert_eq!(client.bearer.as_deref(), Some("tok_abc"));
}
#[test]
fn debug_does_not_leak_bearer_token() {
let url = url::Url::parse("https://example.test/").unwrap();
let secret = "super-secret-token-xyz-do-not-leak";
let client = SubgraphClient::with_bearer_token(url, secret);
let rendered = format!("{client:?}");
assert!(
!rendered.contains(secret),
"bearer token leaked through Debug: {rendered}"
);
assert!(
rendered.contains("redacted"),
"expected '<redacted>' marker in Debug output, got: {rendered}"
);
let path_key = "API-KEY-IN-URL-DO-NOT-LEAK";
let gateway = url::Url::parse(&format!(
"https://gateway.thegraph.com/api/{path_key}/subgraphs/id/xyz",
))
.unwrap();
let gw_client = SubgraphClient::with_bearer_token(gateway, "bearer-token");
let gw_rendered = format!("{gw_client:?}");
assert!(
!gw_rendered.contains(path_key),
"gateway URL path leaked through Debug: {gw_rendered}"
);
let no_token = SubgraphClient::new(url::Url::parse("https://example.test/").unwrap());
assert!(format!("{no_token:?}").contains("None"));
let request = HttpRequest {
method: HttpMethod::Post,
url: url::Url::parse("https://example.test/").unwrap(),
json_body: None,
bearer: Some(secret.to_owned()),
};
let request_rendered = format!("{request:?}");
assert!(
!request_rendered.contains(secret),
"bearer token leaked through HttpRequest Debug: {request_rendered}"
);
assert!(
request_rendered.contains("redacted"),
"expected '<redacted>' marker in HttpRequest Debug output, got: {request_rendered}"
);
}
#[test]
fn for_chain_gateway_resolves_five_supported_chains() {
for chain in [
Chain::Mainnet,
Chain::Gnosis,
Chain::ArbitrumOne,
Chain::Base,
Chain::Sepolia,
] {
let id = chain.subgraph_gateway_deployment_id().unwrap();
let client = SubgraphClient::for_chain_gateway(chain, "test-key").unwrap();
assert_eq!(client.url().scheme(), "https");
assert_eq!(client.url().host_str(), Some("gateway.thegraph.com"));
assert!(client.url().path().ends_with(id));
assert!(!client.url().path().contains("test-key"));
}
}
#[test]
fn for_chain_gateway_rejects_chains_without_deployment() {
for chain in [
Chain::Bnb,
Chain::Polygon,
Chain::Plasma,
Chain::Avalanche,
Chain::Linea,
] {
let err = SubgraphClient::for_chain_gateway(chain, "test-key").unwrap_err();
assert_eq!(err, ChainSubgraphUnavailable(chain));
}
}
#[cfg(not(target_arch = "wasm32"))]
#[tokio::test]
async fn totals_errors_on_empty_result_set() {
use wiremock::{
Mock, MockServer, ResponseTemplate,
matchers::{method, path},
};
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/"))
.respond_with(ResponseTemplate::new(200).set_body_string(r#"{"data":{"totals":[]}}"#))
.mount(&server)
.await;
let client = SubgraphClient::new(server.uri().parse().unwrap());
let err = client.totals().await.unwrap_err();
assert!(
matches!(err, Error::Subgraph(SubgraphError::EmptyResponse)),
"expected EmptyResponse, got {err:?}"
);
}
#[cfg(not(target_arch = "wasm32"))]
#[tokio::test]
async fn execute_keeps_unexpected_status_for_api_error_shaped_body() {
use wiremock::{
Mock, MockServer, ResponseTemplate,
matchers::{method, path},
};
let server = MockServer::start().await;
let api_error_body = r#"{"errorType":"NoLiquidity","description":"boom"}"#;
Mock::given(method("POST"))
.and(path("/"))
.respond_with(ResponseTemplate::new(500).set_body_string(api_error_body))
.mount(&server)
.await;
let client = SubgraphClient::new(server.uri().parse().unwrap());
let err = client
.execute::<(), serde_json::Value>("query { totals { orders } }", None)
.await
.unwrap_err();
match err {
Error::UnexpectedStatus { status, body } => {
assert_eq!(status, 500);
assert_eq!(body, api_error_body);
}
other => panic!("expected UnexpectedStatus, got {other:?}"),
}
}
#[cfg(not(target_arch = "wasm32"))]
#[tokio::test]
async fn execute_sends_bearer_token_as_authorization_header() {
use wiremock::{
Mock, MockServer, ResponseTemplate,
matchers::{header, method, path},
};
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/"))
.and(header("authorization", "Bearer tok_abc"))
.respond_with(ResponseTemplate::new(200).set_body_string(r#"{"data":{"ok":true}}"#))
.mount(&server)
.await;
let client = SubgraphClient::with_bearer_token(server.uri().parse().unwrap(), "tok_abc");
let data: serde_json::Value = client
.execute_no_vars("query { totals { orders } }")
.await
.unwrap();
assert_eq!(data["ok"], true);
}
#[cfg(not(target_arch = "wasm32"))]
#[tokio::test]
async fn execute_rejects_response_above_size_cap() {
use wiremock::{
Mock, MockServer, ResponseTemplate,
matchers::{method, path},
};
let server = MockServer::start().await;
let oversize_body = "a".repeat(MAX_RESPONSE_BYTES + 1);
Mock::given(method("POST"))
.and(path("/"))
.respond_with(ResponseTemplate::new(200).set_body_string(oversize_body))
.mount(&server)
.await;
let client = SubgraphClient::new(server.uri().parse().unwrap());
let err = client
.execute::<(), serde_json::Value>("query { totals { orders } }", None)
.await
.unwrap_err();
match err {
Error::ResponseTooLarge { max } => assert_eq!(max, MAX_RESPONSE_BYTES),
other => panic!("expected ResponseTooLarge, got {other:?}"),
}
}
#[cfg(not(target_arch = "wasm32"))]
#[tokio::test]
async fn execute_accepts_response_at_size_cap() {
use wiremock::{
Mock, MockServer, ResponseTemplate,
matchers::{method, path},
};
let server = MockServer::start().await;
let at_cap_body = "a".repeat(MAX_RESPONSE_BYTES);
Mock::given(method("POST"))
.and(path("/"))
.respond_with(ResponseTemplate::new(200).set_body_string(at_cap_body))
.mount(&server)
.await;
let client = SubgraphClient::new(server.uri().parse().unwrap());
let err = client
.execute::<(), serde_json::Value>("query { totals { orders } }", None)
.await
.unwrap_err();
assert!(
!matches!(err, Error::ResponseTooLarge { .. }),
"body at the cap must not trip ResponseTooLarge: {err:?}"
);
}
}