1use std::collections::HashMap;
29use std::sync::Mutex;
30
31use serde::{Deserialize, Serialize};
32
33use crate::error::IndexerError;
34
35#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
41pub enum FieldType {
42 String,
44 Int64,
46 Uint64,
48 Float64,
50 Bool,
52 Json,
54 Bytes,
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct EntityField {
65 pub name: std::string::String,
67 pub field_type: FieldType,
69 pub indexed: bool,
71 pub nullable: bool,
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct EntitySchema {
83 pub name: std::string::String,
85 pub primary_key: std::string::String,
87 pub fields: Vec<EntityField>,
89 }
91
92#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct EntityRow {
101 pub id: std::string::String,
103 pub entity_type: std::string::String,
105 pub block_number: u64,
107 pub tx_hash: std::string::String,
109 pub log_index: u32,
111 pub data: HashMap<std::string::String, serde_json::Value>,
113}
114
115#[derive(Debug, Clone, Default)]
130pub struct EntityQuery {
131 pub entity_type: std::string::String,
133 pub filters: Vec<QueryFilter>,
135 pub order_by: Option<(std::string::String, SortOrder)>,
137 pub limit: Option<usize>,
139 pub offset: Option<usize>,
141}
142
143impl EntityQuery {
144 pub fn new(entity_type: impl Into<std::string::String>) -> Self {
146 Self {
147 entity_type: entity_type.into(),
148 filters: Vec::new(),
149 order_by: None,
150 limit: None,
151 offset: None,
152 }
153 }
154
155 pub fn filter(mut self, f: QueryFilter) -> Self {
157 self.filters.push(f);
158 self
159 }
160
161 pub fn order_by(mut self, field: impl Into<std::string::String>, order: SortOrder) -> Self {
163 self.order_by = Some((field.into(), order));
164 self
165 }
166
167 pub fn limit(mut self, n: usize) -> Self {
169 self.limit = Some(n);
170 self
171 }
172
173 pub fn offset(mut self, n: usize) -> Self {
175 self.offset = Some(n);
176 self
177 }
178}
179
180#[derive(Debug, Clone)]
182pub enum QueryFilter {
183 Eq(std::string::String, serde_json::Value),
185 Gt(std::string::String, serde_json::Value),
187 Lt(std::string::String, serde_json::Value),
189 Gte(std::string::String, serde_json::Value),
191 Lte(std::string::String, serde_json::Value),
193 In(std::string::String, Vec<serde_json::Value>),
195 Between(std::string::String, serde_json::Value, serde_json::Value),
197}
198
/// Direction in which query results are sorted.
///
/// Derives `PartialEq`/`Eq` so callers can compare directions and use them in
/// assertions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SortOrder {
    /// Ascending: smallest value first.
    Asc,
    /// Descending: largest value first.
    Desc,
}
207
208#[async_trait::async_trait]
215pub trait EntityStore: Send + Sync {
216 async fn register_schema(&self, schema: &EntitySchema) -> Result<(), IndexerError>;
218
219 async fn insert(&self, row: EntityRow) -> Result<(), IndexerError>;
221
222 async fn upsert(&self, row: EntityRow) -> Result<(), IndexerError>;
225
226 async fn delete(&self, entity_type: &str, id: &str) -> Result<(), IndexerError>;
228
229 async fn delete_after_block(
232 &self,
233 entity_type: &str,
234 block_number: u64,
235 ) -> Result<u64, IndexerError>;
236
237 async fn query(&self, query: EntityQuery) -> Result<Vec<EntityRow>, IndexerError>;
239
240 async fn count(&self, entity_type: &str) -> Result<u64, IndexerError>;
242}
243
244pub struct EntitySchemaBuilder {
262 name: std::string::String,
263 primary_key: std::string::String,
264 fields: Vec<EntityField>,
265}
266
267impl EntitySchemaBuilder {
268 pub fn new(name: impl Into<std::string::String>) -> Self {
270 Self {
271 name: name.into(),
272 primary_key: "id".to_string(),
273 fields: Vec::new(),
274 }
275 }
276
277 pub fn primary_key(mut self, pk: impl Into<std::string::String>) -> Self {
279 self.primary_key = pk.into();
280 self
281 }
282
283 pub fn field(
285 mut self,
286 name: impl Into<std::string::String>,
287 field_type: FieldType,
288 indexed: bool,
289 ) -> Self {
290 self.fields.push(EntityField {
291 name: name.into(),
292 field_type,
293 indexed,
294 nullable: false,
295 });
296 self
297 }
298
299 pub fn nullable_field(
301 mut self,
302 name: impl Into<std::string::String>,
303 field_type: FieldType,
304 indexed: bool,
305 ) -> Self {
306 self.fields.push(EntityField {
307 name: name.into(),
308 field_type,
309 indexed,
310 nullable: true,
311 });
312 self
313 }
314
315 pub fn build(self) -> EntitySchema {
317 EntitySchema {
318 name: self.name,
319 primary_key: self.primary_key,
320 fields: self.fields,
321 }
322 }
323}
324
325pub struct MemoryEntityStore {
332 schemas: Mutex<HashMap<std::string::String, EntitySchema>>,
334 rows: Mutex<HashMap<(std::string::String, std::string::String), EntityRow>>,
336}
337
338impl MemoryEntityStore {
339 pub fn new() -> Self {
341 Self {
342 schemas: Mutex::new(HashMap::new()),
343 rows: Mutex::new(HashMap::new()),
344 }
345 }
346}
347
348impl Default for MemoryEntityStore {
349 fn default() -> Self {
350 Self::new()
351 }
352}
353
354fn matches_filter(row: &EntityRow, filter: &QueryFilter) -> bool {
356 match filter {
357 QueryFilter::Eq(field, value) => row.data.get(field) == Some(value),
358 QueryFilter::Gt(field, value) => row
359 .data
360 .get(field)
361 .is_some_and(|v| json_cmp(v, value) == Some(std::cmp::Ordering::Greater)),
362 QueryFilter::Lt(field, value) => row
363 .data
364 .get(field)
365 .is_some_and(|v| json_cmp(v, value) == Some(std::cmp::Ordering::Less)),
366 QueryFilter::Gte(field, value) => row.data.get(field).is_some_and(|v| {
367 matches!(
368 json_cmp(v, value),
369 Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
370 )
371 }),
372 QueryFilter::Lte(field, value) => row.data.get(field).is_some_and(|v| {
373 matches!(
374 json_cmp(v, value),
375 Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
376 )
377 }),
378 QueryFilter::In(field, values) => row.data.get(field).is_some_and(|v| values.contains(v)),
379 QueryFilter::Between(field, low, high) => row.data.get(field).is_some_and(|v| {
380 matches!(
381 json_cmp(v, low),
382 Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
383 ) && matches!(
384 json_cmp(v, high),
385 Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
386 )
387 }),
388 }
389}
390
391fn json_cmp(a: &serde_json::Value, b: &serde_json::Value) -> Option<std::cmp::Ordering> {
393 if let (Some(an), Some(bn)) = (a.as_f64(), b.as_f64()) {
395 return an.partial_cmp(&bn);
396 }
397 if let (Some(a_str), Some(b_str)) = (a.as_str(), b.as_str()) {
399 return Some(a_str.cmp(b_str));
400 }
401 None
402}
403
404#[async_trait::async_trait]
405impl EntityStore for MemoryEntityStore {
406 async fn register_schema(&self, schema: &EntitySchema) -> Result<(), IndexerError> {
407 let mut schemas = self
408 .schemas
409 .lock()
410 .map_err(|e| IndexerError::Storage(format!("lock poisoned: {e}")))?;
411 schemas.insert(schema.name.clone(), schema.clone());
412 Ok(())
413 }
414
415 async fn insert(&self, row: EntityRow) -> Result<(), IndexerError> {
416 let mut rows = self
417 .rows
418 .lock()
419 .map_err(|e| IndexerError::Storage(format!("lock poisoned: {e}")))?;
420 let key = (row.entity_type.clone(), row.id.clone());
421 if rows.contains_key(&key) {
422 return Err(IndexerError::Storage(format!(
423 "entity '{}' with id '{}' already exists",
424 row.entity_type, row.id
425 )));
426 }
427 rows.insert(key, row);
428 Ok(())
429 }
430
431 async fn upsert(&self, row: EntityRow) -> Result<(), IndexerError> {
432 let mut rows = self
433 .rows
434 .lock()
435 .map_err(|e| IndexerError::Storage(format!("lock poisoned: {e}")))?;
436 let key = (row.entity_type.clone(), row.id.clone());
437 rows.insert(key, row);
438 Ok(())
439 }
440
441 async fn delete(&self, entity_type: &str, id: &str) -> Result<(), IndexerError> {
442 let mut rows = self
443 .rows
444 .lock()
445 .map_err(|e| IndexerError::Storage(format!("lock poisoned: {e}")))?;
446 rows.remove(&(entity_type.to_string(), id.to_string()));
447 Ok(())
448 }
449
450 async fn delete_after_block(
451 &self,
452 entity_type: &str,
453 block_number: u64,
454 ) -> Result<u64, IndexerError> {
455 let mut rows = self
456 .rows
457 .lock()
458 .map_err(|e| IndexerError::Storage(format!("lock poisoned: {e}")))?;
459 let to_remove: Vec<_> = rows
460 .iter()
461 .filter(|((et, _), row)| et == entity_type && row.block_number > block_number)
462 .map(|(key, _)| key.clone())
463 .collect();
464 let count = to_remove.len() as u64;
465 for key in to_remove {
466 rows.remove(&key);
467 }
468 Ok(count)
469 }
470
471 async fn query(&self, query: EntityQuery) -> Result<Vec<EntityRow>, IndexerError> {
472 let rows = self
473 .rows
474 .lock()
475 .map_err(|e| IndexerError::Storage(format!("lock poisoned: {e}")))?;
476
477 let mut results: Vec<EntityRow> = rows
479 .values()
480 .filter(|row| {
481 row.entity_type == query.entity_type
482 && query.filters.iter().all(|f| matches_filter(row, f))
483 })
484 .cloned()
485 .collect();
486
487 if let Some((ref field, ref order)) = query.order_by {
489 results.sort_by(|a, b| {
490 let va = a.data.get(field);
491 let vb = b.data.get(field);
492 let cmp = match (va, vb) {
493 (Some(va), Some(vb)) => json_cmp(va, vb).unwrap_or(std::cmp::Ordering::Equal),
494 (Some(_), None) => std::cmp::Ordering::Less,
495 (None, Some(_)) => std::cmp::Ordering::Greater,
496 (None, None) => std::cmp::Ordering::Equal,
497 };
498 match order {
499 SortOrder::Asc => cmp,
500 SortOrder::Desc => cmp.reverse(),
501 }
502 });
503 }
504
505 if let Some(offset) = query.offset {
507 if offset < results.len() {
508 results = results.split_off(offset);
509 } else {
510 results.clear();
511 }
512 }
513
514 if let Some(limit) = query.limit {
516 results.truncate(limit);
517 }
518
519 Ok(results)
520 }
521
522 async fn count(&self, entity_type: &str) -> Result<u64, IndexerError> {
523 let rows = self
524 .rows
525 .lock()
526 .map_err(|e| IndexerError::Storage(format!("lock poisoned: {e}")))?;
527 let count = rows
528 .values()
529 .filter(|row| row.entity_type == entity_type)
530 .count() as u64;
531 Ok(count)
532 }
533}
534
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Schema shared by the store tests: a `transfer` entity keyed by `id`.
    fn test_schema() -> EntitySchema {
        EntitySchemaBuilder::new("transfer")
            .primary_key("id")
            .field("from", FieldType::String, true)
            .field("to", FieldType::String, true)
            .field("amount", FieldType::Uint64, false)
            .nullable_field("memo", FieldType::String, false)
            .build()
    }

    /// Builds a `transfer` row with the given payload at block `block`.
    fn make_row(id: &str, from: &str, to: &str, amount: u64, block: u64) -> EntityRow {
        let data = HashMap::from([
            ("from".to_string(), json!(from)),
            ("to".to_string(), json!(to)),
            ("amount".to_string(), json!(amount)),
        ]);
        EntityRow {
            id: id.to_string(),
            entity_type: "transfer".to_string(),
            block_number: block,
            tx_hash: format!("0xtx_{id}"),
            log_index: 0,
            data,
        }
    }

    /// Creates a store, registers the test schema and inserts `rows` in order.
    async fn seeded_store(rows: Vec<EntityRow>) -> MemoryEntityStore {
        let store = MemoryEntityStore::new();
        store.register_schema(&test_schema()).await.unwrap();
        for row in rows {
            store.insert(row).await.unwrap();
        }
        store
    }

    #[tokio::test]
    async fn register_schema() {
        let store = MemoryEntityStore::new();
        let schema = test_schema();
        // Registering the same schema twice must succeed both times.
        for _ in 0..2 {
            store.register_schema(&schema).await.unwrap();
        }
    }

    #[tokio::test]
    async fn insert_and_query() {
        let store = seeded_store(vec![make_row("t1", "0xAlice", "0xBob", 100, 10)]).await;

        let found = store.query(EntityQuery::new("transfer")).await.unwrap();
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].id, "t1");
    }

    #[tokio::test]
    async fn insert_duplicate_fails() {
        let row = make_row("t1", "0xAlice", "0xBob", 100, 10);
        let store = seeded_store(vec![row.clone()]).await;

        let err = store.insert(row).await.unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("already exists"), "got: {msg}");
    }

    #[tokio::test]
    async fn upsert_overwrites() {
        let store = seeded_store(vec![make_row("t1", "0xAlice", "0xBob", 100, 10)]).await;

        let replacement = make_row("t1", "0xAlice", "0xBob", 200, 11);
        store.upsert(replacement).await.unwrap();

        let found = store.query(EntityQuery::new("transfer")).await.unwrap();
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].data["amount"], json!(200));
        assert_eq!(found[0].block_number, 11);
    }

    #[tokio::test]
    async fn delete_entity() {
        let store = seeded_store(vec![
            make_row("t1", "0xA", "0xB", 100, 10),
            make_row("t2", "0xA", "0xC", 200, 11),
        ])
        .await;

        store.delete("transfer", "t1").await.unwrap();

        assert_eq!(store.count("transfer").await.unwrap(), 1);

        let remaining = store.query(EntityQuery::new("transfer")).await.unwrap();
        assert_eq!(remaining[0].id, "t2");
    }

    #[tokio::test]
    async fn delete_after_block_for_reorg() {
        let store = seeded_store(vec![
            make_row("t1", "0xA", "0xB", 100, 10),
            make_row("t2", "0xA", "0xC", 200, 11),
            make_row("t3", "0xA", "0xD", 300, 12),
            make_row("t4", "0xA", "0xE", 400, 13),
        ])
        .await;

        // Rows at blocks 12 and 13 are removed; blocks 10 and 11 stay.
        let deleted = store.delete_after_block("transfer", 11).await.unwrap();
        assert_eq!(deleted, 2);

        let count = store.count("transfer").await.unwrap();
        assert_eq!(count, 2);
    }

    #[tokio::test]
    async fn query_with_eq_filter() {
        let store = seeded_store(vec![
            make_row("t1", "0xAlice", "0xBob", 100, 10),
            make_row("t2", "0xAlice", "0xCharlie", 200, 11),
            make_row("t3", "0xBob", "0xCharlie", 300, 12),
        ])
        .await;

        let from_alice = EntityQuery::new("transfer")
            .filter(QueryFilter::Eq("from".into(), json!("0xAlice")));
        let found = store.query(from_alice).await.unwrap();

        assert_eq!(found.len(), 2);
        assert!(found.iter().all(|r| r.data["from"] == json!("0xAlice")));
    }

    #[tokio::test]
    async fn query_with_gt_lt_filters() {
        let store = seeded_store(vec![
            make_row("t1", "0xA", "0xB", 100, 10),
            make_row("t2", "0xA", "0xC", 200, 11),
            make_row("t3", "0xA", "0xD", 300, 12),
        ])
        .await;

        // Strictly between 100 and 300 leaves only the 200 row.
        let strictly_between = EntityQuery::new("transfer")
            .filter(QueryFilter::Gt("amount".into(), json!(100)))
            .filter(QueryFilter::Lt("amount".into(), json!(300)));
        let found = store.query(strictly_between).await.unwrap();

        assert_eq!(found.len(), 1);
        assert_eq!(found[0].id, "t2");
    }

    #[tokio::test]
    async fn query_with_in_filter() {
        let store = seeded_store(vec![
            make_row("t1", "0xAlice", "0xBob", 100, 10),
            make_row("t2", "0xBob", "0xCharlie", 200, 11),
            make_row("t3", "0xDave", "0xEve", 300, 12),
        ])
        .await;

        let senders = vec![json!("0xAlice"), json!("0xDave")];
        let found = store
            .query(EntityQuery::new("transfer").filter(QueryFilter::In("from".into(), senders)))
            .await
            .unwrap();

        assert_eq!(found.len(), 2);
    }

    #[tokio::test]
    async fn query_with_sort_and_limit() {
        let store = seeded_store(vec![
            make_row("t1", "0xA", "0xB", 300, 10),
            make_row("t2", "0xA", "0xC", 100, 11),
            make_row("t3", "0xA", "0xD", 200, 12),
        ])
        .await;

        let two_smallest = EntityQuery::new("transfer")
            .order_by("amount", SortOrder::Asc)
            .limit(2);
        let found = store.query(two_smallest).await.unwrap();

        assert_eq!(found.len(), 2);
        assert_eq!(found[0].data["amount"], json!(100));
        assert_eq!(found[1].data["amount"], json!(200));
    }

    #[tokio::test]
    async fn query_with_sort_desc() {
        let store = seeded_store(vec![
            make_row("t1", "0xA", "0xB", 100, 10),
            make_row("t2", "0xA", "0xC", 300, 11),
            make_row("t3", "0xA", "0xD", 200, 12),
        ])
        .await;

        let found = store
            .query(EntityQuery::new("transfer").order_by("amount", SortOrder::Desc))
            .await
            .unwrap();

        assert_eq!(found[0].data["amount"], json!(300));
        assert_eq!(found[1].data["amount"], json!(200));
        assert_eq!(found[2].data["amount"], json!(100));
    }

    #[tokio::test]
    async fn count_entities() {
        let store = seeded_store(Vec::new()).await;
        assert_eq!(store.count("transfer").await.unwrap(), 0);

        for row in [
            make_row("t1", "0xA", "0xB", 100, 10),
            make_row("t2", "0xA", "0xC", 200, 11),
        ] {
            store.insert(row).await.unwrap();
        }

        assert_eq!(store.count("transfer").await.unwrap(), 2);
        // An entity type with no rows counts as zero.
        assert_eq!(store.count("approval").await.unwrap(), 0);
    }

    #[tokio::test]
    async fn schema_builder_defaults() {
        let schema = EntitySchemaBuilder::new("test_entity")
            .field("name", FieldType::String, true)
            .field("value", FieldType::Uint64, false)
            .build();

        assert_eq!(schema.name, "test_entity");
        // The primary key defaults to "id" when not set explicitly.
        assert_eq!(schema.primary_key, "id");
        assert_eq!(schema.fields.len(), 2);

        let (first, second) = (&schema.fields[0], &schema.fields[1]);
        assert!(first.indexed);
        assert!(!first.nullable);
        assert!(!second.indexed);
    }

    #[tokio::test]
    async fn query_with_between_filter() {
        let store = seeded_store(vec![
            make_row("t1", "0xA", "0xB", 100, 10),
            make_row("t2", "0xA", "0xC", 200, 11),
            make_row("t3", "0xA", "0xD", 300, 12),
            make_row("t4", "0xA", "0xE", 400, 13),
        ])
        .await;

        let in_range = EntityQuery::new("transfer").filter(QueryFilter::Between(
            "amount".into(),
            json!(200),
            json!(300),
        ));
        let found = store.query(in_range).await.unwrap();

        assert_eq!(found.len(), 2);
        assert!(found.iter().all(|r| {
            let amt = r.data["amount"].as_u64().unwrap();
            (200..=300).contains(&amt)
        }));
    }

    #[tokio::test]
    async fn query_with_offset() {
        let store = seeded_store(vec![
            make_row("t1", "0xA", "0xB", 100, 10),
            make_row("t2", "0xA", "0xC", 200, 11),
            make_row("t3", "0xA", "0xD", 300, 12),
        ])
        .await;

        // Sorted ascending, skipping one row and taking one yields the middle row.
        let second_page = EntityQuery::new("transfer")
            .order_by("amount", SortOrder::Asc)
            .offset(1)
            .limit(1);
        let found = store.query(second_page).await.unwrap();

        assert_eq!(found.len(), 1);
        assert_eq!(found[0].data["amount"], json!(200));
    }
}