use common::{DakeraError, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use crate::object::{ObjectStorage, ObjectStorageConfig};
use crate::snapshot::{SnapshotConfig, SnapshotManager, SnapshotMetadata};
use crate::traits::VectorStorage;
/// Top-level configuration for [`BackupManager`]: how snapshots are taken,
/// where (if anywhere) they are replicated, and how long they are kept.
#[derive(Debug, Clone)]
pub struct BackupConfig {
    /// Settings forwarded to the underlying `SnapshotManager`.
    pub snapshot_config: SnapshotConfig,
    /// Optional remote object-storage target; `None` disables replication.
    pub remote_config: Option<ObjectStorageConfig>,
    /// Rules deciding which old backups are pruned.
    pub retention: RetentionPolicy,
    /// When `true`, each backup is verified right after creation and again
    /// before a restore.
    pub verify_backups: bool,
    /// Compression settings recorded in backup metadata.
    pub compression: CompressionConfig,
    /// Optional encryption settings; `None` means backups are stored in plaintext.
    pub encryption: Option<EncryptionConfig>,
}
impl Default for BackupConfig {
fn default() -> Self {
Self {
snapshot_config: SnapshotConfig::default(),
remote_config: None,
retention: RetentionPolicy::default(),
verify_backups: true,
compression: CompressionConfig::default(),
encryption: None,
}
}
}
/// Rules for pruning old backups (applied by `apply_retention_policy`).
#[derive(Debug, Clone)]
pub struct RetentionPolicy {
    /// Keep every backup newer than this many days.
    pub daily_retention_days: u32,
    /// Past the daily window, keep one backup per week for this many weeks.
    pub weekly_retention_weeks: u32,
    /// Past the weekly window, keep one backup per 30-day "month" for this many months.
    pub monthly_retention_months: u32,
    /// Hard cap on the total number of retained backups.
    pub max_backups: usize,
}
impl Default for RetentionPolicy {
fn default() -> Self {
Self {
daily_retention_days: 7,
weekly_retention_weeks: 4,
monthly_retention_months: 12,
max_backups: 50,
}
}
}
/// Compression settings recorded on each backup.
#[derive(Debug, Clone)]
pub struct CompressionConfig {
    /// Whether backups are marked as compressed.
    pub enabled: bool,
    /// Compression level; the scale depends on the codec, which is not
    /// visible in this module — confirm against the snapshot writer.
    pub level: u32,
}
impl Default for CompressionConfig {
fn default() -> Self {
Self {
enabled: true,
level: 3, }
}
}
/// Symmetric-encryption material for backups.
///
/// NOTE(review): how `key`/`salt` are consumed is not visible in this module;
/// raw key bytes live in memory and `Debug` is derived, so avoid logging this
/// struct — consider zeroize-on-drop. Verify against the encryption code path.
#[derive(Debug, Clone)]
pub struct EncryptionConfig {
    /// Raw key bytes.
    pub key: Vec<u8>,
    /// Salt, presumably for key derivation — TODO confirm.
    pub salt: Vec<u8>,
}
/// Everything recorded about a single backup; persisted as a JSON sidecar
/// (`<id>.bak`) next to the snapshot file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupMetadata {
    /// Metadata of the underlying snapshot.
    pub snapshot: SnapshotMetadata,
    /// How this backup was triggered.
    pub backup_type: BackupType,
    /// Remote object path when the backup was replicated, else `None`.
    pub remote_location: Option<String>,
    /// Mirrors `BackupConfig.compression.enabled` at creation time.
    pub compressed: bool,
    /// Mirrors whether encryption was configured at creation time.
    pub encrypted: bool,
    /// Lowercase-hex SHA-256 of the snapshot file at creation time.
    pub checksum: String,
    /// Wall-clock time the snapshot creation took, in milliseconds.
    pub duration_ms: u64,
    /// Free-form tags supplied by the caller or scheduler.
    pub tags: HashMap<String, String>,
}
/// How a backup came to be created.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum BackupType {
    /// Explicitly requested by a caller.
    Manual,
    /// Produced by a schedule (see [`BackupScheduler`]).
    Scheduled,
    /// Presumably taken automatically before a risky operation — not
    /// constructed in this module; confirm with callers.
    PreOperation,
    /// Presumably part of a continuous backup stream — not constructed in
    /// this module; confirm with callers.
    Continuous,
}
/// Outcome of verifying a single backup (see `BackupManager::verify_backup`).
#[derive(Debug, Clone)]
pub struct VerificationResult {
    /// Id of the backup that was checked.
    pub backup_id: String,
    /// Overall verdict: checksum and integrity checks both passed.
    pub valid: bool,
    /// Whether the recomputed checksum matched the stored one.
    pub checksum_valid: bool,
    /// Whether no integrity errors were encountered during verification.
    pub data_integrity: bool,
    /// Number of vectors in the verified snapshot's metadata.
    pub vectors_verified: u64,
    /// Human-readable descriptions of any problems found.
    pub errors: Vec<String>,
}
/// Aggregate counters maintained by [`BackupManager`] over its lifetime.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct BackupStats {
    /// Number of backups successfully created.
    pub total_backups: u64,
    /// Number of backups that passed verification.
    pub verified_backups: u64,
    /// Sum of `snapshot.size_bytes` across created backups.
    pub total_bytes_backed_up: u64,
    /// Total bytes after compression.
    pub total_bytes_compressed: u64,
    /// Running average backup duration in milliseconds.
    pub avg_backup_duration_ms: u64,
    /// Unix timestamp (seconds) of the most recent backup, if any.
    pub last_backup_at: Option<u64>,
    /// Unix timestamp (seconds) of the most recent successful verification.
    pub last_verification_at: Option<u64>,
    /// Count of failed backup attempts.
    pub backup_failures: u64,
}
/// Orchestrates backups: snapshot creation, optional remote replication,
/// verification, retention pruning, and statistics.
pub struct BackupManager {
    /// Configuration supplied at construction.
    config: BackupConfig,
    /// Creates, restores, and deletes local snapshots.
    snapshot_manager: SnapshotManager,
    /// Remote object store; present only when `config.remote_config` is set.
    remote_storage: Option<ObjectStorage>,
    /// Running counters; see [`BackupStats`].
    stats: BackupStats,
}
impl BackupManager {
/// Builds a manager from `config`, constructing the snapshot manager and,
/// when a remote target is configured, the object-storage client.
///
/// # Errors
/// Propagates failures from `SnapshotManager::new` or `ObjectStorage::new`.
pub fn new(config: BackupConfig) -> Result<Self> {
    let snapshot_manager = SnapshotManager::new(config.snapshot_config.clone())?;
    // Build the remote client only when a remote target is configured,
    // surfacing any construction error via `transpose`.
    let remote_storage = config
        .remote_config
        .as_ref()
        .map(|rc| ObjectStorage::new(rc.clone()))
        .transpose()?;
    Ok(Self {
        config,
        snapshot_manager,
        remote_storage,
        stats: BackupStats::default(),
    })
}
/// Creates a full backup of `storage` via a new snapshot, optionally uploads
/// it to remote storage, persists the metadata sidecar, and (when configured)
/// verifies the result before counting it as successful. Applies the
/// retention policy afterwards.
///
/// # Errors
/// Fails if snapshotting, checksumming, upload, metadata persistence, or
/// post-backup verification fails. Verification failures are counted in
/// `stats.backup_failures`.
pub async fn create_backup<S: VectorStorage>(
    &mut self,
    storage: &S,
    backup_type: BackupType,
    description: Option<String>,
    tags: HashMap<String, String>,
) -> Result<BackupMetadata> {
    let start = std::time::Instant::now();
    let snapshot = self
        .snapshot_manager
        .create_snapshot(storage, description)
        .await?;
    let duration_ms = start.elapsed().as_millis() as u64;
    let checksum = self.calculate_checksum(&snapshot.id)?;
    let mut backup_metadata = BackupMetadata {
        snapshot,
        backup_type,
        remote_location: None,
        compressed: self.config.compression.enabled,
        encrypted: self.config.encryption.is_some(),
        checksum,
        duration_ms,
        tags,
    };
    if let Some(ref remote) = self.remote_storage {
        let remote_path = self
            .upload_to_remote(remote, &backup_metadata.snapshot.id)
            .await?;
        backup_metadata.remote_location = Some(remote_path);
    }
    self.save_backup_metadata(&backup_metadata)?;
    if self.config.verify_backups {
        let verification = self.verify_backup(&backup_metadata.snapshot.id)?;
        if !verification.valid {
            // A failed verification is a failed backup: record it so the
            // `backup_failures` stat reflects reality (it was never updated before).
            self.stats.backup_failures += 1;
            return Err(DakeraError::Storage(format!(
                "Backup verification failed: {:?}",
                verification.errors
            )));
        }
    }
    self.stats.total_backups += 1;
    self.stats.total_bytes_backed_up += backup_metadata.snapshot.size_bytes;
    // Maintain the running average so `avg_backup_duration_ms` is actually
    // populated (the field previously was never written).
    let n = self.stats.total_backups; // >= 1: just incremented
    self.stats.avg_backup_duration_ms =
        (self.stats.avg_backup_duration_ms * (n - 1) + duration_ms) / n;
    self.stats.last_backup_at = Some(
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or(Duration::ZERO)
            .as_secs(),
    );
    self.apply_retention_policy().await?;
    Ok(backup_metadata)
}
pub async fn create_incremental_backup<S: VectorStorage>(
&mut self,
storage: &S,
parent_id: &str,
changed_namespaces: &[String],
description: Option<String>,
tags: HashMap<String, String>,
) -> Result<BackupMetadata> {
let start = std::time::Instant::now();
let snapshot = self
.snapshot_manager
.create_incremental_snapshot(storage, parent_id, changed_namespaces, description)
.await?;
let duration_ms = start.elapsed().as_millis() as u64;
let checksum = self.calculate_checksum(&snapshot.id)?;
let mut backup_metadata = BackupMetadata {
snapshot,
backup_type: BackupType::Manual,
remote_location: None,
compressed: self.config.compression.enabled,
encrypted: self.config.encryption.is_some(),
checksum,
duration_ms,
tags,
};
if let Some(ref remote) = self.remote_storage {
let remote_path = self
.upload_to_remote(remote, &backup_metadata.snapshot.id)
.await?;
backup_metadata.remote_location = Some(remote_path);
}
self.save_backup_metadata(&backup_metadata)?;
self.stats.total_backups += 1;
self.stats.total_bytes_backed_up += backup_metadata.snapshot.size_bytes;
Ok(backup_metadata)
}
/// Restores `backup_id` into `storage`, fetching it from remote storage when
/// it is not present locally and (when configured) verifying it first.
///
/// # Errors
/// Fails when the backup cannot be located, fails verification, or the
/// snapshot restore itself fails.
pub async fn restore_backup<S: VectorStorage>(
    &mut self,
    storage: &S,
    backup_id: &str,
) -> Result<RestoreStats> {
    let start = std::time::Instant::now();
    if !self.snapshot_manager.snapshot_exists(backup_id) {
        // Not on disk: try the remote copy, otherwise give up.
        match self.remote_storage {
            Some(ref remote) => self.download_from_remote(remote, backup_id).await?,
            None => {
                return Err(DakeraError::Storage(format!(
                    "Backup not found: {}",
                    backup_id
                )))
            }
        }
    }
    if self.config.verify_backups {
        let verification = self.verify_backup(backup_id)?;
        if !verification.valid {
            return Err(DakeraError::Storage(format!(
                "Backup verification failed before restore: {:?}",
                verification.errors
            )));
        }
    }
    let outcome = self
        .snapshot_manager
        .restore_snapshot(storage, backup_id)
        .await?;
    Ok(RestoreStats {
        backup_id: backup_id.to_string(),
        namespaces_restored: outcome.namespaces_restored,
        vectors_restored: outcome.vectors_restored,
        duration_ms: start.elapsed().as_millis() as u64,
    })
}
/// Verifies a backup: the snapshot file must exist, its metadata must be
/// readable, and its current SHA-256 must match the checksum stored in the
/// `.bak` sidecar. Legacy backups with no sidecar skip the comparison and
/// pass if a non-empty checksum could be computed.
///
/// Never returns `Err` for a merely-invalid backup; problems are reported in
/// the returned [`VerificationResult`].
pub fn verify_backup(&mut self, backup_id: &str) -> Result<VerificationResult> {
    // Shorthand for an all-false result carrying the accumulated errors
    // (previously this struct literal was repeated three times).
    fn failed(backup_id: &str, errors: Vec<String>) -> VerificationResult {
        VerificationResult {
            backup_id: backup_id.to_string(),
            valid: false,
            checksum_valid: false,
            data_integrity: false,
            vectors_verified: 0,
            errors,
        }
    }
    let mut errors = Vec::new();
    if !self.snapshot_manager.snapshot_exists(backup_id) {
        return Ok(failed(backup_id, vec!["Backup file not found".to_string()]));
    }
    // Renamed from `_current_checksum`: the value IS used below, so the
    // leading-underscore "intentionally unused" convention was misleading.
    let current_checksum = match self.calculate_checksum(backup_id) {
        Ok(cs) => cs,
        Err(e) => {
            errors.push(format!("Checksum calculation failed: {}", e));
            return Ok(failed(backup_id, errors));
        }
    };
    let metadata = match self.snapshot_manager.get_snapshot_metadata(backup_id) {
        Ok(m) => m,
        Err(e) => {
            errors.push(format!("Failed to read metadata: {}", e));
            return Ok(failed(backup_id, errors));
        }
    };
    let checksum_valid = match self.load_backup_metadata(backup_id) {
        Ok(stored) => current_checksum == stored.checksum,
        Err(e) => {
            tracing::warn!(
                backup_id = backup_id,
                error = %e,
                "No backup metadata sidecar found; skipping checksum comparison (legacy backup)"
            );
            !current_checksum.is_empty()
        }
    };
    // Every integrity problem above causes an early return, so `errors` is
    // empty here; kept as a hook for future in-place checks.
    let data_integrity = errors.is_empty();
    let valid = checksum_valid && data_integrity;
    if valid {
        self.stats.verified_backups += 1;
        self.stats.last_verification_at = Some(
            SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap_or(Duration::ZERO)
                .as_secs(),
        );
    }
    Ok(VerificationResult {
        backup_id: backup_id.to_string(),
        valid,
        checksum_valid,
        data_integrity,
        vectors_verified: metadata.total_vectors,
        errors,
    })
}
/// Lists metadata for all locally available backups. Delegates to the
/// snapshot manager, so remote-only backups are not included.
pub fn list_backups(&self) -> Result<Vec<SnapshotMetadata>> {
    self.snapshot_manager.list_snapshots()
}
/// Deletes a backup locally (snapshot plus `.bak` metadata sidecar) and, on
/// a best-effort basis, its remote copy. Returns whether the local snapshot
/// was actually removed.
pub async fn delete_backup(&mut self, backup_id: &str) -> Result<bool> {
    let local_deleted = self.snapshot_manager.delete_snapshot(backup_id)?;
    // Sidecar removal is best-effort: a leftover sidecar is harmless, so
    // failures are only logged.
    let bak_path = self.backup_metadata_path(backup_id);
    if bak_path.exists() {
        let removal = std::fs::remove_file(&bak_path);
        if let Err(e) = removal {
            tracing::warn!(
                path = %bak_path.display(),
                error = %e,
                "Failed to remove backup metadata sidecar"
            );
        }
    }
    // Remote deletion failures are deliberately ignored as well.
    if let Some(remote) = self.remote_storage.as_ref() {
        let remote_path = format!("backups/{}.snap", backup_id);
        let _ = remote.delete(&"backups".to_string(), &[remote_path]).await;
    }
    Ok(local_deleted)
}
/// Returns a read-only view of the accumulated backup statistics.
pub fn get_stats(&self) -> &BackupStats {
    &self.stats
}
/// Prunes old backups according to the configured [`RetentionPolicy`].
///
/// Only runs when more than `max_backups` snapshots exist. Backups newer
/// than the daily cutoff are always kept; older ones are kept only as the
/// first representative of their week (then month) bucket; everything else
/// is deleted, except backups that are parents of a kept incremental.
///
/// NOTE(review): the `to_keep.pop()` trim below assumes `list_snapshots()`
/// returns newest-first so the popped entry is the oldest kept — confirm the
/// ordering guarantee of `SnapshotManager::list_snapshots`.
async fn apply_retention_policy(&mut self) -> Result<()> {
    let backups = self.snapshot_manager.list_snapshots()?;
    // Under the cap: nothing to prune.
    if backups.len() <= self.config.retention.max_backups {
        return Ok(());
    }
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or(Duration::ZERO)
        .as_secs();
    // Age thresholds in seconds since epoch; a "month" is approximated as 30 days.
    let daily_cutoff = now - (self.config.retention.daily_retention_days as u64 * 24 * 60 * 60);
    let weekly_cutoff =
        now - (self.config.retention.weekly_retention_weeks as u64 * 7 * 24 * 60 * 60);
    let monthly_cutoff =
        now - (self.config.retention.monthly_retention_months as u64 * 30 * 24 * 60 * 60);
    let mut to_keep = Vec::new();
    let mut to_delete = Vec::new();
    for backup in backups {
        // Within the daily window: always keep.
        if backup.created_at >= daily_cutoff {
            to_keep.push(backup);
            continue;
        }
        // Within the weekly window: keep the first representative per week bucket.
        if backup.created_at >= weekly_cutoff {
            let week_number = backup.created_at / (7 * 24 * 60 * 60);
            let has_weekly = to_keep
                .iter()
                .any(|b: &SnapshotMetadata| b.created_at / (7 * 24 * 60 * 60) == week_number);
            if !has_weekly {
                to_keep.push(backup);
                continue;
            }
        }
        // Within the monthly window: keep the first representative per month
        // bucket. (A backup whose week bucket was already taken can still
        // qualify here, since the check falls through.)
        if backup.created_at >= monthly_cutoff {
            let month_number = backup.created_at / (30 * 24 * 60 * 60);
            let has_monthly = to_keep
                .iter()
                .any(|b: &SnapshotMetadata| b.created_at / (30 * 24 * 60 * 60) == month_number);
            if !has_monthly {
                to_keep.push(backup);
                continue;
            }
        }
        to_delete.push(backup);
    }
    // Still over the cap: trim from the end of `to_keep` (see NOTE above
    // about the ordering assumption).
    while to_keep.len() > self.config.retention.max_backups && !to_keep.is_empty() {
        if let Some(oldest) = to_keep.pop() {
            to_delete.push(oldest);
        }
    }
    for backup in to_delete {
        // Never delete a backup that a kept incremental still depends on.
        let is_parent = to_keep
            .iter()
            .any(|b| b.parent_id.as_ref() == Some(&backup.id));
        if !is_parent {
            self.delete_backup(&backup.id).await?;
        }
    }
    Ok(())
}
/// Streams the backup's `.snap` file through SHA-256 and returns the digest
/// as a lowercase hex string.
///
/// # Errors
/// Fails when the snapshot file cannot be opened or read.
fn calculate_checksum(&self, backup_id: &str) -> Result<String> {
    use sha2::{Digest, Sha256};
    use std::fmt::Write as _;
    use std::fs::File;
    use std::io::Read;
    let path = self
        .config
        .snapshot_config
        .snapshot_dir
        .join(format!("{}.snap", backup_id));
    let mut file = File::open(&path)
        .map_err(|e| DakeraError::Storage(format!("Failed to open backup: {}", e)))?;
    let mut hasher = Sha256::new();
    // Hash in fixed-size chunks so arbitrarily large snapshots never need to
    // fit in memory.
    let mut chunk = [0u8; 8192];
    loop {
        let n = file
            .read(&mut chunk)
            .map_err(|e| DakeraError::Storage(format!("Failed to read backup: {}", e)))?;
        if n == 0 {
            break;
        }
        hasher.update(&chunk[..n]);
    }
    let digest = hasher.finalize();
    let mut hex = String::with_capacity(digest.len() * 2);
    for byte in digest.iter() {
        // Writing to a String cannot fail.
        let _ = write!(hex, "{:02x}", byte);
    }
    Ok(hex)
}
/// Stages a backup for remote upload: reads the local `.snap` file, ensures
/// the remote `backups` namespace exists, and returns the remote object path
/// where the backup would live.
///
/// NOTE: the actual byte transfer is not implemented yet — only the
/// namespace is created, so the returned path does not yet hold the data.
///
/// # Errors
/// Fails if the local file cannot be read or the namespace cannot be ensured.
async fn upload_to_remote(&self, remote: &ObjectStorage, backup_id: &str) -> Result<String> {
    use std::fs;
    let local_path = self
        .config
        .snapshot_config
        .snapshot_dir
        .join(format!("{}.snap", backup_id));
    let data = fs::read(&local_path)
        .map_err(|e| DakeraError::Storage(format!("Failed to read backup: {}", e)))?;
    let remote_path = format!("backups/{}.snap", backup_id);
    remote.ensure_namespace(&"backups".to_string()).await?;
    // TODO: actually write `data` to `remote_path` via the object store.
    // The previous log claimed "Backup uploaded to remote storage" even
    // though no bytes were transferred — that misleading message is fixed.
    tracing::warn!(
        backup_id = backup_id,
        remote_path = remote_path,
        size = data.len(),
        "Backup staged for remote upload; byte transfer not yet implemented"
    );
    Ok(remote_path)
}
/// Fetches a backup from remote storage into the local snapshot directory.
///
/// Currently a stub: it logs a warning and always returns an error, so
/// restores can only use snapshots that are already present locally.
async fn download_from_remote(&self, _remote: &ObjectStorage, backup_id: &str) -> Result<()> {
    let remote_path = format!("backups/{}.snap", backup_id);
    tracing::warn!(
        backup_id = backup_id,
        remote_path = remote_path,
        "Remote backup download not yet implemented"
    );
    Err(DakeraError::Storage(format!(
        "Remote backup download not yet implemented for '{}'",
        backup_id
    )))
}
/// Path of the JSON metadata sidecar (`<id>.bak`) stored alongside snapshots.
fn backup_metadata_path(&self, backup_id: &str) -> std::path::PathBuf {
    let dir = &self.config.snapshot_config.snapshot_dir;
    dir.join(format!("{}.bak", backup_id))
}
fn save_backup_metadata(&self, metadata: &BackupMetadata) -> Result<()> {
use std::fs::File;
use std::io::BufWriter;
let path = self.backup_metadata_path(&metadata.snapshot.id);
let file = File::create(&path).map_err(|e| {
DakeraError::Storage(format!("Failed to create backup metadata: {}", e))
})?;
let writer = BufWriter::new(file);
serde_json::to_writer_pretty(writer, metadata)
.map_err(|e| DakeraError::Storage(format!("Backup metadata serialize error: {}", e)))?;
Ok(())
}
/// Reads the `<id>.bak` sidecar back into a [`BackupMetadata`].
///
/// # Errors
/// Fails when the sidecar is missing/unreadable or contains invalid JSON.
fn load_backup_metadata(&self, backup_id: &str) -> Result<BackupMetadata> {
    use std::fs::File;
    use std::io::BufReader;
    let path = self.backup_metadata_path(backup_id);
    let reader = File::open(&path)
        .map(BufReader::new)
        .map_err(|e| DakeraError::Storage(format!("Failed to open backup metadata: {}", e)))?;
    serde_json::from_reader(reader)
        .map_err(|e| DakeraError::Storage(format!("Backup metadata deserialize error: {}", e)))
}
}
/// Summary of a completed restore operation.
#[derive(Debug, Clone)]
pub struct RestoreStats {
    /// Id of the backup that was restored.
    pub backup_id: String,
    /// Number of namespaces restored into the target storage.
    pub namespaces_restored: usize,
    /// Number of vectors restored.
    pub vectors_restored: u64,
    /// Total restore duration in milliseconds.
    pub duration_ms: u64,
}
/// Simple interval-based schedule for triggering backups. The caller polls
/// `is_backup_due` and performs the backup itself; this type only tracks time.
pub struct BackupScheduler {
    /// Time between scheduled backups.
    pub interval: Duration,
    /// Wall-clock instant at which the next backup becomes due.
    pub next_backup: SystemTime,
    /// Type stamped onto backups created from this schedule.
    pub backup_type: BackupType,
    /// Tags to attach to scheduled backups (e.g. `schedule=daily`).
    pub tags: HashMap<String, String>,
}
impl BackupScheduler {
    /// Scheduler firing once per day, tagged `schedule=daily`.
    pub fn daily() -> Self {
        Self::with_schedule_tag(Duration::from_secs(24 * 60 * 60), "daily")
    }
    /// Scheduler firing once per hour, tagged `schedule=hourly`.
    pub fn hourly() -> Self {
        Self::with_schedule_tag(Duration::from_secs(60 * 60), "hourly")
    }
    /// Scheduler firing at an arbitrary `interval`, with no tags.
    pub fn custom(interval: Duration) -> Self {
        Self {
            interval,
            next_backup: SystemTime::now() + interval,
            backup_type: BackupType::Scheduled,
            tags: HashMap::new(),
        }
    }
    /// Shared constructor: `custom` plus a `schedule=<label>` tag.
    /// (Deduplicates the bodies `daily` and `hourly` used to repeat.)
    fn with_schedule_tag(interval: Duration, label: &str) -> Self {
        let mut scheduler = Self::custom(interval);
        scheduler
            .tags
            .insert("schedule".to_string(), label.to_string());
        scheduler
    }
    /// True once the scheduled time has arrived or passed.
    pub fn is_backup_due(&self) -> bool {
        SystemTime::now() >= self.next_backup
    }
    /// Resets the clock: the next backup is one full interval from now.
    pub fn mark_completed(&mut self) {
        self.next_backup = SystemTime::now() + self.interval;
    }
    /// Time remaining until the next scheduled backup (zero if overdue).
    pub fn time_until_next(&self) -> Duration {
        self.next_backup
            .duration_since(SystemTime::now())
            .unwrap_or(Duration::ZERO)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory::InMemoryStorage;
    use common::Vector;
    use std::path::Path;
    use tempfile::TempDir;
    /// Local-only `BackupConfig` rooted at `dir`, with verification enabled
    /// and snapshot compression disabled.
    fn test_config(dir: &Path) -> BackupConfig {
        BackupConfig {
            snapshot_config: SnapshotConfig {
                snapshot_dir: dir.to_path_buf(),
                max_snapshots: 10,
                compression_enabled: false,
                include_metadata: true,
            },
            remote_config: None,
            retention: RetentionPolicy::default(),
            verify_backups: true,
            compression: CompressionConfig::default(),
            encryption: None,
        }
    }
    /// Builds a `dim`-dimensional all-ones vector with the given id.
    fn create_test_vector(id: &str, dim: usize) -> Vector {
        Vector {
            id: id.to_string(),
            values: vec![1.0; dim],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }
    }
    /// A full backup of two vectors reports the right vector count, type,
    /// and a non-empty checksum.
    #[tokio::test]
    async fn test_create_backup() {
        let temp_dir = TempDir::new().unwrap();
        let config = test_config(temp_dir.path());
        let mut manager = BackupManager::new(config).unwrap();
        let storage = InMemoryStorage::new();
        storage.ensure_namespace(&"test".to_string()).await.unwrap();
        storage
            .upsert(
                &"test".to_string(),
                vec![create_test_vector("v1", 4), create_test_vector("v2", 4)],
            )
            .await
            .unwrap();
        let backup = manager
            .create_backup(
                &storage,
                BackupType::Manual,
                Some("Test backup".to_string()),
                HashMap::new(),
            )
            .await
            .unwrap();
        assert_eq!(backup.snapshot.total_vectors, 2);
        assert_eq!(backup.backup_type, BackupType::Manual);
        assert!(!backup.checksum.is_empty());
    }
    /// An untouched backup passes verification on every axis.
    #[tokio::test]
    async fn test_verify_backup() {
        let temp_dir = TempDir::new().unwrap();
        let config = test_config(temp_dir.path());
        let mut manager = BackupManager::new(config).unwrap();
        let storage = InMemoryStorage::new();
        storage.ensure_namespace(&"test".to_string()).await.unwrap();
        storage
            .upsert(&"test".to_string(), vec![create_test_vector("v1", 4)])
            .await
            .unwrap();
        let backup = manager
            .create_backup(&storage, BackupType::Manual, None, HashMap::new())
            .await
            .unwrap();
        let verification = manager.verify_backup(&backup.snapshot.id).unwrap();
        assert!(verification.valid);
        assert!(verification.checksum_valid);
        assert!(verification.data_integrity);
    }
    /// Restoring a backup brings back a vector deleted after the backup was taken.
    #[tokio::test]
    async fn test_restore_backup() {
        let temp_dir = TempDir::new().unwrap();
        let config = test_config(temp_dir.path());
        let mut manager = BackupManager::new(config).unwrap();
        let storage = InMemoryStorage::new();
        storage.ensure_namespace(&"test".to_string()).await.unwrap();
        storage
            .upsert(&"test".to_string(), vec![create_test_vector("v1", 4)])
            .await
            .unwrap();
        let backup = manager
            .create_backup(&storage, BackupType::Manual, None, HashMap::new())
            .await
            .unwrap();
        // Delete the vector, then restore it from the backup.
        storage
            .delete(&"test".to_string(), &["v1".to_string()])
            .await
            .unwrap();
        assert_eq!(storage.count(&"test".to_string()).await.unwrap(), 0);
        let stats = manager
            .restore_backup(&storage, &backup.snapshot.id)
            .await
            .unwrap();
        assert_eq!(stats.vectors_restored, 1);
        assert_eq!(storage.count(&"test".to_string()).await.unwrap(), 1);
    }
    /// Stats count each successful backup and record a last-backup timestamp.
    #[tokio::test]
    async fn test_backup_stats() {
        let temp_dir = TempDir::new().unwrap();
        let config = test_config(temp_dir.path());
        let mut manager = BackupManager::new(config).unwrap();
        let storage = InMemoryStorage::new();
        storage.ensure_namespace(&"test".to_string()).await.unwrap();
        storage
            .upsert(&"test".to_string(), vec![create_test_vector("v1", 4)])
            .await
            .unwrap();
        for _ in 0..3 {
            manager
                .create_backup(&storage, BackupType::Manual, None, HashMap::new())
                .await
                .unwrap();
        }
        let stats = manager.get_stats();
        assert_eq!(stats.total_backups, 3);
        assert!(stats.last_backup_at.is_some());
    }
    /// Appending garbage to the snapshot file must flip `checksum_valid`
    /// (and therefore `valid`) to false.
    #[tokio::test]
    async fn test_verify_backup_detects_corruption() {
        let temp_dir = TempDir::new().unwrap();
        let config = test_config(temp_dir.path());
        let mut manager = BackupManager::new(config).unwrap();
        let storage = InMemoryStorage::new();
        storage.ensure_namespace(&"test".to_string()).await.unwrap();
        storage
            .upsert(&"test".to_string(), vec![create_test_vector("v1", 4)])
            .await
            .unwrap();
        let backup = manager
            .create_backup(&storage, BackupType::Manual, None, HashMap::new())
            .await
            .unwrap();
        // Corrupt the snapshot file in place by appending bytes.
        let snap_path = temp_dir.path().join(format!("{}.snap", backup.snapshot.id));
        use std::io::Write;
        let mut file = std::fs::OpenOptions::new()
            .append(true)
            .open(&snap_path)
            .unwrap();
        file.write_all(b"CORRUPTED_DATA").unwrap();
        drop(file);
        let verification = manager.verify_backup(&backup.snapshot.id).unwrap();
        assert!(
            !verification.checksum_valid,
            "corrupted backup should fail checksum"
        );
        assert!(!verification.valid, "corrupted backup should not be valid");
    }
    /// Scheduler due/complete cycle: not due at creation, due once forced
    /// overdue, not due again after completion resets the clock.
    #[test]
    fn test_backup_scheduler() {
        let mut scheduler = BackupScheduler::hourly();
        assert!(!scheduler.is_backup_due());
        scheduler.next_backup = SystemTime::now() - Duration::from_secs(1);
        assert!(scheduler.is_backup_due());
        scheduler.mark_completed();
        assert!(!scheduler.is_backup_due());
    }
}