1use std::path::Path;
20
21use grafeo_common::types::EpochId;
22use grafeo_common::utils::error::{Error, Result};
23use serde::{Deserialize, Serialize};
24
/// Kind of a backup segment recorded in a [`BackupManifest`].
///
/// NOTE: the manifest is bincode-encoded (see `write_manifest`), and bincode identifies enum
/// variants by position, so new variants must be appended after the existing ones — never
/// inserted or reordered — or existing manifests will decode to the wrong kind.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum BackupKind {
    /// A complete copy of the database file (written as `backup_full_NNNN.grafeo`).
    Full,
    /// WAL bytes written since the previous backup, prefixed with a backup header
    /// (written as `backup_incr_NNNN.wal`).
    Incremental,
}
36
/// One backup file listed in a [`BackupManifest`].
///
/// NOTE: field order is part of the bincode-encoded manifest format; do not reorder fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupSegment {
    /// Whether this segment is a full database copy or an incremental WAL segment.
    pub kind: BackupKind,
    /// File name of the segment, relative to the backup directory.
    pub filename: String,
    /// First epoch covered: 0 for full backups, previous cursor epoch + 1 for incrementals.
    pub start_epoch: EpochId,
    /// Last epoch covered: the `current_epoch` supplied when the segment was taken.
    pub end_epoch: EpochId,
    /// CRC32 (`crc32fast`) of the entire segment file as written.
    pub checksum: u32,
    /// Size of the segment file in bytes.
    pub size_bytes: u64,
    /// Wall-clock creation time in milliseconds since the Unix epoch.
    pub created_at_ms: u64,
}
55
/// Index of every backup segment in a backup directory.
///
/// Persisted as `backup_manifest.json`, but the contents are bincode-encoded, not JSON
/// (see `read_manifest` / `write_manifest`). Field order is part of that format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupManifest {
    /// Manifest format version; [`BackupManifest::new`] sets it to 1.
    pub version: u32,
    /// All segments, in the order the backups were taken (oldest first).
    pub segments: Vec<BackupSegment>,
}
64
65impl BackupManifest {
66 #[must_use]
68 pub fn new() -> Self {
69 Self {
70 version: 1,
71 segments: Vec::new(),
72 }
73 }
74
75 #[must_use]
77 pub fn latest_full(&self) -> Option<&BackupSegment> {
78 self.segments
79 .iter()
80 .rev()
81 .find(|s| s.kind == BackupKind::Full)
82 }
83
84 pub fn incrementals_after(&self, epoch: EpochId) -> Vec<&BackupSegment> {
86 self.segments
87 .iter()
88 .filter(|s| s.kind == BackupKind::Incremental && s.start_epoch > epoch)
89 .collect()
90 }
91
92 #[must_use]
94 pub fn epoch_range(&self) -> Option<(EpochId, EpochId)> {
95 let first = self.segments.first()?;
96 let last = self.segments.last()?;
97 Some((first.start_epoch, last.end_epoch))
98 }
99}
100
101impl Default for BackupManifest {
102 fn default() -> Self {
103 Self::new()
104 }
105}
106
/// Tracks how far the WAL has been backed up.
///
/// Stored bincode-encoded as `backup_cursor.meta` inside the WAL directory. Field order is
/// part of that format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupCursor {
    /// Epoch covered by the most recent backup; the next incremental starts at this epoch + 1.
    pub backed_up_epoch: EpochId,
    /// WAL sequence recorded by the most recent backup. Incremental backups only copy WAL
    /// files whose parsed sequence is greater than this value.
    pub log_sequence: u64,
    /// Wall-clock time the cursor was written, in milliseconds since the Unix epoch.
    pub timestamp_ms: u64,
}
119
/// File name of the manifest inside the backup directory.
///
/// NOTE: despite the `.json` extension the file holds bincode, not JSON. `read_manifest`
/// locates manifests by this exact name, so renaming it would orphan existing backups.
const MANIFEST_FILENAME: &str = "backup_manifest.json";
/// File name of the backup cursor, which lives in the WAL directory next to the log files.
const BACKUP_CURSOR_FILENAME: &str = "backup_cursor.meta";
124
125pub fn read_manifest(backup_dir: &Path) -> Result<Option<BackupManifest>> {
133 let path = backup_dir.join(MANIFEST_FILENAME);
134 if !path.exists() {
135 return Ok(None);
136 }
137 let data = std::fs::read(&path)
138 .map_err(|e| Error::Internal(format!("failed to read backup manifest: {e}")))?;
139 let (manifest, _): (BackupManifest, _) =
140 bincode::serde::decode_from_slice(&data, bincode::config::standard())
141 .map_err(|e| Error::Internal(format!("failed to parse backup manifest: {e}")))?;
142 Ok(Some(manifest))
143}
144
145pub fn write_manifest(backup_dir: &Path, manifest: &BackupManifest) -> Result<()> {
153 std::fs::create_dir_all(backup_dir)
154 .map_err(|e| Error::Internal(format!("failed to create backup directory: {e}")))?;
155
156 let path = backup_dir.join(MANIFEST_FILENAME);
157 let temp_path = backup_dir.join(format!("{MANIFEST_FILENAME}.tmp"));
158
159 let data = bincode::serde::encode_to_vec(manifest, bincode::config::standard())
160 .map_err(|e| Error::Internal(format!("failed to serialize backup manifest: {e}")))?;
161
162 std::fs::write(&temp_path, data)
163 .map_err(|e| Error::Internal(format!("failed to write backup manifest: {e}")))?;
164 std::fs::rename(&temp_path, &path)
165 .map_err(|e| Error::Internal(format!("failed to finalize backup manifest: {e}")))?;
166
167 Ok(())
168}
169
170pub fn read_backup_cursor(wal_dir: &Path) -> Result<Option<BackupCursor>> {
180 let path = wal_dir.join(BACKUP_CURSOR_FILENAME);
181 if !path.exists() {
182 return Ok(None);
183 }
184 let data = std::fs::read(&path)
185 .map_err(|e| Error::Internal(format!("failed to read backup cursor: {e}")))?;
186 let cursor: BackupCursor =
187 bincode::serde::decode_from_slice(&data, bincode::config::standard())
188 .map(|(c, _)| c)
189 .map_err(|e| Error::Internal(format!("failed to parse backup cursor: {e}")))?;
190 Ok(Some(cursor))
191}
192
193pub fn write_backup_cursor(wal_dir: &Path, cursor: &BackupCursor) -> Result<()> {
201 let path = wal_dir.join(BACKUP_CURSOR_FILENAME);
202 let temp_path = wal_dir.join(format!("{BACKUP_CURSOR_FILENAME}.tmp"));
203
204 let data = bincode::serde::encode_to_vec(cursor, bincode::config::standard())
205 .map_err(|e| Error::Internal(format!("failed to serialize backup cursor: {e}")))?;
206
207 std::fs::write(&temp_path, &data)
208 .map_err(|e| Error::Internal(format!("failed to write backup cursor: {e}")))?;
209 std::fs::rename(&temp_path, &path)
210 .map_err(|e| Error::Internal(format!("failed to finalize backup cursor: {e}")))?;
211
212 Ok(())
213}
214
/// Magic bytes at the start of every incremental backup file (`"GBAK"`).
pub const BACKUP_MAGIC: [u8; 4] = [b'G', b'B', b'A', b'K'];
/// Newest incremental backup header version this code writes and accepts.
pub const BACKUP_VERSION: u32 = 1;

