use crate::storage::StorageError;
use crate::storage::Storage as StorageEnum;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use tokio::sync::RwLock;
use thiserror::Error;
/// Errors produced by the index layer in this module.
#[derive(Error, Debug)]
pub enum IndexError {
/// Failure reported by the underlying storage; `#[from]` lets `?` convert a `StorageError` automatically.
#[error("Storage error: {0}")]
Storage(#[from] StorageError),
/// No index is registered under the carried name.
#[error("Index not found: {0}")]
IndexNotFound(String),
/// An index is already registered under the carried name.
#[error("Index already exists: {0}")]
IndexAlreadyExists(String),
/// Catch-all rejection used throughout this file: an empty value on a non-sparse index, a
/// uniqueness conflict, a request to create a `FullText` index, or a range query against an
/// index that is not `BTree`.
#[error("Invalid index value")]
InvalidIndexValue,
}
/// User-supplied description of an index to create.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexConfig {
/// Name the index is registered under; `IndexedStorage::create_index` uses it as the map key.
pub name: String,
/// Which index implementation to build.
pub index_type: IndexType,
/// When `true`, `HashIndex` and `BTreeIndex` allow at most one document per value.
pub unique: bool,
/// When `false`, `HashIndex` and `BTreeIndex` reject empty-string values; when `true`, an empty
/// string is indexed like any other value.
pub sparse: bool,
}
/// Kind of index implementation.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum IndexType {
/// Built as a `HashIndex` (a `HashMap` from value to document IDs).
Hash,
/// Built as a `BTreeIndex` (a `BTreeMap` from value to document IDs); the only type accepted
/// by `IndexedStorage::range_query`.
BTree,
/// Declared but not constructible here: `IndexedStorage::create_index` rejects it with
/// `IndexError::InvalidIndexValue`.
FullText,
}
impl Default for IndexType {
fn default() -> Self {
Self::Hash
}
}
/// Bookkeeping record stored by `IndexedStorage` alongside each registered index.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexMetadata {
/// The configuration the index was created from.
pub config: IndexConfig,
/// Initialised to 0 by `create_index`; nothing in the visible code updates it afterwards.
pub entry_count: usize,
/// Initialised to 0 by `create_index`; nothing in the visible code updates it afterwards.
pub size_bytes: usize,
/// Set to the current UTC time when the index is created.
pub last_updated: chrono::DateTime<chrono::Utc>,
}
/// Serializable record pairing one indexed value with document IDs and a timestamp.
///
/// NOTE(review): not referenced anywhere else in the visible code; presumably the persisted or
/// exported form of a single index entry — confirm against callers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexEntry {
/// The indexed value.
pub value: String,
/// IDs of the documents associated with `value`.
pub document_ids: Vec<String>,
/// Timestamp (UTC) of the last change to this entry — presumably; no code here writes it.
pub last_updated: chrono::DateTime<chrono::Utc>,
}
/// Registry of named secondary indices kept next to a primary storage handle.
pub struct IndexedStorage {
// Stored by `new`; no method in the visible code reads it.
primary: Arc<StorageEnum>,
// Index name -> boxed index implementation, guarded by an async read/write lock.
indices: Arc<RwLock<HashMap<String, Box<dyn Index>>>>,
// Index name -> metadata record, guarded by its own async read/write lock.
metadata: Arc<RwLock<HashMap<String, IndexMetadata>>>,
}
/// Common interface of the index implementations in this file (`HashIndex`, `BTreeIndex`).
///
/// Implementors must be `Send + Sync`; `IndexedStorage` stores them as `Box<dyn Index>` behind an
/// async `RwLock`.
#[async_trait::async_trait]
pub trait Index: Send + Sync {
/// Name of the index.
fn name(&self) -> &str;
/// Kind of index; `IndexedStorage::range_query` uses this to reject non-`BTree` indices.
fn index_type(&self) -> IndexType;
/// Associates `document_id` with `value`.
async fn insert(&mut self, value: &str, document_id: &str) -> Result<(), IndexError>;
/// Removes the association between `document_id` and `value`.
async fn remove(&mut self, value: &str, document_id: &str) -> Result<(), IndexError>;
/// Returns the document IDs associated with `value`.
async fn get(&self, value: &str) -> Result<Vec<String>, IndexError>;
/// Returns every distinct value currently held by the index.
async fn values(&self) -> Result<Vec<String>, IndexError>;
/// Returns summary statistics about the index contents.
async fn stats(&self) -> Result<IndexStats, IndexError>;
/// Removes every entry from the index.
async fn clear(&mut self) -> Result<(), IndexError>;
}
/// Summary statistics returned by `Index::stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexStats {
/// Number of distinct values in the index.
pub entry_count: usize,
/// Size figure, in bytes, reported by the index implementation.
pub size_bytes: usize,
/// Total document IDs divided by `entry_count`; `0.0` for an empty index.
pub avg_entries_per_value: f64,
/// Up to ten `(value, document count)` pairs, largest count first.
pub top_values: Vec<(String, usize)>,
}
/// Index backed by a `HashMap` from value to the IDs of the documents holding that value.
pub struct HashIndex {
// Name reported by `Index::name`.
name: String,
// Value -> document IDs; `remove` deletes a key once its list becomes empty.
data: HashMap<String, Vec<String>>,
// Copied from `IndexConfig::unique`: limits each value to a single document.
unique: bool,
// Copied from `IndexConfig::sparse`: when false, empty-string values are rejected on insert.
sparse: bool,
}
impl HashIndex {
pub fn new(name: String, config: &IndexConfig) -> Self {
Self {
name,
data: HashMap::new(),
unique: config.unique,
sparse: config.sparse,
}
}
}
#[async_trait::async_trait]
impl Index for HashIndex {
    /// Returns the name this index was created with.
    fn name(&self) -> &str {
        &self.name
    }

