Skip to main content

chainindex_core/
entity.rs

1//! Entity system — structured storage for indexed blockchain data.
2//!
3//! Entities are typed records (like database rows) that handlers insert
4//! during indexing. The entity system provides:
5//! - Schema definition (field names, types, indexes)
6//! - Insert, upsert, delete operations
7//! - Query with filters
8//! - Automatic rollback on reorg (delete entities above fork block)
9//!
10//! # Example
11//!
12//! ```rust
13//! use chainindex_core::entity::{EntitySchemaBuilder, FieldType};
14//!
15//! let schema = EntitySchemaBuilder::new("erc20_transfer")
16//!     .primary_key("id")
17//!     .field("from", FieldType::String, true)
18//!     .field("to", FieldType::String, true)
19//!     .field("amount", FieldType::Uint64, false)
20//!     .nullable_field("memo", FieldType::String, false)
21//!     .build();
22//!
23//! assert_eq!(schema.name, "erc20_transfer");
24//! assert_eq!(schema.primary_key, "id");
25//! assert_eq!(schema.fields.len(), 4);
26//! ```
27
28use std::collections::HashMap;
29use std::sync::Mutex;
30
31use serde::{Deserialize, Serialize};
32
33use crate::error::IndexerError;
34
35// ─── Field Types ─────────────────────────────────────────────────────────────
36
37/// Supported field types for entity schemas.
38///
39/// These map to database column types in concrete storage backends.
40#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
41pub enum FieldType {
42    /// UTF-8 string (TEXT in SQL).
43    String,
44    /// Signed 64-bit integer (BIGINT in SQL).
45    Int64,
46    /// Unsigned 64-bit integer (stored as BIGINT in most backends).
47    Uint64,
48    /// 64-bit floating point (DOUBLE/REAL in SQL).
49    Float64,
50    /// Boolean (BOOLEAN in SQL).
51    Bool,
52    /// Arbitrary JSON value (JSONB in Postgres, TEXT in SQLite).
53    Json,
54    /// Raw byte data (BYTEA in Postgres, BLOB in SQLite).
55    Bytes,
56}
57
58// ─── EntityField ─────────────────────────────────────────────────────────────
59
60/// A single field in an entity schema.
61///
62/// Describes the name, type, and indexing behavior of one column.
63#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct EntityField {
65    /// The field name (column name).
66    pub name: std::string::String,
67    /// The field type.
68    pub field_type: FieldType,
69    /// Whether a database index should be created on this field.
70    pub indexed: bool,
71    /// Whether the field can be NULL.
72    pub nullable: bool,
73}
74
75// ─── EntitySchema ────────────────────────────────────────────────────────────
76
77/// Schema definition for an entity.
78///
79/// Every entity automatically has a `block_number` field (u64) used for
80/// reorg rollback via [`EntityStore::delete_after_block`].
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct EntitySchema {
83    /// The entity/table name (e.g. `"erc20_transfer"`).
84    pub name: std::string::String,
85    /// The field name used as the primary key.
86    pub primary_key: std::string::String,
87    /// The fields in this entity.
88    pub fields: Vec<EntityField>,
89    // block_number is always implicit for reorg rollback.
90}
91
92// ─── EntityRow ───────────────────────────────────────────────────────────────
93
94/// A single row in the entity store (dynamic key-value).
95///
96/// The `data` map holds all non-system fields as JSON values.
97/// System fields (`id`, `entity_type`, `block_number`, `tx_hash`, `log_index`)
98/// are stored as top-level fields for efficient filtering.
99#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct EntityRow {
101    /// Primary key value.
102    pub id: std::string::String,
103    /// The entity schema name (table name).
104    pub entity_type: std::string::String,
105    /// Block number where this entity was created/updated.
106    pub block_number: u64,
107    /// Transaction hash that produced this entity.
108    pub tx_hash: std::string::String,
109    /// Log index within the block.
110    pub log_index: u32,
111    /// User-defined field data.
112    pub data: HashMap<std::string::String, serde_json::Value>,
113}
114
115// ─── Query Types ─────────────────────────────────────────────────────────────
116
117/// Query filter for entities.
118///
119/// Build queries using the builder methods:
120///
121/// ```rust
122/// use chainindex_core::entity::{EntityQuery, QueryFilter, SortOrder};
123///
124/// let query = EntityQuery::new("erc20_transfer")
125///     .filter(QueryFilter::Eq("from".into(), serde_json::json!("0xAlice")))
126///     .order_by("block_number", SortOrder::Desc)
127///     .limit(10);
128/// ```
129#[derive(Debug, Clone, Default)]
130pub struct EntityQuery {
131    /// The entity type to query.
132    pub entity_type: std::string::String,
133    /// Filters to apply.
134    pub filters: Vec<QueryFilter>,
135    /// Sort order.
136    pub order_by: Option<(std::string::String, SortOrder)>,
137    /// Maximum number of results.
138    pub limit: Option<usize>,
139    /// Number of results to skip.
140    pub offset: Option<usize>,
141}
142
143impl EntityQuery {
144    /// Create a new query for the given entity type.
145    pub fn new(entity_type: impl Into<std::string::String>) -> Self {
146        Self {
147            entity_type: entity_type.into(),
148            filters: Vec::new(),
149            order_by: None,
150            limit: None,
151            offset: None,
152        }
153    }
154
155    /// Add a filter to the query.
156    pub fn filter(mut self, f: QueryFilter) -> Self {
157        self.filters.push(f);
158        self
159    }
160
161    /// Set the sort order.
162    pub fn order_by(mut self, field: impl Into<std::string::String>, order: SortOrder) -> Self {
163        self.order_by = Some((field.into(), order));
164        self
165    }
166
167    /// Set the maximum number of results.
168    pub fn limit(mut self, n: usize) -> Self {
169        self.limit = Some(n);
170        self
171    }
172
173    /// Set the offset (number of results to skip).
174    pub fn offset(mut self, n: usize) -> Self {
175        self.offset = Some(n);
176        self
177    }
178}
179
180/// A single filter predicate for an entity query.
181#[derive(Debug, Clone)]
182pub enum QueryFilter {
183    /// Field equals value.
184    Eq(std::string::String, serde_json::Value),
185    /// Field is greater than value.
186    Gt(std::string::String, serde_json::Value),
187    /// Field is less than value.
188    Lt(std::string::String, serde_json::Value),
189    /// Field is greater than or equal to value.
190    Gte(std::string::String, serde_json::Value),
191    /// Field is less than or equal to value.
192    Lte(std::string::String, serde_json::Value),
193    /// Field is one of the given values.
194    In(std::string::String, Vec<serde_json::Value>),
195    /// Field is between two values (inclusive).
196    Between(std::string::String, serde_json::Value, serde_json::Value),
197}
198
/// Direction in which query results are sorted.
///
/// The type is a plain two-state flag, so it is `Copy` and can be compared
/// and hashed directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SortOrder {
    /// Smallest value first.
    Asc,
    /// Largest value first.
    Desc,
}
207
208// ─── EntityStore trait ───────────────────────────────────────────────────────
209
210/// Trait for entity storage backends.
211///
212/// Backends (memory, SQLite, Postgres) implement this trait to provide
213/// structured entity storage.
214#[async_trait::async_trait]
215pub trait EntityStore: Send + Sync {
216    /// Register (create) a schema. Backends may create tables/collections.
217    async fn register_schema(&self, schema: &EntitySchema) -> Result<(), IndexerError>;
218
219    /// Insert a new entity row. Errors if a row with the same primary key exists.
220    async fn insert(&self, row: EntityRow) -> Result<(), IndexerError>;
221
222    /// Insert or update an entity row. If a row with the same primary key exists,
223    /// it is replaced.
224    async fn upsert(&self, row: EntityRow) -> Result<(), IndexerError>;
225
226    /// Delete a single entity by type and primary key.
227    async fn delete(&self, entity_type: &str, id: &str) -> Result<(), IndexerError>;
228
229    /// Delete all entities of the given type with `block_number > block_number`.
230    /// Used during reorg rollback. Returns the number of deleted rows.
231    async fn delete_after_block(
232        &self,
233        entity_type: &str,
234        block_number: u64,
235    ) -> Result<u64, IndexerError>;
236
237    /// Query entities with filters, sorting, and pagination.
238    async fn query(&self, query: EntityQuery) -> Result<Vec<EntityRow>, IndexerError>;
239
240    /// Count entities of the given type.
241    async fn count(&self, entity_type: &str) -> Result<u64, IndexerError>;
242}
243
244// ─── EntitySchemaBuilder ─────────────────────────────────────────────────────
245
246/// Fluent builder for [`EntitySchema`].
247///
248/// # Example
249///
250/// ```rust
251/// use chainindex_core::entity::{EntitySchemaBuilder, FieldType};
252///
253/// let schema = EntitySchemaBuilder::new("swap")
254///     .primary_key("id")
255///     .field("pair", FieldType::String, true)
256///     .field("amount_in", FieldType::Uint64, false)
257///     .field("amount_out", FieldType::Uint64, false)
258///     .nullable_field("memo", FieldType::String, false)
259///     .build();
260/// ```
261pub struct EntitySchemaBuilder {
262    name: std::string::String,
263    primary_key: std::string::String,
264    fields: Vec<EntityField>,
265}
266
267impl EntitySchemaBuilder {
268    /// Create a new builder for an entity with the given name.
269    pub fn new(name: impl Into<std::string::String>) -> Self {
270        Self {
271            name: name.into(),
272            primary_key: "id".to_string(),
273            fields: Vec::new(),
274        }
275    }
276
277    /// Set the primary key field name (default: `"id"`).
278    pub fn primary_key(mut self, pk: impl Into<std::string::String>) -> Self {
279        self.primary_key = pk.into();
280        self
281    }
282
283    /// Add a required (non-nullable) field.
284    pub fn field(
285        mut self,
286        name: impl Into<std::string::String>,
287        field_type: FieldType,
288        indexed: bool,
289    ) -> Self {
290        self.fields.push(EntityField {
291            name: name.into(),
292            field_type,
293            indexed,
294            nullable: false,
295        });
296        self
297    }
298
299    /// Add a nullable field.
300    pub fn nullable_field(
301        mut self,
302        name: impl Into<std::string::String>,
303        field_type: FieldType,
304        indexed: bool,
305    ) -> Self {
306        self.fields.push(EntityField {
307            name: name.into(),
308            field_type,
309            indexed,
310            nullable: true,
311        });
312        self
313    }
314
315    /// Build the [`EntitySchema`].
316    pub fn build(self) -> EntitySchema {
317        EntitySchema {
318            name: self.name,
319            primary_key: self.primary_key,
320            fields: self.fields,
321        }
322    }
323}
324
325// ─── MemoryEntityStore ───────────────────────────────────────────────────────
326
327/// In-memory entity store for testing and development.
328///
329/// Stores entities in a `HashMap<(entity_type, id), EntityRow>` behind
330/// a `Mutex`. Not suitable for production (no persistence).
331pub struct MemoryEntityStore {
332    /// Registered schemas: entity_type -> EntitySchema.
333    schemas: Mutex<HashMap<std::string::String, EntitySchema>>,
334    /// All stored rows: (entity_type, id) -> EntityRow.
335    rows: Mutex<HashMap<(std::string::String, std::string::String), EntityRow>>,
336}
337
338impl MemoryEntityStore {
339    /// Create a new empty in-memory entity store.
340    pub fn new() -> Self {
341        Self {
342            schemas: Mutex::new(HashMap::new()),
343            rows: Mutex::new(HashMap::new()),
344        }
345    }
346}
347
348impl Default for MemoryEntityStore {
349    fn default() -> Self {
350        Self::new()
351    }
352}
353
354/// Check whether a single row matches a query filter.
355fn matches_filter(row: &EntityRow, filter: &QueryFilter) -> bool {
356    match filter {
357        QueryFilter::Eq(field, value) => row.data.get(field) == Some(value),
358        QueryFilter::Gt(field, value) => row
359            .data
360            .get(field)
361            .is_some_and(|v| json_cmp(v, value) == Some(std::cmp::Ordering::Greater)),
362        QueryFilter::Lt(field, value) => row
363            .data
364            .get(field)
365            .is_some_and(|v| json_cmp(v, value) == Some(std::cmp::Ordering::Less)),
366        QueryFilter::Gte(field, value) => row.data.get(field).is_some_and(|v| {
367            matches!(
368                json_cmp(v, value),
369                Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
370            )
371        }),
372        QueryFilter::Lte(field, value) => row.data.get(field).is_some_and(|v| {
373            matches!(
374                json_cmp(v, value),
375                Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
376            )
377        }),
378        QueryFilter::In(field, values) => row.data.get(field).is_some_and(|v| values.contains(v)),
379        QueryFilter::Between(field, low, high) => row.data.get(field).is_some_and(|v| {
380            matches!(
381                json_cmp(v, low),
382                Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
383            ) && matches!(
384                json_cmp(v, high),
385                Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
386            )
387        }),
388    }
389}
390
391/// Compare two JSON values numerically or lexicographically.
392fn json_cmp(a: &serde_json::Value, b: &serde_json::Value) -> Option<std::cmp::Ordering> {
393    // Try numeric comparison first.
394    if let (Some(an), Some(bn)) = (a.as_f64(), b.as_f64()) {
395        return an.partial_cmp(&bn);
396    }
397    // Try string comparison.
398    if let (Some(a_str), Some(b_str)) = (a.as_str(), b.as_str()) {
399        return Some(a_str.cmp(b_str));
400    }
401    None
402}
403
404#[async_trait::async_trait]
405impl EntityStore for MemoryEntityStore {
406    async fn register_schema(&self, schema: &EntitySchema) -> Result<(), IndexerError> {
407        let mut schemas = self
408            .schemas
409            .lock()
410            .map_err(|e| IndexerError::Storage(format!("lock poisoned: {e}")))?;
411        schemas.insert(schema.name.clone(), schema.clone());
412        Ok(())
413    }
414
415    async fn insert(&self, row: EntityRow) -> Result<(), IndexerError> {
416        let mut rows = self
417            .rows
418            .lock()
419            .map_err(|e| IndexerError::Storage(format!("lock poisoned: {e}")))?;
420        let key = (row.entity_type.clone(), row.id.clone());
421        if rows.contains_key(&key) {
422            return Err(IndexerError::Storage(format!(
423                "entity '{}' with id '{}' already exists",
424                row.entity_type, row.id
425            )));
426        }
427        rows.insert(key, row);
428        Ok(())
429    }
430
431    async fn upsert(&self, row: EntityRow) -> Result<(), IndexerError> {
432        let mut rows = self
433            .rows
434            .lock()
435            .map_err(|e| IndexerError::Storage(format!("lock poisoned: {e}")))?;
436        let key = (row.entity_type.clone(), row.id.clone());
437        rows.insert(key, row);
438        Ok(())
439    }
440
441    async fn delete(&self, entity_type: &str, id: &str) -> Result<(), IndexerError> {
442        let mut rows = self
443            .rows
444            .lock()
445            .map_err(|e| IndexerError::Storage(format!("lock poisoned: {e}")))?;
446        rows.remove(&(entity_type.to_string(), id.to_string()));
447        Ok(())
448    }
449
450    async fn delete_after_block(
451        &self,
452        entity_type: &str,
453        block_number: u64,
454    ) -> Result<u64, IndexerError> {
455        let mut rows = self
456            .rows
457            .lock()
458            .map_err(|e| IndexerError::Storage(format!("lock poisoned: {e}")))?;
459        let to_remove: Vec<_> = rows
460            .iter()
461            .filter(|((et, _), row)| et == entity_type && row.block_number > block_number)
462            .map(|(key, _)| key.clone())
463            .collect();
464        let count = to_remove.len() as u64;
465        for key in to_remove {
466            rows.remove(&key);
467        }
468        Ok(count)
469    }
470
471    async fn query(&self, query: EntityQuery) -> Result<Vec<EntityRow>, IndexerError> {
472        let rows = self
473            .rows
474            .lock()
475            .map_err(|e| IndexerError::Storage(format!("lock poisoned: {e}")))?;
476
477        // Filter by entity_type and all query filters.
478        let mut results: Vec<EntityRow> = rows
479            .values()
480            .filter(|row| {
481                row.entity_type == query.entity_type
482                    && query.filters.iter().all(|f| matches_filter(row, f))
483            })
484            .cloned()
485            .collect();
486
487        // Sort.
488        if let Some((ref field, ref order)) = query.order_by {
489            results.sort_by(|a, b| {
490                let va = a.data.get(field);
491                let vb = b.data.get(field);
492                let cmp = match (va, vb) {
493                    (Some(va), Some(vb)) => json_cmp(va, vb).unwrap_or(std::cmp::Ordering::Equal),
494                    (Some(_), None) => std::cmp::Ordering::Less,
495                    (None, Some(_)) => std::cmp::Ordering::Greater,
496                    (None, None) => std::cmp::Ordering::Equal,
497                };
498                match order {
499                    SortOrder::Asc => cmp,
500                    SortOrder::Desc => cmp.reverse(),
501                }
502            });
503        }
504
505        // Offset.
506        if let Some(offset) = query.offset {
507            if offset < results.len() {
508                results = results.split_off(offset);
509            } else {
510                results.clear();
511            }
512        }
513
514        // Limit.
515        if let Some(limit) = query.limit {
516            results.truncate(limit);
517        }
518
519        Ok(results)
520    }
521
522    async fn count(&self, entity_type: &str) -> Result<u64, IndexerError> {
523        let rows = self
524            .rows
525            .lock()
526            .map_err(|e| IndexerError::Storage(format!("lock poisoned: {e}")))?;
527        let count = rows
528            .values()
529            .filter(|row| row.entity_type == entity_type)
530            .count() as u64;
531        Ok(count)
532    }
533}
534
535// ─── Tests ───────────────────────────────────────────────────────────────────
536
#[cfg(test)]
mod tests {
    use super::*;

