use std::collections::HashMap;
use std::sync::Mutex;
use serde::{Deserialize, Serialize};
use crate::error::IndexerError;
/// Declared value type of an [`EntityField`].
///
/// The `String` variant shares its name with the standard string type; the
/// rest of this file spells that type out as `std::string::String`.
///
/// Nothing in this file inspects the variant at runtime: `MemoryEntityStore`
/// keeps registered schemas but never checks row data against them.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum FieldType {
String,
Int64,
Uint64,
Float64,
Bool,
Json,
Bytes,
}
/// One field declaration inside an [`EntitySchema`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EntityField {
/// Field name. Presumably the key used in `EntityRow::data` — TODO confirm;
/// nothing in this file enforces the correspondence.
pub name: std::string::String,
/// Declared value type of the field.
pub field_type: FieldType,
/// Index flag, taken from the builder's `indexed` argument.
/// `MemoryEntityStore` does not read it.
pub indexed: bool,
/// Nullability flag: `EntitySchemaBuilder::field` sets `false`,
/// `EntitySchemaBuilder::nullable_field` sets `true`.
pub nullable: bool,
}
/// Description of an entity type: its name, primary key and declared fields.
///
/// Usually produced through [`EntitySchemaBuilder`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EntitySchema {
/// Entity type name; `MemoryEntityStore::register_schema` stores the schema
/// under this key.
pub name: std::string::String,
/// Name of the primary-key field (`"id"` unless the builder overrides it).
pub primary_key: std::string::String,
/// Field declarations, in the order they were added to the builder.
pub fields: Vec<EntityField>,
}
/// A single stored entity instance.
///
/// `MemoryEntityStore` identifies a row by the pair `(entity_type, id)`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EntityRow {
/// Identifier of the row within its entity type.
pub id: std::string::String,
/// Entity type the row belongs to; queries and counts match on it.
pub entity_type: std::string::String,
/// Block number attached to the row; `delete_after_block` compares against it.
pub block_number: u64,
/// Presumably the hash of the transaction that produced the row — not read
/// anywhere in this file; confirm against the producer.
pub tx_hash: std::string::String,
/// Presumably the log index within that transaction — not read anywhere in
/// this file; confirm against the producer.
pub log_index: u32,
/// Field values keyed by field name; filters and ordering read from here.
pub data: HashMap<std::string::String, serde_json::Value>,
}
/// Query description passed to [`EntityStore::query`].
///
/// The derived `Default` yields an empty entity type, no filters, no ordering
/// and no pagination; `EntityQuery::new` plus the chained setters is the usual
/// way to build one.
#[derive(Debug, Clone, Default)]
pub struct EntityQuery {
/// Entity type whose rows are searched.
pub entity_type: std::string::String,
/// Filters; `MemoryEntityStore` returns only rows that satisfy all of them.
pub filters: Vec<QueryFilter>,
/// Optional `(field name, direction)` to sort the result by.
pub order_by: Option<(std::string::String, SortOrder)>,
/// Optional maximum number of rows to return.
pub limit: Option<usize>,
/// Optional number of leading rows to skip; `MemoryEntityStore` applies it
/// after sorting and before `limit`.
pub offset: Option<usize>,
}
impl EntityQuery {
    /// Creates a query over `entity_type` with no filters, ordering or pagination.
    pub fn new(entity_type: impl Into<std::string::String>) -> Self {
        Self {
            entity_type: entity_type.into(),
            ..Self::default()
        }
    }

    /// Appends `f` to the list of filters and returns the updated query.
    pub fn filter(self, f: QueryFilter) -> Self {
        let mut filters = self.filters;
        filters.push(f);
        Self { filters, ..self }
    }

    /// Sets (or replaces) the sort field and direction.
    pub fn order_by(self, field: impl Into<std::string::String>, order: SortOrder) -> Self {
        let ordering = (field.into(), order);
        Self {
            order_by: Some(ordering),
            ..self
        }
    }

