use manifoldb_core::index::{IndexId, PropertyIndexEntry};
use manifoldb_core::{Entity, TransactionError};
use manifoldb_storage::Transaction;
use crate::schema::{IndexSchema, SchemaManager};
use crate::transaction::DatabaseTransaction;
#[derive(Debug, thiserror::Error)]
pub enum IndexMaintenanceError {
#[error("transaction error: {0}")]
Transaction(#[from] TransactionError),
#[error("schema error: {0}")]
Schema(String),
}
pub struct EntityIndexMaintenance;
impl EntityIndexMaintenance {
pub fn on_insert<T: Transaction>(
tx: &mut DatabaseTransaction<T>,
entity: &Entity,
) -> Result<(), IndexMaintenanceError> {
for label in &entity.labels {
let label_str = label.as_str();
let indexes = Self::get_indexes_for_table(tx, label_str)?;
for index in indexes {
if let Some(using) = &index.using {
let using_lower = using.to_lowercase();
if using_lower == "hnsw" || using_lower == "ivfflat" {
continue;
}
}
Self::add_index_entries(tx, entity, label_str, &index)?;
}
}
Ok(())
}
pub fn on_update<T: Transaction>(
tx: &mut DatabaseTransaction<T>,
old_entity: &Entity,
new_entity: &Entity,
) -> Result<(), IndexMaintenanceError> {
for old_label in &old_entity.labels {
if !new_entity.labels.contains(old_label) {
let old_label_str = old_label.as_str();
let indexes = Self::get_indexes_for_table(tx, old_label_str)?;
for index in indexes {
if let Some(using) = &index.using {
let using_lower = using.to_lowercase();
if using_lower == "hnsw" || using_lower == "ivfflat" {
continue;
}
}
Self::remove_index_entries(tx, old_entity, old_label_str, &index)?;
}
}
}
for label in &new_entity.labels {
let label_str = label.as_str();
let indexes = Self::get_indexes_for_table(tx, label_str)?;
for index in indexes {
if let Some(using) = &index.using {
let using_lower = using.to_lowercase();
if using_lower == "hnsw" || using_lower == "ivfflat" {
continue;
}
}
let is_new_label = !old_entity.labels.contains(label);
if is_new_label {
Self::add_index_entries(tx, new_entity, label_str, &index)?;
} else {
Self::update_index_entries(tx, old_entity, new_entity, label_str, &index)?;
}
}
}
Ok(())
}
pub fn on_delete<T: Transaction>(
tx: &mut DatabaseTransaction<T>,
entity: &Entity,
) -> Result<(), IndexMaintenanceError> {
for label in &entity.labels {
let label_str = label.as_str();
let indexes = Self::get_indexes_for_table(tx, label_str)?;
for index in indexes {
if let Some(using) = &index.using {
let using_lower = using.to_lowercase();
if using_lower == "hnsw" || using_lower == "ivfflat" {
continue;
}
}
Self::remove_index_entries(tx, entity, label_str, &index)?;
}
}
Ok(())
}
fn get_indexes_for_table<T: Transaction>(
tx: &DatabaseTransaction<T>,
table_name: &str,
) -> Result<Vec<IndexSchema>, IndexMaintenanceError> {
let index_names = SchemaManager::list_indexes_for_table(tx, table_name)
.map_err(|e| IndexMaintenanceError::Schema(e.to_string()))?;
let mut indexes = Vec::new();
for name in index_names {
if let Some(index) = SchemaManager::get_index(tx, &name)
.map_err(|e| IndexMaintenanceError::Schema(e.to_string()))?
{
indexes.push(index);
}
}
Ok(indexes)
}
fn add_index_entries<T: Transaction>(
tx: &mut DatabaseTransaction<T>,
entity: &Entity,
label: &str,
index: &IndexSchema,
) -> Result<(), IndexMaintenanceError> {
if index.columns.len() != 1 {
return Ok(());
}
let column_name = &index.columns[0].expr;
if let Some(value) = entity.properties.get(column_name) {
if PropertyIndexEntry::is_indexable(value) {
let index_id = IndexId::from_label_property(label, column_name);
let entry = PropertyIndexEntry::new(index_id, value.clone(), entity.id);
if let Some(key) = entry.encode_key() {
tx.put_property_index(&key)?;
}
}
}
Ok(())
}
fn remove_index_entries<T: Transaction>(
tx: &mut DatabaseTransaction<T>,
entity: &Entity,
label: &str,
index: &IndexSchema,
) -> Result<(), IndexMaintenanceError> {
if index.columns.len() != 1 {
return Ok(());
}
let column_name = &index.columns[0].expr;
if let Some(value) = entity.properties.get(column_name) {
if PropertyIndexEntry::is_indexable(value) {
let index_id = IndexId::from_label_property(label, column_name);
let entry = PropertyIndexEntry::new(index_id, value.clone(), entity.id);
if let Some(key) = entry.encode_key() {
tx.delete_property_index(&key)?;
}
}
}
Ok(())
}
fn update_index_entries<T: Transaction>(
tx: &mut DatabaseTransaction<T>,
old_entity: &Entity,
new_entity: &Entity,
label: &str,
index: &IndexSchema,
) -> Result<(), IndexMaintenanceError> {
if index.columns.len() != 1 {
return Ok(());
}
let column_name = &index.columns[0].expr;
let old_value = old_entity.properties.get(column_name);
let new_value = new_entity.properties.get(column_name);
if old_value == new_value {
return Ok(());
}
let index_id = IndexId::from_label_property(label, column_name);
if let Some(old_val) = old_value {
if PropertyIndexEntry::is_indexable(old_val) {
let entry = PropertyIndexEntry::new(index_id, old_val.clone(), old_entity.id);
if let Some(key) = entry.encode_key() {
tx.delete_property_index(&key)?;
}
}
}
if let Some(new_val) = new_value {
if PropertyIndexEntry::is_indexable(new_val) {
let entry = PropertyIndexEntry::new(index_id, new_val.clone(), new_entity.id);
if let Some(key) = entry.encode_key() {
tx.put_property_index(&key)?;
}
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
}