use async_trait::async_trait;
use anyhow::{Result, anyhow};
use serde::{Serialize, Deserialize};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::info;
use uuid::Uuid;
use chrono::Utc;
use crate::types::{Document, SearchOptions, SearchResult, SearchFilter};
use crate::services::EncryptionService;
use super::vector_store::{VectorStore, CollectionInfo, CollectionHealth};
use super::vector_store::utils::cosine_similarity;
/// Descriptive metadata for a single collection.
///
/// Serialized as JSON into the collection's `<name>_metadata.json` file and
/// read back when the collection is loaded.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileCollectionMetadata {
/// Collection name (also the key under which the metadata is stored in memory).
pub name: String,
/// Declared embedding dimension for the collection.
pub vector_size: usize,
/// Name of the distance metric; this file only ever writes `"Cosine"`.
pub distance_metric: String,
/// Timestamp taken when the metadata record was first created.
pub created_at: chrono::DateTime<chrono::Utc>,
/// Document count recorded when the metadata was last updated.
pub document_count: usize,
}
/// Vector store that keeps every collection in memory and mirrors it to JSON
/// files under `base_path`.
pub struct LocalFileVectorStore {
// Directory that holds the `<collection>_documents.json` / `<collection>_metadata.json` files.
base_path: PathBuf,
// NOTE(review): stored at construction but not referenced by any method visible here;
// collection files are written as plain JSON — confirm whether encryption is intended.
encryption_service: Arc<EncryptionService>,
// `documents`: collection name -> (document id -> document).
// `metadata`: collection name -> collection metadata.
documents: Arc<RwLock<HashMap<String, HashMap<String, Document>>>>, metadata: Arc<RwLock<HashMap<String, FileCollectionMetadata>>>,
// Embedding dimension recorded by `set_dimensions`; `None` until it is called.
// NOTE(review): the methods visible here only write it and never read it.
dimensions: Arc<RwLock<Option<usize>>>,
// Set to `true` by `initialize` and back to `false` by `shutdown`.
initialized: Arc<RwLock<bool>>,
}
impl LocalFileVectorStore {
pub async fn new(
base_path: impl AsRef<Path>,
encryption_service: Arc<EncryptionService>,
) -> Result<Self> {
let base_path = base_path.as_ref().to_path_buf();
Ok(Self {
base_path,
encryption_service,
documents: Arc::new(RwLock::new(HashMap::new())),
metadata: Arc::new(RwLock::new(HashMap::new())),
dimensions: Arc::new(RwLock::new(None)),
initialized: Arc::new(RwLock::new(false)),
})
}
/// Returns `<base_path>/<collection_name>_documents.json`, the file that holds
/// the collection's documents.
fn get_documents_path(&self, collection_name: &str) -> PathBuf {
    let file_name = format!("{}_documents.json", collection_name);
    self.base_path.join(file_name)
}
/// Returns `<base_path>/<collection_name>_metadata.json`, the file that holds
/// the collection's metadata record.
fn get_metadata_path(&self, collection_name: &str) -> PathBuf {
    let file_name = format!("{}_metadata.json", collection_name);
    self.base_path.join(file_name)
}
async fn load_collection(&self, collection_name: &str) -> Result<()> {
let docs_path = self.get_documents_path(collection_name);
let meta_path = self.get_metadata_path(collection_name);
if docs_path.exists() {
let content = tokio::fs::read_to_string(&docs_path).await?;
let documents: Vec<Document> = serde_json::from_str(&content)?;
let mut docs_map = self.documents.write().await;
let collection_docs = docs_map.entry(collection_name.to_string())
.or_insert_with(HashMap::new);
for doc in documents {
collection_docs.insert(doc.id.clone(), doc);
}
}
if meta_path.exists() {
let content = tokio::fs::read_to_string(&meta_path).await?;
let metadata: FileCollectionMetadata = serde_json::from_str(&content)?;
let mut meta_map = self.metadata.write().await;
meta_map.insert(collection_name.to_string(), metadata);
}
Ok(())
}
/// Writes the in-memory state of `collection_name` to its documents and
/// metadata JSON files, creating the parent directory if needed.
///
/// A file is only written when the corresponding in-memory entry exists.
///
/// Locks are taken in `metadata` -> `documents` order, the same nesting order
/// used by the other methods that hold both. Taking them the other way round
/// can deadlock against a task that holds the metadata write lock while
/// waiting for the documents lock. Both read guards are held until the files
/// are written so the two files describe the same snapshot.
///
/// Callers must not hold a guard on either map while awaiting this method.
///
/// # Errors
/// Fails on directory creation, serialization or file write errors.
async fn save_collection(&self, collection_name: &str) -> Result<()> {
    let docs_path = self.get_documents_path(collection_name);
    let meta_path = self.get_metadata_path(collection_name);
    if let Some(parent) = docs_path.parent() {
        tokio::fs::create_dir_all(parent).await?;
    }

    let metadata_map = self.metadata.read().await;
    let documents = self.documents.read().await;

    if let Some(collection_docs) = documents.get(collection_name) {
        // Serializing references yields the same JSON as owned values and
        // avoids cloning every document (embeddings included).
        let docs_vec: Vec<&Document> = collection_docs.values().collect();
        let content = serde_json::to_string_pretty(&docs_vec)?;
        tokio::fs::write(&docs_path, content).await?;
    }

    if let Some(metadata) = metadata_map.get(collection_name) {
        let content = serde_json::to_string_pretty(metadata)?;
        tokio::fs::write(&meta_path, content).await?;
    }

    Ok(())
}
/// Guard used by operations that require a prior `initialize` call.
///
/// # Errors
/// Returns "Vector store not initialized" while the `initialized` flag is `false`.
async fn ensure_initialized(&self) -> Result<()> {
    let ready = *self.initialized.read().await;
    if ready {
        Ok(())
    } else {
        Err(anyhow!("Vector store not initialized"))
    }
}
/// Brute-force similarity scan over every document of a collection.
///
/// Computes `cosine_similarity` between `query_vector` and each stored
/// embedding, keeps the documents scoring at least `score_threshold`, and
/// returns up to `limit` `(document id, score)` pairs ordered from highest to
/// lowest score. Documents without an embedding are skipped.
///
/// # Errors
/// Fails if the collection does not exist or if `cosine_similarity` returns
/// an error for any document.
async fn simple_search(
    &self,
    collection_name: &str,
    query_vector: &[f32],
    limit: usize,
    score_threshold: f32,
) -> Result<Vec<(String, f32)>> {
    let documents = self.documents.read().await;
    let collection_docs = documents
        .get(collection_name)
        .ok_or_else(|| anyhow!("Collection {} not found", collection_name))?;

    let mut candidates: Vec<(String, f32)> = Vec::new();
    for (doc_id, document) in collection_docs.iter() {
        if let Some(doc_embedding) = &document.embedding {
            let similarity = cosine_similarity(query_vector, doc_embedding)?;
            // A NaN similarity fails this comparison and is therefore dropped.
            if similarity >= score_threshold {
                candidates.push((doc_id.clone(), similarity));
            }
        }
    }

    // Descending by score. Incomparable values are treated as equal so the
    // comparator can never panic, whatever the scores are.
    candidates.sort_by(|a, b| {
        b.1.partial_cmp(&a.1)
            .unwrap_or(std::cmp::Ordering::Equal)
    });
    candidates.truncate(limit);
    Ok(candidates)
}
async fn matches_filter(&self, document: &Document, filter: &SearchFilter) -> Result<bool> {
if let Some(must_conditions) = &filter.must {
for condition in must_conditions {
let metadata_value = document.metadata.get(&condition.key);
match &condition.r#match {
crate::types::MatchCondition::Value { value } => {
if metadata_value != Some(value) {
return Ok(false);
}
}
crate::types::MatchCondition::Any { any } => {
if let Some(meta_value) = metadata_value {
if !any.contains(meta_value) {
return Ok(false);
}
} else {
return Ok(false);
}
}
crate::types::MatchCondition::Range { gte, lte } => {
if let Some(meta_value) = metadata_value {
if let Some(num_value) = meta_value.as_f64() {
if let Some(gte_val) = gte {
if num_value < *gte_val {
return Ok(false);
}
}
if let Some(lte_val) = lte {
if num_value > *lte_val {
return Ok(false);
}
}
} else {
return Ok(false);
}
} else {
return Ok(false);
}
}
}
}
}
Ok(true)
}
}
#[async_trait]
impl VectorStore for LocalFileVectorStore {
/// Exposes the store as `&dyn Any` so it can be downcast to the concrete type.
fn as_any(&self) -> &dyn std::any::Any {
self
}
async fn initialize(&self) -> Result<()> {
tokio::fs::create_dir_all(&self.base_path).await?;
if let Ok(entries) = tokio::fs::read_dir(&self.base_path).await {
let mut entries = entries;
while let Ok(Some(entry)) = entries.next_entry().await {
let path = entry.path();
if let Some(file_name) = path.file_name() {
if let Some(file_str) = file_name.to_str() {
if file_str.ends_with("_metadata.json") {
let collection_name = file_str.replace("_metadata.json", "");
let _ = self.load_collection(&collection_name).await;
}
}
}
}
}
let mut initialized = self.initialized.write().await;
*initialized = true;
Ok(())
}
/// Reports the current value of the `initialized` flag.
async fn is_initialized(&self) -> bool {
    let flag = self.initialized.read().await;
    *flag
}
/// Records `dimensions` as the store's embedding dimension, replacing any
/// previously recorded value. Always succeeds.
async fn set_dimensions(&self, dimensions: usize) -> Result<()> {
    *self.dimensions.write().await = Some(dimensions);
    Ok(())
}
async fn add_document(&self, collection_name: &str, document: Document) -> Result<String> {
self.ensure_initialized().await?;
let doc_id = if document.id.is_empty() {
Uuid::new_v4().to_string()
} else {
document.id.clone()
};
let mut doc_to_insert = document;
doc_to_insert.id = doc_id.clone();
doc_to_insert.updated_at = Utc::now();
{
let mut documents = self.documents.write().await;
let collection_docs = documents.entry(collection_name.to_string())
.or_insert_with(HashMap::new);
collection_docs.insert(doc_id.clone(), doc_to_insert);
}
{
let mut metadata = self.metadata.write().await;
let documents = self.documents.read().await;
let doc_count = documents.get(collection_name).map(|d| d.len()).unwrap_or(0);
let collection_metadata = metadata.entry(collection_name.to_string())
.or_insert_with(|| FileCollectionMetadata {
name: collection_name.to_string(),
vector_size: 1024, distance_metric: "Cosine".to_string(),
created_at: Utc::now(),
document_count: 0,
});
collection_metadata.document_count = doc_count;
}
self.save_collection(collection_name).await?;
Ok(doc_id)
}
async fn add_documents(&self, collection_name: &str, documents: Vec<Document>) -> Result<Vec<String>> {
let mut ids = Vec::new();
for document in documents {
let id = self.add_document(collection_name, document).await?;
ids.push(id);
}
Ok(ids)
}
/// Returns the documents most similar to `query_vector`, best match first.
///
/// `options.limit` defaults to 10 and `options.score_threshold` to 0.0.
/// When `options.filter` is set, every candidate above the threshold is
/// considered, so matching documents are not lost to truncation before the
/// filter runs. Without a filter the candidate list is capped at twice the
/// limit.
///
/// # Errors
/// Fails if the store is not initialized, the collection does not exist, or
/// the similarity computation fails.
async fn search(
    &self,
    collection_name: &str,
    query_vector: Vec<f32>,
    options: SearchOptions,
) -> Result<Vec<SearchResult>> {
    self.ensure_initialized().await?;

    let limit = options.limit.unwrap_or(10);
    let score_threshold = options.score_threshold.unwrap_or(0.0);

    // With a filter, any number of top candidates may be rejected, so do not
    // cap the candidate list. `saturating_mul` avoids overflow on huge limits.
    let candidate_limit = if options.filter.is_some() {
        usize::MAX
    } else {
        limit.saturating_mul(2)
    };

    let candidates = self
        .simple_search(collection_name, &query_vector, candidate_limit, score_threshold)
        .await?;

    let documents = self.documents.read().await;
    let collection_docs = documents
        .get(collection_name)
        .ok_or_else(|| anyhow!("Collection {} not found", collection_name))?;

    let mut results = Vec::new();
    for (doc_id, score) in candidates {
        // Checked before pushing so a limit of 0 yields no results.
        if results.len() >= limit {
            break;
        }
        // A document removed since the similarity scan is simply skipped.
        if let Some(document) = collection_docs.get(&doc_id) {
            if let Some(filter) = &options.filter {
                if !self.matches_filter(document, filter).await? {
                    continue;
                }
            }
            results.push(SearchResult {
                id: doc_id,
                score,
                document: Some(document.clone()),
                payload: Some(document.metadata.clone().into_iter().collect()),
            });
        }
    }

    Ok(results)
}
/// Looks up a document by id, returning a clone of it or `None` when the
/// collection has no such id.
///
/// # Errors
/// Fails if the store is not initialized or the collection does not exist.
async fn get_document(&self, collection_name: &str, id: &str) -> Result<Option<Document>> {
    self.ensure_initialized().await?;

    let all_documents = self.documents.read().await;
    match all_documents.get(collection_name) {
        Some(collection_docs) => Ok(collection_docs.get(id).cloned()),
        None => Err(anyhow!("Collection {} not found", collection_name)),
    }
}
/// Stores `document` under `id` in an existing collection and persists the
/// collection.
///
/// The document's own id is overwritten with `id` and `updated_at` is set to
/// the current time. This is an upsert: if no document with `id` exists, one
/// is added. The collection's recorded `document_count` is therefore
/// refreshed before saving.
///
/// # Errors
/// Fails if the store is not initialized, the collection does not exist, or
/// the collection cannot be saved.
async fn update_document(&self, collection_name: &str, id: &str, mut document: Document) -> Result<()> {
    self.ensure_initialized().await?;

    document.id = id.to_string();
    document.updated_at = Utc::now();

    let doc_count = {
        let mut documents = self.documents.write().await;
        let collection_docs = documents
            .get_mut(collection_name)
            .ok_or_else(|| anyhow!("Collection {} not found", collection_name))?;
        collection_docs.insert(id.to_string(), document);
        collection_docs.len()
    };

    // The documents lock is released before the metadata lock is taken.
    {
        let mut metadata = self.metadata.write().await;
        if let Some(collection_metadata) = metadata.get_mut(collection_name) {
            collection_metadata.document_count = doc_count;
        }
    }

    self.save_collection(collection_name).await?;
    Ok(())
}
/// Removes a document from a collection, returning whether it existed.
///
/// When a document was removed, the collection's recorded `document_count`
/// is refreshed and the collection is persisted. Nothing is written when the
/// id was not present.
///
/// # Errors
/// Fails if the store is not initialized, the collection does not exist, or
/// the collection cannot be saved.
async fn delete_document(&self, collection_name: &str, id: &str) -> Result<bool> {
    self.ensure_initialized().await?;

    let (existed, remaining) = {
        let mut documents = self.documents.write().await;
        let collection_docs = documents
            .get_mut(collection_name)
            .ok_or_else(|| anyhow!("Collection {} not found", collection_name))?;
        let existed = collection_docs.remove(id).is_some();
        (existed, collection_docs.len())
    };

    if existed {
        // Keep the persisted count in step with the documents file. The
        // documents lock is already released at this point.
        {
            let mut metadata = self.metadata.write().await;
            if let Some(collection_metadata) = metadata.get_mut(collection_name) {
                collection_metadata.document_count = remaining;
            }
        }
        self.save_collection(collection_name).await?;
    }

    Ok(existed)
}
/// Returns up to `limit` documents (default 50) of a collection, optionally
/// restricted to those matching `filter`.
///
/// Documents come back in the map's iteration order, which is unspecified.
/// A limit of 0 returns an empty list.
///
/// # Errors
/// Fails if the store is not initialized or the collection does not exist.
async fn list_documents(
    &self,
    collection_name: &str,
    limit: Option<usize>,
    filter: Option<SearchFilter>,
) -> Result<Vec<Document>> {
    self.ensure_initialized().await?;

    let documents = self.documents.read().await;
    let collection_docs = documents
        .get(collection_name)
        .ok_or_else(|| anyhow!("Collection {} not found", collection_name))?;

    let limit = limit.unwrap_or(50);
    let mut results = Vec::new();
    for document in collection_docs.values() {
        // Checked before pushing so that `limit == 0` yields no documents.
        if results.len() >= limit {
            break;
        }
        if let Some(filter) = &filter {
            if !self.matches_filter(document, filter).await? {
                continue;
            }
        }
        results.push(document.clone());
    }

    Ok(results)
}
/// Registers a collection named `name` with the given `vector_size` and the
/// `"Cosine"` metric, starts it with an empty document map and persists it.
///
/// NOTE(review): existing in-memory entries for `name` are replaced, so
/// calling this for a collection that already holds documents discards them
/// and overwrites its files — confirm callers rely on that. This method also
/// does not check the `initialized` flag.
///
/// # Errors
/// Fails if the collection cannot be saved.
async fn create_collection(&self, name: &str, vector_size: usize) -> Result<()> {
    let key = name.to_string();
    let fresh_metadata = FileCollectionMetadata {
        name: key.clone(),
        vector_size,
        distance_metric: "Cosine".to_string(),
        created_at: Utc::now(),
        document_count: 0,
    };

    // Each guard is a statement-scoped temporary, so the two locks are never
    // held at the same time.
    self.metadata.write().await.insert(key.clone(), fresh_metadata);
    self.documents.write().await.insert(key, HashMap::new());

    self.save_collection(name).await?;
    Ok(())
}
/// Drops a collection from memory and removes its files.
///
/// Returns `true` when either a metadata record or a document map existed
/// for `name`. File removal is best-effort: errors (such as a file that was
/// never written) are ignored.
async fn delete_collection(&self, name: &str) -> Result<bool> {
    let existed = {
        let mut metadata_map = self.metadata.write().await;
        let mut documents = self.documents.write().await;
        let removed_meta = metadata_map.remove(name);
        let removed_docs = documents.remove(name);
        removed_meta.is_some() || removed_docs.is_some()
    };

    if !existed {
        return Ok(false);
    }

    let _ = tokio::fs::remove_file(self.get_documents_path(name)).await;
    let _ = tokio::fs::remove_file(self.get_metadata_path(name)).await;
    Ok(true)
}
/// Lists the names of all collections that have a metadata record, in
/// unspecified order.
async fn list_collections(&self) -> Result<Vec<String>> {
    let names: Vec<String> = self.metadata.read().await.keys().cloned().collect();
    Ok(names)
}
/// Describes a collection, or returns `None` when it has no metadata record.
///
/// `points_count` is the live number of in-memory documents (0 when the
/// collection has no document map). The segment count is always reported as
/// 1 and both size fields as `None`.
async fn get_collection_info(&self, name: &str) -> Result<Option<CollectionInfo>> {
    let metadata = self.metadata.read().await;
    let documents = self.documents.read().await;

    let info = metadata.get(name).map(|meta| CollectionInfo {
        name: name.to_string(),
        vector_size: meta.vector_size,
        distance: meta.distance_metric.clone(),
        points_count: documents.get(name).map_or(0, |docs| docs.len()),
        segments_count: Some(1),
        disk_data_size: None,
        ram_data_size: None,
    });
    Ok(info)
}
/// Lists documents via `list_documents` and wraps each one in a
/// `SearchResult` with a fixed score of 1.0 and the document's metadata as
/// payload.
///
/// # Errors
/// Propagates any error from `list_documents`.
async fn scroll_collection(
    &self,
    collection_name: &str,
    filter: Option<SearchFilter>,
    limit: Option<usize>,
) -> Result<Vec<SearchResult>> {
    let listed = self.list_documents(collection_name, limit, filter).await?;

    let mut results = Vec::with_capacity(listed.len());
    for doc in listed {
        let payload = doc.metadata.clone().into_iter().collect();
        results.push(SearchResult {
            id: doc.id.clone(),
            score: 1.0,
            document: Some(doc),
            payload: Some(payload),
        });
    }
    Ok(results)
}
/// Builds a health entry for every collection that has a metadata record.
///
/// Each entry reports status `"green"`, the live in-memory document count,
/// one segment, zero disk/RAM sizes and the current time as `last_updated`.
async fn get_collections_health(&self) -> Result<HashMap<String, CollectionHealth>> {
    let metadata = self.metadata.read().await;
    let documents = self.documents.read().await;

    let health_info: HashMap<String, CollectionHealth> = metadata
        .keys()
        .map(|name| {
            let entry = CollectionHealth {
                name: name.clone(),
                status: "green".to_string(),
                points_count: documents.get(name).map_or(0, |docs| docs.len()),
                segments_count: 1,
                disk_size: 0,
                ram_size: 0,
                last_updated: Utc::now(),
            };
            (name.clone(), entry)
        })
        .collect();

    Ok(health_info)
}
async fn shutdown(&self) -> Result<()> {
let metadata = self.metadata.read().await;
for name in metadata.keys() {
let _ = self.save_collection(name).await;
}
let mut initialized = self.initialized.write().await;
*initialized = false;
Ok(())
}
/// No-op for this store: only emits an informational log line and succeeds.
///
/// NOTE(review): the log text says documents are always loaded from disk, but
/// the methods in this file read from the in-memory `documents` map — confirm
/// whether the message (or the behavior) should change.
async fn clear_document_cache(&self) -> Result<()> {
info!("📝 LocalFileVectorStore: no cache to clear (always loads from disk)");
Ok(())
}
/// No-op for this store; the collection name is ignored and the call always succeeds.
async fn disable_optimizer(&self, _collection_name: &str) -> Result<()> {
Ok(())
}
/// No-op for this store; the collection name is ignored and the call always succeeds.
async fn enable_optimizer(&self, _collection_name: &str) -> Result<()> {
Ok(())
}
}