1use std::collections::HashMap;
38use std::sync::{Arc, RwLock};
39
40use serde::{Deserialize, Serialize};
41use serde_json::Value as JsonValue;
42
43use crate::entity::{EntityQuery, EntitySchema, EntityStore, FieldType, QueryFilter, SortOrder};
44use crate::error::IndexerError;
45
46#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
50pub struct GraphqlError {
51 pub message: String,
53}
54
55impl GraphqlError {
56 fn new(msg: impl Into<String>) -> Self {
57 Self {
58 message: msg.into(),
59 }
60 }
61}
62
63impl From<IndexerError> for GraphqlError {
64 fn from(e: IndexerError) -> Self {
65 Self::new(e.to_string())
66 }
67}
68
69#[derive(Debug, Clone, Serialize, Deserialize)]
75pub struct GraphqlResponse {
76 #[serde(skip_serializing_if = "Option::is_none")]
78 pub data: Option<JsonValue>,
79
80 #[serde(skip_serializing_if = "Option::is_none")]
82 pub errors: Option<Vec<GraphqlError>>,
83}
84
85impl GraphqlResponse {
86 pub fn ok(data: JsonValue) -> Self {
88 Self {
89 data: Some(data),
90 errors: None,
91 }
92 }
93
94 pub fn err(msg: impl Into<String>) -> Self {
96 Self {
97 data: None,
98 errors: Some(vec![GraphqlError::new(msg)]),
99 }
100 }
101
102 pub fn errors(errors: Vec<GraphqlError>) -> Self {
104 Self {
105 data: None,
106 errors: Some(errors),
107 }
108 }
109
110 pub fn is_error(&self) -> bool {
112 self.errors.is_some()
113 }
114}
115
116#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct SubscriptionConfig {
125 pub entity_types: Vec<String>,
127
128 pub events: Vec<SubscriptionEvent>,
130
131 pub from_block: Option<u64>,
133
134 pub buffer_size: usize,
136}
137
138#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
140#[serde(rename_all = "snake_case")]
141pub enum SubscriptionEvent {
142 Insert,
144 Update,
146 Delete,
148 Reorg,
150}
151
152impl Default for SubscriptionConfig {
153 fn default() -> Self {
154 Self {
155 entity_types: Vec::new(),
156 events: vec![
157 SubscriptionEvent::Insert,
158 SubscriptionEvent::Update,
159 SubscriptionEvent::Delete,
160 SubscriptionEvent::Reorg,
161 ],
162 from_block: None,
163 buffer_size: 256,
164 }
165 }
166}
167
168#[derive(Debug, Default, Clone)]
176pub struct GraphqlSchema {
177 entities: Vec<EntitySchema>,
178}
179
180impl GraphqlSchema {
181 pub fn new() -> Self {
183 Self::default()
184 }
185
186 pub fn add_entity(&mut self, schema: EntitySchema) {
188 self.entities.push(schema);
189 }
190
191 pub fn sdl(&self) -> String {
198 let mut out = String::new();
199
200 out.push_str("scalar BigInt\n\n");
202
203 for schema in &self.entities {
205 out.push_str(&self.entity_type_sdl(schema));
206 out.push_str(&self.filter_input_sdl(schema));
207 }
208
209 out.push_str("enum OrderDirection {\n asc\n desc\n}\n\n");
211
212 out.push_str("type Query {\n");
214 for schema in &self.entities {
215 let type_name = pascal_case(&schema.name);
216 let singular = schema.name.clone();
217 let plural = format!("{}s", schema.name);
218 out.push_str(&format!(" {}(id: String!): {}\n", singular, type_name));
219 out.push_str(&format!(
220 " {}(where: {}_filter, orderBy: String, orderDirection: OrderDirection, first: Int, skip: Int): [{}!]!\n",
221 plural, schema.name, type_name
222 ));
223 }
224 out.push_str("}\n");
225
226 out
227 }
228
229 fn entity_type_sdl(&self, schema: &EntitySchema) -> String {
230 let type_name = pascal_case(&schema.name);
231 let mut out = format!("type {} {{\n", type_name);
232 out.push_str(" id: String!\n");
234 out.push_str(" blockNumber: BigInt!\n");
235 out.push_str(" txHash: String!\n");
236 out.push_str(" logIndex: Int!\n");
237 for field in &schema.fields {
239 let gql_type = field_type_to_gql(&field.field_type, field.nullable);
240 out.push_str(&format!(" {}: {}\n", field.name, gql_type));
241 }
242 out.push_str("}\n\n");
243 out
244 }
245
246 fn filter_input_sdl(&self, schema: &EntitySchema) -> String {
247 let mut out = format!("input {}_filter {{\n", schema.name);
248 for field in &schema.fields {
250 let base = field_type_to_gql_scalar(&field.field_type);
251 out.push_str(&format!(" {}: {}\n", field.name, base));
252 out.push_str(&format!(" {}_gt: {}\n", field.name, base));
253 out.push_str(&format!(" {}_lt: {}\n", field.name, base));
254 out.push_str(&format!(" {}_gte: {}\n", field.name, base));
255 out.push_str(&format!(" {}_lte: {}\n", field.name, base));
256 out.push_str(&format!(" {}_in: [{}]\n", field.name, base));
257 }
258 out.push_str("}\n\n");
259 out
260 }
261}
262
/// One top-level field selected by a query document.
#[derive(Debug, Clone)]
struct ParsedSelection {
    /// Name of the selected root field.
    field: String,

