use async_trait::async_trait;
use anyhow::{Result, anyhow};
use serde::{Serialize, Deserialize};
use std::collections::{HashMap, VecDeque};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::RwLock;
use uuid::Uuid;
use chrono::Utc;
use tracing::{warn, info};
use std::time::Instant;
use crate::types::{Document, SearchOptions, SearchResult, SearchFilter};
use crate::services::EncryptionService;
use super::vector_store::{VectorStore, CollectionInfo, CollectionHealth};
use super::vector_store::utils::{cosine_similarity, generate_dummy_vector};
/// Static description of a collection: its name, embedding width and distance metric.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct CollectionConfig {
    /// Collection name.
    pub name: String,
    /// Embedding dimensionality recorded for the collection.
    pub dimensions: usize,
    /// Distance metric name; every collection registered in this file uses `"Cosine"`.
    pub distance_metric: String,
}

impl Default for CollectionConfig {
    /// A 1024-dimensional cosine collection named `"default"`.
    fn default() -> Self {
        CollectionConfig {
            name: String::from("default"),
            dimensions: 1024,
            distance_metric: String::from("Cosine"),
        }
    }
}
/// HNSW tuning parameters recorded in each collection's metadata file.
///
/// NOTE(review): in the code visible alongside this type the values are only written into
/// metadata JSON; searches are exhaustive cosine scans — confirm before relying on them.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct HnswConfig {
    /// HNSW `m` parameter.
    pub m: usize,
    /// HNSW `ef_construction` parameter.
    pub ef_construction: usize,
    /// HNSW `ef_search` parameter.
    pub ef_search: usize,
    /// Maximum connections per node as recorded in metadata.
    pub max_connections: usize,
}

impl Default for HnswConfig {
    /// `m = 16`, `ef_construction = 200`, `ef_search = 50`, `max_connections = 16`.
    fn default() -> Self {
        HnswConfig {
            m: 16,
            ef_construction: 200,
            ef_search: 50,
            max_connections: 16,
        }
    }
}
/// One entry of a collection's vector index, linking a stored vector to its document.
///
/// Serialized with camelCase keys: `vectorId`, `documentId`, `position`.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct VectorIndexEntry {
    /// Identifier generated for the vector.
    pub vector_id: String,
    /// Id of the document the vector belongs to.
    pub document_id: String,
    /// Position recorded by the index writer (intended as the vector's slot in the
    /// collection's binary vectors file — confirm against the writer).
    pub position: usize,
}

/// Vector index file contents for one collection.
///
/// Serialized with camelCase keys: `vectors`, `dimensions`, `count`, `lastUpdated`.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct VectorIndex {
    /// Index entries, one per stored vector.
    pub vectors: Vec<VectorIndexEntry>,
    /// Dimensionality recorded for the collection.
    pub dimensions: usize,
    /// Number of stored vectors.
    pub count: usize,
    /// RFC 3339 timestamp of the last rewrite.
    pub last_updated: String,
}
/// Paths of the four files that make up one collection's on-disk data, plus the owner.
#[derive(Clone, Debug)]
pub struct CollectionFiles {
    /// `<collection>-metadata.json`.
    pub metadata_file: PathBuf,
    /// `<collection>-documents.json`.
    pub documents_file: PathBuf,
    /// `<collection>-vectors.bin`.
    pub vectors_file: PathBuf,
    /// `<collection>-vector-index.json`.
    pub vector_index_file: PathBuf,
    /// Owning user id, or `"global"` for file sets not tied to a user.
    pub user_id: String,
}
/// A cached value together with the time it was last inserted or read.
#[derive(Debug, Clone)]
pub struct CacheEntry<T> {
    /// The cached value.
    pub data: T,
    /// When the entry was last inserted or returned by [`LruCache::get`].
    pub last_accessed: Instant,
}

/// A small least-recently-used cache keyed by `String`.
///
/// Recency is tracked in `access_order` (front = least recently used). `get`, `insert` of an
/// existing key and `remove` scan that queue, so they are O(n) in the number of entries; the
/// cache is meant for small capacities.
#[derive(Debug)]
pub struct LruCache<T> {
    entries: HashMap<String, CacheEntry<T>>,
    access_order: VecDeque<String>,
    max_size: usize,
}

impl<T> LruCache<T> {
    /// Creates an empty cache holding at most `max_size` entries.
    ///
    /// A `max_size` of 0 disables caching: inserts are discarded.
    pub fn new(max_size: usize) -> Self {
        Self {
            entries: HashMap::new(),
            access_order: VecDeque::new(),
            max_size,
        }
    }

    /// Returns the value for `key` and marks it as most recently used.
    pub fn get(&mut self, key: &str) -> Option<&T> {
        let entry = self.entries.get_mut(key)?;
        entry.last_accessed = Instant::now();
        Self::touch(&mut self.access_order, key);
        Some(&entry.data)
    }

    /// Inserts or replaces `key`, marking it as most recently used.
    ///
    /// Only a *new* key can push the cache over capacity, so only then is the least recently
    /// used entry evicted. Replacing an existing key never evicts anything else.
    pub fn insert(&mut self, key: String, data: T) {
        if self.max_size == 0 {
            return;
        }
        if self.entries.contains_key(&key) {
            self.access_order.retain(|k| k != &key);
        } else {
            while self.entries.len() >= self.max_size {
                match self.access_order.pop_front() {
                    Some(oldest_key) => {
                        self.entries.remove(&oldest_key);
                    }
                    // Defensive: recency queue exhausted; never loop forever.
                    None => break,
                }
            }
        }
        self.entries.insert(
            key.clone(),
            CacheEntry {
                data,
                last_accessed: Instant::now(),
            },
        );
        self.access_order.push_back(key);
    }

    /// Removes `key`, returning its value if it was cached.
    pub fn remove(&mut self, key: &str) -> Option<T> {
        let entry = self.entries.remove(key)?;
        self.access_order.retain(|k| k != key);
        Some(entry.data)
    }

    /// Drops every entry.
    pub fn clear(&mut self) {
        self.entries.clear();
        self.access_order.clear();
    }

