1use std::path::Path;
20
21use grafeo_common::types::EpochId;
22use grafeo_common::utils::error::{Error, Result};
23use serde::{Deserialize, Serialize};
24
/// Kind of backup segment recorded in a backup manifest.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum BackupKind {
    /// A complete copy of the database file.
    Full,
    /// WAL data captured since the previous backup.
    Incremental,
}
36
/// Manifest entry describing one backup file inside a backup directory.
///
/// Field order is part of the serialized layout; do not reorder fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupSegment {
    /// Whether the segment is a full copy or an incremental WAL capture.
    pub kind: BackupKind,
    /// File name of the segment; it is joined onto the backup directory path.
    pub filename: String,
    /// First epoch covered by the segment. The backup routines in this file
    /// use 0 for full backups and "previous cursor epoch + 1" for incrementals.
    pub start_epoch: EpochId,
    /// Epoch that was current when the segment was taken.
    pub end_epoch: EpochId,
    /// CRC32 of the segment file's bytes.
    pub checksum: u32,
    /// Size of the segment file in bytes.
    pub size_bytes: u64,
    /// Creation time in milliseconds since the Unix epoch.
    pub created_at_ms: u64,
}
55
/// Ordered list of every backup segment stored in one backup directory.
///
/// Field order is part of the serialized layout; do not reorder fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupManifest {
    /// Manifest format version (`BackupManifest::new` sets it to 1).
    pub version: u32,
    /// Segments in the order they were appended (oldest first).
    pub segments: Vec<BackupSegment>,
}
64
65impl BackupManifest {
66 #[must_use]
68 pub fn new() -> Self {
69 Self {
70 version: 1,
71 segments: Vec::new(),
72 }
73 }
74
75 #[must_use]
77 pub fn latest_full(&self) -> Option<&BackupSegment> {
78 self.segments
79 .iter()
80 .rev()
81 .find(|s| s.kind == BackupKind::Full)
82 }
83
84 pub fn incrementals_after(&self, epoch: EpochId) -> Vec<&BackupSegment> {
86 self.segments
87 .iter()
88 .filter(|s| s.kind == BackupKind::Incremental && s.start_epoch > epoch)
89 .collect()
90 }
91
92 #[must_use]
94 pub fn epoch_range(&self) -> Option<(EpochId, EpochId)> {
95 let first = self.segments.first()?;
96 let last = self.segments.last()?;
97 Some((first.start_epoch, last.end_epoch))
98 }
99}
100
101impl Default for BackupManifest {
102 fn default() -> Self {
103 Self::new()
104 }
105}
106
/// Marker persisted next to the WAL recording how far backups have progressed.
///
/// Field order is part of the serialized layout; do not reorder fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupCursor {
    /// Epoch that was current when the most recent backup was taken.
    pub backed_up_epoch: EpochId,
    /// WAL sequence number reported by the WAL at the time of that backup.
    pub log_sequence: u64,
    /// Time the cursor was written, in milliseconds since the Unix epoch.
    pub timestamp_ms: u64,
}
119
/// File name of the manifest inside a backup directory.
///
/// NOTE: despite the `.json` extension, the manifest is read and written with
/// bincode in this file, so the contents are binary rather than JSON.
const MANIFEST_FILENAME: &str = "backup_manifest.json";
/// File name of the backup cursor stored inside the WAL directory.
const BACKUP_CURSOR_FILENAME: &str = "backup_cursor.meta";
124
125pub fn read_manifest(backup_dir: &Path) -> Result<Option<BackupManifest>> {
133 let path = backup_dir.join(MANIFEST_FILENAME);
134 if !path.exists() {
135 return Ok(None);
136 }
137 let data = std::fs::read(&path)
138 .map_err(|e| Error::Internal(format!("failed to read backup manifest: {e}")))?;
139 let (manifest, _): (BackupManifest, _) =
140 bincode::serde::decode_from_slice(&data, bincode::config::standard())
141 .map_err(|e| Error::Internal(format!("failed to parse backup manifest: {e}")))?;
142 Ok(Some(manifest))
143}
144
145pub fn write_manifest(backup_dir: &Path, manifest: &BackupManifest) -> Result<()> {
153 std::fs::create_dir_all(backup_dir)
154 .map_err(|e| Error::Internal(format!("failed to create backup directory: {e}")))?;
155
156 let path = backup_dir.join(MANIFEST_FILENAME);
157 let temp_path = backup_dir.join(format!("{MANIFEST_FILENAME}.tmp"));
158
159 let data = bincode::serde::encode_to_vec(manifest, bincode::config::standard())
160 .map_err(|e| Error::Internal(format!("failed to serialize backup manifest: {e}")))?;
161
162 std::fs::write(&temp_path, data)
163 .map_err(|e| Error::Internal(format!("failed to write backup manifest: {e}")))?;
164 std::fs::rename(&temp_path, &path)
165 .map_err(|e| Error::Internal(format!("failed to finalize backup manifest: {e}")))?;
166
167 Ok(())
168}
169
170pub fn read_backup_cursor(wal_dir: &Path) -> Result<Option<BackupCursor>> {
180 let path = wal_dir.join(BACKUP_CURSOR_FILENAME);
181 if !path.exists() {
182 return Ok(None);
183 }
184 let data = std::fs::read(&path)
185 .map_err(|e| Error::Internal(format!("failed to read backup cursor: {e}")))?;
186 let cursor: BackupCursor =
187 bincode::serde::decode_from_slice(&data, bincode::config::standard())
188 .map(|(c, _)| c)
189 .map_err(|e| Error::Internal(format!("failed to parse backup cursor: {e}")))?;
190 Ok(Some(cursor))
191}
192
193pub fn write_backup_cursor(wal_dir: &Path, cursor: &BackupCursor) -> Result<()> {
201 let path = wal_dir.join(BACKUP_CURSOR_FILENAME);
202 let temp_path = wal_dir.join(format!("{BACKUP_CURSOR_FILENAME}.tmp"));
203
204 let data = bincode::serde::encode_to_vec(cursor, bincode::config::standard())
205 .map_err(|e| Error::Internal(format!("failed to serialize backup cursor: {e}")))?;
206
207 std::fs::write(&temp_path, &data)
208 .map_err(|e| Error::Internal(format!("failed to write backup cursor: {e}")))?;
209 std::fs::rename(&temp_path, &path)
210 .map_err(|e| Error::Internal(format!("failed to finalize backup cursor: {e}")))?;
211
212 Ok(())
213}
214
/// Magic bytes at the start of a backup file header: ASCII `GBAK`.
pub const BACKUP_MAGIC: [u8; 4] = *b"GBAK";
/// Header format version written by `write_backup_header`; `read_backup_header`
/// rejects any header whose version is greater than this.
pub const BACKUP_VERSION: u32 = 1;

