pub mod config;
pub mod db;
pub mod services;
pub mod types;
pub mod utils;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;
use anyhow::Result;
use tracing::{info, error, debug, warn};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use uuid::Uuid;
use sha2::{Sha256, Digest};
/// Serializable chat document record whose metadata is carried in an
/// [`EncryptedDocumentMetadata`] envelope.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EncryptedChatDocument {
/// Document identifier.
pub id: String,
/// Identifier of the associated vector-store point; serialized as `vectorId`.
#[serde(rename = "vectorId")]
pub vector_id: String,
/// Document content. NOTE(review): may be ciphertext depending on encryption settings — confirm.
pub content: String,
/// Embedding vector, presumably computed from `content` — TODO confirm.
pub embedding: Vec<f32>,
/// Metadata envelope (see [`EncryptedDocumentMetadata`]).
pub metadata: EncryptedDocumentMetadata,
}
/// Metadata envelope for [`EncryptedChatDocument`].
///
/// The underscore-prefixed names are the on-the-wire keys set via `serde(rename)`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EncryptedDocumentMetadata {
/// Serialized metadata, stored under the key `_encrypted_metadata`.
/// Note that `RagModule::ingest_aws_estate` writes plain `serde_json::to_string`
/// output under this same key, so the value is not necessarily ciphertext.
#[serde(rename = "_encrypted_metadata")]
pub encrypted_metadata: String,
/// Flag stored under `_encrypted_content`; presumably `true` when `content` is
/// encrypted — TODO confirm.
#[serde(rename = "_encrypted_content")]
pub encrypted_content: bool,
/// Creation timestamp as a string (format is not enforced by this type).
pub created_at: String,
/// Last-update timestamp as a string (format is not enforced by this type).
pub updated_at: String,
}
pub use config::ConfigManager;
pub use db::{VectorStore, LocalFileVectorStore};
pub use services::*;
pub use types::*;
/// One message (a user prompt or an assistant response) within a chat conversation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionMessage {
/// Message id; `add_prompt`/`add_response` assign a random UUID v4 string.
pub id: String,
/// Identifier of the conversation this message belongs to.
pub context_id: String,
/// Position of the message in its conversation, as handed out by `get_next_message_index`.
pub message_index: u32,
/// Author of the message.
pub role: MessageRole,
/// Message text; for responses this is the content extracted from the raw streaming response.
pub content: String,
/// Creation time (`Utc::now()` when the message is built).
pub timestamp: DateTime<Utc>,
/// Chat title; `add_prompt`/`add_response` default it to `"Chat"`.
pub chat_title: String,
/// Optional free-form metadata (left as `None` by `add_prompt`/`add_response`).
pub metadata: Option<serde_json::Value>,
/// Id of the request that produced this message, if any.
pub request_id: Option<String>,
}
/// Author of a [`SessionMessage`].
///
/// Serde uses the default unit-variant representation (`"User"` / `"Assistant"`).
/// The compact vector-store payloads written by `add_prompt` / `add_response`
/// encode the role separately as `"r": "0"` (user) and `"r": "1"` (assistant).
///
/// The enum is fieldless, so it is `Copy` and comparable with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum MessageRole {
    /// A message written by the user (a prompt).
    User,
    /// A message produced by the assistant (a response).
    Assistant,
}
/// A user prompt together with the assistant message paired with it.
/// NOTE(review): pairing rule (e.g. by `request_id` or adjacent index) is defined elsewhere — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryResponsePair {
/// The user's message (presumably `role == MessageRole::User`).
pub query: SessionMessage,
/// The assistant's message (presumably `role == MessageRole::Assistant`).
pub response: SessionMessage,
}
/// History of one conversation expressed as prompt/response pairs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionHistoryResult {
/// Identifier of the conversation.
pub context_id: String,
/// Title of the conversation.
pub chat_title: String,
/// Prompt/response pairs of the conversation.
pub pairs: Vec<QueryResponsePair>,
/// Number of pairs; presumably equal to `pairs.len()` — TODO confirm against the producer.
pub total_pairs: usize,
/// Presumably `true` when `pairs` is empty — TODO confirm against the producer.
pub is_empty: bool,
}
/// Summary information about one conversation (context).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContextInfo {
/// Identifier of the conversation.
pub context_id: String,
/// Title of the conversation.
pub chat_title: String,
/// When the conversation was created (UTC).
pub created_at: DateTime<Utc>,
/// When the conversation was last updated (UTC).
pub updated_at: DateTime<Utc>,
}
/// JSON-friendly view of a conversation as a list of prompt/response turns.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChatHistoryJson {
/// Identifier of the conversation.
pub context_id: String,
/// Title of the conversation.
pub chat_title: String,
/// Number of messages in the conversation. NOTE(review): whether this counts
/// individual messages or turns is decided by the producer — confirm.
pub total_messages: usize,
/// The conversation as ordered turns.
pub conversation: Vec<ConversationTurn>,
}
/// One prompt and its response within a [`ChatHistoryJson`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConversationTurn {
/// Ordinal of this turn in the conversation (0- or 1-based is set by the producer — TODO confirm).
pub turn_number: usize,
/// The user's prompt.
pub prompt: MessageJson,
/// The assistant's response.
pub response: MessageJson,
}
/// Minimal JSON representation of a single chat message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageJson {
/// Message id (see `SessionMessage::id`).
pub message_id: String,
/// Message text.
pub content: String,
/// Message creation time (UTC).
pub timestamp: DateTime<Utc>,
/// Position of the message within its conversation.
pub message_index: u32,
}
/// Outcome of a create/ingest operation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateResult {
/// Number of documents that were stored.
pub created: usize,
/// One human-readable message per failure (e.g. "Document 3/10: content field is required").
pub failed: Vec<String>,
}
/// Entry point of the RAG module. It owns the configuration, the selected vector
/// store and every service, plus the on-disk paths used for chat sessions and AWS
/// estate metadata.
///
/// Cloning shares all services and state (they are behind `Arc`) and copies the paths.
#[derive(Clone)]
pub struct RagModule {
/// Root directory for all module data.
base_path: PathBuf,
/// Set to `true` once `initialize` has completed successfully.
initialized: Arc<RwLock<bool>>,
pub config_manager: Arc<config::ConfigManager>,
pub embedding_service: Arc<services::EmbeddingService>,
/// Backend chosen in `new` (embedded Qdrant, dual embedded+server, server, or local files).
pub vector_store: Arc<dyn db::VectorStore + Send + Sync>,
pub document_service: Arc<services::DocumentService>,
pub search_service: Arc<services::SearchService>,
pub encryption_service: Arc<services::EncryptionService>,
pub security_service: Arc<services::SecurityService>,
pub mapping_service: Arc<services::MappingService>,
pub sync_service: Arc<services::SyncService>,
pub indexing_service: Arc<services::IndexingService>,
pub kb_service: Arc<services::KBService>,
pub iam_service: Arc<services::IAMService>,
pub operation_context_service: Arc<services::OperationContextService>,
pub data_filtering_service: Arc<services::DataFilteringService>,
pub collection_manager: Arc<services::CollectionManager>,
/// AWS estate service with the EC2/RDS/S3/Lambda/EBS/VPC/IAM parsers registered.
pub aws_estate_service: Arc<services::AwsEstateService>,
/// `<base_path>/sessions/messages.jsonl`; presumably the log written by `store_message` — confirm.
messages_file: PathBuf,
/// `<base_path>/qdrant-data`.
qdrant_data_path: PathBuf,
/// `<base_path>/qdrant-data/aws_estate-metadata.json`.
aws_estate_file: PathBuf,
/// Per-conversation message counters; presumably keyed by `context_id` — TODO confirm.
message_counters: Arc<RwLock<HashMap<String, u32>>>,
}
impl RagModule {
/// Builds a fully wired [`RagModule`] rooted at `base_path`.
///
/// Steps, in order:
/// 1. Load the configuration and apply environment overrides:
///    * `QDRANT_URL` set: use `qdrant-embedded` with server sync to that URL.
///    * otherwise a persisted `qdrant-server` backend is upgraded to `qdrant-embedded`
///      with server sync, keeping any configured sync URL or falling back to
///      `connection.url`.
///    Any change is saved through `ConfigManager::update_config`.
/// 2. If `RAG_DISABLE_ENCRYPTION` is set, switch off content, metadata and embedding
///    encryption. This happens after the save, so it only affects this instance.
///    NOTE(review): any value, including "false" or "0", disables encryption.
/// 3. Create and initialize the encryption service. Select the vector store backend.
///    Create and initialize the embedding service, then push its dimensions to the store.
/// 4. Construct the remaining services and the AWS estate parsers. Create the
///    `sessions/` directory.
///
/// Services other than encryption and embedding are only constructed here. They are
/// initialized by [`RagModule::initialize`].
///
/// # Errors
/// Fails if:
/// * `base_path` is not valid UTF-8;
/// * the configured vector store backend is unsupported;
/// * any service fails to construct or initialize;
/// * the `sessions/` directory cannot be created.
pub async fn new(base_path: impl Into<PathBuf>) -> Result<Self> {
    let base_path = base_path.into();
    let mut config_manager = config::ConfigManager::new(&base_path).await?;
    let mut config = config_manager.get_config();
    let mut config_updated = false;
    if let Ok(qdrant_url) = std::env::var("QDRANT_URL") {
        info!("🔧 QDRANT_URL environment variable detected: {}", qdrant_url);
        config.vector_store.backend = "qdrant-embedded".to_string();
        config.vector_store.server_sync_url = Some(qdrant_url.clone());
        config.vector_store.enable_server_sync = true;
        config_updated = true;
        info!("✅ Using qdrant-embedded with server sync to: {}", qdrant_url);
    } else if config.vector_store.backend == "qdrant-server" {
        info!("🔄 Auto-upgrading from 'qdrant-server' to 'qdrant-embedded' with server sync");
        config.vector_store.backend = "qdrant-embedded".to_string();
        if config.vector_store.server_sync_url.is_none() {
            config.vector_store.server_sync_url = Some(config.vector_store.connection.url.clone());
        }
        config.vector_store.enable_server_sync = true;
        config_updated = true;
        info!(" ✅ Upgraded to DUAL mode: Embedded (local) + Server sync");
        info!(" 📍 Local: ./qdrant-data");
        // `server_sync_url` was guaranteed to be `Some` just above.
        info!(" 📡 Sync: {}", config.vector_store.server_sync_url.as_ref().unwrap());
    }
    if config_updated {
        config_manager.update_config(config.clone()).await?;
        info!("💾 Config saved");
    }
    let config_manager = Arc::new(config_manager);
    // Applied after the save above, so this override is never persisted.
    if let Ok(val) = std::env::var("RAG_DISABLE_ENCRYPTION") {
        info!("🔧 RAG_DISABLE_ENCRYPTION environment variable detected: {} - Disabling encryption for server-side operations", val);
        config.encryption.enable_content_encryption = false;
        config.encryption.enable_metadata_encryption = false;
        config.encryption.enable_embedding_encryption = false;
        info!("✅ Encryption disabled: content={}, metadata={}, embedding={}",
            config.encryption.enable_content_encryption,
            config.encryption.enable_metadata_encryption,
            config.encryption.enable_embedding_encryption);
    } else {
        warn!("❌ RAG_DISABLE_ENCRYPTION environment variable not found - encryption remains enabled");
    }
    // `EncryptionService::new` takes the base path as `&str`. Report a non-UTF-8
    // path as an error instead of panicking.
    let base_path_str = base_path.to_str().ok_or_else(|| {
        anyhow::anyhow!("RAG base path is not valid UTF-8: {}", base_path.display())
    })?;
    let mut encryption_service = services::EncryptionService::new(&config.encryption, base_path_str).await?;
    encryption_service.initialize().await?;
    let encryption_service = Arc::new(encryption_service);
    let vector_store: Arc<dyn db::VectorStore + Send + Sync> = match config.vector_store.backend.as_str() {
        "qdrant-embedded" => {
            if config.vector_store.enable_server_sync {
                if let Some(server_url) = &config.vector_store.server_sync_url {
                    info!("💾🔄 Using Qdrant DUAL mode: Embedded (primary) + Server sync ({})", server_url);
                    let embedded = db::EmbeddedQdrantVectorStore::new(
                        &base_path,
                        encryption_service.clone(),
                    ).await?;
                    let server = db::QdrantServerVectorStore::new(
                        server_url,
                        config.vector_store.connection.api_key.clone(),
                        &base_path.join("qdrant-data"),
                        encryption_service.clone(),
                    ).await?;
                    // NOTE(review): the meaning of this flag is defined by `DualVectorStore::new` — confirm.
                    Arc::new(db::DualVectorStore::new(
                        embedded,
                        server,
                        false,
                    ).await?)
                } else {
                    warn!("⚠️ Server sync enabled but no serverSyncUrl configured. Using embedded only.");
                    info!("💾 Using Qdrant embedded mode");
                    Arc::new(db::EmbeddedQdrantVectorStore::new(
                        &base_path,
                        encryption_service.clone(),
                    ).await?)
                }
            } else {
                info!("💾 Using Qdrant embedded mode (server sync disabled)");
                Arc::new(db::EmbeddedQdrantVectorStore::new(
                    &base_path,
                    encryption_service.clone(),
                ).await?)
            }
        }
        // The override logic above always rewrites `qdrant-server` to `qdrant-embedded`,
        // so this arm cannot be reached. It is kept so the match stays explicit.
        "qdrant-server" => {
            info!("🌐 Using Qdrant server mode: {}", config.vector_store.connection.url);
            Arc::new(db::QdrantServerVectorStore::new(
                &config.vector_store.connection.url,
                config.vector_store.connection.api_key.clone(),
                &base_path.join("qdrant-data"),
                encryption_service.clone(),
            ).await?)
        }
        "local-files" => {
            info!("📁 Using local file storage mode");
            Arc::new(db::LocalFileVectorStore::new(
                &base_path.join("vector-data"),
                encryption_service.clone(),
            ).await?)
        }
        _ => return Err(anyhow::anyhow!(
            "Unsupported vector store backend: '{}'. Supported backends: 'qdrant-embedded', 'qdrant-server', 'local-files'. \
            Use 'qdrant-embedded' for embedded mode (default) or set QDRANT_URL environment variable to use server mode.",
            config.vector_store.backend
        )),
    };
    let models_path = base_path.join("models");
    let embedding_service = {
        let service = services::EmbeddingService::new(&config.embedding, &models_path).await?;
        service.initialize().await?;
        Arc::new(service)
    };
    // The store must know the embedding width before any document is written.
    let dimensions = embedding_service.get_dimensions().await?;
    vector_store.set_dimensions(dimensions).await?;
    let security_service = Arc::new(
        services::SecurityService::new(config_manager.as_ref().clone()).await?
    );
    let mapping_service = Arc::new(
        services::MappingService::new(&config.privacy).await?
    );
    let sync_service = Arc::new(
        services::SyncService::new(
            encryption_service.clone(),
            config_manager.clone()
        ).await?
    );
    let indexing_service = Arc::new(
        services::IndexingService::new(
            embedding_service.clone(),
            encryption_service.clone(),
        ).await?
    );
    let document_service = Arc::new(
        services::DocumentService::new(
            vector_store.clone(),
            indexing_service.clone(),
            mapping_service.clone(),
            security_service.clone(),
            encryption_service.clone(),
        ).await?
    );
    let kb_service = Arc::new(
        services::KBService::new().await?
    );
    let iam_service = Arc::new(
        services::IAMService::new(&config.iam).await?
    );
    let operation_context_service = Arc::new(
        services::OperationContextService::new().await?
    );
    let data_filtering_service = Arc::new(
        services::DataFilteringService::new().await?
    );
    let collection_manager = Arc::new(
        services::CollectionManager::new(vector_store.clone())
    );
    let mut aws_estate_service = services::AwsEstateService::new(document_service.clone());
    aws_estate_service.register_parser(Box::new(services::aws_parsers::Ec2Parser::new()));
    aws_estate_service.register_parser(Box::new(services::aws_parsers::RdsParser::new()));
    aws_estate_service.register_parser(Box::new(services::aws_parsers::S3Parser::new()));
    aws_estate_service.register_parser(Box::new(services::aws_parsers::LambdaParser::new()));
    aws_estate_service.register_parser(Box::new(services::aws_parsers::EbsParser::new()));
    aws_estate_service.register_parser(Box::new(services::aws_parsers::VpcParser::new()));
    aws_estate_service.register_parser(Box::new(services::aws_parsers::IamParser::new()));
    let aws_estate_service = Arc::new(aws_estate_service);
    let mut search_service_instance = services::SearchService::new_with_services(
        vector_store.clone(),
        embedding_service.clone(),
        Some(security_service.clone()),
        Some(encryption_service.clone()),
        Some(mapping_service.clone()),
        None,
    ).await?;
    search_service_instance.set_base_path(base_path.clone());
    let search_service = Arc::new(search_service_instance);
    let storage_path = base_path.join("sessions");
    tokio::fs::create_dir_all(&storage_path).await?;
    let messages_file = storage_path.join("messages.jsonl");
    let qdrant_data_path = base_path.join("qdrant-data");
    let aws_estate_file = qdrant_data_path.join("aws_estate-metadata.json");
    Ok(Self {
        base_path,
        initialized: Arc::new(RwLock::new(false)),
        config_manager,
        embedding_service,
        vector_store,
        document_service,
        search_service,
        encryption_service,
        security_service,
        mapping_service,
        sync_service,
        indexing_service,
        kb_service,
        iam_service,
        operation_context_service,
        data_filtering_service,
        collection_manager,
        aws_estate_service,
        messages_file,
        qdrant_data_path,
        aws_estate_file,
        message_counters: Arc::new(RwLock::new(HashMap::new())),
    })
}
/// Initializes the module and all of its services, at most once.
///
/// It first creates the on-disk directory structure. It then calls `initialize` on
/// the vector store and on every service, in a fixed order.
///
/// The write lock on the `initialized` flag is held for the whole sequence.
/// Concurrent callers therefore wait for the first one and then return early once
/// the flag is set.
///
/// If any step fails, the error is returned and the flag stays `false`. A later call
/// then repeats the whole sequence, including steps that already succeeded.
///
/// # Errors
/// Propagates the first error from directory creation or from any initializer.
pub async fn initialize(&self) -> Result<()> {
    let mut initialized = self.initialized.write().await;
    if *initialized {
        return Ok(());
    }
    info!("Initializing RAG Module at path: {:?}", self.base_path);
    self.create_directory_structure().await?;
    self.vector_store.initialize().await?;
    // The encryption service is initialized in `new` and is not re-initialized here.
    // Only report the unexpected state; do not claim to fix it.
    if !self.encryption_service.is_initialized() {
        warn!("Encryption service is not initialized; continuing without re-initializing it");
    }
    self.embedding_service.initialize().await?;
    self.document_service.initialize().await?;
    self.search_service.initialize().await?;
    self.security_service.initialize().await?;
    self.mapping_service.initialize().await?;
    self.sync_service.initialize().await?;
    self.indexing_service.initialize().await?;
    self.kb_service.initialize().await?;
    self.iam_service.initialize().await?;
    self.operation_context_service.initialize().await?;
    self.data_filtering_service.initialize().await?;
    self.collection_manager.initialize().await?;
    *initialized = true;
    info!("RAG Module initialized successfully");
    Ok(())
}
/// Returns `true` once [`RagModule::initialize`] has completed successfully.
pub async fn is_initialized(&self) -> bool {
    let flag = self.initialized.read().await;
    *flag
}
/// Resets cached runtime state.
///
/// First resets the embedding service's state, then clears the vector store's
/// document cache.
///
/// # Errors
/// Stops at the first failure. If the embedding reset fails, the vector store cache
/// is left untouched.
pub async fn reset_state(&self) -> Result<()> {
    info!("🔄 Resetting RAG Module state...");
    let embeddings = &self.embedding_service;
    embeddings.reset_state().await?;

    info!("🗑️ Clearing vector store document cache...");
    let store = &self.vector_store;
    store.clear_document_cache().await?;

    info!("✅ RAG Module state reset complete");
    Ok(())
}
/// Creates `base_path` and its standard subdirectories, then writes `.gitignore`.
///
/// The subdirectories are `config`, `data`, `models`, `cache`, `sync`, `keys` and
/// `qdrant-data`. The `.gitignore` excludes key material, sync and cache data, and
/// it is overwritten on every call.
async fn create_directory_structure(&self) -> Result<()> {
    const SUBDIRECTORIES: [&str; 7] = [
        "config",
        "data",
        "models",
        "cache",
        "sync",
        "keys",
        "qdrant-data",
    ];
    const GITIGNORE: &str = concat!(
        "# RAG Module - Exclude sensitive data\n",
        "keys/\n",
        "sync/\n",
        "cache/\n",
        "*.key\n",
        "*.encrypted\n",
        "config.local.yaml\n",
    );

    let root = &self.base_path;
    tokio::fs::create_dir_all(root).await?;
    for name in SUBDIRECTORIES.iter() {
        let subdirectory = root.join(name);
        tokio::fs::create_dir_all(&subdirectory).await?;
        debug!("Created directory: {:?}", subdirectory);
    }

    let gitignore_path = root.join(".gitignore");
    tokio::fs::write(&gitignore_path, GITIGNORE).await?;
    debug!("Created .gitignore file: {:?}", gitignore_path);
    info!("Directory structure created successfully");
    Ok(())
}
/// Adds `document` to `collection_type` through the document service.
///
/// Initializes the module first if that has not happened yet. Returns whatever id
/// `DocumentService::add_document` yields.
///
/// # Errors
/// Propagates initialization errors and errors from the document service.
pub async fn add_document(&self, collection_type: &str, document: Document) -> Result<String> {
    let ready = self.is_initialized().await;
    if !ready {
        self.initialize().await?;
    }
    debug!("Adding document to collection: {}", collection_type);
    let service = &self.document_service;
    service.add_document(collection_type, document).await
}
/// Searches `collection_type` on behalf of `user_id`, routing by collection name.
///
/// Routing:
/// * `chat_history` / `chat`: chat-history search scoped to `user_id`. `query` is
///   only logged here, because `ChatSearchOptions` has no query field.
/// * `escher_library` or any `tenant_*` collection: playbook search.
/// * `aws_estate` or any `*_estate` collection: estate resource search. The JSON
///   string stored under `_encrypted_metadata` in each hit is parsed and merged into
///   the payload. Only the fields named in `options.parameters` are merged when that
///   is given; otherwise all fields are merged.
///
/// For chat and estate hits, the result `id` is taken from the payload's string
/// `id`, falling back to the hit's position. The `score` is taken from the payload's
/// numeric `score`, falling back to `0.0`.
///
/// # Errors
/// Fails for any other collection type. Also propagates initialization and
/// search-service errors.
pub async fn search(&self, collection_type: &str, query: &str, user_id: &str, options: SearchOptions) -> Result<Vec<SearchResult>> {
    // Turns a raw JSON hit into a payload map; non-object hits have no payload.
    fn object_payload(hit: serde_json::Value) -> Option<HashMap<String, serde_json::Value>> {
        match hit {
            serde_json::Value::Object(map) => Some(map.into_iter().collect()),
            _ => None,
        }
    }

    // Builds a result whose `id`/`score` come from the payload, with fallbacks.
    fn result_from_payload(position: usize, payload: Option<HashMap<String, serde_json::Value>>) -> types::SearchResult {
        let id = payload
            .as_ref()
            .and_then(|p| p.get("id"))
            .and_then(|v| v.as_str())
            .map(String::from)
            .unwrap_or_else(|| position.to_string());
        let score = payload
            .as_ref()
            .and_then(|p| p.get("score"))
            .and_then(|v| v.as_f64())
            .unwrap_or(0.0) as f32;
        types::SearchResult {
            id,
            score,
            document: None,
            payload,
        }
    }

    if !self.is_initialized().await {
        self.initialize().await?;
    }
    debug!("Searching in collection: {} with query: {} for user: {}", collection_type, query, user_id);
    if collection_type == "chat_history" || collection_type == "chat" {
        let chat_options = services::search_service::ChatSearchOptions {
            context_id: None,
            role: None,
            from_timestamp: None,
            to_timestamp: None,
            from_message_index: None,
            to_message_index: None,
            limit: options.limit,
            include_metadata: false,
            user_id: Some(user_id.to_string()),
        };
        let results = self.search_service.search_chat_history(chat_options).await?;
        Ok(results
            .into_iter()
            .enumerate()
            .map(|(i, hit)| result_from_payload(i, object_payload(hit)))
            .collect())
    } else if collection_type == "escher_library" || collection_type.starts_with("tenant_") {
        info!("📚 ROUTE: Playbook Search ({})", collection_type);
        info!("🔍 Calling search_service.search_playbooks()...");
        let results = self.search_service.search_playbooks(collection_type, query, &options).await?;
        info!("✅ Playbook search completed: {} results", results.len());
        Ok(results)
    } else if collection_type == "aws_estate" || collection_type.ends_with("_estate") {
        info!("🏗️ ROUTE: Estate Search ({})", collection_type);
        // Kept for the per-hit metadata merge; `options.parameters` itself moves into the options.
        let requested_parameters = options.parameters.clone();
        let estate_options = services::search_service::EstateSearchOptions {
            resource_types: None,
            account_ids: None,
            regions: None,
            services: None,
            states: None,
            environment: None,
            application: None,
            synced_after: None,
            limit: options.limit,
            score_threshold: options.score_threshold,
            include_metadata: false,
            use_anonymous_ids: true,
            parameters: options.parameters,
        };
        let results = self.search_service.search_estate_resources(collection_type, query, estate_options, None, user_id).await?;
        Ok(results
            .into_iter()
            .enumerate()
            .map(|(i, hit)| {
                let mut payload = object_payload(hit);
                if let Some(ref mut p) = payload {
                    // `_encrypted_metadata` holds the resource metadata as a JSON string
                    // (as written by `ingest_aws_estate`).
                    if let Some(encrypted_metadata_str) = p.get("_encrypted_metadata").and_then(|v| v.as_str()) {
                        match serde_json::from_str::<serde_json::Value>(encrypted_metadata_str) {
                            Ok(metadata_json) => {
                                if let Some(metadata_obj) = metadata_json.as_object() {
                                    if let Some(ref params) = requested_parameters {
                                        info!("📋 Extracting {} requested fields from Qdrant metadata", params.len());
                                        for (field_name, _) in params {
                                            if let Some(field_value) = metadata_obj.get(field_name) {
                                                p.insert(field_name.clone(), field_value.clone());
                                                debug!(" ✅ Extracted field '{}' from Qdrant", field_name);
                                            } else {
                                                debug!(" ⚠️ Field '{}' not found in Qdrant metadata", field_name);
                                            }
                                        }
                                    } else {
                                        info!("📋 Extracting all {} fields from Qdrant metadata", metadata_obj.len());
                                        for (key, value) in metadata_obj {
                                            p.insert(key.clone(), value.clone());
                                        }
                                    }
                                    debug!("✅ Extracted metadata from Qdrant for result {}", i);
                                } else {
                                    warn!("⚠️ Qdrant metadata for result {} is not a JSON object", i);
                                }
                            }
                            Err(e) => {
                                warn!("⚠️ Failed to parse Qdrant metadata for result {}: {}", i, e);
                            }
                        }
                    } else {
                        debug!("⚠️ No _encrypted_metadata field found in Qdrant payload for result {}", i);
                    }
                }
                result_from_payload(i, payload)
            })
            .collect())
    } else {
        Err(anyhow::anyhow!("Unsupported collection type: {}", collection_type))
    }
}
/// Looks up document `id` in `collection_type` through the document service.
///
/// Initializes the module first if that has not happened yet. The `Option` is passed
/// through from `DocumentService::get_document`; presumably `None` means "not found"
/// (confirm).
///
/// # Errors
/// Propagates initialization errors and errors from the document service.
pub async fn get_document(&self, collection_type: &str, id: &str) -> Result<Option<Document>> {
    let ready = self.is_initialized().await;
    if !ready {
        self.initialize().await?;
    }
    let service = &self.document_service;
    service.get_document(collection_type, id).await
}
/// Processes raw AWS estate `data` for `user_id` and saves the resulting documents.
///
/// The IAM service derives document ids from the data. The original data and those
/// ids are then passed to `save_aws_estate_documents` for `collection_name`.
/// Returns the document ids.
///
/// # Errors
/// Propagates initialization, IAM processing and save errors.
pub async fn process_aws_estate(&self, data: serde_json::Value, user_id: &str, collection_name: &str) -> Result<Vec<String>> {
    if !self.is_initialized().await {
        self.initialize().await?;
    }
    info!("Processing AWS estate data for user: {}", user_id);
    // The IAM service takes the payload by value, but the original is still needed
    // for the save step, so hand it a copy.
    let ids = self.iam_service.process_estate_data(data.clone()).await?;
    self.save_aws_estate_documents(&data, &ids, user_id, collection_name).await?;
    Ok(ids)
}
/// Derives a document id for an estate resource.
///
/// When `metadata_obj["arnId"]` is a string, the id is deterministic:
/// `<collection_name>-<first 16 hex chars of SHA-256(arnId)>`. Re-ingesting the same
/// ARN therefore targets the same id.
///
/// Otherwise (missing or non-string `arnId`) a random `<collection_name>-<UUID v4>`
/// is returned, and each call yields a new id.
fn generate_doc_id_from_arn(collection_name: &str, metadata_obj: &serde_json::Map<String, serde_json::Value>) -> String {
    match metadata_obj.get("arnId").map(serde_json::Value::as_str) {
        Some(Some(arn_str)) => {
            let mut hasher = Sha256::new();
            hasher.update(arn_str.as_bytes());
            let digest_hex = format!("{:x}", hasher.finalize());
            let doc_id = format!("{}-{}", collection_name, &digest_hex[..16]);
            info!("🆔 Generated deterministic doc_id from ARN: {} -> {}", arn_str, doc_id);
            return doc_id;
        }
        Some(None) => warn!("⚠️ arnId field exists but is not a string, using UUID fallback"),
        None => warn!("⚠️ arnId field missing from metadata, using UUID fallback"),
    }
    let fallback_id = format!("{}-{}", collection_name, uuid::Uuid::new_v4());
    warn!("🔄 Using fallback UUID doc_id: {}", fallback_id);
    fallback_id
}
pub async fn ingest_aws_estate(&self, estate_data: serde_json::Value, user_id: &str, collection_name:&str) -> Result<services::AwsEstateIngestResult> {
let start_time = std::time::Instant::now();
info!("🚀 Starting AWS estate ingestion - user_id: {}, collection: {}", user_id, collection_name);
if !self.is_initialized().await {
info!("📦 RAG module not initialized, initializing...");
match self.initialize().await {
Ok(_) => info!("✅ RAG module initialized successfully"),
Err(e) => {
error!("❌ Failed to initialize RAG module: {}", e);
return Err(e);
}
}
}
info!("👤 Setting user context for user_id: {}", user_id);
if let Err(e) = self.set_user_context(user_id).await {
error!("❌ Failed to set user context: {}", e);
return Err(e);
}
info!("📄 Extracting content from estate data...");
let content_str = match estate_data.get("content") {
Some(content) => {
match content.as_str() {
Some(s) => {
let content_len = s.len();
info!("✅ Content extracted successfully (length: {} chars)", content_len);
s.to_string()
},
None => {
error!("❌ Content field exists but is not a string: {:?}", content);
return Err(anyhow::anyhow!("content field must be a string"));
}
}
},
None => {
error!("❌ Content field missing from estate_data. Available fields: {:?}",
estate_data.as_object().map(|o| o.keys().collect::<Vec<_>>()));
return Err(anyhow::anyhow!("content field is required in the JSON object"));
}
};
info!("🧮 Generating embedding for content...");
let embedding = match self.embedding_service.generate_embedding(&content_str).await {
Ok(emb) => {
info!("✅ Embedding generated successfully (dimensions: {})", emb.len());
emb
},
Err(e) => {
error!("❌ Failed to generate embedding: {}", e);
return Err(e);
}
};
info!("📋 Processing metadata...");
let mut metadata_obj = match estate_data.as_object() {
Some(obj) => {
info!("✅ Metadata object extracted ({} fields)", obj.len());
let mut metadata_obj = obj.clone();
metadata_obj.remove("content");
metadata_obj
},
None => {
error!("❌ estate_data is not a JSON object");
return Err(anyhow::anyhow!("estate_data must be a JSON object"));
}
};
let doc_id = Self::generate_doc_id_from_arn(collection_name, &metadata_obj);
let metadata_str = match serde_json::to_string(&metadata_obj) {
Ok(s) => {
info!("✅ Metadata serialized successfully (length: {} bytes)", s.len());
s
},
Err(e) => {
error!("❌ Failed to serialize metadata: {}", e);
return Err(anyhow::anyhow!("Failed to serialize metadata: {}", e));
}
};
let mut metadata = indexmap::IndexMap::new();
metadata.insert("_encrypted_metadata".to_string(), serde_json::Value::String(metadata_str));
if let Some(val) = metadata_obj.get("profile") {
metadata.insert("profile".to_string(), val.clone());
}
if let Some(val) = metadata_obj.get("service") {
metadata.insert("service".to_string(), val.clone());
}
info!("📋 Qdrant metadata ready (hybrid: _encrypted_metadata + filterable fields [profile, service])");
info!("💾 Creating and storing document in Qdrant collection '{}'...", collection_name);
let document = types::Document::new(doc_id.clone(), content_str)
.with_embedding(embedding)
.with_metadata(metadata);
match self.vector_store.add_document(collection_name, document).await {
Ok(_) => {
let elapsed = start_time.elapsed();
info!("✅ Document stored successfully (doc_id: {}) in {:?}", doc_id, elapsed);
},
Err(e) => {
error!("❌ Failed to store document in Qdrant: {}", e);
return Err(e);
}
}
let result = services::AwsEstateIngestResult {
total_accounts: 1,
total_services: 1,
total_resources: 1,
parsed_resources: 1,
failed_resources: 0,
supported_services: vec![],
unsupported_services: vec![],
create_result: CreateResult {
created: 1,
failed: Vec::new(),
},
};
let total_time = start_time.elapsed();
info!("🎉 AWS estate ingestion completed successfully in {:?}", total_time);
Ok(result)
}
pub async fn ingest_aws_estate_batch(&self, estate_data_batch: Vec<serde_json::Value>, user_id: &str, collection_name: &str) -> Result<services::AwsEstateIngestResult> {
let start_time = std::time::Instant::now();
let batch_size = estate_data_batch.len();
info!("🚀 Starting batch AWS estate ingestion - user_id: {}, collection: {}, batch_size: {}",
user_id, collection_name, batch_size);
if !self.is_initialized().await {
info!("📦 RAG module not initialized, initializing...");
match self.initialize().await {
Ok(_) => info!("✅ RAG module initialized successfully"),
Err(e) => {
error!("❌ Failed to initialize RAG module: {}", e);
return Err(e);
}
}
}
info!("👤 Setting user context for user_id: {}", user_id);
if let Err(e) = self.set_user_context(user_id).await {
error!("❌ Failed to set user context: {}", e);
return Err(e);
}
let mut documents = Vec::new();
let mut content_strings = Vec::new();
let mut failed_resources = 0;
let mut failed_errors = Vec::new();
info!("📄 Processing {} documents in batch...", batch_size);
for (idx, estate_data) in estate_data_batch.iter().enumerate() {
debug!("Processing document {}/{}", idx + 1, batch_size);
let content_str = match estate_data.get("content") {
Some(content) => {
match content.as_str() {
Some(s) => s.to_string(),
None => {
failed_resources += 1;
let error_msg = format!("Document {}/{}: content field must be a string", idx + 1, batch_size);
error!("❌ {}", error_msg);
failed_errors.push(error_msg);
continue;
}
}
},
None => {
failed_resources += 1;
let error_msg = format!("Document {}/{}: content field is required", idx + 1, batch_size);
error!("❌ {}", error_msg);
failed_errors.push(error_msg);
continue;
}
};
let metadata_obj = match estate_data.as_object() {
Some(obj) => {
let mut metadata_obj = obj.clone();
metadata_obj.remove("content");
metadata_obj
},
None => {
failed_resources += 1;
let error_msg = format!("Document {}/{}: estate_data must be a JSON object", idx + 1, batch_size);
error!("❌ {}", error_msg);
failed_errors.push(error_msg);
continue;
}
};
let doc_id = Self::generate_doc_id_from_arn(collection_name, &metadata_obj);
debug!("🆔 Document {}/{}: Generated doc_id: {}", idx + 1, batch_size, doc_id);
let metadata_str = match serde_json::to_string(&metadata_obj) {
Ok(s) => s,
Err(e) => {
failed_resources += 1;
let error_msg = format!("Document {}/{}: Failed to serialize metadata: {}", idx + 1, batch_size, e);
error!("❌ {}", error_msg);
failed_errors.push(error_msg);
continue;
}
};
let mut metadata = indexmap::IndexMap::new();
metadata.insert("_encrypted_metadata".to_string(), serde_json::Value::String(metadata_str));
if let Some(val) = metadata_obj.get("profile") {
metadata.insert("profile".to_string(), val.clone());
}
if let Some(val) = metadata_obj.get("service") {
metadata.insert("service".to_string(), val.clone());
}
documents.push((doc_id, content_str.clone(), metadata));
content_strings.push(content_str);
}
let processed_count = documents.len();
info!("✅ Successfully processed {}/{} documents", processed_count, batch_size);
if documents.is_empty() {
error!("❌ No valid documents to process. All {} documents failed.", batch_size);
error!("Errors encountered: {:?}", failed_errors);
return Ok(services::AwsEstateIngestResult {
total_accounts: 0,
total_services: 0,
total_resources: batch_size,
parsed_resources: 0,
failed_resources,
supported_services: vec![],
unsupported_services: vec![],
create_result: CreateResult {
created: 0,
failed: failed_errors,
},
});
}
info!("🧮 Generating embeddings for {} documents...", processed_count);
let embedding_start = std::time::Instant::now();
let embeddings = match self.embedding_service.generate_embeddings_batch(&content_strings).await {
Ok(embs) => {
let embedding_duration = embedding_start.elapsed();
info!("✅ Batch embeddings generated successfully ({} embeddings in {:?})", embs.len(), embedding_duration);
embs
},
Err(e) => {
warn!("⚠️ Batch embedding generation failed, falling back to individual generation: {}", e);
let mut individual_embeddings = Vec::new();
for (idx, content) in content_strings.iter().enumerate() {
match self.embedding_service.generate_embedding(content).await {
Ok(embedding) => {
debug!("✅ Generated embedding for document {}/{}", idx + 1, content_strings.len());
individual_embeddings.push(embedding);
},
Err(e) => {
failed_resources += 1;
let error_msg = format!("Document {}/{}: Failed to generate embedding: {}", idx + 1, content_strings.len(), e);
error!("❌ {}", error_msg);
failed_errors.push(error_msg);
continue;
}
}
}
let embedding_duration = embedding_start.elapsed();
info!("✅ Individual embeddings generated ({} successful, {} failed in {:?})",
individual_embeddings.len(), content_strings.len() - individual_embeddings.len(), embedding_duration);
individual_embeddings
}
};
info!("📦 Creating final documents with embeddings...");
let mut final_documents = Vec::new();
for (i, (doc_id, content_str, metadata)) in documents.into_iter().enumerate() {
if i < embeddings.len() {
let document = types::Document::new(doc_id, content_str)
.with_embedding(embeddings[i].clone())
.with_metadata(metadata);
final_documents.push(document);
} else {
failed_resources += 1;
let error_msg = format!("Document {}: Missing embedding (embedding count mismatch)", i + 1);
error!("❌ {}", error_msg);
failed_errors.push(error_msg);
}
}
info!("✅ Created {} final documents ready for storage", final_documents.len());
info!("🛑 Disabling optimizer before bulk insert (prevents segment merge corruption)");
if let Err(e) = self.vector_store.disable_optimizer(collection_name).await {
warn!("⚠️ Failed to disable optimizer (non-critical): {}", e);
}
info!("💾 Storing {} documents to collection '{}'...", final_documents.len(), collection_name);
let storage_start = std::time::Instant::now();
let created = match self.vector_store.add_documents(collection_name, final_documents.clone()).await {
Ok(document_ids) => {
let count = document_ids.len();
let storage_duration = storage_start.elapsed();
info!("✅ Successfully stored {} documents to vector store in {:?}", count, storage_duration);
count
}
Err(e) => {
let storage_duration = storage_start.elapsed();
error!("❌ Batch insertion failed after {:?}: {}", storage_duration, e);
error!(" Collection: {}", collection_name);
error!(" Attempted to store: {} documents", final_documents.len());
error!(" Error details: {}", e);
failed_resources += final_documents.len();
failed_errors.push(format!("Batch insertion to vector store failed: {}", e));
0
}
};
info!("✅ Re-enabling optimizer after bulk insert");
if let Err(e) = self.vector_store.enable_optimizer(collection_name).await {
warn!("⚠️ Failed to re-enable optimizer (non-critical): {}", e);
}
let total_duration = start_time.elapsed();
info!("🎉 Batch AWS estate ingestion completed in {:?}", total_duration);
info!(" Total resources: {}", estate_data_batch.len());
info!(" Successfully created: {}", created);
info!(" Failed: {}", failed_resources);
if failed_resources > 0 {
warn!("⚠️ {} documents failed during batch ingestion:", failed_resources);
for (idx, error) in failed_errors.iter().enumerate() {
warn!(" {}. {}", idx + 1, error);
}
}
let result = services::AwsEstateIngestResult {
total_accounts: 1,
total_services: 1,
total_resources: estate_data_batch.len(),
parsed_resources: created,
failed_resources,
supported_services: vec![],
unsupported_services: vec![],
create_result: CreateResult {
created,
failed: failed_errors,
},
};
Ok(result)
}
/// Records a user prompt in the chat history of `context_id` and returns its message id.
///
/// The prompt is written twice:
/// * as a `chat_history` document in the vector store, whose content is compact JSON
///   with these keys: `message_id`; `i` context id; `r` role, where `"0"` means user;
///   `c` text; `t` timestamp; `m` message index; `ct` chat title; `ri` request id.
///   The document also carries filterable metadata.
/// * as a [`SessionMessage`] via `store_message`.
///
/// `chat_title` defaults to `"Chat"`.
/// NOTE(review): the vector-store document uses a placeholder one-element embedding
/// (`vec![0.1]`); confirm the store treats `chat_history` specially.
///
/// # Errors
/// Fails if:
/// * `context_id` or `prompt` is empty;
/// * the vector store is not one of the Qdrant server, dual or embedded stores;
/// * initialization or either write fails.
pub async fn add_prompt(&self, context_id: &str, prompt: &str, user_id: &str, chat_title: Option<&str>, request_id: &str) -> Result<String> {
    if !self.is_initialized().await {
        self.initialize().await?;
    }
    let missing_input = context_id.is_empty() || prompt.is_empty();
    if missing_input {
        return Err(anyhow::anyhow!("context_id and prompt are required"));
    }

    let message_index = self.get_next_message_index(context_id).await;
    let title = chat_title.unwrap_or("Chat").to_string();
    let message = SessionMessage {
        id: Uuid::new_v4().to_string(),
        context_id: context_id.to_string(),
        message_index,
        role: MessageRole::User,
        content: prompt.to_string(),
        timestamp: Utc::now(),
        chat_title: title.clone(),
        metadata: None,
        request_id: Some(request_id.to_string()),
    };

    // Key order matters when serde_json preserves insertion order, so it is fixed here.
    let content_json = serde_json::json!({
        "message_id": message.id,
        "i": context_id,
        "r": "0",
        "c": prompt,
        "t": message.timestamp,
        "m": message_index,
        "ct": title,
        "ri": request_id
    });
    let serialized_content = serde_json::to_string(&content_json)?;

    let metadata_json = serde_json::json!({
        "i": context_id,
        "m": message_index,
        "r": "0",
        "t": message.timestamp.to_rfc3339(),
        "type": "chat_prompt",
        "ct": title
    });
    let metadata_map: indexmap::IndexMap<String, serde_json::Value> = metadata_json
        .as_object()
        .map(|fields| fields.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
        .unwrap_or_default();

    let vector_id = Uuid::new_v4().to_string();
    let document = types::Document::new_with_vector_id(message.id.clone(), vector_id, serialized_content)
        .with_metadata(metadata_map)
        .with_embedding(vec![0.1]);

    // Only these store types carry a per-user context for chat operations.
    if let Some(store) = self.vector_store.as_any().downcast_ref::<db::QdrantServerVectorStore>() {
        store.set_user_context(user_id).await;
    } else if let Some(store) = self.vector_store.as_any().downcast_ref::<db::DualVectorStore>() {
        store.set_user_context(user_id).await;
    } else if let Some(store) = self.vector_store.as_any().downcast_ref::<db::EmbeddedQdrantVectorStore>() {
        store.set_user_context(user_id).await;
    } else {
        return Err(anyhow::anyhow!("Unsupported vector store type for chat operations"));
    }

    self.vector_store.add_document("chat_history", document).await?;
    self.store_message(&message).await?;
    debug!("Added prompt for context: {}", context_id);
    Ok(message.id)
}
/// Persists an assistant response for `context_id` and returns the id of the new message.
///
/// `raw_response` is first cleaned with `extract_content_from_streaming_response`, which
/// drops blank lines and lines starting with `data:`. The cleaned text receives the next
/// per-context `message_index`. It is written as a `chat_history` document to the vector
/// store (after scoping the store to `user_id`) and then appended to the local messages file.
///
/// # Errors
/// Returns an error when `context_id` or `raw_response` is empty, when the vector store is not
/// a Qdrant server / dual / embedded store, or when serialization or either write fails.
pub async fn add_response(&self, context_id: &str, raw_response: &str, user_id: &str, chat_title: Option<&str>, request_id: &str) -> Result<String> {
    if !self.is_initialized().await {
        self.initialize().await?;
    }
    if context_id.is_empty() || raw_response.is_empty() {
        return Err(anyhow::anyhow!("context_id and rawResponse are required"));
    }
    let clean_content = self.extract_content_from_streaming_response(raw_response);
    let message_index = self.get_next_message_index(context_id).await;
    let title = chat_title.unwrap_or("Chat").to_string();
    let chat_message = SessionMessage {
        id: Uuid::new_v4().to_string(),
        context_id: context_id.to_string(),
        message_index,
        role: MessageRole::Assistant,
        content: clean_content.clone(),
        timestamp: Utc::now(),
        chat_title: title.clone(),
        metadata: None,
        request_id: Some(request_id.to_string()),
    };
    // Compact body: i = context id, r = role ("1" for an assistant response), c = content,
    // t = timestamp, m = message index, ct = chat title, ri = request id.
    let document_content = serde_json::json!({
        "message_id": chat_message.id,
        "i": context_id,
        "r": "1",
        "c": clean_content,
        "t": chat_message.timestamp,
        "m": message_index,
        "ct": title,
        "ri": request_id });
    let content_str = serde_json::to_string(&document_content)?;
    let metadata = serde_json::json!({
        "i": context_id,
        // Short key "m", the same key prompt metadata and encrypted-document metadata use,
        // so prompt and response documents can be filtered/sorted on one field.
        "m": message_index,
        "r": "1",
        "t": chat_message.timestamp.to_rfc3339(),
        "type": "chat_response",
        "ct": title
    });
    let mut metadata_map = indexmap::IndexMap::new();
    if let Some(obj) = metadata.as_object() {
        for (k, v) in obj {
            metadata_map.insert(k.clone(), v.clone());
        }
    }
    // Placeholder one-element embedding, as for prompts.
    let document = types::Document::new_with_vector_id(
        chat_message.id.clone(), Uuid::new_v4().to_string(), content_str, )
        .with_metadata(metadata_map)
        .with_embedding(vec![0.1]);
    if let Some(server_store) = self.vector_store.as_any().downcast_ref::<db::QdrantServerVectorStore>() {
        server_store.set_user_context(user_id).await;
    } else if let Some(dual_store) = self.vector_store.as_any().downcast_ref::<db::DualVectorStore>() {
        dual_store.set_user_context(user_id).await;
    } else if let Some(embedded_store) = self.vector_store.as_any().downcast_ref::<db::EmbeddedQdrantVectorStore>() {
        embedded_store.set_user_context(user_id).await;
    } else {
        return Err(anyhow::anyhow!("Unsupported vector store type for chat operations"));
    }
    self.vector_store.add_document("chat_history", document).await?;
    self.store_message(&chat_message).await?;
    debug!("Added response for context: {}", context_id);
    Ok(chat_message.id)
}
/// Returns every prompt/response pair recorded for `context_id`.
///
/// This delegates to `get_query_response_pairs` with no limit, so both history views share
/// one pairing implementation; only the returned `chat_title` differs.
///
/// # Errors
/// Fails when `context_id` is empty ("contextId is required") or the messages cannot be loaded.
pub async fn get_session_chat_history(&self, context_id: &str) -> Result<SessionHistoryResult> {
    let mut history = self.get_query_response_pairs(context_id, None).await?;
    history.chat_title = "Retrieved Chat History".to_string();
    Ok(history)
}
pub async fn get_query_response_pairs(
&self,
context_id: &str,
max_pairs: Option<usize>
) -> Result<SessionHistoryResult> {
if !self.is_initialized().await {
self.initialize().await?;
}
if context_id.is_empty() {
return Err(anyhow::anyhow!("contextId is required"));
}
let messages = self.load_messages_by_context(context_id).await?;
let mut prompts: Vec<SessionMessage> = messages
.iter()
.filter(|m| matches!(m.role, MessageRole::User))
.cloned()
.collect();
let mut responses: Vec<SessionMessage> = messages
.iter()
.filter(|m| matches!(m.role, MessageRole::Assistant))
.cloned()
.collect();
prompts.sort_by(|a, b| a.message_index.cmp(&b.message_index));
responses.sort_by(|a, b| a.message_index.cmp(&b.message_index));
let pair_count = std::cmp::min(prompts.len(), responses.len());
let actual_max_pairs = max_pairs.unwrap_or(pair_count).min(pair_count);
let mut pairs = Vec::new();
let start_index = pair_count.saturating_sub(actual_max_pairs);
for i in start_index..pair_count {
pairs.push(QueryResponsePair {
query: prompts[i].clone(),
response: responses[i].clone(),
});
}
Ok(SessionHistoryResult {
context_id: context_id.to_string(),
chat_title: format!("Chat History ({} pairs)", pairs.len()),
pairs,
total_pairs: pair_count,
is_empty: pair_count == 0,
})
}
/// Lists the chat contexts found in `user_id`'s decrypted chat history, newest first.
///
/// Each entry needs a string `"i"` (context id) and an RFC 3339 `"t"` timestamp; entries
/// lacking either are skipped. A missing or non-string `"ct"` title falls back to `"Chat"`
/// rather than dropping the context. `created_at` and `updated_at` are both set from `"t"`.
///
/// # Errors
/// Propagates initialization failures and errors from `get_decrypted_chat_history`.
pub async fn get_all_contexts(&self, user_id: &str) -> Result<Vec<ContextInfo>> {
    if !self.is_initialized().await {
        self.initialize().await?;
    }
    let all_contexts_data = self.get_decrypted_chat_history(user_id).await?;
    let mut contexts: Vec<ContextInfo> = all_contexts_data
        .iter()
        .filter_map(|ctx| {
            let context_id = ctx.get("i")?.as_str()?.to_string();
            // `and_then` instead of `?`: a missing title must use the default, not drop the entry.
            let chat_title = ctx
                .get("ct")
                .and_then(|v| v.as_str())
                .unwrap_or("Chat")
                .to_string();
            let timestamp = chrono::DateTime::parse_from_rfc3339(ctx.get("t")?.as_str()?)
                .ok()?
                .with_timezone(&chrono::Utc);
            Some(ContextInfo {
                context_id,
                chat_title,
                created_at: timestamp,
                updated_at: timestamp,
            })
        })
        .collect();
    contexts.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
    info!("Found {} unique contexts", contexts.len());
    Ok(contexts)
}
/// Returns the decrypted chat-history object whose `"i"` field equals `context_id`.
///
/// # Errors
/// Fails when `context_id` is empty, when no matching context exists for `user_id`, or when
/// initialization / history loading fails.
pub async fn get_chat_history_json(&self, user_id: &str, context_id: &str) -> Result<serde_json::Value> {
    if !self.is_initialized().await {
        self.initialize().await?;
    }
    if context_id.is_empty() {
        return Err(anyhow::anyhow!("context_id is required"));
    }
    let history = self.get_decrypted_chat_history(user_id).await?;
    // Take ownership of the matching entry instead of cloning it out of the vector.
    let found = history
        .into_iter()
        .find(|entry| entry.get("i").and_then(serde_json::Value::as_str) == Some(context_id));
    match found {
        None => Err(anyhow::anyhow!("No messages found for context_id: {}", context_id)),
        Some(data) => {
            info!("Retrieved chat history for context: {}", context_id);
            Ok(data)
        }
    }
}
pub async fn delete_context_from_chat_history(&self, context_id: &str, user_id: &str) -> Result<usize> {
let start_time = std::time::Instant::now();
info!("🗑️ Starting context deletion - context_id: {}, user_id: {}", context_id, user_id);
if context_id.is_empty() {
error!("❌ Context ID cannot be empty");
return Err(anyhow::anyhow!("context_id cannot be empty"));
}
if user_id.is_empty() {
error!("❌ User ID cannot be empty");
return Err(anyhow::anyhow!("user_id cannot be empty"));
}
let mut total_deleted = 0;
info!("📄 Step 1: Processing chat_history-documents.json");
let user_documents_file = self.get_user_encrypted_documents_path(user_id);
if !user_documents_file.exists() {
warn!("⚠️ Chat history file not found at: {}", user_documents_file.display());
info!(" No documents to delete for user {}", user_id);
} else {
info!("✅ Found chat history file at: {}", user_documents_file.display());
let encrypted_docs = match self.get_encrypted_chat_documents(user_id).await {
Ok(docs) => {
info!("✅ Loaded {} encrypted documents", docs.len());
docs
}
Err(e) => {
error!("❌ Failed to load encrypted documents: {}", e);
return Err(anyhow::anyhow!("Failed to load chat documents: {}", e));
}
};
let original_count = encrypted_docs.len();
info!("📊 Original document count: {}", original_count);
let mut filtered_docs = Vec::new();
let mut deleted_count = 0;
for (idx, doc) in encrypted_docs.iter().enumerate() {
let decrypted_content = match self.encryption_service.decrypt_content(&doc.content).await {
Ok(content) => content,
Err(e) => {
warn!("⚠️ Document {}/{}: Failed to decrypt content: {}", idx + 1, original_count, e);
warn!(" Skipping document (id: {})", doc.id);
filtered_docs.push(doc.clone());
continue;
}
};
let content_json = match serde_json::from_str::<serde_json::Value>(&decrypted_content) {
Ok(json) => json,
Err(e) => {
warn!("⚠️ Document {}/{}: Failed to parse decrypted content: {}", idx + 1, original_count, e);
warn!(" Skipping document (id: {})", doc.id);
filtered_docs.push(doc.clone());
continue;
}
};
let doc_context_id = content_json
.get("i")
.and_then(|v| v.as_str())
.unwrap_or("");
if doc_context_id == context_id {
deleted_count += 1;
info!("🗑️ Document {}/{}: Marked for deletion (context_id: {}, doc_id: {})",
idx + 1, original_count, doc_context_id, doc.id);
} else {
filtered_docs.push(doc.clone());
}
}
if deleted_count == 0 {
info!("ℹ️ No documents found matching context_id: {}", context_id);
} else {
info!("📊 Deletion summary:");
info!(" - Documents to delete: {}", deleted_count);
info!(" - Documents to keep: {}", filtered_docs.len());
info!("💾 Saving filtered documents to file...");
let wrapper = serde_json::json!({
"documents": filtered_docs,
"count": filtered_docs.len(),
"lastModified": Utc::now().to_rfc3339(),
});
match serde_json::to_string_pretty(&wrapper) {
Ok(documents_content) => {
match fs::write(&user_documents_file, documents_content).await {
Ok(_) => {
let elapsed = start_time.elapsed();
info!("✅ Successfully saved {} documents (deleted {} documents) in {:?}",
filtered_docs.len(), deleted_count, elapsed);
total_deleted += deleted_count;
}
Err(e) => {
error!("❌ Failed to write filtered documents to file: {}", e);
error!(" File path: {}", user_documents_file.display());
return Err(anyhow::anyhow!("Failed to save filtered documents: {}", e));
}
}
}
Err(e) => {
error!("❌ Failed to serialize filtered documents: {}", e);
return Err(anyhow::anyhow!("Failed to serialize documents: {}", e));
}
}
}
}
info!("📄 Step 2: Processing messages.jsonl");
if !self.messages_file.exists() {
warn!("⚠️ Messages file not found at: {}", self.messages_file.display());
info!(" No messages to delete");
} else {
info!("✅ Found messages file at: {}", self.messages_file.display());
match fs::read_to_string(&self.messages_file).await {
Ok(content) => {
let lines: Vec<&str> = content.lines().collect();
let original_message_count = lines.len();
info!("📊 Original message count: {}", original_message_count);
let mut filtered_lines = Vec::new();
let mut deleted_message_count = 0;
for (idx, line) in lines.iter().enumerate() {
if line.trim().is_empty() {
continue;
}
match serde_json::from_str::<serde_json::Value>(line) {
Ok(message) => {
let msg_context_id = message
.get("context_id")
.and_then(|v| v.as_str())
.unwrap_or("");
if msg_context_id == context_id {
deleted_message_count += 1;
debug!("🗑️ Message {}/{}: Marked for deletion (context_id: {})",
idx + 1, original_message_count, msg_context_id);
} else {
filtered_lines.push(line.to_string());
}
}
Err(e) => {
warn!("⚠️ Message {}/{}: Failed to parse JSON: {}", idx + 1, original_message_count, e);
warn!(" Keeping message (safer to not delete unparseable data)");
filtered_lines.push(line.to_string());
}
}
}
if deleted_message_count == 0 {
info!("ℹ️ No messages found matching context_id: {}", context_id);
} else {
info!("📊 Message deletion summary:");
info!(" - Messages to delete: {}", deleted_message_count);
info!(" - Messages to keep: {}", filtered_lines.len());
info!("💾 Saving filtered messages to file...");
let new_content = filtered_lines.join("\n");
let new_content_with_newline = if !new_content.is_empty() {
format!("{}\n", new_content)
} else {
new_content
};
match fs::write(&self.messages_file, new_content_with_newline).await {
Ok(_) => {
info!("✅ Successfully saved {} messages (deleted {} messages)",
filtered_lines.len(), deleted_message_count);
}
Err(e) => {
error!("❌ Failed to write filtered messages to file: {}", e);
error!(" File path: {}", self.messages_file.display());
return Err(anyhow::anyhow!("Failed to save filtered messages: {}", e));
}
}
}
}
Err(e) => {
error!("❌ Failed to read messages file: {}", e);
error!(" File path: {}", self.messages_file.display());
return Err(anyhow::anyhow!("Failed to read messages file: {}", e));
}
}
}
info!("🧹 Step 3: Cleaning up in-memory message counter");
{
let mut counters = self.message_counters.write().await;
if counters.remove(context_id).is_some() {
info!("✅ Removed in-memory counter for context_id: {}", context_id);
} else {
info!("ℹ️ No in-memory counter found for context_id: {}", context_id);
}
}
let total_time = start_time.elapsed();
info!("🎉 Context deletion completed successfully in {:?}", total_time);
info!("📊 Final summary:");
info!(" - Context ID: {}", context_id);
info!(" - User ID: {}", user_id);
info!(" - Total documents deleted: {}", total_deleted);
info!(" - Duration: {:?}", total_time);
Ok(total_deleted)
}
async fn get_next_message_index(&self, context_id: &str) -> u32 {
let mut counters = self.message_counters.write().await;
if let Some(¤t_index) = counters.get(context_id) {
let next_index = current_index + 1;
counters.insert(context_id.to_string(), next_index);
return current_index;
}
let max_index_result = self.get_max_index_from_local_file(context_id).await;
match max_index_result {
Ok(0) => {
counters.insert(context_id.to_string(), 1);
0
},
Ok(max_index) => {
counters.insert(context_id.to_string(), max_index + 2);
max_index + 1
},
Err(_) => {
counters.insert(context_id.to_string(), 1);
0
}
}
}
async fn get_max_index_from_local_file(&self, context_id: &str) -> Result<u32> {
use tokio::fs;
if !self.messages_file.exists() {
return Ok(0);
}
let content = fs::read_to_string(&self.messages_file).await?;
let mut max_index = 0u32;
for line in content.lines() {
if line.trim().is_empty() {
continue;
}
if let Ok(msg) = serde_json::from_str::<serde_json::Value>(line) {
if let Some(msg_context) = msg.get("context_id").and_then(|v| v.as_str()) {
if msg_context == context_id {
if let Some(idx) = msg.get("message_index").and_then(|v| v.as_u64()) {
max_index = max_index.max(idx as u32);
}
}
}
}
}
Ok(max_index)
}
/// Queries the search service for up to 1000 chat-history records of `context_id` (scoped to
/// `user_id`) and returns the largest message index they report, or 0 if none do.
///
/// A record contributes:
/// - the length of its `"m"` array, when `"m"` is an array;
/// - otherwise its integer `"m"`;
/// - otherwise its integer `"message_index"`.
///
/// NOTE(review): the array case yields a count, not an index, which presumably is one past the
/// last index — confirm against the record format.
///
/// Negative or out-of-range integers are ignored instead of being wrapped by an `as u32` cast.
///
/// # Errors
/// Propagates errors from `search_chat_history`.
async fn get_max_message_index_from_db(&self, context_id: &str, user_id: &str) -> Result<u32> {
    let chat_options = services::search_service::ChatSearchOptions {
        context_id: Some(context_id.to_string()),
        role: None,
        from_timestamp: None,
        to_timestamp: None,
        from_message_index: None,
        to_message_index: None,
        limit: Some(1000),
        include_metadata: false,
        user_id: Some(user_id.to_string()),
    };
    let results = self.search_service.search_chat_history(chat_options).await?;
    let max_index = results
        .iter()
        .filter_map(|result| {
            let m = result.get("m");
            if let Some(messages) = m.and_then(|v| v.as_array()) {
                Some(u32::try_from(messages.len()).unwrap_or(u32::MAX))
            } else {
                m.and_then(|v| v.as_i64())
                    .or_else(|| result.get("message_index").and_then(|v| v.as_i64()))
                    // Skip malformed values instead of letting -1 become 4294967295.
                    .and_then(|index| u32::try_from(index).ok())
            }
        })
        .max()
        .unwrap_or(0);
    Ok(max_index)
}
/// Cleans a raw (possibly streamed) response into plain text.
///
/// Lines that are empty after trimming, and lines beginning with `data:`, are dropped. The
/// remaining lines are joined with `\n` and the result is trimmed.
fn extract_content_from_streaming_response(&self, raw_response: &str) -> String {
    let kept: Vec<&str> = raw_response
        .lines()
        .filter(|line| !(line.trim().is_empty() || line.starts_with("data:")))
        .collect();
    kept.join("\n").trim().to_owned()
}
/// Appends `message` as one JSON line to the local messages file, creating the file if needed.
///
/// # Errors
/// Fails when serialization, opening, writing or flushing fails.
async fn store_message(&self, message: &SessionMessage) -> Result<()> {
    let mut line = serde_json::to_string(message)?;
    line.push('\n');
    let mut file = fs::OpenOptions::new()
        .append(true)
        .create(true)
        .open(&self.messages_file)
        .await?;
    file.write_all(line.as_bytes()).await?;
    file.flush().await?;
    Ok(())
}
/// Loads all stored messages and keeps only those belonging to `context_id`, in file order.
async fn load_messages_by_context(&self, context_id: &str) -> Result<Vec<SessionMessage>> {
    let mut messages = self.load_all_messages().await?;
    messages.retain(|message| message.context_id == context_id);
    Ok(messages)
}
/// Reads every message from the local messages file, using `load_jsonl_file`'s parsing rules.
async fn load_all_messages(&self) -> Result<Vec<SessionMessage>> {
    let path = &self.messages_file;
    self.load_jsonl_file::<SessionMessage>(path).await
}
async fn load_jsonl_file<T: for<'de> Deserialize<'de>>(&self, file_path: &PathBuf) -> Result<Vec<T>> {
if !file_path.exists() {
return Ok(Vec::new());
}
let mut file = fs::File::open(file_path).await?;
let mut contents = String::new();
file.read_to_string(&mut contents).await?;
let mut items = Vec::new();
let mut failed_count = 0;
for (line_num, line) in contents.lines().enumerate() {
if !line.trim().is_empty() {
match serde_json::from_str::<T>(line) {
Ok(item) => items.push(item),
Err(e) => {
failed_count += 1;
error!("Failed to parse line {}: {}, error: {}", line_num + 1, line, e);
}
}
}
}
if failed_count > 0 {
error!("Total failed to parse: {} lines from {:?}", failed_count, file_path);
}
Ok(items)
}
async fn save_encrypted_chat_document(&self, message: &SessionMessage, user_id: &str) -> Result<()> {
let mut document_content = serde_json::json!({
"message_id": message.id,
"i": message.context_id,
"r": match message.role {
MessageRole::User => "0",
MessageRole::Assistant => "1"
},
"c": message.content,
"t": message.timestamp,
"m": message.message_index,
"ct": message.chat_title
});
if let Some(ref ri) = message.request_id {
document_content["ri"] = serde_json::Value::String(ri.clone());
}
let content_str = serde_json::to_string(&document_content)?;
let embedding = vec![0.1];
let encrypted_content = self.encryption_service.encrypt_content(&content_str).await?;
let metadata = serde_json::json!({
"i": message.context_id,
"r": match message.role {
MessageRole::User => "0",
MessageRole::Assistant => "1"
},
"m": message.message_index,
"t": message.timestamp,
"ct": message.chat_title
});
let encrypted_metadata_obj = self.encryption_service.encrypt_metadata(&metadata).await?;
let encrypted_metadata_str = encrypted_metadata_obj
.get("_encrypted_metadata")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Failed to extract encrypted metadata"))?
.to_string();
let encrypted_doc = EncryptedChatDocument {
id: Uuid::new_v4().to_string(),
vector_id: Uuid::new_v4().to_string(),
content: encrypted_content,
embedding, metadata: EncryptedDocumentMetadata {
encrypted_metadata: encrypted_metadata_str,
encrypted_content: true,
created_at: message.timestamp.to_rfc3339(),
updated_at: Utc::now().to_rfc3339(),
},
};
let user_documents_file = self.get_user_encrypted_documents_path(user_id);
let mut existing_docs: Vec<EncryptedChatDocument> = if user_documents_file.exists() {
let content = fs::read_to_string(&user_documents_file).await?;
if !content.trim().is_empty() {
if let Ok(wrapper) = serde_json::from_str::<serde_json::Value>(&content) {
if let Some(documents) = wrapper.get("documents") {
serde_json::from_value(documents.clone()).unwrap_or_default()
} else {
serde_json::from_str(&content).unwrap_or_default()
}
} else {
Vec::new()
}
} else {
Vec::new()
}
} else {
Vec::new()
};
existing_docs.push(encrypted_doc);
let user_directory = self.get_user_directory_path(user_id);
fs::create_dir_all(&user_directory).await?;
#[derive(serde::Serialize)]
struct DocumentsWrapper<'a> {
documents: &'a Vec<EncryptedChatDocument>,
count: usize,
#[serde(rename = "lastModified")]
last_modified: String,
}
let wrapper = DocumentsWrapper {
documents: &existing_docs,
count: existing_docs.len(),
last_modified: Utc::now().to_rfc3339(),
};
let documents_content = serde_json::to_string_pretty(&wrapper)?;
fs::write(&user_documents_file, documents_content).await?;
Ok(())
}
async fn save_encrypted_document_to_collection(&self, collection_name: &str, content: &serde_json::Value, metadata: &serde_json::Value, user_id: &str, embedding: Option<Vec<f32>>) -> Result<String> {
let content_str = serde_json::to_string(content)?;
let encrypted_content = self.encryption_service.encrypt_content(&content_str).await?;
let encrypted_metadata_obj = self.encryption_service.encrypt_metadata(metadata).await?;
let encrypted_metadata_str = encrypted_metadata_obj
.get("_encrypted_metadata")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Failed to extract encrypted metadata"))?
.to_string();
let final_embedding = if let Some(emb) = embedding {
emb
} else if collection_name == "chat_history" {
vec![0.1] } else {
self.embedding_service.generate_embedding(&content_str).await
.unwrap_or_else(|_| {
use rand::Rng;
let mut rng = rand::thread_rng();
(0..1024).map(|_| rng.gen::<f64>() as f32).collect()
})
};
let document_id = Uuid::new_v4().to_string();
let encrypted_doc = EncryptedChatDocument {
id: document_id.clone(),
vector_id: Uuid::new_v4().to_string(),
content: encrypted_content,
embedding: final_embedding,
metadata: EncryptedDocumentMetadata {
encrypted_metadata: encrypted_metadata_str,
encrypted_content: true,
created_at: Utc::now().to_rfc3339(),
updated_at: Utc::now().to_rfc3339(),
},
};
let user_documents_file = self.get_user_collection_documents_path(user_id, collection_name);
let mut existing_docs: Vec<EncryptedChatDocument> = if user_documents_file.exists() {
let content = fs::read_to_string(&user_documents_file).await?;
if !content.trim().is_empty() {
if let Ok(wrapper) = serde_json::from_str::<serde_json::Value>(&content) {
if let Some(documents) = wrapper.get("documents") {
serde_json::from_value(documents.clone()).unwrap_or_default()
} else {
serde_json::from_str(&content).unwrap_or_default()
}
} else {
Vec::new()
}
} else {
Vec::new()
}
} else {
Vec::new()
};
let mut metadata_map = indexmap::IndexMap::new();
metadata_map.insert("_encrypted_metadata".to_string(), serde_json::Value::String(encrypted_doc.metadata.encrypted_metadata));
metadata_map.insert("_encrypted_content".to_string(), serde_json::Value::Bool(true));
metadata_map.insert("created_at".to_string(), serde_json::Value::String(encrypted_doc.metadata.created_at));
metadata_map.insert("updated_at".to_string(), serde_json::Value::String(encrypted_doc.metadata.updated_at));
let vector_doc = types::Document::new_with_vector_id(
encrypted_doc.id,
encrypted_doc.vector_id,
encrypted_doc.content
)
.with_metadata(metadata_map)
.with_embedding(encrypted_doc.embedding);
if let Some(embedded_store) = self.vector_store.as_any().downcast_ref::<db::EmbeddedQdrantVectorStore>() {
embedded_store.set_user_context(user_id).await;
}
else if let Some(server_store) = self.vector_store.as_any().downcast_ref::<db::QdrantServerVectorStore>() {
server_store.set_user_context(user_id).await;
}
self.vector_store.add_document(collection_name, vector_doc).await?;
Ok(document_id)
}
/// Path of the per-user JSON file backing `collection_name`: the user's directory joined with
/// `<collection_name>-documents.json`.
pub fn get_user_collection_documents_path(&self, user_id: &str, collection_name: &str) -> std::path::PathBuf {
    let file_name = format!("{}-documents.json", collection_name);
    self.get_user_directory_path(user_id).join(file_name)
}
/// Public entry point for storing an encrypted document in `collection_name` for `user_id`.
///
/// Initializes the service if needed, delegates to `save_encrypted_document_to_collection`
/// (letting it choose the embedding), and logs the new document id.
pub async fn add_document_to_collection(&self, collection_name: &str, content: &serde_json::Value, metadata: &serde_json::Value, user_id: &str) -> Result<String> {
    if !self.is_initialized().await {
        self.initialize().await?;
    }
    self.save_encrypted_document_to_collection(collection_name, content, metadata, user_id, None)
        .await
        .map(|document_id| {
            info!("Added document {} to collection {} for user {}", document_id, collection_name, user_id);
            document_id
        })
}
/// Reads the documents stored in `user_id`'s file for `collection_name`.
///
/// Accepts either a `{"documents": [...]}` wrapper or a bare array. A missing or blank file
/// yields an empty list.
///
/// # Errors
/// Read failures are errors, and so are malformed entries inside a `"documents"` wrapper. Any
/// other unparseable content yields an empty list.
pub async fn get_collection_documents(&self, collection_name: &str, user_id: &str) -> Result<Vec<EncryptedChatDocument>> {
    let path = self.get_user_collection_documents_path(user_id, collection_name);
    if !path.exists() {
        return Ok(Vec::new());
    }
    let raw = fs::read_to_string(&path).await?;
    if raw.trim().is_empty() {
        return Ok(Vec::new());
    }
    let wrapped_documents = serde_json::from_str::<serde_json::Value>(&raw)
        .ok()
        .and_then(|value| value.get("documents").cloned());
    match wrapped_documents {
        Some(documents) => Ok(serde_json::from_value(documents)?),
        None => Ok(serde_json::from_str(&raw).unwrap_or_default()),
    }
}
#[allow(dead_code)]
pub async fn sync_chat_history_from_qdrant(&self, user_id: &str) -> Result<()> {
if !self.is_initialized().await {
self.initialize().await?;
}
let user_documents_file = self.get_user_encrypted_documents_path(user_id);
let mut existing_docs: Vec<EncryptedChatDocument> = if user_documents_file.exists() {
let content = fs::read_to_string(&user_documents_file).await?;
if !content.trim().is_empty() {
if let Ok(wrapper) = serde_json::from_str::<serde_json::Value>(&content) {
if let Some(documents) = wrapper.get("documents") {
serde_json::from_value(documents.clone()).unwrap_or_default()
} else {
serde_json::from_str(&content).unwrap_or_default()
}
} else {
Vec::new()
}
} else {
Vec::new()
}
} else {
Vec::new()
};
let qdrant_documents = self.vector_store.list_documents("chat_history", None, None).await?;
for doc in qdrant_documents {
let mut belongs_to_user = false;
if let Some(encrypted_metadata) = doc.metadata.get("_encrypted_metadata").and_then(|v| v.as_str()) {
if let Ok(decrypted_metadata) = self.encryption_service.decrypt_metadata(&serde_json::json!({"_encrypted_metadata": encrypted_metadata})).await {
if let Some(doc_session_id) = decrypted_metadata.get("sessionId").and_then(|v| v.as_str()) {
belongs_to_user = true; }
}
}
if !belongs_to_user {
continue; }
let encrypted_metadata_str = doc.metadata.get("_encrypted_metadata")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let encrypted_content_flag = doc.metadata.get("_encrypted_content")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let created_at = doc.metadata.get("created_at")
.and_then(|v| v.as_str())
.unwrap_or(&doc.created_at.to_rfc3339())
.to_string();
let updated_at = doc.metadata.get("updated_at")
.and_then(|v| v.as_str())
.unwrap_or(&doc.updated_at.to_rfc3339())
.to_string();
let encrypted_doc = EncryptedChatDocument {
id: doc.id.clone(),
vector_id: doc.vector_id.clone(),
content: doc.content.clone(),
embedding: doc.embedding.unwrap_or_else(|| vec![0.1]),
metadata: EncryptedDocumentMetadata {
encrypted_metadata: encrypted_metadata_str,
encrypted_content: encrypted_content_flag,
created_at,
updated_at,
},
};
if let Some(existing_index) = existing_docs.iter().position(|d| d.id == doc.id) {
existing_docs[existing_index] = encrypted_doc;
} else {
existing_docs.push(encrypted_doc);
}
}
#[derive(serde::Serialize)]
struct ChatHistoryWrapper<'a> {
documents: &'a Vec<EncryptedChatDocument>,
count: usize,
#[serde(rename = "lastModified")]
last_modified: String,
}
let wrapper = ChatHistoryWrapper {
documents: &existing_docs,
count: existing_docs.len(),
last_modified: Utc::now().to_rfc3339(),
};
let user_directory = self.get_user_directory_path(user_id);
fs::create_dir_all(&user_directory).await?;
let content = serde_json::to_string_pretty(&wrapper)?;
fs::write(&user_documents_file, content).await?;
info!("Updated chat history JSON file for user {} with {} documents from Qdrant", user_id, existing_docs.len());
Ok(())
}
/// Stores one AWS estate record in `collection_name` of the vector store.
///
/// `data` must be a JSON object with a string `"content"` field. The content is embedded and
/// becomes the document body. All other fields become metadata, serialized to a JSON string
/// under the `_encrypted_metadata` key. The document id comes from
/// `generate_doc_id_from_arn(collection_name, <metadata>)`.
///
/// NOTE(review): the `_encrypted_metadata` value here is plain JSON (no encryption call is
/// made), and `user_id` is unused, so no user context is set before the write. Confirm both
/// are intended.
///
/// # Errors
/// Fails when `"content"` is missing or not a string, when `data` is not an object, or when
/// embedding, serialization or the store write fails.
async fn save_aws_estate_documents(&self, data: &serde_json::Value, _document_ids: &[String], user_id: &str, collection_name: &str) -> Result<()> {
    let content_str = match data.get("content") {
        None => return Err(anyhow::anyhow!("content field is required in the JSON object")),
        Some(value) => value
            .as_str()
            .map(str::to_owned)
            .ok_or_else(|| anyhow::anyhow!("content field must be a string"))?,
    };
    let mut metadata_obj = match data.as_object() {
        Some(obj) => obj.clone(),
        None => return Err(anyhow::anyhow!("data must be a JSON object")),
    };
    metadata_obj.remove("content");

    let embedding = self.embedding_service.generate_embedding(&content_str).await?;
    let doc_id = Self::generate_doc_id_from_arn(collection_name, &metadata_obj);
    let mut metadata = indexmap::IndexMap::new();
    metadata.insert(
        "_encrypted_metadata".to_string(),
        serde_json::Value::String(serde_json::to_string(&metadata_obj)?),
    );
    let document = types::Document::new(doc_id, content_str)
        .with_embedding(embedding)
        .with_metadata(metadata);
    self.vector_store.add_document(collection_name, document).await?;
    Ok(())
}
/// Reads `user_id`'s encrypted chat documents from `chat_history-documents.json`.
///
/// Accepts either a `{"documents": [...]}` wrapper or a bare array. A missing or blank file,
/// or content that is not JSON at all, yields an empty list.
///
/// # Errors
/// Read failures are errors, and so is valid JSON whose documents do not match
/// `EncryptedChatDocument`.
pub async fn get_encrypted_chat_documents(&self, user_id: &str) -> Result<Vec<EncryptedChatDocument>> {
    let path = self.get_user_encrypted_documents_path(user_id);
    if !path.exists() {
        return Ok(Vec::new());
    }
    let raw = fs::read_to_string(&path).await?;
    if raw.trim().is_empty() {
        return Ok(Vec::new());
    }
    let parsed: serde_json::Value = match serde_json::from_str(&raw) {
        Ok(value) => value,
        Err(_) => return Ok(Vec::new()),
    };
    let docs: Vec<EncryptedChatDocument> = match parsed.get("documents") {
        Some(documents) => serde_json::from_value(documents.clone())?,
        None => serde_json::from_str(&raw)?,
    };
    Ok(docs)
}
/// Reads the encrypted AWS estate documents from the service's AWS estate file.
///
/// Accepts either a `{"documents": [...]}` wrapper or a bare array. A missing or blank file,
/// or content that is not JSON at all, yields an empty list.
///
/// # Errors
/// Read failures are errors, and so is valid JSON whose documents do not match
/// `EncryptedChatDocument`.
pub async fn get_encrypted_aws_estate_documents(&self) -> Result<Vec<EncryptedChatDocument>> {
    if !self.aws_estate_file.exists() {
        return Ok(Vec::new());
    }
    let raw = fs::read_to_string(&self.aws_estate_file).await?;
    if raw.trim().is_empty() {
        return Ok(Vec::new());
    }
    match serde_json::from_str::<serde_json::Value>(&raw) {
        Err(_) => Ok(Vec::new()),
        Ok(parsed) => {
            let docs: Vec<EncryptedChatDocument> = if let Some(documents) = parsed.get("documents") {
                serde_json::from_value(documents.clone())?
            } else {
                serde_json::from_str(&raw)?
            };
            Ok(docs)
        }
    }
}
/// Path of `user_id`'s encrypted chat-history file:
/// `<qdrant_data_path>/<user_id>/chat_history-documents.json`.
pub fn get_user_encrypted_documents_path(&self, user_id: &str) -> PathBuf {
    let mut path = self.qdrant_data_path.join(user_id);
    path.push("chat_history-documents.json");
    path
}
/// Per-user data directory: `<qdrant_data_path>/<user_id>`.
///
/// NOTE(review): `user_id` is appended without validation. Per `Path::join`/`PathBuf::push`
/// semantics, an absolute `user_id` replaces the base path entirely, and `..` components can
/// climb out of it. Confirm that callers only pass trusted ids.
fn get_user_directory_path(&self, user_id: &str) -> PathBuf {
    let mut dir = self.qdrant_data_path.to_path_buf();
    dir.push(user_id);
    dir
}
pub async fn get_decrypted_chat_history(&self, user_id: &str) -> Result<Vec<serde_json::Value>> {
let encrypted_docs = match self.get_encrypted_chat_documents(user_id).await {
Ok(docs) => docs,
Err(e) => {
println!("Warning: Could not load encrypted documents for user {}: {}", user_id, e);
return Ok(Vec::new());
}
};
if encrypted_docs.is_empty() {
return Ok(Vec::new());
}
use indexmap::IndexMap;
let mut contexts: IndexMap<String, serde_json::Value> = IndexMap::new();
for doc in encrypted_docs {
let decrypted_content = match self.encryption_service.decrypt_content(&doc.content).await {
Ok(content) => content,
Err(e) => {
println!("Warning: Failed to decrypt content for document {}: {}", doc.id, e);
continue;
}
};
let (message_content, content_json) = match serde_json::from_str::<serde_json::Value>(&decrypted_content) {
Ok(json) => {
let content = json.get("c")
.or_else(|| json.get("content"))
.and_then(|v| v.as_str())
.map(|s| s.to_string())
.unwrap_or_else(|| {
String::new()
});
(content, Some(json))
},
Err(_) => {
(decrypted_content, None)
}
};
let metadata_json = serde_json::json!({"_encrypted_metadata": doc.metadata.encrypted_metadata});
let decrypted_metadata = match self.encryption_service.decrypt_metadata(&metadata_json).await {
Ok(metadata) => metadata,
Err(e) => {
println!("Warning: Failed to decrypt metadata for document {}: {}", doc.id, e);
continue;
}
};
let context_id = if let Some(ref json) = content_json {
json.get("i")
.or_else(|| json.get("i"))
.and_then(|v| v.as_str())
.or_else(|| {
decrypted_metadata.get("i")
.or_else(|| decrypted_metadata.get("i"))
.and_then(|v| v.as_str())
})
.unwrap_or("unknown")
.to_string()
} else {
decrypted_metadata.get("i")
.or_else(|| decrypted_metadata.get("i"))
.and_then(|v| v.as_str())
.unwrap_or("unknown")
.to_string()
};
let role = if let Some(ref json) = content_json {
json.get("r")
.and_then(|v| v.as_str())
.or_else(|| {
decrypted_metadata.get("r")
.and_then(|v| v.as_str())
})
.unwrap_or("unknown")
} else {
decrypted_metadata.get("r")
.and_then(|v| v.as_str())
.unwrap_or("unknown")
};
let timestamp = if let Some(ref json) = content_json {
json.get("t")
.and_then(|v| v.as_str())
.or_else(|| {
decrypted_metadata.get("t")
.or_else(|| decrypted_metadata.get("t"))
.or_else(|| decrypted_metadata.get("t"))
.and_then(|v| v.as_str())
})
.unwrap_or("")
.to_string()
} else {
decrypted_metadata.get("t")
.or_else(|| decrypted_metadata.get("t"))
.or_else(|| decrypted_metadata.get("t"))
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string()
};
let chat_title = if let Some(ref json) = content_json {
json.get("ct")
.or_else(|| json.get("ct"))
.and_then(|v| v.as_str())
.or_else(|| {
decrypted_metadata.get("ct")
.or_else(|| decrypted_metadata.get("ct"))
.and_then(|v| v.as_str())
})
.map(|s| s.to_string())
} else {
decrypted_metadata.get("ct")
.or_else(|| decrypted_metadata.get("ct"))
.and_then(|v| v.as_str())
.map(|s| s.to_string())
};
if !contexts.contains_key(&context_id) {
let mut context_obj = IndexMap::new();
context_obj.insert("i".to_string(), serde_json::Value::String(context_id.clone()));
context_obj.insert("a".to_string(), serde_json::Value::String(user_id.to_string()));
context_obj.insert("ct".to_string(), chat_title.clone().map(serde_json::Value::String).unwrap_or(serde_json::Value::Null));
context_obj.insert("m".to_string(), serde_json::Value::Array(Vec::new()));
context_obj.insert("t".to_string(), serde_json::Value::String("".to_string())); context_obj.insert("l".to_string(), serde_json::Value::Number(0.into()));
contexts.insert(context_id.clone(), serde_json::Value::Object(context_obj.into_iter().collect()));
} else if chat_title.is_some() {
if let Some(context_entry) = contexts.get_mut(&context_id) {
if context_entry.get("ct").and_then(|v| v.as_str()).is_none() {
if let Some(obj) = context_entry.as_object_mut() {
obj.insert("ct".to_string(), serde_json::Value::String(chat_title.clone().unwrap()));
}
}
}
}
let message_index = if let Some(ref json) = content_json {
json.get("m")
.or_else(|| json.get("message_index"))
.and_then(|v| v.as_u64())
.unwrap_or(0)
} else {
0
};
let request_id = if let Some(ref json) = content_json {
json.get("ri")
.and_then(|v| v.as_str())
.map(|s| s.to_string())
} else {
None
};
let mut message_obj = IndexMap::new();
message_obj.insert("r".to_string(), serde_json::Value::String(role.to_string()));
message_obj.insert("c".to_string(), serde_json::Value::String(message_content));
if let Some(ri) = request_id {
message_obj.insert("ri".to_string(), serde_json::Value::String(ri));
}
message_obj.insert("_t".to_string(), serde_json::Value::String(timestamp.clone())); message_obj.insert("_i".to_string(), serde_json::Value::Number(message_index.into()));
let context_entry = contexts.get_mut(&context_id).unwrap();
if let Some(messages) = context_entry.get_mut("m").and_then(|m| m.as_array_mut()) {
messages.push(serde_json::Value::Object(message_obj.into_iter().collect()));
}
}
for (_context_id, context_data) in contexts.iter_mut() {
if let Some(messages) = context_data.get_mut("m").and_then(|m| m.as_array_mut()) {
messages.sort_by(|a, b| {
let a_idx = a.get("_i").and_then(|i| i.as_u64()).unwrap_or(0);
let b_idx = b.get("_i").and_then(|i| i.as_u64()).unwrap_or(0);
match a_idx.cmp(&b_idx) {
std::cmp::Ordering::Equal => {
let a_time = a.get("_t").and_then(|t| t.as_str()).unwrap_or("");
let b_time = b.get("_t").and_then(|t| t.as_str()).unwrap_or("");
a_time.cmp(b_time)
},
other => other
}
});
let latest_timestamp = messages.iter()
.max_by_key(|msg| msg.get("_i").and_then(|i| i.as_u64()).unwrap_or(0))
.and_then(|msg| msg.get("_t").and_then(|t| t.as_str()))
.map(|s| s.to_string());
let cleaned_messages: Vec<serde_json::Value> = messages.iter().map(|message| {
let mut clean_msg = IndexMap::new();
clean_msg.insert("r".to_string(), message.get("r").cloned().unwrap_or(serde_json::Value::Null));
clean_msg.insert("c".to_string(), message.get("c").cloned().unwrap_or(serde_json::Value::Null));
if let Some(ri) = message.get("ri") {
clean_msg.insert("ri".to_string(), ri.clone());
}
serde_json::Value::Object(clean_msg.into_iter().collect())
}).collect();
*messages = cleaned_messages;
let message_count = messages.len();
if let Some(obj) = context_data.as_object_mut() {
obj.insert("l".to_string(), serde_json::Value::Number(message_count.into()));
if let Some(ts) = latest_timestamp {
obj.insert("t".to_string(), serde_json::Value::String(ts));
}
}
}
}
let result: Vec<serde_json::Value> = contexts.into_iter()
.map(|(_key, value)| value)
.collect();
Ok(result)
}
/// Path of the file this module uses for AWS estate data, borrowed from `self`.
pub fn get_aws_estate_path(&self) -> &std::path::Path {
    self.aws_estate_file.as_path()
}
/// Deletes the persisted session message file and resets all in-memory message counters.
///
/// A message file that does not exist is not an error.
///
/// # Errors
///
/// Returns any I/O error other than "not found" raised while removing the file; in that
/// case the counters are left untouched.
pub async fn clear_message_storage(&self) -> Result<()> {
    // Remove directly instead of calling the blocking `Path::exists` first. This also
    // avoids a spurious failure if the file disappears between the check and the removal.
    match fs::remove_file(&self.messages_file).await {
        Ok(()) => {}
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
        Err(e) => return Err(e.into()),
    }
    self.message_counters.write().await.clear();
    info!("Cleared all session storage files");
    Ok(())
}
/// Shuts down the vector store and marks the module as no longer initialized.
///
/// # Errors
///
/// Propagates a failure from the vector store's shutdown; in that case the module
/// stays marked as initialized.
pub async fn shutdown(&self) -> Result<()> {
    info!("Shutting down RAG Module");
    self.vector_store.shutdown().await?;
    *self.initialized.write().await = false;
    info!("RAG Module shutdown complete");
    Ok(())
}
/// Adds `documents` to the vector store, initializing the module first if needed.
///
/// Each document goes to the collection named by its `collection_type` metadata string
/// when present, otherwise to `collection_name` (default `"core_estate"`). A failing
/// document does not abort the batch; its error message is collected in
/// `CreateResult::failed`.
pub async fn create(&self, documents: Vec<Document>, collection_name: Option<&str>) -> Result<CreateResult> {
    if !self.is_initialized().await {
        self.initialize().await?;
    }
    let fallback = collection_name.unwrap_or("core_estate");
    let mut created = 0;
    let mut failed = Vec::new();
    for doc in documents {
        let target: String = match doc.metadata.get("collection_type").and_then(|v| v.as_str()) {
            Some(name) => name.to_owned(),
            None => fallback.to_owned(),
        };
        if let Err(e) = self.document_service.add_document(&target, doc).await {
            failed.push(format!("Failed to create document: {}", e));
        } else {
            created += 1;
        }
    }
    Ok(CreateResult { created, failed })
}
/// Counts the documents in `collection_name` (default `"core_estate"`), initializing the
/// module first if needed.
///
/// NOTE(review): `filter` is accepted but currently unused; the count is always requested
/// without a search filter.
pub async fn get_document_count(&self, collection_name: Option<&str>, filter: Option<serde_json::Value>) -> Result<usize> {
    if !self.is_initialized().await {
        self.initialize().await?;
    }
    let collection = match collection_name {
        Some(name) => name,
        None => "core_estate",
    };
    self.document_service
        .get_document_count(collection, None)
        .await
}
/// Lists documents in `collection_name` (default `"core_estate"`), initializing the
/// module first if needed.
///
/// Only `options["limit"]` is read, and only when it is a non-negative integer; any other
/// option keys are ignored and no search filter is applied.
pub async fn list_documents(&self, collection_name: Option<&str>, options: Option<serde_json::Value>) -> Result<Vec<Document>> {
    if !self.is_initialized().await {
        self.initialize().await?;
    }
    let collection = collection_name.unwrap_or("core_estate");
    // `as usize` silently truncates on 32-bit targets (e.g. 2^32 would become 0);
    // saturate instead so an oversized limit still means "as many as possible".
    let limit = options
        .as_ref()
        .and_then(|o| o.get("limit"))
        .and_then(|l| l.as_u64())
        .map(|l| usize::try_from(l).unwrap_or(usize::MAX));
    self.document_service.list_documents(collection, limit, None).await
}
/// Forwards `user_id` to the vector store when it is one of the Qdrant-backed store types
/// (`EmbeddedQdrantVectorStore`, `QdrantServerVectorStore`, `DualVectorStore`).
///
/// For any other store type this is a no-op. Always returns `Ok(())`.
pub async fn set_user_context(&self, user_id: &str) -> Result<()> {
    if let Some(store) = self.vector_store.as_any().downcast_ref::<db::EmbeddedQdrantVectorStore>() {
        store.set_user_context(user_id).await;
        return Ok(());
    }
    if let Some(store) = self.vector_store.as_any().downcast_ref::<db::QdrantServerVectorStore>() {
        store.set_user_context(user_id).await;
        return Ok(());
    }
    if let Some(store) = self.vector_store.as_any().downcast_ref::<db::DualVectorStore>() {
        store.set_user_context(user_id).await;
    }
    Ok(())
}
/// Deletes `collection_name` for `user_id` from the vector store and removes the
/// collection's local files under `<base_path>/qdrant-data/<user_id>/`.
///
/// Individual failures are collected in `CollectionDeleteResult::errors` rather than
/// returned, so a partially failed deletion still reports what was removed. The per-user
/// directory is removed afterwards only if it is empty (`remove_dir` refuses otherwise).
///
/// Local file cleanup is skipped, and an error is recorded, when `user_id` or
/// `collection_name` is not a single plain path segment. This keeps caller-supplied names
/// from pointing the deletion outside the per-user data directory.
///
/// # Errors
///
/// Returns `Err` only if lazy initialization or `set_user_context` fails.
pub async fn delete_collection(&self, collection_name: &str, user_id: &str) -> Result<CollectionDeleteResult> {
    // True when `name` is exactly one ordinary path component: no separators, not empty,
    // not `.`/`..`, and no drive prefix. Joining such a name cannot leave the parent dir.
    fn is_single_path_segment(name: &str) -> bool {
        use std::path::{Component, Path};
        if name.contains(|c: char| c == '/' || c == '\\') {
            return false;
        }
        let mut components = Path::new(name).components();
        matches!(
            (components.next(), components.next()),
            (Some(Component::Normal(_)), None)
        )
    }

    if !self.is_initialized().await {
        self.initialize().await?;
    }
    info!("🗑️ Deleting collection '{}' for user '{}'", collection_name, user_id);
    let mut result = CollectionDeleteResult {
        collection_name: collection_name.to_string(),
        user_id: user_id.to_string(),
        collection_deleted: false,
        files_removed: 0,
        removed_files: Vec::new(),
        errors: Vec::new(),
    };

    self.set_user_context(user_id).await?;
    match self.vector_store.delete_collection(collection_name).await {
        Ok(_) => {
            result.collection_deleted = true;
            info!("✅ Collection '{}' deleted from vector store", collection_name);
        }
        Err(e) => {
            let error_msg = format!("Failed to delete collection from vector store: {}", e);
            error!("{}", error_msg);
            result.errors.push(error_msg);
        }
    }

    if is_single_path_segment(user_id) && is_single_path_segment(collection_name) {
        let user_data_path = self.base_path.join("qdrant-data").join(user_id);
        for suffix in ["documents.json", "vectors.bin", "vector-index.json", "metadata.json"] {
            let file_name = format!("{}-{}", collection_name, suffix);
            let file_path = user_data_path.join(&file_name);
            // Remove directly instead of checking `exists()` first: avoids a blocking
            // metadata call in async code and the race between the check and the removal.
            match fs::remove_file(&file_path).await {
                Ok(()) => {
                    result.files_removed += 1;
                    info!("✅ Removed file: {}", file_name);
                    result.removed_files.push(file_name);
                }
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                    debug!("File '{}' does not exist, skipping", file_name);
                }
                Err(e) => {
                    let error_msg = format!("Failed to remove file '{}': {}", file_name, e);
                    error!("{}", error_msg);
                    result.errors.push(error_msg);
                }
            }
        }
        if let Err(e) = fs::remove_dir(&user_data_path).await {
            debug!("Could not remove user directory (likely not empty): {}", e);
        }
    } else {
        let error_msg = format!(
            "Refusing to remove local files for collection '{}' of user '{}': names must be single path segments",
            collection_name, user_id
        );
        error!("{}", error_msg);
        result.errors.push(error_msg);
    }

    if result.errors.is_empty() {
        info!("🎉 Successfully deleted collection '{}': {} files removed",
            collection_name, result.files_removed);
    } else {
        warn!("⚠️ Collection '{}' deletion completed with {} errors",
            collection_name, result.errors.len());
    }
    Ok(result)
}
}
// SAFETY: NOTE(review): no justification is given for these impls. They tell the compiler
// that `RagModule` may be moved to and shared between threads even if one of its fields is
// not `Send`/`Sync`. If every field already is `Send + Sync`, these impls are redundant and
// should be removed so the compiler keeps checking it. Otherwise, the non-thread-safe field
// needs an explicit argument here for why concurrent use is sound.
unsafe impl Send for RagModule {}
unsafe impl Sync for RagModule {}
/// Convenience constructor: builds a [`RagModule`] rooted at `base_path`.
///
/// Equivalent to `RagModule::new(base_path).await`. The returned module is not yet
/// initialized.
pub async fn create_rag_module(base_path: impl Into<PathBuf>) -> Result<RagModule> {
    let module = RagModule::new(base_path).await?;
    Ok(module)
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a `RagModule` in a fresh temporary directory and initializes it.
    ///
    /// The `TempDir` is returned alongside the module so the directory is not deleted
    /// while the test is still using it.
    async fn initialized_rag() -> (TempDir, RagModule) {
        let temp_dir = TempDir::new().unwrap();
        let rag = RagModule::new(temp_dir.path()).await.unwrap();
        rag.initialize().await.unwrap();
        (temp_dir, rag)
    }

    #[tokio::test]
    async fn test_rag_module_creation() {
        let temp_dir = TempDir::new().unwrap();
        let rag = RagModule::new(temp_dir.path()).await.unwrap();
        assert!(!rag.is_initialized().await);
    }

    #[tokio::test]
    async fn test_rag_module_initialization() {
        let (_temp_dir, rag) = initialized_rag().await;
        assert!(rag.is_initialized().await);
    }

    #[tokio::test]
    async fn test_session_management() {
        let (_temp_dir, rag) = initialized_rag().await;
        let context_id = "test_context";
        let user_id = "test_user";
        let chat_title = Some("Test Chat");
        let prompt_id = rag.add_prompt(context_id, "What is the weather?", user_id, chat_title).await.unwrap();
        let response_id = rag.add_response(context_id, "It's sunny today!", user_id, chat_title).await.unwrap();
        assert!(!prompt_id.is_empty());
        assert!(!response_id.is_empty());
        let history = rag.get_session_chat_history(context_id).await.unwrap();
        assert_eq!(history.total_pairs, 1);
        assert!(!history.is_empty);
        assert_eq!(history.pairs[0].query.content, "What is the weather?");
        assert_eq!(history.pairs[0].response.content, "It's sunny today!");
    }

    #[tokio::test]
    async fn test_query_response_pairs() {
        let (_temp_dir, rag) = initialized_rag().await;
        let context_id = "test_context_2";
        let user_id = "test_user";
        let chat_title = Some("Test Chat");
        rag.add_prompt(context_id, "Hello", user_id, chat_title).await.unwrap();
        rag.add_response(context_id, "Hi there!", user_id, chat_title).await.unwrap();
        rag.add_prompt(context_id, "How are you?", user_id, chat_title).await.unwrap();
        rag.add_response(context_id, "I'm doing well!", user_id, chat_title).await.unwrap();

        // A limit of 1 returns only the most recent pair but still reports the total.
        let pairs = rag.get_query_response_pairs(context_id, Some(1)).await.unwrap();
        assert_eq!(pairs.total_pairs, 2);
        assert_eq!(pairs.pairs.len(), 1);
        assert_eq!(pairs.pairs[0].query.content, "How are you?");
        assert_eq!(pairs.pairs[0].response.content, "I'm doing well!");

        let all_pairs = rag.get_query_response_pairs(context_id, None).await.unwrap();
        assert_eq!(all_pairs.total_pairs, 2);
        assert_eq!(all_pairs.pairs.len(), 2);
    }

    #[tokio::test]
    async fn test_batch_aws_estate_ingestion() {
        let (_temp_dir, rag) = initialized_rag().await;
        let user_id = "test_user_batch";
        let collection_name = "test_aws_estate";
        let batch_data = vec![
            serde_json::json!({
                "content": "EC2 instance i-1234567890abcdef0 running in us-west-2",
                "resource_type": "ec2_instance",
                "instance_id": "i-1234567890abcdef0",
                "region": "us-west-2",
                "state": "running"
            }),
            serde_json::json!({
                "content": "S3 bucket my-test-bucket with 1000 objects",
                "resource_type": "s3_bucket",
                "bucket_name": "my-test-bucket",
                "region": "us-east-1",
                "object_count": 1000
            }),
            serde_json::json!({
                "content": "RDS database mydb-prod running MySQL 8.0",
                "resource_type": "rds_instance",
                "db_identifier": "mydb-prod",
                "engine": "mysql",
                "version": "8.0"
            }),
            serde_json::json!({
                "content": "Lambda function process-orders with Python 3.9 runtime",
                "resource_type": "lambda_function",
                "function_name": "process-orders",
                "runtime": "python3.9",
                "timeout": 300
            }),
            serde_json::json!({
                "content": "VPC vpc-12345678 with 3 subnets in us-west-2",
                "resource_type": "vpc",
                "vpc_id": "vpc-12345678",
                "subnet_count": 3,
                "region": "us-west-2"
            })
        ];
        let result = rag.ingest_aws_estate_batch(batch_data, user_id, collection_name).await.unwrap();
        assert_eq!(result.total_resources, 5);
        assert_eq!(result.parsed_resources, 5);
        assert_eq!(result.failed_resources, 0);
        assert_eq!(result.create_result.created, 5);
        assert!(result.create_result.failed.is_empty());
        println!("✅ Batch ingestion test completed: {} documents processed", result.parsed_resources);
    }

    #[tokio::test]
    async fn test_batch_ingestion_with_invalid_data() {
        let (_temp_dir, rag) = initialized_rag().await;
        let user_id = "test_user_invalid";
        let collection_name = "test_invalid_estate";
        let batch_data = vec![
            serde_json::json!({
                "content": "Valid EC2 instance",
                "resource_type": "ec2_instance",
                "instance_id": "i-valid123"
            }),
            // Invalid: no "content" field.
            serde_json::json!({
                "resource_type": "s3_bucket",
                "bucket_name": "invalid-bucket"
            }),
            serde_json::json!({
                "content": "Valid RDS instance",
                "resource_type": "rds_instance",
                "db_identifier": "valid-db"
            }),
            // Invalid: "content" is not a string.
            serde_json::json!({
                "content": 12345,
                "resource_type": "lambda_function"
            })
        ];
        let result = rag.ingest_aws_estate_batch(batch_data, user_id, collection_name).await.unwrap();
        assert_eq!(result.total_resources, 4);
        assert_eq!(result.parsed_resources, 2);
        assert_eq!(result.failed_resources, 2);
        assert_eq!(result.create_result.created, 2);
        assert_eq!(result.create_result.failed.len(), 2);
        println!("✅ Invalid data test completed: {} valid, {} failed", result.parsed_resources, result.failed_resources);
    }

    #[tokio::test]
    async fn test_empty_batch_ingestion() {
        let (_temp_dir, rag) = initialized_rag().await;
        let user_id = "test_user_empty";
        let collection_name = "test_empty_estate";
        let batch_data: Vec<serde_json::Value> = vec![];
        let result = rag.ingest_aws_estate_batch(batch_data, user_id, collection_name).await.unwrap();
        assert_eq!(result.total_resources, 0);
        assert_eq!(result.parsed_resources, 0);
        assert_eq!(result.failed_resources, 0);
        assert_eq!(result.create_result.created, 0);
        assert!(result.create_result.failed.is_empty());
        println!("✅ Empty batch test completed");
    }

    #[tokio::test]
    async fn test_large_batch_ingestion() {
        let (_temp_dir, rag) = initialized_rag().await;
        let user_id = "test_user_large";
        let collection_name = "test_large_estate";
        let batch_data: Vec<serde_json::Value> = (0..32)
            .map(|i| {
                serde_json::json!({
                    "content": format!("AWS resource {} - EC2 instance with automated deployment", i),
                    "resource_type": "ec2_instance",
                    "instance_id": format!("i-{:016x}", i),
                    "region": if i % 2 == 0 { "us-west-2" } else { "us-east-1" },
                    "state": "running",
                    "tags": {
                        "Environment": if i % 3 == 0 { "prod" } else { "dev" },
                        "Application": format!("app-{}", i % 5)
                    }
                })
            })
            .collect();
        let start_time = std::time::Instant::now();
        let result = rag.ingest_aws_estate_batch(batch_data, user_id, collection_name).await.unwrap();
        let duration = start_time.elapsed();
        assert_eq!(result.total_resources, 32);
        assert_eq!(result.parsed_resources, 32);
        assert_eq!(result.failed_resources, 0);
        assert_eq!(result.create_result.created, 32);
        assert!(result.create_result.failed.is_empty());
        println!("✅ Large batch test completed: {} documents in {:?}", result.parsed_resources, duration);
    }

    #[tokio::test]
    async fn test_single_vs_batch_comparison() {
        let (_temp_dir, rag) = initialized_rag().await;
        let user_id = "test_user_comparison";
        let test_documents = vec![
            serde_json::json!({
                "content": "Test EC2 instance for comparison",
                "resource_type": "ec2_instance",
                "instance_id": "i-comparison1"
            }),
            serde_json::json!({
                "content": "Test S3 bucket for comparison",
                "resource_type": "s3_bucket",
                "bucket_name": "comparison-bucket"
            }),
            serde_json::json!({
                "content": "Test RDS instance for comparison",
                "resource_type": "rds_instance",
                "db_identifier": "comparison-db"
            })
        ];

        let start_single = std::time::Instant::now();
        for (i, doc) in test_documents.iter().enumerate() {
            let collection_name = format!("test_single_{}", i);
            let _result = rag.ingest_aws_estate(doc.clone(), user_id, &collection_name).await.unwrap();
        }
        let single_duration = start_single.elapsed();

        let start_batch = std::time::Instant::now();
        let batch_result = rag.ingest_aws_estate_batch(test_documents.clone(), user_id, "test_batch").await.unwrap();
        let batch_duration = start_batch.elapsed();

        assert_eq!(batch_result.parsed_resources, 3);
        assert_eq!(batch_result.failed_resources, 0);
        println!("✅ Performance comparison:");
        println!(" Single ingestion (3 docs): {:?}", single_duration);
        println!(" Batch ingestion (3 docs): {:?}", batch_duration);
        // Whole milliseconds are often 0 for in-memory work, which made the old ratio
        // print NaN/inf; use sub-millisecond precision and skip an unmeasurable batch.
        let batch_secs = batch_duration.as_secs_f64();
        if batch_secs > 0.0 {
            println!(" Batch is {:.2}x faster", single_duration.as_secs_f64() / batch_secs);
        }
    }
}