    /// Number of cached entries.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Whether the cache holds no entries.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Moves `key` to the most-recently-used end of `order`.
    fn touch(order: &mut VecDeque<String>, key: &str) {
        order.retain(|k| k != key);
        order.push_back(key.to_string());
    }
}
/// File-backed vector store that keeps Qdrant-style collections on the local disk.
///
/// Per-user data lives in `<base_path>/qdrant-data/<user_id>/` as
/// `<collection>-documents.json`, `<collection>-metadata.json`, `<collection>-vectors.bin`
/// and `<collection>-vector-index.json`. Reads and writes act on the user selected with
/// `set_user_context`.
pub struct EmbeddedQdrantVectorStore {
    /// Root directory given to `new`.
    base_path: PathBuf,
    /// `<base_path>/qdrant-data`; holds one sub-directory per user.
    data_path: PathBuf,
    /// NOTE(review): stored, but documents are written as plain JSON by the code shown
    /// alongside this struct — confirm where encryption is applied.
    encryption_service: Arc<EncryptionService>,
    /// Registered collections, keyed by collection name.
    collections: Arc<RwLock<HashMap<String, CollectionConfig>>>,
    /// File sets keyed by `"<user>-<collection>"`, or by the bare collection name for the
    /// `"global"` sets created by `create_collection`.
    collection_files: Arc<RwLock<HashMap<String, CollectionFiles>>>,
    /// HNSW parameters written into collection metadata.
    hnsw_config: HnswConfig,
    /// Parsed documents per collection, keyed by collection name.
    document_cache: Arc<RwLock<LruCache<HashMap<String, Document>>>>,
    /// Embedding vectors, keyed by `"<user>-<collection>"`.
    vector_cache: Arc<RwLock<LruCache<Vec<Vec<f32>>>>>,
    /// Vector index data, keyed by `"<user>-<collection>"`.
    index_cache: Arc<RwLock<LruCache<VectorIndex>>>,
    /// Async locks serializing writes to one user's collection file, keyed by
    /// `"<user>_<collection>"`.
    file_locks: Arc<std::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>>,
    /// Store-wide embedding dimension set through `set_dimensions`.
    dimensions: Arc<RwLock<Option<usize>>>,
    /// Set once `initialize` has run.
    initialized: Arc<RwLock<bool>>,
    /// User whose files reads and writes currently target.
    current_user_context: Arc<RwLock<Option<String>>>,
}
impl EmbeddedQdrantVectorStore {
pub async fn new(
base_path: impl AsRef<Path>,
encryption_service: Arc<EncryptionService>,
) -> Result<Self> {
let base_path = base_path.as_ref().to_path_buf();
let data_path = base_path.join("qdrant-data");
let mut collections = HashMap::new();
collections.insert(
"chat_history".to_string(),
CollectionConfig {
name: "chat_history".to_string(),
dimensions: 1, distance_metric: "Cosine".to_string(),
},
);
collections.insert(
"aws_estate".to_string(),
CollectionConfig {
name: "aws_estate".to_string(),
dimensions: 1024, distance_metric: "Cosine".to_string(),
},
);
if data_path.exists() {
if let Ok(entries) = tokio::fs::read_dir(&data_path).await {
let mut dir_entries = entries;
while let Ok(Some(entry)) = dir_entries.next_entry().await {
if let Ok(path) = entry.path().canonicalize() {
if path.is_dir() {
if let Ok(user_entries) = tokio::fs::read_dir(&path).await {
let mut user_dir = user_entries;
while let Ok(Some(user_entry)) = user_dir.next_entry().await {
let file_name = user_entry.file_name();
let file_name_str = file_name.to_string_lossy();
if file_name_str.ends_with("-documents.json") {
let collection_name = file_name_str
.strip_suffix("-documents.json")
.unwrap()
.to_string();
if !collections.contains_key(&collection_name) {
info!("📁 Auto-discovered collection: {}", collection_name);
collections.insert(
collection_name.clone(),
CollectionConfig {
name: collection_name.clone(),
dimensions: 1024, distance_metric: "Cosine".to_string(),
},
);
}
}
}
}
}
}
}
}
}
Ok(Self {
base_path,
data_path,
encryption_service,
collections: Arc::new(RwLock::new(collections)),
collection_files: Arc::new(RwLock::new(HashMap::new())),
hnsw_config: HnswConfig::default(),
document_cache: Arc::new(RwLock::new(LruCache::new(100))), vector_cache: Arc::new(RwLock::new(LruCache::new(50))), index_cache: Arc::new(RwLock::new(LruCache::new(100))), file_locks: Arc::new(std::sync::Mutex::new(HashMap::new())), dimensions: Arc::new(RwLock::new(None)),
initialized: Arc::new(RwLock::new(false)),
current_user_context: Arc::new(RwLock::new(None)),
})
}
async fn ensure_initialized(&self) -> Result<()> {
if !*self.initialized.read().await {
return Err(anyhow!("Vector store not initialized"));
}
Ok(())
}
pub async fn set_user_context(&self, user_id: &str) {
let mut context = self.current_user_context.write().await;
*context = Some(user_id.to_string());
}
async fn initialize_user_collection_files(&self, user_id: &str) -> Result<()> {
let collections = self.collections.read().await;
let mut collection_files = self.collection_files.write().await;
let user_dir = self.data_path.join(user_id);
tokio::fs::create_dir_all(&user_dir).await?;
for (name, _) in collections.iter() {
let collection_key = format!("{}-{}", user_id, name);
let files = CollectionFiles {
metadata_file: user_dir.join(format!("{}-metadata.json", name)),
documents_file: user_dir.join(format!("{}-documents.json", name)),
vectors_file: user_dir.join(format!("{}-vectors.bin", name)),
vector_index_file: user_dir.join(format!("{}-vector-index.json", name)),
user_id: user_id.to_string(),
};
collection_files.insert(collection_key, files);
}
Ok(())
}
async fn load_existing_data(&self) -> Result<()> {
info!("Initialized with lazy loading - collections will load on first access");
Ok(())
}
async fn get_collection_documents(&self, collection_name: &str) -> Result<HashMap<String, Document>> {
if collection_name == "chat_history" {
{
let mut cache = self.document_cache.write().await;
cache.remove(collection_name);
}
let documents = self.load_documents(collection_name).await?;
let mut doc_map = HashMap::new();
for doc in documents {
doc_map.insert(doc.id.clone(), doc);
}
{
let mut cache = self.document_cache.write().await;
cache.insert(collection_name.to_string(), doc_map.clone());
}
return Ok(doc_map);
}
{
let mut cache = self.document_cache.write().await;
if let Some(docs) = cache.get(collection_name) {
return Ok(docs.clone());
}
}
let documents = self.load_documents(collection_name).await?;
let mut doc_map = HashMap::new();
for doc in documents {
doc_map.insert(doc.id.clone(), doc);
}
{
let mut cache = self.document_cache.write().await;
cache.insert(collection_name.to_string(), doc_map.clone());
}
Ok(doc_map)
}
async fn load_documents(&self, collection_name: &str) -> Result<Vec<Document>> {
if let Some(user_id) = self.current_user_context.read().await.as_ref() {
return self.load_user_collection_documents(user_id, collection_name).await;
}
Ok(Vec::new())
}
async fn save_documents(&self, collection_name: &str, documents: &[Document]) -> Result<()> {
warn!("save_documents is deprecated - collections should use user-specific encrypted storage");
Ok(())
}
async fn save_user_collection_document(&self, collection_name: &str, user_id: &str, document: &Document) -> Result<()> {
let user_dir = self.data_path.join(user_id);
tokio::fs::create_dir_all(&user_dir).await?;
let collection_file = user_dir.join(format!("{}-documents.json", collection_name));
let mut existing_docs: Vec<Document> = if collection_file.exists() {
let content = tokio::fs::read_to_string(&collection_file).await?;
if !content.trim().is_empty() {
if let Ok(wrapper) = serde_json::from_str::<serde_json::Value>(&content) {
if let Some(documents) = wrapper.get("documents") {
serde_json::from_value(documents.clone()).unwrap_or_default()
} else {
serde_json::from_str(&content).unwrap_or_default()
}
} else {
Vec::new()
}
} else {
Vec::new()
}
} else {
Vec::new()
};
existing_docs.retain(|d| d.id != document.id);
existing_docs.push(document.clone());
#[derive(serde::Serialize)]
struct DocumentsWrapper<'a> {
documents: &'a Vec<Document>,
count: usize,
#[serde(rename = "lastModified")]
last_modified: String,
}
let wrapper = DocumentsWrapper {
documents: &existing_docs,
count: existing_docs.len(),
last_modified: chrono::Utc::now().to_rfc3339(),
};
let content = serde_json::to_string_pretty(&wrapper)?;
tokio::fs::write(&collection_file, content).await?;
Ok(())
}
async fn save_user_collection_documents_batch(&self, collection_name: &str, user_id: &str, documents: &[Document]) -> Result<()> {
if documents.is_empty() {
return Ok(());
}
let batch_start = std::time::Instant::now();
let user_dir = self.data_path.join(user_id);
tokio::fs::create_dir_all(&user_dir).await?;
let collection_file = user_dir.join(format!("{}-documents.json", collection_name));
let mut existing_docs: Vec<Document> = if collection_file.exists() {
let content = tokio::fs::read_to_string(&collection_file).await?;
if !content.trim().is_empty() {
if let Ok(wrapper) = serde_json::from_str::<serde_json::Value>(&content) {
if let Some(docs) = wrapper.get("documents") {
serde_json::from_value(docs.clone()).unwrap_or_default()
} else {
serde_json::from_str(&content).unwrap_or_default()
}
} else {
Vec::new()
}
} else {
Vec::new()
}
} else {
Vec::new()
};
let original_count = existing_docs.len();
let new_ids: std::collections::HashSet<String> = documents.iter().map(|d| d.id.clone()).collect();
existing_docs.retain(|d| !new_ids.contains(&d.id));
existing_docs.extend_from_slice(documents);
#[derive(serde::Serialize)]
struct DocumentsWrapper<'a> {
documents: &'a Vec<Document>,
count: usize,
#[serde(rename = "lastModified")]
last_modified: String,
}
let wrapper = DocumentsWrapper {
documents: &existing_docs,
count: existing_docs.len(),
last_modified: chrono::Utc::now().to_rfc3339(),
};
let content = serde_json::to_string_pretty(&wrapper)?;
tokio::fs::write(&collection_file, content).await?;
let duration = batch_start.elapsed();
info!("✅ EMBEDDED BATCH SAVE SUCCESS: Saved {} documents to local file in {:?} (was: {}, now: {})",
documents.len(), duration, original_count, existing_docs.len());
Ok(())
}
async fn load_user_collection_documents(&self, user_id: &str, collection_name: &str) -> Result<Vec<Document>> {
let user_dir = self.data_path.join(user_id);
let collection_file = user_dir.join(format!("{}-documents.json", collection_name));
if !collection_file.exists() {
return Ok(Vec::new());
}
let content = tokio::fs::read_to_string(&collection_file).await?;
if content.trim().is_empty() {
return Ok(Vec::new());
}
if let Ok(wrapper) = serde_json::from_str::<serde_json::Value>(&content) {
if let Some(documents) = wrapper.get("documents") {
Ok(serde_json::from_value(documents.clone()).unwrap_or_default())
} else {
Ok(serde_json::from_str(&content).unwrap_or_default())
}
} else {
Ok(Vec::new())
}
}
async fn load_user_vector_index(&self, user_id: &str, collection_name: &str) -> Result<VectorIndex> {
let collection_key = format!("{}-{}", user_id, collection_name);
let collection_files = self.collection_files.read().await;
let files = collection_files.get(&collection_key)
.ok_or_else(|| anyhow!("Collection {} for user {} not found", collection_name, user_id))?;
if !files.vector_index_file.exists() {
return Ok(VectorIndex {
vectors: Vec::new(),
dimensions: 1024, count: 0,
last_updated: Utc::now().to_rfc3339(),
});
}
let content = tokio::fs::read_to_string(&files.vector_index_file).await?;
let index: VectorIndex = serde_json::from_str(&content)?;
Ok(index)
}
async fn save_user_vector_index(&self, user_id: &str, collection_name: &str, index: &VectorIndex) -> Result<()> {
let collection_key = format!("{}-{}", user_id, collection_name);
let collection_files = self.collection_files.read().await;
let files = collection_files.get(&collection_key)
.ok_or_else(|| anyhow!("Collection {} for user {} not found", collection_name, user_id))?;
let content = serde_json::to_string_pretty(index)?;
tokio::fs::write(&files.vector_index_file, content).await?;
Ok(())
}
async fn save_vectors_binary(&self, collection_name: &str, vectors: &[Vec<f32>]) -> Result<()> {
let collection_files = self.collection_files.read().await;
let files = collection_files.get(collection_name)
.ok_or_else(|| anyhow!("Collection {} not found", collection_name))?;
let mut binary_data = Vec::new();
binary_data.extend_from_slice(&(vectors.len() as u32).to_le_bytes());
if !vectors.is_empty() {
binary_data.extend_from_slice(&(vectors[0].len() as u32).to_le_bytes());
} else {
binary_data.extend_from_slice(&0u32.to_le_bytes());
}
for vector in vectors {
for &value in vector {
binary_data.extend_from_slice(&value.to_le_bytes());
}
}
tokio::fs::write(&files.vectors_file, binary_data).await?;
Ok(())
}
async fn load_vectors_binary(&self, collection_name: &str) -> Result<Vec<Vec<f32>>> {
let collection_files = self.collection_files.read().await;
let files = collection_files.get(collection_name)
.ok_or_else(|| anyhow!("Collection {} not found", collection_name))?;
if !files.vectors_file.exists() {
return Ok(Vec::new());
}
let binary_data = tokio::fs::read(&files.vectors_file).await?;
if binary_data.len() < 8 {
return Ok(Vec::new());
}
let mut offset = 0;
let vector_count = u32::from_le_bytes([
binary_data[offset], binary_data[offset + 1],
binary_data[offset + 2], binary_data[offset + 3]
]) as usize;
offset += 4;
let dimensions = u32::from_le_bytes([
binary_data[offset], binary_data[offset + 1],
binary_data[offset + 2], binary_data[offset + 3]
]) as usize;
offset += 4;
let mut vectors = Vec::new();
for _ in 0..vector_count {
let mut vector = Vec::new();
for _ in 0..dimensions {
if offset + 4 <= binary_data.len() {
let value = f32::from_le_bytes([
binary_data[offset], binary_data[offset + 1],
binary_data[offset + 2], binary_data[offset + 3]
]);
vector.push(value);
offset += 4;
}
}
if vector.len() == dimensions {
vectors.push(vector);
}
}
Ok(vectors)
}
pub async fn get_memory_usage(&self) -> (usize, usize, usize) {
let doc_cache = self.document_cache.read().await;
let vec_cache = self.vector_cache.read().await;
let idx_cache = self.index_cache.read().await;
let doc_count = doc_cache.len();
let vec_count = vec_cache.len();
let idx_count = idx_cache.len();
(doc_count, vec_count, idx_count)
}
pub async fn clear_caches(&self) {
{
let mut doc_cache = self.document_cache.write().await;
doc_cache.clear();
}
{
let mut vec_cache = self.vector_cache.write().await;
vec_cache.clear();
}
{
let mut idx_cache = self.index_cache.write().await;
idx_cache.clear();
}
info!("All caches cleared to free memory");
}
async fn save_user_vectors_binary(&self, user_id: &str, collection_name: &str, vectors: &[Vec<f32>]) -> Result<()> {
let collection_key = format!("{}-{}", user_id, collection_name);
let collection_files = self.collection_files.read().await;
let files = collection_files.get(&collection_key)
.ok_or_else(|| anyhow!("Collection {} for user {} not found", collection_name, user_id))?;
let mut binary_data = Vec::new();
binary_data.extend_from_slice(&(vectors.len() as u32).to_le_bytes());
if !vectors.is_empty() {
binary_data.extend_from_slice(&(vectors[0].len() as u32).to_le_bytes());
} else {
binary_data.extend_from_slice(&0u32.to_le_bytes());
}
for vector in vectors {
for &value in vector {
binary_data.extend_from_slice(&value.to_le_bytes());
}
}
tokio::fs::write(&files.vectors_file, binary_data).await?;
Ok(())
}
async fn save_user_collection_metadata(&self, user_id: &str, collection_name: &str) -> Result<()> {
let collection_key = format!("{}-{}", user_id, collection_name);
let collection_files = self.collection_files.read().await;
let files = collection_files.get(&collection_key)
.ok_or_else(|| anyhow!("Collection {} for user {} not found", collection_name, user_id))?;
let collections = self.collections.read().await;
let config = collections.get(collection_name)
.ok_or_else(|| anyhow!("Collection config {} not found", collection_name))?;
let collection_docs = self.get_collection_documents(collection_name).await?;
let metadata = serde_json::json!({
"collection_name": collection_name,
"user_id": user_id,
"dimensions": config.dimensions,
"distance_metric": config.distance_metric,
"document_count": collection_docs.len(),
"created_at": Utc::now().to_rfc3339(),
"hnsw_config": {
"m": self.hnsw_config.m,
"ef_construction": self.hnsw_config.ef_construction,
"ef_search": self.hnsw_config.ef_search,
"max_connections": self.hnsw_config.max_connections
}
});
let content = serde_json::to_string_pretty(&metadata)?;
tokio::fs::write(&files.metadata_file, content).await?;
Ok(())
}
async fn save_collection_metadata(&self, collection_name: &str) -> Result<()> {
let collection_files = self.collection_files.read().await;
let files = collection_files.get(collection_name)
.ok_or_else(|| anyhow!("Collection {} not found", collection_name))?;
let collections = self.collections.read().await;
let config = collections.get(collection_name)
.ok_or_else(|| anyhow!("Collection config {} not found", collection_name))?;
let collection_docs = self.get_collection_documents(collection_name).await?;
let metadata = serde_json::json!({
"collection_name": collection_name,
"dimensions": config.dimensions,
"distance_metric": config.distance_metric,
"document_count": collection_docs.len(),
"created_at": Utc::now().to_rfc3339(),
"hnsw_config": {
"m": self.hnsw_config.m,
"ef_construction": self.hnsw_config.ef_construction,
"ef_search": self.hnsw_config.ef_search,
"max_connections": self.hnsw_config.max_connections
}
});
let content = serde_json::to_string_pretty(&metadata)?;
tokio::fs::write(&files.metadata_file, content).await?;
Ok(())
}
async fn generate_user_vector_files(&self, user_id: &str, collection_name: &str) -> Result<()> {
let collection_docs = self.get_collection_documents(collection_name).await?;
let collection_key = format!("{}-{}", user_id, collection_name);
let collections = self.collections.read().await;
let collection_config = collections.get(collection_name)
.ok_or_else(|| anyhow!("Collection config {} not found", collection_name))?;
let dimensions = collection_config.dimensions;
let mut vectors = Vec::new();
let mut vector_entries = Vec::new();
for (position, (doc_id, document)) in collection_docs.iter().enumerate() {
if let Some(embedding) = &document.embedding {
let vector_id = Uuid::new_v4().to_string();
vectors.push(embedding.clone());
vector_entries.push(VectorIndexEntry {
vector_id,
document_id: doc_id.clone(),
position,
});
}
}
let vector_index = VectorIndex {
vectors: vector_entries,
dimensions,
count: vectors.len(),
last_updated: Utc::now().to_rfc3339(),
};
{
let mut vec_cache = self.vector_cache.write().await;
vec_cache.insert(collection_key.clone(), vectors.clone());
}
{
let mut idx_cache = self.index_cache.write().await;
idx_cache.insert(collection_key, vector_index.clone());
}
self.save_user_vectors_binary(user_id, collection_name, &vectors).await?;
self.save_user_vector_index(user_id, collection_name, &vector_index).await?;
self.save_user_collection_metadata(user_id, collection_name).await?;
info!(
"Generated vector files for user '{}' collection '{}': {} vectors saved",
user_id, collection_name, vectors.len()
);
Ok(())
}
async fn hnsw_search(
&self,
collection_name: &str,
query_embedding: &[f32],
limit: usize,
) -> Result<Vec<(String, f32)>> {
let user_id = if let Some(context) = self.current_user_context.read().await.as_ref() {
context.clone()
} else {
return Err(anyhow!("No user context set for vector search"));
};
let documents = self.load_user_collection_documents(&user_id, collection_name).await?;
let mut collection_docs = HashMap::new();
for doc in documents {
collection_docs.insert(doc.id.clone(), doc);
}
let mut candidates: Vec<(String, f32)> = Vec::new();
for (doc_id, document) in collection_docs.iter() {
if let Some(doc_embedding) = &document.embedding {
let similarity = cosine_similarity(query_embedding, doc_embedding)?;
candidates.push((doc_id.clone(), similarity));
}
}
candidates.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
candidates.truncate(limit);
Ok(candidates)
}
async fn matches_filter(&self, document: &Document, filter: &SearchFilter) -> Result<bool> {
if let Some(must_conditions) = &filter.must {
for condition in must_conditions {
let metadata_value = document.metadata.get(&condition.key);
match &condition.r#match {
crate::types::MatchCondition::Value { value } => {
if metadata_value != Some(value) {
return Ok(false);
}
}
crate::types::MatchCondition::Any { any } => {
if let Some(meta_value) = metadata_value {
if !any.contains(meta_value) {
return Ok(false);
}
} else {
return Ok(false);
}
}
crate::types::MatchCondition::Range { gte, lte } => {
if let Some(meta_value) = metadata_value {
if let Some(num_value) = meta_value.as_f64() {
if let Some(gte_val) = gte {
if num_value < *gte_val {
return Ok(false);
}
}
if let Some(lte_val) = lte {
if num_value > *lte_val {
return Ok(false);
}
}
} else {
return Ok(false);
}
} else {
return Ok(false);
}
}
}
}
}
Ok(true)
}
}
#[async_trait]
impl VectorStore for EmbeddedQdrantVectorStore {
/// Exposes the concrete store for downcasting through `std::any::Any`.
fn as_any(&self) -> &dyn std::any::Any {
    let this: &dyn std::any::Any = self;
    this
}
/// Makes sure `base_path` exists, runs `load_existing_data` (which reads nothing; collections
/// load on first access), and then marks the store as initialized.
///
/// # Errors
/// Fails if the root directory cannot be created.
async fn initialize(&self) -> Result<()> {
    let root = &self.base_path;
    tokio::fs::create_dir_all(root).await?;
    self.load_existing_data().await?;
    {
        let mut ready = self.initialized.write().await;
        *ready = true;
    }
    info!("EmbeddedQdrantVectorStore initialized with user-specific directories");
    Ok(())
}
/// Reports whether `initialize` has completed.
async fn is_initialized(&self) -> bool {
    let flag = self.initialized.read().await;
    let ready = *flag;
    ready
}
/// Records the store-wide embedding dimension. Never fails.
async fn set_dimensions(&self, dimensions: usize) -> Result<()> {
    let recorded = Some(dimensions);
    *self.dimensions.write().await = recorded;
    Ok(())
}
/// Adds one document to `collection_name` for the current user and returns its id.
///
/// Registers the user's collection file set, passes the document to `add_document_internal`
/// (both flags `false`), then regenerates the user's vector files. A failure in that last step
/// is logged, not returned.
///
/// # Errors
/// Fails if no user context is set, the user's files cannot be initialized, or
/// `add_document_internal` fails.
async fn add_document(&self, collection_name: &str, document: Document) -> Result<String> {
    let user_id = match self.current_user_context.read().await.clone() {
        Some(user_id) => user_id,
        None => {
            return Err(anyhow!("No user context set. Call set_user_context() before adding documents."))
        }
    };
    self.initialize_user_collection_files(&user_id).await?;
    let id = self
        .add_document_internal(collection_name, document, false, false)
        .await?
        .0;
    match self.generate_user_vector_files(&user_id, collection_name).await {
        Ok(()) => {}
        Err(e) => warn!("Failed to generate vector files for user {} collection {}: {}", user_id, collection_name, e),
    }
    Ok(id)
}
async fn add_documents(&self, collection_name: &str, documents: Vec<Document>) -> Result<Vec<String>> {
let mut ids = Vec::new();
let mut processed_docs = Vec::new();
let user_id = if let Some(context) = self.current_user_context.read().await.as_ref() {
context.clone()
} else {
return Err(anyhow!("No user context set. Call set_user_context() before adding documents."));
};
self.initialize_user_collection_files(&user_id).await?;
let file_key = format!("{}_{}", user_id, collection_name);
let file_lock = {
let mut locks = self.file_locks.lock().unwrap();
locks.entry(file_key.clone())
.or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
.clone()
};
info!("🔒 Acquiring file lock for batch operation: {}", file_key);
let _guard = file_lock.lock().await;
info!("✅ File lock acquired: {}", file_key);
info!("📦 Processing {} documents for batch save...", documents.len());
for document in documents {
let (id, processed_doc) = self.add_document_internal(collection_name, document, false, true).await?;
ids.push(id);
processed_docs.push(processed_doc);
}
info!("💾 Batch saving {} documents to local file...", processed_docs.len());
self.save_user_collection_documents_batch(collection_name, &user_id, &processed_docs).await?;
info!("🗑️ Invalidating document cache for collection: {}", collection_name);
{
let mut cache = self.document_cache.write().await;
cache.remove(collection_name);
}
info!("🔢 Generating vector files for {} documents...", processed_docs.len());
if let Err(e) = self.generate_user_vector_files(&user_id, collection_name).await {
warn!("Failed to generate vector files for user {} collection {}: {}", user_id, collection_name, e);
}
info!("🔓 Releasing file lock: {}", file_key);
Ok(ids)
}
/// Exhaustive semantic search over `collection_name` for the current user context.
///
/// 1. If `options.filter` is set, keep only documents whose metadata satisfies it.
/// 2. Score each remaining document that has an embedding by cosine similarity to
///    `query_vector`, discarding scores below `options.score_threshold` (default 0.0).
/// 3. Sort by descending score and keep the top `options.limit` (default 10).
///
/// Each result carries a clone of the document and a payload made of its metadata plus a
/// `"content"` entry.
///
/// # Errors
/// Fails if the store is not initialized, documents cannot be loaded, a filter check fails,
/// or `cosine_similarity` rejects an embedding.
async fn search(
    &self,
    collection_name: &str,
    query_vector: Vec<f32>,
    options: SearchOptions,
) -> Result<Vec<SearchResult>> {
    self.ensure_initialized().await?;
    let limit = options.limit.unwrap_or(10);
    let score_threshold = options.score_threshold.unwrap_or(0.0);
    info!("🔍 ========== SEARCH FLOW START ==========");
    info!(" Collection: {}", collection_name);
    info!(" Limit: {}, Score Threshold: {}", limit, score_threshold);

    let collection_docs = self.get_collection_documents(collection_name).await?;
    let total_docs = collection_docs.len();
    info!(" 📚 Total documents in collection: {}", total_docs);

    // Step 1: metadata pre-filter.
    let filtered_doc_ids: Vec<String> = match &options.filter {
        None => {
            info!(" ⏭️ STEP 1: No filter provided, using all {} documents", total_docs);
            collection_docs.keys().cloned().collect()
        }
        Some(filter) => {
            info!(" 🎯 STEP 1: PRE-FILTERING");
            if let Some(must_conditions) = &filter.must {
                info!(" Filter conditions ({} conditions):", must_conditions.len());
                for condition in must_conditions {
                    if let crate::types::MatchCondition::Value { value } = &condition.r#match {
                        info!(" - {} = {:?}", condition.key, value);
                    } else {
                        info!(" - {} (complex condition)", condition.key);
                    }
                }
            }
            let filter_start = std::time::Instant::now();
            let mut kept = Vec::new();
            for (doc_id, document) in &collection_docs {
                if self.matches_filter(document, filter).await? {
                    kept.push(doc_id.clone());
                }
            }
            let filter_duration = filter_start.elapsed();
            let kept_count = kept.len();
            info!(" ✅ PRE-FILTER complete: {} → {} documents (filtered out: {}) in {:?}",
                total_docs, kept_count, total_docs - kept_count, filter_duration);
            if kept.is_empty() {
                warn!(" ⚠️ No documents match the filter criteria!");
            }
            kept
        }
    };

