1use std::path::Path;
20
21use grafeo_common::types::EpochId;
22use grafeo_common::utils::error::{Error, Result};
23use serde::{Deserialize, Serialize};
24
25#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
29#[non_exhaustive]
30pub enum BackupKind {
31 Full,
33 Incremental,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct BackupSegment {
40 pub kind: BackupKind,
42 pub filename: String,
44 pub start_epoch: EpochId,
46 pub end_epoch: EpochId,
48 pub checksum: u32,
50 pub size_bytes: u64,
52 pub created_at_ms: u64,
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct BackupManifest {
59 pub version: u32,
61 pub segments: Vec<BackupSegment>,
63}
64
65impl BackupManifest {
66 #[must_use]
68 pub fn new() -> Self {
69 Self {
70 version: 1,
71 segments: Vec::new(),
72 }
73 }
74
75 #[must_use]
77 pub fn latest_full(&self) -> Option<&BackupSegment> {
78 self.segments
79 .iter()
80 .rev()
81 .find(|s| s.kind == BackupKind::Full)
82 }
83
84 pub fn incrementals_after(&self, epoch: EpochId) -> Vec<&BackupSegment> {
86 self.segments
87 .iter()
88 .filter(|s| s.kind == BackupKind::Incremental && s.start_epoch > epoch)
89 .collect()
90 }
91
92 #[must_use]
94 pub fn epoch_range(&self) -> Option<(EpochId, EpochId)> {
95 let first = self.segments.first()?;
96 let last = self.segments.last()?;
97 Some((first.start_epoch, last.end_epoch))
98 }
99}
100
101impl Default for BackupManifest {
102 fn default() -> Self {
103 Self::new()
104 }
105}
106
107#[derive(Debug, Clone, Serialize, Deserialize)]
111pub struct BackupCursor {
112 pub backed_up_epoch: EpochId,
114 pub log_sequence: u64,
116 pub timestamp_ms: u64,
118}
119
/// File name of the manifest inside the backup directory.
///
/// Despite the `.json` extension, the contents are bincode-encoded by
/// `write_manifest`; the name is kept so existing backup directories stay
/// readable.
const MANIFEST_FILENAME: &str = "backup_manifest.json";

/// File name of the backup cursor inside the WAL directory.
const BACKUP_CURSOR_FILENAME: &str = "backup_cursor.meta";
124
125pub fn read_manifest(backup_dir: &Path) -> Result<Option<BackupManifest>> {
133 let path = backup_dir.join(MANIFEST_FILENAME);
134 if !path.exists() {
135 return Ok(None);
136 }
137 let data = std::fs::read(&path)
138 .map_err(|e| Error::Internal(format!("failed to read backup manifest: {e}")))?;
139 let (manifest, _): (BackupManifest, _) =
140 bincode::serde::decode_from_slice(&data, bincode::config::standard())
141 .map_err(|e| Error::Internal(format!("failed to parse backup manifest: {e}")))?;
142 Ok(Some(manifest))
143}
144
145pub fn write_manifest(backup_dir: &Path, manifest: &BackupManifest) -> Result<()> {
153 std::fs::create_dir_all(backup_dir)
154 .map_err(|e| Error::Internal(format!("failed to create backup directory: {e}")))?;
155
156 let path = backup_dir.join(MANIFEST_FILENAME);
157 let temp_path = backup_dir.join(format!("{MANIFEST_FILENAME}.tmp"));
158
159 let data = bincode::serde::encode_to_vec(manifest, bincode::config::standard())
160 .map_err(|e| Error::Internal(format!("failed to serialize backup manifest: {e}")))?;
161
162 std::fs::write(&temp_path, data)
163 .map_err(|e| Error::Internal(format!("failed to write backup manifest: {e}")))?;
164 std::fs::rename(&temp_path, &path)
165 .map_err(|e| Error::Internal(format!("failed to finalize backup manifest: {e}")))?;
166
167 Ok(())
168}
169
170pub fn read_backup_cursor(wal_dir: &Path) -> Result<Option<BackupCursor>> {
180 let path = wal_dir.join(BACKUP_CURSOR_FILENAME);
181 if !path.exists() {
182 return Ok(None);
183 }
184 let data = std::fs::read(&path)
185 .map_err(|e| Error::Internal(format!("failed to read backup cursor: {e}")))?;
186 let cursor: BackupCursor =
187 bincode::serde::decode_from_slice(&data, bincode::config::standard())
188 .map(|(c, _)| c)
189 .map_err(|e| Error::Internal(format!("failed to parse backup cursor: {e}")))?;
190 Ok(Some(cursor))
191}
192
193pub fn write_backup_cursor(wal_dir: &Path, cursor: &BackupCursor) -> Result<()> {
201 let path = wal_dir.join(BACKUP_CURSOR_FILENAME);
202 let temp_path = wal_dir.join(format!("{BACKUP_CURSOR_FILENAME}.tmp"));
203
204 let data = bincode::serde::encode_to_vec(cursor, bincode::config::standard())
205 .map_err(|e| Error::Internal(format!("failed to serialize backup cursor: {e}")))?;
206
207 std::fs::write(&temp_path, &data)
208 .map_err(|e| Error::Internal(format!("failed to write backup cursor: {e}")))?;
209 std::fs::rename(&temp_path, &path)
210 .map_err(|e| Error::Internal(format!("failed to finalize backup cursor: {e}")))?;
211
212 Ok(())
213}
214
/// Magic bytes (`"GBAK"`) that open every incremental backup file.
pub const BACKUP_MAGIC: [u8; 4] = [b'G', b'B', b'A', b'K'];

/// Highest incremental backup format version this code reads and the version
/// it writes.
pub const BACKUP_VERSION: u32 = 1;

/// Size in bytes of the incremental backup header:
/// magic (4) + version (4) + start epoch (8) + end epoch (8) + record count (8).
pub const BACKUP_HEADER_SIZE: usize = 4 + 4 + 8 + 8 + 8;
233
234pub fn write_backup_header(
236 buf: &mut Vec<u8>,
237 start_epoch: EpochId,
238 end_epoch: EpochId,
239 record_count: u64,
240) {
241 buf.extend_from_slice(&BACKUP_MAGIC);
242 buf.extend_from_slice(&BACKUP_VERSION.to_le_bytes());
243 buf.extend_from_slice(&start_epoch.as_u64().to_le_bytes());
244 buf.extend_from_slice(&end_epoch.as_u64().to_le_bytes());
245 buf.extend_from_slice(&record_count.to_le_bytes());
246}
247
248pub fn read_backup_header(data: &[u8]) -> Result<(EpochId, EpochId, u64)> {
260 if data.len() < BACKUP_HEADER_SIZE {
261 return Err(Error::Internal(
262 "incremental backup file too short".to_string(),
263 ));
264 }
265 if data[0..4] != BACKUP_MAGIC {
266 return Err(Error::Internal(
267 "invalid backup file magic bytes".to_string(),
268 ));
269 }
270 let version = u32::from_le_bytes(data[4..8].try_into().unwrap());
271 if version > BACKUP_VERSION {
272 return Err(Error::Internal(format!(
273 "unsupported backup version {version}, max supported is {BACKUP_VERSION}"
274 )));
275 }
276 let start_epoch = EpochId::new(u64::from_le_bytes(data[8..16].try_into().unwrap()));
277 let end_epoch = EpochId::new(u64::from_le_bytes(data[16..24].try_into().unwrap()));
278 let record_count = u64::from_le_bytes(data[24..32].try_into().unwrap());
279 Ok((start_epoch, end_epoch, record_count))
280}
281
282#[allow(clippy::cast_possible_truncation)]
285pub(super) fn now_ms() -> u64 {
286 std::time::SystemTime::now()
287 .duration_since(std::time::UNIX_EPOCH)
288 .map(|d| d.as_millis() as u64)
289 .unwrap_or(0)
290}
291
292use grafeo_storage::file::GrafeoFileManager;
295use grafeo_storage::wal::LpgWal;
296
297pub(super) fn do_backup_full(
310 backup_dir: &Path,
311 fm: &GrafeoFileManager,
312 wal: Option<&LpgWal>,
313 current_epoch: EpochId,
314) -> Result<BackupSegment> {
315 std::fs::create_dir_all(backup_dir)
316 .map_err(|e| Error::Internal(format!("failed to create backup directory: {e}")))?;
317
318 let mut manifest = read_manifest(backup_dir)?.unwrap_or_default();
320 let segment_idx = manifest.segments.len();
321 let filename = format!("backup_full_{segment_idx:04}.grafeo");
322 let dest_path = backup_dir.join(&filename);
323
324 fm.copy_to(&dest_path)?;
326
327 let file_size = std::fs::metadata(&dest_path).map(|m| m.len()).unwrap_or(0);
328 let file_data = std::fs::read(&dest_path)
329 .map_err(|e| Error::Internal(format!("failed to read backup file for checksum: {e}")))?;
330 let checksum = crc32fast::hash(&file_data);
331
332 let segment = BackupSegment {
333 kind: BackupKind::Full,
334 filename,
335 start_epoch: EpochId::new(0),
336 end_epoch: current_epoch,
337 checksum,
338 size_bytes: file_size,
339 created_at_ms: now_ms(),
340 };
341
342 manifest.segments.push(segment.clone());
343 write_manifest(backup_dir, &manifest)?;
344
345 if let Some(wal) = wal {
351 let backed_up_sequence = wal.current_sequence();
354 wal.rotate()
355 .map_err(|e| Error::Internal(format!("failed to rotate WAL after full backup: {e}")))?;
356 let cursor = BackupCursor {
357 backed_up_epoch: current_epoch,
358 log_sequence: backed_up_sequence,
359 timestamp_ms: now_ms(),
360 };
361 write_backup_cursor(wal.dir(), &cursor)?;
362 }
363
364 Ok(segment)
365}
366
367pub(super) fn do_backup_incremental(
377 backup_dir: &Path,
378 wal: &LpgWal,
379 current_epoch: EpochId,
380) -> Result<BackupSegment> {
381 let manifest = read_manifest(backup_dir)?.ok_or_else(|| {
382 Error::Internal("no backup manifest found; run a full backup first".to_string())
383 })?;
384
385 if manifest.latest_full().is_none() {
386 return Err(Error::Internal(
387 "no full backup in manifest; run a full backup first".to_string(),
388 ));
389 }
390
391 let cursor = read_backup_cursor(wal.dir())?.ok_or_else(|| {
392 Error::Internal("no backup cursor found; run a full backup first".to_string())
393 })?;
394
395 let log_files = wal.log_files()?;
396 if log_files.is_empty() {
397 return Err(Error::Internal("no WAL log files to backup".to_string()));
398 }
399
400 let mut wal_data = Vec::new();
402 let mut record_count = 0u64;
403 for file_path in &log_files {
406 let seq = file_path
407 .file_stem()
408 .and_then(|s| s.to_str())
409 .and_then(|s| s.strip_prefix("wal_"))
410 .and_then(|s| s.parse::<u64>().ok())
411 .unwrap_or(0);
412
413 if seq <= cursor.log_sequence {
418 continue;
419 }
420
421 let file_bytes = std::fs::read(file_path).map_err(|e| {
422 Error::Internal(format!(
423 "failed to read WAL file {}: {e}",
424 file_path.display()
425 ))
426 })?;
427
428 if !file_bytes.is_empty() {
429 wal_data.extend_from_slice(&file_bytes);
430 record_count += 1; }
434 }
435
436 if wal_data.is_empty() {
437 return Err(Error::Internal(
438 "no new WAL records since last backup".to_string(),
439 ));
440 }
441
442 let start_epoch = EpochId::new(cursor.backed_up_epoch.as_u64() + 1);
443 let end_epoch = current_epoch;
444
445 let segment_idx = manifest.segments.len();
447 let filename = format!("backup_incr_{segment_idx:04}.wal");
448 let dest_path = backup_dir.join(&filename);
449
450 let mut output = Vec::new();
451 write_backup_header(&mut output, start_epoch, end_epoch, record_count);
452 output.extend_from_slice(&wal_data);
453
454 std::fs::write(&dest_path, &output)
455 .map_err(|e| Error::Internal(format!("failed to write incremental backup: {e}")))?;
456
457 let checksum = crc32fast::hash(&output);
458 let segment = BackupSegment {
459 kind: BackupKind::Incremental,
460 filename,
461 start_epoch,
462 end_epoch,
463 checksum,
464 size_bytes: output.len() as u64,
465 created_at_ms: now_ms(),
466 };
467
468 let mut manifest = manifest;
470 manifest.segments.push(segment.clone());
471 write_manifest(backup_dir, &manifest)?;
472
473 let backed_up_sequence = wal.current_sequence();
476 wal.rotate().map_err(|e| {
477 Error::Internal(format!(
478 "failed to rotate WAL after incremental backup: {e}"
479 ))
480 })?;
481
482 let new_cursor = BackupCursor {
484 backed_up_epoch: current_epoch,
485 log_sequence: backed_up_sequence,
486 timestamp_ms: now_ms(),
487 };
488 write_backup_cursor(wal.dir(), &new_cursor)?;
489
490 Ok(segment)
491}
492
493pub(super) fn do_restore_to_epoch(
507 backup_dir: &Path,
508 target_epoch: EpochId,
509 output_path: &Path,
510) -> Result<()> {
511 let manifest = read_manifest(backup_dir)?
512 .ok_or_else(|| Error::Internal("no backup manifest found".to_string()))?;
513
514 let full = manifest
516 .segments
517 .iter()
518 .rfind(|s| s.kind == BackupKind::Full && s.end_epoch <= target_epoch)
519 .ok_or_else(|| {
520 Error::Internal(format!(
521 "no full backup covers epoch {}",
522 target_epoch.as_u64()
523 ))
524 })?;
525
526 let full_path = backup_dir.join(&full.filename);
528 std::fs::copy(&full_path, output_path)
529 .map_err(|e| Error::Internal(format!("failed to copy full backup to output: {e}")))?;
530
531 let incrementals: Vec<&BackupSegment> = manifest
533 .segments
534 .iter()
535 .filter(|s| {
536 s.kind == BackupKind::Incremental
537 && s.start_epoch > full.end_epoch
538 && s.start_epoch <= target_epoch
539 })
540 .collect();
541
542 if incrementals.is_empty() {
543 return Ok(());
545 }
546
547 let wal_dir = output_path.parent().unwrap_or(Path::new(".")).join(format!(
549 "{}.restore_wal",
550 output_path
551 .file_name()
552 .and_then(|n| n.to_str())
553 .unwrap_or("db")
554 ));
555 std::fs::create_dir_all(&wal_dir)
556 .map_err(|e| Error::Internal(format!("failed to create restore WAL directory: {e}")))?;
557
558 for (i, incr) in incrementals.iter().enumerate() {
560 let incr_path = backup_dir.join(&incr.filename);
561 let incr_data = std::fs::read(&incr_path).map_err(|e| {
562 Error::Internal(format!(
563 "failed to read incremental backup {}: {e}",
564 incr.filename
565 ))
566 })?;
567
568 let actual_crc = crc32fast::hash(&incr_data);
570 if actual_crc != incr.checksum {
571 return Err(Error::Internal(format!(
572 "incremental backup {} CRC mismatch: expected {:08x}, got {actual_crc:08x}",
573 incr.filename, incr.checksum,
574 )));
575 }
576
577 if incr_data.len() > BACKUP_HEADER_SIZE {
579 let wal_frames = &incr_data[BACKUP_HEADER_SIZE..];
580 let wal_file = wal_dir.join(format!("wal_{i:08}.log"));
581 std::fs::write(&wal_file, wal_frames).map_err(|e| {
582 Error::Internal(format!("failed to write WAL file for restore: {e}"))
583 })?;
584 }
585 }
586
587 let recovery = grafeo_storage::wal::WalRecovery::new(&wal_dir);
592 let records = recovery.recover_until_epoch(target_epoch)?;
593
594 let trimmed_dir = wal_dir.parent().unwrap_or(Path::new(".")).join(format!(
596 "{}.trimmed_wal",
597 wal_dir
598 .file_name()
599 .and_then(|n| n.to_str())
600 .unwrap_or("wal")
601 ));
602 std::fs::create_dir_all(&trimmed_dir)
603 .map_err(|e| Error::Internal(format!("failed to create trimmed WAL directory: {e}")))?;
604
605 if !records.is_empty() {
606 use grafeo_storage::wal::{LpgWal, WalConfig};
607 let trimmed_wal = LpgWal::with_config(&trimmed_dir, WalConfig::default())?;
608 for record in &records {
609 trimmed_wal.log(record)?;
610 }
611 trimmed_wal.flush()?;
612 drop(trimmed_wal);
613 }
614
615 std::fs::remove_dir_all(&wal_dir)
617 .map_err(|e| Error::Internal(format!("failed to remove restore WAL directory: {e}")))?;
618
619 let sidecar_dir = format!("{}.wal", output_path.display());
621 let sidecar_path = std::path::Path::new(&sidecar_dir);
622 if sidecar_path.exists() {
623 std::fs::remove_dir_all(sidecar_path)
624 .map_err(|e| Error::Internal(format!("failed to remove existing sidecar WAL: {e}")))?;
625 }
626 std::fs::rename(&trimmed_dir, sidecar_path)
627 .map_err(|e| Error::Internal(format!("failed to move WAL to sidecar location: {e}")))?;
628
629 Ok(())
630}
631
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a segment from its kind, file name, `(start, end)` epochs and
    /// the remaining bookkeeping fields.
    fn segment(
        kind: BackupKind,
        filename: &str,
        epochs: (u64, u64),
        checksum: u32,
        size_bytes: u64,
        created_at_ms: u64,
    ) -> BackupSegment {
        BackupSegment {
            kind,
            filename: filename.to_string(),
            start_epoch: EpochId::new(epochs.0),
            end_epoch: EpochId::new(epochs.1),
            checksum,
            size_bytes,
            created_at_ms,
        }
    }

    // A fresh manifest is version 1 and has nothing to report.
    #[test]
    fn test_manifest_new() {
        let manifest = BackupManifest::new();
        assert_eq!(manifest.version, 1);
        assert!(manifest.segments.is_empty());
        assert!(manifest.latest_full().is_none());
        assert!(manifest.epoch_range().is_none());
    }

    // Queries over a manifest holding one full and one incremental segment.
    #[test]
    fn test_manifest_with_segments() {
        let mut manifest = BackupManifest::new();
        manifest.segments.extend([
            segment(
                BackupKind::Full,
                "backup_full_0000.grafeo",
                (0, 100),
                12345,
                1024,
                1000,
            ),
            segment(
                BackupKind::Incremental,
                "backup_incr_0001.wal",
                (101, 200),
                67890,
                256,
                2000,
            ),
        ]);

        let full = manifest.latest_full().unwrap();
        assert_eq!(full.end_epoch, EpochId::new(100));

        let incrs = manifest.incrementals_after(EpochId::new(100));
        assert_eq!(incrs.len(), 1);
        assert_eq!(incrs[0].start_epoch, EpochId::new(101));

        let (start, end) = manifest.epoch_range().unwrap();
        assert_eq!(start, EpochId::new(0));
        assert_eq!(end, EpochId::new(200));
    }

    // A manifest written to disk can be read back.
    #[test]
    fn test_manifest_round_trip() {
        let dir = TempDir::new().unwrap();
        let mut manifest = BackupManifest::new();
        manifest
            .segments
            .push(segment(BackupKind::Full, "test.grafeo", (0, 50), 0, 512, 0));

        write_manifest(dir.path(), &manifest).unwrap();

        let loaded = read_manifest(dir.path()).unwrap().unwrap();
        assert_eq!(loaded.segments.len(), 1);
        assert_eq!(loaded.segments[0].filename, "test.grafeo");
    }

    // An empty directory has no manifest.
    #[test]
    fn test_manifest_not_found() {
        let dir = TempDir::new().unwrap();
        let found = read_manifest(dir.path()).unwrap();
        assert!(found.is_none());
    }

    // A cursor written to disk can be read back with every field intact.
    #[test]
    fn test_backup_cursor_round_trip() {
        let dir = TempDir::new().unwrap();
        let cursor = BackupCursor {
            backed_up_epoch: EpochId::new(42),
            log_sequence: 7,
            timestamp_ms: 12345,
        };

        write_backup_cursor(dir.path(), &cursor).unwrap();

        let loaded = read_backup_cursor(dir.path()).unwrap().unwrap();
        assert_eq!(loaded.backed_up_epoch, EpochId::new(42));
        assert_eq!(loaded.log_sequence, 7);
        assert_eq!(loaded.timestamp_ms, 12345);
    }

    // An empty directory has no cursor.
    #[test]
    fn test_backup_cursor_not_found() {
        let dir = TempDir::new().unwrap();
        let found = read_backup_cursor(dir.path()).unwrap();
        assert!(found.is_none());
    }

    // A written header has the documented size and parses back unchanged.
    #[test]
    fn test_backup_header_round_trip() {
        let mut buf = Vec::new();
        write_backup_header(&mut buf, EpochId::new(101), EpochId::new(200), 500);
        assert_eq!(buf.len(), BACKUP_HEADER_SIZE);

        let (start, end, count) = read_backup_header(&buf).unwrap();
        assert_eq!(start, EpochId::new(101));
        assert_eq!(end, EpochId::new(200));
        assert_eq!(count, 500);
    }

    // A full-length header with the wrong magic bytes is rejected.
    #[test]
    fn test_backup_header_invalid_magic() {
        let mut data = vec![0u8; 32];
        data[..4].fill(0xFF);
        assert!(read_backup_header(&data).is_err());
    }

    // Input shorter than the header is rejected.
    #[test]
    fn test_backup_header_too_short() {
        let data = [0u8; 4];
        assert!(read_backup_header(&data).is_err());
    }

    // `BackupKind` survives a bincode encode/decode cycle.
    #[test]
    fn test_backup_kind_serialization() {
        let config = bincode::config::standard();
        let encoded = bincode::serde::encode_to_vec(BackupKind::Full, config).unwrap();
        let (parsed, _): (BackupKind, _) =
            bincode::serde::decode_from_slice(&encoded, config).unwrap();
        assert_eq!(parsed, BackupKind::Full);
    }
}