    /// Sets (or replaces) the maximum number of rows to return.
    pub fn limit(self, n: usize) -> Self {
        Self {
            limit: Some(n),
            ..self
        }
    }

    /// Sets (or replaces) the number of leading rows to skip.
    pub fn offset(self, n: usize) -> Self {
        Self {
            offset: Some(n),
            ..self
        }
    }
}
/// A predicate on one field of an [`EntityRow`]'s `data` map.
///
/// The first element of every variant is the field name. As evaluated by
/// `matches_filter` in this file, a row that lacks the field never matches,
/// and the ordering variants never match when `json_cmp` cannot order the two
/// values.
#[derive(Debug, Clone)]
pub enum QueryFilter {
/// Field value equals the given JSON value (`serde_json::Value` equality).
Eq(std::string::String, serde_json::Value),
/// Field value is strictly greater than the given value.
Gt(std::string::String, serde_json::Value),
/// Field value is strictly less than the given value.
Lt(std::string::String, serde_json::Value),
/// Field value is greater than or equal to the given value.
Gte(std::string::String, serde_json::Value),
/// Field value is less than or equal to the given value.
Lte(std::string::String, serde_json::Value),
/// Field value equals one of the listed values.
In(std::string::String, Vec<serde_json::Value>),
/// Field value lies between the two bounds (low, high), both inclusive.
Between(std::string::String, serde_json::Value, serde_json::Value),
}
/// Direction used by `EntityQuery::order_by`.
///
/// In `MemoryEntityStore::query`, `Desc` is the exact reverse of `Asc`; rows
/// that lack the sort field therefore come last under `Asc` and first under
/// `Desc`.
#[derive(Debug, Clone, Copy)]
pub enum SortOrder {
/// Smallest value first.
Asc,
/// Largest value first.
Desc,
}
/// Asynchronous storage interface for entity rows.
///
/// Implementations must be `Send + Sync`. The behavior notes on each method
/// describe what the in-file `MemoryEntityStore` does; other implementations
/// are not visible from here.
#[async_trait::async_trait]
pub trait EntityStore: Send + Sync {
/// Registers `schema` with the store. `MemoryEntityStore` keeps it under
/// `schema.name`, replacing any schema previously registered with that name.
async fn register_schema(&self, schema: &EntitySchema) -> Result<(), IndexerError>;
/// Stores a new row. `MemoryEntityStore` returns an error when a row with
/// the same `(entity_type, id)` already exists.
async fn insert(&self, row: EntityRow) -> Result<(), IndexerError>;
/// Stores a row, replacing any existing row with the same `(entity_type, id)`.
async fn upsert(&self, row: EntityRow) -> Result<(), IndexerError>;
/// Removes the row identified by `(entity_type, id)`. `MemoryEntityStore`
/// treats a missing row as success.
async fn delete(&self, entity_type: &str, id: &str) -> Result<(), IndexerError>;
/// Removes rows of `entity_type` relative to `block_number` and returns a
/// count. `MemoryEntityStore` removes rows whose `block_number` is strictly
/// greater than the argument and returns how many were removed.
async fn delete_after_block(
&self,
entity_type: &str,
block_number: u64,
) -> Result<u64, IndexerError>;
/// Runs `query` and returns the matching rows.
async fn query(&self, query: EntityQuery) -> Result<Vec<EntityRow>, IndexerError>;
/// Returns the number of stored rows of `entity_type`.
async fn count(&self, entity_type: &str) -> Result<u64, IndexerError>;
}
/// Chained builder for [`EntitySchema`].
pub struct EntitySchemaBuilder {
// Entity type name given to `new`.
name: std::string::String,
// Primary-key field name; `new` initializes it to "id".
primary_key: std::string::String,
// Field declarations accumulated so far, in insertion order.
fields: Vec<EntityField>,
}
impl EntitySchemaBuilder {
    /// Starts a schema named `name` with primary key `"id"` and no fields.
    pub fn new(name: impl Into<std::string::String>) -> Self {
        let name = name.into();
        let primary_key = std::string::String::from("id");
        Self {
            name,
            primary_key,
            fields: Vec::new(),
        }
    }

