use super::{
index_scanner::IndexScanner, nitrite_index::NitriteIndexProvider, IndexDescriptor, IndexMap,
};
use crate::{
collection::{FindPlan, NitriteId},
derive_index_map_name,
errors::{ErrorKind, NitriteError, NitriteResult},
store::{NitriteMap, NitriteMapProvider, NitriteStore, NitriteStoreProvider},
validate_index_field, FieldValues, Value, UNIQUE_INDEX,
};
use itertools::Itertools;
use once_cell::sync::Lazy;
use std::collections::{BTreeMap, HashMap};
use std::ops::Deref;
use std::sync::Arc;
/// Lazily-built error value for unique-constraint violations.
///
/// `add_nitrite_ids` returns a clone of this value when a unique index already
/// holds an id for the key being written.
static UNIQUE_CONSTRAINT_ERROR: Lazy<NitriteError> = Lazy::new(|| {
NitriteError::new(
"Unique constraint violated",
ErrorKind::UniqueConstraintViolation,
)
});
/// Lazily-built error value for array values found beyond the first indexed field.
///
/// `populate_sub_map` returns a clone of this value when it meets a
/// `Value::Array` at the depth it is processing.
static COMPOUND_INDEX_ERROR: Lazy<NitriteError> = Lazy::new(|| {
NitriteError::new(
"Compound multikey index is supported on the first field of the index only",
ErrorKind::IndexingError,
)
});
/// Handle to a compound index.
///
/// The state lives in a reference-counted `CompoundIndexInner`, so cloning this
/// handle yields another handle to the same inner value.
#[derive(Clone)]
pub struct CompoundIndex {
// Shared index state; every clone of the handle points at the same value.
inner: Arc<CompoundIndexInner>,
}
impl CompoundIndex {
    /// Builds a compound index handle for `index_descriptor`, keeping
    /// `nitrite_store` so the backing index map can be opened when needed.
    pub fn new(index_descriptor: IndexDescriptor, nitrite_store: NitriteStore) -> CompoundIndex {
        Self {
            inner: Arc::new(CompoundIndexInner::new(index_descriptor, nitrite_store)),
        }
    }
}
/// Exposes the shared inner state so its methods can be called on the handle.
impl Deref for CompoundIndex {
    type Target = Arc<CompoundIndexInner>;

