1use std::path::Path;
20
21use grafeo_common::types::EpochId;
22use grafeo_common::utils::error::{Error, Result};
23use serde::{Deserialize, Serialize};
24
/// The kind of data stored in a [`BackupSegment`].
///
/// The enum is `#[non_exhaustive]`, so downstream crates must include a
/// wildcard arm when matching on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum BackupKind {
    /// A segment written by `do_backup_full`: the file produced by
    /// `GrafeoFileManager::copy_to`.
    Full,
    /// A segment written by `do_backup_incremental`: a backup header followed
    /// by the raw bytes of the WAL log files collected since the last backup.
    Incremental,
}
36
/// Manifest entry describing one backup file inside the backup directory.
///
/// NOTE: the manifest is bincode-encoded, so the field order below is part of
/// the on-disk format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupSegment {
    /// Whether this segment is a full or an incremental backup.
    pub kind: BackupKind,
    /// File name of the segment; it is joined onto the backup directory when
    /// the segment is read back.
    pub filename: String,
    /// First epoch covered. Full backups record epoch 0; incrementals record
    /// the previously backed-up epoch plus one.
    pub start_epoch: EpochId,
    /// Last epoch covered (the current epoch at the time of the backup).
    pub end_epoch: EpochId,
    /// CRC32 (`crc32fast::hash`) of the complete segment file contents.
    pub checksum: u32,
    /// Size of the segment file in bytes.
    pub size_bytes: u64,
    /// Creation time in milliseconds since the Unix epoch (see `now_ms`).
    pub created_at_ms: u64,
}
55
/// Index of every backup segment stored in a backup directory.
///
/// Persisted with `write_manifest` and loaded with `read_manifest`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupManifest {
    /// Manifest format version; `BackupManifest::new` sets it to 1.
    pub version: u32,
    /// Segments in the order they were appended by the backup routines.
    pub segments: Vec<BackupSegment>,
}
64
65impl BackupManifest {
66 #[must_use]
68 pub fn new() -> Self {
69 Self {
70 version: 1,
71 segments: Vec::new(),
72 }
73 }
74
75 #[must_use]
77 pub fn latest_full(&self) -> Option<&BackupSegment> {
78 self.segments
79 .iter()
80 .rev()
81 .find(|s| s.kind == BackupKind::Full)
82 }
83
84 pub fn incrementals_after(&self, epoch: EpochId) -> Vec<&BackupSegment> {
86 self.segments
87 .iter()
88 .filter(|s| s.kind == BackupKind::Incremental && s.start_epoch > epoch)
89 .collect()
90 }
91
92 #[must_use]
94 pub fn epoch_range(&self) -> Option<(EpochId, EpochId)> {
95 let first = self.segments.first()?;
96 let last = self.segments.last()?;
97 Some((first.start_epoch, last.end_epoch))
98 }
99}
100
101impl Default for BackupManifest {
102 fn default() -> Self {
103 Self::new()
104 }
105}
106
/// Marker stored in the WAL directory recording how far backups have progressed.
///
/// Written after every full or incremental backup; the next incremental backup
/// reads it to decide which WAL log files still need to be captured.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupCursor {
    /// Epoch that was current when the last backup was taken.
    pub backed_up_epoch: EpochId,
    /// WAL sequence number captured just before the WAL was rotated; log files
    /// whose sequence is less than or equal to this are skipped by the next
    /// incremental backup.
    pub log_sequence: u64,
    /// Time the cursor was written, in milliseconds since the Unix epoch.
    pub timestamp_ms: u64,
}
119
/// File name of the manifest inside the backup directory.
///
/// NOTE: despite the `.json` extension, `write_manifest` / `read_manifest`
/// encode this file with bincode, not JSON.
const MANIFEST_FILENAME: &str = "backup_manifest.json";
/// File name of the bincode-encoded [`BackupCursor`] inside the WAL directory.
const BACKUP_CURSOR_FILENAME: &str = "backup_cursor.meta";
124
125pub fn read_manifest(backup_dir: &Path) -> Result<Option<BackupManifest>> {
133 let path = backup_dir.join(MANIFEST_FILENAME);
134 if !path.exists() {
135 return Ok(None);
136 }
137 let data = std::fs::read(&path)
138 .map_err(|e| Error::Internal(format!("failed to read backup manifest: {e}")))?;
139 let (manifest, _): (BackupManifest, _) =
140 bincode::serde::decode_from_slice(&data, bincode::config::standard())
141 .map_err(|e| Error::Internal(format!("failed to parse backup manifest: {e}")))?;
142 Ok(Some(manifest))
143}
144
145pub fn write_manifest(backup_dir: &Path, manifest: &BackupManifest) -> Result<()> {
153 std::fs::create_dir_all(backup_dir)
154 .map_err(|e| Error::Internal(format!("failed to create backup directory: {e}")))?;
155
156 let path = backup_dir.join(MANIFEST_FILENAME);
157 let temp_path = backup_dir.join(format!("{MANIFEST_FILENAME}.tmp"));
158
159 let data = bincode::serde::encode_to_vec(manifest, bincode::config::standard())
160 .map_err(|e| Error::Internal(format!("failed to serialize backup manifest: {e}")))?;
161
162 std::fs::write(&temp_path, data)
163 .map_err(|e| Error::Internal(format!("failed to write backup manifest: {e}")))?;
164 std::fs::rename(&temp_path, &path)
165 .map_err(|e| Error::Internal(format!("failed to finalize backup manifest: {e}")))?;
166
167 Ok(())
168}
169
170pub fn read_backup_cursor(wal_dir: &Path) -> Result<Option<BackupCursor>> {
180 let path = wal_dir.join(BACKUP_CURSOR_FILENAME);
181 if !path.exists() {
182 return Ok(None);
183 }
184 let data = std::fs::read(&path)
185 .map_err(|e| Error::Internal(format!("failed to read backup cursor: {e}")))?;
186 let cursor: BackupCursor =
187 bincode::serde::decode_from_slice(&data, bincode::config::standard())
188 .map(|(c, _)| c)
189 .map_err(|e| Error::Internal(format!("failed to parse backup cursor: {e}")))?;
190 Ok(Some(cursor))
191}
192
193pub fn write_backup_cursor(wal_dir: &Path, cursor: &BackupCursor) -> Result<()> {
201 let path = wal_dir.join(BACKUP_CURSOR_FILENAME);
202 let temp_path = wal_dir.join(format!("{BACKUP_CURSOR_FILENAME}.tmp"));
203
204 let data = bincode::serde::encode_to_vec(cursor, bincode::config::standard())
205 .map_err(|e| Error::Internal(format!("failed to serialize backup cursor: {e}")))?;
206
207 std::fs::write(&temp_path, &data)
208 .map_err(|e| Error::Internal(format!("failed to write backup cursor: {e}")))?;
209 std::fs::rename(&temp_path, &path)
210 .map_err(|e| Error::Internal(format!("failed to finalize backup cursor: {e}")))?;
211
212 Ok(())
213}
214
/// Magic bytes (`"GBAK"`) that open every incremental backup file header.
pub const BACKUP_MAGIC: [u8; 4] = *b"GBAK";
/// Header format version written by `write_backup_header`; `read_backup_header`
/// rejects any header whose version is greater than this.
pub const BACKUP_VERSION: u32 = 1;