    /// Overrides the primary-key field name.
    pub fn primary_key(self, pk: impl Into<std::string::String>) -> Self {
        Self {
            primary_key: pk.into(),
            ..self
        }
    }

    /// Appends a non-nullable field declaration.
    pub fn field(
        self,
        name: impl Into<std::string::String>,
        field_type: FieldType,
        indexed: bool,
    ) -> Self {
        self.push_field(name.into(), field_type, indexed, false)
    }

    /// Appends a nullable field declaration.
    pub fn nullable_field(
        self,
        name: impl Into<std::string::String>,
        field_type: FieldType,
        indexed: bool,
    ) -> Self {
        self.push_field(name.into(), field_type, indexed, true)
    }

    /// Consumes the builder and returns the finished schema.
    pub fn build(self) -> EntitySchema {
        let Self {
            name,
            primary_key,
            fields,
        } = self;
        EntitySchema {
            name,
            primary_key,
            fields,
        }
    }

    /// Shared tail of `field` / `nullable_field`: records one declaration.
    fn push_field(
        mut self,
        name: std::string::String,
        field_type: FieldType,
        indexed: bool,
        nullable: bool,
    ) -> Self {
        let declaration = EntityField {
            name,
            field_type,
            indexed,
            nullable,
        };
        self.fields.push(declaration);
        self
    }
}
/// In-memory [`EntityStore`] backed by two mutex-guarded hash maps.
pub struct MemoryEntityStore {
// Registered schemas keyed by schema name.
schemas: Mutex<HashMap<std::string::String, EntitySchema>>,
// Stored rows keyed by (entity_type, id).
rows: Mutex<HashMap<(std::string::String, std::string::String), EntityRow>>,
}
impl MemoryEntityStore {
    /// Creates a store with no schemas and no rows.
    pub fn new() -> Self {
        let schemas = Mutex::new(HashMap::new());
        let rows = Mutex::new(HashMap::new());
        Self { schemas, rows }
    }
}
impl Default for MemoryEntityStore {
fn default() -> Self {
Self::new()
}
}
/// Returns `true` when `row` satisfies `filter`.
///
/// A row that does not carry the filtered field never matches. Ordering
/// filters rely on `json_cmp` and fail when it cannot order the two values.
fn matches_filter(row: &EntityRow, filter: &QueryFilter) -> bool {
    use std::cmp::Ordering::{Equal, Greater, Less};

    // Every variant names its field first; pull it out once.
    let field = match filter {
        QueryFilter::Eq(name, _)
        | QueryFilter::Gt(name, _)
        | QueryFilter::Lt(name, _)
        | QueryFilter::Gte(name, _)
        | QueryFilter::Lte(name, _)
        | QueryFilter::In(name, _)
        | QueryFilter::Between(name, _, _) => name,
    };

    let Some(actual) = row.data.get(field) else {
        return false;
    };

    match filter {
        QueryFilter::Eq(_, expected) => actual == expected,
        QueryFilter::Gt(_, bound) => json_cmp(actual, bound) == Some(Greater),
        QueryFilter::Lt(_, bound) => json_cmp(actual, bound) == Some(Less),
        QueryFilter::Gte(_, bound) => {
            matches!(json_cmp(actual, bound), Some(Greater | Equal))
        }
        QueryFilter::Lte(_, bound) => {
            matches!(json_cmp(actual, bound), Some(Less | Equal))
        }
        QueryFilter::In(_, candidates) => candidates.contains(actual),
        QueryFilter::Between(_, low, high) => {
            let at_or_above_low = matches!(json_cmp(actual, low), Some(Greater | Equal));
            at_or_above_low && matches!(json_cmp(actual, high), Some(Less | Equal))
        }
    }
}
/// Orders two JSON values, or returns `None` when they are not comparable.
///
/// * Two numbers are compared numerically. Integers are compared exactly:
///   routing them through `f64` would collapse distinct 64-bit values above
///   2^53 (for example large `Uint64` amounts) into the same float and report
///   them as equal. An integer is also compared exactly against a float.
/// * Two strings are compared with `str`'s ordering.
/// * Any other pairing (mixed types, booleans, null, arrays, objects) yields
///   `None`.
fn json_cmp(a: &serde_json::Value, b: &serde_json::Value) -> Option<std::cmp::Ordering> {
    use std::cmp::Ordering;

    /// Widens a JSON integer to `i128`, which holds every `i64` and `u64`.
    fn as_integer(v: &serde_json::Value) -> Option<i128> {
        v.as_i64()
            .map(i128::from)
            .or_else(|| v.as_u64().map(i128::from))
    }

    /// Exact ordering of an integer relative to a float.
    fn cmp_int_float(int: i128, float: f64) -> Option<Ordering> {
        // 2^64 is above every u64 and its negation is below every i64.
        const TWO_POW_64: f64 = 18_446_744_073_709_551_616.0;
        if float.is_nan() {
            return None;
        }
        if float >= TWO_POW_64 {
            return Some(Ordering::Less);
        }
        if float <= -TWO_POW_64 {
            return Some(Ordering::Greater);
        }
        let floor = float.floor();
        // `floor` is integral with magnitude below 2^64, so the cast is exact.
        let whole = floor as i128;
        Some(match int.cmp(&whole) {
            // Same integral part: any fractional remainder puts the float above.
            Ordering::Equal if float > floor => Ordering::Less,
            other => other,
        })
    }

    match (as_integer(a), as_integer(b)) {
        (Some(x), Some(y)) => return Some(x.cmp(&y)),
        (Some(x), None) => {
            if let Some(f) = b.as_f64() {
                return cmp_int_float(x, f);
            }
        }
        (None, Some(y)) => {
            if let Some(f) = a.as_f64() {
                return cmp_int_float(y, f).map(Ordering::reverse);
            }
        }
        (None, None) => {}
    }

    if let (Some(x), Some(y)) = (a.as_f64(), b.as_f64()) {
        return x.partial_cmp(&y);
    }
    if let (Some(x), Some(y)) = (a.as_str(), b.as_str()) {
        return Some(x.cmp(y));
    }
    None
}
#[async_trait::async_trait]
impl EntityStore for MemoryEntityStore {
async fn register_schema(&self, schema: &EntitySchema) -> Result<(), IndexerError> {
let mut schemas = self
.schemas
.lock()
.map_err(|e| IndexerError::Storage(format!("lock poisoned: {e}")))?;
schemas.insert(schema.name.clone(), schema.clone());
Ok(())
}
async fn insert(&self, row: EntityRow) -> Result<(), IndexerError> {
let mut rows = self
.rows
.lock()
.map_err(|e| IndexerError::Storage(format!("lock poisoned: {e}")))?;
let key = (row.entity_type.clone(), row.id.clone());
if rows.contains_key(&key) {
return Err(IndexerError::Storage(format!(
"entity '{}' with id '{}' already exists",
row.entity_type, row.id
)));
}
rows.insert(key, row);
Ok(())
}
async fn upsert(&self, row: EntityRow) -> Result<(), IndexerError> {
let mut rows = self
.rows
.lock()
.map_err(|e| IndexerError::Storage(format!("lock poisoned: {e}")))?;
let key = (row.entity_type.clone(), row.id.clone());
rows.insert(key, row);
Ok(())
}
async fn delete(&self, entity_type: &str, id: &str) -> Result<(), IndexerError> {
let mut rows = self
.rows
.lock()
.map_err(|e| IndexerError::Storage(format!("lock poisoned: {e}")))?;
rows.remove(&(entity_type.to_string(), id.to_string()));
Ok(())
}
async fn delete_after_block(
&self,
entity_type: &str,
block_number: u64,
) -> Result<u64, IndexerError> {
let mut rows = self
.rows
.lock()
.map_err(|e| IndexerError::Storage(format!("lock poisoned: {e}")))?;
let to_remove: Vec<_> = rows
.iter()
.filter(|((et, _), row)| et == entity_type && row.block_number > block_number)
.map(|(key, _)| key.clone())
.collect();
let count = to_remove.len() as u64;
for key in to_remove {
rows.remove(&key);
}
Ok(count)
}
async fn query(&self, query: EntityQuery) -> Result<Vec<EntityRow>, IndexerError> {
let rows = self
.rows
.lock()
.map_err(|e| IndexerError::Storage(format!("lock poisoned: {e}")))?;
let mut results: Vec<EntityRow> = rows
.values()
.filter(|row| {
row.entity_type == query.entity_type
&& query.filters.iter().all(|f| matches_filter(row, f))
})
.cloned()
.collect();
if let Some((ref field, ref order)) = query.order_by {
results.sort_by(|a, b| {
let va = a.data.get(field);
let vb = b.data.get(field);
let cmp = match (va, vb) {
(Some(va), Some(vb)) => json_cmp(va, vb).unwrap_or(std::cmp::Ordering::Equal),
(Some(_), None) => std::cmp::Ordering::Less,
(None, Some(_)) => std::cmp::Ordering::Greater,
(None, None) => std::cmp::Ordering::Equal,
};
match order {
SortOrder::Asc => cmp,
SortOrder::Desc => cmp.reverse(),
}
});
}
if let Some(offset) = query.offset {
if offset < results.len() {
results = results.split_off(offset);
} else {
results.clear();
}
}
if let Some(limit) = query.limit {
results.truncate(limit);
}
Ok(results)
}
async fn count(&self, entity_type: &str) -> Result<u64, IndexerError> {
let rows = self
.rows
.lock()
.map_err(|e| IndexerError::Storage(format!("lock poisoned: {e}")))?;
let count = rows
.values()
.filter(|row| row.entity_type == entity_type)
.count() as u64;
Ok(count)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Schema used by every test: a `transfer` entity with four fields.
    fn test_schema() -> EntitySchema {
        let builder = EntitySchemaBuilder::new("transfer").primary_key("id");
        builder
            .field("from", FieldType::String, true)
            .field("to", FieldType::String, true)
            .field("amount", FieldType::Uint64, false)
            .nullable_field("memo", FieldType::String, false)
            .build()
    }

    /// Builds a `transfer` row with the given id, parties, amount and block.
    fn make_row(id: &str, from: &str, to: &str, amount: u64, block: u64) -> EntityRow {
        let data = HashMap::from([
            ("from".to_string(), serde_json::json!(from)),
            ("to".to_string(), serde_json::json!(to)),
            ("amount".to_string(), serde_json::json!(amount)),
        ]);
        EntityRow {
            id: id.to_string(),
            entity_type: "transfer".to_string(),
            block_number: block,
            tx_hash: format!("0xtx_{id}"),
            log_index: 0,
            data,
        }
    }

    /// Creates a store with the test schema registered and `rows` inserted in order.
    async fn seeded_store(rows: Vec<EntityRow>) -> MemoryEntityStore {
        let store = MemoryEntityStore::new();
        store.register_schema(&test_schema()).await.unwrap();
        for row in rows {
            store.insert(row).await.unwrap();
        }
        store
    }

    #[tokio::test]
    async fn register_schema() {
        let store = MemoryEntityStore::new();
        let schema = test_schema();
        // Registering the same schema twice must succeed both times.
        for _ in 0..2 {
            store.register_schema(&schema).await.unwrap();
        }
    }

    #[tokio::test]
    async fn insert_and_query() {
        let store = seeded_store(vec![make_row("t1", "0xAlice", "0xBob", 100, 10)]).await;
        let results = store.query(EntityQuery::new("transfer")).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id, "t1");
    }