    /// Arguments given in parentheses after the field name; empty when there were none.
    args: HashMap<String, ArgValue>,

    /// Names of the directly selected sub-fields. Nested selections are parsed for
    /// syntax but their contents are not recorded.
    sub_fields: Vec<String>,
}

/// An argument value as written in the query text.
#[derive(Debug, Clone)]
enum ArgValue {
    /// Double-quoted string literal with escapes already resolved.
    Str(String),
    /// Numeric literal; every number is stored as `f64`.
    Num(f64),
    /// `{ key: value, ... }` object literal.
    Obj(HashMap<String, ArgValue>),
    /// Bare word such as an enum value, `true`, `false` or `null`.
    Ident(String),
}

impl ArgValue {
    /// Returns the text of a `Str` or `Ident` value, `None` for other variants.
    fn as_str(&self) -> Option<&str> {
        match self {
            ArgValue::Str(s) => Some(s.as_str()),
            ArgValue::Ident(s) => Some(s.as_str()),
            _ => None,
        }
    }

    /// Returns a `Num` converted with an `as` cast, `None` for other variants.
    ///
    /// The cast truncates the fractional part and saturates: negative values and NaN
    /// become `0`, values above `usize::MAX` become `usize::MAX`.
    fn as_usize(&self) -> Option<usize> {
        match self {
            ArgValue::Num(n) => Some(*n as usize),
            _ => None,
        }
    }

    /// Returns the map of an `Obj` value, `None` for other variants.
    fn as_obj(&self) -> Option<&HashMap<String, ArgValue>> {
        match self {
            ArgValue::Obj(m) => Some(m),
            _ => None,
        }
    }
}

/// Hand-written recursive-descent parser over the raw bytes of a query string.
struct Parser<'a> {
    /// The query text as bytes (always obtained from a `&str`, hence valid UTF-8).
    src: &'a [u8],
    /// Index of the next unread byte.
    pos: usize,
}

impl<'a> Parser<'a> {
    /// Maximum nesting of `{ ... }` accepted in selections and in object arguments.
    ///
    /// The parser recurses once per nesting level; bounding the depth keeps a hostile
    /// query made of thousands of `{` from overflowing the stack.
    const MAX_DEPTH: usize = 128;

    /// Creates a parser positioned at the start of `src`.
    fn new(src: &'a str) -> Self {
        Self {
            src: src.as_bytes(),
            pos: 0,
        }
    }

    /// Returns the next byte without consuming it.
    fn peek(&self) -> Option<u8> {
        self.src.get(self.pos).copied()
    }

    /// Returns the next byte and advances past it; `None` at end of input.
    fn consume(&mut self) -> Option<u8> {
        let b = self.peek();
        if b.is_some() {
            self.pos += 1;
        }
        b
    }

    /// Skips ASCII whitespace, commas and `#` comments (up to and including the newline).
    fn skip_ws(&mut self) {
        while let Some(b) = self.peek() {
            if b == b'#' {
                while let Some(c) = self.consume() {
                    if c == b'\n' {
                        break;
                    }
                }
            } else if b.is_ascii_whitespace() || b == b',' {
                self.consume();
            } else {
                break;
            }
        }
    }

    /// Skips insignificant characters, then consumes one byte that must equal `ch`.
    ///
    /// # Errors
    /// Returns a message when a different byte or the end of input is found.
    fn expect(&mut self, ch: u8) -> Result<(), String> {
        self.skip_ws();
        match self.consume() {
            Some(b) if b == ch => Ok(()),
            Some(b) => Err(format!(
                "expected '{}' but got '{}' at position {}",
                ch as char, b as char, self.pos
            )),
            None => Err(format!(
                "expected '{}' but reached end of input",
                ch as char
            )),
        }
    }

    /// Reads a run of ASCII alphanumerics and underscores; `None` when the run is empty.
    fn read_name(&mut self) -> Option<String> {
        self.skip_ws();
        let start = self.pos;
        while let Some(b) = self.peek() {
            if b.is_ascii_alphanumeric() || b == b'_' {
                self.consume();
            } else {
                break;
            }
        }
        if self.pos > start {
            Some(String::from_utf8_lossy(&self.src[start..self.pos]).into_owned())
        } else {
            None
        }
    }

