use anyhow::{Result, anyhow};
use std::sync::Arc;
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use regex::Regex;
use chrono::{DateTime, Utc};
use tokio::sync::broadcast;
use crate::types::{Document, SearchFilter, DocumentMetadata};
use crate::db::VectorStore;
use crate::services::{IndexingService, MappingService, SecurityService, EncryptionService};
/// A chat message prepared for storage as a vector point.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChatPoint {
    /// Point identifier (a fresh UUID v4 when produced by `DocumentService::create_chat_point`).
    pub id: String,
    /// Structured chat fields stored alongside the vector.
    pub payload: ChatPayload,
    /// Vector associated with the point.
    pub vector: Vec<f32>,
}
/// A cloud-estate resource prepared for storage as a vector point.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EstatePoint {
    /// Point identifier; `DocumentService::generate_estate_point_id` derives it from the
    /// account, region and resource identifier when all three are present in metadata.
    pub id: String,
    /// Structured resource fields stored alongside the vector.
    pub payload: EstatePayload,
    /// Embedding of the resource's generated descriptive text.
    pub vector: Vec<f32>,
}
/// Payload stored with a [`ChatPoint`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChatPayload {
    /// Message text.
    pub content: String,
    /// Conversation identifier (`"unknown"` when the source metadata lacks `chat_id`).
    pub chat_id: String,
    /// Author identifier (`"unknown"` when the source metadata lacks `user_id`).
    pub user_id: String,
    /// Message kind (`"user"` when the source metadata lacks `message_type`).
    pub message_type: String,
    /// Time at which the payload was built.
    pub timestamp: DateTime<Utc>,
    /// Copy of the source document's metadata.
    pub metadata: HashMap<String, serde_json::Value>,
}
/// Payload stored with an [`EstatePoint`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EstatePayload {
    /// Descriptive text that was embedded for this resource.
    pub content: String,
    /// Cloud provider name, e.g. `"aws"`, `"azure"`, `"gcp"` or `"unknown"`.
    pub cloud_provider: String,
    /// Service extracted from the resource ID, or `"unknown"`.
    pub service_type: String,
    /// Region from metadata or the ARN, or `"unknown"`.
    pub region: String,
    /// Resource type from metadata, or `"unknown"`.
    pub resource_type: String,
    /// Owning account from metadata, or `"unknown"`.
    pub account_id: String,
    /// Resource identifier from metadata, falling back to the document ID.
    pub resource_identifier: String,
    /// Encrypted content, if any (left `None` by `DocumentService::create_estate_point`).
    pub encrypted_content: Option<String>,
    /// Encrypted auxiliary data, if any (left `None` by `DocumentService::create_estate_point`).
    pub encrypted_data: Option<String>,
    /// IAM permissions parsed from the `iam_permissions` metadata entry, when it is a string list.
    pub iam_permissions: Option<Vec<String>>,
    /// Tags parsed from the `tags` metadata entry, when it is a string-to-string object.
    pub tags: Option<HashMap<String, String>>,
    /// Copy of the source document's metadata.
    pub metadata: HashMap<String, serde_json::Value>,
}
/// Outcome of `DocumentService::create_documents`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchCreateResult {
    /// IDs returned by the store for documents that were added.
    pub successful: Vec<String>,
    /// Documents that could not be added, with the reason.
    pub failed: Vec<BatchCreateError>,
    /// Number of documents processed (`success_count + failure_count`).
    pub total: usize,
    /// Length of `successful`.
    pub success_count: usize,
    /// Length of `failed`.
    pub failure_count: usize,
}
/// A single failed entry of a batch create.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchCreateError {
    /// The document as it was submitted.
    pub document: Document,
    /// Display form of the error that rejected it.
    pub error: String,
}
/// Notification broadcast by `DocumentService` after document operations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DocumentEvent {
    /// Event name, e.g. `"document-created"`, `"document-updated"`, `"document-deleted"`,
    /// `"batch-create-completed"`, `"batch-delete-completed"`, `"filter-delete-completed"`
    /// or `"documents-listed"`.
    pub event_type: String,
    /// Affected document, for single-document events.
    pub document_id: Option<String>,
    /// Collection the operation targeted.
    pub collection_type: String,
    /// Optional event-specific details (counts, filters, batch results).
    pub payload: Option<serde_json::Value>,
    /// Time at which the event was emitted.
    pub timestamp: DateTime<Utc>,
}
/// Coordinates document enrichment, embedding, persistence and event broadcasting.
pub struct DocumentService {
    /// Backing store that persists documents per collection.
    vector_store: Arc<dyn VectorStore + Send + Sync>,
    /// Produces embeddings for generated document text.
    indexing_service: Arc<IndexingService>,
    /// Mapping service; only initialized/shut down by this type.
    mapping_service: Arc<MappingService>,
    /// Security service; only initialized/shut down by this type.
    security_service: Arc<SecurityService>,
    /// Encryption service; only initialized/shut down by this type.
    encryption_service: Arc<EncryptionService>,
    /// Sender side of the document event channel.
    event_sender: broadcast::Sender<DocumentEvent>,
    /// Receiver held only so the channel always has at least one receiver; never read.
    _event_receiver: broadcast::Receiver<DocumentEvent>,
}
/// Document ingestion and lifecycle management.
///
/// Incoming [`Document`]s are enriched with cloud metadata derived from their IDs,
/// converted into collection-specific representations (chat points, cloud-estate
/// points, or generically embedded documents), validated, persisted through the
/// injected [`VectorStore`], and announced on a broadcast channel of [`DocumentEvent`]s.
impl DocumentService {
    /// Capacity of the broadcast channel used for [`DocumentEvent`]s.
    const EVENT_CHANNEL_CAPACITY: usize = 1000;