/// Size in bytes of the incremental backup header. The layout is: magic (4 bytes), version
/// (`u32`), then start epoch, end epoch and record count (`u64` each), all little-endian.
pub const BACKUP_HEADER_SIZE: usize = 4 + 4 + 8 + 8 + 8;
233
234pub fn write_backup_header(
236 buf: &mut Vec<u8>,
237 start_epoch: EpochId,
238 end_epoch: EpochId,
239 record_count: u64,
240) {
241 buf.extend_from_slice(&BACKUP_MAGIC);
242 buf.extend_from_slice(&BACKUP_VERSION.to_le_bytes());
243 buf.extend_from_slice(&start_epoch.as_u64().to_le_bytes());
244 buf.extend_from_slice(&end_epoch.as_u64().to_le_bytes());
245 buf.extend_from_slice(&record_count.to_le_bytes());
246}
247
248pub fn read_backup_header(data: &[u8]) -> Result<(EpochId, EpochId, u64)> {
260 if data.len() < BACKUP_HEADER_SIZE {
261 return Err(Error::Internal(
262 "incremental backup file too short".to_string(),
263 ));
264 }
265 if data[0..4] != BACKUP_MAGIC {
266 return Err(Error::Internal(
267 "invalid backup file magic bytes".to_string(),
268 ));
269 }
270 let version = u32::from_le_bytes(data[4..8].try_into().unwrap());
271 if version > BACKUP_VERSION {
272 return Err(Error::Internal(format!(
273 "unsupported backup version {version}, max supported is {BACKUP_VERSION}"
274 )));
275 }
276 let start_epoch = EpochId::new(u64::from_le_bytes(data[8..16].try_into().unwrap()));
277 let end_epoch = EpochId::new(u64::from_le_bytes(data[16..24].try_into().unwrap()));
278 let record_count = u64::from_le_bytes(data[24..32].try_into().unwrap());
279 Ok((start_epoch, end_epoch, record_count))
280}
281
282pub(super) fn now_ms() -> u64 {
284 std::time::SystemTime::now()
285 .duration_since(std::time::UNIX_EPOCH)
286 .map(|d| d.as_millis() as u64)
287 .unwrap_or(0)
288}
289
290use grafeo_storage::file::GrafeoFileManager;
293use grafeo_storage::wal::LpgWal;
294
295pub(super) fn do_backup_full(
308 backup_dir: &Path,
309 fm: &GrafeoFileManager,
310 wal: Option<&LpgWal>,
311 current_epoch: EpochId,
312) -> Result<BackupSegment> {
313 std::fs::create_dir_all(backup_dir)
314 .map_err(|e| Error::Internal(format!("failed to create backup directory: {e}")))?;
315
316 let mut manifest = read_manifest(backup_dir)?.unwrap_or_default();
318 let segment_idx = manifest.segments.len();
319 let filename = format!("backup_full_{segment_idx:04}.grafeo");
320 let dest_path = backup_dir.join(&filename);
321
322 fm.copy_to(&dest_path)?;
324
325 let file_size = std::fs::metadata(&dest_path).map(|m| m.len()).unwrap_or(0);
326 let file_data = std::fs::read(&dest_path)
327 .map_err(|e| Error::Internal(format!("failed to read backup file for checksum: {e}")))?;
328 let checksum = crc32fast::hash(&file_data);
329
330 let segment = BackupSegment {
331 kind: BackupKind::Full,
332 filename,
333 start_epoch: EpochId::new(0),
334 end_epoch: current_epoch,
335 checksum,
336 size_bytes: file_size,
337 created_at_ms: now_ms(),
338 };
339
340 manifest.segments.push(segment.clone());
341 write_manifest(backup_dir, &manifest)?;
342
343 if let Some(wal) = wal {
349 let backed_up_sequence = wal.current_sequence();
352 wal.rotate()
353 .map_err(|e| Error::Internal(format!("failed to rotate WAL after full backup: {e}")))?;
354 let cursor = BackupCursor {
355 backed_up_epoch: current_epoch,
356 log_sequence: backed_up_sequence,
357 timestamp_ms: now_ms(),
358 };
359 write_backup_cursor(wal.dir(), &cursor)?;
360 }
361
362 Ok(segment)
363}
364
365pub(super) fn do_backup_incremental(
375 backup_dir: &Path,
376 wal: &LpgWal,
377 current_epoch: EpochId,
378) -> Result<BackupSegment> {
379 let manifest = read_manifest(backup_dir)?.ok_or_else(|| {
380 Error::Internal("no backup manifest found; run a full backup first".to_string())
381 })?;
382
383 if manifest.latest_full().is_none() {
384 return Err(Error::Internal(
385 "no full backup in manifest; run a full backup first".to_string(),
386 ));
387 }
388
389 let cursor = read_backup_cursor(wal.dir())?.ok_or_else(|| {
390 Error::Internal("no backup cursor found; run a full backup first".to_string())
391 })?;
392
393 let log_files = wal.log_files()?;
394 if log_files.is_empty() {
395 return Err(Error::Internal("no WAL log files to backup".to_string()));
396 }
397
398 let mut wal_data = Vec::new();
400 let mut record_count = 0u64;
401 for file_path in &log_files {
404 let seq = file_path
405 .file_stem()
406 .and_then(|s| s.to_str())
407 .and_then(|s| s.strip_prefix("wal_"))
408 .and_then(|s| s.parse::<u64>().ok())
409 .unwrap_or(0);
410
411 if seq <= cursor.log_sequence {
416 continue;
417 }
418
419 let file_bytes = std::fs::read(file_path).map_err(|e| {
420 Error::Internal(format!(
421 "failed to read WAL file {}: {e}",
422 file_path.display()
423 ))
424 })?;
425
426 if !file_bytes.is_empty() {
427 wal_data.extend_from_slice(&file_bytes);
428 record_count += 1; }
432 }
433
434 if wal_data.is_empty() {
435 return Err(Error::Internal(
436 "no new WAL records since last backup".to_string(),
437 ));
438 }
439
440 let start_epoch = EpochId::new(cursor.backed_up_epoch.as_u64() + 1);
441 let end_epoch = current_epoch;
442
443 let segment_idx = manifest.segments.len();
445 let filename = format!("backup_incr_{segment_idx:04}.wal");
446 let dest_path = backup_dir.join(&filename);
447
448 let mut output = Vec::new();
449 write_backup_header(&mut output, start_epoch, end_epoch, record_count);
450 output.extend_from_slice(&wal_data);
451
452 std::fs::write(&dest_path, &output)
453 .map_err(|e| Error::Internal(format!("failed to write incremental backup: {e}")))?;
454
455 let checksum = crc32fast::hash(&output);
456 let segment = BackupSegment {
457 kind: BackupKind::Incremental,
458 filename,
459 start_epoch,
460 end_epoch,
461 checksum,
462 size_bytes: output.len() as u64,
463 created_at_ms: now_ms(),
464 };
465
466 let mut manifest = manifest;
468 manifest.segments.push(segment.clone());
469 write_manifest(backup_dir, &manifest)?;
470
471 let backed_up_sequence = wal.current_sequence();
474 wal.rotate().map_err(|e| {
475 Error::Internal(format!(
476 "failed to rotate WAL after incremental backup: {e}"
477 ))
478 })?;
479
480 let new_cursor = BackupCursor {
482 backed_up_epoch: current_epoch,
483 log_sequence: backed_up_sequence,
484 timestamp_ms: now_ms(),
485 };
486 write_backup_cursor(wal.dir(), &new_cursor)?;
487
488 Ok(segment)
489}
490
491pub(super) fn do_restore_to_epoch(
505 backup_dir: &Path,
506 target_epoch: EpochId,
507 output_path: &Path,
508) -> Result<()> {
509 let manifest = read_manifest(backup_dir)?
510 .ok_or_else(|| Error::Internal("no backup manifest found".to_string()))?;
511
512 let full = manifest
514 .segments
515 .iter()
516 .rfind(|s| s.kind == BackupKind::Full && s.end_epoch <= target_epoch)
517 .ok_or_else(|| {
518 Error::Internal(format!(
519 "no full backup covers epoch {}",
520 target_epoch.as_u64()
521 ))
522 })?;
523
524 let full_path = backup_dir.join(&full.filename);
526 std::fs::copy(&full_path, output_path)
527 .map_err(|e| Error::Internal(format!("failed to copy full backup to output: {e}")))?;
528
529 let incrementals: Vec<&BackupSegment> = manifest
531 .segments
532 .iter()
533 .filter(|s| {
534 s.kind == BackupKind::Incremental
535 && s.start_epoch > full.end_epoch
536 && s.start_epoch <= target_epoch
537 })
538 .collect();
539
540 if incrementals.is_empty() {
541 return Ok(());
543 }
544
545 let wal_dir = output_path.parent().unwrap_or(Path::new(".")).join(format!(
547 "{}.restore_wal",
548 output_path
549 .file_name()
550 .and_then(|n| n.to_str())
551 .unwrap_or("db")
552 ));
553 std::fs::create_dir_all(&wal_dir)
554 .map_err(|e| Error::Internal(format!("failed to create restore WAL directory: {e}")))?;
555
556 for (i, incr) in incrementals.iter().enumerate() {
558 let incr_path = backup_dir.join(&incr.filename);
559 let incr_data = std::fs::read(&incr_path).map_err(|e| {
560 Error::Internal(format!(
561 "failed to read incremental backup {}: {e}",
562 incr.filename
563 ))
564 })?;
565
566 let actual_crc = crc32fast::hash(&incr_data);
568 if actual_crc != incr.checksum {
569 return Err(Error::Internal(format!(
570 "incremental backup {} CRC mismatch: expected {:08x}, got {actual_crc:08x}",
571 incr.filename, incr.checksum,
572 )));
573 }
574
575 if incr_data.len() > BACKUP_HEADER_SIZE {
577 let wal_frames = &incr_data[BACKUP_HEADER_SIZE..];
578 let wal_file = wal_dir.join(format!("wal_{i:08}.log"));
579 std::fs::write(&wal_file, wal_frames).map_err(|e| {
580 Error::Internal(format!("failed to write WAL file for restore: {e}"))
581 })?;
582 }
583 }
584
585 let recovery = grafeo_storage::wal::WalRecovery::new(&wal_dir);
590 let records = recovery.recover_until_epoch(target_epoch)?;
591
592 let trimmed_dir = wal_dir.parent().unwrap_or(Path::new(".")).join(format!(
594 "{}.trimmed_wal",
595 wal_dir
596 .file_name()
597 .and_then(|n| n.to_str())
598 .unwrap_or("wal")
599 ));
600 std::fs::create_dir_all(&trimmed_dir)
601 .map_err(|e| Error::Internal(format!("failed to create trimmed WAL directory: {e}")))?;
602
603 if !records.is_empty() {
604 use grafeo_storage::wal::{LpgWal, WalConfig};
605 let trimmed_wal = LpgWal::with_config(&trimmed_dir, WalConfig::default())?;
606 for record in &records {
607 trimmed_wal.log(record)?;
608 }
609 trimmed_wal.flush()?;
610 drop(trimmed_wal);
611 }
612
613 std::fs::remove_dir_all(&wal_dir)
615 .map_err(|e| Error::Internal(format!("failed to remove restore WAL directory: {e}")))?;
616
617 let sidecar_dir = format!("{}.wal", output_path.display());
619 let sidecar_path = std::path::Path::new(&sidecar_dir);
620 if sidecar_path.exists() {
621 std::fs::remove_dir_all(sidecar_path)
622 .map_err(|e| Error::Internal(format!("failed to remove existing sidecar WAL: {e}")))?;
623 }
624 std::fs::rename(&trimmed_dir, sidecar_path)
625 .map_err(|e| Error::Internal(format!("failed to move WAL to sidecar location: {e}")))?;
626
627 Ok(())
628}
629
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a segment with zeroed checksum/size/timestamp for manifest-logic tests.
    fn segment(kind: BackupKind, filename: &str, start: u64, end: u64) -> BackupSegment {
        BackupSegment {
            kind,
            filename: filename.to_string(),
            start_epoch: EpochId::new(start),
            end_epoch: EpochId::new(end),
            checksum: 0,
            size_bytes: 0,
            created_at_ms: 0,
        }
    }

    #[test]
    fn test_manifest_new() {
        let manifest = BackupManifest::new();
        assert_eq!(manifest.version, 1);
        assert!(manifest.segments.is_empty());
        assert!(manifest.latest_full().is_none());
        assert!(manifest.epoch_range().is_none());
    }

    #[test]
    fn test_manifest_with_segments() {
        let mut manifest = BackupManifest::new();
        manifest.segments.push(BackupSegment {
            kind: BackupKind::Full,
            filename: "backup_full_0000.grafeo".to_string(),
            start_epoch: EpochId::new(0),
            end_epoch: EpochId::new(100),
            checksum: 12345,
            size_bytes: 1024,
            created_at_ms: 1000,
        });
        manifest.segments.push(BackupSegment {
            kind: BackupKind::Incremental,
            filename: "backup_incr_0001.wal".to_string(),
            start_epoch: EpochId::new(101),
            end_epoch: EpochId::new(200),
            checksum: 67890,
            size_bytes: 256,
            created_at_ms: 2000,
        });

        let full = manifest.latest_full().unwrap();
        assert_eq!(full.end_epoch, EpochId::new(100));

        let incrs = manifest.incrementals_after(EpochId::new(100));
        assert_eq!(incrs.len(), 1);
        assert_eq!(incrs[0].start_epoch, EpochId::new(101));

        let (start, end) = manifest.epoch_range().unwrap();
        assert_eq!(start, EpochId::new(0));
        assert_eq!(end, EpochId::new(200));
    }

    #[test]
    fn test_latest_full_returns_most_recent() {
        let mut manifest = BackupManifest::new();
        manifest.segments.push(segment(BackupKind::Full, "first.grafeo", 0, 100));
        manifest.segments.push(segment(BackupKind::Incremental, "incr.wal", 101, 200));
        manifest.segments.push(segment(BackupKind::Full, "second.grafeo", 0, 250));

        assert_eq!(manifest.latest_full().unwrap().filename, "second.grafeo");
    }

    #[test]
    fn test_incrementals_after_filters_by_start_epoch() {
        let mut manifest = BackupManifest::new();
        manifest.segments.push(segment(BackupKind::Full, "full.grafeo", 0, 100));
        manifest.segments.push(segment(BackupKind::Incremental, "a.wal", 101, 200));
        manifest.segments.push(segment(BackupKind::Incremental, "b.wal", 201, 300));

        let after_full = manifest.incrementals_after(EpochId::new(100));
        assert_eq!(after_full.len(), 2);
        assert_eq!(after_full[0].filename, "a.wal");
        assert_eq!(after_full[1].filename, "b.wal");

        let after_first = manifest.incrementals_after(EpochId::new(101));
        assert_eq!(after_first.len(), 1);
        assert_eq!(after_first[0].filename, "b.wal");

        assert!(manifest.incrementals_after(EpochId::new(300)).is_empty());
    }

    #[test]
    fn test_manifest_round_trip() {
        let dir = TempDir::new().unwrap();
        let mut manifest = BackupManifest::new();
        manifest.segments.push(BackupSegment {
            kind: BackupKind::Full,
            filename: "test.grafeo".to_string(),
            start_epoch: EpochId::new(0),
            end_epoch: EpochId::new(50),
            checksum: 0,
            size_bytes: 512,
            created_at_ms: 0,
        });

        write_manifest(dir.path(), &manifest).unwrap();
        let loaded = read_manifest(dir.path()).unwrap().unwrap();
        assert_eq!(loaded.segments.len(), 1);
        assert_eq!(loaded.segments[0].filename, "test.grafeo");
    }

    #[test]
    fn test_manifest_overwrite_replaces_contents() {
        let dir = TempDir::new().unwrap();
        let mut manifest = BackupManifest::new();
        write_manifest(dir.path(), &manifest).unwrap();

        manifest
            .segments
            .push(segment(BackupKind::Full, "full.grafeo", 0, 10));
        write_manifest(dir.path(), &manifest).unwrap();

        let loaded = read_manifest(dir.path()).unwrap().unwrap();
        assert_eq!(loaded.segments.len(), 1);
        assert_eq!(loaded.segments[0].kind, BackupKind::Full);
        // The temporary file must have been renamed into place, not left behind.
        assert!(!dir.path().join(format!("{MANIFEST_FILENAME}.tmp")).exists());
    }

    #[test]
    fn test_manifest_not_found() {
        let dir = TempDir::new().unwrap();
        assert!(read_manifest(dir.path()).unwrap().is_none());
    }

    #[test]
    fn test_backup_cursor_round_trip() {
        let dir = TempDir::new().unwrap();
        let cursor = BackupCursor {
            backed_up_epoch: EpochId::new(42),
            log_sequence: 7,
            timestamp_ms: 12345,
        };

        write_backup_cursor(dir.path(), &cursor).unwrap();
        let loaded = read_backup_cursor(dir.path()).unwrap().unwrap();
        assert_eq!(loaded.backed_up_epoch, EpochId::new(42));
        assert_eq!(loaded.log_sequence, 7);
        assert_eq!(loaded.timestamp_ms, 12345);
    }

    #[test]
    fn test_backup_cursor_not_found() {
        let dir = TempDir::new().unwrap();
        assert!(read_backup_cursor(dir.path()).unwrap().is_none());
    }

    #[test]
    fn test_backup_header_round_trip() {
        let mut buf = Vec::new();
        write_backup_header(&mut buf, EpochId::new(101), EpochId::new(200), 500);
        assert_eq!(buf.len(), BACKUP_HEADER_SIZE);

        let (start, end, count) = read_backup_header(&buf).unwrap();
        assert_eq!(start, EpochId::new(101));
        assert_eq!(end, EpochId::new(200));
        assert_eq!(count, 500);
    }

    #[test]
    fn test_backup_header_ignores_trailing_payload() {
        let mut buf = Vec::new();
        write_backup_header(&mut buf, EpochId::new(5), EpochId::new(9), 2);
        buf.extend_from_slice(b"wal frames follow the header");

        let (start, end, count) = read_backup_header(&buf).unwrap();
        assert_eq!(start, EpochId::new(5));
        assert_eq!(end, EpochId::new(9));
        assert_eq!(count, 2);
    }

    #[test]
    fn test_backup_header_unsupported_version() {
        let mut buf = Vec::new();
        write_backup_header(&mut buf, EpochId::new(1), EpochId::new(2), 3);
        buf[4..8].copy_from_slice(&(BACKUP_VERSION + 1).to_le_bytes());
        assert!(read_backup_header(&buf).is_err());
    }

    #[test]
    fn test_backup_header_invalid_magic() {
        let data = vec![
            0xFF, 0xFF, 0xFF, 0xFF, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
            0, 0, 0, 0, 0, 0, 0,
        ];
        assert!(read_backup_header(&data).is_err());
    }

    #[test]
    fn test_backup_header_too_short() {
        let data = vec![0, 0, 0, 0];
        assert!(read_backup_header(&data).is_err());
    }

    #[test]
    fn test_backup_kind_serialization() {
        let config = bincode::config::standard();
        let encoded = bincode::serde::encode_to_vec(BackupKind::Full, config).unwrap();
        let (parsed, _): (BackupKind, _) =
            bincode::serde::decode_from_slice(&encoded, config).unwrap();
        assert_eq!(parsed, BackupKind::Full);
    }
}