use super::{
index_map::normalize_index_value, index_scanner::IndexScanner,
nitrite_index::NitriteIndexProvider, IndexDescriptor, IndexMap,
};
use crate::{
collection::{FindPlan, NitriteId},
derive_index_map_name,
errors::{ErrorKind, NitriteError, NitriteResult},
store::{NitriteMap, NitriteMapProvider, NitriteStore, NitriteStoreProvider},
validate_index_field, common::Key, FieldValues, Value, UNIQUE_INDEX,
};
use itertools::Itertools;
use once_cell::sync::Lazy;
use std::collections::{BTreeMap, HashMap};
use std::ops::Deref;
use std::sync::Arc;
static UNIQUE_CONSTRAINT_ERROR: Lazy<NitriteError> = Lazy::new(|| {
NitriteError::new(
"Unique constraint violated",
ErrorKind::UniqueConstraintViolation,
)
});
static COMPOUND_INDEX_ERROR: Lazy<NitriteError> = Lazy::new(|| {
NitriteError::new(
"Compound multikey index is supported on the first field of the index only",
ErrorKind::IndexingError,
)
});
#[derive(Clone)]
pub struct CompoundIndex {
inner: Arc<CompoundIndexInner>,
}
impl CompoundIndex {
pub fn new(index_descriptor: IndexDescriptor, nitrite_store: NitriteStore) -> CompoundIndex {
let inner = CompoundIndexInner::new(index_descriptor, nitrite_store);
CompoundIndex {
inner: Arc::new(inner),
}
}
}
impl Deref for CompoundIndex {
type Target = Arc<CompoundIndexInner>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl NitriteIndexProvider for CompoundIndex {
fn index_descriptor(&self) -> NitriteResult<IndexDescriptor> {
self.inner.index_descriptor()
}
fn write(&self, field_values: &FieldValues) -> NitriteResult<()> {
self.inner.write(field_values)
}
fn remove(&self, field_values: &FieldValues) -> NitriteResult<()> {
self.inner.remove(field_values)
}
fn drop_index(&self) -> NitriteResult<()> {
self.inner.drop_index()
}
fn find_nitrite_ids(&self, find_plan: &FindPlan) -> NitriteResult<Vec<NitriteId>> {
self.inner.find_nitrite_ids(find_plan)
}
fn is_unique(&self) -> bool {
self.inner.is_unique()
}
}
pub struct CompoundIndexInner {
index_descriptor: IndexDescriptor,
nitrite_store: NitriteStore,
}
impl CompoundIndexInner {
fn new(index_descriptor: IndexDescriptor, nitrite_store: NitriteStore) -> Self {
Self {
index_descriptor,
nitrite_store,
}
}
fn find_index_map(&self) -> NitriteResult<NitriteMap> {
let map_name = derive_index_map_name(&self.index_descriptor);
self.nitrite_store.open_map(&map_name)
}
fn field_count(&self) -> usize {
self.index_descriptor.index_fields().field_names().len()
}
fn composite_key(
&self,
field_values: &FieldValues,
first_value: &Value,
) -> NitriteResult<Key> {
let values = field_values.values();
let mut parts = Vec::with_capacity(values.len() + 1);
parts.push(normalize_index_value(first_value));
for (_, value) in values.iter().skip(1) {
match value {
Value::Array(_) => {
log::error!(
"Compound multikey index is supported on the first field of the index only"
);
return Err(COMPOUND_INDEX_ERROR.clone());
}
Value::Null => parts.push(Value::Null),
v if v.is_comparable() => parts.push(normalize_index_value(v)),
v => {
log::error!(
"Found non comparable value {} in compound index {:?}",
v,
self.index_descriptor
);
return Err(NitriteError::new(
&format!("{} is not comparable", v),
ErrorKind::IndexingError,
));
}
}
}
parts.push(Value::NitriteId(*field_values.nitrite_id()));
Ok(Value::Array(parts))
}
fn add_index_element(
&self,
index_map: &NitriteMap,
field_values: &FieldValues,
value: &Value,
) -> NitriteResult<()> {
let key = self.composite_key(field_values, value)?;
if self.is_unique() {
self.check_unique(index_map, &key)?;
}
index_map.put(key, Value::Null)
}
fn check_unique(&self, index_map: &NitriteMap, key: &Key) -> NitriteResult<()> {
let Value::Array(parts) = key else {
return Ok(());
};
let k = self.field_count();
if parts.len() != k + 1 {
return Ok(());
}
let tuple = &parts[..k];
let id = &parts[k];
let probe = Value::Array(tuple.to_vec());
if let Some(Value::Array(existing)) = index_map.ceiling_key(&probe)? {
if existing.len() == k + 1 && existing[..k] == *tuple && existing[k] != *id {
log::error!("Unique constraint violated for {:?}", tuple);
return Err(UNIQUE_CONSTRAINT_ERROR.clone());
}
}
Ok(())
}
fn remove_index_element(
&self,
index_map: NitriteMap,
field_values: &FieldValues,
value: &Value,
) -> NitriteResult<()> {
let key = self.composite_key(field_values, value)?;
index_map.remove(&key)?;
Ok(())
}
fn scan_index(
&self,
find_plan: &FindPlan,
index_map: NitriteMap,
) -> NitriteResult<Vec<NitriteId>> {
if find_plan.index_scan_filter().is_none() {
return Ok(Vec::new());
}
let filters = find_plan.index_scan_filter()
.ok_or_else(|| NitriteError::new(
"Compound index scan error: index_scan_filter is required for compound index query optimization",
ErrorKind::InvalidOperation
))?
.filters();
let index_scan_order = find_plan.index_scan_order().unwrap_or_default();
let i_map = IndexMap::composite(index_map, self.field_count());
let index_scanner = IndexScanner::new(i_map);
index_scanner.scan(filters, index_scan_order)
}
fn index_descriptor(&self) -> NitriteResult<IndexDescriptor> {
Ok(self.index_descriptor.clone())
}
fn write(&self, field_values: &FieldValues) -> NitriteResult<()> {
let fields = field_values.fields();
let field_names = fields.field_names();
let first_field = field_names.first().map_or("", |x| x.as_str());
let first_value = field_values.get_value(first_field);
validate_index_field(first_value, first_field)?;
let index_map: NitriteMap = self.find_index_map()?;
match first_value {
None | Some(Value::Null) => {
self.add_index_element(&index_map, field_values, &Value::Null)?;
}
Some(Value::Array(arr)) => {
for value in arr {
self.add_index_element(&index_map, field_values, value)?;
}
}
Some(value) => {
if value.is_comparable() {
self.add_index_element(&index_map, field_values, value)?;
}
}
}
Ok(())
}
fn remove(&self, field_values: &FieldValues) -> NitriteResult<()> {
let fields = field_values.fields();
let field_names = fields.field_names();
let first_field = field_names.first().map_or("", |x| x.as_str());
let first_value = field_values.get_value(first_field);
validate_index_field(first_value, first_field)?;
let index_map: NitriteMap = self.find_index_map()?;
match first_value {
None | Some(Value::Null) => {
self.remove_index_element(index_map, field_values, &Value::Null)?;
}
Some(Value::Array(arr)) => {
for value in arr {
self.remove_index_element(index_map.clone(), field_values, value)?;
}
}
Some(value) => {
if value.is_comparable() {
self.remove_index_element(index_map, field_values, value)?;
}
}
}
Ok(())
}
fn drop_index(&self) -> NitriteResult<()> {
let index_map = self.find_index_map()?;
index_map.clear()?;
index_map.dispose()?;
Ok(())
}
fn find_nitrite_ids(&self, find_plan: &FindPlan) -> NitriteResult<Vec<NitriteId>> {
if find_plan.index_scan_filter().is_none() {
return Ok(Vec::new());
}
let index_map = self.find_index_map()?;
self.scan_index(find_plan, index_map)
}
fn is_unique(&self) -> bool {
self.index_descriptor
.index_type()
.eq_ignore_ascii_case(UNIQUE_INDEX)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::common::Fields;
use crate::{FieldValues, Value};
fn create_test_index_descriptor() -> IndexDescriptor {
IndexDescriptor::new(
UNIQUE_INDEX,
Fields::with_names(vec!["field1", "field2"]).unwrap(),
"test",
)
}
fn create_test_field_values() -> FieldValues {
FieldValues::new(
vec![
("field1".to_string(), Value::String("value1".to_string())),
("field2".to_string(), Value::String("value2".to_string())),
],
NitriteId::new(),
Fields::with_names(vec!["field1", "field2"]).unwrap(),
)
}
#[test]
fn test_compound_index_new() {
let index_descriptor = create_test_index_descriptor();
let nitrite_store = NitriteStore::default();
let compound_index = CompoundIndex::new(index_descriptor.clone(), nitrite_store.clone());
assert_eq!(compound_index.inner.index_descriptor, index_descriptor);
}
#[test]
fn test_compound_index_find_index_map() {
let index_descriptor = create_test_index_descriptor();
let nitrite_store = NitriteStore::default();
let compound_index = CompoundIndex::new(index_descriptor, nitrite_store);
let result = compound_index.find_index_map();
assert!(result.is_ok());
}
#[test]
fn test_compound_index_add_index_element() {
let index_descriptor = create_test_index_descriptor();
let nitrite_store = NitriteStore::default();
let compound_index = CompoundIndex::new(index_descriptor, nitrite_store);
let index_map = compound_index.find_index_map().unwrap();
let field_values = create_test_field_values();
let value = Value::String("test_value".to_string());
let result = compound_index.add_index_element(&index_map, &field_values, &value);
assert!(result.is_ok());
}
#[test]
fn test_compound_index_remove_index_element() {
let index_descriptor = create_test_index_descriptor();
let nitrite_store = NitriteStore::default();
let compound_index = CompoundIndex::new(index_descriptor, nitrite_store);
let index_map = compound_index.find_index_map().unwrap();
let field_values = create_test_field_values();
let value = Value::String("test_value".to_string());
let result = compound_index.remove_index_element(index_map, &field_values, &value);
assert!(result.is_ok());
}
#[test]
fn test_compound_index_composite_key_layout() {
let index_descriptor = create_test_index_descriptor();
let nitrite_store = NitriteStore::default();
let compound_index = CompoundIndex::new(index_descriptor, nitrite_store);
let field_values = create_test_field_values();
let first = Value::String("value1".to_string());
let key = compound_index.composite_key(&field_values, &first).unwrap();
match key {
Value::Array(parts) => {
assert_eq!(parts.len(), 3); assert_eq!(parts[0], Value::String("value1".to_string()));
assert_eq!(parts[1], Value::String("value2".to_string()));
assert!(parts[2].is_nitrite_id());
}
other => panic!("expected composite array key, got {other:?}"),
}
}
#[test]
fn test_compound_index_scan_index() {
let index_descriptor = create_test_index_descriptor();
let nitrite_store = NitriteStore::default();
let compound_index = CompoundIndex::new(index_descriptor, nitrite_store);
let find_plan = FindPlan::new();
let index_map = compound_index.find_index_map().unwrap();
let result = compound_index.scan_index(&find_plan, index_map);
assert!(result.is_ok());
}
#[test]
fn test_compound_index_write() {
let index_descriptor = create_test_index_descriptor();
let nitrite_store = NitriteStore::default();
let compound_index = CompoundIndex::new(index_descriptor, nitrite_store);
let field_values = create_test_field_values();
let result = compound_index.write(&field_values);
assert!(result.is_ok());
}
#[test]
fn test_compound_index_remove() {
let index_descriptor = create_test_index_descriptor();
let nitrite_store = NitriteStore::default();
let compound_index = CompoundIndex::new(index_descriptor, nitrite_store);
let field_values = create_test_field_values();
let result = compound_index.remove(&field_values);
assert!(result.is_ok());
}
#[test]
fn test_compound_index_drop_index() {
let index_descriptor = create_test_index_descriptor();
let nitrite_store = NitriteStore::default();
let compound_index = CompoundIndex::new(index_descriptor, nitrite_store);
let result = compound_index.drop_index();
assert!(result.is_ok());
}
#[test]
fn test_compound_index_find_nitrite_ids() {
let index_descriptor = create_test_index_descriptor();
let nitrite_store = NitriteStore::default();
let compound_index = CompoundIndex::new(index_descriptor, nitrite_store);
let find_plan = FindPlan::new();
let result = compound_index.find_nitrite_ids(&find_plan);
assert!(result.is_ok());
}
#[test]
fn test_compound_index_is_unique() {
let index_descriptor = create_test_index_descriptor();
let nitrite_store = NitriteStore::default();
let compound_index = CompoundIndex::new(index_descriptor, nitrite_store);
let result = compound_index.is_unique();
assert!(result);
}
#[test]
fn test_populate_sub_map_graceful_error_handling() {
let index_descriptor = create_test_index_descriptor();
let nitrite_store = NitriteStore::default();
let compound_index = CompoundIndex::new(index_descriptor, nitrite_store);
let field_values = create_test_field_values();
let result = compound_index.write(&field_values);
assert!(result.is_ok() || result.is_err());
}
#[test]
fn test_delete_from_sub_map_graceful_error_handling() {
let index_descriptor = create_test_index_descriptor();
let nitrite_store = NitriteStore::default();
let compound_index = CompoundIndex::new(index_descriptor, nitrite_store);
let field_values = create_test_field_values();
let _ = compound_index.write(&field_values);
let result = compound_index.remove(&field_values);
assert!(result.is_ok() || result.is_err());
}
#[test]
fn test_compound_index_rejects_multikey_in_later_field() {
let index_descriptor = create_test_index_descriptor();
let nitrite_store = NitriteStore::default();
let compound_index = CompoundIndex::new(index_descriptor, nitrite_store);
let field_values = FieldValues::new(
vec![
("field1".to_string(), Value::String("a".to_string())),
("field2".to_string(), Value::Array(vec![Value::I32(1)])),
],
NitriteId::new(),
Fields::with_names(vec!["field1", "field2"]).unwrap(),
);
let first = Value::String("a".to_string());
let result = compound_index.composite_key(&field_values, &first);
assert!(result.is_err());
}
#[test]
fn test_delete_from_sub_map_avoids_excessive_cloning() {
let index_descriptor = create_test_index_descriptor();
let nitrite_store = NitriteStore::default();
let compound_index = CompoundIndex::new(index_descriptor, nitrite_store);
let field_values = create_test_field_values();
let _ = compound_index.write(&field_values);
for _ in 0..3 {
let result = compound_index.remove(&field_values);
assert!(result.is_ok() || result.is_err());
}
}
#[test]
fn test_add_nitrite_ids_dedup_efficiency() {
let index_descriptor = create_test_index_descriptor();
let nitrite_store = NitriteStore::default();
let compound_index = CompoundIndex::new(index_descriptor, nitrite_store);
let field_values = create_test_field_values();
for _ in 0..3 {
let result = compound_index.write(&field_values);
assert!(result.is_ok() || result.is_err());
}
}
#[test]
fn test_remove_method_efficient_array_processing() {
let index_descriptor = create_test_index_descriptor();
let nitrite_store = NitriteStore::default();
let compound_index = CompoundIndex::new(index_descriptor, nitrite_store);
let field_values = create_test_field_values();
let result = compound_index.write(&field_values);
assert!(result.is_ok());
let result = compound_index.remove(&field_values);
assert!(result.is_ok());
}
#[test]
fn test_scan_index_no_filter_early_return() {
let index_descriptor = create_test_index_descriptor();
let nitrite_store = NitriteStore::default();
let compound_index = CompoundIndex::new(index_descriptor, nitrite_store);
let find_plan = FindPlan::new(); let index_map = compound_index.find_index_map().unwrap();
let result = compound_index.scan_index(&find_plan, index_map);
assert!(result.is_ok());
let ids = result.unwrap();
assert_eq!(ids.len(), 0);
}
}