1use anyhow::{anyhow, Result};
11use chrono::{DateTime, Utc};
12use rocksdb::{
13 backup::{BackupEngine, BackupEngineOptions},
14 checkpoint::Checkpoint,
15 Env, DB,
16};
17use serde::{Deserialize, Serialize};
18use sha2::{Digest, Sha256};
19use std::fs;
20use std::path::{Path, PathBuf};
21use std::sync::Arc;
22
/// Sidecar record describing a single RocksDB backup, persisted as
/// `backup_<id>.json` inside the user's backup directory.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupMetadata {
    // RocksDB-assigned backup id (increases per new backup for a user).
    pub backup_id: u32,
    // Wall-clock creation time in UTC.
    pub created_at: DateTime<Utc>,
    // Owner of the backup; also the subdirectory name under the root.
    pub user_id: String,
    // Full for the user's first backup, Incremental afterwards.
    pub backup_type: BackupType,
    // Size of the primary backup as reported by the RocksDB backup engine.
    pub size_bytes: u64,
    // Hex-encoded SHA-256 over the backup's files (primary + secondary).
    pub checksum: String,
    // Number of key/value entries in the database at backup time.
    pub memory_count: usize,
    // RocksDB sequence number captured at backup time.
    pub sequence_number: u64,
    // Names of secondary stores checkpointed alongside the primary backup.
    // `#[serde(default)]` keeps metadata files from older versions readable.
    #[serde(default)]
    pub secondary_stores: Vec<String>,
    // Combined on-disk size of all secondary-store checkpoints.
    // NOTE(review): the graph checkpoint's size does not appear to be
    // included here by the comprehensive-backup path — confirm intentional.
    #[serde(default)]
    pub secondary_size_bytes: u64,
}
49
/// Borrowed handle to an auxiliary RocksDB store that should be
/// checkpointed alongside the primary database during a comprehensive
/// backup.
pub struct SecondaryStoreRef<'a> {
    // Store name; used as the checkpoint subdirectory name in the backup.
    pub name: &'a str,
    // Shared handle to the open store.
    pub db: &'a Arc<DB>,
}
55
/// Kind of backup recorded in [`BackupMetadata`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum BackupType {
    /// The user's first backup (no prior backups existed when it was made).
    Full,
    /// Any subsequent backup for the same user.
    Incremental,
}
61
/// Wrapper around RocksDB's backup engine that organises backups per user
/// under a single root directory, maintains JSON metadata sidecars, and
/// checkpoints secondary stores alongside the primary database.
pub struct ShodhBackupEngine {
    // Root directory; each user gets a subdirectory beneath it.
    backup_path: PathBuf,
}
66
67impl ShodhBackupEngine {
68 pub fn new(backup_path: PathBuf) -> Result<Self> {
73 fs::create_dir_all(&backup_path)?;
74 Ok(Self { backup_path })
75 }
76
77 pub fn backup_path(&self) -> &Path {
79 &self.backup_path
80 }
81
82 pub fn create_backup(&self, db: &DB, user_id: &str) -> Result<BackupMetadata> {
91 let backup_dir = self.backup_path.join(user_id);
92 fs::create_dir_all(&backup_dir)?;
93
94 let backup_opts = BackupEngineOptions::new(&backup_dir)?;
96 let env = Env::new()?;
97 let mut backup_engine = BackupEngine::open(&backup_opts, &env)?;
98
99 let before_count = backup_engine.get_backup_info().len();
101 backup_engine.create_new_backup(db)?;
102
103 let backup_info = backup_engine.get_backup_info();
104 let latest_backup = backup_info
105 .last()
106 .ok_or_else(|| anyhow!("No backup created"))?;
107
108 let backup_id = latest_backup.backup_id;
109 let size_bytes = latest_backup.size;
110
111 let sequence_number = db.latest_sequence_number();
113
114 let memory_count = self.estimate_memory_count(db)?;
116
117 let checksum = self.calculate_backup_checksum(&backup_dir, backup_id)?;
119
120 let backup_type = if before_count == 0 {
122 BackupType::Full
123 } else {
124 BackupType::Incremental
125 };
126
127 let metadata = BackupMetadata {
128 backup_id,
129 created_at: Utc::now(),
130 user_id: user_id.to_string(),
131 backup_type,
132 size_bytes,
133 checksum,
134 memory_count,
135 sequence_number,
136 secondary_stores: Vec::new(),
137 secondary_size_bytes: 0,
138 };
139
140 self.save_metadata(&metadata)?;
142
143 tracing::info!(
144 backup_id = backup_id,
145 user_id = user_id,
146 size_mb = size_bytes / 1024 / 1024,
147 "Backup created successfully"
148 );
149
150 Ok(metadata)
151 }
152
    /// Convenience wrapper around
    /// [`Self::create_comprehensive_backup_with_graph`] for callers that
    /// have no graph database to include in the backup.
    pub fn create_comprehensive_backup(
        &self,
        db: &DB,
        user_id: &str,
        secondary_stores: &[SecondaryStoreRef<'_>],
    ) -> Result<BackupMetadata> {
        self.create_comprehensive_backup_with_graph(db, user_id, secondary_stores, None)
    }
166
    /// Create a primary backup plus filesystem checkpoints of every
    /// secondary store (and, optionally, a graph DB) for `user_id`.
    ///
    /// Checkpoints are written under
    /// `<root>/<user>/secondary_<backup_id>/<store_name>` and the metadata
    /// checksum is recomputed afterwards so it covers them.
    ///
    /// NOTE(review): if a secondary store fails to checkpoint, the primary
    /// backup created at the start is left in place — callers should treat
    /// the error as "partial backup exists on disk".
    pub fn create_comprehensive_backup_with_graph(
        &self,
        db: &DB,
        user_id: &str,
        secondary_stores: &[SecondaryStoreRef<'_>],
        graph_db: Option<&DB>,
    ) -> Result<BackupMetadata> {
        // Primary backup first; this also assigns the backup id that the
        // secondary checkpoint directory is named after.
        let mut metadata = self.create_backup(db, user_id)?;

        let secondary_dir = self
            .backup_path
            .join(user_id)
            .join(format!("secondary_{}", metadata.backup_id));
        fs::create_dir_all(&secondary_dir)?;

        if let Some(graph) = graph_db {
            // Checkpoint creation fails if the target directory exists;
            // remove any partial output on failure so retries can succeed.
            let graph_checkpoint_dir = secondary_dir.join("graph");
            let checkpoint = Checkpoint::new(graph)
                .map_err(|e| anyhow!("Failed to create checkpoint handle for graph DB: {}", e))?;
            checkpoint
                .create_checkpoint(&graph_checkpoint_dir)
                .map_err(|e| {
                    let _ = fs::remove_dir_all(&graph_checkpoint_dir);
                    anyhow!("Failed to checkpoint graph DB: {}", e)
                })?;
            let graph_size = dir_size(&graph_checkpoint_dir).unwrap_or(0);
            // NOTE(review): graph_size is logged but not added to
            // total_secondary_bytes below — confirm this is intentional.
            tracing::debug!(size_kb = graph_size / 1024, "Graph DB checkpointed");
        }

        let mut backed_up_stores = Vec::new();
        let mut total_secondary_bytes: u64 = 0;

        for store_ref in secondary_stores {
            let store_checkpoint_dir = secondary_dir.join(store_ref.name);

            // A pre-existing directory would make create_checkpoint fail;
            // skip this store rather than abort the whole backup.
            if store_checkpoint_dir.exists() {
                tracing::warn!(
                    store = store_ref.name,
                    "Checkpoint directory already exists, skipping"
                );
                continue;
            }

            let checkpoint = Checkpoint::new(store_ref.db).map_err(|e| {
                anyhow!(
                    "Failed to create checkpoint handle for secondary store '{}': {}",
                    store_ref.name,
                    e
                )
            })?;

            if let Err(e) = checkpoint.create_checkpoint(&store_checkpoint_dir) {
                // Remove the partial checkpoint before bailing out.
                let _ = fs::remove_dir_all(&store_checkpoint_dir);
                return Err(anyhow!(
                    "Failed to checkpoint secondary store '{}': {}",
                    store_ref.name,
                    e
                ));
            }

            let store_size = dir_size(&store_checkpoint_dir).unwrap_or(0);
            total_secondary_bytes += store_size;
            backed_up_stores.push(store_ref.name.to_string());

            tracing::debug!(
                store = store_ref.name,
                size_kb = store_size / 1024,
                "Secondary store checkpointed"
            );
        }

        if graph_db.is_some() {
            backed_up_stores.push("graph".to_string());
        }

        metadata.secondary_stores = backed_up_stores;
        metadata.secondary_size_bytes = total_secondary_bytes;

        // Recompute the checksum so it covers the secondary checkpoints,
        // then overwrite the metadata sidecar written by create_backup.
        let backup_dir = self.backup_path.join(user_id);
        metadata.checksum = self.calculate_backup_checksum(&backup_dir, metadata.backup_id)?;
        self.save_metadata(&metadata)?;

        tracing::info!(
            backup_id = metadata.backup_id,
            user_id = user_id,
            secondary_stores = metadata.secondary_stores.len(),
            secondary_size_kb = total_secondary_bytes / 1024,
            "Comprehensive backup created"
        );

        Ok(metadata)
    }
269
270 pub fn restore_backup(
277 &self,
278 user_id: &str,
279 backup_id: Option<u32>,
280 restore_path: &Path,
281 ) -> Result<()> {
282 let backup_dir = self.backup_path.join(user_id);
283
284 if !backup_dir.exists() {
285 return Err(anyhow!("No backups found for user: {user_id}"));
286 }
287
288 let backup_opts = BackupEngineOptions::new(&backup_dir)?;
289 let env = Env::new()?;
290 let mut backup_engine = BackupEngine::open(&backup_opts, &env)?;
291
292 match backup_id {
294 Some(id) => {
295 tracing::info!(backup_id = id, "Restoring from specific backup");
296 backup_engine.restore_from_backup(
297 restore_path,
298 restore_path,
299 &rocksdb::backup::RestoreOptions::default(),
300 id,
301 )?;
302 }
303 None => {
304 tracing::info!("Restoring from latest backup");
305 backup_engine.restore_from_latest_backup(
306 restore_path,
307 restore_path,
308 &rocksdb::backup::RestoreOptions::default(),
309 )?;
310 }
311 }
312
313 tracing::info!(
314 user_id = user_id,
315 restore_path = ?restore_path,
316 "Restore completed successfully"
317 );
318
319 Ok(())
320 }
321
322 pub fn list_backups(&self, user_id: &str) -> Result<Vec<BackupMetadata>> {
324 let backup_dir = self.backup_path.join(user_id);
325
326 if !backup_dir.exists() {
327 return Ok(Vec::new());
328 }
329
330 let backup_opts = BackupEngineOptions::new(&backup_dir)?;
331 let env = Env::new()?;
332 let backup_engine = BackupEngine::open(&backup_opts, &env)?;
333
334 let backup_info = backup_engine.get_backup_info();
335 let mut metadata_list = Vec::new();
336
337 for info in backup_info {
338 if let Ok(metadata) = self.load_metadata(user_id, info.backup_id) {
339 metadata_list.push(metadata);
340 }
341 }
342
343 Ok(metadata_list)
344 }
345
    /// Restore the primary database and any secondary-store checkpoints
    /// recorded with the backup.
    ///
    /// `secondary_restore_paths` maps store names to their live directory
    /// locations. Returns the names of the stores actually restored;
    /// stores without a checkpoint in the backup are skipped.
    ///
    /// Each store's restore is staged through a `<target>.restore_tmp`
    /// directory so the live data is only removed once a complete copy of
    /// the checkpoint exists next to it.
    pub fn restore_comprehensive_backup(
        &self,
        user_id: &str,
        backup_id: Option<u32>,
        restore_path: &Path,
        secondary_restore_paths: &[(&str, &Path)],
    ) -> Result<Vec<String>> {
        // Restore the primary DB first; secondary stores follow only if
        // this succeeds.
        self.restore_backup(user_id, backup_id, restore_path)?;

        // Resolve `None` to the latest backup id so we can locate the
        // matching `secondary_<id>` checkpoint directory.
        let resolved_backup_id = match backup_id {
            Some(id) => id,
            None => {
                let backup_dir = self.backup_path.join(user_id);
                let backup_opts = BackupEngineOptions::new(&backup_dir)?;
                let env = Env::new()?;
                let backup_engine = BackupEngine::open(&backup_opts, &env)?;
                let info = backup_engine.get_backup_info();
                info.last()
                    .map(|i| i.backup_id)
                    .ok_or_else(|| anyhow!("No backups available"))?
            }
        };

        let secondary_dir = self
            .backup_path
            .join(user_id)
            .join(format!("secondary_{resolved_backup_id}"));

        let mut restored_stores = Vec::new();

        if secondary_dir.exists() {
            for (store_name, target_path) in secondary_restore_paths {
                let checkpoint_dir = secondary_dir.join(store_name);
                if !checkpoint_dir.exists() {
                    tracing::debug!(
                        store = *store_name,
                        "No checkpoint found in backup, skipping"
                    );
                    continue;
                }

                // Stage into `<target>.restore_tmp`, clearing any stale
                // temp dir left by a previous interrupted restore.
                let mut tmp_os = target_path.as_os_str().to_os_string();
                tmp_os.push(".restore_tmp");
                let temp_path = PathBuf::from(tmp_os);
                if temp_path.exists() {
                    fs::remove_dir_all(&temp_path).map_err(|e| {
                        anyhow!(
                            "Failed to clean up stale temp dir for {}: {}",
                            store_name,
                            e
                        )
                    })?;
                }

                if let Err(e) = copy_dir_recursive(&checkpoint_dir, &temp_path) {
                    // Copy failure is non-fatal: the live data is untouched.
                    let _ = fs::remove_dir_all(&temp_path);
                    tracing::warn!(
                        store = *store_name,
                        error = %e,
                        "Failed to copy checkpoint for restore, skipping (original data preserved)"
                    );
                    continue;
                }

                // Only now is the live directory removed; a failure here is
                // fatal because we may have lost the original data's slot.
                if target_path.exists() {
                    if let Err(e) = fs::remove_dir_all(target_path) {
                        let _ = fs::remove_dir_all(&temp_path);
                        return Err(anyhow!(
                            "Failed to remove existing {} directory at {:?}: {}",
                            store_name,
                            target_path,
                            e
                        ));
                    }
                }

                // Prefer an atomic rename; fall back to a recursive copy
                // when rename fails (e.g. temp and target on different
                // filesystems).
                if let Err(e) = fs::rename(&temp_path, target_path) {
                    if let Err(copy_err) = copy_dir_recursive(&temp_path, target_path) {
                        let _ = fs::remove_dir_all(&temp_path);
                        return Err(anyhow!(
                            "Failed to finalize restore for {}: rename={}, copy={}",
                            store_name,
                            e,
                            copy_err
                        ));
                    }
                    let _ = fs::remove_dir_all(&temp_path);
                }

                restored_stores.push(store_name.to_string());
                tracing::info!(
                    store = *store_name,
                    target = ?target_path,
                    "Secondary store restored from checkpoint"
                );
            }
        }

        tracing::info!(
            user_id = user_id,
            backup_id = resolved_backup_id,
            restored_secondary = restored_stores.len(),
            "Comprehensive restore completed"
        );

        Ok(restored_stores)
    }
466
467 pub fn purge_old_backups(&self, user_id: &str, keep_count: usize) -> Result<usize> {
470 if keep_count == 0 {
471 return Err(anyhow!(
472 "keep_count must be >= 1 to prevent deleting all backups"
473 ));
474 }
475
476 let backup_dir = self.backup_path.join(user_id);
477
478 if !backup_dir.exists() {
479 return Ok(0);
480 }
481
482 let backup_opts = BackupEngineOptions::new(&backup_dir)?;
483 let env = Env::new()?;
484 let mut backup_engine = BackupEngine::open(&backup_opts, &env)?;
485
486 let backup_info = backup_engine.get_backup_info();
487 let total_backups = backup_info.len();
488
489 if total_backups <= keep_count {
490 return Ok(0);
491 }
492
493 let to_delete = total_backups - keep_count;
494
495 let mut purge_ids: Vec<u32> = backup_info.iter().map(|b| b.backup_id).collect();
497 purge_ids.sort();
498 let purge_ids: Vec<u32> = purge_ids.into_iter().take(to_delete).collect();
499
500 backup_engine.purge_old_backups(keep_count)?;
502
503 for purged_id in &purge_ids {
505 let secondary_dir = backup_dir.join(format!("secondary_{purged_id}"));
506 if secondary_dir.exists() {
507 if let Err(e) = fs::remove_dir_all(&secondary_dir) {
508 tracing::warn!(
509 backup_id = purged_id,
510 error = %e,
511 "Failed to clean up secondary store checkpoint"
512 );
513 }
514 }
515 let metadata_path = backup_dir.join(format!("backup_{purged_id}.json"));
517 if let Err(e) = fs::remove_file(&metadata_path) {
518 tracing::warn!(
519 backup_id = purged_id,
520 error = %e,
521 "Failed to remove backup metadata file"
522 );
523 }
524 }
525
526 tracing::info!(
527 purged_count = to_delete,
528 kept_count = keep_count,
529 user_id = user_id,
530 "Purged old backups"
531 );
532
533 Ok(to_delete)
534 }
535
536 pub fn verify_backup(&self, user_id: &str, backup_id: u32) -> Result<bool> {
538 let metadata = self.load_metadata(user_id, backup_id)?;
539 let backup_dir = self.backup_path.join(user_id);
540
541 let current_checksum = self.calculate_backup_checksum(&backup_dir, backup_id)?;
542
543 Ok(current_checksum == metadata.checksum)
544 }
545
546 fn save_metadata(&self, metadata: &BackupMetadata) -> Result<()> {
551 let metadata_path = self
552 .backup_path
553 .join(&metadata.user_id)
554 .join(format!("backup_{}.json", metadata.backup_id));
555
556 let json = serde_json::to_string_pretty(metadata)?;
557 fs::write(metadata_path, json)?;
558
559 Ok(())
560 }
561
562 fn load_metadata(&self, user_id: &str, backup_id: u32) -> Result<BackupMetadata> {
563 let metadata_path = self
564 .backup_path
565 .join(user_id)
566 .join(format!("backup_{backup_id}.json"));
567
568 let json = fs::read_to_string(metadata_path)?;
569 let metadata = serde_json::from_str(&json)?;
570
571 Ok(metadata)
572 }
573
574 fn calculate_backup_checksum(&self, backup_dir: &Path, backup_id: u32) -> Result<String> {
575 let mut hasher = Sha256::new();
576
577 let backup_path = backup_dir.join(format!("private/{backup_id}"));
579 self.hash_directory_sorted(&backup_path, &mut hasher)?;
580
581 let secondary_path = backup_dir.join(format!("secondary_{backup_id}"));
583 self.hash_directory_sorted(&secondary_path, &mut hasher)?;
584
585 let result = hasher.finalize();
586 Ok(format!("{result:x}"))
587 }
588
589 fn hash_directory_sorted(&self, dir: &Path, hasher: &mut Sha256) -> Result<()> {
590 if !dir.exists() {
591 return Ok(());
592 }
593
594 let mut entries: Vec<_> = fs::read_dir(dir)?.filter_map(|e| e.ok()).collect();
595 entries.sort_by_key(|e| e.file_name());
596
597 for entry in entries {
598 let path = entry.path();
599 hasher.update(entry.file_name().to_string_lossy().as_bytes());
601 if path.is_dir() {
602 self.hash_directory_sorted(&path, hasher)?;
604 } else {
605 let file_contents = fs::read(&path)?;
606 hasher.update(&file_contents);
607 }
608 }
609 Ok(())
610 }
611
612 fn estimate_memory_count(&self, db: &DB) -> Result<usize> {
613 let mut count = 0;
615 let iter = db.iterator(rocksdb::IteratorMode::Start);
616
617 for _ in iter {
618 count += 1;
619 }
620
621 Ok(count)
622 }
623}
624
/// Public entry point to [`copy_dir_recursive`] for callers outside this
/// module that need the same recursive directory copy.
pub fn copy_dir_recursive_pub(src: &Path, dst: &Path) -> Result<()> {
    copy_dir_recursive(src, dst)
}
629
630fn dir_size(path: &Path) -> Result<u64> {
632 let mut total = 0u64;
633 if path.is_dir() {
634 for entry in fs::read_dir(path)? {
635 let entry = entry?;
636 let entry_path = entry.path();
637 if entry_path.is_dir() {
638 total += dir_size(&entry_path)?;
639 } else {
640 total += entry.metadata()?.len();
641 }
642 }
643 }
644 Ok(total)
645}
646
647fn copy_dir_recursive(src: &Path, dst: &Path) -> Result<()> {
649 fs::create_dir_all(dst)?;
650 for entry in fs::read_dir(src)? {
651 let entry = entry?;
652 let src_path = entry.path();
653 let dst_path = dst.join(entry.file_name());
654 if src_path.is_dir() {
655 copy_dir_recursive(&src_path, &dst_path)?;
656 } else {
657 fs::copy(&src_path, &dst_path)?;
658 }
659 }
660 Ok(())
661}
662
#[cfg(test)]
mod tests {
    use super::*;
    use rocksdb::Options;
    use serde_json::Value;
    use tempfile::TempDir;

    // Constructing the engine should succeed and create the root directory.
    #[test]
    fn test_backup_engine_creation() {
        let temp_dir = TempDir::new().unwrap();
        let backup_engine = ShodhBackupEngine::new(temp_dir.path().to_path_buf());
        assert!(backup_engine.is_ok());
    }

    // BackupMetadata must round-trip through serde_json without loss.
    #[test]
    fn test_backup_metadata_serialization() {
        let metadata = BackupMetadata {
            backup_id: 1,
            created_at: Utc::now(),
            user_id: "test_user".to_string(),
            backup_type: BackupType::Full,
            size_bytes: 1024,
            checksum: "abc123".to_string(),
            memory_count: 100,
            sequence_number: 42,
            secondary_stores: vec!["todo_items".to_string(), "prospective_tasks".to_string()],
            secondary_size_bytes: 2048,
        };

        let json = serde_json::to_string(&metadata).unwrap();
        let deserialized: BackupMetadata = serde_json::from_str(&json).unwrap();

        assert_eq!(metadata.backup_id, deserialized.backup_id);
        assert_eq!(metadata.user_id, deserialized.user_id);
    }

    // dir_size must include files in nested subdirectories (5 + 10 bytes).
    #[test]
    fn test_dir_size_counts_nested_files() {
        let temp_dir = TempDir::new().unwrap();
        let root = temp_dir.path();
        let nested = root.join("nested");
        fs::create_dir_all(&nested).unwrap();

        fs::write(root.join("a.txt"), b"12345").unwrap();
        fs::write(nested.join("b.txt"), b"1234567890").unwrap();

        let size = dir_size(root).unwrap();
        assert_eq!(size, 15);
    }

    // The public copy helper must replicate files at every depth.
    #[test]
    fn test_copy_dir_recursive_pub_copies_files() {
        let temp_dir = TempDir::new().unwrap();
        let src = temp_dir.path().join("src");
        let dst = temp_dir.path().join("dst");
        fs::create_dir_all(src.join("deep")).unwrap();
        fs::write(src.join("file1.txt"), b"alpha").unwrap();
        fs::write(src.join("deep").join("file2.txt"), b"beta").unwrap();

        copy_dir_recursive_pub(&src, &dst).unwrap();

        assert_eq!(fs::read(dst.join("file1.txt")).unwrap(), b"alpha");
        assert_eq!(
            fs::read(dst.join("deep").join("file2.txt")).unwrap(),
            b"beta"
        );
    }

    // A user with no backup directory yields an empty list, not an error.
    #[test]
    fn test_list_backups_empty_when_user_missing() {
        let temp_dir = TempDir::new().unwrap();
        let engine = ShodhBackupEngine::new(temp_dir.path().to_path_buf()).unwrap();
        let backups = engine.list_backups("missing-user").unwrap();
        assert!(backups.is_empty());
    }

    // A freshly created backup must verify against its stored checksum.
    #[test]
    fn test_verify_backup_round_trip() {
        let temp_dir = TempDir::new().unwrap();
        let backup_root = temp_dir.path().join("backups");
        let db_path = temp_dir.path().join("db");
        let user_id = "user1";

        let mut opts = Options::default();
        opts.create_if_missing(true);
        let db = DB::open(&opts, &db_path).unwrap();
        db.put(b"k1", b"v1").unwrap();
        db.put(b"k2", b"v2").unwrap();

        let engine = ShodhBackupEngine::new(backup_root.clone()).unwrap();
        let metadata = engine.create_backup(&db, user_id).unwrap();

        let verified = engine.verify_backup(user_id, metadata.backup_id).unwrap();
        assert!(verified);
    }

    // Corrupting the stored checksum in the sidecar must fail verification.
    #[test]
    fn test_verify_backup_detects_checksum_mismatch() {
        let temp_dir = TempDir::new().unwrap();
        let backup_root = temp_dir.path().join("backups");
        let db_path = temp_dir.path().join("db");
        let user_id = "user2";

        let mut opts = Options::default();
        opts.create_if_missing(true);
        let db = DB::open(&opts, &db_path).unwrap();
        db.put(b"k", b"v").unwrap();

        let engine = ShodhBackupEngine::new(backup_root.clone()).unwrap();
        let metadata = engine.create_backup(&db, user_id).unwrap();

        // Tamper with the checksum field in the persisted metadata JSON.
        let metadata_path = backup_root
            .join(user_id)
            .join(format!("backup_{}.json", metadata.backup_id));
        let json = fs::read_to_string(&metadata_path).unwrap();
        let mut parsed: Value = serde_json::from_str(&json).unwrap();
        parsed["checksum"] = Value::String("0000badchecksum".to_string());
        fs::write(
            &metadata_path,
            serde_json::to_string_pretty(&parsed).unwrap(),
        )
        .unwrap();

        let verified = engine.verify_backup(user_id, metadata.backup_id).unwrap();
        assert!(!verified);
    }

    // keep_count == 0 must be rejected before touching any backups.
    #[test]
    fn test_purge_old_backups_validates_keep_count() {
        let temp_dir = TempDir::new().unwrap();
        let engine = ShodhBackupEngine::new(temp_dir.path().to_path_buf()).unwrap();
        let err = engine.purge_old_backups("user", 0).unwrap_err();
        assert!(err.to_string().contains("keep_count must be >= 1"));
    }

    // Purging down to 1 backup keeps the newest and removes the oldest's
    // metadata sidecar from disk.
    #[test]
    fn test_purge_old_backups_removes_old_entries() {
        let temp_dir = TempDir::new().unwrap();
        let backup_root = temp_dir.path().join("backups");
        let db_path = temp_dir.path().join("db");
        let user_id = "purge-user";

        let mut opts = Options::default();
        opts.create_if_missing(true);
        let db = DB::open(&opts, &db_path).unwrap();

        let engine = ShodhBackupEngine::new(backup_root.clone()).unwrap();
        db.put(b"k1", b"v1").unwrap();
        let first = engine.create_backup(&db, user_id).unwrap();
        db.put(b"k2", b"v2").unwrap();
        let second = engine.create_backup(&db, user_id).unwrap();

        let purged = engine.purge_old_backups(user_id, 1).unwrap();
        assert_eq!(purged, 1);

        let remaining = engine.list_backups(user_id).unwrap();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].backup_id, second.backup_id);

        let first_metadata_path = backup_root
            .join(user_id)
            .join(format!("backup_{}.json", first.backup_id));
        assert!(!first_metadata_path.exists());
    }
}
827}