/// Size of the fixed header in bytes: magic (4) + version (4) + start epoch (8)
/// + end epoch (8) + record count (8).
pub const BACKUP_HEADER_SIZE: usize = 32;
233
234pub fn write_backup_header(
236 buf: &mut Vec<u8>,
237 start_epoch: EpochId,
238 end_epoch: EpochId,
239 record_count: u64,
240) {
241 buf.extend_from_slice(&BACKUP_MAGIC);
242 buf.extend_from_slice(&BACKUP_VERSION.to_le_bytes());
243 buf.extend_from_slice(&start_epoch.as_u64().to_le_bytes());
244 buf.extend_from_slice(&end_epoch.as_u64().to_le_bytes());
245 buf.extend_from_slice(&record_count.to_le_bytes());
246}
247
248pub fn read_backup_header(data: &[u8]) -> Result<(EpochId, EpochId, u64)> {
260 if data.len() < BACKUP_HEADER_SIZE {
261 return Err(Error::Internal(
262 "incremental backup file too short".to_string(),
263 ));
264 }
265 if data[0..4] != BACKUP_MAGIC {
266 return Err(Error::Internal(
267 "invalid backup file magic bytes".to_string(),
268 ));
269 }
270 let version = u32::from_le_bytes(data[4..8].try_into().unwrap());
271 if version > BACKUP_VERSION {
272 return Err(Error::Internal(format!(
273 "unsupported backup version {version}, max supported is {BACKUP_VERSION}"
274 )));
275 }
276 let start_epoch = EpochId::new(u64::from_le_bytes(data[8..16].try_into().unwrap()));
277 let end_epoch = EpochId::new(u64::from_le_bytes(data[16..24].try_into().unwrap()));
278 let record_count = u64::from_le_bytes(data[24..32].try_into().unwrap());
279 Ok((start_epoch, end_epoch, record_count))
280}
281
282pub(super) fn now_ms() -> u64 {
284 std::time::SystemTime::now()
285 .duration_since(std::time::UNIX_EPOCH)
286 .map(|d| d.as_millis() as u64)
287 .unwrap_or(0)
288}
289
290use grafeo_storage::file::GrafeoFileManager;
293use grafeo_storage::wal::LpgWal;
294
295pub(super) fn do_backup_full(
308 backup_dir: &Path,
309 fm: &GrafeoFileManager,
310 wal: Option<&LpgWal>,
311 current_epoch: EpochId,
312) -> Result<BackupSegment> {
313 std::fs::create_dir_all(backup_dir)
314 .map_err(|e| Error::Internal(format!("failed to create backup directory: {e}")))?;
315
316 let mut manifest = read_manifest(backup_dir)?.unwrap_or_default();
318 let segment_idx = manifest.segments.len();
319 let filename = format!("backup_full_{segment_idx:04}.grafeo");
320 let dest_path = backup_dir.join(&filename);
321
322 fm.copy_to(&dest_path)?;
324
325 let file_size = std::fs::metadata(&dest_path).map(|m| m.len()).unwrap_or(0);
326 let file_data = std::fs::read(&dest_path)
327 .map_err(|e| Error::Internal(format!("failed to read backup file for checksum: {e}")))?;
328 let checksum = crc32fast::hash(&file_data);
329
330 let segment = BackupSegment {
331 kind: BackupKind::Full,
332 filename,
333 start_epoch: EpochId::new(0),
334 end_epoch: current_epoch,
335 checksum,
336 size_bytes: file_size,
337 created_at_ms: now_ms(),
338 };
339
340 manifest.segments.push(segment.clone());
341 write_manifest(backup_dir, &manifest)?;
342
343 if let Some(wal) = wal {
345 let cursor = BackupCursor {
346 backed_up_epoch: current_epoch,
347 log_sequence: wal.current_sequence(),
348 timestamp_ms: now_ms(),
349 };
350 write_backup_cursor(wal.dir(), &cursor)?;
351 }
352
353 Ok(segment)
354}
355
356pub(super) fn do_backup_incremental(
366 backup_dir: &Path,
367 wal: &LpgWal,
368 current_epoch: EpochId,
369) -> Result<BackupSegment> {
370 let manifest = read_manifest(backup_dir)?.ok_or_else(|| {
371 Error::Internal("no backup manifest found; run a full backup first".to_string())
372 })?;
373
374 if manifest.latest_full().is_none() {
375 return Err(Error::Internal(
376 "no full backup in manifest; run a full backup first".to_string(),
377 ));
378 }
379
380 let cursor = read_backup_cursor(wal.dir())?.ok_or_else(|| {
381 Error::Internal("no backup cursor found; run a full backup first".to_string())
382 })?;
383
384 let log_files = wal.log_files()?;
385 if log_files.is_empty() {
386 return Err(Error::Internal("no WAL log files to backup".to_string()));
387 }
388
389 let mut wal_data = Vec::new();
391 let mut record_count = 0u64;
392 for file_path in &log_files {
395 let seq = file_path
396 .file_stem()
397 .and_then(|s| s.to_str())
398 .and_then(|s| s.strip_prefix("wal_"))
399 .and_then(|s| s.parse::<u64>().ok())
400 .unwrap_or(0);
401
402 if seq <= cursor.log_sequence {
407 continue;
408 }
409
410 let file_bytes = std::fs::read(file_path).map_err(|e| {
411 Error::Internal(format!(
412 "failed to read WAL file {}: {e}",
413 file_path.display()
414 ))
415 })?;
416
417 if !file_bytes.is_empty() {
418 wal_data.extend_from_slice(&file_bytes);
419 record_count += 1; }
423 }
424
425 if wal_data.is_empty() {
426 return Err(Error::Internal(
427 "no new WAL records since last backup".to_string(),
428 ));
429 }
430
431 let start_epoch = EpochId::new(cursor.backed_up_epoch.as_u64() + 1);
432 let end_epoch = current_epoch;
433
434 let segment_idx = manifest.segments.len();
436 let filename = format!("backup_incr_{segment_idx:04}.wal");
437 let dest_path = backup_dir.join(&filename);
438
439 let mut output = Vec::new();
440 write_backup_header(&mut output, start_epoch, end_epoch, record_count);
441 output.extend_from_slice(&wal_data);
442
443 std::fs::write(&dest_path, &output)
444 .map_err(|e| Error::Internal(format!("failed to write incremental backup: {e}")))?;
445
446 let checksum = crc32fast::hash(&output);
447 let segment = BackupSegment {
448 kind: BackupKind::Incremental,
449 filename,
450 start_epoch,
451 end_epoch,
452 checksum,
453 size_bytes: output.len() as u64,
454 created_at_ms: now_ms(),
455 };
456
457 let mut manifest = manifest;
459 manifest.segments.push(segment.clone());
460 write_manifest(backup_dir, &manifest)?;
461
462 let new_cursor = BackupCursor {
464 backed_up_epoch: current_epoch,
465 log_sequence: wal.current_sequence(),
466 timestamp_ms: now_ms(),
467 };
468 write_backup_cursor(wal.dir(), &new_cursor)?;
469
470 Ok(segment)
471}
472
473pub(super) fn do_restore_to_epoch(
487 backup_dir: &Path,
488 target_epoch: EpochId,
489 output_path: &Path,
490) -> Result<()> {
491 let manifest = read_manifest(backup_dir)?
492 .ok_or_else(|| Error::Internal("no backup manifest found".to_string()))?;
493
494 let full = manifest
496 .segments
497 .iter()
498 .rfind(|s| s.kind == BackupKind::Full && s.end_epoch <= target_epoch)
499 .ok_or_else(|| {
500 Error::Internal(format!(
501 "no full backup covers epoch {}",
502 target_epoch.as_u64()
503 ))
504 })?;
505
506 let full_path = backup_dir.join(&full.filename);
508 std::fs::copy(&full_path, output_path)
509 .map_err(|e| Error::Internal(format!("failed to copy full backup to output: {e}")))?;
510
511 let incrementals: Vec<&BackupSegment> = manifest
513 .segments
514 .iter()
515 .filter(|s| {
516 s.kind == BackupKind::Incremental
517 && s.start_epoch > full.end_epoch
518 && s.start_epoch <= target_epoch
519 })
520 .collect();
521
522 if incrementals.is_empty() {
523 return Ok(());
525 }
526
527 let wal_dir = output_path.parent().unwrap_or(Path::new(".")).join(format!(
529 "{}.restore_wal",
530 output_path
531 .file_name()
532 .and_then(|n| n.to_str())
533 .unwrap_or("db")
534 ));
535 std::fs::create_dir_all(&wal_dir)
536 .map_err(|e| Error::Internal(format!("failed to create restore WAL directory: {e}")))?;
537
538 for (i, incr) in incrementals.iter().enumerate() {
540 let incr_path = backup_dir.join(&incr.filename);
541 let incr_data = std::fs::read(&incr_path).map_err(|e| {
542 Error::Internal(format!(
543 "failed to read incremental backup {}: {e}",
544 incr.filename
545 ))
546 })?;
547
548 let actual_crc = crc32fast::hash(&incr_data);
550 if actual_crc != incr.checksum {
551 return Err(Error::Internal(format!(
552 "incremental backup {} CRC mismatch: expected {:08x}, got {actual_crc:08x}",
553 incr.filename, incr.checksum,
554 )));
555 }
556
557 if incr_data.len() > BACKUP_HEADER_SIZE {
559 let wal_frames = &incr_data[BACKUP_HEADER_SIZE..];
560 let wal_file = wal_dir.join(format!("wal_{i:08}.log"));
561 std::fs::write(&wal_file, wal_frames).map_err(|e| {
562 Error::Internal(format!("failed to write WAL file for restore: {e}"))
563 })?;
564 }
565 }
566
567 let recovery = grafeo_storage::wal::WalRecovery::new(&wal_dir);
572 let records = recovery.recover_until_epoch(target_epoch)?;
573
574 let trimmed_dir = wal_dir.parent().unwrap_or(Path::new(".")).join(format!(
576 "{}.trimmed_wal",
577 wal_dir
578 .file_name()
579 .and_then(|n| n.to_str())
580 .unwrap_or("wal")
581 ));
582 std::fs::create_dir_all(&trimmed_dir)
583 .map_err(|e| Error::Internal(format!("failed to create trimmed WAL directory: {e}")))?;
584
585 if !records.is_empty() {
586 use grafeo_storage::wal::{LpgWal, WalConfig};
587 let trimmed_wal = LpgWal::with_config(&trimmed_dir, WalConfig::default())?;
588 for record in &records {
589 trimmed_wal.log(record)?;
590 }
591 trimmed_wal.flush()?;
592 drop(trimmed_wal);
593 }
594
595 std::fs::remove_dir_all(&wal_dir)
597 .map_err(|e| Error::Internal(format!("failed to remove restore WAL directory: {e}")))?;
598
599 let sidecar_dir = format!("{}.wal", output_path.display());
601 let sidecar_path = std::path::Path::new(&sidecar_dir);
602 if sidecar_path.exists() {
603 std::fs::remove_dir_all(sidecar_path)
604 .map_err(|e| Error::Internal(format!("failed to remove existing sidecar WAL: {e}")))?;
605 }
606 std::fs::rename(&trimmed_dir, sidecar_path)
607 .map_err(|e| Error::Internal(format!("failed to move WAL to sidecar location: {e}")))?;
608
609 Ok(())
610}
611
/// Unit tests for manifest/cursor persistence and the backup header format.
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// A new manifest is version 1 and completely empty.
    #[test]
    fn test_manifest_new() {
        let manifest = BackupManifest::new();
        assert_eq!(manifest.version, 1);
        assert!(manifest.segments.is_empty());
        assert!(manifest.latest_full().is_none());
        assert!(manifest.epoch_range().is_none());
    }

    /// Query helpers see a full segment followed by one incremental.
    #[test]
    fn test_manifest_with_segments() {
        let mut manifest = BackupManifest::new();
        manifest.segments.push(BackupSegment {
            kind: BackupKind::Full,
            filename: "backup_full_0000.grafeo".to_string(),
            start_epoch: EpochId::new(0),
            end_epoch: EpochId::new(100),
            checksum: 12345,
            size_bytes: 1024,
            created_at_ms: 1000,
        });
        manifest.segments.push(BackupSegment {
            kind: BackupKind::Incremental,
            filename: "backup_incr_0001.wal".to_string(),
            start_epoch: EpochId::new(101),
            end_epoch: EpochId::new(200),
            checksum: 67890,
            size_bytes: 256,
            created_at_ms: 2000,
        });

        let full = manifest.latest_full().unwrap();
        assert_eq!(full.end_epoch, EpochId::new(100));

        let incrs = manifest.incrementals_after(EpochId::new(100));
        assert_eq!(incrs.len(), 1);
        assert_eq!(incrs[0].start_epoch, EpochId::new(101));

        let (start, end) = manifest.epoch_range().unwrap();
        assert_eq!(start, EpochId::new(0));
        assert_eq!(end, EpochId::new(200));
    }

    /// A manifest written to disk can be read back.
    #[test]
    fn test_manifest_round_trip() {
        let dir = TempDir::new().unwrap();
        let mut manifest = BackupManifest::new();
        manifest.segments.push(BackupSegment {
            kind: BackupKind::Full,
            filename: "test.grafeo".to_string(),
            start_epoch: EpochId::new(0),
            end_epoch: EpochId::new(50),
            checksum: 0,
            size_bytes: 512,
            created_at_ms: 0,
        });

        write_manifest(dir.path(), &manifest).unwrap();
        let loaded = read_manifest(dir.path()).unwrap().unwrap();
        assert_eq!(loaded.segments.len(), 1);
        assert_eq!(loaded.segments[0].filename, "test.grafeo");
    }

    /// A directory without a manifest yields `Ok(None)`.
    #[test]
    fn test_manifest_not_found() {
        let dir = TempDir::new().unwrap();
        assert!(read_manifest(dir.path()).unwrap().is_none());
    }

    /// A cursor written to disk can be read back field by field.
    #[test]
    fn test_backup_cursor_round_trip() {
        let dir = TempDir::new().unwrap();
        let cursor = BackupCursor {
            backed_up_epoch: EpochId::new(42),
            log_sequence: 7,
            timestamp_ms: 12345,
        };

        write_backup_cursor(dir.path(), &cursor).unwrap();
        let loaded = read_backup_cursor(dir.path()).unwrap().unwrap();
        assert_eq!(loaded.backed_up_epoch, EpochId::new(42));
        assert_eq!(loaded.log_sequence, 7);
        assert_eq!(loaded.timestamp_ms, 12345);
    }

    /// A directory without a cursor yields `Ok(None)`.
    #[test]
    fn test_backup_cursor_not_found() {
        let dir = TempDir::new().unwrap();
        assert!(read_backup_cursor(dir.path()).unwrap().is_none());
    }

    /// The header is exactly `BACKUP_HEADER_SIZE` bytes and round-trips.
    #[test]
    fn test_backup_header_round_trip() {
        let mut buf = Vec::new();
        write_backup_header(&mut buf, EpochId::new(101), EpochId::new(200), 500);
        assert_eq!(buf.len(), BACKUP_HEADER_SIZE);

        let (start, end, count) = read_backup_header(&buf).unwrap();
        assert_eq!(start, EpochId::new(101));
        assert_eq!(end, EpochId::new(200));
        assert_eq!(count, 500);
    }

    /// A header-sized buffer with the wrong magic bytes is rejected.
    #[test]
    fn test_backup_header_invalid_magic() {
        let data = vec![
            0xFF, 0xFF, 0xFF, 0xFF, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
            0, 0, 0, 0, 0, 0, 0,
        ];
        assert!(read_backup_header(&data).is_err());
    }

    /// A buffer shorter than the header is rejected.
    #[test]
    fn test_backup_header_too_short() {
        let data = vec![0, 0, 0, 0];
        assert!(read_backup_header(&data).is_err());
    }

    /// A header claiming a version newer than `BACKUP_VERSION` is rejected.
    #[test]
    fn test_backup_header_unsupported_version() {
        let mut buf = Vec::new();
        write_backup_header(&mut buf, EpochId::new(1), EpochId::new(2), 3);
        // The version field occupies bytes 4..8, right after the magic bytes.
        buf[4..8].copy_from_slice(&(BACKUP_VERSION + 1).to_le_bytes());
        assert!(read_backup_header(&buf).is_err());
    }

    /// `BackupKind` survives a bincode encode/decode round trip.
    #[test]
    fn test_backup_kind_serialization() {
        let config = bincode::config::standard();
        let encoded = bincode::serde::encode_to_vec(BackupKind::Full, config).unwrap();
        let (parsed, _): (BackupKind, _) =
            bincode::serde::decode_from_slice(&encoded, config).unwrap();
        assert_eq!(parsed, BackupKind::Full);
    }
}