use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use crate::entity::{EntityQuery, EntitySchema, EntityStore, FieldType, QueryFilter, SortOrder};
use crate::error::IndexerError;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GraphqlError {
pub message: String,
}
impl GraphqlError {
fn new(msg: impl Into<String>) -> Self {
Self {
message: msg.into(),
}
}
}
impl From<IndexerError> for GraphqlError {
fn from(e: IndexerError) -> Self {
Self::new(e.to_string())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphqlResponse {
#[serde(skip_serializing_if = "Option::is_none")]
pub data: Option<JsonValue>,
#[serde(skip_serializing_if = "Option::is_none")]
pub errors: Option<Vec<GraphqlError>>,
}
impl GraphqlResponse {
pub fn ok(data: JsonValue) -> Self {
Self {
data: Some(data),
errors: None,
}
}
pub fn err(msg: impl Into<String>) -> Self {
Self {
data: None,
errors: Some(vec![GraphqlError::new(msg)]),
}
}
pub fn errors(errors: Vec<GraphqlError>) -> Self {
Self {
data: None,
errors: Some(errors),
}
}
pub fn is_error(&self) -> bool {
self.errors.is_some()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubscriptionConfig {
pub entity_types: Vec<String>,
pub events: Vec<SubscriptionEvent>,
pub from_block: Option<u64>,
pub buffer_size: usize,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SubscriptionEvent {
Insert,
Update,
Delete,
Reorg,
}
impl Default for SubscriptionConfig {
fn default() -> Self {
Self {
entity_types: Vec::new(),
events: vec![
SubscriptionEvent::Insert,
SubscriptionEvent::Update,
SubscriptionEvent::Delete,
SubscriptionEvent::Reorg,
],
from_block: None,
buffer_size: 256,
}
}
}
#[derive(Debug, Default, Clone)]
pub struct GraphqlSchema {
entities: Vec<EntitySchema>,
}
impl GraphqlSchema {
pub fn new() -> Self {
Self::default()
}
pub fn add_entity(&mut self, schema: EntitySchema) {
self.entities.push(schema);
}
pub fn sdl(&self) -> String {
let mut out = String::new();
out.push_str("scalar BigInt\n\n");
for schema in &self.entities {
out.push_str(&self.entity_type_sdl(schema));
out.push_str(&self.filter_input_sdl(schema));
}
out.push_str("enum OrderDirection {\n asc\n desc\n}\n\n");
out.push_str("type Query {\n");
for schema in &self.entities {
let type_name = pascal_case(&schema.name);
let singular = schema.name.clone();
let plural = format!("{}s", schema.name);
out.push_str(&format!(" {}(id: String!): {}\n", singular, type_name));
out.push_str(&format!(
" {}(where: {}_filter, orderBy: String, orderDirection: OrderDirection, first: Int, skip: Int): [{}!]!\n",
plural, schema.name, type_name
));
}
out.push_str("}\n");
out
}
fn entity_type_sdl(&self, schema: &EntitySchema) -> String {
let type_name = pascal_case(&schema.name);
let mut out = format!("type {} {{\n", type_name);
out.push_str(" id: String!\n");
out.push_str(" blockNumber: BigInt!\n");
out.push_str(" txHash: String!\n");
out.push_str(" logIndex: Int!\n");
for field in &schema.fields {
let gql_type = field_type_to_gql(&field.field_type, field.nullable);
out.push_str(&format!(" {}: {}\n", field.name, gql_type));
}
out.push_str("}\n\n");
out
}
fn filter_input_sdl(&self, schema: &EntitySchema) -> String {
let mut out = format!("input {}_filter {{\n", schema.name);
for field in &schema.fields {
let base = field_type_to_gql_scalar(&field.field_type);
out.push_str(&format!(" {}: {}\n", field.name, base));
out.push_str(&format!(" {}_gt: {}\n", field.name, base));
out.push_str(&format!(" {}_lt: {}\n", field.name, base));
out.push_str(&format!(" {}_gte: {}\n", field.name, base));
out.push_str(&format!(" {}_lte: {}\n", field.name, base));
out.push_str(&format!(" {}_in: [{}]\n", field.name, base));
}
out.push_str("}\n\n");
out
}
}
#[derive(Debug, Clone)]
struct ParsedSelection {
field: String,
args: HashMap<String, ArgValue>,
sub_fields: Vec<String>,
}
#[derive(Debug, Clone)]
enum ArgValue {
Str(String),
Num(f64),
Obj(HashMap<String, ArgValue>),
Ident(String),
}
impl ArgValue {
fn as_str(&self) -> Option<&str> {
match self {
ArgValue::Str(s) => Some(s.as_str()),
ArgValue::Ident(s) => Some(s.as_str()),
_ => None,
}
}
fn as_usize(&self) -> Option<usize> {
match self {
ArgValue::Num(n) => Some(*n as usize),
_ => None,
}
}
fn as_obj(&self) -> Option<&HashMap<String, ArgValue>> {
match self {
ArgValue::Obj(m) => Some(m),
_ => None,
}
}
}
struct Parser<'a> {
src: &'a [u8],
pos: usize,
}
impl<'a> Parser<'a> {
fn new(src: &'a str) -> Self {
Self {
src: src.as_bytes(),
pos: 0,
}
}
fn peek(&self) -> Option<u8> {
self.src.get(self.pos).copied()
}
fn consume(&mut self) -> Option<u8> {
let b = self.src.get(self.pos).copied();
if b.is_some() {
self.pos += 1;
}
b
}
fn skip_ws(&mut self) {
while let Some(b) = self.peek() {
if b == b'#' {
while let Some(c) = self.consume() {
if c == b'\n' {
break;
}
}
} else if b.is_ascii_whitespace() || b == b',' {
self.consume();
} else {
break;
}
}
}
fn expect(&mut self, ch: u8) -> Result<(), String> {
self.skip_ws();
match self.consume() {
Some(b) if b == ch => Ok(()),
Some(b) => Err(format!(
"expected '{}' but got '{}' at position {}",
ch as char, b as char, self.pos
)),
None => Err(format!(
"expected '{}' but reached end of input",
ch as char
)),
}
}
fn read_name(&mut self) -> Option<String> {
self.skip_ws();
let start = self.pos;
while let Some(b) = self.peek() {
if b.is_ascii_alphanumeric() || b == b'_' {
self.consume();
} else {
break;
}
}
if self.pos > start {
Some(String::from_utf8_lossy(&self.src[start..self.pos]).into_owned())
} else {
None
}
}
fn read_string(&mut self) -> Result<String, String> {
let mut s = String::new();
loop {
match self.consume() {
Some(b'"') => break,
Some(b'\\') => match self.consume() {
Some(b'"') => s.push('"'),
Some(b'\\') => s.push('\\'),
Some(b'n') => s.push('\n'),
Some(b't') => s.push('\t'),
Some(c) => s.push(c as char),
None => return Err("unterminated string escape".into()),
},
Some(c) => s.push(c as char),
None => return Err("unterminated string literal".into()),
}
}
Ok(s)
}
fn read_number(&mut self, first: u8) -> ArgValue {
let mut buf = String::new();
buf.push(first as char);
while let Some(b) = self.peek() {
if b.is_ascii_digit() || b == b'.' || b == b'-' || b == b'e' || b == b'E' {
buf.push(b as char);
self.consume();
} else {
break;
}
}
ArgValue::Num(buf.parse::<f64>().unwrap_or(0.0))
}
fn read_arg_value(&mut self) -> Result<ArgValue, String> {
self.skip_ws();
match self.peek() {
Some(b'"') => {
self.consume();
Ok(ArgValue::Str(self.read_string()?))
}
Some(b'{') => {
self.consume();
let obj = self.read_object()?;
Ok(ArgValue::Obj(obj))
}
Some(b) if b.is_ascii_digit() || b == b'-' => {
let first = self.consume().unwrap();
Ok(self.read_number(first))
}
Some(_) => {
match self.read_name() {
Some(name) => Ok(ArgValue::Ident(name)),
None => Err(format!("unexpected character at pos {}", self.pos)),
}
}
None => Err("unexpected end of input in argument value".into()),
}
}
fn read_object(&mut self) -> Result<HashMap<String, ArgValue>, String> {
let mut map = HashMap::new();
loop {
self.skip_ws();
if self.peek() == Some(b'}') {
self.consume();
break;
}
let key = self.read_name().ok_or("expected object key")?;
self.skip_ws();
self.expect(b':')?;
let val = self.read_arg_value()?;
map.insert(key, val);
}
Ok(map)
}
fn read_args(&mut self) -> Result<HashMap<String, ArgValue>, String> {
let mut args = HashMap::new();
loop {
self.skip_ws();
if self.peek() == Some(b')') {
self.consume();
break;
}
let key = self.read_name().ok_or("expected argument name")?;
self.skip_ws();
self.expect(b':')?;
let val = self.read_arg_value()?;
args.insert(key, val);
}
Ok(args)
}
fn read_sub_fields(&mut self) -> Result<Vec<String>, String> {
let mut fields = Vec::new();
loop {
self.skip_ws();
if self.peek() == Some(b'}') {
self.consume();
break;
}
if self.peek() == Some(b'{') {
self.consume();
self.read_sub_fields()?; continue;
}
match self.read_name() {
Some(name) => {
self.skip_ws();
if self.peek() == Some(b'{') {
self.consume();
self.read_sub_fields()?;
}
fields.push(name);
}
None => {
return Err(format!("expected field name at pos {}", self.pos));
}
}
}
Ok(fields)
}
fn parse(&mut self) -> Result<Vec<ParsedSelection>, String> {
self.skip_ws();
if let Some(b'q') | Some(b'm') | Some(b's') = self.peek() {
let kw = self.read_name().unwrap_or_default();
if kw != "query" && kw != "mutation" && kw != "subscription" {
return Err(format!("unexpected keyword '{kw}' at document start"));
}
self.skip_ws();
if self
.peek()
.is_some_and(|b| b.is_ascii_alphabetic() || b == b'_')
{
self.read_name();
}
}
self.skip_ws();
self.expect(b'{')?;
let mut selections = Vec::new();
loop {
self.skip_ws();
if self.peek() == Some(b'}') {
self.consume();
break;
}
let field = self.read_name().ok_or("expected selection field name")?;
self.skip_ws();
let mut args = HashMap::new();
if self.peek() == Some(b'(') {
self.consume();
args = self.read_args()?;
}
self.skip_ws();
let mut sub_fields = Vec::new();
if self.peek() == Some(b'{') {
self.consume();
sub_fields = self.read_sub_fields()?;
}
selections.push(ParsedSelection {
field,
args,
sub_fields,
});
}
Ok(selections)
}
}
pub struct GraphqlExecutor {
store: Arc<dyn EntityStore>,
schema: RwLock<GraphqlSchema>,
}
impl GraphqlExecutor {
pub fn new(store: Arc<dyn EntityStore>) -> Self {
Self {
store,
schema: RwLock::new(GraphqlSchema::new()),
}
}
pub fn register_schema(&self, entity_schema: EntitySchema) {
let mut schema = self.schema.write().expect("schema lock poisoned");
schema.add_entity(entity_schema);
}
pub fn introspect(&self) -> String {
let schema = self.schema.read().expect("schema lock poisoned");
schema.sdl()
}
pub async fn execute(&self, query: &str) -> GraphqlResponse {
let selections = match Parser::new(query).parse() {
Ok(s) => s,
Err(e) => return GraphqlResponse::err(format!("Parse error: {e}")),
};
let mut data_map = serde_json::Map::new();
let mut errors: Vec<GraphqlError> = Vec::new();
for sel in selections {
if sel.field == "__schema" || sel.field == "__type" {
let sdl = self.introspect();
data_map.insert(sel.field.clone(), JsonValue::String(sdl));
continue;
}
match self.execute_selection(&sel).await {
Ok(value) => {
data_map.insert(sel.field.clone(), value);
}
Err(e) => {
errors.push(e);
}
}
}
if !errors.is_empty() {
return GraphqlResponse::errors(errors);
}
GraphqlResponse::ok(JsonValue::Object(data_map))
}
async fn execute_selection(&self, sel: &ParsedSelection) -> Result<JsonValue, GraphqlError> {
let field = &sel.field;
let (entity_type, is_singular) = self.resolve_entity_type(field);
let entity_type = entity_type.ok_or_else(|| {
GraphqlError::new(format!("Unknown field '{}': no entity type found", field))
})?;
if is_singular {
self.execute_single(&entity_type, sel).await
} else {
self.execute_collection(&entity_type, sel).await
}
}
fn resolve_entity_type(&self, field: &str) -> (Option<String>, bool) {
let schema = self.schema.read().expect("schema lock poisoned");
if schema.entities.iter().any(|e| e.name == field) {
return (Some(field.to_string()), true);
}
for entity in &schema.entities {
let plural = format!("{}s", entity.name);
if *field == plural {
return (Some(entity.name.clone()), false);
}
}
(None, false)
}
async fn execute_single(
&self,
entity_type: &str,
sel: &ParsedSelection,
) -> Result<JsonValue, GraphqlError> {
let id = sel
.args
.get("id")
.and_then(|v| v.as_str())
.ok_or_else(|| GraphqlError::new("Singular query requires an 'id' argument"))?;
let _query = EntityQuery::new(entity_type)
.filter(QueryFilter::Eq(
"id".to_string(),
JsonValue::String(id.to_string()),
))
.limit(1);
let query_no_id_filter = EntityQuery::new(entity_type);
let mut rows = self
.store
.query(query_no_id_filter)
.await
.map_err(GraphqlError::from)?;
rows.retain(|r| r.id == id);
if rows.is_empty() {
return Ok(JsonValue::Null);
}
let row = &rows[0];
Ok(self.project_row(row, &sel.sub_fields))
}
async fn execute_collection(
&self,
entity_type: &str,
sel: &ParsedSelection,
) -> Result<JsonValue, GraphqlError> {
let first = sel.args.get("first").and_then(|v| v.as_usize());
let skip = sel.args.get("skip").and_then(|v| v.as_usize());
let order_by = sel
.args
.get("orderBy")
.and_then(|v| v.as_str())
.map(|s| s.to_string());
let order_direction = sel
.args
.get("orderDirection")
.and_then(|v| v.as_str())
.unwrap_or("asc")
.to_lowercase();
let sort_order = if order_direction == "desc" {
SortOrder::Desc
} else {
SortOrder::Asc
};
let filters = if let Some(where_arg) = sel.args.get("where") {
let obj = where_arg
.as_obj()
.ok_or_else(|| GraphqlError::new("'where' argument must be an object"))?;
self.parse_where_filters(obj)?
} else {
Vec::new()
};
let mut q = EntityQuery::new(entity_type);
for f in filters {
q = q.filter(f);
}
if let Some(ob) = order_by {
q = q.order_by(ob, sort_order);
}
if let Some(n) = first {
q = q.limit(n);
}
if let Some(n) = skip {
q = q.offset(n);
}
let rows = self.store.query(q).await.map_err(GraphqlError::from)?;
let values: Vec<JsonValue> = rows
.iter()
.map(|row| self.project_row(row, &sel.sub_fields))
.collect();
Ok(JsonValue::Array(values))
}
fn parse_where_filters(
&self,
obj: &HashMap<String, ArgValue>,
) -> Result<Vec<QueryFilter>, GraphqlError> {
let mut filters = Vec::new();
for (key, val) in obj {
let json_val = arg_to_json(val);
if let Some(field) = key.strip_suffix("_gt") {
filters.push(QueryFilter::Gt(field.to_string(), json_val));
} else if let Some(field) = key.strip_suffix("_lt") {
filters.push(QueryFilter::Lt(field.to_string(), json_val));
} else if let Some(field) = key.strip_suffix("_gte") {
filters.push(QueryFilter::Gte(field.to_string(), json_val));
} else if let Some(field) = key.strip_suffix("_lte") {
filters.push(QueryFilter::Lte(field.to_string(), json_val));
} else if let Some(field) = key.strip_suffix("_in") {
let items = match json_val {
JsonValue::Array(arr) => arr,
JsonValue::String(s) => {
serde_json::from_str::<Vec<JsonValue>>(&s)
.unwrap_or_else(|_| vec![JsonValue::String(s)])
}
other => vec![other],
};
filters.push(QueryFilter::In(field.to_string(), items));
} else {
filters.push(QueryFilter::Eq(key.clone(), json_val));
}
}
Ok(filters)
}
fn project_row(&self, row: &crate::entity::EntityRow, sub_fields: &[String]) -> JsonValue {
let mut obj = serde_json::Map::new();
let include_all = sub_fields.is_empty();
let want = |name: &str| -> bool { include_all || sub_fields.iter().any(|f| f == name) };
if want("id") {
obj.insert("id".to_string(), JsonValue::String(row.id.clone()));
}
if want("blockNumber") {
obj.insert(
"blockNumber".to_string(),
JsonValue::Number(row.block_number.into()),
);
}
if want("txHash") {
obj.insert("txHash".to_string(), JsonValue::String(row.tx_hash.clone()));
}
if want("logIndex") {
obj.insert(
"logIndex".to_string(),
JsonValue::Number(row.log_index.into()),
);
}
for (k, v) in &row.data {
if want(k) {
obj.insert(k.clone(), v.clone());
}
}
JsonValue::Object(obj)
}
}
fn arg_to_json(val: &ArgValue) -> JsonValue {
match val {
ArgValue::Str(s) => JsonValue::String(s.clone()),
ArgValue::Num(n) => {
if n.fract() == 0.0 && *n >= 0.0 && *n <= u64::MAX as f64 {
JsonValue::Number((*n as u64).into())
} else if n.fract() == 0.0 && *n < 0.0 && *n >= i64::MIN as f64 {
JsonValue::Number((*n as i64).into())
} else {
serde_json::Number::from_f64(*n)
.map(JsonValue::Number)
.unwrap_or(JsonValue::Null)
}
}
ArgValue::Ident(s) => {
match s.as_str() {
"true" => JsonValue::Bool(true),
"false" => JsonValue::Bool(false),
"null" => JsonValue::Null,
_ => JsonValue::String(s.clone()),
}
}
ArgValue::Obj(map) => {
let mut obj = serde_json::Map::new();
for (k, v) in map {
obj.insert(k.clone(), arg_to_json(v));
}
JsonValue::Object(obj)
}
}
}
fn pascal_case(s: &str) -> String {
s.split('_')
.map(|part| {
let mut c = part.chars();
match c.next() {
None => String::new(),
Some(f) => f.to_uppercase().collect::<String>() + c.as_str(),
}
})
.collect()
}
fn field_type_to_gql(ft: &FieldType, nullable: bool) -> String {
let base = field_type_to_gql_scalar(ft);
if nullable {
base.to_string()
} else {
format!("{}!", base)
}
}
fn field_type_to_gql_scalar(ft: &FieldType) -> &'static str {
match ft {
FieldType::String => "String",
FieldType::Int64 => "BigInt",
FieldType::Uint64 => "BigInt",
FieldType::Float64 => "Float",
FieldType::Bool => "Boolean",
FieldType::Json => "String",
FieldType::Bytes => "String",
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use super::*;
use crate::entity::{
EntityRow, EntitySchemaBuilder, EntityStore, FieldType, MemoryEntityStore,
};
fn swap_schema() -> EntitySchema {
EntitySchemaBuilder::new("swap")
.primary_key("id")
.field("pool", FieldType::String, true)
.field("amount0", FieldType::Uint64, false)
.field("amount1", FieldType::Uint64, false)
.nullable_field("trader", FieldType::String, false)
.build()
}
fn transfer_schema() -> EntitySchema {
EntitySchemaBuilder::new("transfer")
.primary_key("id")
.field("from", FieldType::String, true)
.field("to", FieldType::String, true)
.field("value", FieldType::Uint64, false)
.build()
}
fn make_swap(id: &str, pool: &str, amount0: u64, amount1: u64, block: u64) -> EntityRow {
let mut data = HashMap::new();
data.insert("pool".to_string(), serde_json::json!(pool));
data.insert("amount0".to_string(), serde_json::json!(amount0));
data.insert("amount1".to_string(), serde_json::json!(amount1));
EntityRow {
id: id.to_string(),
entity_type: "swap".to_string(),
block_number: block,
tx_hash: format!("0xtx_{id}"),
log_index: 0,
data,
}
}
fn make_transfer(id: &str, from: &str, to: &str, value: u64, block: u64) -> EntityRow {
let mut data = HashMap::new();
data.insert("from".to_string(), serde_json::json!(from));
data.insert("to".to_string(), serde_json::json!(to));
data.insert("value".to_string(), serde_json::json!(value));
EntityRow {
id: id.to_string(),
entity_type: "transfer".to_string(),
block_number: block,
tx_hash: format!("0xtx_{id}"),
log_index: 0,
data,
}
}
async fn seeded_executor() -> GraphqlExecutor {
let store = Arc::new(MemoryEntityStore::new());
store.register_schema(&swap_schema()).await.unwrap();
store.register_schema(&transfer_schema()).await.unwrap();
store
.upsert(make_swap("s1", "0xPOOL_A", 1000, 500, 10))
.await
.unwrap();
store
.upsert(make_swap("s2", "0xPOOL_A", 2000, 1000, 11))
.await
.unwrap();
store
.upsert(make_swap("s3", "0xPOOL_B", 3000, 1500, 12))
.await
.unwrap();
store
.upsert(make_transfer("t1", "0xAlice", "0xBob", 100, 10))
.await
.unwrap();
store
.upsert(make_transfer("t2", "0xBob", "0xCharlie", 200, 11))
.await
.unwrap();
let executor = GraphqlExecutor::new(store);
executor.register_schema(swap_schema());
executor.register_schema(transfer_schema());
executor
}
#[test]
fn test_schema_generation_contains_type() {
let mut gql_schema = GraphqlSchema::new();
gql_schema.add_entity(swap_schema());
let sdl = gql_schema.sdl();
assert!(sdl.contains("type Swap {"), "SDL missing Swap type:\n{sdl}");
assert!(
sdl.contains("pool: String!"),
"SDL missing pool field:\n{sdl}"
);
assert!(
sdl.contains("amount0: BigInt!"),
"SDL missing amount0 field:\n{sdl}"
);
assert!(
sdl.contains("trader: String"),
"SDL missing nullable trader field:\n{sdl}"
);
}
#[test]
fn test_schema_generation_filter_input() {
let mut gql_schema = GraphqlSchema::new();
gql_schema.add_entity(swap_schema());
let sdl = gql_schema.sdl();
assert!(
sdl.contains("input swap_filter {"),
"SDL missing swap_filter input:\n{sdl}"
);
assert!(
sdl.contains("amount0_gt:"),
"SDL missing amount0_gt in filter:\n{sdl}"
);
assert!(
sdl.contains("pool_in:"),
"SDL missing pool_in in filter:\n{sdl}"
);
}
#[test]
fn test_schema_generation_query_type() {
let mut gql_schema = GraphqlSchema::new();
gql_schema.add_entity(swap_schema());
let sdl = gql_schema.sdl();
assert!(
sdl.contains("type Query {"),
"SDL missing Query type:\n{sdl}"
);
assert!(
sdl.contains("swap(id: String!): Swap"),
"SDL missing singular swap:\n{sdl}"
);
assert!(sdl.contains("swaps("), "SDL missing plural swaps:\n{sdl}");
}
#[test]
fn test_pascal_case_conversion() {
assert_eq!(pascal_case("swap"), "Swap");
assert_eq!(pascal_case("erc20_transfer"), "Erc20Transfer");
assert_eq!(pascal_case("uniswap_v3_pool"), "UniswapV3Pool");
}
#[tokio::test]
async fn test_introspection() {
let executor = seeded_executor().await;
let sdl = executor.introspect();
assert!(
sdl.contains("type Swap {"),
"introspect missing Swap type:\n{sdl}"
);
assert!(
sdl.contains("type Transfer {"),
"introspect missing Transfer type:\n{sdl}"
);
}
#[tokio::test]
async fn test_introspection_query() {
let executor = seeded_executor().await;
let resp = executor.execute("{ __schema { types { name } } }").await;
assert!(!resp.is_error(), "unexpected error: {:?}", resp.errors);
let data = resp.data.unwrap();
let sdl = data["__schema"].as_str().unwrap();
assert!(sdl.contains("type Swap {"));
}
#[tokio::test]
async fn test_collection_query_all() {
let executor = seeded_executor().await;
let resp = executor
.execute("{ swaps { id pool amount0 amount1 } }")
.await;
assert!(!resp.is_error(), "unexpected error: {:?}", resp.errors);
let arr = resp.data.unwrap()["swaps"].as_array().unwrap().clone();
assert_eq!(arr.len(), 3);
}
#[tokio::test]
async fn test_singular_query_by_id() {
let executor = seeded_executor().await;
let resp = executor
.execute(r#"{ swap(id: "s2") { id pool amount0 } }"#)
.await;
assert!(!resp.is_error(), "unexpected error: {:?}", resp.errors);
let row = &resp.data.unwrap()["swap"];
assert_eq!(row["id"], "s2");
assert_eq!(row["pool"], "0xPOOL_A");
assert_eq!(row["amount0"], 2000);
}
#[tokio::test]
async fn test_singular_query_missing_id() {
let executor = seeded_executor().await;
let resp = executor
.execute(r#"{ swap(id: "nonexistent") { id } }"#)
.await;
assert!(!resp.is_error(), "unexpected error: {:?}", resp.errors);
assert_eq!(resp.data.unwrap()["swap"], JsonValue::Null);
}
#[tokio::test]
async fn test_collection_with_where_filter() {
let executor = seeded_executor().await;
let resp = executor
.execute(r#"{ swaps(where: { pool: "0xPOOL_A" }) { id pool } }"#)
.await;
assert!(!resp.is_error(), "unexpected error: {:?}", resp.errors);
let arr = resp.data.unwrap()["swaps"].as_array().unwrap().clone();
assert_eq!(arr.len(), 2);
for row in &arr {
assert_eq!(row["pool"], "0xPOOL_A");
}
}
#[tokio::test]
async fn test_collection_pagination() {
let executor = seeded_executor().await;
let resp = executor
.execute(
r#"{ swaps(first: 1, skip: 1, orderBy: "amount0", orderDirection: "asc") { id amount0 } }"#,
)
.await;
assert!(!resp.is_error(), "unexpected error: {:?}", resp.errors);
let arr = resp.data.unwrap()["swaps"].as_array().unwrap().clone();
assert_eq!(arr.len(), 1);
assert_eq!(arr[0]["amount0"], 2000);
}
#[tokio::test]
async fn test_collection_order_desc() {
let executor = seeded_executor().await;
let resp = executor
.execute(r#"{ swaps(orderBy: "amount0", orderDirection: "desc") { id amount0 } }"#)
.await;
assert!(!resp.is_error(), "unexpected error: {:?}", resp.errors);
let arr = resp.data.unwrap()["swaps"].as_array().unwrap().clone();
assert_eq!(arr.len(), 3);
let first_amount = arr[0]["amount0"].as_u64().unwrap();
let last_amount = arr[2]["amount0"].as_u64().unwrap();
assert!(first_amount > last_amount, "expected descending order");
}
#[tokio::test]
async fn test_unknown_entity_returns_error() {
let executor = seeded_executor().await;
let resp = executor.execute("{ unknownEntity { id } }").await;
assert!(resp.is_error(), "expected an error for unknown entity");
let errs = resp.errors.unwrap();
assert!(
errs[0].message.contains("Unknown field"),
"wrong error message: {}",
errs[0].message
);
}
#[tokio::test]
async fn test_where_gt_filter() {
let executor = seeded_executor().await;
let resp = executor
.execute(r#"{ swaps(where: { amount0_gt: 1000 }) { id amount0 } }"#)
.await;
assert!(!resp.is_error(), "unexpected error: {:?}", resp.errors);
let arr = resp.data.unwrap()["swaps"].as_array().unwrap().clone();
assert_eq!(arr.len(), 2);
for row in &arr {
assert!(row["amount0"].as_u64().unwrap() > 1000);
}
}
#[test]
fn test_response_ok_format() {
let resp = GraphqlResponse::ok(serde_json::json!({ "swap": { "id": "s1" } }));
assert!(!resp.is_error());
let json = serde_json::to_value(&resp).unwrap();
assert!(json.get("data").is_some());
assert!(json.get("errors").is_none());
assert_eq!(json["data"]["swap"]["id"], "s1");
}
#[test]
fn test_response_error_format() {
let resp = GraphqlResponse::err("something went wrong");
assert!(resp.is_error());
let json = serde_json::to_value(&resp).unwrap();
assert!(json.get("errors").is_some());
assert!(json.get("data").is_none());
assert_eq!(json["errors"][0]["message"], "something went wrong");
}
#[tokio::test]
async fn test_field_projection() {
let executor = seeded_executor().await;
let resp = executor
.execute(
r#"{ swaps(first: 1, orderBy: "amount0", orderDirection: "asc") { id pool } }"#,
)
.await;
assert!(!resp.is_error(), "unexpected error: {:?}", resp.errors);
let row = &resp.data.unwrap()["swaps"][0];
assert!(row.get("id").is_some());
assert!(row.get("pool").is_some());
assert!(
row.get("amount0").is_none(),
"amount0 should be projected out"
);
}
#[tokio::test]
async fn test_multi_entity_query() {
let executor = seeded_executor().await;
let resp = executor.execute("{ swaps { id } transfers { id } }").await;
assert!(!resp.is_error(), "unexpected error: {:?}", resp.errors);
let data = resp.data.unwrap();
assert_eq!(data["swaps"].as_array().unwrap().len(), 3);
assert_eq!(data["transfers"].as_array().unwrap().len(), 2);
}
#[tokio::test]
async fn test_parse_error() {
let executor = seeded_executor().await;
let resp = executor.execute("{ unclosed { id ").await;
assert!(resp.is_error(), "expected parse error");
}
#[test]
fn test_subscription_config_default() {
let cfg = SubscriptionConfig::default();
assert!(cfg.entity_types.is_empty());
assert_eq!(cfg.buffer_size, 256);
assert!(cfg.events.contains(&SubscriptionEvent::Insert));
assert!(cfg.events.contains(&SubscriptionEvent::Reorg));
assert!(cfg.from_block.is_none());
}
#[test]
fn test_subscription_config_serialization() {
let cfg = SubscriptionConfig {
entity_types: vec!["swap".to_string()],
events: vec![SubscriptionEvent::Insert, SubscriptionEvent::Delete],
from_block: Some(1_000_000),
buffer_size: 64,
};
let json = serde_json::to_value(&cfg).unwrap();
assert_eq!(json["entity_types"][0], "swap");
assert_eq!(json["from_block"], 1_000_000);
assert_eq!(json["events"][0], "insert");
assert_eq!(json["events"][1], "delete");
}
#[tokio::test]
async fn test_where_lte_filter() {
let executor = seeded_executor().await;
let resp = executor
.execute(r#"{ swaps(where: { amount0_lte: 2000 }) { id amount0 } }"#)
.await;
assert!(!resp.is_error(), "unexpected error: {:?}", resp.errors);
let arr = resp.data.unwrap()["swaps"].as_array().unwrap().clone();
assert_eq!(arr.len(), 2);
for row in &arr {
assert!(row["amount0"].as_u64().unwrap() <= 2000);
}
}
#[tokio::test]
async fn test_singular_without_id_returns_error() {
let executor = seeded_executor().await;
let resp = executor.execute("{ swap { id pool } }").await;
assert!(resp.is_error(), "expected error for singular without id");
}
}