/// Size in bytes of the fixed header: 4 (magic) + 4 (version) + 8 (start epoch)
/// + 8 (end epoch) + 8 (record count), all integers little-endian.
pub const BACKUP_HEADER_SIZE: usize = 32;
233
234pub fn write_backup_header(
236 buf: &mut Vec<u8>,
237 start_epoch: EpochId,
238 end_epoch: EpochId,
239 record_count: u64,
240) {
241 buf.extend_from_slice(&BACKUP_MAGIC);
242 buf.extend_from_slice(&BACKUP_VERSION.to_le_bytes());
243 buf.extend_from_slice(&start_epoch.as_u64().to_le_bytes());
244 buf.extend_from_slice(&end_epoch.as_u64().to_le_bytes());
245 buf.extend_from_slice(&record_count.to_le_bytes());
246}
247
248pub fn read_backup_header(data: &[u8]) -> Result<(EpochId, EpochId, u64)> {
260 if data.len() < BACKUP_HEADER_SIZE {
261 return Err(Error::Internal(
262 "incremental backup file too short".to_string(),
263 ));
264 }
265 if data[0..4] != BACKUP_MAGIC {
266 return Err(Error::Internal(
267 "invalid backup file magic bytes".to_string(),
268 ));
269 }
270 let version = u32::from_le_bytes(data[4..8].try_into().unwrap());
271 if version > BACKUP_VERSION {
272 return Err(Error::Internal(format!(
273 "unsupported backup version {version}, max supported is {BACKUP_VERSION}"
274 )));
275 }
276 let start_epoch = EpochId::new(u64::from_le_bytes(data[8..16].try_into().unwrap()));
277 let end_epoch = EpochId::new(u64::from_le_bytes(data[16..24].try_into().unwrap()));
278 let record_count = u64::from_le_bytes(data[24..32].try_into().unwrap());
279 Ok((start_epoch, end_epoch, record_count))
280}
281
282#[allow(clippy::cast_possible_truncation)]
285pub(super) fn now_ms() -> u64 {
286 std::time::SystemTime::now()
287 .duration_since(std::time::UNIX_EPOCH)
288 .map_or(0, |d| d.as_millis() as u64)
289}
290
291use grafeo_storage::file::GrafeoFileManager;
294use grafeo_storage::wal::LpgWal;
295
296pub(super) fn do_backup_full(
309 backup_dir: &Path,
310 fm: &GrafeoFileManager,
311 wal: Option<&LpgWal>,
312 current_epoch: EpochId,
313) -> Result<BackupSegment> {
314 std::fs::create_dir_all(backup_dir)
315 .map_err(|e| Error::Internal(format!("failed to create backup directory: {e}")))?;
316
317 let mut manifest = read_manifest(backup_dir)?.unwrap_or_default();
319 let segment_idx = manifest.segments.len();
320 let filename = format!("backup_full_{segment_idx:04}.grafeo");
321 let dest_path = backup_dir.join(&filename);
322
323 fm.copy_to(&dest_path)?;
325
326 let file_size = std::fs::metadata(&dest_path).map_or(0, |m| m.len());
327 let file_data = std::fs::read(&dest_path)
328 .map_err(|e| Error::Internal(format!("failed to read backup file for checksum: {e}")))?;
329 let checksum = crc32fast::hash(&file_data);
330
331 let segment = BackupSegment {
332 kind: BackupKind::Full,
333 filename,
334 start_epoch: EpochId::new(0),
335 end_epoch: current_epoch,
336 checksum,
337 size_bytes: file_size,
338 created_at_ms: now_ms(),
339 };
340
341 manifest.segments.push(segment.clone());
342 write_manifest(backup_dir, &manifest)?;
343
344 if let Some(wal) = wal {
350 let backed_up_sequence = wal.current_sequence();
353 wal.rotate()
354 .map_err(|e| Error::Internal(format!("failed to rotate WAL after full backup: {e}")))?;
355 let cursor = BackupCursor {
356 backed_up_epoch: current_epoch,
357 log_sequence: backed_up_sequence,
358 timestamp_ms: now_ms(),
359 };
360 write_backup_cursor(wal.dir(), &cursor)?;
361 }
362
363 Ok(segment)
364}
365
366pub(super) fn do_backup_incremental(
376 backup_dir: &Path,
377 wal: &LpgWal,
378 current_epoch: EpochId,
379) -> Result<BackupSegment> {
380 let manifest = read_manifest(backup_dir)?.ok_or_else(|| {
381 Error::Internal("no backup manifest found; run a full backup first".to_string())
382 })?;
383
384 if manifest.latest_full().is_none() {
385 return Err(Error::Internal(
386 "no full backup in manifest; run a full backup first".to_string(),
387 ));
388 }
389
390 let cursor = read_backup_cursor(wal.dir())?.ok_or_else(|| {
391 Error::Internal("no backup cursor found; run a full backup first".to_string())
392 })?;
393
394 let log_files = wal.log_files()?;
395 if log_files.is_empty() {
396 return Err(Error::Internal("no WAL log files to backup".to_string()));
397 }
398
399 let mut wal_data = Vec::new();
401 let mut record_count = 0u64;
402 for file_path in &log_files {
405 let seq = file_path
406 .file_stem()
407 .and_then(|s| s.to_str())
408 .and_then(|s| s.strip_prefix("wal_"))
409 .and_then(|s| s.parse::<u64>().ok())
410 .unwrap_or(0);
411
412 if seq <= cursor.log_sequence {
417 continue;
418 }
419
420 let file_bytes = std::fs::read(file_path).map_err(|e| {
421 Error::Internal(format!(
422 "failed to read WAL file {}: {e}",
423 file_path.display()
424 ))
425 })?;
426
427 if !file_bytes.is_empty() {
428 wal_data.extend_from_slice(&file_bytes);
429 record_count += 1; }
433 }
434
435 if wal_data.is_empty() {
436 return Err(Error::Internal(
437 "no new WAL records since last backup".to_string(),
438 ));
439 }
440
441 let start_epoch = EpochId::new(cursor.backed_up_epoch.as_u64() + 1);
442 let end_epoch = current_epoch;
443
444 let segment_idx = manifest.segments.len();
446 let filename = format!("backup_incr_{segment_idx:04}.wal");
447 let dest_path = backup_dir.join(&filename);
448
449 let mut output = Vec::new();
450 write_backup_header(&mut output, start_epoch, end_epoch, record_count);
451 output.extend_from_slice(&wal_data);
452
453 std::fs::write(&dest_path, &output)
454 .map_err(|e| Error::Internal(format!("failed to write incremental backup: {e}")))?;
455
456 let checksum = crc32fast::hash(&output);
457 let segment = BackupSegment {
458 kind: BackupKind::Incremental,
459 filename,
460 start_epoch,
461 end_epoch,
462 checksum,
463 size_bytes: output.len() as u64,
464 created_at_ms: now_ms(),
465 };
466
467 let mut manifest = manifest;
469 manifest.segments.push(segment.clone());
470 write_manifest(backup_dir, &manifest)?;
471
472 let backed_up_sequence = wal.current_sequence();
475 wal.rotate().map_err(|e| {
476 Error::Internal(format!(
477 "failed to rotate WAL after incremental backup: {e}"
478 ))
479 })?;
480
481 let new_cursor = BackupCursor {
483 backed_up_epoch: current_epoch,
484 log_sequence: backed_up_sequence,
485 timestamp_ms: now_ms(),
486 };
487 write_backup_cursor(wal.dir(), &new_cursor)?;
488
489 Ok(segment)
490}
491
492pub(super) fn do_restore_to_epoch(
506 backup_dir: &Path,
507 target_epoch: EpochId,
508 output_path: &Path,
509) -> Result<()> {
510 let manifest = read_manifest(backup_dir)?
511 .ok_or_else(|| Error::Internal("no backup manifest found".to_string()))?;
512
513 let full = manifest
515 .segments
516 .iter()
517 .rfind(|s| s.kind == BackupKind::Full && s.end_epoch <= target_epoch)
518 .ok_or_else(|| {
519 Error::Internal(format!(
520 "no full backup covers epoch {}",
521 target_epoch.as_u64()
522 ))
523 })?;
524
525 let full_path = backup_dir.join(&full.filename);
527 std::fs::copy(&full_path, output_path)
528 .map_err(|e| Error::Internal(format!("failed to copy full backup to output: {e}")))?;
529
530 let incrementals: Vec<&BackupSegment> = manifest
532 .segments
533 .iter()
534 .filter(|s| {
535 s.kind == BackupKind::Incremental
536 && s.start_epoch > full.end_epoch
537 && s.start_epoch <= target_epoch
538 })
539 .collect();
540
541 if incrementals.is_empty() {
542 return Ok(());
544 }
545
546 let wal_dir = output_path.parent().unwrap_or(Path::new(".")).join(format!(
548 "{}.restore_wal",
549 output_path
550 .file_name()
551 .and_then(|n| n.to_str())
552 .unwrap_or("db")
553 ));
554 std::fs::create_dir_all(&wal_dir)
555 .map_err(|e| Error::Internal(format!("failed to create restore WAL directory: {e}")))?;
556
557 for (i, incr) in incrementals.iter().enumerate() {
559 let incr_path = backup_dir.join(&incr.filename);
560 let incr_data = std::fs::read(&incr_path).map_err(|e| {
561 Error::Internal(format!(
562 "failed to read incremental backup {}: {e}",
563 incr.filename
564 ))
565 })?;
566
567 let actual_crc = crc32fast::hash(&incr_data);
569 if actual_crc != incr.checksum {
570 return Err(Error::Internal(format!(
571 "incremental backup {} CRC mismatch: expected {:08x}, got {actual_crc:08x}",
572 incr.filename, incr.checksum,
573 )));
574 }
575
576 if incr_data.len() > BACKUP_HEADER_SIZE {
578 let wal_frames = &incr_data[BACKUP_HEADER_SIZE..];
579 let wal_file = wal_dir.join(format!("wal_{i:08}.log"));
580 std::fs::write(&wal_file, wal_frames).map_err(|e| {
581 Error::Internal(format!("failed to write WAL file for restore: {e}"))
582 })?;
583 }
584 }
585
586 let recovery = grafeo_storage::wal::WalRecovery::new(&wal_dir);
591 let records = recovery.recover_until_epoch(target_epoch)?;
592
593 let trimmed_dir = wal_dir.parent().unwrap_or(Path::new(".")).join(format!(
595 "{}.trimmed_wal",
596 wal_dir
597 .file_name()
598 .and_then(|n| n.to_str())
599 .unwrap_or("wal")
600 ));
601 std::fs::create_dir_all(&trimmed_dir)
602 .map_err(|e| Error::Internal(format!("failed to create trimmed WAL directory: {e}")))?;
603
604 if !records.is_empty() {
605 use grafeo_storage::wal::{LpgWal, WalConfig};
606 let trimmed_wal = LpgWal::with_config(&trimmed_dir, WalConfig::default())?;
607 for record in &records {
608 trimmed_wal.log(record)?;
609 }
610 trimmed_wal.flush()?;
611 drop(trimmed_wal);
612 }
613
614 std::fs::remove_dir_all(&wal_dir)
616 .map_err(|e| Error::Internal(format!("failed to remove restore WAL directory: {e}")))?;
617
618 let sidecar_dir = format!("{}.wal", output_path.display());
620 let sidecar_path = std::path::Path::new(&sidecar_dir);
621 if sidecar_path.exists() {
622 std::fs::remove_dir_all(sidecar_path)
623 .map_err(|e| Error::Internal(format!("failed to remove existing sidecar WAL: {e}")))?;
624 }
625 std::fs::rename(&trimmed_dir, sidecar_path)
626 .map_err(|e| Error::Internal(format!("failed to move WAL to sidecar location: {e}")))?;
627
628 Ok(())
629}
630
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a segment from plain values so each test states only its data.
    fn segment(
        kind: BackupKind,
        filename: &str,
        epochs: (u64, u64),
        checksum: u32,
        size_bytes: u64,
        created_at_ms: u64,
    ) -> BackupSegment {
        BackupSegment {
            kind,
            filename: filename.to_string(),
            start_epoch: EpochId::new(epochs.0),
            end_epoch: EpochId::new(epochs.1),
            checksum,
            size_bytes,
            created_at_ms,
        }
    }

    /// A fresh manifest is version 1 and has no segments.
    #[test]
    fn test_manifest_new() {
        let manifest = BackupManifest::new();
        assert_eq!(manifest.version, 1);
        assert!(manifest.segments.is_empty());
        assert!(manifest.latest_full().is_none());
        assert!(manifest.epoch_range().is_none());
    }

    /// Queries over a manifest holding one full and one incremental segment.
    #[test]
    fn test_manifest_with_segments() {
        let mut manifest = BackupManifest::new();
        manifest.segments.push(segment(
            BackupKind::Full,
            "backup_full_0000.grafeo",
            (0, 100),
            12345,
            1024,
            1000,
        ));
        manifest.segments.push(segment(
            BackupKind::Incremental,
            "backup_incr_0001.wal",
            (101, 200),
            67890,
            256,
            2000,
        ));

        let full = manifest.latest_full().unwrap();
        assert_eq!(full.end_epoch, EpochId::new(100));

        let incrs = manifest.incrementals_after(EpochId::new(100));
        assert_eq!(incrs.len(), 1);
        assert_eq!(incrs[0].start_epoch, EpochId::new(101));

        let (start, end) = manifest.epoch_range().unwrap();
        assert_eq!(start, EpochId::new(0));
        assert_eq!(end, EpochId::new(200));
    }

    /// A manifest written to disk can be read back.
    #[test]
    fn test_manifest_round_trip() {
        let dir = TempDir::new().unwrap();
        let mut manifest = BackupManifest::new();
        manifest.segments.push(segment(
            BackupKind::Full,
            "test.grafeo",
            (0, 50),
            0,
            512,
            0,
        ));

        write_manifest(dir.path(), &manifest).unwrap();

        let loaded = read_manifest(dir.path()).unwrap().unwrap();
        assert_eq!(loaded.segments.len(), 1);
        assert_eq!(loaded.segments[0].filename, "test.grafeo");
    }

    /// Reading from a directory without a manifest yields `None`.
    #[test]
    fn test_manifest_not_found() {
        let dir = TempDir::new().unwrap();
        let loaded = read_manifest(dir.path()).unwrap();
        assert!(loaded.is_none());
    }

    /// A cursor written to disk can be read back field for field.
    #[test]
    fn test_backup_cursor_round_trip() {
        let dir = TempDir::new().unwrap();
        let cursor = BackupCursor {
            backed_up_epoch: EpochId::new(42),
            log_sequence: 7,
            timestamp_ms: 12345,
        };

        write_backup_cursor(dir.path(), &cursor).unwrap();

        let loaded = read_backup_cursor(dir.path()).unwrap().unwrap();
        assert_eq!(loaded.backed_up_epoch, EpochId::new(42));
        assert_eq!(loaded.log_sequence, 7);
        assert_eq!(loaded.timestamp_ms, 12345);
    }

    /// Reading from a directory without a cursor yields `None`.
    #[test]
    fn test_backup_cursor_not_found() {
        let dir = TempDir::new().unwrap();
        let loaded = read_backup_cursor(dir.path()).unwrap();
        assert!(loaded.is_none());
    }

    /// The header encoder and decoder agree, and the header has the fixed size.
    #[test]
    fn test_backup_header_round_trip() {
        let mut buf = Vec::new();
        write_backup_header(&mut buf, EpochId::new(101), EpochId::new(200), 500);
        assert_eq!(buf.len(), BACKUP_HEADER_SIZE);

        let (start, end, count) = read_backup_header(&buf).unwrap();
        assert_eq!(start, EpochId::new(101));
        assert_eq!(end, EpochId::new(200));
        assert_eq!(count, 500);
    }

    /// A full-length header with the wrong magic bytes is rejected.
    #[test]
    fn test_backup_header_invalid_magic() {
        let mut data = vec![0u8; 32];
        data[..4].fill(0xFF);
        assert!(read_backup_header(&data).is_err());
    }

    /// Input shorter than the header size is rejected.
    #[test]
    fn test_backup_header_too_short() {
        let data = [0u8; 4];
        assert!(read_backup_header(&data).is_err());
    }

    /// `BackupKind` survives a bincode encode/decode round trip.
    #[test]
    fn test_backup_kind_serialization() {
        let config = bincode::config::standard();
        let encoded = bincode::serde::encode_to_vec(BackupKind::Full, config).unwrap();
        let (parsed, _): (BackupKind, _) =
            bincode::serde::decode_from_slice(&encoded, config).unwrap();
        assert_eq!(parsed, BackupKind::Full);
    }
}