    /// Schema shared by all tests: a `transfer` entity with three required
    /// fields and one nullable field.
    fn test_schema() -> EntitySchema {
        EntitySchemaBuilder::new("transfer")
            .primary_key("id")
            .field("from", FieldType::String, true)
            .field("to", FieldType::String, true)
            .field("amount", FieldType::Uint64, false)
            .nullable_field("memo", FieldType::String, false)
            .build()
    }

    /// Build a `transfer` row with the given id, parties, amount and block.
    fn make_row(id: &str, from: &str, to: &str, amount: u64, block: u64) -> EntityRow {
        let mut data = HashMap::new();
        data.insert("from".to_string(), serde_json::json!(from));
        data.insert("to".to_string(), serde_json::json!(to));
        data.insert("amount".to_string(), serde_json::json!(amount));
        EntityRow {
            id: id.to_string(),
            entity_type: "transfer".to_string(),
            block_number: block,
            tx_hash: format!("0xtx_{id}"),
            log_index: 0,
            data,
        }
    }

    /// Create a store with the test schema registered and `rows` inserted.
    async fn store_with(rows: Vec<EntityRow>) -> MemoryEntityStore {
        let store = MemoryEntityStore::new();
        store.register_schema(&test_schema()).await.unwrap();
        for row in rows {
            store.insert(row).await.unwrap();
        }
        store
    }