    /// Metadata keys that are never copied into the text used for embeddings.
    const CONTENT_EXCLUDED_KEYS: &'static [&'static str] =
        &["encrypted_content", "encrypted_data", "iam_permissions"];

    /// Creates the service and its event channel.
    ///
    /// One receiver of the channel is kept inside the service (`_event_receiver`) so
    /// the channel always has at least one receiver; that receiver is never read.
    /// External consumers should use [`DocumentService::subscribe_to_events`].
    ///
    /// # Errors
    /// Never fails in the current implementation; `Result` is part of the public API.
    pub async fn new(
        vector_store: Arc<dyn VectorStore + Send + Sync>,
        indexing_service: Arc<IndexingService>,
        mapping_service: Arc<MappingService>,
        security_service: Arc<SecurityService>,
        encryption_service: Arc<EncryptionService>,
    ) -> Result<Self> {
        let (event_sender, event_receiver) = broadcast::channel(Self::EVENT_CHANNEL_CAPACITY);
        Ok(Self {
            vector_store,
            indexing_service,
            mapping_service,
            security_service,
            encryption_service,
            event_sender,
            _event_receiver: event_receiver,
        })
    }

    /// Builds a deterministic estate point ID `"{account_id}-{region}-{resource_identifier}"`
    /// when all three metadata entries are present as strings; otherwise returns the
    /// document's own ID.
    pub fn generate_estate_point_id(&self, document: &Document) -> String {
        let metadata = &document.metadata;
        if let (Some(account_id), Some(region), Some(resource_id)) = (
            metadata.get("account_id").and_then(|v| v.as_str()),
            metadata.get("region").and_then(|v| v.as_str()),
            metadata.get("resource_identifier").and_then(|v| v.as_str()),
        ) {
            return format!("{}-{}-{}", account_id, region, resource_id);
        }
        document.id.clone()
    }

    /// Returns a fresh random (UUID v4) point ID for a chat message.
    pub fn generate_chat_point_id(&self) -> String {
        Uuid::new_v4().to_string()
    }

    /// Chooses a point ID based on [`DocumentService::classify_document_type`]:
    /// random for chat documents, derived for estate documents, the document ID otherwise.
    pub fn generate_point_id(&self, document: &Document) -> String {
        match self.classify_document_type(document).as_str() {
            "chat" => self.generate_chat_point_id(),
            "aws_estate" => self.generate_estate_point_id(document),
            _ => document.id.clone(),
        }
    }

    /// Classifies a document by the shape of its ID.
    ///
    /// Returns `"aws_estate"` for AWS ARNs (`arn:aws:`), Azure resource IDs
    /// (`/subscriptions/`) and GCP resource names (`projects/`), so every cloud
    /// resource shares the estate classification. Returns `"chat"` for IDs starting
    /// with `session_`, `prompt_`, `response_` or `test_doc_`, and `"legacy"` otherwise.
    pub fn classify_document_type(&self, document: &Document) -> String {
        const ESTATE_PREFIXES: [&str; 3] = ["arn:aws:", "/subscriptions/", "projects/"];
        const CHAT_PREFIXES: [&str; 4] = ["session_", "prompt_", "response_", "test_doc_"];

        let id = document.id.as_str();
        let kind = if ESTATE_PREFIXES.iter().any(|prefix| id.starts_with(*prefix)) {
            "aws_estate"
        } else if CHAT_PREFIXES.iter().any(|prefix| id.starts_with(*prefix)) {
            "chat"
        } else {
            "legacy"
        };
        kind.to_string()
    }

    /// Infers the cloud provider from the document ID prefix
    /// (`arn:aws:` → aws, `/subscriptions/` → azure, `projects/` → gcp).
    pub fn detect_cloud_provider(&self, document: &Document) -> Option<String> {
        let id = &document.id;
        if id.starts_with("arn:aws:") {
            return Some("aws".to_string());
        }
        if id.starts_with("/subscriptions/") {
            return Some("azure".to_string());
        }
        if id.starts_with("projects/") {
            return Some("gcp".to_string());
        }
        None
    }

    /// Extracts the service name from the document ID.
    ///
    /// Tries, in order: the service segment of an AWS ARN, the segment after
    /// `/providers/` (Azure style), then the substrings `compute` / `storage`.
    pub fn extract_service_type(&self, document: &Document) -> Option<String> {
        let id = document.id.as_str();
        if let Some(captures) = Self::arn_service_regex().captures(id) {
            return captures.get(1).map(|m| m.as_str().to_string());
        }
        if let Some(provider_part) = id.split("/providers/").nth(1) {
            return provider_part.split('/').next().map(str::to_string);
        }
        if id.contains("compute") {
            return Some("compute".to_string());
        }
        if id.contains("storage") {
            return Some("storage".to_string());
        }
        None
    }

    /// Returns the region from the `region` metadata entry, or else from the region
    /// segment of an AWS ARN (global ARNs with an empty region yield `None`).
    pub fn extract_region(&self, document: &Document) -> Option<String> {
        if let Some(region) = document.metadata.get("region").and_then(|v| v.as_str()) {
            return Some(region.to_string());
        }
        Self::arn_region_regex()
            .captures(&document.id)
            .and_then(|captures| captures.get(1))
            .map(|m| m.as_str().to_string())
    }

    /// Builds a [`ChatPoint`] from a document.
    ///
    /// `chat_id` and `user_id` default to `"unknown"` and `message_type` to `"user"`
    /// when missing from metadata. The vector is a constant placeholder: chat points
    /// are not passed through the indexing service.
    ///
    /// # Errors
    /// Never fails in the current implementation.
    pub async fn create_chat_point(&self, document: &Document) -> Result<ChatPoint> {
        let payload = ChatPayload {
            content: document.content.clone(),
            chat_id: Self::metadata_str_or(document, "chat_id", "unknown"),
            user_id: Self::metadata_str_or(document, "user_id", "unknown"),
            message_type: Self::metadata_str_or(document, "message_type", "user"),
            timestamp: Utc::now(),
            metadata: document.metadata.iter().map(|(k, v)| (k.clone(), v.clone())).collect(),
        };
        Ok(ChatPoint {
            id: self.generate_chat_point_id(),
            payload,
            vector: vec![1.0],
        })
    }

    /// Builds an [`EstatePoint`] from a document, embedding the text produced by
    /// [`DocumentService::generate_content`].
    ///
    /// # Errors
    /// Propagates failures from the indexing service's embedding call.
    pub async fn create_estate_point(&self, document: &Document) -> Result<EstatePoint> {
        let point_id = self.generate_estate_point_id(document);
        let content = self.generate_content(document);
        let vector = self.indexing_service.generate_embedding(&content).await?;
        let unknown = || "unknown".to_string();
        let payload = EstatePayload {
            content,
            cloud_provider: self.detect_cloud_provider(document).unwrap_or_else(unknown),
            service_type: self.extract_service_type(document).unwrap_or_else(unknown),
            region: self.extract_region(document).unwrap_or_else(unknown),
            resource_type: Self::metadata_str_or(document, "resource_type", "unknown"),
            account_id: Self::metadata_str_or(document, "account_id", "unknown"),
            resource_identifier: Self::metadata_str_or(
                document,
                "resource_identifier",
                &document.id,
            ),
            encrypted_content: None,
            encrypted_data: None,
            // Silently `None` when the entry is absent or not of the expected shape.
            iam_permissions: document
                .metadata
                .get("iam_permissions")
                .and_then(|v| serde_json::from_value(v.clone()).ok()),
            tags: document
                .metadata
                .get("tags")
                .and_then(|v| serde_json::from_value(v.clone()).ok()),
            metadata: document.metadata.iter().map(|(k, v)| (k.clone(), v.clone())).collect(),
        };
        Ok(EstatePoint {
            id: point_id,
            payload,
            vector,
        })
    }

    /// Builds the text used for embedding: the document content, the derived
    /// provider/service/region, then every string-valued metadata entry
    /// (except encrypted/IAM keys) as `key: value`, all joined with `" | "`.
    ///
    /// Metadata entries are emitted in sorted key order so the same document always
    /// yields the same text, and therefore the same embedding, regardless of the map's
    /// iteration order.
    pub fn generate_content(&self, document: &Document) -> String {
        let mut content_parts = vec![document.content.clone()];
        if let Some(cloud) = self.detect_cloud_provider(document) {
            content_parts.push(format!("Cloud Provider: {}", cloud));
        }
        if let Some(service) = self.extract_service_type(document) {
            content_parts.push(format!("Service: {}", service));
        }
        if let Some(region) = self.extract_region(document) {
            content_parts.push(format!("Region: {}", region));
        }

        let mut entries: Vec<_> = document.metadata.iter().collect();
        entries.sort_by(|(a, _), (b, _)| a.cmp(b));
        for (key, value) in entries {
            if Self::CONTENT_EXCLUDED_KEYS.contains(&key.as_str()) {
                continue;
            }
            if let Some(str_val) = value.as_str() {
                content_parts.push(format!("{}: {}", key, str_val));
            }
        }
        content_parts.join(" | ")
    }

    /// Adds derived `cloud_provider`, `service_type`, `region` (when detectable) and
    /// `document_type` entries to the document's metadata, overwriting existing keys.
    pub fn enrich_document(&self, mut document: Document) -> Document {
        let cloud_provider = self.detect_cloud_provider(&document);
        let service_type = self.extract_service_type(&document);
        let region = self.extract_region(&document);
        let doc_type = self.classify_document_type(&document);
        let metadata = &mut document.metadata;
        if let Some(cloud) = cloud_provider {
            metadata.insert("cloud_provider".to_string(), serde_json::Value::String(cloud));
        }
        if let Some(service) = service_type {
            metadata.insert("service_type".to_string(), serde_json::Value::String(service));
        }
        if let Some(region) = region {
            metadata.insert("region".to_string(), serde_json::Value::String(region));
        }
        metadata.insert("document_type".to_string(), serde_json::Value::String(doc_type));
        document
    }

    /// Enriches, embeds, validates and stores a document, then emits `document-created`.
    ///
    /// The representation depends on the collection: `chat_history` stores a chat
    /// point, `aws_estate` stores an estate point, and every other collection
    /// (including `escher_library` and `escher_scripts`) stores the document with an
    /// embedding of its generated content.
    ///
    /// # Errors
    /// Propagates embedding, validation and storage failures.
    pub async fn add_document(&self, collection_type: &str, document: Document) -> Result<String> {
        let enriched = self.enrich_document(document);
        let prepared = match collection_type {
            "chat_history" => self.embed_as_chat(enriched).await?,
            "aws_estate" => self.embed_as_estate(enriched).await?,
            _ => self.embed_generic(enriched).await?,
        };
        self.persist_document(collection_type, prepared).await
    }

    /// Stores a document as a chat point in `collection_type`.
    ///
    /// The document is enriched and converted exactly once, so the chat payload is
    /// not nested inside its own `metadata`.
    ///
    /// # Errors
    /// Propagates validation and storage failures.
    pub async fn create_chat_message(&self, collection_type: &str, document: Document) -> Result<String> {
        let prepared = self.embed_as_chat(self.enrich_document(document)).await?;
        self.persist_document(collection_type, prepared).await
    }

    /// Stores a document as an estate point in `collection_type`.
    ///
    /// The document is enriched and embedded exactly once, so the indexing service is
    /// called a single time per resource.
    ///
    /// # Errors
    /// Propagates embedding, validation and storage failures.
    pub async fn create_estate_resource(&self, collection_type: &str, document: Document) -> Result<String> {
        let prepared = self.embed_as_estate(self.enrich_document(document)).await?;
        self.persist_document(collection_type, prepared).await
    }

    /// Adds each document via [`DocumentService::add_document`], collecting per-item
    /// failures instead of aborting, then emits `batch-create-completed` with the result.
    ///
    /// # Errors
    /// Only fails if the batch result cannot be serialized for the event payload.
    pub async fn create_documents(&self, collection_type: &str, documents: Vec<Document>) -> Result<BatchCreateResult> {
        let mut successful = Vec::new();
        let mut failed = Vec::new();
        for document in documents {
            // Clone so the original can be reported back if the add fails.
            match self.add_document(collection_type, document.clone()).await {
                Ok(id) => successful.push(id),
                Err(e) => failed.push(BatchCreateError {
                    document,
                    error: e.to_string(),
                }),
            }
        }
        let result = BatchCreateResult {
            total: successful.len() + failed.len(),
            success_count: successful.len(),
            failure_count: failed.len(),
            successful,
            failed,
        };
        self.emit_event(DocumentEvent {
            event_type: "batch-create-completed".to_string(),
            document_id: None,
            collection_type: collection_type.to_string(),
            payload: Some(serde_json::to_value(&result)?),
            timestamp: Utc::now(),
        });
        Ok(result)
    }

    /// Fetches a document by ID from the store.
    pub async fn get_document(&self, collection_type: &str, id: &str) -> Result<Option<Document>> {
        self.vector_store.get_document(collection_type, id).await
    }

    /// Enriches and validates `document`, replaces the stored document `id`, then emits
    /// `document-updated`. The embedding is not recomputed.
    pub async fn update_document(&self, collection_type: &str, id: &str, document: Document) -> Result<()> {
        let enriched_document = self.enrich_document(document);
        let validated_document = self.validate_document(&enriched_document)?;
        self.vector_store.update_document(collection_type, id, validated_document).await?;
        self.emit_event(DocumentEvent {
            event_type: "document-updated".to_string(),
            document_id: Some(id.to_string()),
            collection_type: collection_type.to_string(),
            payload: None,
            timestamp: Utc::now(),
        });
        Ok(())
    }

    /// Replaces a stored document's metadata wholesale and re-saves it.
    ///
    /// # Errors
    /// Returns `Document not found: {id}` when the document does not exist.
    pub async fn update_metadata(&self, collection_type: &str, id: &str, metadata: HashMap<String, serde_json::Value>) -> Result<()> {
        let Some(mut document) = self.get_document(collection_type, id).await? else {
            return Err(anyhow!("Document not found: {}", id));
        };
        document.metadata = metadata.into_iter().collect();
        self.update_document(collection_type, id, document).await
    }

    /// Deletes a document; emits `document-deleted` only when the store reports a deletion.
    pub async fn delete_document(&self, collection_type: &str, id: &str) -> Result<bool> {
        let deleted = self.vector_store.delete_document(collection_type, id).await?;
        if deleted {
            self.emit_event(DocumentEvent {
                event_type: "document-deleted".to_string(),
                document_id: Some(id.to_string()),
                collection_type: collection_type.to_string(),
                payload: None,
                timestamp: Utc::now(),
            });
        }
        Ok(deleted)
    }

    /// Deletes documents one by one and emits `batch-delete-completed` with the count.
    ///
    /// # Errors
    /// Stops at the first store error; in that case no batch event is emitted.
    pub async fn delete_documents(&self, collection_type: &str, ids: Vec<String>) -> Result<usize> {
        let mut deleted_count = 0;
        for id in ids {
            if self.delete_document(collection_type, &id).await? {
                deleted_count += 1;
            }
        }
        self.emit_event(DocumentEvent {
            event_type: "batch-delete-completed".to_string(),
            document_id: None,
            collection_type: collection_type.to_string(),
            payload: Some(serde_json::json!({ "deleted_count": deleted_count })),
            timestamp: Utc::now(),
        });
        Ok(deleted_count)
    }

    /// Lists documents matching `filter`, deletes them, and emits `filter-delete-completed`.
    pub async fn delete_by_filter(&self, collection_type: &str, filter: SearchFilter) -> Result<usize> {
        let documents = self.list_documents(collection_type, None, Some(filter.clone())).await?;
        let ids: Vec<String> = documents.into_iter().map(|d| d.id).collect();
        let count = self.delete_documents(collection_type, ids).await?;
        self.emit_event(DocumentEvent {
            event_type: "filter-delete-completed".to_string(),
            document_id: None,
            collection_type: collection_type.to_string(),
            payload: Some(serde_json::json!({ "deleted_count": count, "filter": filter })),
            timestamp: Utc::now(),
        });
        Ok(count)
    }

    /// Lists documents from the store and emits `documents-listed` with the count and filter.
    pub async fn list_documents(&self, collection_type: &str, limit: Option<usize>, filter: Option<SearchFilter>) -> Result<Vec<Document>> {
        let documents = self.vector_store.list_documents(collection_type, limit, filter.clone()).await?;
        self.emit_event(DocumentEvent {
            event_type: "documents-listed".to_string(),
            document_id: None,
            collection_type: collection_type.to_string(),
            payload: Some(serde_json::json!({ "count": documents.len(), "filter": filter })),
            timestamp: Utc::now(),
        });
        Ok(documents)
    }

    /// Counts matching documents by listing them all. This also emits a
    /// `documents-listed` event.
    pub async fn get_document_count(&self, collection_type: &str, filter: Option<SearchFilter>) -> Result<usize> {
        Ok(self.list_documents(collection_type, None, filter).await?.len())
    }

    /// Checks basic invariants and returns a copy of the document.
    ///
    /// # Errors
    /// Fails on an empty ID or content, or on an estate-classified ID without a
    /// recognizable cloud provider. With the current prefix tables the last check
    /// cannot trigger, but it is kept as a guard should the two tables diverge.
    pub fn validate_document(&self, document: &Document) -> Result<Document> {
        if document.id.is_empty() {
            return Err(anyhow!("Document ID cannot be empty"));
        }
        if document.content.is_empty() {
            return Err(anyhow!("Document content cannot be empty"));
        }
        if self.classify_document_type(document) == "aws_estate"
            && self.detect_cloud_provider(document).is_none()
        {
            return Err(anyhow!("Estate document must have valid cloud provider format"));
        }
        Ok(document.clone())
    }

    /// Best-effort broadcast; a send error is deliberately ignored.
    fn emit_event(&self, event: DocumentEvent) {
        let _ = self.event_sender.send(event);
    }

    /// Returns a new receiver for future [`DocumentEvent`]s.
    pub fn subscribe_to_events(&self) -> broadcast::Receiver<DocumentEvent> {
        self.event_sender.subscribe()
    }

    /// Parses newline-delimited JSON, one value per line; blank or invalid lines are skipped.
    ///
    /// # Errors
    /// Never fails in the current implementation.
    pub fn parse_concatenated_json(&self, response: &str) -> Result<Vec<serde_json::Value>> {
        Ok(response
            .lines()
            .map(str::trim)
            .filter(|line| !line.is_empty())
            .filter_map(|line| serde_json::from_str::<serde_json::Value>(line).ok())
            .collect())
    }

    /// Joins with spaces the string `content` field of every line that parses as JSON.
    pub fn extract_content_from_streaming_response(&self, response: &str) -> String {
        response
            .lines()
            .filter_map(|line| serde_json::from_str::<serde_json::Value>(line.trim()).ok())
            .filter_map(|json| json.get("content").and_then(|c| c.as_str()).map(str::to_string))
            .collect::<Vec<String>>()
            .join(" ")
    }

    /// Initializes the dependent services in order, stopping at the first failure.
    pub async fn initialize(&self) -> Result<()> {
        self.indexing_service.initialize().await?;
        self.mapping_service.initialize().await?;
        self.security_service.initialize().await?;
        self.encryption_service.initialize().await?;
        Ok(())
    }

    /// Shuts down the dependent services in order, stopping at the first failure.
    pub async fn shutdown(&self) -> Result<()> {
        self.indexing_service.shutdown().await?;
        self.mapping_service.shutdown().await?;
        self.security_service.shutdown().await?;
        self.encryption_service.shutdown().await?;
        Ok(())
    }

    /// Replaces the document's embedding and metadata with those of its chat point.
    async fn embed_as_chat(&self, mut document: Document) -> Result<Document> {
        let chat_point = self.create_chat_point(&document).await?;
        document.embedding = Some(chat_point.vector);
        document.metadata = Self::payload_object(serde_json::to_value(chat_point.payload)?)?
            .into_iter()
            .collect();
        Ok(document)
    }

    /// Replaces the document's embedding and metadata with those of its estate point.
    async fn embed_as_estate(&self, mut document: Document) -> Result<Document> {
        let estate_point = self.create_estate_point(&document).await?;
        document.embedding = Some(estate_point.vector);
        document.metadata = Self::payload_object(serde_json::to_value(estate_point.payload)?)?
            .into_iter()
            .collect();
        Ok(document)
    }

    /// Sets the document's embedding from its generated content, leaving metadata as is.
    async fn embed_generic(&self, mut document: Document) -> Result<Document> {
        let content = self.generate_content(&document);
        document.embedding = Some(self.indexing_service.generate_embedding(&content).await?);
        Ok(document)
    }

    /// Validates, stores and announces (`document-created`) a fully prepared document.
    async fn persist_document(&self, collection_type: &str, document: Document) -> Result<String> {
        let validated_document = self.validate_document(&document)?;
        let document_id = self.vector_store.add_document(collection_type, validated_document).await?;
        self.emit_event(DocumentEvent {
            event_type: "document-created".to_string(),
            document_id: Some(document_id.clone()),
            collection_type: collection_type.to_string(),
            payload: None,
            timestamp: Utc::now(),
        });
        Ok(document_id)
    }

    /// Unwraps a serialized point payload into its JSON object.
    fn payload_object(value: serde_json::Value) -> Result<serde_json::Map<String, serde_json::Value>> {
        match value {
            serde_json::Value::Object(map) => Ok(map),
            other => Err(anyhow!("point payload did not serialize to a JSON object: {}", other)),
        }
    }

    /// Returns the string metadata entry `key`, or `default` when absent or non-string.
    fn metadata_str_or(document: &Document, key: &str, default: &str) -> String {
        document
            .metadata
            .get(key)
            .and_then(|v| v.as_str())
            .unwrap_or(default)
            .to_string()
    }

    /// Regex capturing the service segment of an AWS ARN; compiled once.
    fn arn_service_regex() -> &'static Regex {
        static RE: std::sync::OnceLock<Regex> = std::sync::OnceLock::new();
        RE.get_or_init(|| {
            Regex::new(r"^arn:aws:([^:]+):").expect("ARN service pattern is a valid regex")
        })
    }

    /// Regex capturing the (non-empty) region segment of an AWS ARN; compiled once.
    fn arn_region_regex() -> &'static Regex {
        static RE: std::sync::OnceLock<Regex> = std::sync::OnceLock::new();
        RE.get_or_init(|| {
            Regex::new(r"^arn:aws:[^:]+:([^:]+):").expect("ARN region pattern is a valid regex")
        })
    }
}