    // Step 2: cosine scoring with threshold.
    info!(" 🧠 STEP 2: SEMANTIC SEARCH on {} filtered documents", filtered_doc_ids.len());
    let search_start = std::time::Instant::now();
    let mut candidates: Vec<(String, f32)> = Vec::new();
    for doc_id in filtered_doc_ids {
        let embedding = match collection_docs.get(&doc_id).and_then(|doc| doc.embedding.as_ref()) {
            Some(embedding) => embedding,
            None => continue,
        };
        let similarity = cosine_similarity(&query_vector, embedding)?;
        if similarity >= score_threshold {
            candidates.push((doc_id, similarity));
        }
    }
    let search_duration = search_start.elapsed();
    info!(" ✅ SEMANTIC SEARCH complete: {} candidates above threshold in {:?}",
        candidates.len(), search_duration);

    // Step 3: rank and cut. NaN scores never pass the `>=` threshold test above.
    candidates.sort_by(|lhs, rhs| rhs.1.partial_cmp(&lhs.1).unwrap());
    let results_before_limit = candidates.len();
    candidates.truncate(limit);
    info!(" 📊 STEP 3: RANKING & LIMITING");
    info!(" Sorted by similarity, limited to top {} (from {} candidates)",
        candidates.len(), results_before_limit);
    if !candidates.is_empty() {
        info!(" Top results:");
        for (rank, (doc_id, score)) in candidates.iter().take(3).enumerate() {
            info!(" {}. {} (score: {:.4})", rank + 1, doc_id, score);
        }
        if candidates.len() > 3 {
            info!(" ... and {} more", candidates.len() - 3);
        }
    }