    /// Ids of `rows` in sorted order, for assertions that must not depend on
    /// the order in which the store returns rows.
    fn sorted_ids(rows: &[EntityRow]) -> Vec<&str> {
        let mut ids: Vec<&str> = rows.iter().map(|r| r.id.as_str()).collect();
        ids.sort_unstable();
        ids
    }

    #[tokio::test]
    async fn register_schema() {
        let store = MemoryEntityStore::new();
        let schema = test_schema();
        store.register_schema(&schema).await.unwrap();
        // Re-registering should overwrite without error.
        store.register_schema(&schema).await.unwrap();
    }

    #[tokio::test]
    async fn insert_and_query() {
        let store = store_with(vec![make_row("t1", "0xAlice", "0xBob", 100, 10)]).await;

        let results = store.query(EntityQuery::new("transfer")).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id, "t1");
    }

    #[tokio::test]
    async fn insert_duplicate_fails() {
        let row = make_row("t1", "0xAlice", "0xBob", 100, 10);
        let store = store_with(vec![row.clone()]).await;

        let err = store.insert(row).await.unwrap_err();
        let msg = format!("{err}");
        assert!(msg.contains("already exists"), "got: {msg}");
        // The failed insert must not have changed the store.
        assert_eq!(store.count("transfer").await.unwrap(), 1);
    }

    #[tokio::test]
    async fn upsert_overwrites() {
        let store = store_with(vec![make_row("t1", "0xAlice", "0xBob", 100, 10)]).await;

        // Upsert with different amount.
        store
            .upsert(make_row("t1", "0xAlice", "0xBob", 200, 11))
            .await
            .unwrap();

        let results = store.query(EntityQuery::new("transfer")).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].data["amount"], serde_json::json!(200));
        assert_eq!(results[0].block_number, 11);
    }

    #[tokio::test]
    async fn delete_entity() {
        let store = store_with(vec![
            make_row("t1", "0xA", "0xB", 100, 10),
            make_row("t2", "0xA", "0xC", 200, 11),
        ])
        .await;

        store.delete("transfer", "t1").await.unwrap();

        assert_eq!(store.count("transfer").await.unwrap(), 1);

        let results = store.query(EntityQuery::new("transfer")).await.unwrap();
        assert_eq!(sorted_ids(&results), ["t2"]);
    }

    #[tokio::test]
    async fn delete_after_block_for_reorg() {
        let store = store_with(vec![
            make_row("t1", "0xA", "0xB", 100, 10),
            make_row("t2", "0xA", "0xC", 200, 11),
            make_row("t3", "0xA", "0xD", 300, 12),
            make_row("t4", "0xA", "0xE", 400, 13),
        ])
        .await;

        // Reorg: delete everything after block 11.
        let deleted = store.delete_after_block("transfer", 11).await.unwrap();
        assert_eq!(deleted, 2); // t3 (12) and t4 (13)

        assert_eq!(store.count("transfer").await.unwrap(), 2);

        // t1 (10) and t2 (11) remain.
        let remaining = store.query(EntityQuery::new("transfer")).await.unwrap();
        assert_eq!(sorted_ids(&remaining), ["t1", "t2"]);
    }

    #[tokio::test]
    async fn query_with_eq_filter() {
        let store = store_with(vec![
            make_row("t1", "0xAlice", "0xBob", 100, 10),
            make_row("t2", "0xAlice", "0xCharlie", 200, 11),
            make_row("t3", "0xBob", "0xCharlie", 300, 12),
        ])
        .await;

        let results = store
            .query(
                EntityQuery::new("transfer")
                    .filter(QueryFilter::Eq("from".into(), serde_json::json!("0xAlice"))),
            )
            .await
            .unwrap();
        assert_eq!(sorted_ids(&results), ["t1", "t2"]);
        assert!(results
            .iter()
            .all(|r| r.data["from"] == serde_json::json!("0xAlice")));
    }

    #[tokio::test]
    async fn query_with_gt_lt_filters() {
        let store = store_with(vec![
            make_row("t1", "0xA", "0xB", 100, 10),
            make_row("t2", "0xA", "0xC", 200, 11),
            make_row("t3", "0xA", "0xD", 300, 12),
        ])
        .await;

        // amount > 100 AND amount < 300 => only t2 (200)
        let results = store
            .query(
                EntityQuery::new("transfer")
                    .filter(QueryFilter::Gt("amount".into(), serde_json::json!(100)))
                    .filter(QueryFilter::Lt("amount".into(), serde_json::json!(300))),
            )
            .await
            .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id, "t2");
    }

    #[tokio::test]
    async fn query_with_in_filter() {
        let store = store_with(vec![
            make_row("t1", "0xAlice", "0xBob", 100, 10),
            make_row("t2", "0xBob", "0xCharlie", 200, 11),
            make_row("t3", "0xDave", "0xEve", 300, 12),
        ])
        .await;

        let results = store
            .query(EntityQuery::new("transfer").filter(QueryFilter::In(
                "from".into(),
                vec![serde_json::json!("0xAlice"), serde_json::json!("0xDave")],
            )))
            .await
            .unwrap();
        assert_eq!(sorted_ids(&results), ["t1", "t3"]);
    }

    #[tokio::test]
    async fn query_with_sort_and_limit() {
        let store = store_with(vec![
            make_row("t1", "0xA", "0xB", 300, 10),
            make_row("t2", "0xA", "0xC", 100, 11),
            make_row("t3", "0xA", "0xD", 200, 12),
        ])
        .await;

        // Sort by amount ascending, limit 2.
        let results = store
            .query(
                EntityQuery::new("transfer")
                    .order_by("amount", SortOrder::Asc)
                    .limit(2),
            )
            .await
            .unwrap();
        assert_eq!(results.len(), 2);
        assert_eq!(results[0].data["amount"], serde_json::json!(100));
        assert_eq!(results[1].data["amount"], serde_json::json!(200));
    }

    #[tokio::test]
    async fn query_with_sort_desc() {
        let store = store_with(vec![
            make_row("t1", "0xA", "0xB", 100, 10),
            make_row("t2", "0xA", "0xC", 300, 11),
            make_row("t3", "0xA", "0xD", 200, 12),
        ])
        .await;

        let results = store
            .query(EntityQuery::new("transfer").order_by("amount", SortOrder::Desc))
            .await
            .unwrap();
        assert_eq!(results.len(), 3);
        assert_eq!(results[0].data["amount"], serde_json::json!(300));
        assert_eq!(results[1].data["amount"], serde_json::json!(200));
        assert_eq!(results[2].data["amount"], serde_json::json!(100));
    }

    #[tokio::test]
    async fn count_entities() {
        let store = store_with(Vec::new()).await;

        assert_eq!(store.count("transfer").await.unwrap(), 0);

        store
            .insert(make_row("t1", "0xA", "0xB", 100, 10))
            .await
            .unwrap();
        store
            .insert(make_row("t2", "0xA", "0xC", 200, 11))
            .await
            .unwrap();

        assert_eq!(store.count("transfer").await.unwrap(), 2);
        // Different entity type returns 0.
        assert_eq!(store.count("approval").await.unwrap(), 0);
    }

    // Purely synchronous: building a schema needs no async runtime.
    #[test]
    fn schema_builder_defaults() {
        let schema = EntitySchemaBuilder::new("test_entity")
            .field("name", FieldType::String, true)
            .field("value", FieldType::Uint64, false)
            .build();

        assert_eq!(schema.name, "test_entity");
        assert_eq!(schema.primary_key, "id"); // default primary key
        assert_eq!(schema.fields.len(), 2);
        assert!(schema.fields[0].indexed);
        assert!(!schema.fields[0].nullable);
        assert!(!schema.fields[1].indexed);
    }

    #[tokio::test]
    async fn query_with_between_filter() {
        let store = store_with(vec![
            make_row("t1", "0xA", "0xB", 100, 10),
            make_row("t2", "0xA", "0xC", 200, 11),
            make_row("t3", "0xA", "0xD", 300, 12),
            make_row("t4", "0xA", "0xE", 400, 13),
        ])
        .await;

        let results = store
            .query(EntityQuery::new("transfer").filter(QueryFilter::Between(
                "amount".into(),
                serde_json::json!(200),
                serde_json::json!(300),
            )))
            .await
            .unwrap();
        // Both bounds are inclusive.
        assert_eq!(sorted_ids(&results), ["t2", "t3"]);
        assert!(results.iter().all(|r| {
            let amt = r.data["amount"].as_u64().unwrap();
            (200..=300).contains(&amt)
        }));
    }

    #[tokio::test]
    async fn query_with_offset() {
        let store = store_with(vec![
            make_row("t1", "0xA", "0xB", 100, 10),
            make_row("t2", "0xA", "0xC", 200, 11),
            make_row("t3", "0xA", "0xD", 300, 12),
        ])
        .await;

        // Sort ascending by amount, skip first, take 1.
        let results = store
            .query(
                EntityQuery::new("transfer")
                    .order_by("amount", SortOrder::Asc)
                    .offset(1)
                    .limit(1),
            )
            .await
            .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].data["amount"], serde_json::json!(200));
    }
}
925}