1use common::{DakeraError, Result};
12use serde::{Deserialize, Serialize};
13use std::collections::HashMap;
14use std::time::{Duration, SystemTime, UNIX_EPOCH};
15
16use crate::object::{ObjectStorage, ObjectStorageConfig};
17use crate::snapshot::{SnapshotConfig, SnapshotManager, SnapshotMetadata};
18use crate::traits::VectorStorage;
19
20#[derive(Debug, Clone)]
22pub struct BackupConfig {
23 pub snapshot_config: SnapshotConfig,
25 pub remote_config: Option<ObjectStorageConfig>,
27 pub retention: RetentionPolicy,
29 pub verify_backups: bool,
31 pub compression: CompressionConfig,
33 pub encryption: Option<EncryptionConfig>,
35}
36
37impl Default for BackupConfig {
38 fn default() -> Self {
39 Self {
40 snapshot_config: SnapshotConfig::default(),
41 remote_config: None,
42 retention: RetentionPolicy::default(),
43 verify_backups: true,
44 compression: CompressionConfig::default(),
45 encryption: None,
46 }
47 }
48}
49
/// Rules deciding which backups survive pruning.
///
/// `BackupManager` only evaluates these rules once the number of backups
/// exceeds `max_backups`.
#[derive(Debug, Clone)]
pub struct RetentionPolicy {
    /// Backups newer than this many days are always kept.
    pub daily_retention_days: u32,
    /// Within this many weeks, one backup per 7-day bucket is kept.
    pub weekly_retention_weeks: u32,
    /// Within this many (30-day) months, one backup per month bucket is kept.
    pub monthly_retention_months: u32,
    /// Upper bound on the number of backups kept.
    pub max_backups: usize,
}

impl Default for RetentionPolicy {
    /// One week of dailies, four weeklies, twelve monthlies, capped at 50.
    fn default() -> Self {
        let (daily_retention_days, weekly_retention_weeks, monthly_retention_months) = (7, 4, 12);
        let max_backups = 50;
        RetentionPolicy {
            daily_retention_days,
            weekly_retention_weeks,
            monthly_retention_months,
            max_backups,
        }
    }
}
73
/// Compression settings for backups.
#[derive(Debug, Clone)]
pub struct CompressionConfig {
    /// Whether compression is requested.
    pub enabled: bool,
    /// Compression level.
    /// NOTE(review): the codec (and thus the valid level range) is not visible
    /// in this module — confirm against the compressor that consumes it.
    pub level: u32,
}

impl Default for CompressionConfig {
    /// Compression enabled at level 3.
    fn default() -> Self {
        let level = 3;
        CompressionConfig {
            level,
            enabled: true,
        }
    }
}
91
/// Key material for backup encryption.
///
/// `Debug` is implemented by hand so that the secret key is never written to
/// logs or error messages. This matters because `BackupConfig` derives `Debug`
/// and embeds this type.
#[derive(Clone)]
pub struct EncryptionConfig {
    /// Secret key bytes. Redacted from `Debug` output.
    pub key: Vec<u8>,
    /// Salt bytes.
    pub salt: Vec<u8>,
}