    /// Reads the remainder of a string literal; the opening quote is already consumed.
    ///
    /// `\n` and `\t` are translated; any other escaped character (including `\"` and
    /// `\\`) stands for itself.
    ///
    /// # Errors
    /// Returns a message when the input ends before the closing quote.
    fn read_string(&mut self) -> Result<String, String> {
        // Collect raw bytes and decode once at the end: converting byte-by-byte with
        // `as char` would turn every multi-byte UTF-8 character into several wrong ones.
        let mut bytes = Vec::new();
        loop {
            match self.consume() {
                Some(b'"') => break,
                Some(b'\\') => match self.consume() {
                    Some(b'n') => bytes.push(b'\n'),
                    Some(b't') => bytes.push(b'\t'),
                    Some(c) => bytes.push(c),
                    None => return Err("unterminated string escape".into()),
                },
                Some(c) => bytes.push(c),
                None => return Err("unterminated string literal".into()),
            }
        }
        // Only ASCII bytes were removed or substituted, so the remainder of a valid
        // UTF-8 source is still valid; the error arm is purely defensive.
        String::from_utf8(bytes).map_err(|_| "string literal is not valid UTF-8".to_string())
    }

    /// Reads a numeric literal starting at the current position.
    ///
    /// The caller guarantees the first byte is a digit or `-`.
    ///
    /// # Errors
    /// Returns a message when the collected text is not a valid number (for example
    /// `1-2` or `1e`) instead of silently substituting zero.
    fn read_number(&mut self) -> Result<ArgValue, String> {
        let start = self.pos;
        while let Some(b) = self.peek() {
            if b.is_ascii_digit() || matches!(b, b'.' | b'-' | b'+' | b'e' | b'E') {
                self.consume();
            } else {
                break;
            }
        }
        let text = String::from_utf8_lossy(&self.src[start..self.pos]);
        text.parse::<f64>()
            .map(ArgValue::Num)
            .map_err(|_| format!("invalid number literal '{text}' at position {start}"))
    }

    /// Reads one argument value: string, object, number or bare identifier.
    ///
    /// `depth` is the number of object literals currently enclosing this value.
    fn read_arg_value(&mut self, depth: usize) -> Result<ArgValue, String> {
        self.skip_ws();
        match self.peek() {
            Some(b'"') => {
                self.consume();
                self.read_string().map(ArgValue::Str)
            }
            Some(b'{') => {
                self.consume();
                self.read_object(depth + 1).map(ArgValue::Obj)
            }
            Some(b) if b.is_ascii_digit() || b == b'-' => self.read_number(),
            Some(_) => self
                .read_name()
                .map(ArgValue::Ident)
                .ok_or_else(|| format!("unexpected character at pos {}", self.pos)),
            None => Err("unexpected end of input in argument value".into()),
        }
    }

    /// Reads `key: value` pairs up to the closing `}`; the opening `{` is already consumed.
    fn read_object(&mut self, depth: usize) -> Result<HashMap<String, ArgValue>, String> {
        Self::check_depth(depth)?;
        let mut map = HashMap::new();
        loop {
            self.skip_ws();
            if self.peek() == Some(b'}') {
                self.consume();
                break;
            }
            let key = self.read_name().ok_or("expected object key")?;
            self.expect(b':')?;
            let val = self.read_arg_value(depth)?;
            map.insert(key, val);
        }
        Ok(map)
    }

    /// Reads `name: value` pairs up to the closing `)`; the opening `(` is already consumed.
    fn read_args(&mut self) -> Result<HashMap<String, ArgValue>, String> {
        let mut args = HashMap::new();
        loop {
            self.skip_ws();
            if self.peek() == Some(b')') {
                self.consume();
                break;
            }
            let key = self.read_name().ok_or("expected argument name")?;
            self.expect(b':')?;
            let val = self.read_arg_value(0)?;
            args.insert(key, val);
        }
        Ok(args)
    }

    /// Reads a selection set up to its closing `}`; the opening `{` is already consumed.
    ///
    /// Returns the names found directly at this level. Nested selection sets are
    /// validated syntactically and then discarded.
    fn read_sub_fields(&mut self, depth: usize) -> Result<Vec<String>, String> {
        Self::check_depth(depth)?;
        let mut fields = Vec::new();
        loop {
            self.skip_ws();
            match self.peek() {
                Some(b'}') => {
                    self.consume();
                    break;
                }
                Some(b'{') => {
                    // Anonymous nested block: parse and ignore.
                    self.consume();
                    self.read_sub_fields(depth + 1)?;
                    continue;
                }
                _ => {}
            }
            let Some(name) = self.read_name() else {
                return Err(format!("expected field name at pos {}", self.pos));
            };
            self.skip_ws();
            if self.peek() == Some(b'{') {
                self.consume();
                self.read_sub_fields(depth + 1)?;
            }
            fields.push(name);
        }
        Ok(fields)
    }

    /// Rejects nesting deeper than [`Self::MAX_DEPTH`].
    fn check_depth(depth: usize) -> Result<(), String> {
        if depth > Self::MAX_DEPTH {
            Err(format!(
                "nesting exceeds maximum depth of {}",
                Self::MAX_DEPTH
            ))
        } else {
            Ok(())
        }
    }