    let results: Vec<SearchResult> = candidates
        .into_iter()
        .filter_map(|(doc_id, score)| {
            let document = collection_docs.get(&doc_id)?;
            let mut payload: HashMap<String, serde_json::Value> =
                document.metadata.clone().into_iter().collect();
            payload.insert("content".to_string(), serde_json::Value::String(document.content.clone()));
            Some(SearchResult {
                id: doc_id,
                score,
                document: Some(document.clone()),
                payload: Some(payload),
            })
        })
        .collect();
    info!("✅ ========== SEARCH FLOW COMPLETE: {} results returned ==========\n", results.len());
    Ok(results)
}
/// Looks up document `id` in `collection_name` for the current user context.
///
/// # Errors
/// Fails if the store is not initialized or the documents cannot be loaded.
async fn get_document(&self, collection_name: &str, id: &str) -> Result<Option<Document>> {
    self.ensure_initialized().await?;
    let documents = self.get_collection_documents(collection_name).await?;
    let found = documents.get(id).cloned();
    Ok(found)
}
async fn update_document(&self, collection_name: &str, id: &str, mut document: Document) -> Result<()> {
self.ensure_initialized().await?;
document.id = id.to_string();
document.updated_at = Utc::now();
let mut collection_docs = self.get_collection_documents(collection_name).await?;
collection_docs.insert(id.to_string(), document);
{
let mut cache = self.document_cache.write().await;
cache.insert(collection_name.to_string(), collection_docs.clone());
}
let docs: Vec<Document> = collection_docs.values().cloned().collect();
self.save_documents(collection_name, &docs).await?;
Ok(())
}
async fn delete_document(&self, collection_name: &str, id: &str) -> Result<bool> {
self.ensure_initialized().await?;
let mut collection_docs = self.get_collection_documents(collection_name).await?;
let existed = collection_docs.remove(id).is_some();
if existed {
{
let mut cache = self.document_cache.write().await;
cache.insert(collection_name.to_string(), collection_docs.clone());
}
let docs: Vec<Document> = collection_docs.values().cloned().collect();
self.save_documents(collection_name, &docs).await?;
}
Ok(existed)
}
/// Lists up to `limit` (default 50) documents of `collection_name` for the current user,
/// keeping only those that satisfy `filter` when one is given.
///
/// Documents come from a `HashMap`, so which ones are returned when more than `limit` match
/// is unspecified. A `limit` of 0 returns an empty list; the limit is checked before any
/// document is taken, not only after the first push.
///
/// # Errors
/// Fails if the store is not initialized, documents cannot be loaded, or a filter check fails.
async fn list_documents(
    &self,
    collection_name: &str,
    limit: Option<usize>,
    filter: Option<SearchFilter>,
) -> Result<Vec<Document>> {
    self.ensure_initialized().await?;
    let limit = limit.unwrap_or(50);
    if limit == 0 {
        return Ok(Vec::new());
    }
    let collection_docs = self.get_collection_documents(collection_name).await?;
    let mut results = Vec::with_capacity(limit.min(collection_docs.len()));
    for document in collection_docs.values() {
        if let Some(filter) = &filter {
            if !self.matches_filter(document, filter).await? {
                continue;
            }
        }
        results.push(document.clone());
        if results.len() >= limit {
            break;
        }
    }
    Ok(results)
}
/// Registers (or replaces) collection `name` with `vector_size` dimensions and cosine distance.
///
/// Also records a `"global"` file set for it under `<base_path>/qdrant-data/`, keyed by the
/// bare collection name. No files are written here.
async fn create_collection(&self, name: &str, vector_size: usize) -> Result<()> {
    let mut collections = self.collections.write().await;
    collections.insert(
        name.to_string(),
        CollectionConfig {
            name: name.to_string(),
            dimensions: vector_size,
            distance_metric: "Cosine".to_string(),
        },
    );

    let mut collection_files = self.collection_files.write().await;
    let collection_dir = self.base_path.join("qdrant-data");
    let file_for = |suffix: &str| collection_dir.join(format!("{}-{}", name, suffix));
    collection_files.insert(
        name.to_string(),
        CollectionFiles {
            metadata_file: file_for("metadata.json"),
            documents_file: file_for("documents.json"),
            vectors_file: file_for("vectors.bin"),
            vector_index_file: file_for("vector-index.json"),
            user_id: "global".to_string(),
        },
    );
    info!("Created collection '{}' with user-specific directories and lazy loading", name);
    Ok(())
}
async fn delete_collection(&self, name: &str) -> Result<bool> {
let mut collections = self.collections.write().await;
let existed = collections.remove(name).is_some();
if existed {
{
let mut doc_cache = self.document_cache.write().await;
doc_cache.remove(name);
}
{
let mut vec_cache = self.vector_cache.write().await;
vec_cache.remove(name);
}
{
let mut idx_cache = self.index_cache.write().await;
idx_cache.remove(name);
}
let collection_files = self.collection_files.read().await;
if let Some(files) = collection_files.get(name) {
let _ = tokio::fs::remove_file(&files.metadata_file).await;
let _ = tokio::fs::remove_file(&files.documents_file).await;
let _ = tokio::fs::remove_file(&files.vectors_file).await;
let _ = tokio::fs::remove_file(&files.vector_index_file).await;
}
}
Ok(existed)
}
/// Returns the names of all registered collections, in the `collections` map's
/// iteration order.
async fn list_collections(&self) -> Result<Vec<String>> {
    let registry = self.collections.read().await;
    let names = registry.keys().cloned().collect::<Vec<_>>();
    Ok(names)
}
/// Describes collection `name`, or returns `Ok(None)` if it is not registered.
///
/// The size and distance come from the stored config. The point count is the number of
/// documents returned by `get_collection_documents`. Segment count is always `Some(1)`,
/// and the disk and RAM sizes are not tracked (`None`).
///
/// The `collections` read lock is held while the documents are loaded.
async fn get_collection_info(&self, name: &str) -> Result<Option<CollectionInfo>> {
    let collections = self.collections.read().await;
    let config = match collections.get(name) {
        Some(config) => config,
        None => return Ok(None),
    };
    let points_count = self.get_collection_documents(name).await?.len();
    Ok(Some(CollectionInfo {
        name: name.to_string(),
        vector_size: config.dimensions,
        distance: config.distance_metric.clone(),
        points_count,
        segments_count: Some(1),
        disk_data_size: None,
        ram_data_size: None,
    }))
}
/// Returns documents of `collection_name` as [`SearchResult`]s for a plain scan (no
/// similarity ranking).
///
/// Delegates to `list_documents`, so the same `limit` defaulting and `filter` handling
/// apply. Each result has:
/// - a constant score of `1.0`,
/// - the full document,
/// - the document's metadata as payload.
///
/// # Errors
/// Propagates any error from `list_documents`.
async fn scroll_collection(
    &self,
    collection_name: &str,
    filter: Option<SearchFilter>,
    limit: Option<usize>,
) -> Result<Vec<SearchResult>> {
    let documents = self.list_documents(collection_name, limit, filter).await?;
    let results = documents
        .into_iter()
        .map(|doc| {
            // Copy only the metadata for the payload and move the document itself,
            // instead of cloning the whole document (content + embedding) per result.
            let payload = doc.metadata.clone().into_iter().collect();
            SearchResult {
                id: doc.id.clone(),
                score: 1.0,
                payload: Some(payload),
                document: Some(doc),
            }
        })
        .collect();
    Ok(results)
}
/// Builds a health entry for every registered collection, keyed by collection name.
///
/// Every collection is reported with:
/// - status `"green"`, a single segment and zero disk/RAM size,
/// - `last_updated` set to the current time,
/// - a point count equal to the number of documents `get_collection_documents` returns.
///
/// The `collections` read lock is held for the whole loop. The first load failure aborts
/// the report.
async fn get_collections_health(&self) -> Result<HashMap<String, CollectionHealth>> {
    let collections = self.collections.read().await;
    let mut report = HashMap::new();
    for name in collections.keys() {
        let points_count = self.get_collection_documents(name).await?.len();
        report.insert(
            name.clone(),
            CollectionHealth {
                name: name.clone(),
                status: "green".to_string(),
                points_count,
                segments_count: 1,
                disk_size: 0,
                ram_size: 0,
                last_updated: Utc::now(),
            },
        );
    }
    Ok(report)
}
/// Empties the document, vector and index caches and marks the store as not initialized.
///
/// Files on disk are left untouched. Always returns `Ok(())`.
async fn shutdown(&self) -> Result<()> {
    self.document_cache.write().await.clear();
    self.vector_cache.write().await.clear();
    self.index_cache.write().await.clear();
    *self.initialized.write().await = false;
    info!("Embedded Qdrant vector store shut down, caches cleared");
    Ok(())
}
/// Drops all cached documents, vectors and indexes so later reads go back to disk.
///
/// Logs how many collections were evicted from the document cache, and logs a line for
/// each cleared cache. Unlike `shutdown`, the initialized flag is left as-is. Always
/// returns `Ok(())`.
async fn clear_document_cache(&self) -> Result<()> {
    info!("🗑️ Clearing document cache to force reload from disk");

    let evicted = {
        let mut documents = self.document_cache.write().await;
        let count = documents.len();
        documents.clear();
        count
    };
    info!("✅ Cleared {} collection(s) from document cache", evicted);

    self.vector_cache.write().await.clear();
    info!("✅ Cleared vector cache");

    self.index_cache.write().await.clear();
    info!("✅ Cleared index cache");

    Ok(())
}
/// No-op in embedded mode: this store keeps data in local files and has no optimizer.
///
/// Emits an informational log line and always succeeds; the collection name is ignored.
async fn disable_optimizer(&self, _collection_name: &str) -> Result<()> {
    info!(
        "📝 Embedded mode: optimizer control not applicable (using local file storage)"
    );
    Ok(())
}
/// No-op in embedded mode: this store keeps data in local files and has no optimizer.
///
/// Emits an informational log line and always succeeds; the collection name is ignored.
async fn enable_optimizer(&self, _collection_name: &str) -> Result<()> {
    info!(
        "📝 Embedded mode: optimizer control not applicable (using local file storage)"
    );
    Ok(())
}
}
impl EmbeddedQdrantVectorStore {
    /// Inserts or replaces `document` in `collection_name` and encrypts it for storage.
    ///
    /// Steps:
    /// - Requires an initialized store and a user context. Both are checked before
    ///   anything is mutated, so a failure leaves the caches untouched.
    /// - Assigns a fresh UUID when `document.id` is empty and sets `updated_at` to now.
    /// - If the document has no embedding and the collection is configured with
    ///   `dimensions == 1`, fills it with `generate_dummy_vector()`.
    /// - Encrypts content and metadata via `encrypt_document_for_storage` (best effort).
    /// - Stores the updated collection map in `document_cache`.
    /// - Unless `skip_save` is set, persists the document with
    ///   `save_user_collection_document`.
    /// - If `generate_files` is set, initializes the user's collection files and
    ///   regenerates the vector files. Failures in this step are logged, not returned.
    ///
    /// Returns the document id and the document as stored (i.e. encrypted).
    ///
    /// # Errors
    /// Fails in any of these cases:
    /// - the store cannot be initialized,
    /// - no user context is set,
    /// - the collection's documents cannot be loaded,
    /// - saving fails.
    async fn add_document_internal(
        &self,
        collection_name: &str,
        document: Document,
        generate_files: bool,
        skip_save: bool,
    ) -> Result<(String, Document)> {
        self.ensure_initialized().await?;
        // Resolve the user before mutating anything. When this check ran after the cache
        // update, a missing context left an unsaved document in the cache.
        let user_id = match self.current_user_context.read().await.as_ref() {
            Some(context) => context.clone(),
            None => {
                return Err(anyhow!(
                    "No user context set. Call set_user_context() before adding documents."
                ));
            }
        };
        let mut collection_docs = self.get_collection_documents(collection_name).await?;
        let doc_id = if document.id.is_empty() {
            Uuid::new_v4().to_string()
        } else {
            document.id.clone()
        };
        // One timestamp for the document and its metadata, so they agree exactly.
        let now = Utc::now();
        let mut doc_to_insert = document;
        doc_to_insert.id = doc_id.clone();
        doc_to_insert.updated_at = now;
        if doc_to_insert.embedding.is_none() {
            let collections = self.collections.read().await;
            if let Some(collection_config) = collections.get(collection_name) {
                if collection_config.dimensions == 1 {
                    doc_to_insert.embedding = Some(generate_dummy_vector());
                }
            }
        }
        self.encrypt_document_for_storage(&mut doc_to_insert, &now.to_rfc3339())
            .await;
        collection_docs.insert(doc_id.clone(), doc_to_insert.clone());
        {
            let mut cache = self.document_cache.write().await;
            // `collection_docs` is not used again: move it rather than cloning the whole
            // collection on every insert.
            cache.insert(collection_name.to_string(), collection_docs);
        }
        if !skip_save {
            self.save_user_collection_document(collection_name, &user_id, &doc_to_insert)
                .await?;
        }
        if generate_files {
            if let Err(e) = self.initialize_user_collection_files(&user_id).await {
                warn!("Failed to initialize user collection files: {}", e);
            }
            if let Err(e) = self.generate_user_vector_files(&user_id, collection_name).await {
                warn!(
                    "Failed to generate vector files for user {} collection {}: {}",
                    user_id, collection_name, e
                );
            }
        }
        Ok((doc_id, doc_to_insert))
    }