    /// Always [`IndexType::Hash`].
    fn index_type(&self) -> IndexType {
        IndexType::Hash
    }

    /// Associates `document_id` with `value`.
    ///
    /// Re-inserting a pair that is already present is a no-op, including on a unique index.
    ///
    /// # Errors
    ///
    /// Returns [`IndexError::InvalidIndexValue`] when `value` is empty and the index is not
    /// sparse, or when the index is unique and `value` already belongs to another document.
    async fn insert(&mut self, value: &str, document_id: &str) -> Result<(), IndexError> {
        if value.is_empty() && !self.sparse {
            return Err(IndexError::InvalidIndexValue);
        }
        let ids = self.data.entry(value.to_string()).or_default();
        // Compare by `&str` so the duplicate check does not allocate.
        if ids.iter().any(|id| id == document_id) {
            return Ok(());
        }
        // Only a *different* document under the same value violates uniqueness.
        if self.unique && !ids.is_empty() {
            return Err(IndexError::InvalidIndexValue);
        }
        ids.push(document_id.to_string());
        Ok(())
    }

    /// Removes `document_id` from `value`; unknown values or IDs are ignored.
    ///
    /// A value whose last document is removed is dropped from the index entirely.
    async fn remove(&mut self, value: &str, document_id: &str) -> Result<(), IndexError> {
        if let Some(ids) = self.data.get_mut(value) {
            ids.retain(|id| id != document_id);
            if ids.is_empty() {
                self.data.remove(value);
            }
        }
        Ok(())
    }

    /// Returns the document IDs stored under `value`, or an empty list if there are none.
    async fn get(&self, value: &str) -> Result<Vec<String>, IndexError> {
        Ok(self.data.get(value).cloned().unwrap_or_default())
    }

    /// Returns every indexed value, in unspecified order.
    async fn values(&self) -> Result<Vec<String>, IndexError> {
        Ok(self.data.keys().cloned().collect())
    }