    /// Returns a reference to the reference-counted inner index.
    fn deref(&self) -> &Arc<CompoundIndexInner> {
        &self.inner
    }
}
impl NitriteIndexProvider for CompoundIndex {
fn index_descriptor(&self) -> NitriteResult<IndexDescriptor> {
self.inner.index_descriptor()
}
fn write(&self, field_values: &FieldValues) -> NitriteResult<()> {
self.inner.write(field_values)
}
fn remove(&self, field_values: &FieldValues) -> NitriteResult<()> {
self.inner.remove(field_values)
}
fn drop_index(&self) -> NitriteResult<()> {
self.inner.drop_index()
}
fn find_nitrite_ids(&self, find_plan: &FindPlan) -> NitriteResult<Vec<NitriteId>> {
self.inner.find_nitrite_ids(find_plan)
}
fn is_unique(&self) -> bool {
self.inner.is_unique()
}
}
/// State behind a `CompoundIndex` handle.
pub struct CompoundIndexInner {
// Describes the index; used to derive the index map name and to decide uniqueness.
index_descriptor: IndexDescriptor,
// Store from which the backing index map is opened.
nitrite_store: NitriteStore,
}
impl CompoundIndexInner {
fn new(index_descriptor: IndexDescriptor, nitrite_store: NitriteStore) -> Self {
Self {
index_descriptor,
nitrite_store,
}
}
/// Opens the map that backs this index; its name is derived from the descriptor.
fn find_index_map(&self) -> NitriteResult<NitriteMap> {
    self.nitrite_store
        .open_map(&derive_index_map_name(&self.index_descriptor))
}
/// Records `field_values` under the first-field key `value` in `index_map`.
///
/// The nested map stored under `value` (or a new empty one when the key is
/// absent) is extended with the remaining fields, starting at depth 1, and
/// then written back.
fn add_index_element(
    &self,
    index_map: &NitriteMap,
    field_values: &FieldValues,
    value: &Value,
) -> NitriteResult<()> {
    let mut nested = index_map
        .get(value)?
        .unwrap_or_else(|| Value::Map(BTreeMap::new()));
    self.populate_sub_map(&mut nested, field_values, 1)?;
    index_map.put(value.clone(), nested)
}
fn remove_index_element(
&self,
index_map: NitriteMap,
field_values: &FieldValues,
value: &Value,
) -> NitriteResult<()> {
let sub_map = index_map.get(value)?;
let mut sub_map = sub_map.unwrap_or(Value::Map(BTreeMap::new()));
self.delete_from_sub_map(&mut sub_map, field_values, 1)?;
index_map.put(value.clone(), sub_map)
}
fn populate_sub_map(
&self,
sub_map: &mut Value,
field_values: &FieldValues,
depth: usize,
) -> NitriteResult<()> {
if depth >= field_values.values().len() {
return Ok(());
}
let values = field_values.values();
let (_, value) = values.get(depth)
.ok_or_else(|| NitriteError::new(
&format!("Field value at depth {} not found in compound index", depth),
ErrorKind::IndexingError,
))?;
let db_value = match value {
Value::Array(_) => {
log::error!("Compound multikey index is supported on the first field of the index only");
return Err(COMPOUND_INDEX_ERROR.clone());
}
value => {
if !value.is_comparable() {
log::error!("Found non comparable value {} in compound index {:?}", value, self.index_descriptor);
return Err(NitriteError::new(
&format!("{} is not comparable", value),
ErrorKind::IndexingError,
));
}
value.clone()
}
};
if depth == field_values.values().len() - 1 {
let sub_map_inner = sub_map.as_map_mut()
.ok_or_else(|| NitriteError::new(
&format!("Compound index corruption: expected map at depth {} for field values {:?}", depth, field_values.values()),
ErrorKind::IndexingError
))?;
let mut nitrite_ids = sub_map_inner.remove(&db_value)
.unwrap_or_else(|| Value::Array(Vec::new()));
let nitrite_ids_arr = nitrite_ids.as_array_mut()
.ok_or_else(|| NitriteError::new(
&format!("Compound index error: expected array of NitriteIds for key {:?} at depth {}", db_value, depth),
ErrorKind::IndexingError
))?;
let nitrite_ids = self.add_nitrite_ids(nitrite_ids_arr, field_values)?;
sub_map_inner.insert(db_value, Value::Array(nitrite_ids));
} else {
let sub_map_inner = sub_map.as_map_mut()
.ok_or_else(|| NitriteError::new(
&format!("Compound index corruption: expected map at depth {} for field values {:?}", depth, field_values.values()),
ErrorKind::IndexingError
))?;
let mut sub_map2 = sub_map_inner.remove(&db_value)
.unwrap_or_else(|| Value::Map(BTreeMap::new()));
self.populate_sub_map(&mut sub_map2, field_values, depth + 1)?;
sub_map_inner.insert(db_value, sub_map2);
}
Ok(())
}
fn delete_from_sub_map(
&self,
sub_map: &mut Value,
field_values: &FieldValues,
depth: usize,
) -> NitriteResult<()> {
let values = field_values.values();
let (_, value) = values.get(depth)
.ok_or_else(|| NitriteError::new(
&format!("Field value at depth {} not found in compound index", depth),
ErrorKind::IndexingError,
))?;
let db_value = match value {
Value::Null => Value::Null,
value => {
if !value.is_comparable() {
return Ok(());
}
value.clone()
}
};
if depth == field_values.values().len() - 1 {
let sub_map_inner = sub_map.as_map_mut()
.ok_or_else(|| NitriteError::new(
&format!("Compound index corruption during deletion: expected map at depth {} for field values {:?}", depth, field_values.values()),
ErrorKind::IndexingError
))?;
let mut nitrite_ids = sub_map_inner.remove(&db_value)
.unwrap_or_else(|| Value::Array(Vec::new()));
let nitrite_ids_arr = nitrite_ids.as_array_mut()
.ok_or_else(|| NitriteError::new(
&format!("Compound index error during deletion: expected array of NitriteIds for key {:?} at depth {}", db_value, depth),
ErrorKind::IndexingError
))?;
let nitrite_ids = self.remove_nitrite_ids(nitrite_ids_arr, field_values)?;
if !nitrite_ids.is_empty() {
sub_map_inner.insert(db_value, Value::Array(nitrite_ids));
}
} else {
let sub_map_inner = sub_map.as_map_mut()
.ok_or_else(|| NitriteError::new(
&format!("Compound index corruption during deletion: expected map at depth {} for field values {:?}", depth, field_values.values()),
ErrorKind::IndexingError
))?;
let mut sub_map2 = sub_map_inner.remove(&db_value)
.unwrap_or_else(|| Value::Map(BTreeMap::new()));
let is_empty = sub_map2.as_map()
.map(|m| m.is_empty())
.unwrap_or(true);
if !is_empty {
self.delete_from_sub_map(&mut sub_map2, field_values, depth + 1)?;
sub_map_inner.insert(db_value, sub_map2);
}
}
Ok(())
}
/// Adds the id of `field_values` to `nitrite_ids` and returns the resulting
/// list, sorted and without adjacent duplicates.
///
/// The list is moved out of `nitrite_ids`, which is left empty on success.
///
/// # Errors
/// Returns `UNIQUE_CONSTRAINT_ERROR` when the index is unique and the list
/// already holds exactly one id; the list is not modified in that case.
fn add_nitrite_ids(
    &self,
    nitrite_ids: &mut Vec<Value>,
    field_values: &FieldValues,
) -> NitriteResult<Vec<Value>> {
    let violates_unique = nitrite_ids.len() == 1 && self.is_unique();
    if violates_unique {
        log::error!(
            "Unique constraint violated for {:?}",
            field_values.values()
        );
        return Err(UNIQUE_CONSTRAINT_ERROR.clone());
    }

    let new_id = Value::NitriteId(*field_values.nitrite_id());
    nitrite_ids.push(new_id);
    // Values without a defined ordering are treated as equal for sorting.
    nitrite_ids.sort_by(|lhs, rhs| lhs.partial_cmp(rhs).unwrap_or(std::cmp::Ordering::Equal));
    nitrite_ids.dedup();

    let updated = std::mem::take(nitrite_ids);
    Ok(updated)
}
/// Drops every occurrence of the id of `field_values` from `nitrite_ids` and
/// returns what is left.
///
/// Elements that are not `NitriteId`s are kept and reported with a warning.
/// The list is moved out of `nitrite_ids`, which is left empty.
fn remove_nitrite_ids(
    &self,
    nitrite_ids: &mut Vec<Value>,
    field_values: &FieldValues,
) -> NitriteResult<Vec<Value>> {
    let target_id = field_values.nitrite_id();
    nitrite_ids.retain(|candidate| {
        if let Some(id) = candidate.as_nitrite_id() {
            id != target_id
        } else {
            log::warn!("Invalid NitriteId value in compound index: {:?}", candidate);
            true
        }
    });
    let remaining = std::mem::take(nitrite_ids);
    Ok(remaining)
}
/// Scans `index_map` using the index-scan filter carried by `find_plan`.
///
/// Returns an empty list when the plan has no index-scan filter. Otherwise the
/// filter's `filters()` and the plan's scan order (its default when unset) are
/// handed to an `IndexScanner` built over `index_map`.
///
/// # Errors
/// Propagates whatever `IndexScanner::scan` reports.
fn scan_index(
    &self,
    find_plan: &FindPlan,
    index_map: NitriteMap,
) -> NitriteResult<Vec<NitriteId>> {
    // One match covers both the "no filter" early return and the extraction of
    // the filter; the former second check could never fail.
    let scan_filter = match find_plan.index_scan_filter() {
        Some(scan_filter) => scan_filter,
        None => return Ok(Vec::new()),
    };
    let filters = scan_filter.filters();
    let index_scan_order = find_plan.index_scan_order().unwrap_or_default();
    let index_scanner = IndexScanner::new(IndexMap::new(Some(index_map), None));
    index_scanner.scan(filters, index_scan_order)
}
/// Returns a clone of the descriptor held by this index; never fails.
fn index_descriptor(&self) -> NitriteResult<IndexDescriptor> {
    let descriptor = self.index_descriptor.clone();
    Ok(descriptor)
}
/// Indexes `field_values`, keyed by the value of the first indexed field.
///
/// * missing or null first value: indexed under `Value::Null`;
/// * array first value: indexed once per element, stopping at the first error;
/// * any other value: indexed when it is comparable, skipped otherwise.
///
/// # Errors
/// Propagates failures from `validate_index_field`, from opening the index
/// map, and from `add_index_element`.
fn write(&self, field_values: &FieldValues) -> NitriteResult<()> {
    let fields = field_values.fields();
    let field_names = fields.field_names();
    let first_field = field_names
        .first()
        .map(|name| name.as_str())
        .unwrap_or("");
    let first_value = field_values.get_value(first_field);
    validate_index_field(first_value, first_field)?;

    let index_map: NitriteMap = self.find_index_map()?;
    match first_value {
        Some(Value::Array(elements)) => elements
            .iter()
            .try_for_each(|element| self.add_index_element(&index_map, field_values, element)),
        Some(Value::Null) | None => {
            self.add_index_element(&index_map, field_values, &Value::Null)
        }
        Some(scalar) if scalar.is_comparable() => {
            self.add_index_element(&index_map, field_values, scalar)
        }
        Some(_) => Ok(()),
    }
}
/// Removes `field_values` from the index, keyed by the value of the first
/// indexed field.
///
/// * missing or null first value: removed from under `Value::Null`;
/// * array first value: removed once per element, stopping at the first error;
/// * any other value: removed when it is comparable, skipped otherwise.
///
/// # Errors
/// Propagates failures from `validate_index_field`, from opening the index
/// map, and from `remove_index_element`.
fn remove(&self, field_values: &FieldValues) -> NitriteResult<()> {
    let fields = field_values.fields();
    let field_names = fields.field_names();
    let first_field = field_names
        .first()
        .map(|name| name.as_str())
        .unwrap_or("");
    let first_value = field_values.get_value(first_field);
    validate_index_field(first_value, first_field)?;

    let index_map: NitriteMap = self.find_index_map()?;
    match first_value {
        // `remove_index_element` takes the map by value, hence one clone per element.
        Some(Value::Array(elements)) => elements.iter().try_for_each(|element| {
            self.remove_index_element(index_map.clone(), field_values, element)
        }),
        Some(Value::Null) | None => {
            self.remove_index_element(index_map, field_values, &Value::Null)
        }
        Some(scalar) if scalar.is_comparable() => {
            self.remove_index_element(index_map, field_values, scalar)
        }
        Some(_) => Ok(()),
    }
}
/// Opens the backing index map, clears it and then disposes it.
///
/// Stops at the first step that fails and returns its error.
fn drop_index(&self) -> NitriteResult<()> {
    self.find_index_map().and_then(|index_map| {
        index_map.clear()?;
        index_map.dispose()?;
        Ok(())
    })
}
/// Resolves the ids selected by the index-scan filter of `find_plan`.
///
/// Without an index-scan filter the result is empty and the index map is not
/// opened at all.
fn find_nitrite_ids(&self, find_plan: &FindPlan) -> NitriteResult<Vec<NitriteId>> {
    if find_plan.index_scan_filter().is_some() {
        self.find_index_map()
            .and_then(|index_map| self.scan_index(find_plan, index_map))
    } else {
        Ok(Vec::new())
    }
}
/// True when the descriptor's index type equals `UNIQUE_INDEX`, ignoring ASCII case.
fn is_unique(&self) -> bool {
    let index_type = self.index_descriptor.index_type();
    index_type.eq_ignore_ascii_case(UNIQUE_INDEX)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::common::Fields;
    use crate::{FieldValues, Value};

    /// Descriptor of a unique compound index over `field1` and `field2`.
    fn create_test_index_descriptor() -> IndexDescriptor {
        IndexDescriptor::new(
            UNIQUE_INDEX,
            Fields::with_names(vec!["field1", "field2"]).unwrap(),
            "test",
        )
    }

    /// Field values matching the test descriptor, tagged with a new `NitriteId`.
    fn create_test_field_values() -> FieldValues {
        FieldValues::new(
            vec![
                ("field1".to_string(), Value::String("value1".to_string())),
                ("field2".to_string(), Value::String("value2".to_string())),
            ],
            NitriteId::new(),
            Fields::with_names(vec!["field1", "field2"]).unwrap(),
        )
    }

    /// Compound index for the test descriptor on top of `NitriteStore::default()`.
    fn create_test_index() -> CompoundIndex {
        CompoundIndex::new(create_test_index_descriptor(), NitriteStore::default())
    }

    #[test]
    fn test_compound_index_new() {
        let index_descriptor = create_test_index_descriptor();
        let nitrite_store = NitriteStore::default();
        let compound_index = CompoundIndex::new(index_descriptor.clone(), nitrite_store.clone());
        assert_eq!(compound_index.inner.index_descriptor, index_descriptor);
    }

    #[test]
    fn test_compound_index_find_index_map() {
        let compound_index = create_test_index();
        let result = compound_index.find_index_map();
        assert!(result.is_ok());
    }

    #[test]
    fn test_compound_index_add_index_element() {
        let compound_index = create_test_index();
        let index_map = compound_index.find_index_map().unwrap();
        let field_values = create_test_field_values();
        let value = Value::String("test_value".to_string());
        let result = compound_index.add_index_element(&index_map, &field_values, &value);
        assert!(result.is_ok());
    }

    #[test]
    fn test_compound_index_remove_index_element() {
        let compound_index = create_test_index();
        let index_map = compound_index.find_index_map().unwrap();
        let field_values = create_test_field_values();
        let value = Value::String("test_value".to_string());
        let result = compound_index.remove_index_element(index_map, &field_values, &value);
        assert!(result.is_ok());
    }

    #[test]
    fn test_compound_index_populate_sub_map() {
        let compound_index = create_test_index();
        let mut sub_map = Value::Map(BTreeMap::new());
        let field_values = create_test_field_values();
        let result = compound_index.populate_sub_map(&mut sub_map, &field_values, 0);
        assert!(result.is_ok());
    }

    #[test]
    fn test_compound_index_delete_from_sub_map() {
        let compound_index = create_test_index();
        let mut sub_map = Value::Map(BTreeMap::new());
        let field_values = create_test_field_values();
        let result = compound_index.delete_from_sub_map(&mut sub_map, &field_values, 0);
        assert!(result.is_ok());
    }

    #[test]
    fn test_compound_index_add_nitrite_ids() {
        let compound_index = create_test_index();
        let mut nitrite_ids = Vec::new();
        let field_values = create_test_field_values();
        let result = compound_index.add_nitrite_ids(&mut nitrite_ids, &field_values);
        assert!(result.is_ok());
        assert_eq!(result.unwrap().len(), 1);
    }

    #[test]
    fn test_compound_index_remove_nitrite_ids() {
        let compound_index = create_test_index();
        let field_values = create_test_field_values();
        // The id list holds `Value`s, so the id has to be wrapped.
        let mut nitrite_ids = vec![Value::NitriteId(*field_values.nitrite_id())];
        let result = compound_index.remove_nitrite_ids(&mut nitrite_ids, &field_values);
        assert!(result.is_ok());
    }

    #[test]
    fn test_compound_index_scan_index() {
        let compound_index = create_test_index();
        let find_plan = FindPlan::new();
        let index_map = compound_index.find_index_map().unwrap();
        let result = compound_index.scan_index(&find_plan, index_map);
        assert!(result.is_ok());
    }

    #[test]
    fn test_compound_index_write() {
        let compound_index = create_test_index();
        let field_values = create_test_field_values();
        let result = compound_index.write(&field_values);
        assert!(result.is_ok());
    }

    #[test]
    fn test_compound_index_remove() {
        let compound_index = create_test_index();
        let field_values = create_test_field_values();
        let result = compound_index.remove(&field_values);
        assert!(result.is_ok());
    }

    #[test]
    fn test_compound_index_drop_index() {
        let compound_index = create_test_index();
        let result = compound_index.drop_index();
        assert!(result.is_ok());
    }

    #[test]
    fn test_compound_index_find_nitrite_ids() {
        let compound_index = create_test_index();
        let find_plan = FindPlan::new();
        let result = compound_index.find_nitrite_ids(&find_plan);
        assert!(result.is_ok());
    }

    #[test]
    fn test_compound_index_is_unique() {
        let compound_index = create_test_index();
        assert!(compound_index.is_unique());
    }

    #[test]
    fn test_populate_sub_map_graceful_error_handling() {
        let compound_index = create_test_index();
        let field_values = create_test_field_values();
        let result = compound_index.write(&field_values);
        assert!(result.is_ok());
    }

    #[test]
    fn test_delete_from_sub_map_graceful_error_handling() {
        let compound_index = create_test_index();
        let field_values = create_test_field_values();
        assert!(compound_index.write(&field_values).is_ok());
        let result = compound_index.remove(&field_values);
        assert!(result.is_ok());
    }

    #[test]
    fn test_remove_nitrite_ids_with_invalid_ids() {
        let compound_index = create_test_index();
        let mut nitrite_ids = vec![
            Value::String("not_an_id".to_string()),
            Value::I32(42),
            Value::Null,
        ];
        let field_values = create_test_field_values();
        let result = compound_index
            .inner
            .remove_nitrite_ids(&mut nitrite_ids, &field_values);
        assert!(result.is_ok());
        // Values that are not ids are kept.
        let retained_ids = result.unwrap();
        assert_eq!(retained_ids.len(), 3);
    }

    #[test]
    fn test_remove_nitrite_ids_removes_matching_ids() {
        let compound_index = create_test_index();
        let field_values = create_test_field_values();
        let target_id = *field_values.nitrite_id();
        let mut nitrite_ids = vec![Value::NitriteId(target_id)];
        let result = compound_index
            .inner
            .remove_nitrite_ids(&mut nitrite_ids, &field_values);
        assert!(result.is_ok());
        let retained_ids = result.unwrap();
        assert_eq!(retained_ids.len(), 0);
    }

    #[test]
    fn test_remove_nitrite_ids_preserves_non_matching_ids() {
        let compound_index = create_test_index();
        let field_values = create_test_field_values();
        let other_id = NitriteId::new();
        let mut nitrite_ids = vec![Value::NitriteId(other_id)];
        let result = compound_index
            .inner
            .remove_nitrite_ids(&mut nitrite_ids, &field_values);
        assert!(result.is_ok());
        let retained_ids = result.unwrap();
        assert_eq!(retained_ids.len(), 1);
    }

    #[test]
    fn test_populate_sub_map_avoids_excessive_cloning() {
        let compound_index = create_test_index();
        let mut sub_map = Value::Map(BTreeMap::new());
        let field_values = create_test_field_values();
        // The first insertion into an empty map succeeds.
        assert!(compound_index
            .populate_sub_map(&mut sub_map, &field_values, 0)
            .is_ok());
        // The index is unique and the key now holds one id, so a second
        // insertion for the same key is rejected.
        assert!(compound_index
            .populate_sub_map(&mut sub_map, &field_values, 0)
            .is_err());
        // Further calls only have to complete without panicking.
        for _ in 0..3 {
            let _ = compound_index.populate_sub_map(&mut sub_map, &field_values, 0);
        }
    }

    #[test]
    fn test_delete_from_sub_map_avoids_excessive_cloning() {
        let compound_index = create_test_index();
        let field_values = create_test_field_values();
        let _ = compound_index.write(&field_values);
        // Removing is fine whether or not the entry is still present.
        for _ in 0..3 {
            let result = compound_index.remove(&field_values);
            assert!(result.is_ok());
        }
    }

    #[test]
    fn test_add_nitrite_ids_dedup_efficiency() {
        let compound_index = create_test_index();
        let field_values = create_test_field_values();
        assert!(compound_index.write(&field_values).is_ok());
        // Repeated writes of the same values only have to complete without
        // panicking; their outcome is not pinned down here.
        for _ in 0..2 {
            let _ = compound_index.write(&field_values);
        }
    }

    #[test]
    fn test_remove_method_efficient_array_processing() {
        let compound_index = create_test_index();
        let field_values = create_test_field_values();
        let result = compound_index.write(&field_values);
        assert!(result.is_ok());
        let result = compound_index.remove(&field_values);
        assert!(result.is_ok());
    }

    #[test]
    fn test_scan_index_no_filter_early_return() {
        let compound_index = create_test_index();
        // A new plan carries no index-scan filter.
        let find_plan = FindPlan::new();
        let index_map = compound_index.find_index_map().unwrap();
        let result = compound_index.scan_index(&find_plan, index_map);
        assert!(result.is_ok());
        let ids = result.unwrap();
        assert_eq!(ids.len(), 0);
    }
}