use dashmap::{DashMap, DashSet};
use tracing::debug;
use serde_json::Value;
use std::sync::Arc;
use super::types::{DbError, LogEntry};
use super::StorageBackend;
pub fn index_doc(
indexes: &DashMap<String, DashMap<String, DashSet<String>>>,
collection: &str,
key: &str, value: &Value, ) {
for index_ref in indexes.iter() {
let index_name = index_ref.key();
if index_name.starts_with(&format!("{}:", collection)) {
let field_name = index_name.split(':').nth(1).unwrap_or("");
if field_name.is_empty() { continue; }
if let Some(val) = crate::query::get_nested_value(value, &field_name.split('.').collect::<Vec<_>>()) {
let val_str = val.to_string();
index_ref.value()
.entry(val_str)
.or_insert_with(DashSet::new)
.insert(key.to_string());
}
}
}
}
pub fn unindex_doc(
indexes: &DashMap<String, DashMap<String, DashSet<String>>>,
collection: &str,
key: &str, value: &Value, ) {
for index_ref in indexes.iter() {
let index_name = index_ref.key();
if index_name.starts_with(&format!("{}:", collection)) {
let field_name = index_name.split(':').nth(1).unwrap();
if let Some(val) = crate::query::get_nested_value(value, &field_name.split('.').collect::<Vec<_>>()) {
let val_str = val.to_string();
if let Some(key_set) = index_ref.value().get_mut(&val_str) {
key_set.remove(key);
}
}
}
}
}
pub fn track_query(
indexes: &DashMap<String, DashMap<String, DashSet<String>>>,
query_heatmap: &DashMap<String, u32>,
collection: &str,
field: &str,
storage: &Arc<dyn StorageBackend>,
state: &DashMap<String, DashMap<String, crate::engine::types::DocumentState>>,
) -> Result<(), DbError> {
let index_key = format!("{}:{}", collection, field);
if indexes.contains_key(&index_key) {
return Ok(());
}
let mut count = query_heatmap.entry(index_key.clone()).or_insert(0);
*count += 1;
if *count >= 3 {
debug!("🔥 Hot field detected! Auto-indexing {}.{}", collection, field);
create_index(indexes, storage, state, collection, field)?;
}
Ok(())
}
pub fn create_index(
indexes: &DashMap<String, DashMap<String, DashSet<String>>>,
storage: &Arc<dyn StorageBackend>,
state: &DashMap<String, DashMap<String, crate::engine::types::DocumentState>>,
collection: &str,
field: &str,
) -> Result<(), DbError> {
let index_key = format!("{}:{}", collection, field);
if indexes.contains_key(&index_key) {
return Ok(());
}
let field_index = DashMap::new();
if let Some(col) = state.get(collection) {
for entry in col.iter() {
let doc_value = match entry.value() {
crate::engine::types::DocumentState::Hot(v) => v.clone(),
crate::engine::types::DocumentState::Cold(ptr) => {
let bytes = storage.read_at(ptr.offset, ptr.length)?;
let log_entry: crate::engine::types::LogEntry = serde_json::from_slice(&bytes)?;
log_entry.value
}
};
if let Some(val) = crate::query::get_nested_value(
&doc_value,
&field.split('.').collect::<Vec<_>>(),
) {
field_index
.entry(val.to_string())
.or_insert_with(DashSet::new)
.insert(entry.key().clone());
}
}
}
indexes.insert(index_key, field_index);
let entry = LogEntry {
cmd: "INDEX".to_string(),
collection: collection.to_string(),
key: field.to_string(),
value: serde_json::json!(null),
};
storage.write_entry(&entry)?;
debug!("⚡ Persistent Index created on {}.{}", collection, field);
Ok(())
}