    /// Computes summary statistics for the index.
    ///
    /// `size_bytes` is an estimate: the map handle plus, for every entry, the `String`/`Vec`
    /// headers and the byte length of each key and document ID. Allocator overhead and spare
    /// capacity are not counted. `top_values` holds at most ten entries ordered by document
    /// count (descending), with ties broken by value so the result is deterministic.
    async fn stats(&self) -> Result<IndexStats, IndexError> {
        let entry_count = self.data.len();
        let total_docs: usize = self.data.values().map(Vec::len).sum();
        let avg_entries_per_value = if entry_count == 0 {
            0.0
        } else {
            total_docs as f64 / entry_count as f64
        };

        // Rank borrowed keys and clone only the ten that survive.
        let mut ranked: Vec<(&String, usize)> = self
            .data
            .iter()
            .map(|(value, ids)| (value, ids.len()))
            .collect();
        ranked.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(b.0)));
        let top_values: Vec<(String, usize)> = ranked
            .into_iter()
            .take(10)
            .map(|(value, count)| (value.clone(), count))
            .collect();

        // `size_of_val(&self.data)` alone is a constant (the map handle), so add the contents.
        let content_bytes: usize = self
            .data
            .iter()
            .map(|(value, ids)| {
                let id_bytes: usize = ids
                    .iter()
                    .map(|id| std::mem::size_of::<String>() + id.len())
                    .sum();
                std::mem::size_of::<String>()
                    + value.len()
                    + std::mem::size_of::<Vec<String>>()
                    + id_bytes
            })
            .sum();

        Ok(IndexStats {
            entry_count,
            size_bytes: std::mem::size_of_val(&self.data) + content_bytes,
            avg_entries_per_value,
            top_values,
        })
    }

    /// Removes every entry from the index.
    async fn clear(&mut self) -> Result<(), IndexError> {
        self.data.clear();
        Ok(())
    }
}
/// Index backed by a `BTreeMap` from value to the IDs of the documents holding that value.
pub struct BTreeIndex {
// Name reported by `Index::name`.
name: String,
// Value -> document IDs, kept in key order; `remove` deletes a key once its list becomes empty.
data: BTreeMap<String, Vec<String>>,
// Copied from `IndexConfig::unique`: limits each value to a single document.
unique: bool,
// Copied from `IndexConfig::sparse`: when false, empty-string values are rejected on insert.
sparse: bool,
}
impl BTreeIndex {
pub fn new(name: String, config: &IndexConfig) -> Self {
Self {
name,
data: BTreeMap::new(),
unique: config.unique,
sparse: config.sparse,
}
}
}
#[async_trait::async_trait]
impl Index for BTreeIndex {
    /// Returns the name this index was created with.
    fn name(&self) -> &str {
        &self.name
    }

    /// Always [`IndexType::BTree`].
    fn index_type(&self) -> IndexType {
        IndexType::BTree
    }

    /// Associates `document_id` with `value`.
    ///
    /// Re-inserting a pair that is already present is a no-op, including on a unique index.
    ///
    /// # Errors
    ///
    /// Returns [`IndexError::InvalidIndexValue`] when `value` is empty and the index is not
    /// sparse, or when the index is unique and `value` already belongs to another document.
    async fn insert(&mut self, value: &str, document_id: &str) -> Result<(), IndexError> {
        if value.is_empty() && !self.sparse {
            return Err(IndexError::InvalidIndexValue);
        }
        let ids = self.data.entry(value.to_string()).or_default();
        // Compare by `&str` so the duplicate check does not allocate.
        if ids.iter().any(|id| id == document_id) {
            return Ok(());
        }
        // Only a *different* document under the same value violates uniqueness.
        if self.unique && !ids.is_empty() {
            return Err(IndexError::InvalidIndexValue);
        }
        ids.push(document_id.to_string());
        Ok(())
    }

    /// Removes `document_id` from `value`; unknown values or IDs are ignored.
    ///
    /// A value whose last document is removed is dropped from the index entirely.
    async fn remove(&mut self, value: &str, document_id: &str) -> Result<(), IndexError> {
        if let Some(ids) = self.data.get_mut(value) {
            ids.retain(|id| id != document_id);
            if ids.is_empty() {
                self.data.remove(value);
            }
        }
        Ok(())
    }

    /// Returns the document IDs stored under `value`, or an empty list if there are none.
    async fn get(&self, value: &str) -> Result<Vec<String>, IndexError> {
        Ok(self.data.get(value).cloned().unwrap_or_default())
    }

    /// Returns every indexed value in ascending key order.
    async fn values(&self) -> Result<Vec<String>, IndexError> {
        Ok(self.data.keys().cloned().collect())
    }

