Skip to main content

chainindex_core/
graphql.rs

1//! Lightweight GraphQL-like query layer for the chainindex entity store.
2//!
3//! This module implements a self-contained, zero-external-dependency GraphQL
4//! query engine that translates a simplified GraphQL query syntax into
5//! [`EntityQuery`] calls against any [`EntityStore`] backend.
6//!
7//! # Supported Syntax
8//!
9//! Single entity by id:
10//! ```text
11//! { swap(id: "0x123") { id pool amount0 amount1 } }
12//! ```
13//!
14//! Collection query with filters, ordering, and pagination:
15//! ```text
16//! {
17//!   swaps(
18//!     first: 10
19//!     skip: 0
20//!     where: { pool: "0xABC", amount0_gt: "1000" }
21//!     orderBy: "amount0"
22//!     orderDirection: "desc"
23//!   ) { id pool amount0 amount1 }
24//! }
25//! ```
26//!
27//! Introspection:
28//! ```text
29//! { __schema { types { name } } }
30//! ```
31//!
32//! # Response Format
33//!
34//! Success: `{ "data": { "<field>": ... } }`
35//! Error:   `{ "errors": [{ "message": "..." }] }`
36
37use std::collections::HashMap;
38use std::sync::{Arc, RwLock};
39
40use serde::{Deserialize, Serialize};
41use serde_json::Value as JsonValue;
42
43use crate::entity::{EntityQuery, EntitySchema, EntityStore, FieldType, QueryFilter, SortOrder};
44use crate::error::IndexerError;
45
46// ─── GraphQL Error ────────────────────────────────────────────────────────────
47
48/// A single GraphQL error entry, matching the spec shape.
49#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
50pub struct GraphqlError {
51    /// Human-readable error message.
52    pub message: String,
53}
54
55impl GraphqlError {
56    fn new(msg: impl Into<String>) -> Self {
57        Self {
58            message: msg.into(),
59        }
60    }
61}
62
63impl From<IndexerError> for GraphqlError {
64    fn from(e: IndexerError) -> Self {
65        Self::new(e.to_string())
66    }
67}
68
69// ─── GraphQL Response ─────────────────────────────────────────────────────────
70
71/// A complete GraphQL HTTP response body.
72///
73/// Either `data` or `errors` is populated — never both in normal operation.
74#[derive(Debug, Clone, Serialize, Deserialize)]
75pub struct GraphqlResponse {
76    /// Query result data, keyed by selection field name.
77    #[serde(skip_serializing_if = "Option::is_none")]
78    pub data: Option<JsonValue>,
79
80    /// Error list if the request could not be fulfilled.
81    #[serde(skip_serializing_if = "Option::is_none")]
82    pub errors: Option<Vec<GraphqlError>>,
83}
84
85impl GraphqlResponse {
86    /// Construct a successful data response.
87    pub fn ok(data: JsonValue) -> Self {
88        Self {
89            data: Some(data),
90            errors: None,
91        }
92    }
93
94    /// Construct an error response with a single message.
95    pub fn err(msg: impl Into<String>) -> Self {
96        Self {
97            data: None,
98            errors: Some(vec![GraphqlError::new(msg)]),
99        }
100    }
101
102    /// Construct an error response from multiple errors.
103    pub fn errors(errors: Vec<GraphqlError>) -> Self {
104        Self {
105            data: None,
106            errors: Some(errors),
107        }
108    }
109
110    /// Returns `true` if this response contains errors.
111    pub fn is_error(&self) -> bool {
112        self.errors.is_some()
113    }
114}
115
116// ─── Subscription Config ──────────────────────────────────────────────────────
117
118/// Configuration for real-time entity change subscriptions.
119///
120/// Describes which entity types and events a subscriber is interested in.
121/// The actual WebSocket transport is not implemented here — this struct is
122/// carried as metadata for higher-level subscription routers.
123#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct SubscriptionConfig {
125    /// Entity types to subscribe to (empty = all types).
126    pub entity_types: Vec<String>,
127
128    /// Event kinds to receive.
129    pub events: Vec<SubscriptionEvent>,
130
131    /// Optional filter: only emit events at or above this block number.
132    pub from_block: Option<u64>,
133
134    /// Maximum number of buffered events before backpressure is applied.
135    pub buffer_size: usize,
136}
137
138/// The kinds of entity change events a subscription may emit.
139#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
140#[serde(rename_all = "snake_case")]
141pub enum SubscriptionEvent {
142    /// A new entity was inserted.
143    Insert,
144    /// An existing entity was updated (upsert).
145    Update,
146    /// An entity was deleted.
147    Delete,
148    /// Entities were rolled back due to a chain reorg.
149    Reorg,
150}
151
152impl Default for SubscriptionConfig {
153    fn default() -> Self {
154        Self {
155            entity_types: Vec::new(),
156            events: vec![
157                SubscriptionEvent::Insert,
158                SubscriptionEvent::Update,
159                SubscriptionEvent::Delete,
160                SubscriptionEvent::Reorg,
161            ],
162            from_block: None,
163            buffer_size: 256,
164        }
165    }
166}
167
168// ─── GraphQL Schema ───────────────────────────────────────────────────────────
169
170/// Auto-generated GraphQL SDL schema from registered [`EntitySchema`]s.
171///
172/// Call [`GraphqlSchema::add_entity`] for each entity type, then
173/// [`GraphqlSchema::sdl`] to retrieve the schema definition language string
174/// suitable for serving at `/__graphql/schema.graphql`.
175#[derive(Debug, Default, Clone)]
176pub struct GraphqlSchema {
177    entities: Vec<EntitySchema>,
178}
179
180impl GraphqlSchema {
181    /// Create an empty schema.
182    pub fn new() -> Self {
183        Self::default()
184    }
185
186    /// Register an entity schema.
187    pub fn add_entity(&mut self, schema: EntitySchema) {
188        self.entities.push(schema);
189    }
190
191    /// Return the GraphQL SDL string for all registered entity types.
192    ///
193    /// Generates:
194    /// - One GraphQL `type` per entity (with system fields + user fields).
195    /// - `{Entity}_filter` input types for `where` arguments.
196    /// - A root `Query` type with single-entity and collection fields.
197    pub fn sdl(&self) -> String {
198        let mut out = String::new();
199
200        // Built-in scalar for the 64-bit unsigned integers (not in GraphQL spec).
201        out.push_str("scalar BigInt\n\n");
202
203        // Generate one object type + one filter input per entity.
204        for schema in &self.entities {
205            out.push_str(&self.entity_type_sdl(schema));
206            out.push_str(&self.filter_input_sdl(schema));
207        }
208
209        // Ordering enum
210        out.push_str("enum OrderDirection {\n  asc\n  desc\n}\n\n");
211
212        // Root Query type.
213        out.push_str("type Query {\n");
214        for schema in &self.entities {
215            let type_name = pascal_case(&schema.name);
216            let singular = schema.name.clone();
217            let plural = format!("{}s", schema.name);
218            out.push_str(&format!("  {}(id: String!): {}\n", singular, type_name));
219            out.push_str(&format!(
220                "  {}(where: {}_filter, orderBy: String, orderDirection: OrderDirection, first: Int, skip: Int): [{}!]!\n",
221                plural, schema.name, type_name
222            ));
223        }
224        out.push_str("}\n");
225
226        out
227    }
228
229    fn entity_type_sdl(&self, schema: &EntitySchema) -> String {
230        let type_name = pascal_case(&schema.name);
231        let mut out = format!("type {} {{\n", type_name);
232        // System fields.
233        out.push_str("  id: String!\n");
234        out.push_str("  blockNumber: BigInt!\n");
235        out.push_str("  txHash: String!\n");
236        out.push_str("  logIndex: Int!\n");
237        // User fields.
238        for field in &schema.fields {
239            let gql_type = field_type_to_gql(&field.field_type, field.nullable);
240            out.push_str(&format!("  {}: {}\n", field.name, gql_type));
241        }
242        out.push_str("}\n\n");
243        out
244    }
245
246    fn filter_input_sdl(&self, schema: &EntitySchema) -> String {
247        let mut out = format!("input {}_filter {{\n", schema.name);
248        // Allow filtering on every user field with operator suffixes.
249        for field in &schema.fields {
250            let base = field_type_to_gql_scalar(&field.field_type);
251            out.push_str(&format!("  {}: {}\n", field.name, base));
252            out.push_str(&format!("  {}_gt: {}\n", field.name, base));
253            out.push_str(&format!("  {}_lt: {}\n", field.name, base));
254            out.push_str(&format!("  {}_gte: {}\n", field.name, base));
255            out.push_str(&format!("  {}_lte: {}\n", field.name, base));
256            out.push_str(&format!("  {}_in: [{}]\n", field.name, base));
257        }
258        out.push_str("}\n\n");
259        out
260    }
261}
262
263// ─── Query Parsing ────────────────────────────────────────────────────────────
264
265/// A parsed top-level GraphQL selection.
266#[derive(Debug, Clone)]
267struct ParsedSelection {
268    /// The root field name (e.g. `"swap"`, `"swaps"`, `"__schema"`).
269    field: String,
270
271    /// Arguments passed to the root field (key → raw string or quoted string).
272    args: HashMap<String, ArgValue>,
273
274    /// Sub-fields requested (e.g. `["id", "pool", "amount0"]`).
275    sub_fields: Vec<String>,
276}
277
278/// An argument value from the parsed query.
279#[derive(Debug, Clone)]
280enum ArgValue {
281    /// A string literal: `"0xABC"`.
282    Str(String),
283    /// A numeric literal: `10`.
284    Num(f64),
285    /// An object literal: `{ pool: "0xABC", amount0_gt: "1000" }`.
286    Obj(HashMap<String, ArgValue>),
287    /// An enum literal / bare identifier: `desc`.
288    Ident(String),
289}
290
291impl ArgValue {
292    fn as_str(&self) -> Option<&str> {
293        match self {
294            ArgValue::Str(s) => Some(s.as_str()),
295            ArgValue::Ident(s) => Some(s.as_str()),
296            _ => None,
297        }
298    }
299
300    fn as_usize(&self) -> Option<usize> {
301        match self {
302            ArgValue::Num(n) => Some(*n as usize),
303            _ => None,
304        }
305    }
306
307    fn as_obj(&self) -> Option<&HashMap<String, ArgValue>> {
308        match self {
309            ArgValue::Obj(m) => Some(m),
310            _ => None,
311        }
312    }
313}
314
315// ─── Minimal Parser ───────────────────────────────────────────────────────────
316
317/// Tokenizer / parser for the supported subset of GraphQL.
318struct Parser<'a> {
319    src: &'a [u8],
320    pos: usize,
321}
322
323impl<'a> Parser<'a> {
324    fn new(src: &'a str) -> Self {
325        Self {
326            src: src.as_bytes(),
327            pos: 0,
328        }
329    }
330
331    fn peek(&self) -> Option<u8> {
332        self.src.get(self.pos).copied()
333    }
334
335    fn consume(&mut self) -> Option<u8> {
336        let b = self.src.get(self.pos).copied();
337        if b.is_some() {
338            self.pos += 1;
339        }
340        b
341    }
342
343    fn skip_ws(&mut self) {
344        while let Some(b) = self.peek() {
345            if b == b'#' {
346                // Skip line comments.
347                while let Some(c) = self.consume() {
348                    if c == b'\n' {
349                        break;
350                    }
351                }
352            } else if b.is_ascii_whitespace() || b == b',' {
353                self.consume();
354            } else {
355                break;
356            }
357        }
358    }
359
360    fn expect(&mut self, ch: u8) -> Result<(), String> {
361        self.skip_ws();
362        match self.consume() {
363            Some(b) if b == ch => Ok(()),
364            Some(b) => Err(format!(
365                "expected '{}' but got '{}' at position {}",
366                ch as char, b as char, self.pos
367            )),
368            None => Err(format!(
369                "expected '{}' but reached end of input",
370                ch as char
371            )),
372        }
373    }
374
375    fn read_name(&mut self) -> Option<String> {
376        self.skip_ws();
377        let start = self.pos;
378        while let Some(b) = self.peek() {
379            if b.is_ascii_alphanumeric() || b == b'_' {
380                self.consume();
381            } else {
382                break;
383            }
384        }
385        if self.pos > start {
386            Some(String::from_utf8_lossy(&self.src[start..self.pos]).into_owned())
387        } else {
388            None
389        }
390    }
391
392    fn read_string(&mut self) -> Result<String, String> {
393        // Caller already consumed the opening `"`.
394        let mut s = String::new();
395        loop {
396            match self.consume() {
397                Some(b'"') => break,
398                Some(b'\\') => match self.consume() {
399                    Some(b'"') => s.push('"'),
400                    Some(b'\\') => s.push('\\'),
401                    Some(b'n') => s.push('\n'),
402                    Some(b't') => s.push('\t'),
403                    Some(c) => s.push(c as char),
404                    None => return Err("unterminated string escape".into()),
405                },
406                Some(c) => s.push(c as char),
407                None => return Err("unterminated string literal".into()),
408            }
409        }
410        Ok(s)
411    }
412
413    fn read_number(&mut self, first: u8) -> ArgValue {
414        let mut buf = String::new();
415        buf.push(first as char);
416        while let Some(b) = self.peek() {
417            if b.is_ascii_digit() || b == b'.' || b == b'-' || b == b'e' || b == b'E' {
418                buf.push(b as char);
419                self.consume();
420            } else {
421                break;
422            }
423        }
424        ArgValue::Num(buf.parse::<f64>().unwrap_or(0.0))
425    }
426
427    fn read_arg_value(&mut self) -> Result<ArgValue, String> {
428        self.skip_ws();
429        match self.peek() {
430            Some(b'"') => {
431                self.consume();
432                Ok(ArgValue::Str(self.read_string()?))
433            }
434            Some(b'{') => {
435                self.consume();
436                let obj = self.read_object()?;
437                Ok(ArgValue::Obj(obj))
438            }
439            Some(b) if b.is_ascii_digit() || b == b'-' => {
440                let first = self.consume().unwrap();
441                Ok(self.read_number(first))
442            }
443            Some(_) => {
444                // Bare identifier / enum value.
445                match self.read_name() {
446                    Some(name) => Ok(ArgValue::Ident(name)),
447                    None => Err(format!("unexpected character at pos {}", self.pos)),
448                }
449            }
450            None => Err("unexpected end of input in argument value".into()),
451        }
452    }
453
454    fn read_object(&mut self) -> Result<HashMap<String, ArgValue>, String> {
455        let mut map = HashMap::new();
456        loop {
457            self.skip_ws();
458            if self.peek() == Some(b'}') {
459                self.consume();
460                break;
461            }
462            let key = self.read_name().ok_or("expected object key")?;
463            self.skip_ws();
464            self.expect(b':')?;
465            let val = self.read_arg_value()?;
466            map.insert(key, val);
467        }
468        Ok(map)
469    }
470
471    fn read_args(&mut self) -> Result<HashMap<String, ArgValue>, String> {
472        // Opening `(` already consumed by caller.
473        let mut args = HashMap::new();
474        loop {
475            self.skip_ws();
476            if self.peek() == Some(b')') {
477                self.consume();
478                break;
479            }
480            let key = self.read_name().ok_or("expected argument name")?;
481            self.skip_ws();
482            self.expect(b':')?;
483            let val = self.read_arg_value()?;
484            args.insert(key, val);
485        }
486        Ok(args)
487    }
488
489    fn read_sub_fields(&mut self) -> Result<Vec<String>, String> {
490        // Opening `{` already consumed by caller.
491        let mut fields = Vec::new();
492        loop {
493            self.skip_ws();
494            if self.peek() == Some(b'}') {
495                self.consume();
496                break;
497            }
498            // Check for nested braces (e.g. __schema sub-selections) — skip them.
499            if self.peek() == Some(b'{') {
500                self.consume();
501                self.read_sub_fields()?; // recurse and discard
502                continue;
503            }
504            match self.read_name() {
505                Some(name) => {
506                    self.skip_ws();
507                    // If followed by `{`, skip nested block (introspection).
508                    if self.peek() == Some(b'{') {
509                        self.consume();
510                        self.read_sub_fields()?;
511                    }
512                    fields.push(name);
513                }
514                None => {
515                    return Err(format!("expected field name at pos {}", self.pos));
516                }
517            }
518        }
519        Ok(fields)
520    }
521
522    /// Parse the entire query, returning a list of top-level selections.
523    fn parse(&mut self) -> Result<Vec<ParsedSelection>, String> {
524        self.skip_ws();
525
526        // Optionally consume `query` / `mutation` keyword.
527        if let Some(b'q') | Some(b'm') | Some(b's') = self.peek() {
528            let kw = self.read_name().unwrap_or_default();
529            if kw != "query" && kw != "mutation" && kw != "subscription" {
530                // Not a keyword — it is a field name at root level without outer `{`.
531                // Put position back by re-parsing is non-trivial, so just treat as-is.
532                // This is a degenerate query; return empty.
533                return Err(format!("unexpected keyword '{kw}' at document start"));
534            }
535            // Skip optional operation name.
536            self.skip_ws();
537            if self
538                .peek()
539                .is_some_and(|b| b.is_ascii_alphabetic() || b == b'_')
540            {
541                self.read_name();
542            }
543        }
544
545        self.skip_ws();
546        self.expect(b'{')?;
547
548        let mut selections = Vec::new();
549        loop {
550            self.skip_ws();
551            if self.peek() == Some(b'}') {
552                self.consume();
553                break;
554            }
555            let field = self.read_name().ok_or("expected selection field name")?;
556            self.skip_ws();
557
558            let mut args = HashMap::new();
559            if self.peek() == Some(b'(') {
560                self.consume();
561                args = self.read_args()?;
562            }
563
564            self.skip_ws();
565            let mut sub_fields = Vec::new();
566            if self.peek() == Some(b'{') {
567                self.consume();
568                sub_fields = self.read_sub_fields()?;
569            }
570
571            selections.push(ParsedSelection {
572                field,
573                args,
574                sub_fields,
575            });
576        }
577
578        Ok(selections)
579    }
580}
581
582// ─── GraphQL Executor ─────────────────────────────────────────────────────────
583
584/// Executes simplified GraphQL queries against an [`EntityStore`].
585///
586/// # Usage
587///
588/// ```rust,no_run
589/// use std::sync::Arc;
590/// use chainindex_core::entity::MemoryEntityStore;
591/// use chainindex_core::graphql::GraphqlExecutor;
592///
593/// # async fn example() {
594/// let store = Arc::new(MemoryEntityStore::new());
595/// let executor = GraphqlExecutor::new(store);
596/// let resp = executor.execute(r#"{ transfers(first: 5) { id } }"#).await;
597/// # }
598/// ```
599pub struct GraphqlExecutor {
600    store: Arc<dyn EntityStore>,
601    schema: RwLock<GraphqlSchema>,
602}
603
604impl GraphqlExecutor {
605    /// Create a new executor backed by the given entity store.
606    pub fn new(store: Arc<dyn EntityStore>) -> Self {
607        Self {
608            store,
609            schema: RwLock::new(GraphqlSchema::new()),
610        }
611    }
612
613    /// Register an entity schema so the executor knows the field list.
614    pub fn register_schema(&self, entity_schema: EntitySchema) {
615        let mut schema = self.schema.write().expect("schema lock poisoned");
616        schema.add_entity(entity_schema);
617    }
618
619    /// Return the SDL for all registered entity types (introspection).
620    pub fn introspect(&self) -> String {
621        let schema = self.schema.read().expect("schema lock poisoned");
622        schema.sdl()
623    }
624
625    /// Execute a GraphQL query string and return a [`GraphqlResponse`].
626    pub async fn execute(&self, query: &str) -> GraphqlResponse {
627        let selections = match Parser::new(query).parse() {
628            Ok(s) => s,
629            Err(e) => return GraphqlResponse::err(format!("Parse error: {e}")),
630        };
631
632        let mut data_map = serde_json::Map::new();
633        let mut errors: Vec<GraphqlError> = Vec::new();
634
635        for sel in selections {
636            // Introspection shortcut.
637            if sel.field == "__schema" || sel.field == "__type" {
638                let sdl = self.introspect();
639                data_map.insert(sel.field.clone(), JsonValue::String(sdl));
640                continue;
641            }
642
643            match self.execute_selection(&sel).await {
644                Ok(value) => {
645                    data_map.insert(sel.field.clone(), value);
646                }
647                Err(e) => {
648                    errors.push(e);
649                }
650            }
651        }
652
653        if !errors.is_empty() {
654            return GraphqlResponse::errors(errors);
655        }
656
657        GraphqlResponse::ok(JsonValue::Object(data_map))
658    }
659
660    /// Execute a single top-level selection against the store.
661    async fn execute_selection(&self, sel: &ParsedSelection) -> Result<JsonValue, GraphqlError> {
662        let field = &sel.field;
663
664        // Determine whether this is a singular or plural field.
665        // Convention: if the field name matches a registered entity type exactly
666        // and has an `id` argument, treat as singular.  Otherwise, treat as collection.
667
668        let (entity_type, is_singular) = self.resolve_entity_type(field);
669
670        let entity_type = entity_type.ok_or_else(|| {
671            GraphqlError::new(format!("Unknown field '{}': no entity type found", field))
672        })?;
673
674        if is_singular {
675            self.execute_single(&entity_type, sel).await
676        } else {
677            self.execute_collection(&entity_type, sel).await
678        }
679    }
680
681    /// Resolve `field` to an `(entity_type, is_singular)` pair.
682    ///
683    /// Rules:
684    /// - If field equals a registered entity name → singular.
685    /// - If field equals `<entity_name>s` → collection.
686    /// - If field ends with `s` and `field[..len-1]` is registered → collection.
687    fn resolve_entity_type(&self, field: &str) -> (Option<String>, bool) {
688        let schema = self.schema.read().expect("schema lock poisoned");
689        // Exact match → singular.
690        if schema.entities.iter().any(|e| e.name == field) {
691            return (Some(field.to_string()), true);
692        }
693        // Plural match: field = entity_name + "s".
694        for entity in &schema.entities {
695            let plural = format!("{}s", entity.name);
696            if *field == plural {
697                return (Some(entity.name.clone()), false);
698            }
699        }
700        // No match.
701        (None, false)
702    }
703
704    /// Execute a singular `entity(id: "...")` query.
705    async fn execute_single(
706        &self,
707        entity_type: &str,
708        sel: &ParsedSelection,
709    ) -> Result<JsonValue, GraphqlError> {
710        let id = sel
711            .args
712            .get("id")
713            .and_then(|v| v.as_str())
714            .ok_or_else(|| GraphqlError::new("Singular query requires an 'id' argument"))?;
715
716        let _query = EntityQuery::new(entity_type)
717            .filter(QueryFilter::Eq(
718                "id".to_string(),
719                JsonValue::String(id.to_string()),
720            ))
721            .limit(1);
722
723        // MemoryEntityStore filters by entity_type automatically; we also need to
724        // match the system `id` field. The MemoryEntityStore uses (entity_type, id)
725        // as a composite key but the query API filters on `data`. We pass id as a
726        // special row field via data. However, looking at the implementation,
727        // `id` is a top-level field on EntityRow, not in `data`. We therefore
728        // query without the id filter in data and manually match below.
729        let query_no_id_filter = EntityQuery::new(entity_type);
730        let mut rows = self
731            .store
732            .query(query_no_id_filter)
733            .await
734            .map_err(GraphqlError::from)?;
735
736        rows.retain(|r| r.id == id);
737
738        if rows.is_empty() {
739            return Ok(JsonValue::Null);
740        }
741
742        let row = &rows[0];
743        Ok(self.project_row(row, &sel.sub_fields))
744    }
745
746    /// Execute a collection `entities(where, orderBy, ...)` query.
747    async fn execute_collection(
748        &self,
749        entity_type: &str,
750        sel: &ParsedSelection,
751    ) -> Result<JsonValue, GraphqlError> {
752        let first = sel.args.get("first").and_then(|v| v.as_usize());
753        let skip = sel.args.get("skip").and_then(|v| v.as_usize());
754        let order_by = sel
755            .args
756            .get("orderBy")
757            .and_then(|v| v.as_str())
758            .map(|s| s.to_string());
759        let order_direction = sel
760            .args
761            .get("orderDirection")
762            .and_then(|v| v.as_str())
763            .unwrap_or("asc")
764            .to_lowercase();
765
766        let sort_order = if order_direction == "desc" {
767            SortOrder::Desc
768        } else {
769            SortOrder::Asc
770        };
771
772        // Build filters from `where` argument.
773        let filters = if let Some(where_arg) = sel.args.get("where") {
774            let obj = where_arg
775                .as_obj()
776                .ok_or_else(|| GraphqlError::new("'where' argument must be an object"))?;
777            self.parse_where_filters(obj)?
778        } else {
779            Vec::new()
780        };
781
782        let mut q = EntityQuery::new(entity_type);
783        for f in filters {
784            q = q.filter(f);
785        }
786        if let Some(ob) = order_by {
787            q = q.order_by(ob, sort_order);
788        }
789        if let Some(n) = first {
790            q = q.limit(n);
791        }
792        if let Some(n) = skip {
793            q = q.offset(n);
794        }
795
796        let rows = self.store.query(q).await.map_err(GraphqlError::from)?;
797
798        let values: Vec<JsonValue> = rows
799            .iter()
800            .map(|row| self.project_row(row, &sel.sub_fields))
801            .collect();
802
803        Ok(JsonValue::Array(values))
804    }
805
806    /// Parse a `where` object into [`QueryFilter`]s.
807    ///
808    /// Supported key patterns:
809    /// - `field`        → Eq
810    /// - `field_gt`     → Gt
811    /// - `field_lt`     → Lt
812    /// - `field_gte`    → Gte
813    /// - `field_lte`    → Lte
814    /// - `field_in`     → In (value must be a JSON array string or ArgValue::Obj is ignored)
815    fn parse_where_filters(
816        &self,
817        obj: &HashMap<String, ArgValue>,
818    ) -> Result<Vec<QueryFilter>, GraphqlError> {
819        let mut filters = Vec::new();
820
821        for (key, val) in obj {
822            let json_val = arg_to_json(val);
823
824            if let Some(field) = key.strip_suffix("_gt") {
825                filters.push(QueryFilter::Gt(field.to_string(), json_val));
826            } else if let Some(field) = key.strip_suffix("_lt") {
827                filters.push(QueryFilter::Lt(field.to_string(), json_val));
828            } else if let Some(field) = key.strip_suffix("_gte") {
829                filters.push(QueryFilter::Gte(field.to_string(), json_val));
830            } else if let Some(field) = key.strip_suffix("_lte") {
831                filters.push(QueryFilter::Lte(field.to_string(), json_val));
832            } else if let Some(field) = key.strip_suffix("_in") {
833                // Expect an array encoded as a JSON string or a direct ArgValue array.
834                // We encode it as a JSON array in json_val.
835                let items = match json_val {
836                    JsonValue::Array(arr) => arr,
837                    JsonValue::String(s) => {
838                        // Try to parse it as JSON array.
839                        serde_json::from_str::<Vec<JsonValue>>(&s)
840                            .unwrap_or_else(|_| vec![JsonValue::String(s)])
841                    }
842                    other => vec![other],
843                };
844                filters.push(QueryFilter::In(field.to_string(), items));
845            } else {
846                // Plain equality.
847                filters.push(QueryFilter::Eq(key.clone(), json_val));
848            }
849        }
850
851        Ok(filters)
852    }
853
854    /// Project an entity row into a JSON object containing only requested fields.
855    ///
856    /// If `sub_fields` is empty all fields are included.
857    fn project_row(&self, row: &crate::entity::EntityRow, sub_fields: &[String]) -> JsonValue {
858        let mut obj = serde_json::Map::new();
859
860        let include_all = sub_fields.is_empty();
861
862        let want = |name: &str| -> bool { include_all || sub_fields.iter().any(|f| f == name) };
863
864        // System fields.
865        if want("id") {
866            obj.insert("id".to_string(), JsonValue::String(row.id.clone()));
867        }
868        if want("blockNumber") {
869            obj.insert(
870                "blockNumber".to_string(),
871                JsonValue::Number(row.block_number.into()),
872            );
873        }
874        if want("txHash") {
875            obj.insert("txHash".to_string(), JsonValue::String(row.tx_hash.clone()));
876        }
877        if want("logIndex") {
878            obj.insert(
879                "logIndex".to_string(),
880                JsonValue::Number(row.log_index.into()),
881            );
882        }
883
884        // User data fields.
885        for (k, v) in &row.data {
886            if want(k) {
887                obj.insert(k.clone(), v.clone());
888            }
889        }
890
891        JsonValue::Object(obj)
892    }
893}
894
895// ─── Helpers ──────────────────────────────────────────────────────────────────
896
897/// Convert an [`ArgValue`] to a [`serde_json::Value`].
898fn arg_to_json(val: &ArgValue) -> JsonValue {
899    match val {
900        ArgValue::Str(s) => JsonValue::String(s.clone()),
901        ArgValue::Num(n) => {
902            // Prefer integer representation when the number is whole.
903            if n.fract() == 0.0 && *n >= 0.0 && *n <= u64::MAX as f64 {
904                JsonValue::Number((*n as u64).into())
905            } else if n.fract() == 0.0 && *n < 0.0 && *n >= i64::MIN as f64 {
906                JsonValue::Number((*n as i64).into())
907            } else {
908                serde_json::Number::from_f64(*n)
909                    .map(JsonValue::Number)
910                    .unwrap_or(JsonValue::Null)
911            }
912        }
913        ArgValue::Ident(s) => {
914            // Common boolean literals.
915            match s.as_str() {
916                "true" => JsonValue::Bool(true),
917                "false" => JsonValue::Bool(false),
918                "null" => JsonValue::Null,
919                _ => JsonValue::String(s.clone()),
920            }
921        }
922        ArgValue::Obj(map) => {
923            let mut obj = serde_json::Map::new();
924            for (k, v) in map {
925                obj.insert(k.clone(), arg_to_json(v));
926            }
927            JsonValue::Object(obj)
928        }
929    }
930}
931
932/// Convert a `snake_case` entity name to `PascalCase` GraphQL type name.
933fn pascal_case(s: &str) -> String {
934    s.split('_')
935        .map(|part| {
936            let mut c = part.chars();
937            match c.next() {
938                None => String::new(),
939                Some(f) => f.to_uppercase().collect::<String>() + c.as_str(),
940            }
941        })
942        .collect()
943}
944
945/// Map a [`FieldType`] to a GraphQL type string, respecting nullability.
946fn field_type_to_gql(ft: &FieldType, nullable: bool) -> String {
947    let base = field_type_to_gql_scalar(ft);
948    if nullable {
949        base.to_string()
950    } else {
951        format!("{}!", base)
952    }
953}
954
955/// Map a [`FieldType`] to a GraphQL scalar name (no nullability suffix).
956fn field_type_to_gql_scalar(ft: &FieldType) -> &'static str {
957    match ft {
958        FieldType::String => "String",
959        FieldType::Int64 => "BigInt",
960        FieldType::Uint64 => "BigInt",
961        FieldType::Float64 => "Float",
962        FieldType::Bool => "Boolean",
963        FieldType::Json => "String",
964        FieldType::Bytes => "String",
965    }
966}
967
968// ─── Tests ────────────────────────────────────────────────────────────────────
969
970#[cfg(test)]
971mod tests {
972    use std::collections::HashMap;
973    use std::sync::Arc;
974
975    use super::*;
976    use crate::entity::{
977        EntityRow, EntitySchemaBuilder, EntityStore, FieldType, MemoryEntityStore,
978    };
979
980    // ── Helpers ───────────────────────────────────────────────────────────────
981
982    fn swap_schema() -> EntitySchema {
983        EntitySchemaBuilder::new("swap")
984            .primary_key("id")
985            .field("pool", FieldType::String, true)
986            .field("amount0", FieldType::Uint64, false)
987            .field("amount1", FieldType::Uint64, false)
988            .nullable_field("trader", FieldType::String, false)
989            .build()
990    }
991
992    fn transfer_schema() -> EntitySchema {
993        EntitySchemaBuilder::new("transfer")
994            .primary_key("id")
995            .field("from", FieldType::String, true)
996            .field("to", FieldType::String, true)
997            .field("value", FieldType::Uint64, false)
998            .build()
999    }
1000
1001    fn make_swap(id: &str, pool: &str, amount0: u64, amount1: u64, block: u64) -> EntityRow {
1002        let mut data = HashMap::new();
1003        data.insert("pool".to_string(), serde_json::json!(pool));
1004        data.insert("amount0".to_string(), serde_json::json!(amount0));
1005        data.insert("amount1".to_string(), serde_json::json!(amount1));
1006        EntityRow {
1007            id: id.to_string(),
1008            entity_type: "swap".to_string(),
1009            block_number: block,
1010            tx_hash: format!("0xtx_{id}"),
1011            log_index: 0,
1012            data,
1013        }
1014    }
1015
1016    fn make_transfer(id: &str, from: &str, to: &str, value: u64, block: u64) -> EntityRow {
1017        let mut data = HashMap::new();
1018        data.insert("from".to_string(), serde_json::json!(from));
1019        data.insert("to".to_string(), serde_json::json!(to));
1020        data.insert("value".to_string(), serde_json::json!(value));
1021        EntityRow {
1022            id: id.to_string(),
1023            entity_type: "transfer".to_string(),
1024            block_number: block,
1025            tx_hash: format!("0xtx_{id}"),
1026            log_index: 0,
1027            data,
1028        }
1029    }
1030
1031    async fn seeded_executor() -> GraphqlExecutor {
1032        let store = Arc::new(MemoryEntityStore::new());
1033        store.register_schema(&swap_schema()).await.unwrap();
1034        store.register_schema(&transfer_schema()).await.unwrap();
1035
1036        store
1037            .upsert(make_swap("s1", "0xPOOL_A", 1000, 500, 10))
1038            .await
1039            .unwrap();
1040        store
1041            .upsert(make_swap("s2", "0xPOOL_A", 2000, 1000, 11))
1042            .await
1043            .unwrap();
1044        store
1045            .upsert(make_swap("s3", "0xPOOL_B", 3000, 1500, 12))
1046            .await
1047            .unwrap();
1048
1049        store
1050            .upsert(make_transfer("t1", "0xAlice", "0xBob", 100, 10))
1051            .await
1052            .unwrap();
1053        store
1054            .upsert(make_transfer("t2", "0xBob", "0xCharlie", 200, 11))
1055            .await
1056            .unwrap();
1057
1058        let executor = GraphqlExecutor::new(store);
1059        executor.register_schema(swap_schema());
1060        executor.register_schema(transfer_schema());
1061        executor
1062    }
1063
1064    // ── Test 1: SDL schema generation ─────────────────────────────────────────
1065
1066    #[test]
1067    fn test_schema_generation_contains_type() {
1068        let mut gql_schema = GraphqlSchema::new();
1069        gql_schema.add_entity(swap_schema());
1070        let sdl = gql_schema.sdl();
1071
1072        assert!(sdl.contains("type Swap {"), "SDL missing Swap type:\n{sdl}");
1073        assert!(
1074            sdl.contains("pool: String!"),
1075            "SDL missing pool field:\n{sdl}"
1076        );
1077        assert!(
1078            sdl.contains("amount0: BigInt!"),
1079            "SDL missing amount0 field:\n{sdl}"
1080        );
1081        assert!(
1082            sdl.contains("trader: String"),
1083            "SDL missing nullable trader field:\n{sdl}"
1084        );
1085    }
1086
1087    // ── Test 2: SDL contains filter input ─────────────────────────────────────
1088
1089    #[test]
1090    fn test_schema_generation_filter_input() {
1091        let mut gql_schema = GraphqlSchema::new();
1092        gql_schema.add_entity(swap_schema());
1093        let sdl = gql_schema.sdl();
1094
1095        assert!(
1096            sdl.contains("input swap_filter {"),
1097            "SDL missing swap_filter input:\n{sdl}"
1098        );
1099        assert!(
1100            sdl.contains("amount0_gt:"),
1101            "SDL missing amount0_gt in filter:\n{sdl}"
1102        );
1103        assert!(
1104            sdl.contains("pool_in:"),
1105            "SDL missing pool_in in filter:\n{sdl}"
1106        );
1107    }
1108
1109    // ── Test 3: SDL contains Query type with singular and plural fields ────────
1110
1111    #[test]
1112    fn test_schema_generation_query_type() {
1113        let mut gql_schema = GraphqlSchema::new();
1114        gql_schema.add_entity(swap_schema());
1115        let sdl = gql_schema.sdl();
1116
1117        assert!(
1118            sdl.contains("type Query {"),
1119            "SDL missing Query type:\n{sdl}"
1120        );
1121        assert!(
1122            sdl.contains("swap(id: String!): Swap"),
1123            "SDL missing singular swap:\n{sdl}"
1124        );
1125        assert!(sdl.contains("swaps("), "SDL missing plural swaps:\n{sdl}");
1126    }
1127
1128    // ── Test 4: SDL pascal_case helper ────────────────────────────────────────
1129
1130    #[test]
1131    fn test_pascal_case_conversion() {
1132        assert_eq!(pascal_case("swap"), "Swap");
1133        assert_eq!(pascal_case("erc20_transfer"), "Erc20Transfer");
1134        assert_eq!(pascal_case("uniswap_v3_pool"), "UniswapV3Pool");
1135    }
1136
1137    // ── Test 5: Introspection returns SDL ─────────────────────────────────────
1138
1139    #[tokio::test]
1140    async fn test_introspection() {
1141        let executor = seeded_executor().await;
1142        let sdl = executor.introspect();
1143        assert!(
1144            sdl.contains("type Swap {"),
1145            "introspect missing Swap type:\n{sdl}"
1146        );
1147        assert!(
1148            sdl.contains("type Transfer {"),
1149            "introspect missing Transfer type:\n{sdl}"
1150        );
1151    }
1152
1153    // ── Test 6: Introspection via __schema query ───────────────────────────────
1154
1155    #[tokio::test]
1156    async fn test_introspection_query() {
1157        let executor = seeded_executor().await;
1158        let resp = executor.execute("{ __schema { types { name } } }").await;
1159        assert!(!resp.is_error(), "unexpected error: {:?}", resp.errors);
1160        let data = resp.data.unwrap();
1161        let sdl = data["__schema"].as_str().unwrap();
1162        assert!(sdl.contains("type Swap {"));
1163    }
1164
1165    // ── Test 7: Collection query returns all entities ─────────────────────────
1166
1167    #[tokio::test]
1168    async fn test_collection_query_all() {
1169        let executor = seeded_executor().await;
1170        let resp = executor
1171            .execute("{ swaps { id pool amount0 amount1 } }")
1172            .await;
1173        assert!(!resp.is_error(), "unexpected error: {:?}", resp.errors);
1174        let arr = resp.data.unwrap()["swaps"].as_array().unwrap().clone();
1175        assert_eq!(arr.len(), 3);
1176    }
1177
1178    // ── Test 8: Singular query by id ──────────────────────────────────────────
1179
1180    #[tokio::test]
1181    async fn test_singular_query_by_id() {
1182        let executor = seeded_executor().await;
1183        let resp = executor
1184            .execute(r#"{ swap(id: "s2") { id pool amount0 } }"#)
1185            .await;
1186        assert!(!resp.is_error(), "unexpected error: {:?}", resp.errors);
1187        let row = &resp.data.unwrap()["swap"];
1188        assert_eq!(row["id"], "s2");
1189        assert_eq!(row["pool"], "0xPOOL_A");
1190        assert_eq!(row["amount0"], 2000);
1191    }
1192
1193    // ── Test 9: Singular query for unknown id returns null ────────────────────
1194
1195    #[tokio::test]
1196    async fn test_singular_query_missing_id() {
1197        let executor = seeded_executor().await;
1198        let resp = executor
1199            .execute(r#"{ swap(id: "nonexistent") { id } }"#)
1200            .await;
1201        assert!(!resp.is_error(), "unexpected error: {:?}", resp.errors);
1202        assert_eq!(resp.data.unwrap()["swap"], JsonValue::Null);
1203    }
1204
1205    // ── Test 10: Collection query with where filter ────────────────────────────
1206
1207    #[tokio::test]
1208    async fn test_collection_with_where_filter() {
1209        let executor = seeded_executor().await;
1210        let resp = executor
1211            .execute(r#"{ swaps(where: { pool: "0xPOOL_A" }) { id pool } }"#)
1212            .await;
1213        assert!(!resp.is_error(), "unexpected error: {:?}", resp.errors);
1214        let arr = resp.data.unwrap()["swaps"].as_array().unwrap().clone();
1215        assert_eq!(arr.len(), 2);
1216        for row in &arr {
1217            assert_eq!(row["pool"], "0xPOOL_A");
1218        }
1219    }
1220
1221    // ── Test 11: Collection query with first/skip pagination ──────────────────
1222
1223    #[tokio::test]
1224    async fn test_collection_pagination() {
1225        let executor = seeded_executor().await;
1226        // Sort by amount0 ascending, skip 1, take 1 → should be s2 (2000).
1227        let resp = executor
1228            .execute(
1229                r#"{ swaps(first: 1, skip: 1, orderBy: "amount0", orderDirection: "asc") { id amount0 } }"#,
1230            )
1231            .await;
1232        assert!(!resp.is_error(), "unexpected error: {:?}", resp.errors);
1233        let arr = resp.data.unwrap()["swaps"].as_array().unwrap().clone();
1234        assert_eq!(arr.len(), 1);
1235        assert_eq!(arr[0]["amount0"], 2000);
1236    }
1237
1238    // ── Test 12: Collection query with orderBy desc ────────────────────────────
1239
1240    #[tokio::test]
1241    async fn test_collection_order_desc() {
1242        let executor = seeded_executor().await;
1243        let resp = executor
1244            .execute(r#"{ swaps(orderBy: "amount0", orderDirection: "desc") { id amount0 } }"#)
1245            .await;
1246        assert!(!resp.is_error(), "unexpected error: {:?}", resp.errors);
1247        let arr = resp.data.unwrap()["swaps"].as_array().unwrap().clone();
1248        assert_eq!(arr.len(), 3);
1249        let first_amount = arr[0]["amount0"].as_u64().unwrap();
1250        let last_amount = arr[2]["amount0"].as_u64().unwrap();
1251        assert!(first_amount > last_amount, "expected descending order");
1252    }
1253
1254    // ── Test 13: Unknown entity type returns error ─────────────────────────────
1255
1256    #[tokio::test]
1257    async fn test_unknown_entity_returns_error() {
1258        let executor = seeded_executor().await;
1259        let resp = executor.execute("{ unknownEntity { id } }").await;
1260        assert!(resp.is_error(), "expected an error for unknown entity");
1261        let errs = resp.errors.unwrap();
1262        assert!(
1263            errs[0].message.contains("Unknown field"),
1264            "wrong error message: {}",
1265            errs[0].message
1266        );
1267    }
1268
1269    // ── Test 14: Collection gt filter ─────────────────────────────────────────
1270
1271    #[tokio::test]
1272    async fn test_where_gt_filter() {
1273        let executor = seeded_executor().await;
1274        let resp = executor
1275            .execute(r#"{ swaps(where: { amount0_gt: 1000 }) { id amount0 } }"#)
1276            .await;
1277        assert!(!resp.is_error(), "unexpected error: {:?}", resp.errors);
1278        let arr = resp.data.unwrap()["swaps"].as_array().unwrap().clone();
1279        // s2 (2000) and s3 (3000) are > 1000.
1280        assert_eq!(arr.len(), 2);
1281        for row in &arr {
1282            assert!(row["amount0"].as_u64().unwrap() > 1000);
1283        }
1284    }
1285
1286    // ── Test 15: Response formatting — ok ─────────────────────────────────────
1287
1288    #[test]
1289    fn test_response_ok_format() {
1290        let resp = GraphqlResponse::ok(serde_json::json!({ "swap": { "id": "s1" } }));
1291        assert!(!resp.is_error());
1292        let json = serde_json::to_value(&resp).unwrap();
1293        assert!(json.get("data").is_some());
1294        assert!(json.get("errors").is_none());
1295        assert_eq!(json["data"]["swap"]["id"], "s1");
1296    }
1297
1298    // ── Test 16: Response formatting — error ──────────────────────────────────
1299
1300    #[test]
1301    fn test_response_error_format() {
1302        let resp = GraphqlResponse::err("something went wrong");
1303        assert!(resp.is_error());
1304        let json = serde_json::to_value(&resp).unwrap();
1305        assert!(json.get("errors").is_some());
1306        assert!(json.get("data").is_none());
1307        assert_eq!(json["errors"][0]["message"], "something went wrong");
1308    }
1309
1310    // ── Test 17: Field projection ─────────────────────────────────────────────
1311
1312    #[tokio::test]
1313    async fn test_field_projection() {
1314        let executor = seeded_executor().await;
1315        // Only request `id` and `pool` — amount0/amount1 should not appear.
1316        let resp = executor
1317            .execute(
1318                r#"{ swaps(first: 1, orderBy: "amount0", orderDirection: "asc") { id pool } }"#,
1319            )
1320            .await;
1321        assert!(!resp.is_error(), "unexpected error: {:?}", resp.errors);
1322        let row = &resp.data.unwrap()["swaps"][0];
1323        assert!(row.get("id").is_some());
1324        assert!(row.get("pool").is_some());
1325        assert!(
1326            row.get("amount0").is_none(),
1327            "amount0 should be projected out"
1328        );
1329    }
1330
1331    // ── Test 18: Multiple entity types in one query ───────────────────────────
1332
1333    #[tokio::test]
1334    async fn test_multi_entity_query() {
1335        let executor = seeded_executor().await;
1336        let resp = executor.execute("{ swaps { id } transfers { id } }").await;
1337        assert!(!resp.is_error(), "unexpected error: {:?}", resp.errors);
1338        let data = resp.data.unwrap();
1339        assert_eq!(data["swaps"].as_array().unwrap().len(), 3);
1340        assert_eq!(data["transfers"].as_array().unwrap().len(), 2);
1341    }
1342
1343    // ── Test 19: Parse error returns error response ───────────────────────────
1344
1345    #[tokio::test]
1346    async fn test_parse_error() {
1347        let executor = seeded_executor().await;
1348        let resp = executor.execute("{ unclosed { id ").await;
1349        assert!(resp.is_error(), "expected parse error");
1350    }
1351
1352    // ── Test 20: SubscriptionConfig defaults ─────────────────────────────────
1353
1354    #[test]
1355    fn test_subscription_config_default() {
1356        let cfg = SubscriptionConfig::default();
1357        assert!(cfg.entity_types.is_empty());
1358        assert_eq!(cfg.buffer_size, 256);
1359        assert!(cfg.events.contains(&SubscriptionEvent::Insert));
1360        assert!(cfg.events.contains(&SubscriptionEvent::Reorg));
1361        assert!(cfg.from_block.is_none());
1362    }
1363
1364    // ── Test 21: SubscriptionConfig serializes correctly ─────────────────────
1365
1366    #[test]
1367    fn test_subscription_config_serialization() {
1368        let cfg = SubscriptionConfig {
1369            entity_types: vec!["swap".to_string()],
1370            events: vec![SubscriptionEvent::Insert, SubscriptionEvent::Delete],
1371            from_block: Some(1_000_000),
1372            buffer_size: 64,
1373        };
1374        let json = serde_json::to_value(&cfg).unwrap();
1375        assert_eq!(json["entity_types"][0], "swap");
1376        assert_eq!(json["from_block"], 1_000_000);
1377        assert_eq!(json["events"][0], "insert");
1378        assert_eq!(json["events"][1], "delete");
1379    }
1380
1381    // ── Test 22: Collection with lte filter ───────────────────────────────────
1382
1383    #[tokio::test]
1384    async fn test_where_lte_filter() {
1385        let executor = seeded_executor().await;
1386        let resp = executor
1387            .execute(r#"{ swaps(where: { amount0_lte: 2000 }) { id amount0 } }"#)
1388            .await;
1389        assert!(!resp.is_error(), "unexpected error: {:?}", resp.errors);
1390        let arr = resp.data.unwrap()["swaps"].as_array().unwrap().clone();
1391        // s1 (1000) and s2 (2000) are <= 2000.
1392        assert_eq!(arr.len(), 2);
1393        for row in &arr {
1394            assert!(row["amount0"].as_u64().unwrap() <= 2000);
1395        }
1396    }
1397
1398    // ── Test 23: Singular query requires id argument ──────────────────────────
1399
1400    #[tokio::test]
1401    async fn test_singular_without_id_returns_error() {
1402        let executor = seeded_executor().await;
1403        // Singular field name without id argument.
1404        let resp = executor.execute("{ swap { id pool } }").await;
1405        // Without an `id` argument, execute_single should error.
1406        assert!(resp.is_error(), "expected error for singular without id");
1407    }
1408}