use async_trait::async_trait;
use anyhow::{Result, anyhow};
use serde::{Serialize, Deserialize};
use indexmap::IndexMap;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::RwLock;
use uuid::Uuid;
use chrono::Utc;
use tracing::{info, warn, error};
use qdrant_client::{Qdrant, Payload};
use qdrant_client::qdrant::{
CreateCollectionBuilder, VectorParamsBuilder, Distance, PointStruct, PointId,
UpsertPointsBuilder, SearchPointsBuilder, GetPointsBuilder, DeletePointsBuilder,
ScrollPointsBuilder, Value, Filter, Condition, Range, vectors_config,
vectors, point_id,
};
use crate::types::{Document, SearchOptions, SearchResult, SearchFilter};
use crate::services::EncryptionService;
use super::vector_store::{VectorStore, CollectionInfo, CollectionHealth};
/// Static description of a Qdrant collection that this store pre-registers and
/// creates during `initialize` when it does not exist yet.
///
/// Derives serde traits, so field names and order define its serialized shape.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CollectionConfig {
/// Qdrant collection name.
pub name: String,
/// Vector size used when the collection is created.
pub dimensions: usize,
/// Distance metric name. `initialize` recognises "Cosine", "Dot" and "Euclid";
/// any other value falls back to cosine.
pub distance_metric: String,
}
/// Vector store backed by a remote Qdrant server, which also mirrors stored
/// documents and vectors into per-user local backup files under `base_path`.
pub struct QdrantServerVectorStore {
/// Client for the Qdrant server.
client: Qdrant,
/// Root directory for local backups; per-user files live in `<base_path>/<user_id>/`.
base_path: PathBuf,
/// Used to encrypt document content (and, in `add_document`, metadata) before storage.
encryption_service: Arc<EncryptionService>,
/// User id attached to inserted points and used to filter searches and listings;
/// `None` until `set_user_context` is called.
current_user_context: Arc<RwLock<Option<String>>>,
/// Collections that `initialize` creates when missing (`new` registers `chat_history`).
collections: Arc<RwLock<HashMap<String, CollectionConfig>>>,
/// Set to `true` once `initialize` has completed.
initialized: Arc<RwLock<bool>>,
/// Async mutexes, keyed by strings derived from user id and collection name, that
/// serialise read-modify-write cycles on local backup files. The std mutex only
/// guards the map itself and is never held across an `.await`.
file_locks: Arc<std::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>>,
}
impl QdrantServerVectorStore {
pub async fn new(
qdrant_url: &str,
api_key: Option<String>,
base_path: impl AsRef<Path>,
encryption_service: Arc<EncryptionService>,
) -> Result<Self> {
let mut client_builder = Qdrant::from_url(qdrant_url);
if let Some(key) = api_key {
client_builder = client_builder.api_key(key);
}
let client = client_builder.build()?;
client.health_check().await.map_err(|e|
anyhow!("Failed to connect to Qdrant server at {}: {}", qdrant_url, e)
)?;
info!("✅ Connected to Qdrant server at: {}", qdrant_url);
let mut collections = HashMap::new();
collections.insert(
"chat_history".to_string(),
CollectionConfig {
name: "chat_history".to_string(),
dimensions: 1,
distance_metric: "Cosine".to_string(),
},
);
Ok(Self {
client,
base_path: base_path.as_ref().to_path_buf(),
encryption_service,
current_user_context: Arc::new(RwLock::new(None)),
collections: Arc::new(RwLock::new(collections)),
initialized: Arc::new(RwLock::new(false)),
file_locks: Arc::new(std::sync::Mutex::new(HashMap::new())),
})
}
/// Sets the user id that scopes subsequent document operations: inserted points
/// are tagged with it, and searches and listings filter on it.
pub async fn set_user_context(&self, user_id: &str) {
    let owned_id = user_id.to_owned();
    *self.current_user_context.write().await = Some(owned_id);
}
async fn save_document_to_local_file(&self, collection_name: &str, user_id: &str, document: &Document) -> Result<()> {
let user_dir = self.base_path.join(user_id);
tokio::fs::create_dir_all(&user_dir).await?;
let collection_file = user_dir.join(format!("{}-documents.json", collection_name));
let file_key = format!("{}_{}", user_id, collection_name);
let file_lock = {
let mut locks = self.file_locks.lock().unwrap();
locks.entry(file_key).or_insert_with(|| Arc::new(tokio::sync::Mutex::new(()))).clone()
};
let _guard = file_lock.lock().await;
let mut existing_docs: Vec<Document> = if collection_file.exists() {
let content = tokio::fs::read_to_string(&collection_file).await?;
if !content.trim().is_empty() {
if let Ok(wrapper) = serde_json::from_str::<serde_json::Value>(&content) {
if let Some(documents) = wrapper.get("documents") {
serde_json::from_value(documents.clone()).unwrap_or_default()
} else {
serde_json::from_str(&content).unwrap_or_default()
}
} else {
Vec::new()
}
} else {
Vec::new()
}
} else {
Vec::new()
};
existing_docs.retain(|d| d.id != document.id);
existing_docs.push(document.clone());
#[derive(serde::Serialize)]
struct DocumentsWrapper<'a> {
documents: &'a Vec<Document>,
count: usize,
#[serde(rename = "lastModified")]
last_modified: String,
}
let wrapper = DocumentsWrapper {
documents: &existing_docs,
count: existing_docs.len(),
last_modified: chrono::Utc::now().to_rfc3339(),
};
let content = serde_json::to_string_pretty(&wrapper)?;
tokio::fs::write(&collection_file, content).await?;
Ok(())
}
async fn save_documents_batch_to_local_file(&self, collection_name: &str, user_id: &str, documents: &[Document]) -> Result<()> {
if documents.is_empty() {
return Ok(());
}
let batch_start = std::time::Instant::now();
let user_dir = self.base_path.join(user_id);
tokio::fs::create_dir_all(&user_dir).await?;
let collection_file = user_dir.join(format!("{}-documents.json", collection_name));
let file_key = format!("{}_{}", user_id, collection_name);
let file_lock = {
let mut locks = self.file_locks.lock().unwrap();
locks.entry(file_key).or_insert_with(|| Arc::new(tokio::sync::Mutex::new(()))).clone()
};
let _guard = file_lock.lock().await;
let mut existing_docs: Vec<Document> = if collection_file.exists() {
let content = tokio::fs::read_to_string(&collection_file).await?;
if !content.trim().is_empty() {
if let Ok(wrapper) = serde_json::from_str::<serde_json::Value>(&content) {
if let Some(docs) = wrapper.get("documents") {
serde_json::from_value(docs.clone()).unwrap_or_default()
} else {
serde_json::from_str(&content).unwrap_or_default()
}
} else {
Vec::new()
}
} else {
Vec::new()
}
} else {
Vec::new()
};
let original_count = existing_docs.len();
let new_ids: std::collections::HashSet<String> = documents.iter().map(|d| d.id.clone()).collect();
existing_docs.retain(|d| !new_ids.contains(&d.id));
existing_docs.extend_from_slice(documents);
#[derive(serde::Serialize)]
struct DocumentsWrapper<'a> {
documents: &'a Vec<Document>,
count: usize,
#[serde(rename = "lastModified")]
last_modified: String,
}
let wrapper = DocumentsWrapper {
documents: &existing_docs,
count: existing_docs.len(),
last_modified: chrono::Utc::now().to_rfc3339(),
};
let content = serde_json::to_string_pretty(&wrapper)?;
tokio::fs::write(&collection_file, content).await?;
let duration = batch_start.elapsed();
info!("✅ BATCH SAVE SUCCESS: Saved {} documents to local file in {:?} (was: {}, now: {})",
documents.len(), duration, original_count, existing_docs.len());
Ok(())
}
async fn save_collection_metadata(&self, collection_name: &str, user_id: &str, document_count: usize) -> Result<()> {
let user_dir = self.base_path.join(user_id);
tokio::fs::create_dir_all(&user_dir).await?;
let metadata_file = user_dir.join(format!("{}-metadata.json", collection_name));
let collections = self.collections.read().await;
let collection = collections.get(collection_name)
.ok_or_else(|| anyhow!("Collection {} not found", collection_name))?;
let metadata = serde_json::json!({
"collection_name": collection_name,
"user_id": user_id,
"dimensions": collection.dimensions,
"distance_metric": collection.distance_metric,
"document_count": document_count,
"created_at": chrono::Utc::now().to_rfc3339(),
"hnsw_config": {
"m": 16,
"ef_construction": 100,
"ef_search": 50,
"max_connections": 32
}
});
let content = serde_json::to_string_pretty(&metadata)?;
tokio::fs::write(&metadata_file, content).await?;
Ok(())
}
async fn save_vectors_binary(&self, collection_name: &str, user_id: &str, vectors: &[Vec<f32>]) -> Result<()> {
let user_dir = self.base_path.join(user_id);
tokio::fs::create_dir_all(&user_dir).await?;
let vectors_file = user_dir.join(format!("{}-vectors.bin", collection_name));
let mut binary_data = Vec::new();
binary_data.extend_from_slice(&(vectors.len() as u32).to_le_bytes());
if !vectors.is_empty() {
binary_data.extend_from_slice(&(vectors[0].len() as u32).to_le_bytes());
} else {
binary_data.extend_from_slice(&0u32.to_le_bytes());
}
for vector in vectors {
for &value in vector {
binary_data.extend_from_slice(&value.to_le_bytes());
}
}
tokio::fs::write(&vectors_file, binary_data).await?;
Ok(())
}
/// Writes `<base_path>/<user_id>/<collection>-vector-index.json`. The file maps
/// each position in the vectors binary file to its `vectorId` and `documentId`,
/// and records the dimension, entry count and update time.
async fn save_vector_index(&self, collection_name: &str, user_id: &str, vector_entries: &[(String, String)], dimensions: usize) -> Result<()> {
    let user_dir = self.base_path.join(user_id);
    tokio::fs::create_dir_all(&user_dir).await?;

    let mut entries = Vec::with_capacity(vector_entries.len());
    for (position, (vector_id, document_id)) in vector_entries.iter().enumerate() {
        entries.push(serde_json::json!({
            "vectorId": vector_id,
            "documentId": document_id,
            "position": position
        }));
    }

    let index = serde_json::json!({
        "vectors": entries,
        "dimensions": dimensions,
        "count": vector_entries.len(),
        "lastUpdated": Utc::now().to_rfc3339(),
    });
    let serialized = serde_json::to_string_pretty(&index)?;
    let index_path = user_dir.join(format!("{}-vector-index.json", collection_name));
    tokio::fs::write(&index_path, serialized).await?;
    Ok(())
}
fn document_to_point(&self, document: &Document, user_id: &str) -> Result<PointStruct> {
let embedding = document.embedding.as_ref()
.ok_or_else(|| anyhow!("Document {} has no embedding", document.id))?;
let mut payload = Payload::new();
payload.insert("user_id", Value::from(user_id));
payload.insert("document_id", Value::from(document.id.clone()));
payload.insert("content", Value::from(document.content.clone()));
payload.insert("created_at", Value::from(document.created_at.to_rfc3339()));
payload.insert("updated_at", Value::from(document.updated_at.to_rfc3339()));
for (key, value) in &document.metadata {
payload.insert(key.clone(), Value::from(value.to_string()));
}
Ok(PointStruct::new(
document.vector_id.clone(),
embedding.clone(),
payload,
))
}
/// Placeholder client-side filter check. It currently accepts every document
/// and ignores `_filter`; `search` applies filters server-side through Qdrant instead.
async fn matches_filter(&self, _document: &Document, _filter: &SearchFilter) -> Result<bool> {
    let accepts_everything = true;
    Ok(accepts_everything)
}
}
#[async_trait]
impl VectorStore for QdrantServerVectorStore {
/// Exposes the concrete store as `Any` so callers can downcast it.
fn as_any(&self) -> &dyn std::any::Any {
    self as &dyn std::any::Any
}
/// Creates the base directory, then creates every pre-registered collection that
/// the server does not report, and finally marks the store as initialized.
///
/// New collections use the configured dimension and metric ("Dot", "Euclid",
/// anything else means cosine), replication factor 3 and write consistency
/// factor 2. Any error from `collection_info` is treated as "collection missing".
async fn initialize(&self) -> Result<()> {
    tokio::fs::create_dir_all(&self.base_path).await?;

    let registered = self.collections.read().await;
    for (name, config) in registered.iter() {
        if self.client.collection_info(name).await.is_ok() {
            info!("✅ Collection '{}' already exists", name);
            continue;
        }

        let distance = match config.distance_metric.as_str() {
            "Dot" => Distance::Dot,
            "Euclid" => Distance::Euclid,
            _ => Distance::Cosine,
        };
        let params = VectorParamsBuilder::new(config.dimensions as u64, distance);
        let request = CreateCollectionBuilder::new(name)
            .vectors_config(params)
            .replication_factor(3)
            .write_consistency_factor(2);
        self.client
            .create_collection(request)
            .await
            .map_err(|e| anyhow!("Failed to create collection {}: {}", name, e))?;
        info!("✅ Collection '{}' created successfully with replication factor 3", name);
    }

    *self.initialized.write().await = true;
    info!("✅ Pre-registered collections initialized successfully");
    Ok(())
}
/// Reports whether `initialize` has completed successfully.
async fn is_initialized(&self) -> bool {
    let flag = self.initialized.read().await;
    *flag
}
/// No-op for the server store. Vector sizes come from `create_collection`
/// arguments or the pre-registered collection config instead.
async fn set_dimensions(&self, _dimensions: usize) -> Result<()> {
    Result::Ok(())
}
/// Creates a Qdrant collection with `dimension`-sized vectors.
///
/// One-dimensional collections use Euclidean distance and all others use
/// cosine. Replication factor 3 and write consistency factor 2 are used. The new
/// collection is not added to the in-memory registry.
async fn create_collection(&self, name: &str, dimension: usize) -> Result<()> {
    let distance = match dimension {
        1 => Distance::Euclid,
        _ => Distance::Cosine,
    };
    let request = CreateCollectionBuilder::new(name)
        .vectors_config(VectorParamsBuilder::new(dimension as u64, distance))
        .replication_factor(3)
        .write_consistency_factor(2);
    if let Err(e) = self.client.create_collection(request).await {
        return Err(anyhow!("Failed to create collection {}: {}", name, e));
    }
    info!("📁 Collection '{}' created with replication factor 3", name);
    Ok(())
}
/// Deletes a collection. Server errors are logged and reported as `Ok(false)`
/// rather than propagated.
async fn delete_collection(&self, name: &str) -> Result<bool> {
    let outcome = self.client.delete_collection(name).await;
    if let Err(e) = outcome {
        error!("Failed to delete collection {}: {}", name, e);
        return Ok(false);
    }
    info!("Deleted collection: {}", name);
    Ok(true)
}
/// Returns the names of all collections known to the server.
async fn list_collections(&self) -> Result<Vec<String>> {
    let response = match self.client.list_collections().await {
        Ok(response) => response,
        Err(e) => return Err(anyhow!("Failed to list collections: {}", e)),
    };
    let names = response
        .collections
        .into_iter()
        .map(|description| description.name)
        .collect();
    Ok(names)
}
/// Fetches size and shape information for a collection.
///
/// Returns `Ok(None)` when the server request fails for any reason. The vector
/// size and distance come from the collection's single unnamed vector config.
/// When that config is absent (e.g. named vectors), the size is 0 and the
/// distance defaults to "Cosine".
async fn get_collection_info(&self, name: &str) -> Result<Option<CollectionInfo>> {
    let info = match self.client.collection_info(name).await {
        Ok(info) => info,
        Err(_) => return Ok(None),
    };
    let result = info.result.as_ref();
    let vector_params = result
        .and_then(|r| r.config.as_ref())
        .and_then(|c| c.params.as_ref())
        .and_then(|p| p.vectors_config.as_ref())
        .and_then(|vc| match vc.config.as_ref()? {
            vectors_config::Config::Params(vp) => Some(vp),
            _ => None,
        });
    let vector_size = vector_params.map_or(0, |vp| vp.size as usize);
    // Report the metric the collection was actually created with (e.g. Euclid for
    // one-dimensional collections) instead of always claiming cosine.
    let distance = vector_params
        .map(|vp| vp.distance().as_str_name().to_string())
        .unwrap_or_else(|| "Cosine".to_string());
    let points_count = result
        .map(|r| r.points_count.unwrap_or(0) as usize)
        .unwrap_or(0);
    let segments_count = result.map(|r| r.segments_count as usize);
    Ok(Some(CollectionInfo {
        name: name.to_string(),
        vector_size,
        distance,
        points_count,
        segments_count,
        disk_data_size: None,
        ram_data_size: None,
    }))
}
/// Builds a health summary for every collection on the server.
///
/// Collections whose info cannot be fetched are left out. Status is always
/// "green" and disk/RAM sizes are always 0; they are not read from the server.
async fn get_collections_health(&self) -> Result<HashMap<String, CollectionHealth>> {
    let names = self.list_collections().await?;
    let mut report = HashMap::new();
    for name in names {
        let info = match self.get_collection_info(&name).await {
            Ok(Some(info)) => info,
            _ => continue,
        };
        report.insert(
            name.clone(),
            CollectionHealth {
                name,
                status: "green".to_string(),
                points_count: info.points_count,
                segments_count: info.segments_count.unwrap_or(1),
                disk_size: 0,
                ram_size: 0,
                last_updated: Utc::now(),
            },
        );
    }
    Ok(report)
}
/// Encrypts and stores one document for the current user, then mirrors it into
/// the local backup files.
///
/// Steps:
/// 1. Requires a user context. Assigns a UUID if `document.id` is empty and
///    stamps `updated_at`.
/// 2. Without an embedding, uses `[0.0]` when the collection's vector size is 1.
/// 3. Encrypts the content, unless `_encrypted_content` is already `true`.
///    Then replaces the metadata with one encrypted blob, unless
///    `_encrypted_metadata` is already present.
/// 4. Upserts the point into Qdrant and waits for the write.
/// 5. Best-effort: updates `<collection>-documents.json`, `-vectors.bin`,
///    `-vector-index.json` and `-metadata.json`. Failures are only logged.
///
/// Returns the document id.
///
/// # Errors
/// Fails when no user context is set, when the document has no usable
/// embedding, or when the Qdrant upsert fails.
async fn add_document(&self, collection_name: &str, document: Document) -> Result<String> {
    // Parses the `<collection>-vector-index.json` layout into (vectorId, documentId)
    // pairs. Malformed input yields an empty list and malformed entries are skipped.
    fn decode_index_entries(content: &str) -> Vec<(String, String)> {
        let index: serde_json::Value = match serde_json::from_str(content) {
            Ok(index) => index,
            Err(_) => return Vec::new(),
        };
        index
            .get("vectors")
            .and_then(|v| v.as_array())
            .map(|entries| {
                entries
                    .iter()
                    .filter_map(|entry| {
                        let vector_id = entry.get("vectorId")?.as_str()?;
                        let doc_id = entry.get("documentId")?.as_str()?;
                        Some((vector_id.to_string(), doc_id.to_string()))
                    })
                    .collect()
            })
            .unwrap_or_default()
    }

    // Decodes `<collection>-vectors.bin`: a u32 count, a u32 dimension, then
    // count*dimension f32 values, all little-endian. Truncated trailing vectors
    // are dropped.
    fn decode_vectors(buffer: &[u8]) -> Vec<Vec<f32>> {
        if buffer.len() < 8 {
            return Vec::new();
        }
        let count = u32::from_le_bytes([buffer[0], buffer[1], buffer[2], buffer[3]]) as usize;
        let dimensions = u32::from_le_bytes([buffer[4], buffer[5], buffer[6], buffer[7]]) as usize;
        let stride = match dimensions.checked_mul(4) {
            Some(stride) if stride > 0 => stride,
            _ => return Vec::new(),
        };
        buffer[8..]
            .chunks_exact(stride)
            .take(count)
            .map(|raw| {
                raw.chunks_exact(4)
                    .map(|b| f32::from_le_bytes([b[0], b[1], b[2], b[3]]))
                    .collect::<Vec<f32>>()
            })
            .collect()
    }

    let user_id = self.current_user_context.read().await
        .as_ref()
        .ok_or_else(|| anyhow!("No user context set. Call set_user_context() before adding documents."))?
        .clone();
    let doc_id = if document.id.is_empty() {
        Uuid::new_v4().to_string()
    } else {
        document.id.clone()
    };
    let mut doc_to_insert = document;
    doc_to_insert.id = doc_id.clone();
    doc_to_insert.updated_at = Utc::now();
    if doc_to_insert.embedding.is_none() {
        if let Ok(Some(info)) = self.get_collection_info(collection_name).await {
            if info.vector_size == 1 {
                doc_to_insert.embedding = Some(vec![0.0]);
            }
        }
    }

    let mut stored_content = doc_to_insert.content.clone();
    let mut stored_metadata = doc_to_insert.metadata.clone();
    let already_encrypted = stored_metadata.get("_encrypted_content")
        .and_then(|v| v.as_bool())
        .unwrap_or(false);
    if !stored_content.is_empty() && !already_encrypted {
        match self.encryption_service.encrypt_content(&stored_content).await {
            Ok(encrypted_content) => {
                stored_content = encrypted_content;
                stored_metadata.insert("_encrypted_content".to_string(), serde_json::Value::Bool(true));
            }
            Err(e) => warn!("Failed to encrypt content: {}", e),
        }
    }

    let metadata_already_encrypted = stored_metadata.contains_key("_encrypted_metadata");
    if !stored_metadata.is_empty() && !metadata_already_encrypted {
        // The flag must reflect the content's real state. If content encryption
        // failed, claiming `true` would make readers "decrypt" plaintext, and later
        // re-adds would skip encrypting it. Empty content has nothing to decrypt,
        // so it keeps the historical `true` marker.
        let content_encrypted = stored_metadata.get("_encrypted_content")
            .and_then(|v| v.as_bool())
            .unwrap_or(false)
            || stored_content.is_empty();
        let mut metadata_to_encrypt = stored_metadata.clone();
        metadata_to_encrypt.shift_remove("_encrypted_content");
        match serde_json::to_string(&metadata_to_encrypt) {
            Ok(metadata_json) => match self.encryption_service.encrypt_content(&metadata_json).await {
                Ok(encrypted_metadata) => {
                    let now = Utc::now().to_rfc3339();
                    stored_metadata.clear();
                    stored_metadata.insert("_encrypted_metadata".to_string(), serde_json::Value::String(encrypted_metadata));
                    stored_metadata.insert("_encrypted_content".to_string(), serde_json::Value::Bool(content_encrypted));
                    stored_metadata.insert("created_at".to_string(), serde_json::Value::String(now.clone()));
                    stored_metadata.insert("updated_at".to_string(), serde_json::Value::String(now));
                }
                Err(e) => warn!("Failed to encrypt metadata: {}", e),
            },
            Err(e) => warn!("Failed to serialize metadata for encryption: {}", e),
        }
    }
    doc_to_insert.content = stored_content;
    doc_to_insert.metadata = stored_metadata;

    let point = self.document_to_point(&doc_to_insert, &user_id)?;
    self.client
        .upsert_points(UpsertPointsBuilder::new(collection_name, vec![point]).wait(true))
        .await
        .map_err(|e| anyhow!("Failed to upsert document to Qdrant: {}", e))?;

    if let Err(e) = self.save_document_to_local_file(collection_name, &user_id, &doc_to_insert).await {
        warn!("Failed to save document to local file: {}", e);
    }

    if let Some(embedding) = &doc_to_insert.embedding {
        // The vector and index files are rewritten wholesale from their current
        // contents. Without serialisation, two concurrent adds for the same user and
        // collection would each overwrite the other's vector (lost update).
        let vector_lock = {
            let mut locks = self.file_locks.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
            Arc::clone(
                locks
                    .entry(format!("{}_{}#vectors", user_id, collection_name))
                    .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(()))),
            )
        };
        let _vector_guard = vector_lock.lock().await;