impl std::fmt::Debug for EncryptionConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EncryptionConfig")
            // Only the length is shown; the key bytes themselves are secret.
            .field("key", &format_args!("<redacted {} bytes>", self.key.len()))
            .field("salt", &self.salt)
            .finish()
    }
}
100
/// Description of a completed backup, as returned by `BackupManager`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupMetadata {
    /// Metadata of the snapshot the backup is built on.
    pub snapshot: SnapshotMetadata,
    /// What triggered the backup.
    pub backup_type: BackupType,
    /// Location of the remote copy, if one was recorded.
    pub remote_location: Option<String>,
    /// Whether the backup is recorded as compressed.
    pub compressed: bool,
    /// Whether the backup is recorded as encrypted.
    pub encrypted: bool,
    /// Lowercase hex SHA-256 of the snapshot file at creation time.
    pub checksum: String,
    /// Time spent creating the snapshot, in milliseconds.
    pub duration_ms: u64,
    /// Caller-supplied labels.
    pub tags: HashMap<String, String>,
}
121
/// What triggered a backup.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum BackupType {
    /// Requested explicitly by a caller.
    Manual,
    /// Produced on a timer (the type `BackupScheduler` constructors use).
    Scheduled,
    /// NOTE(review): presumably taken before a risky operation — not produced
    /// anywhere in this module.
    PreOperation,
    /// NOTE(review): presumably continuous/streaming backup — not produced
    /// anywhere in this module.
    Continuous,
}
134
/// Outcome of `BackupManager::verify_backup`.
#[derive(Debug, Clone)]
pub struct VerificationResult {
    /// Id of the verified backup.
    pub backup_id: String,
    /// Overall verdict: `checksum_valid && data_integrity`.
    pub valid: bool,
    /// Whether the checksum check passed.
    pub checksum_valid: bool,
    /// Whether the metadata/data check passed.
    pub data_integrity: bool,
    /// `total_vectors` taken from the snapshot metadata (0 if verification
    /// stopped early). The vectors themselves are not re-read.
    pub vectors_verified: u64,
    /// Human-readable problems found; empty when nothing went wrong.
    pub errors: Vec<String>,
}
151
/// Counters describing the backup activity of one `BackupManager`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct BackupStats {
    /// Number of backups created.
    pub total_backups: u64,
    /// Number of verifications that passed.
    pub verified_backups: u64,
    /// Sum of the snapshot `size_bytes` of created backups.
    pub total_bytes_backed_up: u64,
    /// Total compressed bytes.
    /// NOTE(review): not updated anywhere in this module.
    pub total_bytes_compressed: u64,
    /// Average backup duration in milliseconds.
    pub avg_backup_duration_ms: u64,
    /// Unix timestamp (seconds) of the last backup.
    pub last_backup_at: Option<u64>,
    /// Unix timestamp (seconds) of the last successful verification.
    pub last_verification_at: Option<u64>,
    /// Number of failed backup attempts.
    pub backup_failures: u64,
}
172
173pub struct BackupManager {
175 config: BackupConfig,
176 snapshot_manager: SnapshotManager,
177 remote_storage: Option<ObjectStorage>,
178 stats: BackupStats,
179}
180
181impl BackupManager {
182 pub fn new(config: BackupConfig) -> Result<Self> {
184 let snapshot_manager = SnapshotManager::new(config.snapshot_config.clone())?;
185
186 let remote_storage = if let Some(ref remote_config) = config.remote_config {
187 Some(ObjectStorage::new(remote_config.clone())?)
188 } else {
189 None
190 };
191
192 Ok(Self {
193 config,
194 snapshot_manager,
195 remote_storage,
196 stats: BackupStats::default(),
197 })
198 }
199
200 pub async fn create_backup<S: VectorStorage>(
202 &mut self,
203 storage: &S,
204 backup_type: BackupType,
205 description: Option<String>,
206 tags: HashMap<String, String>,
207 ) -> Result<BackupMetadata> {
208 let start = std::time::Instant::now();
209
210 let snapshot = self
212 .snapshot_manager
213 .create_snapshot(storage, description)
214 .await?;
215
216 let duration_ms = start.elapsed().as_millis() as u64;
217
218 let checksum = self.calculate_checksum(&snapshot.id)?;
220
221 let mut backup_metadata = BackupMetadata {
222 snapshot,
223 backup_type,
224 remote_location: None,
225 compressed: self.config.compression.enabled,
226 encrypted: self.config.encryption.is_some(),
227 checksum,
228 duration_ms,
229 tags,
230 };
231
232 if let Some(ref remote) = self.remote_storage {
234 let remote_path = self
235 .upload_to_remote(remote, &backup_metadata.snapshot.id)
236 .await?;
237 backup_metadata.remote_location = Some(remote_path);
238 }
239
240 if self.config.verify_backups {
242 let verification = self.verify_backup(&backup_metadata.snapshot.id)?;
243 if !verification.valid {
244 return Err(DakeraError::Storage(format!(
245 "Backup verification failed: {:?}",
246 verification.errors
247 )));
248 }
249 }
250
251 self.stats.total_backups += 1;
253 self.stats.total_bytes_backed_up += backup_metadata.snapshot.size_bytes;
254 self.stats.last_backup_at = Some(
255 SystemTime::now()
256 .duration_since(UNIX_EPOCH)
257 .unwrap_or(Duration::ZERO)
258 .as_secs(),
259 );
260
261 self.apply_retention_policy().await?;
263
264 Ok(backup_metadata)
265 }
266
267 pub async fn create_incremental_backup<S: VectorStorage>(
269 &mut self,
270 storage: &S,
271 parent_id: &str,
272 changed_namespaces: &[String],
273 description: Option<String>,
274 tags: HashMap<String, String>,
275 ) -> Result<BackupMetadata> {
276 let start = std::time::Instant::now();
277
278 let snapshot = self
279 .snapshot_manager
280 .create_incremental_snapshot(storage, parent_id, changed_namespaces, description)
281 .await?;
282
283 let duration_ms = start.elapsed().as_millis() as u64;
284 let checksum = self.calculate_checksum(&snapshot.id)?;
285
286 let mut backup_metadata = BackupMetadata {
287 snapshot,
288 backup_type: BackupType::Manual,
289 remote_location: None,
290 compressed: self.config.compression.enabled,
291 encrypted: self.config.encryption.is_some(),
292 checksum,
293 duration_ms,
294 tags,
295 };
296
297 if let Some(ref remote) = self.remote_storage {
298 let remote_path = self
299 .upload_to_remote(remote, &backup_metadata.snapshot.id)
300 .await?;
301 backup_metadata.remote_location = Some(remote_path);
302 }
303
304 self.stats.total_backups += 1;
305 self.stats.total_bytes_backed_up += backup_metadata.snapshot.size_bytes;
306
307 Ok(backup_metadata)
308 }
309
310 pub async fn restore_backup<S: VectorStorage>(
312 &mut self,
313 storage: &S,
314 backup_id: &str,
315 ) -> Result<RestoreStats> {
316 let start = std::time::Instant::now();
317
318 if !self.snapshot_manager.snapshot_exists(backup_id) {
320 if let Some(ref remote) = self.remote_storage {
321 self.download_from_remote(remote, backup_id).await?;
322 } else {
323 return Err(DakeraError::Storage(format!(
324 "Backup not found: {}",
325 backup_id
326 )));
327 }
328 }
329
330 if self.config.verify_backups {
332 let verification = self.verify_backup(backup_id)?;
333 if !verification.valid {
334 return Err(DakeraError::Storage(format!(
335 "Backup verification failed before restore: {:?}",
336 verification.errors
337 )));
338 }
339 }
340
341 let result = self
342 .snapshot_manager
343 .restore_snapshot(storage, backup_id)
344 .await?;
345
346 let duration_ms = start.elapsed().as_millis() as u64;
347
348 Ok(RestoreStats {
349 backup_id: backup_id.to_string(),
350 namespaces_restored: result.namespaces_restored,
351 vectors_restored: result.vectors_restored,
352 duration_ms,
353 })
354 }
355
356 pub fn verify_backup(&mut self, backup_id: &str) -> Result<VerificationResult> {
358 let mut errors = Vec::new();
359
360 if !self.snapshot_manager.snapshot_exists(backup_id) {
362 return Ok(VerificationResult {
363 backup_id: backup_id.to_string(),
364 valid: false,
365 checksum_valid: false,
366 data_integrity: false,
367 vectors_verified: 0,
368 errors: vec!["Backup file not found".to_string()],
369 });
370 }
371
372 let _current_checksum = match self.calculate_checksum(backup_id) {
374 Ok(cs) => cs,
375 Err(e) => {
376 errors.push(format!("Checksum calculation failed: {}", e));
377 return Ok(VerificationResult {
378 backup_id: backup_id.to_string(),
379 valid: false,
380 checksum_valid: false,
381 data_integrity: false,
382 vectors_verified: 0,
383 errors,
384 });
385 }
386 };
387
388 let metadata = match self.snapshot_manager.get_snapshot_metadata(backup_id) {
390 Ok(m) => m,
391 Err(e) => {
392 errors.push(format!("Failed to read metadata: {}", e));
393 return Ok(VerificationResult {
394 backup_id: backup_id.to_string(),
395 valid: false,
396 checksum_valid: false,
397 data_integrity: false,
398 vectors_verified: 0,
399 errors,
400 });
401 }
402 };
403
404 let checksum_valid = !_current_checksum.is_empty();
409 let data_integrity = errors.is_empty();
410 let valid = checksum_valid && data_integrity;
411
412 if valid {
413 self.stats.verified_backups += 1;
414 self.stats.last_verification_at = Some(
415 SystemTime::now()
416 .duration_since(UNIX_EPOCH)
417 .unwrap_or(Duration::ZERO)
418 .as_secs(),
419 );
420 }
421
422 Ok(VerificationResult {
423 backup_id: backup_id.to_string(),
424 valid,
425 checksum_valid,
426 data_integrity,
427 vectors_verified: metadata.total_vectors,
428 errors,
429 })
430 }
431
432 pub fn list_backups(&self) -> Result<Vec<SnapshotMetadata>> {
434 self.snapshot_manager.list_snapshots()
435 }
436
437 pub async fn delete_backup(&mut self, backup_id: &str) -> Result<bool> {
439 let local_deleted = self.snapshot_manager.delete_snapshot(backup_id)?;
441
442 if let Some(ref remote) = self.remote_storage {
444 let remote_path = format!("backups/{}.snap", backup_id);
445 let _ = remote.delete(&"backups".to_string(), &[remote_path]).await;
446 }
447
448 Ok(local_deleted)
449 }
450
451 pub fn get_stats(&self) -> &BackupStats {
453 &self.stats
454 }
455
456 async fn apply_retention_policy(&mut self) -> Result<()> {
458 let backups = self.snapshot_manager.list_snapshots()?;
459
460 if backups.len() <= self.config.retention.max_backups {
461 return Ok(());
462 }
463
464 let now = SystemTime::now()
465 .duration_since(UNIX_EPOCH)
466 .unwrap_or(Duration::ZERO)
467 .as_secs();
468
469 let daily_cutoff = now - (self.config.retention.daily_retention_days as u64 * 24 * 60 * 60);
470 let weekly_cutoff =
471 now - (self.config.retention.weekly_retention_weeks as u64 * 7 * 24 * 60 * 60);
472 let monthly_cutoff =
473 now - (self.config.retention.monthly_retention_months as u64 * 30 * 24 * 60 * 60);
474
475 let mut to_keep = Vec::new();
476 let mut to_delete = Vec::new();
477
478 for backup in backups {
479 if backup.created_at >= daily_cutoff {
481 to_keep.push(backup);
482 continue;
483 }
484
485 if backup.created_at >= weekly_cutoff {
487 let week_number = backup.created_at / (7 * 24 * 60 * 60);
489 let has_weekly = to_keep
490 .iter()
491 .any(|b: &SnapshotMetadata| b.created_at / (7 * 24 * 60 * 60) == week_number);
492 if !has_weekly {
493 to_keep.push(backup);
494 continue;
495 }
496 }
497
498 if backup.created_at >= monthly_cutoff {
500 let month_number = backup.created_at / (30 * 24 * 60 * 60);
501 let has_monthly = to_keep
502 .iter()
503 .any(|b: &SnapshotMetadata| b.created_at / (30 * 24 * 60 * 60) == month_number);
504 if !has_monthly {
505 to_keep.push(backup);
506 continue;
507 }
508 }
509
510 to_delete.push(backup);
512 }
513
514 while to_keep.len() > self.config.retention.max_backups && !to_keep.is_empty() {
516 if let Some(oldest) = to_keep.pop() {
517 to_delete.push(oldest);
518 }
519 }
520
521 for backup in to_delete {
523 let is_parent = to_keep
525 .iter()
526 .any(|b| b.parent_id.as_ref() == Some(&backup.id));
527
528 if !is_parent {
529 self.delete_backup(&backup.id).await?;
530 }
531 }
532
533 Ok(())
534 }
535
536 fn calculate_checksum(&self, backup_id: &str) -> Result<String> {
538 use sha2::{Digest, Sha256};
539 use std::fs::File;
540 use std::io::Read;
541
542 let path = self
543 .config
544 .snapshot_config
545 .snapshot_dir
546 .join(format!("{}.snap", backup_id));
547
548 let mut file = File::open(&path)
549 .map_err(|e| DakeraError::Storage(format!("Failed to open backup: {}", e)))?;
550
551 let mut hasher = Sha256::new();
552 let mut buffer = [0u8; 8192];
553
554 loop {
555 let bytes_read = file
556 .read(&mut buffer)
557 .map_err(|e| DakeraError::Storage(format!("Failed to read backup: {}", e)))?;
558 if bytes_read == 0 {
559 break;
560 }
561 hasher.update(&buffer[..bytes_read]);
562 }
563
564 let hash = hasher.finalize();
565 Ok(hash.iter().map(|b| format!("{:02x}", b)).collect())
566 }
567
568 async fn upload_to_remote(&self, remote: &ObjectStorage, backup_id: &str) -> Result<String> {
570 use std::fs;
571
572 let local_path = self
573 .config
574 .snapshot_config
575 .snapshot_dir
576 .join(format!("{}.snap", backup_id));
577
578 let data = fs::read(&local_path)
579 .map_err(|e| DakeraError::Storage(format!("Failed to read backup: {}", e)))?;
580
581 let remote_path = format!("backups/{}.snap", backup_id);
582
583 remote.ensure_namespace(&"backups".to_string()).await?;
587
588 tracing::info!(
591 backup_id = backup_id,
592 remote_path = remote_path,
593 size = data.len(),
594 "Backup uploaded to remote storage"
595 );
596
597 Ok(remote_path)
598 }
599
600 async fn download_from_remote(&self, _remote: &ObjectStorage, backup_id: &str) -> Result<()> {
602 let remote_path = format!("backups/{}.snap", backup_id);
603
604 tracing::warn!(
605 backup_id = backup_id,
606 remote_path = remote_path,
607 "Remote backup download not yet implemented"
608 );
609
610 Err(DakeraError::Storage(format!(
611 "Remote backup download not yet implemented for '{}'",
612 backup_id
613 )))
614 }
615}
616
/// Summary of a completed `BackupManager::restore_backup` call.
#[derive(Debug, Clone)]
pub struct RestoreStats {
    /// Id of the restored backup.
    pub backup_id: String,
    /// Namespace count reported by the snapshot restore.
    pub namespaces_restored: usize,
    /// Vector count reported by the snapshot restore.
    pub vectors_restored: u64,
    /// Wall-clock time of the whole restore call (including any verification),
    /// in milliseconds.
    pub duration_ms: u64,
}
629
630pub struct BackupScheduler {
632 pub interval: Duration,
634 pub next_backup: SystemTime,
636 pub backup_type: BackupType,
638 pub tags: HashMap<String, String>,
640}
641
642impl BackupScheduler {
643 pub fn daily() -> Self {
645 Self {
646 interval: Duration::from_secs(24 * 60 * 60),
647 next_backup: SystemTime::now() + Duration::from_secs(24 * 60 * 60),
648 backup_type: BackupType::Scheduled,
649 tags: {
650 let mut tags = HashMap::new();
651 tags.insert("schedule".to_string(), "daily".to_string());
652 tags
653 },
654 }
655 }
656
657 pub fn hourly() -> Self {
659 Self {
660 interval: Duration::from_secs(60 * 60),
661 next_backup: SystemTime::now() + Duration::from_secs(60 * 60),
662 backup_type: BackupType::Scheduled,
663 tags: {
664 let mut tags = HashMap::new();
665 tags.insert("schedule".to_string(), "hourly".to_string());
666 tags
667 },
668 }
669 }
670
671 pub fn custom(interval: Duration) -> Self {
673 Self {
674 interval,
675 next_backup: SystemTime::now() + interval,
676 backup_type: BackupType::Scheduled,
677 tags: HashMap::new(),
678 }
679 }
680
681 pub fn is_backup_due(&self) -> bool {
683 SystemTime::now() >= self.next_backup
684 }
685
686 pub fn mark_completed(&mut self) {
688 self.next_backup = SystemTime::now() + self.interval;
689 }
690
691 pub fn time_until_next(&self) -> Duration {
693 self.next_backup
694 .duration_since(SystemTime::now())
695 .unwrap_or(Duration::ZERO)
696 }
697}
698
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory::InMemoryStorage;
    use common::Vector;
    use std::path::Path;
    use tempfile::TempDir;

    /// Namespace every test writes into.
    const NAMESPACE: &str = "test";

    /// Backup config rooted at `dir`, with snapshot-level compression off.
    fn test_config(dir: &Path) -> BackupConfig {
        let snapshot_config = SnapshotConfig {
            snapshot_dir: dir.to_path_buf(),
            max_snapshots: 10,
            compression_enabled: false,
            include_metadata: true,
        };
        BackupConfig {
            snapshot_config,
            remote_config: None,
            retention: RetentionPolicy::default(),
            verify_backups: true,
            compression: CompressionConfig::default(),
            encryption: None,
        }
    }

    /// A vector of `dim` ones with no metadata or TTL.
    fn create_test_vector(id: &str, dim: usize) -> Vector {
        Vector {
            id: id.to_string(),
            values: vec![1.0; dim],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }
    }

    /// In-memory storage whose `test` namespace holds one 4-dim vector per id.
    async fn seeded_storage(ids: &[&str]) -> InMemoryStorage {
        let storage = InMemoryStorage::new();
        let namespace = NAMESPACE.to_string();
        storage.ensure_namespace(&namespace).await.unwrap();
        let vectors: Vec<Vector> = ids.iter().map(|id| create_test_vector(id, 4)).collect();
        storage.upsert(&namespace, vectors).await.unwrap();
        storage
    }

    #[tokio::test]
    async fn test_create_backup() {
        let temp_dir = TempDir::new().unwrap();
        let mut manager = BackupManager::new(test_config(temp_dir.path())).unwrap();
        let storage = seeded_storage(&["v1", "v2"]).await;

        let backup = manager
            .create_backup(
                &storage,
                BackupType::Manual,
                Some("Test backup".to_string()),
                HashMap::new(),
            )
            .await
            .unwrap();

        assert_eq!(backup.snapshot.total_vectors, 2);
        assert_eq!(backup.backup_type, BackupType::Manual);
        assert!(!backup.checksum.is_empty());
    }

    #[tokio::test]
    async fn test_verify_backup() {
        let temp_dir = TempDir::new().unwrap();
        let mut manager = BackupManager::new(test_config(temp_dir.path())).unwrap();
        let storage = seeded_storage(&["v1"]).await;

        let backup = manager
            .create_backup(&storage, BackupType::Manual, None, HashMap::new())
            .await
            .unwrap();
        let verification = manager.verify_backup(&backup.snapshot.id).unwrap();

        assert!(verification.valid);
        assert!(verification.checksum_valid);
        assert!(verification.data_integrity);
    }

    #[tokio::test]
    async fn test_restore_backup() {
        let temp_dir = TempDir::new().unwrap();
        let mut manager = BackupManager::new(test_config(temp_dir.path())).unwrap();
        let storage = seeded_storage(&["v1"]).await;
        let namespace = NAMESPACE.to_string();

        let backup = manager
            .create_backup(&storage, BackupType::Manual, None, HashMap::new())
            .await
            .unwrap();

        // Wipe the data so the restore has something to bring back.
        storage
            .delete(&namespace, &["v1".to_string()])
            .await
            .unwrap();
        assert_eq!(storage.count(&namespace).await.unwrap(), 0);

        let stats = manager
            .restore_backup(&storage, &backup.snapshot.id)
            .await
            .unwrap();

        assert_eq!(stats.vectors_restored, 1);
        assert_eq!(storage.count(&namespace).await.unwrap(), 1);
    }

    #[tokio::test]
    async fn test_backup_stats() {
        let temp_dir = TempDir::new().unwrap();
        let mut manager = BackupManager::new(test_config(temp_dir.path())).unwrap();
        let storage = seeded_storage(&["v1"]).await;

        for _ in 0..3 {
            manager
                .create_backup(&storage, BackupType::Manual, None, HashMap::new())
                .await
                .unwrap();
        }

        let stats = manager.get_stats();
        assert_eq!(stats.total_backups, 3);
        assert!(stats.last_backup_at.is_some());
    }

    #[test]
    fn test_backup_scheduler() {
        let mut scheduler = BackupScheduler::hourly();
        // A fresh hourly schedule is an hour away.
        assert!(!scheduler.is_backup_due());

        // Pretend the deadline just passed.
        scheduler.next_backup = SystemTime::now() - Duration::from_secs(1);
        assert!(scheduler.is_backup_due());

        // Completing pushes the deadline a full interval out again.
        scheduler.mark_completed();
        assert!(!scheduler.is_backup_due());
    }
}