    /// Parses a whole document into its top-level selections.
    ///
    /// Accepts an optional leading `query` / `mutation` / `subscription` keyword with an
    /// optional operation name, followed by one `{ ... }` selection set. Text after the
    /// closing brace is ignored.
    ///
    /// # Errors
    /// Returns a human-readable message on the first syntax error.
    fn parse(&mut self) -> Result<Vec<ParsedSelection>, String> {
        self.skip_ws();

        if matches!(self.peek(), Some(b'q' | b'm' | b's')) {
            let kw = self.read_name().unwrap_or_default();
            if !matches!(kw.as_str(), "query" | "mutation" | "subscription") {
                return Err(format!("unexpected keyword '{kw}' at document start"));
            }
            self.skip_ws();
            // Optional operation name; its value is not used.
            if self
                .peek()
                .is_some_and(|b| b.is_ascii_alphabetic() || b == b'_')
            {
                self.read_name();
            }
        }

        self.expect(b'{')?;

        let mut selections = Vec::new();
        loop {
            self.skip_ws();
            if self.peek() == Some(b'}') {
                self.consume();
                break;
            }
            let field = self.read_name().ok_or("expected selection field name")?;
            self.skip_ws();

            let args = if self.peek() == Some(b'(') {
                self.consume();
                self.read_args()?
            } else {
                HashMap::new()
            };

            self.skip_ws();
            let sub_fields = if self.peek() == Some(b'{') {
                self.consume();
                self.read_sub_fields(1)?
            } else {
                Vec::new()
            };

            selections.push(ParsedSelection {
                field,
                args,
                sub_fields,
            });
        }

        Ok(selections)
    }
}
581
582pub struct GraphqlExecutor {
600 store: Arc<dyn EntityStore>,
601 schema: RwLock<GraphqlSchema>,
602}
603
604impl GraphqlExecutor {
605 pub fn new(store: Arc<dyn EntityStore>) -> Self {
607 Self {
608 store,
609 schema: RwLock::new(GraphqlSchema::new()),
610 }
611 }
612
613 pub fn register_schema(&self, entity_schema: EntitySchema) {
615 let mut schema = self.schema.write().expect("schema lock poisoned");
616 schema.add_entity(entity_schema);
617 }
618
619 pub fn introspect(&self) -> String {
621 let schema = self.schema.read().expect("schema lock poisoned");
622 schema.sdl()
623 }
624
625 pub async fn execute(&self, query: &str) -> GraphqlResponse {
627 let selections = match Parser::new(query).parse() {
628 Ok(s) => s,
629 Err(e) => return GraphqlResponse::err(format!("Parse error: {e}")),
630 };
631
632 let mut data_map = serde_json::Map::new();
633 let mut errors: Vec<GraphqlError> = Vec::new();
634
635 for sel in selections {
636 if sel.field == "__schema" || sel.field == "__type" {
638 let sdl = self.introspect();
639 data_map.insert(sel.field.clone(), JsonValue::String(sdl));
640 continue;
641 }
642
643 match self.execute_selection(&sel).await {
644 Ok(value) => {
645 data_map.insert(sel.field.clone(), value);
646 }
647 Err(e) => {
648 errors.push(e);
649 }
650 }
651 }
652
653 if !errors.is_empty() {
654 return GraphqlResponse::errors(errors);
655 }
656
657 GraphqlResponse::ok(JsonValue::Object(data_map))
658 }
659
660 async fn execute_selection(&self, sel: &ParsedSelection) -> Result<JsonValue, GraphqlError> {
662 let field = &sel.field;
663
664 let (entity_type, is_singular) = self.resolve_entity_type(field);
669
670 let entity_type = entity_type.ok_or_else(|| {
671 GraphqlError::new(format!("Unknown field '{}': no entity type found", field))
672 })?;
673
674 if is_singular {
675 self.execute_single(&entity_type, sel).await
676 } else {
677 self.execute_collection(&entity_type, sel).await
678 }
679 }
680
681 fn resolve_entity_type(&self, field: &str) -> (Option<String>, bool) {
688 let schema = self.schema.read().expect("schema lock poisoned");
689 if schema.entities.iter().any(|e| e.name == field) {
691 return (Some(field.to_string()), true);
692 }
693 for entity in &schema.entities {
695 let plural = format!("{}s", entity.name);
696 if *field == plural {
697 return (Some(entity.name.clone()), false);
698 }
699 }
700 (None, false)
702 }
703
704 async fn execute_single(
706 &self,
707 entity_type: &str,
708 sel: &ParsedSelection,
709 ) -> Result<JsonValue, GraphqlError> {
710 let id = sel
711 .args
712 .get("id")
713 .and_then(|v| v.as_str())
714 .ok_or_else(|| GraphqlError::new("Singular query requires an 'id' argument"))?;
715
716 let _query = EntityQuery::new(entity_type)
717 .filter(QueryFilter::Eq(
718 "id".to_string(),
719 JsonValue::String(id.to_string()),
720 ))
721 .limit(1);
722
723 let query_no_id_filter = EntityQuery::new(entity_type);
730 let mut rows = self
731 .store
732 .query(query_no_id_filter)
733 .await
734 .map_err(GraphqlError::from)?;
735
736 rows.retain(|r| r.id == id);
737
738 if rows.is_empty() {
739 return Ok(JsonValue::Null);
740 }
741
742 let row = &rows[0];
743 Ok(self.project_row(row, &sel.sub_fields))
744 }
745
746 async fn execute_collection(
748 &self,
749 entity_type: &str,
750 sel: &ParsedSelection,
751 ) -> Result<JsonValue, GraphqlError> {
752 let first = sel.args.get("first").and_then(|v| v.as_usize());
753 let skip = sel.args.get("skip").and_then(|v| v.as_usize());
754 let order_by = sel
755 .args
756 .get("orderBy")
757 .and_then(|v| v.as_str())
758 .map(|s| s.to_string());
759 let order_direction = sel
760 .args
761 .get("orderDirection")
762 .and_then(|v| v.as_str())
763 .unwrap_or("asc")
764 .to_lowercase();
765
766 let sort_order = if order_direction == "desc" {
767 SortOrder::Desc
768 } else {
769 SortOrder::Asc
770 };
771
772 let filters = if let Some(where_arg) = sel.args.get("where") {
774 let obj = where_arg
775 .as_obj()
776 .ok_or_else(|| GraphqlError::new("'where' argument must be an object"))?;
777 self.parse_where_filters(obj)?
778 } else {
779 Vec::new()
780 };
781
782 let mut q = EntityQuery::new(entity_type);
783 for f in filters {
784 q = q.filter(f);
785 }
786 if let Some(ob) = order_by {
787 q = q.order_by(ob, sort_order);
788 }
789 if let Some(n) = first {
790 q = q.limit(n);
791 }
792 if let Some(n) = skip {
793 q = q.offset(n);
794 }
795
796 let rows = self.store.query(q).await.map_err(GraphqlError::from)?;
797
798 let values: Vec<JsonValue> = rows
799 .iter()
800 .map(|row| self.project_row(row, &sel.sub_fields))
801 .collect();
802
803 Ok(JsonValue::Array(values))
804 }
805
806 fn parse_where_filters(
816 &self,
817 obj: &HashMap<String, ArgValue>,
818 ) -> Result<Vec<QueryFilter>, GraphqlError> {
819 let mut filters = Vec::new();
820
821 for (key, val) in obj {
822 let json_val = arg_to_json(val);
823
824 if let Some(field) = key.strip_suffix("_gt") {
825 filters.push(QueryFilter::Gt(field.to_string(), json_val));
826 } else if let Some(field) = key.strip_suffix("_lt") {
827 filters.push(QueryFilter::Lt(field.to_string(), json_val));
828 } else if let Some(field) = key.strip_suffix("_gte") {
829 filters.push(QueryFilter::Gte(field.to_string(), json_val));
830 } else if let Some(field) = key.strip_suffix("_lte") {
831 filters.push(QueryFilter::Lte(field.to_string(), json_val));
832 } else if let Some(field) = key.strip_suffix("_in") {
833 let items = match json_val {
836 JsonValue::Array(arr) => arr,
837 JsonValue::String(s) => {
838 serde_json::from_str::<Vec<JsonValue>>(&s)
840 .unwrap_or_else(|_| vec![JsonValue::String(s)])
841 }
842 other => vec![other],
843 };
844 filters.push(QueryFilter::In(field.to_string(), items));
845 } else {
846 filters.push(QueryFilter::Eq(key.clone(), json_val));
848 }
849 }
850
851 Ok(filters)
852 }
853
854 fn project_row(&self, row: &crate::entity::EntityRow, sub_fields: &[String]) -> JsonValue {
858 let mut obj = serde_json::Map::new();
859
860 let include_all = sub_fields.is_empty();
861
862 let want = |name: &str| -> bool { include_all || sub_fields.iter().any(|f| f == name) };
863
864 if want("id") {
866 obj.insert("id".to_string(), JsonValue::String(row.id.clone()));
867 }
868 if want("blockNumber") {
869 obj.insert(
870 "blockNumber".to_string(),
871 JsonValue::Number(row.block_number.into()),
872 );
873 }
874 if want("txHash") {
875 obj.insert("txHash".to_string(), JsonValue::String(row.tx_hash.clone()));
876 }
877 if want("logIndex") {
878 obj.insert(
879 "logIndex".to_string(),
880 JsonValue::Number(row.log_index.into()),
881 );
882 }
883
884 for (k, v) in &row.data {
886 if want(k) {
887 obj.insert(k.clone(), v.clone());
888 }
889 }
890
891 JsonValue::Object(obj)
892 }
893}
894
895fn arg_to_json(val: &ArgValue) -> JsonValue {
899 match val {
900 ArgValue::Str(s) => JsonValue::String(s.clone()),
901 ArgValue::Num(n) => {
902 if n.fract() == 0.0 && *n >= 0.0 && *n <= u64::MAX as f64 {
904 JsonValue::Number((*n as u64).into())
905 } else if n.fract() == 0.0 && *n < 0.0 && *n >= i64::MIN as f64 {
906 JsonValue::Number((*n as i64).into())
907 } else {
908 serde_json::Number::from_f64(*n)
909 .map(JsonValue::Number)
910 .unwrap_or(JsonValue::Null)
911 }
912 }
913 ArgValue::Ident(s) => {
914 match s.as_str() {
916 "true" => JsonValue::Bool(true),
917 "false" => JsonValue::Bool(false),
918 "null" => JsonValue::Null,
919 _ => JsonValue::String(s.clone()),
920 }
921 }
922 ArgValue::Obj(map) => {
923 let mut obj = serde_json::Map::new();
924 for (k, v) in map {
925 obj.insert(k.clone(), arg_to_json(v));
926 }
927 JsonValue::Object(obj)
928 }
929 }
930}
931
/// Converts a `snake_case` name to `PascalCase`.
///
/// The input is split on `_`; the first character of every non-empty segment is
/// upper-cased and the rest of the segment is kept unchanged. Empty segments vanish.
fn pascal_case(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for segment in s.split('_') {
        let mut rest = segment.chars();
        if let Some(head) = rest.next() {
            out.extend(head.to_uppercase());
            out.push_str(rest.as_str());
        }
    }
    out
}
944
945fn field_type_to_gql(ft: &FieldType, nullable: bool) -> String {
947 let base = field_type_to_gql_scalar(ft);
948 if nullable {
949 base.to_string()
950 } else {
951 format!("{}!", base)
952 }
953}
954
955fn field_type_to_gql_scalar(ft: &FieldType) -> &'static str {
957 match ft {
958 FieldType::String => "String",
959 FieldType::Int64 => "BigInt",
960 FieldType::Uint64 => "BigInt",
961 FieldType::Float64 => "Float",
962 FieldType::Bool => "Boolean",
963 FieldType::Json => "String",
964 FieldType::Bytes => "String",
965 }
966}
967
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use super::*;
    use crate::entity::{
        EntityRow, EntitySchemaBuilder, EntityStore, FieldType, MemoryEntityStore,
    };

    // ----- fixtures -----------------------------------------------------------------

    /// Schema of the `swap` entity used throughout the tests.
    fn swap_schema() -> EntitySchema {
        EntitySchemaBuilder::new("swap")
            .primary_key("id")
            .field("pool", FieldType::String, true)
            .field("amount0", FieldType::Uint64, false)
            .field("amount1", FieldType::Uint64, false)
            .nullable_field("trader", FieldType::String, false)
            .build()
    }

    /// Schema of the `transfer` entity used throughout the tests.
    fn transfer_schema() -> EntitySchema {
        EntitySchemaBuilder::new("transfer")
            .primary_key("id")
            .field("from", FieldType::String, true)
            .field("to", FieldType::String, true)
            .field("value", FieldType::Uint64, false)
            .build()
    }

    /// Builds a row of `entity_type` with the given data columns.
    fn make_row(
        entity_type: &str,
        id: &str,
        block: u64,
        columns: Vec<(&str, serde_json::Value)>,
    ) -> EntityRow {
        let data: HashMap<String, serde_json::Value> = columns
            .into_iter()
            .map(|(name, value)| (name.to_string(), value))
            .collect();
        EntityRow {
            id: id.to_string(),
            entity_type: entity_type.to_string(),
            block_number: block,
            tx_hash: format!("0xtx_{id}"),
            log_index: 0,
            data,
        }
    }

    fn make_swap(id: &str, pool: &str, amount0: u64, amount1: u64, block: u64) -> EntityRow {
        make_row(
            "swap",
            id,
            block,
            vec![
                ("pool", serde_json::json!(pool)),
                ("amount0", serde_json::json!(amount0)),
                ("amount1", serde_json::json!(amount1)),
            ],
        )
    }

    fn make_transfer(id: &str, from: &str, to: &str, value: u64, block: u64) -> EntityRow {
        make_row(
            "transfer",
            id,
            block,
            vec![
                ("from", serde_json::json!(from)),
                ("to", serde_json::json!(to)),
                ("value", serde_json::json!(value)),
            ],
        )
    }

    /// Executor over an in-memory store holding three swaps and two transfers.
    async fn seeded_executor() -> GraphqlExecutor {
        let store = Arc::new(MemoryEntityStore::new());
        store.register_schema(&swap_schema()).await.unwrap();
        store.register_schema(&transfer_schema()).await.unwrap();

        let seed = vec![
            make_swap("s1", "0xPOOL_A", 1000, 500, 10),
            make_swap("s2", "0xPOOL_A", 2000, 1000, 11),
            make_swap("s3", "0xPOOL_B", 3000, 1500, 12),
            make_transfer("t1", "0xAlice", "0xBob", 100, 10),
            make_transfer("t2", "0xBob", "0xCharlie", 200, 11),
        ];
        for row in seed {
            store.upsert(row).await.unwrap();
        }

        let executor = GraphqlExecutor::new(store);
        executor.register_schema(swap_schema());
        executor.register_schema(transfer_schema());
        executor
    }

    /// SDL generated for a schema that only knows the `swap` entity.
    fn swap_only_sdl() -> String {
        let mut gql_schema = GraphqlSchema::new();
        gql_schema.add_entity(swap_schema());
        gql_schema.sdl()
    }

    /// Asserts the response is error-free and returns its data object.
    fn expect_data(resp: GraphqlResponse) -> JsonValue {
        assert!(!resp.is_error(), "unexpected error: {:?}", resp.errors);
        resp.data.unwrap()
    }

    /// Asserts the response is error-free and returns the array stored under `key`.
    fn expect_rows(resp: GraphqlResponse, key: &str) -> Vec<JsonValue> {
        expect_data(resp)[key].as_array().unwrap().clone()
    }

    // ----- schema generation --------------------------------------------------------

    #[test]
    fn test_schema_generation_contains_type() {
        let sdl = swap_only_sdl();

        assert!(sdl.contains("type Swap {"), "SDL missing Swap type:\n{sdl}");
        assert!(
            sdl.contains("pool: String!"),
            "SDL missing pool field:\n{sdl}"
        );
        assert!(
            sdl.contains("amount0: BigInt!"),
            "SDL missing amount0 field:\n{sdl}"
        );
        assert!(
            sdl.contains("trader: String"),
            "SDL missing nullable trader field:\n{sdl}"
        );
    }

    #[test]
    fn test_schema_generation_filter_input() {
        let sdl = swap_only_sdl();

        assert!(
            sdl.contains("input swap_filter {"),
            "SDL missing swap_filter input:\n{sdl}"
        );
        assert!(
            sdl.contains("amount0_gt:"),
            "SDL missing amount0_gt in filter:\n{sdl}"
        );
        assert!(
            sdl.contains("pool_in:"),
            "SDL missing pool_in in filter:\n{sdl}"
        );
    }

    #[test]
    fn test_schema_generation_query_type() {
        let sdl = swap_only_sdl();

        assert!(
            sdl.contains("type Query {"),
            "SDL missing Query type:\n{sdl}"
        );
        assert!(
            sdl.contains("swap(id: String!): Swap"),
            "SDL missing singular swap:\n{sdl}"
        );
        assert!(sdl.contains("swaps("), "SDL missing plural swaps:\n{sdl}");
    }

    #[test]
    fn test_pascal_case_conversion() {
        let cases = [
            ("swap", "Swap"),
            ("erc20_transfer", "Erc20Transfer"),
            ("uniswap_v3_pool", "UniswapV3Pool"),
        ];
        for (input, expected) in cases {
            assert_eq!(pascal_case(input), expected);
        }
    }

    // ----- introspection ------------------------------------------------------------

    #[tokio::test]
    async fn test_introspection() {
        let sdl = seeded_executor().await.introspect();
        assert!(
            sdl.contains("type Swap {"),
            "introspect missing Swap type:\n{sdl}"
        );
        assert!(
            sdl.contains("type Transfer {"),
            "introspect missing Transfer type:\n{sdl}"
        );
    }

    #[tokio::test]
    async fn test_introspection_query() {
        let executor = seeded_executor().await;
        let data = expect_data(executor.execute("{ __schema { types { name } } }").await);
        let sdl = data["__schema"].as_str().unwrap();
        assert!(sdl.contains("type Swap {"));
    }

    // ----- query execution ----------------------------------------------------------

    #[tokio::test]
    async fn test_collection_query_all() {
        let executor = seeded_executor().await;
        let resp = executor
            .execute("{ swaps { id pool amount0 amount1 } }")
            .await;
        assert_eq!(expect_rows(resp, "swaps").len(), 3);
    }

    #[tokio::test]
    async fn test_singular_query_by_id() {
        let executor = seeded_executor().await;
        let resp = executor
            .execute(r#"{ swap(id: "s2") { id pool amount0 } }"#)
            .await;
        let data = expect_data(resp);
        let row = &data["swap"];
        assert_eq!(row["id"], "s2");
        assert_eq!(row["pool"], "0xPOOL_A");
        assert_eq!(row["amount0"], 2000);
    }

    #[tokio::test]
    async fn test_singular_query_missing_id() {
        let executor = seeded_executor().await;
        let resp = executor
            .execute(r#"{ swap(id: "nonexistent") { id } }"#)
            .await;
        assert_eq!(expect_data(resp)["swap"], JsonValue::Null);
    }

    #[tokio::test]
    async fn test_collection_with_where_filter() {
        let executor = seeded_executor().await;
        let resp = executor
            .execute(r#"{ swaps(where: { pool: "0xPOOL_A" }) { id pool } }"#)
            .await;
        let matched = expect_rows(resp, "swaps");
        assert_eq!(matched.len(), 2);
        for row in &matched {
            assert_eq!(row["pool"], "0xPOOL_A");
        }
    }

    #[tokio::test]
    async fn test_collection_pagination() {
        let executor = seeded_executor().await;
        let resp = executor
            .execute(
                r#"{ swaps(first: 1, skip: 1, orderBy: "amount0", orderDirection: "asc") { id amount0 } }"#,
            )
            .await;
        let page = expect_rows(resp, "swaps");
        assert_eq!(page.len(), 1);
        assert_eq!(page[0]["amount0"], 2000);
    }

    #[tokio::test]
    async fn test_collection_order_desc() {
        let executor = seeded_executor().await;
        let resp = executor
            .execute(r#"{ swaps(orderBy: "amount0", orderDirection: "desc") { id amount0 } }"#)
            .await;
        let sorted = expect_rows(resp, "swaps");
        assert_eq!(sorted.len(), 3);
        let first_amount = sorted[0]["amount0"].as_u64().unwrap();
        let last_amount = sorted[2]["amount0"].as_u64().unwrap();
        assert!(first_amount > last_amount, "expected descending order");
    }

    #[tokio::test]
    async fn test_unknown_entity_returns_error() {
        let executor = seeded_executor().await;
        let resp = executor.execute("{ unknownEntity { id } }").await;
        assert!(resp.is_error(), "expected an error for unknown entity");
        let errs = resp.errors.unwrap();
        assert!(
            errs[0].message.contains("Unknown field"),
            "wrong error message: {}",
            errs[0].message
        );
    }

    #[tokio::test]
    async fn test_where_gt_filter() {
        let executor = seeded_executor().await;
        let resp = executor
            .execute(r#"{ swaps(where: { amount0_gt: 1000 }) { id amount0 } }"#)
            .await;
        let matched = expect_rows(resp, "swaps");
        assert_eq!(matched.len(), 2);
        for row in &matched {
            assert!(row["amount0"].as_u64().unwrap() > 1000);
        }
    }

    // ----- response envelope --------------------------------------------------------

    #[test]
    fn test_response_ok_format() {
        let resp = GraphqlResponse::ok(serde_json::json!({ "swap": { "id": "s1" } }));
        assert!(!resp.is_error());
        let json = serde_json::to_value(&resp).unwrap();
        assert!(json.get("data").is_some());
        assert!(json.get("errors").is_none());
        assert_eq!(json["data"]["swap"]["id"], "s1");
    }

    #[test]
    fn test_response_error_format() {
        let resp = GraphqlResponse::err("something went wrong");
        assert!(resp.is_error());
        let json = serde_json::to_value(&resp).unwrap();
        assert!(json.get("errors").is_some());
        assert!(json.get("data").is_none());
        assert_eq!(json["errors"][0]["message"], "something went wrong");
    }

    #[tokio::test]
    async fn test_field_projection() {
        let executor = seeded_executor().await;
        let resp = executor
            .execute(
                r#"{ swaps(first: 1, orderBy: "amount0", orderDirection: "asc") { id pool } }"#,
            )
            .await;
        let data = expect_data(resp);
        let row = &data["swaps"][0];
        assert!(row.get("id").is_some());
        assert!(row.get("pool").is_some());
        assert!(
            row.get("amount0").is_none(),
            "amount0 should be projected out"
        );
    }

    #[tokio::test]
    async fn test_multi_entity_query() {
        let executor = seeded_executor().await;
        let data = expect_data(executor.execute("{ swaps { id } transfers { id } }").await);
        assert_eq!(data["swaps"].as_array().unwrap().len(), 3);
        assert_eq!(data["transfers"].as_array().unwrap().len(), 2);
    }

    #[tokio::test]
    async fn test_parse_error() {
        let executor = seeded_executor().await;
        let resp = executor.execute("{ unclosed { id ").await;
        assert!(resp.is_error(), "expected parse error");
    }

    // ----- subscription config ------------------------------------------------------

    #[test]
    fn test_subscription_config_default() {
        let cfg = SubscriptionConfig::default();
        assert!(cfg.entity_types.is_empty());
        assert_eq!(cfg.buffer_size, 256);
        assert!(cfg.events.contains(&SubscriptionEvent::Insert));
        assert!(cfg.events.contains(&SubscriptionEvent::Reorg));
        assert!(cfg.from_block.is_none());
    }

    #[test]
    fn test_subscription_config_serialization() {
        let cfg = SubscriptionConfig {
            entity_types: vec!["swap".to_string()],
            events: vec![SubscriptionEvent::Insert, SubscriptionEvent::Delete],
            from_block: Some(1_000_000),
            buffer_size: 64,
        };
        let json = serde_json::to_value(&cfg).unwrap();
        assert_eq!(json["entity_types"][0], "swap");
        assert_eq!(json["from_block"], 1_000_000);
        assert_eq!(json["events"][0], "insert");
        assert_eq!(json["events"][1], "delete");
    }

    #[tokio::test]
    async fn test_where_lte_filter() {
        let executor = seeded_executor().await;
        let resp = executor
            .execute(r#"{ swaps(where: { amount0_lte: 2000 }) { id amount0 } }"#)
            .await;
        let matched = expect_rows(resp, "swaps");
        assert_eq!(matched.len(), 2);
        for row in &matched {
            assert!(row["amount0"].as_u64().unwrap() <= 2000);
        }
    }

    #[tokio::test]
    async fn test_singular_without_id_returns_error() {
        let executor = seeded_executor().await;
        let resp = executor.execute("{ swap { id pool } }").await;
        assert!(resp.is_error(), "expected error for singular without id");
    }
}