    /// Encrypts `doc.content` and `doc.metadata` in place for at-rest storage (best effort).
    ///
    /// Content is encrypted when it is non-empty and not already flagged with
    /// `_encrypted_content: true`. On success that flag is added to the metadata.
    ///
    /// Metadata is encrypted when it is non-empty and has no `_encrypted_metadata` key:
    /// - it is serialized to JSON, with the `_encrypted_content` flag left out;
    /// - the JSON is encrypted;
    /// - the metadata is replaced by `_encrypted_metadata`, `_encrypted_content` (only if
    ///   content encryption did not fail), and `created_at`/`updated_at` set to
    ///   `timestamp`.
    ///
    /// Failures are logged with `warn!` and leave the affected field unencrypted.
    async fn encrypt_document_for_storage(&self, doc: &mut Document, timestamp: &str) {
        let content_flagged = doc
            .metadata
            .get("_encrypted_content")
            .and_then(|v| v.as_bool())
            .unwrap_or(false);
        let mut content_encryption_failed = false;
        if !doc.content.is_empty() && !content_flagged {
            let encrypted = self.encryption_service.encrypt_content(&doc.content).await;
            match encrypted {
                Ok(ciphertext) => {
                    doc.content = ciphertext;
                    doc.metadata.insert(
                        "_encrypted_content".to_string(),
                        serde_json::Value::Bool(true),
                    );
                }
                Err(e) => {
                    warn!("Failed to encrypt content: {}", e);
                    content_encryption_failed = true;
                }
            }
        }

        if doc.metadata.is_empty() || doc.metadata.contains_key("_encrypted_metadata") {
            return;
        }
        let mut metadata_to_encrypt = doc.metadata.clone();
        metadata_to_encrypt.shift_remove("_encrypted_content");
        let metadata_json = match serde_json::to_string(&metadata_to_encrypt) {
            Ok(json) => json,
            Err(e) => {
                warn!("Failed to serialize metadata for encryption: {}", e);
                return;
            }
        };
        let encrypted = self.encryption_service.encrypt_content(&metadata_json).await;
        match encrypted {
            Ok(encrypted_metadata) => {
                doc.metadata.clear();
                doc.metadata.insert(
                    "_encrypted_metadata".to_string(),
                    serde_json::Value::String(encrypted_metadata),
                );
                // Only advertise encrypted content when it really is. If content
                // encryption failed above, the stored content is still plaintext.
                if !content_encryption_failed {
                    doc.metadata.insert(
                        "_encrypted_content".to_string(),
                        serde_json::Value::Bool(true),
                    );
                }
                doc.metadata.insert(
                    "created_at".to_string(),
                    serde_json::Value::String(timestamp.to_string()),
                );
                doc.metadata.insert(
                    "updated_at".to_string(),
                    serde_json::Value::String(timestamp.to_string()),
                );
            }
            Err(e) => {
                warn!("Failed to encrypt metadata: {}", e);
            }
        }
    }
}