use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::ops::Bound::{Excluded, Included, Unbounded};
use crate::storage::schema::{value_to_canonical_key, CanonicalKey, CanonicalKeyFamily, Value};
use crate::storage::unified::bitmap_index::BitmapIndexManager;
use crate::storage::unified::entity::EntityId;
use crate::storage::unified::hash_index::{HashIndexConfig, HashIndexManager};
use crate::storage::unified::spatial_index::SpatialIndexManager;
enum CanonicalizedValue {
Exact(CanonicalKey),
Unsupported,
}
fn read_unpoisoned<'a, T>(lock: &'a RwLock<T>) -> RwLockReadGuard<'a, T> {
lock.read()
}
fn write_unpoisoned<'a, T>(lock: &'a RwLock<T>) -> RwLockWriteGuard<'a, T> {
lock.write()
}
pub struct SortedColumnIndex {
entries: BTreeMap<CanonicalKey, Vec<EntityId>>,
range_family: Option<CanonicalKeyFamily>,
has_mixed_families: bool,
families: BTreeSet<CanonicalKeyFamily>,
}
impl SortedColumnIndex {
pub fn new() -> Self {
Self {
entries: BTreeMap::new(),
range_family: None,
has_mixed_families: false,
families: BTreeSet::new(),
}
}
pub fn insert(&mut self, key: CanonicalKey, entity_id: EntityId) {
self.families.insert(key.family());
match self.range_family {
Some(existing) if existing != key.family() => self.has_mixed_families = true,
None => self.range_family = Some(key.family()),
_ => {}
}
self.entries.entry(key).or_default().push(entity_id);
}
fn range_enabled(&self, family: CanonicalKeyFamily) -> bool {
!self.has_mixed_families && self.range_family == Some(family)
}
pub fn supports_range_key(&self, key: &CanonicalKey) -> bool {
self.range_enabled(key.family())
}
pub fn supports_mixed_integral_ranges(&self) -> bool {
!self.families.is_empty()
&& self.families.iter().all(|family| {
matches!(
family,
CanonicalKeyFamily::Integer | CanonicalKeyFamily::UnsignedInteger
)
})
}
pub fn range(&self, low: CanonicalKey, high: CanonicalKey) -> Option<Vec<EntityId>> {
if !self.range_enabled(low.family()) || low.family() != high.family() {
return None;
}
if low > high {
return Some(Vec::new());
}
Some(self.collect_range(low..=high))
}
pub fn greater_than(&self, threshold: CanonicalKey) -> Option<Vec<EntityId>> {
if !self.range_enabled(threshold.family()) {
return None;
}
Some(self.collect_range((Excluded(threshold), Unbounded)))
}
pub fn greater_equal(&self, threshold: CanonicalKey) -> Option<Vec<EntityId>> {
if !self.range_enabled(threshold.family()) {
return None;
}
Some(self.collect_range((Included(threshold), Unbounded)))
}
pub fn less_than(&self, threshold: CanonicalKey) -> Option<Vec<EntityId>> {
if !self.range_enabled(threshold.family()) {
return None;
}
Some(self.collect_range((Unbounded, Excluded(threshold))))
}
pub fn less_equal(&self, threshold: CanonicalKey) -> Option<Vec<EntityId>> {
if !self.range_enabled(threshold.family()) {
return None;
}
Some(self.collect_range((Unbounded, Included(threshold))))
}
pub fn len(&self) -> usize {
self.entries.values().map(|v| v.len()).sum()
}
pub fn range_limited(
&self,
low: CanonicalKey,
high: CanonicalKey,
limit: usize,
) -> Option<Vec<EntityId>> {
if !self.range_enabled(low.family()) || low.family() != high.family() {
return None;
}
if low > high {
return Some(Vec::new());
}
Some(self.collect_range_limited(low..=high, limit))
}
pub fn range_limited_same_family(
&self,
low: CanonicalKey,
high: CanonicalKey,
limit: usize,
) -> Option<Vec<EntityId>> {
if low.family() != high.family() {
return None;
}
if low > high {
return Some(Vec::new());
}
if !self.families.contains(&low.family()) {
return Some(Vec::new());
}
Some(self.collect_range_limited(low..=high, limit))
}
pub fn greater_than_limited(
&self,
threshold: CanonicalKey,
limit: usize,
) -> Option<Vec<EntityId>> {
if !self.range_enabled(threshold.family()) {
return None;
}
Some(self.collect_range_limited((Excluded(threshold), Unbounded), limit))
}
pub fn greater_equal_limited(
&self,
threshold: CanonicalKey,
limit: usize,
) -> Option<Vec<EntityId>> {
if !self.range_enabled(threshold.family()) {
return None;
}
Some(self.collect_range_limited((Included(threshold), Unbounded), limit))
}
pub fn less_than_limited(
&self,
threshold: CanonicalKey,
limit: usize,
) -> Option<Vec<EntityId>> {
if !self.range_enabled(threshold.family()) {
return None;
}
Some(self.collect_range_limited((Unbounded, Excluded(threshold)), limit))
}
pub fn less_equal_limited(
&self,
threshold: CanonicalKey,
limit: usize,
) -> Option<Vec<EntityId>> {
if !self.range_enabled(threshold.family()) {
return None;
}
Some(self.collect_range_limited((Unbounded, Included(threshold)), limit))
}
pub fn in_lookup_limited(
&self,
values: &[CanonicalKey],
limit: usize,
) -> Option<Vec<EntityId>> {
let mut sorted_vals = values.to_vec();
sorted_vals.sort_unstable();
sorted_vals.dedup();
let mut result = Vec::with_capacity(limit.min(sorted_vals.len() * 4));
'outer: for key in &sorted_vals {
if let Some(ids) = self.entries.get(key) {
for &id in ids {
result.push(id);
if result.len() >= limit {
break 'outer;
}
}
}
}
Some(result)
}
pub fn in_lookup_limited_filtered_by_set(
&self,
values: &[CanonicalKey],
filter_set: &std::collections::HashSet<u64>,
limit: usize,
) -> Option<Vec<EntityId>> {
let mut sorted_vals = values.to_vec();
sorted_vals.sort_unstable();
sorted_vals.dedup();
let mut result = Vec::with_capacity(limit.min(sorted_vals.len() * 4));
'outer: for key in &sorted_vals {
if let Some(ids) = self.entries.get(key) {
for &id in ids {
if filter_set.contains(&id.raw()) {
result.push(id);
if result.len() >= limit {
break 'outer;
}
}
}
}
}
Some(result)
}
pub fn range_lookup_values<R>(&self, range: R, limit: usize) -> Vec<CanonicalKey>
where
R: std::ops::RangeBounds<CanonicalKey>,
{
self.entries
.range(range)
.take(limit)
.map(|(key, _)| key.clone())
.collect()
}
pub fn in_lookup_values(&self, values: &[CanonicalKey], limit: usize) -> Vec<CanonicalKey> {
let mut sorted_vals = values.to_vec();
sorted_vals.sort_unstable();
sorted_vals.dedup();
let mut result = Vec::with_capacity(sorted_vals.len().min(limit));
for key in sorted_vals {
if self.entries.contains_key(&key) {
result.push(key);
if result.len() >= limit {
break;
}
}
}
result
}
fn collect_range<R>(&self, range: R) -> Vec<EntityId>
where
R: std::ops::RangeBounds<CanonicalKey>,
{
let mut result = Vec::new();
for ids in self.entries.range(range).map(|(_, ids)| ids) {
result.extend_from_slice(ids);
}
result
}
fn collect_range_limited<R>(&self, range: R, limit: usize) -> Vec<EntityId>
where
R: std::ops::RangeBounds<CanonicalKey>,
{
let mut result = Vec::with_capacity(limit.min(512));
'outer: for ids in self.entries.range(range).map(|(_, ids)| ids) {
for &id in ids {
result.push(id);
if result.len() >= limit {
break 'outer;
}
}
}
result
}
pub fn collect_range_filtered_by_set<R>(
&self,
range: R,
filter_set: &std::collections::HashSet<u64>,
limit: usize,
) -> Option<Vec<EntityId>>
where
R: std::ops::RangeBounds<CanonicalKey>,
{
if self.has_mixed_families {
return None;
}
let mut result = Vec::new();
'outer: for ids in self.entries.range(range).map(|(_, ids)| ids) {
for &id in ids {
if filter_set.contains(&id.raw()) {
result.push(id);
if result.len() >= limit {
break 'outer;
}
}
}
}
Some(result)
}
}
pub struct SortedCompositeIndex {
entries: BTreeMap<Vec<CanonicalKey>, Vec<EntityId>>,
}
impl SortedCompositeIndex {
pub fn new() -> Self {
Self {
entries: BTreeMap::new(),
}
}
pub fn insert(&mut self, key: Vec<CanonicalKey>, entity_id: EntityId) {
self.entries.entry(key).or_default().push(entity_id);
}
pub fn len(&self) -> usize {
self.entries.values().map(|v| v.len()).sum()
}
pub fn prefix_range(
&self,
prefix: &[CanonicalKey],
low: CanonicalKey,
high: CanonicalKey,
limit: usize,
) -> Vec<EntityId> {
if limit == 0 {
return Vec::new();
}
let mut low_key = prefix.to_vec();
low_key.push(low);
let mut high_key = prefix.to_vec();
high_key.push(high);
let mut out = Vec::with_capacity(limit.min(128));
for (_, ids) in self.entries.range(low_key..=high_key) {
for id in ids {
out.push(*id);
if out.len() >= limit {
return out;
}
}
}
out
}
pub fn prefix_eq(&self, prefix: &[CanonicalKey], limit: usize) -> Vec<EntityId> {
if limit == 0 || prefix.is_empty() {
return Vec::new();
}
let low = prefix.to_vec();
let mut out = Vec::with_capacity(limit.min(128));
for (k, ids) in self.entries.range(low.clone()..) {
if k.len() < prefix.len() || &k[..prefix.len()] != prefix {
break;
}
for id in ids {
out.push(*id);
if out.len() >= limit {
return out;
}
}
}
out
}
}
pub struct SortedIndexManager {
indices: RwLock<HashMap<(String, String), SortedColumnIndex>>,
composite: RwLock<HashMap<(String, Vec<String>), SortedCompositeIndex>>,
}
impl SortedIndexManager {
pub fn new() -> Self {
Self {
indices: RwLock::new(HashMap::new()),
composite: RwLock::new(HashMap::new()),
}
}
pub fn build_composite(
&self,
collection: &str,
columns: &[String],
entities: &[(EntityId, Vec<(String, Value)>)],
) -> usize {
let mut index = SortedCompositeIndex::new();
let mut count = 0;
'entity: for (eid, fields) in entities {
let mut tuple: Vec<CanonicalKey> = Vec::with_capacity(columns.len());
for col in columns {
let found = fields.iter().find(|(name, _)| name == col);
let key = match found {
Some((_, val)) => match classify_sorted_value(val) {
CanonicalizedValue::Exact(k) => k,
CanonicalizedValue::Unsupported => continue 'entity,
},
None => continue 'entity,
};
tuple.push(key);
}
index.insert(tuple, *eid);
count += 1;
}
write_unpoisoned(&self.composite).insert((collection.to_string(), columns.to_vec()), index);
count
}
pub fn has_composite_index(&self, collection: &str, columns: &[String]) -> bool {
let key = (collection.to_string(), columns.to_vec());
read_unpoisoned(&self.composite).contains_key(&key)
}
pub fn composite_prefix_range_lookup(
&self,
collection: &str,
columns: &[String],
prefix: &[CanonicalKey],
low: CanonicalKey,
high: CanonicalKey,
limit: usize,
) -> Option<Vec<EntityId>> {
let guard = read_unpoisoned(&self.composite);
let idx = guard.get(&(collection.to_string(), columns.to_vec()))?;
Some(idx.prefix_range(prefix, low, high, limit))
}
pub fn composite_prefix_eq_lookup(
&self,
collection: &str,
columns: &[String],
prefix: &[CanonicalKey],
limit: usize,
) -> Option<Vec<EntityId>> {
let guard = read_unpoisoned(&self.composite);
let idx = guard.get(&(collection.to_string(), columns.to_vec()))?;
Some(idx.prefix_eq(prefix, limit))
}
pub fn composite_entity_update(
&self,
collection: &str,
columns: &[String],
entity_id: EntityId,
old_fields: &[(String, Value)],
new_fields: &[(String, Value)],
) {
let build_tuple = |fields: &[(String, Value)]| -> Option<Vec<CanonicalKey>> {
let mut tuple = Vec::with_capacity(columns.len());
for col in columns {
let val = fields
.iter()
.find(|(name, _)| name == col)
.map(|(_, v)| v)?;
match classify_sorted_value(val) {
CanonicalizedValue::Exact(k) => tuple.push(k),
CanonicalizedValue::Unsupported => return None,
}
}
Some(tuple)
};
let old_tuple = build_tuple(old_fields);
let new_tuple = build_tuple(new_fields);
if old_tuple == new_tuple {
return;
}
let mut guard = write_unpoisoned(&self.composite);
let idx = match guard.get_mut(&(collection.to_string(), columns.to_vec())) {
Some(i) => i,
None => return,
};
if let Some(old) = old_tuple {
if let Some(ids) = idx.entries.get_mut(&old) {
ids.retain(|id| *id != entity_id);
if ids.is_empty() {
idx.entries.remove(&old);
}
}
}
if let Some(new) = new_tuple {
idx.insert(new, entity_id);
}
}
pub fn composite_entity_insert(
&self,
collection: &str,
entity_id: EntityId,
fields: &[(String, Value)],
) {
let mut guard = write_unpoisoned(&self.composite);
for ((coll, cols), idx) in guard.iter_mut() {
if coll != collection {
continue;
}
let mut tuple = Vec::with_capacity(cols.len());
let mut complete = true;
for col in cols {
let val = fields.iter().find(|(name, _)| name == col).map(|(_, v)| v);
match val.map(classify_sorted_value) {
Some(CanonicalizedValue::Exact(k)) => tuple.push(k),
_ => {
complete = false;
break;
}
}
}
if complete {
idx.insert(tuple, entity_id);
}
}
}
pub fn composite_insert_batch(
&self,
collection: &str,
columns: &[String],
rows: &[(EntityId, Vec<(String, Value)>)],
) {
let mut guard = write_unpoisoned(&self.composite);
let Some(idx) = guard.get_mut(&(collection.to_string(), columns.to_vec())) else {
return;
};
for (entity_id, fields) in rows {
let mut tuple = Vec::with_capacity(columns.len());
let mut complete = true;
for col in columns {
let val = fields.iter().find(|(name, _)| name == col).map(|(_, v)| v);
match val.map(classify_sorted_value) {
Some(CanonicalizedValue::Exact(k)) => tuple.push(k),
_ => {
complete = false;
break;
}
}
}
if complete {
idx.insert(tuple, *entity_id);
}
}
}
pub fn build_index(
&self,
collection: &str,
column: &str,
entities: &[(EntityId, Vec<(String, Value)>)],
) -> usize {
let mut index = SortedColumnIndex::new();
let mut count = 0;
for (eid, fields) in entities {
for (col, val) in fields {
if col == column {
match classify_sorted_value(val) {
CanonicalizedValue::Exact(key) => {
index.insert(key, *eid);
count += 1;
}
CanonicalizedValue::Unsupported => {}
}
}
}
}
write_unpoisoned(&self.indices).insert((collection.to_string(), column.to_string()), index);
count
}
pub fn entry_count(&self, collection: &str, columns: &[String]) -> Option<usize> {
if columns.len() > 1 {
let guard = read_unpoisoned(&self.composite);
return guard
.get(&(collection.to_string(), columns.to_vec()))
.map(SortedCompositeIndex::len);
}
let column = columns.first()?;
let guard = read_unpoisoned(&self.indices);
guard
.get(&(collection.to_string(), column.clone()))
.map(SortedColumnIndex::len)
}
pub(crate) fn range_lookup(
&self,
collection: &str,
column: &str,
low: CanonicalKey,
high: CanonicalKey,
) -> Option<Vec<EntityId>> {
let indices = read_unpoisoned(&self.indices);
let key = (collection.to_string(), column.to_string());
match indices.get(&key) {
Some(index) => index.range(low, high),
None => None,
}
}
pub(crate) fn gt_lookup(
&self,
collection: &str,
column: &str,
threshold: CanonicalKey,
) -> Option<Vec<EntityId>> {
let indices = read_unpoisoned(&self.indices);
let key = (collection.to_string(), column.to_string());
match indices.get(&key) {
Some(index) => index.greater_than(threshold),
None => None,
}
}
pub(crate) fn ge_lookup(
&self,
collection: &str,
column: &str,
threshold: CanonicalKey,
) -> Option<Vec<EntityId>> {
let indices = read_unpoisoned(&self.indices);
let key = (collection.to_string(), column.to_string());
match indices.get(&key) {
Some(index) => index.greater_equal(threshold),
None => None,
}
}
pub(crate) fn lt_lookup(
&self,
collection: &str,
column: &str,
threshold: CanonicalKey,
) -> Option<Vec<EntityId>> {
let indices = read_unpoisoned(&self.indices);
let key = (collection.to_string(), column.to_string());
match indices.get(&key) {
Some(index) => index.less_than(threshold),
None => None,
}
}
pub(crate) fn le_lookup(
&self,
collection: &str,
column: &str,
threshold: CanonicalKey,
) -> Option<Vec<EntityId>> {
let indices = read_unpoisoned(&self.indices);
let key = (collection.to_string(), column.to_string());
match indices.get(&key) {
Some(index) => index.less_equal(threshold),
None => None,
}
}
pub(crate) fn range_lookup_limited(
&self,
collection: &str,
column: &str,
low: CanonicalKey,
high: CanonicalKey,
limit: usize,
) -> Option<Vec<EntityId>> {
let indices = read_unpoisoned(&self.indices);
let key = (collection.to_string(), column.to_string());
indices.get(&key)?.range_limited(low, high, limit)
}
pub(crate) fn range_lookup_limited_same_family(
&self,
collection: &str,
column: &str,
low: CanonicalKey,
high: CanonicalKey,
limit: usize,
) -> Option<Vec<EntityId>> {
let indices = read_unpoisoned(&self.indices);
let key = (collection.to_string(), column.to_string());
indices
.get(&key)?
.range_limited_same_family(low, high, limit)
}
pub(crate) fn gt_lookup_limited(
&self,
collection: &str,
column: &str,
threshold: CanonicalKey,
limit: usize,
) -> Option<Vec<EntityId>> {
let indices = read_unpoisoned(&self.indices);
let key = (collection.to_string(), column.to_string());
indices.get(&key)?.greater_than_limited(threshold, limit)
}
pub(crate) fn ge_lookup_limited(
&self,
collection: &str,
column: &str,
threshold: CanonicalKey,
limit: usize,
) -> Option<Vec<EntityId>> {
let indices = read_unpoisoned(&self.indices);
let key = (collection.to_string(), column.to_string());
indices.get(&key)?.greater_equal_limited(threshold, limit)
}
pub(crate) fn lt_lookup_limited(
&self,
collection: &str,
column: &str,
threshold: CanonicalKey,
limit: usize,
) -> Option<Vec<EntityId>> {
let indices = read_unpoisoned(&self.indices);
let key = (collection.to_string(), column.to_string());
indices.get(&key)?.less_than_limited(threshold, limit)
}
pub(crate) fn le_lookup_limited(
&self,
collection: &str,
column: &str,
threshold: CanonicalKey,
limit: usize,
) -> Option<Vec<EntityId>> {
let indices = read_unpoisoned(&self.indices);
let key = (collection.to_string(), column.to_string());
indices.get(&key)?.less_equal_limited(threshold, limit)
}
pub(crate) fn range_filtered_by_set(
&self,
collection: &str,
column: &str,
low: CanonicalKey,
high: CanonicalKey,
filter_set: &std::collections::HashSet<u64>,
limit: usize,
) -> Option<Vec<EntityId>> {
let indices = read_unpoisoned(&self.indices);
let key = (collection.to_string(), column.to_string());
if low > high {
return Some(Vec::new());
}
let idx = indices.get(&key)?;
if !idx.supports_range_key(&low) || low.family() != high.family() {
return None;
}
idx.collect_range_filtered_by_set(low..=high, filter_set, limit)
}
pub(crate) fn gt_filtered_by_set(
&self,
collection: &str,
column: &str,
threshold: CanonicalKey,
filter_set: &std::collections::HashSet<u64>,
limit: usize,
) -> Option<Vec<EntityId>> {
let indices = read_unpoisoned(&self.indices);
let key = (collection.to_string(), column.to_string());
let idx = indices.get(&key)?;
if !idx.supports_range_key(&threshold) {
return None;
}
idx.collect_range_filtered_by_set((Excluded(threshold), Unbounded), filter_set, limit)
}
pub(crate) fn ge_filtered_by_set(
&self,
collection: &str,
column: &str,
threshold: CanonicalKey,
filter_set: &std::collections::HashSet<u64>,
limit: usize,
) -> Option<Vec<EntityId>> {
let indices = read_unpoisoned(&self.indices);
let key = (collection.to_string(), column.to_string());
let idx = indices.get(&key)?;
if !idx.supports_range_key(&threshold) {
return None;
}
idx.collect_range_filtered_by_set((Included(threshold), Unbounded), filter_set, limit)
}
pub(crate) fn lt_filtered_by_set(
&self,
collection: &str,
column: &str,
threshold: CanonicalKey,
filter_set: &std::collections::HashSet<u64>,
limit: usize,
) -> Option<Vec<EntityId>> {
let indices = read_unpoisoned(&self.indices);
let key = (collection.to_string(), column.to_string());
let idx = indices.get(&key)?;
if !idx.supports_range_key(&threshold) {
return None;
}
idx.collect_range_filtered_by_set((Unbounded, Excluded(threshold)), filter_set, limit)
}
pub(crate) fn le_filtered_by_set(
&self,
collection: &str,
column: &str,
threshold: CanonicalKey,
filter_set: &std::collections::HashSet<u64>,
limit: usize,
) -> Option<Vec<EntityId>> {
let indices = read_unpoisoned(&self.indices);
let key = (collection.to_string(), column.to_string());
let idx = indices.get(&key)?;
if !idx.supports_range_key(&threshold) {
return None;
}
idx.collect_range_filtered_by_set((Unbounded, Included(threshold)), filter_set, limit)
}
pub(crate) fn in_lookup_limited(
&self,
collection: &str,
column: &str,
values: &[CanonicalKey],
limit: usize,
) -> Option<Vec<EntityId>> {
let indices = read_unpoisoned(&self.indices);
let key = (collection.to_string(), column.to_string());
indices.get(&key)?.in_lookup_limited(values, limit)
}
pub(crate) fn in_lookup_limited_filtered_by_set(
&self,
collection: &str,
column: &str,
values: &[CanonicalKey],
filter_set: &std::collections::HashSet<u64>,
limit: usize,
) -> Option<Vec<EntityId>> {
let indices = read_unpoisoned(&self.indices);
let key = (collection.to_string(), column.to_string());
indices
.get(&key)?
.in_lookup_limited_filtered_by_set(values, filter_set, limit)
}
pub(crate) fn range_lookup_values(
&self,
collection: &str,
column: &str,
low: CanonicalKey,
high: CanonicalKey,
limit: usize,
) -> Option<Vec<CanonicalKey>> {
let indices = read_unpoisoned(&self.indices);
let key = (collection.to_string(), column.to_string());
let idx = indices.get(&key)?;
if !idx.supports_range_key(&low) || low.family() != high.family() {
return None;
}
Some(idx.range_lookup_values((Included(low), Included(high)), limit))
}
pub(crate) fn compare_lookup_values(
&self,
collection: &str,
column: &str,
threshold: CanonicalKey,
op: &crate::storage::query::ast::CompareOp,
limit: usize,
) -> Option<Vec<CanonicalKey>> {
use crate::storage::query::ast::CompareOp;
let indices = read_unpoisoned(&self.indices);
let key = (collection.to_string(), column.to_string());
let idx = indices.get(&key)?;
if !idx.supports_range_key(&threshold) {
return None;
}
let values = match op {
CompareOp::Gt => idx.range_lookup_values((Excluded(threshold), Unbounded), limit),
CompareOp::Ge => idx.range_lookup_values((Included(threshold), Unbounded), limit),
CompareOp::Lt => idx.range_lookup_values((Unbounded, Excluded(threshold)), limit),
CompareOp::Le => idx.range_lookup_values((Unbounded, Included(threshold)), limit),
_ => return None,
};
Some(values)
}
pub(crate) fn in_lookup_values(
&self,
collection: &str,
column: &str,
values: &[CanonicalKey],
limit: usize,
) -> Option<Vec<CanonicalKey>> {
let indices = read_unpoisoned(&self.indices);
let key = (collection.to_string(), column.to_string());
Some(indices.get(&key)?.in_lookup_values(values, limit))
}
pub fn has_index(&self, collection: &str, column: &str) -> bool {
let indices = read_unpoisoned(&self.indices);
indices.contains_key(&(collection.to_string(), column.to_string()))
}
pub fn supports_mixed_integral_ranges(&self, collection: &str, column: &str) -> bool {
let indices = read_unpoisoned(&self.indices);
indices
.get(&(collection.to_string(), column.to_string()))
.is_some_and(SortedColumnIndex::supports_mixed_integral_ranges)
}
pub(crate) fn insert_one(
&self,
collection: &str,
column: &str,
value: &Value,
entity_id: EntityId,
) {
let mut indices = write_unpoisoned(&self.indices);
let k = (collection.to_string(), column.to_string());
if let Some(index) = indices.get_mut(&k) {
match classify_sorted_value(value) {
CanonicalizedValue::Exact(key) => index.insert(key, entity_id),
CanonicalizedValue::Unsupported => {}
}
}
}
pub(crate) fn delete_one(
&self,
collection: &str,
column: &str,
value: &Value,
entity_id: EntityId,
) {
let mut indices = write_unpoisoned(&self.indices);
let k = (collection.to_string(), column.to_string());
if let Some(index) = indices.get_mut(&k) {
let Some(key) = value_to_sorted_key(value) else {
return;
};
if let Some(bucket) = index.entries.get_mut(&key) {
bucket.retain(|id| *id != entity_id);
if bucket.is_empty() {
index.entries.remove(&key);
}
}
}
}
}
fn classify_sorted_value(val: &Value) -> CanonicalizedValue {
match value_to_canonical_key(val) {
Some(key) => CanonicalizedValue::Exact(key),
None => CanonicalizedValue::Unsupported,
}
}
pub(crate) fn value_to_sorted_key(val: &Value) -> Option<CanonicalKey> {
match classify_sorted_value(val) {
CanonicalizedValue::Exact(key) => Some(key),
CanonicalizedValue::Unsupported => None,
}
}
pub(crate) fn sorted_key_to_value(key: CanonicalKey) -> Value {
key.into_value()
}
#[derive(Debug, Clone)]
pub struct RegisteredIndex {
pub name: String,
pub collection: String,
pub columns: Vec<String>,
pub method: IndexMethodKind,
pub unique: bool,
}
impl RegisteredIndex {
pub fn hash_lookup_name(&self) -> std::borrow::Cow<'_, str> {
match self.method {
IndexMethodKind::Hash => std::borrow::Cow::Borrowed(self.name.as_str()),
IndexMethodKind::BTree => std::borrow::Cow::Owned(format!("{}_hash", self.name)),
IndexMethodKind::Bitmap | IndexMethodKind::Spatial => {
std::borrow::Cow::Borrowed(self.name.as_str())
}
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IndexMethodKind {
Hash,
Bitmap,
Spatial,
BTree,
}
pub struct IndexStore {
pub hash: HashIndexManager,
pub bitmap: BitmapIndexManager,
pub spatial: SpatialIndexManager,
pub sorted: SortedIndexManager,
registry: RwLock<HashMap<(String, String), RegisteredIndex>>,
}
impl IndexStore {
pub fn new() -> Self {
Self {
hash: HashIndexManager::new(),
bitmap: BitmapIndexManager::new(),
spatial: SpatialIndexManager::new(),
sorted: SortedIndexManager::new(),
registry: RwLock::new(HashMap::new()),
}
}
pub fn create_index(
&self,
name: &str,
collection: &str,
columns: &[String],
method: IndexMethodKind,
unique: bool,
entities: &[(EntityId, Vec<(String, Value)>)],
) -> Result<usize, String> {
let col = columns.first().map(|s| s.as_str()).unwrap_or("");
match method {
IndexMethodKind::Hash => {
self.hash
.create_index(&HashIndexConfig {
name: name.to_string(),
collection: collection.to_string(),
columns: columns.to_vec(),
unique,
})
.map_err(|e| e.to_string())?;
let mut count = 0;
for (entity_id, fields) in entities {
for (field_name, value) in fields {
if field_name == col {
let key = value_to_bytes(value);
self.hash
.insert(collection, name, key, *entity_id)
.map_err(|err| err.to_string())?;
count += 1;
}
}
}
Ok(count)
}
IndexMethodKind::Bitmap => {
self.bitmap.create_index(collection, col);
let mut count = 0;
for (entity_id, fields) in entities {
for (field_name, value) in fields {
if field_name == col {
let key = value_to_bytes(value);
self.bitmap
.insert(collection, col, *entity_id, &key)
.map_err(|err| err.to_string())?;
count += 1;
}
}
}
Ok(count)
}
IndexMethodKind::Spatial => {
self.spatial.create_index(collection, col);
Ok(0)
}
IndexMethodKind::BTree => {
if columns.len() > 1 {
let count = self.sorted.build_composite(collection, columns, entities);
return Ok(count);
}
let count = self.sorted.build_index(collection, col, entities);
self.hash
.create_index(&HashIndexConfig {
name: format!("{name}_hash"),
collection: collection.to_string(),
columns: columns.to_vec(),
unique: false,
})
.map_err(|err| err.to_string())?;
for (entity_id, fields) in entities {
for (field_name, value) in fields {
if field_name == col {
let key = value_to_bytes(value);
self.hash
.insert(collection, &format!("{name}_hash"), key, *entity_id)
.map_err(|err| err.to_string())?;
}
}
}
Ok(count)
}
}
}
pub fn drop_index(&self, name: &str, collection: &str) -> bool {
let mut registry = write_unpoisoned(&self.registry);
let key = (collection.to_string(), name.to_string());
if let Some(info) = registry.remove(&key) {
match info.method {
IndexMethodKind::Hash => self.hash.drop_index(collection, name),
IndexMethodKind::Bitmap => {
let col = info.columns.first().map(|s| s.as_str()).unwrap_or("");
self.bitmap.drop_index(collection, col)
}
IndexMethodKind::Spatial => {
let col = info.columns.first().map(|s| s.as_str()).unwrap_or("");
self.spatial.drop_index(collection, col)
}
IndexMethodKind::BTree => false,
};
true
} else {
false
}
}
pub fn register(&self, info: RegisteredIndex) {
let mut registry = write_unpoisoned(&self.registry);
registry.insert((info.collection.clone(), info.name.clone()), info);
}
pub fn hash_lookup(
&self,
collection: &str,
index_name: &str,
key: &[u8],
) -> Result<Vec<EntityId>, String> {
self.hash
.lookup(collection, index_name, key)
.map_err(|err| err.to_string())
}
pub fn bitmap_lookup(
&self,
collection: &str,
column: &str,
value: &[u8],
) -> Result<Vec<EntityId>, String> {
self.bitmap
.lookup(collection, column, value)
.map_err(|err| err.to_string())
}
pub fn bitmap_count(
&self,
collection: &str,
column: &str,
value: &[u8],
) -> Result<u64, String> {
self.bitmap
.count(collection, column, value)
.map_err(|err| err.to_string())
}
pub fn find_index_for_column(&self, collection: &str, column: &str) -> Option<RegisteredIndex> {
let registry = read_unpoisoned(&self.registry);
registry
.values()
.find(|idx| {
idx.collection == collection
&& idx.columns.len() == 1
&& idx.columns.first().is_some_and(|c| c == column)
})
.cloned()
}
pub fn list_indices(&self, collection: &str) -> Vec<RegisteredIndex> {
let registry = read_unpoisoned(&self.registry);
registry
.values()
.filter(|idx| idx.collection == collection)
.cloned()
.collect()
}
pub fn entries_indexed(&self, index: &RegisteredIndex) -> u64 {
match index.method {
IndexMethodKind::Hash => self
.hash
.index_stats(&index.collection, &index.name)
.map(|stats| stats.total_entries as u64)
.unwrap_or(0),
IndexMethodKind::BTree => self
.sorted
.entry_count(&index.collection, &index.columns)
.unwrap_or(0) as u64,
IndexMethodKind::Bitmap => index
.columns
.first()
.and_then(|column| self.bitmap.index_stats(&index.collection, column).ok())
.map(|stats| stats.entity_count as u64)
.unwrap_or(0),
IndexMethodKind::Spatial => index
.columns
.first()
.and_then(|column| self.spatial.index_stats(&index.collection, column).ok())
.map(|stats| stats.point_count as u64)
.unwrap_or(0),
}
}
pub fn indexed_columns_set(&self, collection: &str) -> std::collections::HashSet<String> {
let registry = read_unpoisoned(&self.registry);
let mut out = std::collections::HashSet::new();
for idx in registry.values() {
if idx.collection == collection {
for col in &idx.columns {
out.insert(col.clone());
}
}
}
out
}
pub fn index_entity_insert_batch(
&self,
collection: &str,
rows: &[(EntityId, Vec<(String, Value)>)],
) -> Result<(), String> {
if rows.is_empty() {
return Ok(());
}
let registry = self.registry.read();
let relevant: Vec<&RegisteredIndex> = registry
.values()
.filter(|idx| idx.collection == collection)
.collect();
if relevant.is_empty() {
return Ok(());
}
for idx in &relevant {
if matches!(idx.method, IndexMethodKind::BTree) && idx.columns.len() > 1 {
self.sorted
.composite_insert_batch(collection, &idx.columns, rows);
continue;
}
let col = idx.columns.first().map(|s| s.as_str()).unwrap_or("");
let btree_hash_name =
matches!(idx.method, IndexMethodKind::BTree).then(|| format!("{}_hash", idx.name));
for (entity_id, fields) in rows {
for (field_name, value) in fields {
if field_name != col {
continue;
}
let key = value_to_bytes(value);
match idx.method {
IndexMethodKind::Hash => {
self.hash
.insert(collection, &idx.name, key, *entity_id)
.map_err(|err| err.to_string())?;
}
IndexMethodKind::Bitmap => {
self.bitmap
.insert(collection, col, *entity_id, &key)
.map_err(|err| err.to_string())?;
}
IndexMethodKind::BTree => {
if !self.sorted.has_index(collection, col) {
return Err(format!(
"sorted index for collection '{collection}' column '{col}' was not found"
));
}
self.sorted.insert_one(collection, col, value, *entity_id);
let hash_name = btree_hash_name.as_deref().unwrap_or("");
self.hash
.insert(collection, hash_name, key, *entity_id)
.map_err(|err| err.to_string())?;
}
IndexMethodKind::Spatial => {}
}
break;
}
}
}
Ok(())
}
pub fn index_entity_insert(
&self,
collection: &str,
entity_id: EntityId,
fields: &[(String, Value)],
) -> Result<(), String> {
let registry = self.registry.read();
for idx in registry.values() {
if idx.collection != collection {
continue;
}
if matches!(idx.method, IndexMethodKind::BTree) && idx.columns.len() > 1 {
self.sorted
.composite_entity_insert(collection, entity_id, fields);
continue;
}
let col = idx.columns.first().map(|s| s.as_str()).unwrap_or("");
for (field_name, value) in fields {
if field_name == col {
let key = value_to_bytes(value);
match idx.method {
IndexMethodKind::Hash => {
self.hash
.insert(collection, &idx.name, key, entity_id)
.map_err(|err| err.to_string())?;
}
IndexMethodKind::Bitmap => {
self.bitmap
.insert(collection, col, entity_id, &key)
.map_err(|err| err.to_string())?;
}
IndexMethodKind::BTree => {
if !self.sorted.has_index(collection, col) {
return Err(format!(
"sorted index for collection '{collection}' column '{col}' was not found"
));
}
self.sorted.insert_one(collection, col, value, entity_id);
self.hash
.insert(collection, &format!("{}_hash", idx.name), key, entity_id)
.map_err(|err| err.to_string())?;
}
IndexMethodKind::Spatial => {}
}
}
}
}
Ok(())
}
pub fn index_entity_delete(
&self,
collection: &str,
entity_id: EntityId,
fields: &[(String, Value)],
) -> Result<(), String> {
let registry = self.registry.read();
for idx in registry.values() {
if idx.collection != collection {
continue;
}
if matches!(idx.method, IndexMethodKind::BTree) && idx.columns.len() > 1 {
self.sorted.composite_entity_update(
collection,
&idx.columns,
entity_id,
fields,
&[],
);
continue;
}
let col = idx.columns.first().map(|s| s.as_str()).unwrap_or("");
for (field_name, value) in fields {
if field_name == col {
let key = value_to_bytes(value);
match idx.method {
IndexMethodKind::Hash => {
let _ = self.hash.remove(collection, &idx.name, &key, entity_id);
}
IndexMethodKind::Bitmap => {
let _ = self.bitmap.remove(collection, col, entity_id);
}
IndexMethodKind::BTree => {
self.sorted.delete_one(collection, col, value, entity_id);
let _ = self.hash.remove(
collection,
&format!("{}_hash", idx.name),
&key,
entity_id,
);
}
IndexMethodKind::Spatial => {}
}
}
}
}
Ok(())
}
pub fn index_entity_update(
&self,
collection: &str,
entity_id: EntityId,
old_fields: &[(String, Value)],
new_fields: &[(String, Value)],
) -> Result<(), String> {
let indexed_cols: std::collections::HashSet<String> = {
let registry = self.registry.read();
registry
.values()
.filter(|idx| idx.collection == collection)
.filter_map(|idx| idx.columns.first().cloned())
.collect()
};
if indexed_cols.is_empty() {
return Ok(());
}
let damage = crate::application::entity::row_damage_vector(old_fields, new_fields);
for (col, old_value, new_value) in &damage.changed {
if !indexed_cols.contains(col) {
continue;
}
self.index_entity_delete(collection, entity_id, &[(col.clone(), old_value.clone())])?;
self.index_entity_insert(collection, entity_id, &[(col.clone(), new_value.clone())])?;
}
for (col, old_value) in &damage.removed {
if !indexed_cols.contains(col) {
continue;
}
self.index_entity_delete(collection, entity_id, &[(col.clone(), old_value.clone())])?;
}
for (col, new_value) in &damage.added {
if !indexed_cols.contains(col) {
continue;
}
self.index_entity_insert(collection, entity_id, &[(col.clone(), new_value.clone())])?;
}
Ok(())
}
}
impl Default for IndexStore {
fn default() -> Self {
Self::new()
}
}
use crate::storage::unified::index::incremental::{
IndexMethodKind as IncMethodKind, SecondaryIndexBackend, SecondaryIndexHandle,
};
impl From<IndexMethodKind> for IncMethodKind {
fn from(value: IndexMethodKind) -> Self {
match value {
IndexMethodKind::Hash => IncMethodKind::Hash,
IndexMethodKind::Bitmap => IncMethodKind::Bitmap,
IndexMethodKind::Spatial => IncMethodKind::Spatial,
IndexMethodKind::BTree => IncMethodKind::BTree,
}
}
}
impl From<IncMethodKind> for IndexMethodKind {
fn from(value: IncMethodKind) -> Self {
match value {
IncMethodKind::Hash => IndexMethodKind::Hash,
IncMethodKind::Bitmap => IndexMethodKind::Bitmap,
IncMethodKind::Spatial => IndexMethodKind::Spatial,
IncMethodKind::BTree => IndexMethodKind::BTree,
}
}
}
impl SecondaryIndexBackend for IndexStore {
fn insert(
&self,
idx: &SecondaryIndexHandle,
key: &[u8],
row_id: EntityId,
) -> Result<(), String> {
let collection = idx.collection.as_ref();
let Some(col) = idx.leading_column() else {
return Ok(());
};
match idx.method {
IncMethodKind::Hash => self
.hash
.insert(collection, idx.name.as_ref(), key.to_vec(), row_id)
.map_err(|e| e.to_string()),
IncMethodKind::Bitmap => self
.bitmap
.insert(collection, col, row_id, key)
.map_err(|e| e.to_string()),
IncMethodKind::BTree => {
if idx.columns.len() > 1 {
return Ok(());
}
let aux = format!("{}_hash", idx.name);
self.hash
.insert(collection, &aux, key.to_vec(), row_id)
.map_err(|e| e.to_string())
}
IncMethodKind::Spatial => Ok(()),
}
}
fn remove(&self, idx: &SecondaryIndexHandle, key: &[u8], row_id: EntityId) {
let collection = idx.collection.as_ref();
let Some(col) = idx.leading_column() else {
return;
};
match idx.method {
IncMethodKind::Hash => {
let _ = self.hash.remove(collection, idx.name.as_ref(), key, row_id);
}
IncMethodKind::Bitmap => {
let _ = self.bitmap.remove(collection, col, row_id);
}
IncMethodKind::BTree => {
if idx.columns.len() > 1 {
return;
}
let aux = format!("{}_hash", idx.name);
let _ = self.hash.remove(collection, &aux, key, row_id);
}
IncMethodKind::Spatial => {}
}
}
}
fn value_to_bytes(value: &Value) -> Vec<u8> {
match value {
Value::Text(s) => s.as_bytes().to_vec(),
Value::Integer(n) => n.to_le_bytes().to_vec(),
Value::UnsignedInteger(n) => n.to_le_bytes().to_vec(),
Value::Float(n) => n.to_le_bytes().to_vec(),
Value::Boolean(b) => vec![*b as u8],
_ => format!("{:?}", value).into_bytes(),
}
}
#[cfg(test)]
mod tests {
use super::*;
fn ids(values: &[EntityId]) -> Vec<u64> {
values.iter().map(|id| id.raw()).collect()
}
#[test]
fn test_sorted_key_supports_text_ranges() {
let mut index = SortedColumnIndex::new();
index.insert(
value_to_sorted_key(&Value::text("alpha".to_string())).unwrap(),
EntityId::new(1),
);
index.insert(
value_to_sorted_key(&Value::text("bravo".to_string())).unwrap(),
EntityId::new(2),
);
index.insert(
value_to_sorted_key(&Value::text("charlie".to_string())).unwrap(),
EntityId::new(3),
);
assert_eq!(
ids(&index
.greater_than(value_to_sorted_key(&Value::text("alpha".to_string())).unwrap())
.unwrap()),
vec![2, 3]
);
assert_eq!(
ids(&index
.less_equal(value_to_sorted_key(&Value::text("alpha".to_string())).unwrap())
.unwrap()),
vec![1]
);
assert_eq!(
ids(&index
.range(
value_to_sorted_key(&Value::text("bravo".to_string())).unwrap(),
value_to_sorted_key(&Value::text("charlie".to_string())).unwrap(),
)
.unwrap()),
vec![2, 3]
);
}
#[test]
fn test_sorted_index_disables_range_lookup_when_mixed_families_are_present() {
let manager = SortedIndexManager::new();
let entities = vec![
(
EntityId::new(1),
vec![("score".to_string(), Value::Integer(10))],
),
(
EntityId::new(2),
vec![("score".to_string(), Value::Float(10.5))],
),
];
manager.build_index("numbers", "score", &entities);
assert_eq!(
manager.range_lookup(
"numbers",
"score",
value_to_sorted_key(&Value::Integer(0)).unwrap(),
value_to_sorted_key(&Value::Integer(20)).unwrap(),
),
None
);
}
#[test]
fn test_sorted_index_keeps_exact_in_lookup_when_mixed_families_are_present() {
let manager = SortedIndexManager::new();
let entities = vec![
(
EntityId::new(1),
vec![("mixed".to_string(), Value::Integer(10))],
),
(
EntityId::new(2),
vec![("mixed".to_string(), Value::text("ten".to_string()))],
),
];
manager.build_index("mixed_table", "mixed", &entities);
let matched = manager
.in_lookup_limited(
"mixed_table",
"mixed",
&[value_to_sorted_key(&Value::Integer(10)).unwrap()],
10,
)
.unwrap();
assert_eq!(ids(&matched), vec![1]);
}
#[test]
fn test_index_entity_insert_errors_when_registered_hash_index_is_missing() {
let store = IndexStore::new();
store.register(RegisteredIndex {
name: "idx_email".to_string(),
collection: "users".to_string(),
columns: vec!["email".to_string()],
method: IndexMethodKind::Hash,
unique: false,
});
let err = store
.index_entity_insert(
"users",
EntityId::new(1),
&[("email".to_string(), Value::text("a@b.com".to_string()))],
)
.expect_err("missing backing hash index should surface as an error");
assert!(err.contains("idx_email"));
assert!(err.contains("users"));
}
#[test]
fn test_index_entity_insert_batch_maintains_composite_btree() {
let store = IndexStore::new();
let columns = vec!["city".to_string(), "age".to_string()];
store
.create_index(
"idx_city_age",
"users",
&columns,
IndexMethodKind::BTree,
false,
&[],
)
.unwrap();
store.register(RegisteredIndex {
name: "idx_city_age".to_string(),
collection: "users".to_string(),
columns: columns.clone(),
method: IndexMethodKind::BTree,
unique: false,
});
let rows = vec![
(
EntityId::new(1),
vec![
("city".to_string(), Value::text("NYC".to_string())),
("age".to_string(), Value::Integer(30)),
],
),
(
EntityId::new(2),
vec![
("city".to_string(), Value::text("LA".to_string())),
("age".to_string(), Value::Integer(30)),
],
),
(
EntityId::new(3),
vec![
("city".to_string(), Value::text("NYC".to_string())),
("age".to_string(), Value::Integer(40)),
],
),
];
store.index_entity_insert_batch("users", &rows).unwrap();
let found = store
.sorted
.composite_prefix_range_lookup(
"users",
&columns,
&[value_to_sorted_key(&Value::text("NYC".to_string())).unwrap()],
value_to_sorted_key(&Value::Integer(20)).unwrap(),
value_to_sorted_key(&Value::Integer(45)).unwrap(),
10,
)
.unwrap();
assert_eq!(ids(&found), vec![1, 3]);
}
fn provision_hash_index(store: &IndexStore) {
store
.create_index(
"idx_email",
"users",
&["email".to_string()],
IndexMethodKind::Hash,
false,
&[],
)
.unwrap();
store.register(RegisteredIndex {
name: "idx_email".to_string(),
collection: "users".to_string(),
columns: vec!["email".to_string()],
method: IndexMethodKind::Hash,
unique: false,
});
}
#[test]
fn test_index_entity_delete_removes_row_from_hash_index() {
let store = IndexStore::new();
provision_hash_index(&store);
let id = EntityId::new(7);
let key = ("email".to_string(), Value::text("a@b.com".to_string()));
store
.index_entity_insert("users", id, &[key.clone()])
.unwrap();
assert_eq!(
ids(&store.hash_lookup("users", "idx_email", b"a@b.com").unwrap()),
vec![7]
);
store
.index_entity_delete("users", id, &[key.clone()])
.unwrap();
assert!(store
.hash_lookup("users", "idx_email", b"a@b.com")
.unwrap()
.is_empty());
}
#[test]
fn test_index_entity_delete_tolerates_missing_index() {
let store = IndexStore::new();
store.register(RegisteredIndex {
name: "idx_email".to_string(),
collection: "users".to_string(),
columns: vec!["email".to_string()],
method: IndexMethodKind::Hash,
unique: false,
});
store
.index_entity_delete(
"users",
EntityId::new(1),
&[("email".to_string(), Value::text("a@b.com".to_string()))],
)
.expect("delete must tolerate a missing backing index");
}
#[test]
fn test_index_entity_update_moves_row_to_new_key() {
let store = IndexStore::new();
provision_hash_index(&store);
let id = EntityId::new(11);
let old = ("email".to_string(), Value::text("old@x.com".to_string()));
let new = ("email".to_string(), Value::text("new@x.com".to_string()));
store
.index_entity_insert("users", id, &[old.clone()])
.unwrap();
store
.index_entity_update("users", id, &[old.clone()], &[new.clone()])
.unwrap();
assert!(store
.hash_lookup("users", "idx_email", b"old@x.com")
.unwrap()
.is_empty());
assert_eq!(
ids(&store
.hash_lookup("users", "idx_email", b"new@x.com")
.unwrap()),
vec![11]
);
}
#[test]
fn test_index_entity_update_skips_unchanged_columns() {
let store = IndexStore::new();
provision_hash_index(&store);
let id = EntityId::new(13);
let same = ("email".to_string(), Value::text("a@b.com".to_string()));
store
.index_entity_insert("users", id, &[same.clone()])
.unwrap();
store
.index_entity_update("users", id, &[same.clone()], &[same.clone()])
.unwrap();
assert_eq!(
ids(&store.hash_lookup("users", "idx_email", b"a@b.com").unwrap()),
vec![13]
);
}
#[test]
fn test_index_entity_update_indexes_newly_added_column() {
let store = IndexStore::new();
provision_hash_index(&store);
let id = EntityId::new(17);
let old: Vec<(String, Value)> = vec![]; let new = vec![("email".to_string(), Value::text("fresh@x.com".to_string()))];
store.index_entity_update("users", id, &old, &new).unwrap();
assert_eq!(
ids(&store
.hash_lookup("users", "idx_email", b"fresh@x.com")
.unwrap()),
vec![17]
);
}
#[test]
fn test_index_entity_update_removes_dropped_column() {
let store = IndexStore::new();
provision_hash_index(&store);
let id = EntityId::new(19);
let old = vec![("email".to_string(), Value::text("bye@x.com".to_string()))];
let new: Vec<(String, Value)> = vec![];
store.index_entity_insert("users", id, &old).unwrap();
assert_eq!(
ids(&store
.hash_lookup("users", "idx_email", b"bye@x.com")
.unwrap()),
vec![19]
);
store.index_entity_update("users", id, &old, &new).unwrap();
assert!(store
.hash_lookup("users", "idx_email", b"bye@x.com")
.unwrap()
.is_empty());
}
#[test]
fn test_index_entity_update_ignores_non_indexed_column_changes() {
let store = IndexStore::new();
provision_hash_index(&store); let id = EntityId::new(23);
let insert = vec![("email".to_string(), Value::text("a@b.com".to_string()))];
store.index_entity_insert("users", id, &insert).unwrap();
let old = vec![
("email".to_string(), Value::text("a@b.com".to_string())),
("age".to_string(), Value::Integer(30)),
];
let new = vec![
("email".to_string(), Value::text("a@b.com".to_string())),
("age".to_string(), Value::Integer(31)),
];
store.index_entity_update("users", id, &old, &new).unwrap();
assert_eq!(
ids(&store.hash_lookup("users", "idx_email", b"a@b.com").unwrap()),
vec![23]
);
}
}