    #[tokio::test]
    async fn insert_duplicate_fails() {
        let row = make_row("t1", "0xAlice", "0xBob", 100, 10);
        let store = seeded_store(vec![row.clone()]).await;
        let err = store.insert(row).await.unwrap_err();
        let msg = format!("{err}");
        assert!(msg.contains("already exists"), "got: {msg}");
    }

    #[tokio::test]
    async fn upsert_overwrites() {
        let store = seeded_store(vec![make_row("t1", "0xAlice", "0xBob", 100, 10)]).await;
        store
            .upsert(make_row("t1", "0xAlice", "0xBob", 200, 11))
            .await
            .unwrap();
        let results = store.query(EntityQuery::new("transfer")).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].data["amount"], serde_json::json!(200));
        assert_eq!(results[0].block_number, 11);
    }

    #[tokio::test]
    async fn delete_entity() {
        let store = seeded_store(vec![
            make_row("t1", "0xA", "0xB", 100, 10),
            make_row("t2", "0xA", "0xC", 200, 11),
        ])
        .await;
        store.delete("transfer", "t1").await.unwrap();
        let count = store.count("transfer").await.unwrap();
        assert_eq!(count, 1);
        let results = store.query(EntityQuery::new("transfer")).await.unwrap();
        assert_eq!(results[0].id, "t2");
    }

    #[tokio::test]
    async fn delete_after_block_for_reorg() {
        let store = seeded_store(vec![
            make_row("t1", "0xA", "0xB", 100, 10),
            make_row("t2", "0xA", "0xC", 200, 11),
            make_row("t3", "0xA", "0xD", 300, 12),
            make_row("t4", "0xA", "0xE", 400, 13),
        ])
        .await;
        let deleted = store.delete_after_block("transfer", 11).await.unwrap();
        assert_eq!(deleted, 2);
        let count = store.count("transfer").await.unwrap();
        assert_eq!(count, 2);
    }

    #[tokio::test]
    async fn query_with_eq_filter() {
        let store = seeded_store(vec![
            make_row("t1", "0xAlice", "0xBob", 100, 10),
            make_row("t2", "0xAlice", "0xCharlie", 200, 11),
            make_row("t3", "0xBob", "0xCharlie", 300, 12),
        ])
        .await;
        let from_alice = QueryFilter::Eq("from".into(), serde_json::json!("0xAlice"));
        let results = store
            .query(EntityQuery::new("transfer").filter(from_alice))
            .await
            .unwrap();
        assert_eq!(results.len(), 2);
        assert!(results
            .iter()
            .all(|r| r.data["from"] == serde_json::json!("0xAlice")));
    }

    #[tokio::test]
    async fn query_with_gt_lt_filters() {
        let store = seeded_store(vec![
            make_row("t1", "0xA", "0xB", 100, 10),
            make_row("t2", "0xA", "0xC", 200, 11),
            make_row("t3", "0xA", "0xD", 300, 12),
        ])
        .await;
        let query = EntityQuery::new("transfer")
            .filter(QueryFilter::Gt("amount".into(), serde_json::json!(100)))
            .filter(QueryFilter::Lt("amount".into(), serde_json::json!(300)));
        let results = store.query(query).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id, "t2");
    }

    #[tokio::test]
    async fn query_with_in_filter() {
        let store = seeded_store(vec![
            make_row("t1", "0xAlice", "0xBob", 100, 10),
            make_row("t2", "0xBob", "0xCharlie", 200, 11),
            make_row("t3", "0xDave", "0xEve", 300, 12),
        ])
        .await;
        let senders = vec![serde_json::json!("0xAlice"), serde_json::json!("0xDave")];
        let results = store
            .query(EntityQuery::new("transfer").filter(QueryFilter::In("from".into(), senders)))
            .await
            .unwrap();
        assert_eq!(results.len(), 2);
    }

    #[tokio::test]
    async fn query_with_sort_and_limit() {
        let store = seeded_store(vec![
            make_row("t1", "0xA", "0xB", 300, 10),
            make_row("t2", "0xA", "0xC", 100, 11),
            make_row("t3", "0xA", "0xD", 200, 12),
        ])
        .await;
        let query = EntityQuery::new("transfer")
            .order_by("amount", SortOrder::Asc)
            .limit(2);
        let results = store.query(query).await.unwrap();
        assert_eq!(results.len(), 2);
        assert_eq!(results[0].data["amount"], serde_json::json!(100));
        assert_eq!(results[1].data["amount"], serde_json::json!(200));
    }

    #[tokio::test]
    async fn query_with_sort_desc() {
        let store = seeded_store(vec![
            make_row("t1", "0xA", "0xB", 100, 10),
            make_row("t2", "0xA", "0xC", 300, 11),
            make_row("t3", "0xA", "0xD", 200, 12),
        ])
        .await;
        let results = store
            .query(EntityQuery::new("transfer").order_by("amount", SortOrder::Desc))
            .await
            .unwrap();
        assert_eq!(results[0].data["amount"], serde_json::json!(300));
        assert_eq!(results[1].data["amount"], serde_json::json!(200));
        assert_eq!(results[2].data["amount"], serde_json::json!(100));
    }

    #[tokio::test]
    async fn count_entities() {
        let store = seeded_store(Vec::new()).await;
        assert_eq!(store.count("transfer").await.unwrap(), 0);
        for row in [
            make_row("t1", "0xA", "0xB", 100, 10),
            make_row("t2", "0xA", "0xC", 200, 11),
        ] {
            store.insert(row).await.unwrap();
        }
        assert_eq!(store.count("transfer").await.unwrap(), 2);
        assert_eq!(store.count("approval").await.unwrap(), 0);
    }

    #[tokio::test]
    async fn schema_builder_defaults() {
        let schema = EntitySchemaBuilder::new("test_entity")
            .field("name", FieldType::String, true)
            .field("value", FieldType::Uint64, false)
            .build();
        assert_eq!(schema.name, "test_entity");
        // No explicit primary key was given, so the builder default applies.
        assert_eq!(schema.primary_key, "id");
        assert_eq!(schema.fields.len(), 2);
        assert!(schema.fields[0].indexed);
        assert!(!schema.fields[0].nullable);
        assert!(!schema.fields[1].indexed);
    }

    #[tokio::test]
    async fn query_with_between_filter() {
        let store = seeded_store(vec![
            make_row("t1", "0xA", "0xB", 100, 10),
            make_row("t2", "0xA", "0xC", 200, 11),
            make_row("t3", "0xA", "0xD", 300, 12),
            make_row("t4", "0xA", "0xE", 400, 13),
        ])
        .await;
        let in_range = QueryFilter::Between(
            "amount".into(),
            serde_json::json!(200),
            serde_json::json!(300),
        );
        let results = store
            .query(EntityQuery::new("transfer").filter(in_range))
            .await
            .unwrap();
        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|r| {
            let amt = r.data["amount"].as_u64().unwrap();
            (200..=300).contains(&amt)
        }));
    }

    #[tokio::test]
    async fn query_with_offset() {
        let store = seeded_store(vec![
            make_row("t1", "0xA", "0xB", 100, 10),
            make_row("t2", "0xA", "0xC", 200, 11),
            make_row("t3", "0xA", "0xD", 300, 12),
        ])
        .await;
        let query = EntityQuery::new("transfer")
            .order_by("amount", SortOrder::Asc)
            .offset(1)
            .limit(1);
        let results = store.query(query).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].data["amount"], serde_json::json!(200));
    }
}