use anyhow::{Result, anyhow};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use chrono::{DateTime, Utc};
use crate::services::EncryptionService;
use crate::config::ConfigManager;
/// Complete payload serialized by `SyncService::backup` and parsed back by
/// `SyncService::restore`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct BackupData {
    /// Vectors keyed by a string identifier (document names in the current sample data).
    pub vectors: HashMap<String, Vec<f32>>,
    /// String-to-string mappings (doc id -> document name in the current sample data).
    pub mappings: HashMap<String, String>,
    /// Summary information describing this backup.
    pub metadata: BackupMetadata,
    /// Moment the backup payload was assembled.
    pub timestamp: DateTime<Utc>,
    /// Backup format version string.
    pub version: String,
}
/// Summary stored alongside the data inside every [`BackupData`].
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct BackupMetadata {
    /// Number of entries in `BackupData::vectors`.
    pub total_vectors: usize,
    /// Number of entries in `BackupData::mappings`.
    pub total_mappings: usize,
    /// Serialized size in bytes. NOTE(review): the backup path currently leaves this at 0.
    pub backup_size_bytes: usize,
    /// Whether the payload was encrypted before upload.
    pub encryption_enabled: bool,
    /// Cloud provider name recorded at backup time.
    pub cloud_provider: String,
    /// Creation time of this metadata record.
    pub created_at: DateTime<Utc>,
}
/// Caller-supplied knobs for `SyncService::backup`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct BackupOptions {
    /// Provider name (`"aws"`, `"s3"`, `"azure"`, `"gcp"`); `None` auto-detects from config.
    pub cloud_provider: Option<String>,
    /// Encrypt the payload before upload; `None` is treated as `true`.
    pub encryption_enabled: Option<bool>,
    /// Include vectors in the backup.
    pub include_vectors: bool,
    /// Include id mappings in the backup.
    pub include_mappings: bool,
    /// NOTE(review): not consulted by `SyncService` in this file.
    pub include_metadata: bool,
    /// Object/blob name to use instead of a generated `rag-backup-<timestamp>.bin`.
    pub custom_path: Option<String>,
}
impl Default for BackupOptions {
fn default() -> Self {
Self {
cloud_provider: None,
encryption_enabled: None,
include_vectors: true,
include_mappings: true,
include_metadata: true,
custom_path: None,
}
}
}
/// Caller-supplied knobs for `SyncService::restore`.
///
/// `Default` is derived: every field takes its type's default (`None` / `false`),
/// which is exactly what the former hand-written impl produced.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct RestoreOptions {
    /// Provider name (`"aws"`, `"s3"`, `"azure"`, `"gcp"`); `None` auto-detects from config.
    pub cloud_provider: Option<String>,
    /// Requested backup time. NOTE(review): whether this is honoured depends on the
    /// provider-specific restore path in `SyncService`.
    pub backup_timestamp: Option<DateTime<Utc>>,
    /// Explicit object/blob name of the backup to restore.
    pub custom_path: Option<String>,
    /// Whether existing data may be overwritten; defaults to `false`.
    pub overwrite_existing: bool,
}
/// Outcome report returned by backup and restore operations.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct SyncResult {
    /// `true` on every result returned by `SyncService`; failures surface as `Err`.
    pub success: bool,
    /// Provider name the operation ran against.
    pub cloud_provider: String,
    /// Operation name (`"backup"` or `"restore"`).
    pub operation: String,
    /// Payload size in bytes that was uploaded or downloaded.
    pub bytes_transferred: usize,
    /// Wall-clock duration of the operation in milliseconds.
    pub duration_ms: u64,
    /// Location of the written backup (set for backups, `None` for restores).
    pub backup_path: Option<String>,
    /// Error text; `SyncService` always leaves this `None`.
    pub error_message: Option<String>,
}
/// Creates, restores, lists and deletes data backups in cloud storage.
pub struct SyncService {
    /// Encrypts backup payloads and decrypts them on restore.
    encryption_service: Arc<EncryptionService>,
    /// Source of the S3 / Azure Blob / GCS settings.
    config_manager: Arc<ConfigManager>,
    /// `None` when S3 is disabled or no bucket is configured.
    s3_client: Option<aws_sdk_s3::Client>,
}
impl SyncService {
pub async fn new(
encryption_service: Arc<EncryptionService>,
config_manager: Arc<ConfigManager>,
) -> Result<Self> {
let s3_client = Self::initialize_s3_client(&config_manager).await;
Ok(Self {
encryption_service,
config_manager,
s3_client,
})
}
pub async fn initialize(&self) -> Result<()> {
Ok(())
}
pub async fn shutdown(&self) -> Result<()> {
Ok(())
}
pub async fn backup(&self, options: BackupOptions) -> Result<SyncResult> {
let start_time = std::time::Instant::now();
let cloud_provider = self.detect_cloud_provider(&options)?;
let backup_data = self.create_backup_data(&options).await?;
let data_to_upload = if options.encryption_enabled.unwrap_or(true) {
let serialized_data = serde_json::to_string(&backup_data)?;
self.encryption_service.encrypt(&serialized_data).await?.into_bytes()
} else {
serde_json::to_vec(&backup_data)?
};
let backup_path = match cloud_provider.as_str() {
"aws" | "s3" => self.backup_to_s3(&data_to_upload, &options).await?,
"azure" => self.backup_to_azure(&data_to_upload, &options).await?,
"gcp" => self.backup_to_gcs(&data_to_upload, &options).await?,
_ => return Err(anyhow!("Unsupported cloud provider: {}", cloud_provider)),
};
let duration = start_time.elapsed();
Ok(SyncResult {
success: true,
cloud_provider,
operation: "backup".to_string(),
bytes_transferred: data_to_upload.len(),
duration_ms: duration.as_millis() as u64,
backup_path: Some(backup_path),
error_message: None,
})
}
pub async fn restore(&self, options: RestoreOptions) -> Result<SyncResult> {
let start_time = std::time::Instant::now();
let cloud_provider = self.detect_cloud_provider_from_restore(&options)?;
let encrypted_data = match cloud_provider.as_str() {
"aws" | "s3" => self.restore_from_s3(&options).await?,
"azure" => self.restore_from_azure(&options).await?,
"gcp" => self.restore_from_gcs(&options).await?,
_ => return Err(anyhow!("Unsupported cloud provider: {}", cloud_provider)),
};
let encrypted_str = String::from_utf8(encrypted_data.clone())?;
let decrypted_data = self.encryption_service.decrypt(&encrypted_str).await?;
let backup_data: BackupData = serde_json::from_str(&decrypted_data)?;
self.apply_restore_data(&backup_data, &options).await?;
let duration = start_time.elapsed();
Ok(SyncResult {
success: true,
cloud_provider,
operation: "restore".to_string(),
bytes_transferred: encrypted_data.len(),
duration_ms: duration.as_millis() as u64,
backup_path: None,
error_message: None,
})
}
async fn create_backup_data(&self, options: &BackupOptions) -> Result<BackupData> {
let mut vectors = HashMap::new();
let mut mappings = HashMap::new();
if options.include_vectors {
vectors.insert("sample_document_1".to_string(), vec![0.1, 0.2, 0.3]);
vectors.insert("sample_document_2".to_string(), vec![0.4, 0.5, 0.6]);
}
if options.include_mappings {
mappings.insert("doc_id_1".to_string(), "sample_document_1".to_string());
mappings.insert("doc_id_2".to_string(), "sample_document_2".to_string());
}
let metadata = BackupMetadata {
total_vectors: vectors.len(),
total_mappings: mappings.len(),
backup_size_bytes: 0, encryption_enabled: options.encryption_enabled.unwrap_or(true),
cloud_provider: options.cloud_provider.clone().unwrap_or_else(|| "aws".to_string()),
created_at: Utc::now(),
};
Ok(BackupData {
vectors,
mappings,
metadata,
timestamp: Utc::now(),
version: "1.0".to_string(),
})
}
async fn backup_to_s3(&self, data: &[u8], options: &BackupOptions) -> Result<String> {
if let Some(ref client) = self.s3_client {
let s3_config = self.config_manager.get_s3_config();
let timestamp = Utc::now().format("%Y%m%d_%H%M%S");
let key = options.custom_path.clone()
.unwrap_or_else(|| format!("rag-backup-{}.bin", timestamp));
let mut put_request = client
.put_object()
.bucket(&s3_config.bucket)
.key(&key)
.body(aws_sdk_s3::primitives::ByteStream::from(data.to_vec()));
if s3_config.encryption_enabled {
put_request = put_request.server_side_encryption(
aws_sdk_s3::types::ServerSideEncryption::Aes256
);
}
put_request.send().await.map_err(|e| anyhow!("S3 upload failed: {}", e))?;
Ok(format!("s3://{}/{}", s3_config.bucket, key))
} else {
Err(anyhow!("S3 client not initialized"))
}
}
async fn backup_to_azure(&self, _data: &[u8], options: &BackupOptions) -> Result<String> {
let azure_config = self.config_manager.get_azure_blob_config();
if !azure_config.enabled {
return Err(anyhow!("Azure Blob Storage not configured"));
}
let timestamp = Utc::now().format("%Y%m%d_%H%M%S");
let blob_name = options.custom_path.clone()
.unwrap_or_else(|| format!("rag-backup-{}.bin", timestamp));
Ok(format!("azure://{}/{}", azure_config.container_name, blob_name))
}
async fn backup_to_gcs(&self, _data: &[u8], options: &BackupOptions) -> Result<String> {
let gcs_config = self.config_manager.get_gcs_config();
if !gcs_config.enabled {
return Err(anyhow!("Google Cloud Storage not configured"));
}
let timestamp = Utc::now().format("%Y%m%d_%H%M%S");
let object_name = options.custom_path.clone()
.unwrap_or_else(|| format!("rag-backup-{}.bin", timestamp));
Ok(format!("gs://{}/{}", gcs_config.bucket_name, object_name))
}
async fn restore_from_s3(&self, options: &RestoreOptions) -> Result<Vec<u8>> {
if let Some(ref client) = self.s3_client {
let s3_config = self.config_manager.get_s3_config();
let key = options.custom_path.clone()
.unwrap_or_else(|| "latest-backup.bin".to_string());
let response = client
.get_object()
.bucket(&s3_config.bucket)
.key(&key)
.send()
.await
.map_err(|e| anyhow!("S3 download failed: {}", e))?;
let data = response.body.collect().await
.map_err(|e| anyhow!("Failed to read S3 response body: {}", e))?
.into_bytes()
.to_vec();
Ok(data)
} else {
Err(anyhow!("S3 client not initialized"))
}
}
async fn restore_from_azure(&self, _options: &RestoreOptions) -> Result<Vec<u8>> {
let azure_config = self.config_manager.get_azure_blob_config();
if !azure_config.enabled {
return Err(anyhow!("Azure Blob Storage not configured"));
}
Ok(vec![]) }
async fn restore_from_gcs(&self, _options: &RestoreOptions) -> Result<Vec<u8>> {
let gcs_config = self.config_manager.get_gcs_config();
if !gcs_config.enabled {
return Err(anyhow!("Google Cloud Storage not configured"));
}
Ok(vec![]) }
async fn apply_restore_data(&self, backup_data: &BackupData, options: &RestoreOptions) -> Result<()> {
println!("Restoring {} vectors and {} mappings",
backup_data.metadata.total_vectors,
backup_data.metadata.total_mappings);
if !options.overwrite_existing {
println!("Checking for conflicts with existing data...");
}
Ok(())
}
fn detect_cloud_provider(&self, options: &BackupOptions) -> Result<String> {
if let Some(ref provider) = options.cloud_provider {
return Ok(provider.clone());
}
self.detect_default_cloud()
}
fn detect_cloud_provider_from_restore(&self, options: &RestoreOptions) -> Result<String> {
if let Some(ref provider) = options.cloud_provider {
return Ok(provider.clone());
}
self.detect_default_cloud()
}
fn detect_default_cloud(&self) -> Result<String> {
let s3_config = self.config_manager.get_s3_config();
let azure_config = self.config_manager.get_azure_blob_config();
let gcs_config = self.config_manager.get_gcs_config();
if s3_config.enabled && !s3_config.bucket.is_empty() {
return Ok("aws".to_string());
}
if azure_config.enabled && !azure_config.container_name.is_empty() {
return Ok("azure".to_string());
}
if gcs_config.enabled && !gcs_config.bucket_name.is_empty() {
return Ok("gcp".to_string());
}
Ok("aws".to_string())
}
async fn initialize_s3_client(config_manager: &ConfigManager) -> Option<aws_sdk_s3::Client> {
let s3_config = config_manager.get_s3_config();
if s3_config.enabled && !s3_config.bucket.is_empty() {
let mut aws_config_builder = aws_config::defaults(aws_config::BehaviorVersion::latest())
.region(aws_sdk_s3::config::Region::new(s3_config.region.clone()));
if let Some(endpoint_url) = &s3_config.endpoint_url {
aws_config_builder = aws_config_builder.endpoint_url(endpoint_url);
}
let aws_config = aws_config_builder.load().await;
return Some(aws_sdk_s3::Client::new(&aws_config));
}
None
}
pub async fn list_backups(&self, cloud_provider: Option<String>) -> Result<Vec<String>> {
let provider = cloud_provider.unwrap_or_else(|| self.detect_default_cloud().unwrap_or_else(|_| "aws".to_string()));
match provider.as_str() {
"aws" | "s3" => self.list_s3_backups().await,
"azure" => self.list_azure_backups().await,
"gcp" => self.list_gcs_backups().await,
_ => Err(anyhow!("Unsupported cloud provider: {}", provider)),
}
}
async fn list_s3_backups(&self) -> Result<Vec<String>> {
if let Some(ref client) = self.s3_client {
let s3_config = self.config_manager.get_s3_config();
let response = client
.list_objects_v2()
.bucket(&s3_config.bucket)
.prefix("rag-backup-")
.send()
.await
.map_err(|e| anyhow!("S3 list failed: {}", e))?;
let keys = response.contents()
.iter()
.filter_map(|obj| obj.key().map(|k| k.to_string()))
.collect();
Ok(keys)
} else {
Err(anyhow!("S3 client not initialized"))
}
}
async fn list_azure_backups(&self) -> Result<Vec<String>> {
Ok(vec!["azure-backup-1.bin".to_string(), "azure-backup-2.bin".to_string()])
}
async fn list_gcs_backups(&self) -> Result<Vec<String>> {
Ok(vec!["gcs-backup-1.bin".to_string(), "gcs-backup-2.bin".to_string()])
}
pub fn get_supported_providers(&self) -> Vec<String> {
vec!["aws".to_string(), "s3".to_string(), "azure".to_string(), "gcp".to_string()]
}
pub async fn delete_backup(&self, backup_path: &str, cloud_provider: Option<String>) -> Result<()> {
let provider = cloud_provider.unwrap_or_else(|| self.detect_default_cloud().unwrap_or_else(|_| "aws".to_string()));
match provider.as_str() {
"aws" | "s3" => self.delete_s3_backup(backup_path).await,
"azure" => self.delete_azure_backup(backup_path).await,
"gcp" => self.delete_gcs_backup(backup_path).await,
_ => Err(anyhow!("Unsupported cloud provider: {}", provider)),
}
}
async fn delete_s3_backup(&self, backup_path: &str) -> Result<()> {
if let Some(ref client) = self.s3_client {
let s3_config = self.config_manager.get_s3_config();
client
.delete_object()
.bucket(&s3_config.bucket)
.key(backup_path)
.send()
.await
.map_err(|e| anyhow!("S3 delete failed: {}", e))?;
Ok(())
} else {
Err(anyhow!("S3 client not initialized"))
}
}
async fn delete_azure_backup(&self, backup_path: &str) -> Result<()> {
println!("Would delete Azure backup: {}", backup_path);
Ok(())
}
async fn delete_gcs_backup(&self, backup_path: &str) -> Result<()> {
println!("Would delete GCS backup: {}", backup_path);
Ok(())
}
}