        let user_dir = self.base_path.join(&user_id);
        let index_file = user_dir.join(format!("{}-vector-index.json", collection_name));
        let vectors_file = user_dir.join(format!("{}-vectors.bin", collection_name));
        // Missing or unreadable files simply start a fresh index.
        let mut all_vector_entries = tokio::fs::read_to_string(&index_file)
            .await
            .map(|content| decode_index_entries(&content))
            .unwrap_or_default();
        let mut all_vectors = tokio::fs::read(&vectors_file)
            .await
            .map(|buffer| decode_vectors(&buffer))
            .unwrap_or_default();

        // Keep positions aligned even if an earlier partial write left the two
        // files with different lengths.
        let min_len = all_vectors.len().min(all_vector_entries.len());
        all_vectors.truncate(min_len);
        all_vector_entries.truncate(min_len);

        let vector_id = doc_to_insert.vector_id.clone();
        match all_vector_entries.iter().position(|(_, d_id)| *d_id == doc_id) {
            Some(pos) => {
                all_vectors[pos] = embedding.clone();
                all_vector_entries[pos] = (vector_id, doc_id.clone());
            }
            None => {
                all_vector_entries.push((vector_id, doc_id.clone()));
                all_vectors.push(embedding.clone());
            }
        }

        let dimensions = all_vectors.first().map_or(0, Vec::len);
        if let Err(e) = self.save_vectors_binary(collection_name, &user_id, &all_vectors).await {
            warn!("Failed to save vectors: {}", e);
        }
        if let Err(e) = self.save_vector_index(collection_name, &user_id, &all_vector_entries, dimensions).await {
            warn!("Failed to save vector index: {}", e);
        }
        if let Err(e) = self.save_collection_metadata(collection_name, &user_id, all_vector_entries.len()).await {
            warn!("Failed to save collection metadata: {}", e);
        }
    }
    Ok(doc_id)
}
async fn add_documents(&self, collection_name: &str, documents: Vec<Document>) -> Result<Vec<String>> {
let user_id = self.current_user_context.read().await
.as_ref()
.ok_or_else(|| anyhow!("No user context set"))?
.clone();
let mut points = Vec::new();
let mut doc_ids = Vec::new();
let mut processed_docs = Vec::new();
for document in documents {
let doc_id = if document.id.is_empty() {
Uuid::new_v4().to_string()
} else {
document.id.clone()
};
let mut doc_to_store = document;
doc_to_store.id = doc_id.clone();
doc_to_store.updated_at = Utc::now();
let mut stored_content = doc_to_store.content.clone();
let mut stored_metadata = doc_to_store.metadata.clone();
if !stored_content.is_empty() {
match self.encryption_service.encrypt_content(&stored_content).await {
Ok(encrypted_content) => {
stored_content = encrypted_content;
stored_metadata.insert("_encrypted_content".to_string(), serde_json::Value::Bool(true));
}
Err(e) => warn!("Failed to encrypt content: {}", e),
}
}
doc_to_store.content = stored_content;
doc_to_store.metadata = stored_metadata;
let point = self.document_to_point(&doc_to_store, &user_id)?;
points.push(point);
doc_ids.push(doc_id);
processed_docs.push(doc_to_store);
}
self.client.upsert_points(
UpsertPointsBuilder::new(collection_name, points)
.wait(true)
).await
.map_err(|e| anyhow!("Failed to batch upsert documents to Qdrant: {}", e))?;
info!("💾 Batch saving {} documents to local backup file...", processed_docs.len());
if let Err(e) = self.save_documents_batch_to_local_file(collection_name, &user_id, &processed_docs).await {
error!("❌ Failed to batch save documents to local file: {}", e);
error!(" Documents are safe in Qdrant, but local backup failed");
}
Ok(doc_ids)
}
async fn search(
&self,
collection_name: &str,
query_vector: Vec<f32>,
options: SearchOptions,
) -> Result<Vec<SearchResult>> {
let user_id = self.current_user_context.read().await
.as_ref()
.ok_or_else(|| anyhow!("No user context set"))?
.clone();
let limit = options.limit.unwrap_or(10) as u64;
let score_threshold = options.score_threshold;
info!("🔍 ========== QDRANT SERVER SEARCH FLOW START ==========");
info!(" Collection: {}", collection_name);
info!(" User: {}", user_id);
info!(" Limit: {}, Score Threshold: {:?}", limit, score_threshold);
let mut search_builder = SearchPointsBuilder::new(
collection_name,
query_vector,
limit
).with_payload(true);
let mut filter = Filter::default();
filter.must.push(Condition::matches("user_id", user_id.clone()));
info!(" 🔐 Multi-tenancy filter: user_id = {}", user_id);
let mut filter_count = 1; if let Some(search_filter) = &options.filter {
info!(" 🎯 STEP 1: PRE-FILTERING (Qdrant native filters)");
if let Some(must_conditions) = &search_filter.must {
info!(" Filter conditions ({} conditions):", must_conditions.len());
for condition in must_conditions {
match &condition.r#match {
crate::types::MatchCondition::Value { value } => {
info!(" - {} = {:?}", condition.key, value);
if let Some(s) = value.as_str() {
filter.must.push(Condition::matches(&condition.key, s.to_string()));
} else if let Some(n) = value.as_i64() {
filter.must.push(Condition::matches(&condition.key, n));
} else {
filter.must.push(Condition::matches(&condition.key, value.to_string()));
}
filter_count += 1;
}
_ => {
info!(" - {} (complex condition - skipped)", condition.key);
}
}
}
}
info!(" ✅ Total filters applied: {}", filter_count);
} else {
info!(" ⏭️ STEP 1: No additional filters (only user_id filter)");
}
search_builder = search_builder.filter(filter);
if let Some(threshold) = score_threshold {
search_builder = search_builder.score_threshold(threshold);
}
info!(" 🧠 STEP 2: SEMANTIC SEARCH (Qdrant server with pre-filters)");
let search_start = std::time::Instant::now();
let search_result = self.client.search_points(search_builder).await
.map_err(|e| anyhow!("Qdrant search failed: {}", e))?;
let search_duration = search_start.elapsed();
info!(" ✅ Qdrant search complete: {} results in {:?}", search_result.result.len(), search_duration);
let mut results = Vec::new();
for scored_point in search_result.result {
let point_id = scored_point.id.and_then(|id| id.point_id_options)
.map(|opts| match opts {
point_id::PointIdOptions::Num(n) => n.to_string(),
point_id::PointIdOptions::Uuid(u) => u,
})
.unwrap_or_default();
let content = scored_point.payload.get("content")
.and_then(|v| {
if let Some(s) = v.as_str() {
Some(s.to_string())
} else {
v.to_string().strip_prefix('"').and_then(|s| s.strip_suffix('"')).map(|s| s.to_string())
}
})
.unwrap_or_default();
let document_id = scored_point.payload.get("document_id")
.and_then(|v| {
if let Some(s) = v.as_str() {
Some(s.to_string())
} else {
v.to_string().strip_prefix('"').and_then(|s| s.strip_suffix('"')).map(|s| s.to_string())
}
})
.unwrap_or_else(|| point_id.clone());
let mut metadata = indexmap::IndexMap::new();
for (key, value) in &scored_point.payload {
if !["user_id", "content", "document_id", "created_at", "updated_at"].contains(&key.as_str()) {
metadata.insert(key.clone(), serde_json::to_value(value).unwrap_or_default());
}
}
let created_at = scored_point.payload.get("created_at")
.and_then(|v| v.as_str())
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
.map(|dt| dt.with_timezone(&chrono::Utc))
.unwrap_or_else(|| chrono::Utc::now());
let updated_at = scored_point.payload.get("updated_at")
.and_then(|v| v.as_str())
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
.map(|dt| dt.with_timezone(&chrono::Utc))
.unwrap_or_else(|| chrono::Utc::now());
let document = Document {
id: document_id,
vector_id: point_id.clone(),
content: content.clone(),
embedding: None,
metadata: metadata.clone(),
created_at,
updated_at,
};
let mut payload: HashMap<String, serde_json::Value> = metadata.into_iter().collect();
payload.insert("content".to_string(), serde_json::Value::String(content));
let result = SearchResult {
id: point_id,
score: scored_point.score,
document: Some(document), payload: Some(payload),
};
results.push(result);
}
info!("✅ ========== QDRANT SERVER SEARCH FLOW COMPLETE: {} results returned ==========\n", results.len());
Ok(results)
}
/// Fetches one of the current user's documents by id.
///
/// `id` is used directly as the Qdrant point UUID. The point's vector is
/// requested so that `embedding` is populated. A point whose payload `user_id`
/// differs from the current user context is reported as `None`, like a missing
/// point, so one tenant cannot read another tenant's documents by id.
/// `metadata` holds every payload field.
///
/// # Errors
/// Fails when no user context is set or the Qdrant request fails.
async fn get_document(&self, collection_name: &str, id: &str) -> Result<Option<Document>> {
    let user_id = self.current_user_context.read().await
        .as_ref()
        .ok_or_else(|| anyhow!("No user context set"))?
        .clone();
    let point_id = PointId {
        point_id_options: Some(point_id::PointIdOptions::Uuid(id.to_string())),
    };
    let points = self.client.get_points(
        GetPointsBuilder::new(collection_name, vec![point_id])
            .with_payload(true)
            // Qdrant omits vectors unless asked; without this `embedding` is always None.
            .with_vectors(true)
    ).await.map_err(|e| anyhow!("Failed to get point: {}", e))?;

    let point = match points.result.first() {
        Some(point) => point,
        None => return Ok(None),
    };
    // Multi-tenancy: treat another user's point exactly like a missing one.
    let owned_by_user = point.payload.get("user_id")
        .and_then(|v| v.as_str())
        .map_or(false, |owner| *owner == user_id);
    if !owned_by_user {
        return Ok(None);
    }

    let mut payload_map = HashMap::new();
    for (key, value) in &point.payload {
        payload_map.insert(key.clone(), serde_json::to_value(value).unwrap_or_default());
    }
    let embedding = point.vectors.as_ref().and_then(|v| {
        v.vectors_options.as_ref().and_then(|opts| {
            use qdrant_client::qdrant::vectors_output::VectorsOptions;
            match opts {
                VectorsOptions::Vector(v) => Some(v.data.clone()),
                _ => None,
            }
        })
    });
    let document = Document {
        id: id.to_string(),
        vector_id: id.to_string(),
        content: payload_map.get("content")
            .and_then(|v| v.as_str())
            .unwrap_or("")
            .to_string(),
        embedding,
        metadata: payload_map.clone().into_iter().collect(),
        created_at: payload_map.get("created_at")
            .and_then(|v| v.as_str())
            .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
            .map(|dt| dt.with_timezone(&Utc))
            .unwrap_or_else(Utc::now),
        updated_at: payload_map.get("updated_at")
            .and_then(|v| v.as_str())
            .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
            .map(|dt| dt.with_timezone(&Utc))
            .unwrap_or_else(Utc::now),
    };
    Ok(Some(document))
}
/// Replaces the stored document `id` with `document`, going through
/// `add_document` (encryption, upsert, local backups).
///
/// When `document.id` is empty it is set to `id`. Otherwise `add_document` would
/// generate a fresh random id and create a new document instead of updating the
/// requested one. A non-empty `document.id` is kept as given.
async fn update_document(&self, collection_name: &str, id: &str, mut document: Document) -> Result<()> {
    if document.id.is_empty() {
        document.id = id.to_string();
    }
    self.add_document(collection_name, document).await?;
    Ok(())
}
async fn delete_document(&self, collection_name: &str, id: &str) -> Result<bool> {
let point_id = PointId {
point_id_options: Some(point_id::PointIdOptions::Uuid(id.to_string())),
};
match self.client.delete_points(
DeletePointsBuilder::new(collection_name)
.points(vec![point_id])
.wait(true)
).await {
Ok(_) => Ok(true),
Err(e) => {
error!("Failed to delete document {}: {}", id, e);
Ok(false)
}
}
}
async fn list_documents(&self, collection_name: &str, limit: Option<usize>, _filter: Option<SearchFilter>) -> Result<Vec<Document>> {
let user_id = self.current_user_context.read().await
.as_ref()
.ok_or_else(|| anyhow!("No user context set"))?
.clone();
let mut filter = Filter::default();
filter.must.push(Condition::matches("user_id", user_id));
let scroll_result = self.client.scroll(
ScrollPointsBuilder::new(collection_name)
.limit(limit.unwrap_or(100) as u32)
.filter(filter)
.with_payload(true)
).await.map_err(|e| anyhow!("Failed to scroll documents: {}", e))?;
let mut documents = Vec::new();
for point in scroll_result.result {
let mut payload_map = HashMap::new();
for (key, value) in &point.payload {
payload_map.insert(key.clone(), serde_json::to_value(value).unwrap_or_default());
}
let point_id = point.id.and_then(|id| id.point_id_options)
.map(|opts| match opts {
point_id::PointIdOptions::Num(n) => n.to_string(),
point_id::PointIdOptions::Uuid(u) => u,
})
.unwrap_or_default();
let embedding = point.vectors.as_ref().and_then(|v| {
v.vectors_options.as_ref().and_then(|opts| {
use qdrant_client::qdrant::vectors_output::VectorsOptions;
match opts {
VectorsOptions::Vector(v) => Some(v.data.clone()),
_ => None,
}
})
});
let document = Document {
id: point_id.clone(),
vector_id: point_id, content: payload_map.get("content")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string(),
embedding,
metadata: payload_map.clone().into_iter().collect(),
created_at: payload_map.get("created_at")
.and_then(|v| v.as_str())
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
.map(|dt| dt.with_timezone(&Utc))
.unwrap_or_else(Utc::now),
updated_at: payload_map.get("updated_at")
.and_then(|v| v.as_str())
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
.map(|dt| dt.with_timezone(&Utc))
.unwrap_or_else(Utc::now),
};
documents.push(document);
}
Ok(documents)
}
/// Returns the current user's documents from `list_documents` wrapped as search
/// results, each with a fixed score of 1.0 and no payload.
async fn scroll_collection(&self, collection_name: &str, filter: Option<SearchFilter>, limit: Option<usize>) -> Result<Vec<SearchResult>> {
    let documents = self.list_documents(collection_name, limit, filter).await?;
    let mut results = Vec::with_capacity(documents.len());
    for doc in documents {
        results.push(SearchResult {
            id: doc.id.clone(),
            score: 1.0,
            document: Some(doc),
            payload: None,
        });
    }
    Ok(results)
}
/// Nothing is released explicitly; the request is only logged.
async fn shutdown(&self) -> Result<()> {
    let message = "Qdrant server vector store shutdown requested";
    info!("{}", message);
    Ok(())
}
/// No-op in server mode: this store keeps no local document cache to clear.
/// The call is only logged.
async fn clear_document_cache(&self) -> Result<()> {
    let message = "📝 Server mode: document cache clear not applicable (server handles caching)";
    info!("{}", message);
    Ok(())
}
/// Prepares `collection_name` for a bulk insert. Sets the optimizer's
/// `max_optimization_threads` to 0 and raises `indexing_threshold` to 100000.
///
/// # Errors
/// Fails if the collection update request fails.
async fn disable_optimizer(&self, collection_name: &str) -> Result<()> {
    use qdrant_client::qdrant::{OptimizersConfigDiff, UpdateCollectionBuilder};

    info!("🛑 Disabling Qdrant optimizer for collection: {} (prevents data loss during bulk insert)", collection_name);
    let diff = OptimizersConfigDiff {
        max_optimization_threads: Some(0_u64.into()),
        indexing_threshold: Some(100000_u64.into()),
        ..Default::default()
    };
    let request = UpdateCollectionBuilder::new(collection_name).optimizers_config(diff);
    if let Err(e) = self.client.update_collection(request).await {
        return Err(anyhow!("Failed to disable optimizer: {}", e));
    }
    info!("✅ Optimizer disabled for collection: {}", collection_name);
    Ok(())
}
/// Re-enables optimization on `collection_name` after a bulk insert. Sets
/// `max_optimization_threads` to 1 and `indexing_threshold` to 10000. These are
/// fixed values, not the collection's previous settings.
///
/// # Errors
/// Fails if the collection update request fails.
async fn enable_optimizer(&self, collection_name: &str) -> Result<()> {
    use qdrant_client::qdrant::{OptimizersConfigDiff, UpdateCollectionBuilder};

    info!("✅ Re-enabling Qdrant optimizer for collection: {}", collection_name);
    let diff = OptimizersConfigDiff {
        max_optimization_threads: Some(1_u64.into()),
        indexing_threshold: Some(10000_u64.into()),
        ..Default::default()
    };
    let request = UpdateCollectionBuilder::new(collection_name).optimizers_config(diff);
    if let Err(e) = self.client.update_collection(request).await {
        return Err(anyhow!("Failed to enable optimizer: {}", e));
    }
    info!("✅ Optimizer re-enabled for collection: {}", collection_name);
    Ok(())
}
}