    /// Computes summary statistics for the index.
    ///
    /// `size_bytes` is an estimate: the map handle plus, for every entry, the `String`/`Vec`
    /// headers and the byte length of each key and document ID. Allocator overhead and spare
    /// capacity are not counted. `top_values` holds at most ten entries ordered by document
    /// count (descending), with ties broken by value.
    async fn stats(&self) -> Result<IndexStats, IndexError> {
        let entry_count = self.data.len();
        let total_docs: usize = self.data.values().map(Vec::len).sum();
        let avg_entries_per_value = if entry_count == 0 {
            0.0
        } else {
            total_docs as f64 / entry_count as f64
        };

        // Rank borrowed keys and clone only the ten that survive.
        let mut ranked: Vec<(&String, usize)> = self
            .data
            .iter()
            .map(|(value, ids)| (value, ids.len()))
            .collect();
        ranked.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(b.0)));
        let top_values: Vec<(String, usize)> = ranked
            .into_iter()
            .take(10)
            .map(|(value, count)| (value.clone(), count))
            .collect();

        // `size_of_val(&self.data)` alone is a constant (the map handle), so add the contents.
        let content_bytes: usize = self
            .data
            .iter()
            .map(|(value, ids)| {
                let id_bytes: usize = ids
                    .iter()
                    .map(|id| std::mem::size_of::<String>() + id.len())
                    .sum();
                std::mem::size_of::<String>()
                    + value.len()
                    + std::mem::size_of::<Vec<String>>()
                    + id_bytes
            })
            .sum();

        Ok(IndexStats {
            entry_count,
            size_bytes: std::mem::size_of_val(&self.data) + content_bytes,
            avg_entries_per_value,
            top_values,
        })
    }

    /// Removes every entry from the index.
    async fn clear(&mut self) -> Result<(), IndexError> {
        self.data.clear();
        Ok(())
    }
}
impl IndexedStorage {
    /// Wraps `primary` with an initially empty set of indices.
    pub fn new(primary: Arc<StorageEnum>) -> Self {
        Self {
            primary,
            indices: Arc::new(RwLock::new(HashMap::new())),
            metadata: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Registers a new, empty index described by `config`.
    ///
    /// # Errors
    ///
    /// * [`IndexError::IndexAlreadyExists`] if an index named `config.name` is already registered.
    /// * [`IndexError::InvalidIndexValue`] if `config.index_type` is `FullText`, which has no
    ///   implementation here.
    pub async fn create_index(&self, config: IndexConfig) -> Result<(), IndexError> {
        let index_name = config.name.clone();
        // Take the write lock once for both the existence check and the insert; checking under
        // a separate read lock would let two concurrent callers both pass the check and one
        // silently replace the other's index.
        let mut indices = self.indices.write().await;
        if indices.contains_key(&index_name) {
            return Err(IndexError::IndexAlreadyExists(index_name));
        }
        let index: Box<dyn Index> = match config.index_type {
            IndexType::Hash => Box::new(HashIndex::new(index_name.clone(), &config)),
            IndexType::BTree => Box::new(BTreeIndex::new(index_name.clone(), &config)),
            IndexType::FullText => return Err(IndexError::InvalidIndexValue),
        };
        indices.insert(index_name.clone(), index);
        let metadata = IndexMetadata {
            config,
            entry_count: 0,
            size_bytes: 0,
            last_updated: chrono::Utc::now(),
        };
        // Lock order is always indices -> metadata; the indices guard is still held here so the
        // two maps change together.
        self.metadata.write().await.insert(index_name, metadata);
        Ok(())
    }

    /// Removes the index called `name` together with its metadata.
    ///
    /// # Errors
    ///
    /// [`IndexError::IndexNotFound`] if no such index is registered.
    pub async fn drop_index(&self, name: &str) -> Result<(), IndexError> {
        // Removing under a single write lock makes "exists?" and "remove" one atomic step.
        let mut indices = self.indices.write().await;
        if indices.remove(name).is_none() {
            return Err(IndexError::IndexNotFound(name.to_string()));
        }
        self.metadata.write().await.remove(name);
        Ok(())
    }

    /// Returns the names of all registered indices, in unspecified order.
    pub async fn list_indices(&self) -> Vec<String> {
        self.indices.read().await.keys().cloned().collect()
    }

    /// Returns a copy of the metadata stored for `name`, if that index exists.
    pub async fn get_index_metadata(&self, name: &str) -> Option<IndexMetadata> {
        self.metadata.read().await.get(name).cloned()
    }

    /// Returns the document IDs stored under exactly `value` in the index `index_name`.
    ///
    /// # Errors
    ///
    /// [`IndexError::IndexNotFound`] if no such index is registered.
    pub async fn query_by_index(
        &self,
        index_name: &str,
        value: &str,
    ) -> Result<Vec<String>, IndexError> {
        let indices = self.indices.read().await;
        let index = indices
            .get(index_name)
            .ok_or_else(|| IndexError::IndexNotFound(index_name.to_string()))?;
        index.get(value).await
    }

    /// Returns the IDs of documents whose indexed value lies in `start..=end` (both bounds
    /// inclusive, compared as strings).
    ///
    /// IDs are returned in the order the index yields its values, and each ID appears once even
    /// if the document is indexed under several values in the range. An inverted range
    /// (`start > end`) yields an empty list.
    ///
    /// # Errors
    ///
    /// * [`IndexError::IndexNotFound`] if no such index is registered.
    /// * [`IndexError::InvalidIndexValue`] if the index is not a `BTree` index.
    pub async fn range_query(
        &self,
        index_name: &str,
        start: &str,
        end: &str,
    ) -> Result<Vec<String>, IndexError> {
        let indices = self.indices.read().await;
        let index = indices
            .get(index_name)
            .ok_or_else(|| IndexError::IndexNotFound(index_name.to_string()))?;
        if index.index_type() != IndexType::BTree {
            return Err(IndexError::InvalidIndexValue);
        }

        // The `Index` trait exposes no range scan, so filter the full value list. This is linear
        // in the number of distinct values but needs nothing beyond the existing trait methods.
        let mut seen = std::collections::HashSet::new();
        let mut document_ids = Vec::new();
        for value in index.values().await? {
            if value.as_str() < start || value.as_str() > end {
                continue;
            }
            for id in index.get(&value).await? {
                if seen.insert(id.clone()) {
                    document_ids.push(id);
                }
            }
        }
        Ok(document_ids)
    }

    /// Returns the statistics computed by the index called `name`.
    ///
    /// # Errors
    ///
    /// [`IndexError::IndexNotFound`] if no such index is registered.
    pub async fn get_index_stats(&self, name: &str) -> Result<IndexStats, IndexError> {
        let indices = self.indices.read().await;
        let index = indices
            .get(name)
            .ok_or_else(|| IndexError::IndexNotFound(name.to_string()))?;
        index.stats().await
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::memory::MemoryStorage;

    /// Builds an `IndexedStorage` over a fresh in-memory backend.
    fn new_indexed() -> IndexedStorage {
        IndexedStorage::new(Arc::new(StorageEnum::Memory(MemoryStorage::new())))
    }

    /// Configuration for a plain (non-unique, non-sparse) hash index called `name`.
    fn hash_config(name: &str) -> IndexConfig {
        IndexConfig {
            name: name.to_string(),
            index_type: IndexType::Hash,
            unique: false,
            sparse: false,
        }
    }

    #[tokio::test]
    async fn test_index_creation() {
        let indexed = new_indexed();
        assert!(indexed.create_index(hash_config("test_index")).await.is_ok());
        assert!(indexed.list_indices().await.contains(&"test_index".to_string()));
    }

    #[tokio::test]
    async fn test_duplicate_index() {
        let indexed = new_indexed();
        let config = hash_config("test_index");
        assert!(indexed.create_index(config.clone()).await.is_ok());
        // Pin the specific failure rather than accepting any error.
        assert!(matches!(
            indexed.create_index(config).await,
            Err(IndexError::IndexAlreadyExists(name)) if name == "test_index"
        ));
    }

    #[tokio::test]
    async fn test_index_drop() {
        let indexed = new_indexed();
        assert!(indexed.create_index(hash_config("test_index")).await.is_ok());
        assert!(indexed.drop_index("test_index").await.is_ok());
        assert!(!indexed.list_indices().await.contains(&"test_index".to_string()));
        // Dropping an index must also discard its metadata record.
        assert!(indexed.get_index_metadata("test_index").await.is_none());
    }
}