1use std::path::Path;
20
21use grafeo_common::types::EpochId;
22use grafeo_common::utils::error::{Error, Result};
23use serde::{Deserialize, Serialize};
24
/// Kind of a segment recorded in a [`BackupManifest`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum BackupKind {
    /// A copy of the whole database file (produced by `do_backup_full`).
    Full,
    /// Raw WAL file contents captured since the previous backup, prefixed with a
    /// `GBAK` header (produced by `do_backup_incremental`).
    Incremental,
}
36
/// One backup file listed in a [`BackupManifest`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupSegment {
    /// Whether this segment is a full snapshot or an incremental WAL capture.
    pub kind: BackupKind,
    /// File name of the segment, relative to the backup directory.
    pub filename: String,
    /// First epoch covered by the segment (full backups written by `do_backup_full` use 0).
    pub start_epoch: EpochId,
    /// Last epoch covered by the segment.
    pub end_epoch: EpochId,
    /// CRC-32 (`crc32fast`) of the entire segment file as written, including the header
    /// for incremental segments.
    pub checksum: u32,
    /// Size of the segment file in bytes.
    pub size_bytes: u64,
    /// Creation time in milliseconds since the Unix epoch (see `now_ms`).
    pub created_at_ms: u64,
}
55
/// List of backup segments stored in a backup directory.
///
/// Segments are appended in creation order; `latest_full`, `epoch_range` and the restore
/// logic pick segments based on that order.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupManifest {
    /// Manifest format version (`BackupManifest::new` sets it to 1).
    pub version: u32,
    /// Segments in the order they were appended.
    pub segments: Vec<BackupSegment>,
}
64
65impl BackupManifest {
66 #[must_use]
68 pub fn new() -> Self {
69 Self {
70 version: 1,
71 segments: Vec::new(),
72 }
73 }
74
75 #[must_use]
77 pub fn latest_full(&self) -> Option<&BackupSegment> {
78 self.segments
79 .iter()
80 .rev()
81 .find(|s| s.kind == BackupKind::Full)
82 }
83
84 pub fn incrementals_after(&self, epoch: EpochId) -> Vec<&BackupSegment> {
86 self.segments
87 .iter()
88 .filter(|s| s.kind == BackupKind::Incremental && s.start_epoch > epoch)
89 .collect()
90 }
91
92 #[must_use]
94 pub fn epoch_range(&self) -> Option<(EpochId, EpochId)> {
95 let first = self.segments.first()?;
96 let last = self.segments.last()?;
97 Some((first.start_epoch, last.end_epoch))
98 }
99}
100
101impl Default for BackupManifest {
102 fn default() -> Self {
103 Self::new()
104 }
105}
106
/// Records how far the WAL has been backed up.
///
/// Stored inside the WAL directory (see `read_backup_cursor` / `write_backup_cursor`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupCursor {
    /// Epoch covered by the most recent backup; the next incremental backup starts at
    /// `backed_up_epoch + 1`.
    pub backed_up_epoch: EpochId,
    /// Value of `LpgWal::current_sequence()` when the backup was taken. The next incremental
    /// backup skips WAL files whose parsed `wal_<seq>` sequence is `<=` this value.
    pub log_sequence: u64,
    /// When the cursor was written, in milliseconds since the Unix epoch.
    pub timestamp_ms: u64,
}
119
/// File name of the manifest inside a backup directory.
///
/// Despite the `.json` extension, the contents are bincode (standard config); see
/// `read_manifest` / `write_manifest`. NOTE(review): renaming it would make existing
/// manifests on disk invisible to `read_manifest`.
const MANIFEST_FILENAME: &str = "backup_manifest.json";
/// File name of the bincode-encoded backup cursor, stored inside the WAL directory.
const BACKUP_CURSOR_FILENAME: &str = "backup_cursor.meta";
124
125pub fn read_manifest(backup_dir: &Path) -> Result<Option<BackupManifest>> {
133 let path = backup_dir.join(MANIFEST_FILENAME);
134 if !path.exists() {
135 return Ok(None);
136 }
137 let data = std::fs::read(&path)
138 .map_err(|e| Error::Internal(format!("failed to read backup manifest: {e}")))?;
139 let (manifest, _): (BackupManifest, _) =
140 bincode::serde::decode_from_slice(&data, bincode::config::standard())
141 .map_err(|e| Error::Internal(format!("failed to parse backup manifest: {e}")))?;
142 Ok(Some(manifest))
143}
144
145pub fn write_manifest(backup_dir: &Path, manifest: &BackupManifest) -> Result<()> {
153 std::fs::create_dir_all(backup_dir)
154 .map_err(|e| Error::Internal(format!("failed to create backup directory: {e}")))?;
155
156 let path = backup_dir.join(MANIFEST_FILENAME);
157 let temp_path = backup_dir.join(format!("{MANIFEST_FILENAME}.tmp"));
158
159 let data = bincode::serde::encode_to_vec(manifest, bincode::config::standard())
160 .map_err(|e| Error::Internal(format!("failed to serialize backup manifest: {e}")))?;
161
162 std::fs::write(&temp_path, data)
163 .map_err(|e| Error::Internal(format!("failed to write backup manifest: {e}")))?;
164 std::fs::rename(&temp_path, &path)
165 .map_err(|e| Error::Internal(format!("failed to finalize backup manifest: {e}")))?;
166
167 Ok(())
168}
169
170pub fn read_backup_cursor(wal_dir: &Path) -> Result<Option<BackupCursor>> {
180 let path = wal_dir.join(BACKUP_CURSOR_FILENAME);
181 if !path.exists() {
182 return Ok(None);
183 }
184 let data = std::fs::read(&path)
185 .map_err(|e| Error::Internal(format!("failed to read backup cursor: {e}")))?;
186 let cursor: BackupCursor =
187 bincode::serde::decode_from_slice(&data, bincode::config::standard())
188 .map(|(c, _)| c)
189 .map_err(|e| Error::Internal(format!("failed to parse backup cursor: {e}")))?;
190 Ok(Some(cursor))
191}
192
193pub fn write_backup_cursor(wal_dir: &Path, cursor: &BackupCursor) -> Result<()> {
201 let path = wal_dir.join(BACKUP_CURSOR_FILENAME);
202 let temp_path = wal_dir.join(format!("{BACKUP_CURSOR_FILENAME}.tmp"));
203
204 let data = bincode::serde::encode_to_vec(cursor, bincode::config::standard())
205 .map_err(|e| Error::Internal(format!("failed to serialize backup cursor: {e}")))?;
206
207 std::fs::write(&temp_path, &data)
208 .map_err(|e| Error::Internal(format!("failed to write backup cursor: {e}")))?;
209 std::fs::rename(&temp_path, &path)
210 .map_err(|e| Error::Internal(format!("failed to finalize backup cursor: {e}")))?;
211
212 Ok(())
213}
214
/// Magic bytes at the start of every incremental backup file.
pub const BACKUP_MAGIC: [u8; 4] = *b"GBAK";
/// Incremental-backup header version written by `write_backup_header`;
/// `read_backup_header` rejects files with a higher version.
pub const BACKUP_VERSION: u32 = 1;

/// Size in bytes of the incremental backup header, as laid out by `write_backup_header`:
/// magic (4) + version `u32` (4) + start epoch `u64` (8) + end epoch `u64` (8) +
/// record count `u64` (8). All integers are little-endian.
pub const BACKUP_HEADER_SIZE: usize = 32;
233
234pub fn write_backup_header(
236 buf: &mut Vec<u8>,
237 start_epoch: EpochId,
238 end_epoch: EpochId,
239 record_count: u64,
240) {
241 buf.extend_from_slice(&BACKUP_MAGIC);
242 buf.extend_from_slice(&BACKUP_VERSION.to_le_bytes());
243 buf.extend_from_slice(&start_epoch.as_u64().to_le_bytes());
244 buf.extend_from_slice(&end_epoch.as_u64().to_le_bytes());
245 buf.extend_from_slice(&record_count.to_le_bytes());
246}
247
248pub fn read_backup_header(data: &[u8]) -> Result<(EpochId, EpochId, u64)> {
260 if data.len() < BACKUP_HEADER_SIZE {
261 return Err(Error::Internal(
262 "incremental backup file too short".to_string(),
263 ));
264 }
265 if data[0..4] != BACKUP_MAGIC {
266 return Err(Error::Internal(
267 "invalid backup file magic bytes".to_string(),
268 ));
269 }
270 let version = u32::from_le_bytes(data[4..8].try_into().unwrap());
271 if version > BACKUP_VERSION {
272 return Err(Error::Internal(format!(
273 "unsupported backup version {version}, max supported is {BACKUP_VERSION}"
274 )));
275 }
276 let start_epoch = EpochId::new(u64::from_le_bytes(data[8..16].try_into().unwrap()));
277 let end_epoch = EpochId::new(u64::from_le_bytes(data[16..24].try_into().unwrap()));
278 let record_count = u64::from_le_bytes(data[24..32].try_into().unwrap());
279 Ok((start_epoch, end_epoch, record_count))
280}
281
282pub(super) fn now_ms() -> u64 {
284 std::time::SystemTime::now()
285 .duration_since(std::time::UNIX_EPOCH)
286 .map(|d| d.as_millis() as u64)
287 .unwrap_or(0)
288}
289
290use grafeo_storage::wal::LpgWal;
293
294pub(super) fn do_backup_full(
304 backup_dir: &Path,
305 grafeo_file_path: &Path,
306 wal: Option<&LpgWal>,
307 current_epoch: EpochId,
308) -> Result<BackupSegment> {
309 std::fs::create_dir_all(backup_dir)
310 .map_err(|e| Error::Internal(format!("failed to create backup directory: {e}")))?;
311
312 let mut manifest = read_manifest(backup_dir)?.unwrap_or_default();
314 let segment_idx = manifest.segments.len();
315 let filename = format!("backup_full_{segment_idx:04}.grafeo");
316 let dest_path = backup_dir.join(&filename);
317
318 std::fs::copy(grafeo_file_path, &dest_path)
320 .map_err(|e| Error::Internal(format!("failed to copy database file for backup: {e}")))?;
321
322 let file_size = std::fs::metadata(&dest_path).map(|m| m.len()).unwrap_or(0);
323 let file_data = std::fs::read(&dest_path)
324 .map_err(|e| Error::Internal(format!("failed to read backup file for checksum: {e}")))?;
325 let checksum = crc32fast::hash(&file_data);
326
327 let segment = BackupSegment {
328 kind: BackupKind::Full,
329 filename,
330 start_epoch: EpochId::new(0),
331 end_epoch: current_epoch,
332 checksum,
333 size_bytes: file_size,
334 created_at_ms: now_ms(),
335 };
336
337 manifest.segments.push(segment.clone());
338 write_manifest(backup_dir, &manifest)?;
339
340 if let Some(wal) = wal {
342 let cursor = BackupCursor {
343 backed_up_epoch: current_epoch,
344 log_sequence: wal.current_sequence(),
345 timestamp_ms: now_ms(),
346 };
347 write_backup_cursor(wal.dir(), &cursor)?;
348 }
349
350 Ok(segment)
351}
352
353pub(super) fn do_backup_incremental(
363 backup_dir: &Path,
364 wal: &LpgWal,
365 current_epoch: EpochId,
366) -> Result<BackupSegment> {
367 let manifest = read_manifest(backup_dir)?.ok_or_else(|| {
368 Error::Internal("no backup manifest found; run a full backup first".to_string())
369 })?;
370
371 if manifest.latest_full().is_none() {
372 return Err(Error::Internal(
373 "no full backup in manifest; run a full backup first".to_string(),
374 ));
375 }
376
377 let cursor = read_backup_cursor(wal.dir())?.ok_or_else(|| {
378 Error::Internal("no backup cursor found; run a full backup first".to_string())
379 })?;
380
381 let log_files = wal.log_files()?;
382 if log_files.is_empty() {
383 return Err(Error::Internal("no WAL log files to backup".to_string()));
384 }
385
386 let mut wal_data = Vec::new();
388 let mut record_count = 0u64;
389 for file_path in &log_files {
392 let seq = file_path
393 .file_stem()
394 .and_then(|s| s.to_str())
395 .and_then(|s| s.strip_prefix("wal_"))
396 .and_then(|s| s.parse::<u64>().ok())
397 .unwrap_or(0);
398
399 if seq <= cursor.log_sequence {
404 continue;
405 }
406
407 let file_bytes = std::fs::read(file_path).map_err(|e| {
408 Error::Internal(format!(
409 "failed to read WAL file {}: {e}",
410 file_path.display()
411 ))
412 })?;
413
414 if !file_bytes.is_empty() {
415 wal_data.extend_from_slice(&file_bytes);
416 record_count += 1; }
420 }
421
422 if wal_data.is_empty() {
423 return Err(Error::Internal(
424 "no new WAL records since last backup".to_string(),
425 ));
426 }
427
428 let start_epoch = EpochId::new(cursor.backed_up_epoch.as_u64() + 1);
429 let end_epoch = current_epoch;
430
431 let segment_idx = manifest.segments.len();
433 let filename = format!("backup_incr_{segment_idx:04}.wal");
434 let dest_path = backup_dir.join(&filename);
435
436 let mut output = Vec::new();
437 write_backup_header(&mut output, start_epoch, end_epoch, record_count);
438 output.extend_from_slice(&wal_data);
439
440 std::fs::write(&dest_path, &output)
441 .map_err(|e| Error::Internal(format!("failed to write incremental backup: {e}")))?;
442
443 let checksum = crc32fast::hash(&output);
444 let segment = BackupSegment {
445 kind: BackupKind::Incremental,
446 filename,
447 start_epoch,
448 end_epoch,
449 checksum,
450 size_bytes: output.len() as u64,
451 created_at_ms: now_ms(),
452 };
453
454 let mut manifest = manifest;
456 manifest.segments.push(segment.clone());
457 write_manifest(backup_dir, &manifest)?;
458
459 let new_cursor = BackupCursor {
461 backed_up_epoch: current_epoch,
462 log_sequence: wal.current_sequence(),
463 timestamp_ms: now_ms(),
464 };
465 write_backup_cursor(wal.dir(), &new_cursor)?;
466
467 Ok(segment)
468}
469
470pub(super) fn do_restore_to_epoch(
484 backup_dir: &Path,
485 target_epoch: EpochId,
486 output_path: &Path,
487) -> Result<()> {
488 let manifest = read_manifest(backup_dir)?
489 .ok_or_else(|| Error::Internal("no backup manifest found".to_string()))?;
490
491 let full = manifest
493 .segments
494 .iter()
495 .rfind(|s| s.kind == BackupKind::Full && s.end_epoch <= target_epoch)
496 .ok_or_else(|| {
497 Error::Internal(format!(
498 "no full backup covers epoch {}",
499 target_epoch.as_u64()
500 ))
501 })?;
502
503 let full_path = backup_dir.join(&full.filename);
505 std::fs::copy(&full_path, output_path)
506 .map_err(|e| Error::Internal(format!("failed to copy full backup to output: {e}")))?;
507
508 let incrementals: Vec<&BackupSegment> = manifest
510 .segments
511 .iter()
512 .filter(|s| {
513 s.kind == BackupKind::Incremental
514 && s.start_epoch > full.end_epoch
515 && s.start_epoch <= target_epoch
516 })
517 .collect();
518
519 if incrementals.is_empty() {
520 return Ok(());
522 }
523
524 let wal_dir = output_path.parent().unwrap_or(Path::new(".")).join(format!(
526 "{}.restore_wal",
527 output_path
528 .file_name()
529 .and_then(|n| n.to_str())
530 .unwrap_or("db")
531 ));
532 std::fs::create_dir_all(&wal_dir)
533 .map_err(|e| Error::Internal(format!("failed to create restore WAL directory: {e}")))?;
534
535 for (i, incr) in incrementals.iter().enumerate() {
537 let incr_path = backup_dir.join(&incr.filename);
538 let incr_data = std::fs::read(&incr_path).map_err(|e| {
539 Error::Internal(format!(
540 "failed to read incremental backup {}: {e}",
541 incr.filename
542 ))
543 })?;
544
545 let actual_crc = crc32fast::hash(&incr_data);
547 if actual_crc != incr.checksum {
548 return Err(Error::Internal(format!(
549 "incremental backup {} CRC mismatch: expected {:08x}, got {actual_crc:08x}",
550 incr.filename, incr.checksum,
551 )));
552 }
553
554 if incr_data.len() > BACKUP_HEADER_SIZE {
556 let wal_frames = &incr_data[BACKUP_HEADER_SIZE..];
557 let wal_file = wal_dir.join(format!("wal_{i:08}.log"));
558 std::fs::write(&wal_file, wal_frames).map_err(|e| {
559 Error::Internal(format!("failed to write WAL file for restore: {e}"))
560 })?;
561 }
562 }
563
564 let recovery = grafeo_storage::wal::WalRecovery::new(&wal_dir);
569 let records = recovery.recover_until_epoch(target_epoch)?;
570
571 let trimmed_dir = wal_dir.parent().unwrap_or(Path::new(".")).join(format!(
573 "{}.trimmed_wal",
574 wal_dir
575 .file_name()
576 .and_then(|n| n.to_str())
577 .unwrap_or("wal")
578 ));
579 std::fs::create_dir_all(&trimmed_dir)
580 .map_err(|e| Error::Internal(format!("failed to create trimmed WAL directory: {e}")))?;
581
582 if !records.is_empty() {
583 use grafeo_storage::wal::{LpgWal, WalConfig};
584 let trimmed_wal = LpgWal::with_config(&trimmed_dir, WalConfig::default())?;
585 for record in &records {
586 trimmed_wal.log(record)?;
587 }
588 trimmed_wal.flush()?;
589 drop(trimmed_wal);
590 }
591
592 std::fs::remove_dir_all(&wal_dir)
594 .map_err(|e| Error::Internal(format!("failed to remove restore WAL directory: {e}")))?;
595
596 let sidecar_dir = format!("{}.wal", output_path.display());
598 let sidecar_path = std::path::Path::new(&sidecar_dir);
599 if sidecar_path.exists() {
600 std::fs::remove_dir_all(sidecar_path)
601 .map_err(|e| Error::Internal(format!("failed to remove existing sidecar WAL: {e}")))?;
602 }
603 std::fs::rename(&trimmed_dir, sidecar_path)
604 .map_err(|e| Error::Internal(format!("failed to move WAL to sidecar location: {e}")))?;
605
606 Ok(())
607}
608
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a segment with only the fields the manifest helpers inspect filled in.
    fn segment(kind: BackupKind, start: u64, end: u64) -> BackupSegment {
        BackupSegment {
            kind,
            filename: String::new(),
            start_epoch: EpochId::new(start),
            end_epoch: EpochId::new(end),
            checksum: 0,
            size_bytes: 0,
            created_at_ms: 0,
        }
    }

    #[test]
    fn test_manifest_new() {
        let manifest = BackupManifest::new();
        assert_eq!(manifest.version, 1);
        assert!(manifest.segments.is_empty());
        assert!(manifest.latest_full().is_none());
        assert!(manifest.epoch_range().is_none());
    }

    #[test]
    fn test_manifest_with_segments() {
        let mut manifest = BackupManifest::new();
        manifest.segments.push(BackupSegment {
            kind: BackupKind::Full,
            filename: "backup_full_0000.grafeo".to_string(),
            start_epoch: EpochId::new(0),
            end_epoch: EpochId::new(100),
            checksum: 12345,
            size_bytes: 1024,
            created_at_ms: 1000,
        });
        manifest.segments.push(BackupSegment {
            kind: BackupKind::Incremental,
            filename: "backup_incr_0001.wal".to_string(),
            start_epoch: EpochId::new(101),
            end_epoch: EpochId::new(200),
            checksum: 67890,
            size_bytes: 256,
            created_at_ms: 2000,
        });

        let full = manifest.latest_full().unwrap();
        assert_eq!(full.end_epoch, EpochId::new(100));

        let incrs = manifest.incrementals_after(EpochId::new(100));
        assert_eq!(incrs.len(), 1);
        assert_eq!(incrs[0].start_epoch, EpochId::new(101));

        let (start, end) = manifest.epoch_range().unwrap();
        assert_eq!(start, EpochId::new(0));
        assert_eq!(end, EpochId::new(200));
    }

    #[test]
    fn test_latest_full_prefers_most_recent() {
        let mut manifest = BackupManifest::new();
        manifest.segments.push(segment(BackupKind::Full, 0, 100));
        manifest.segments.push(segment(BackupKind::Incremental, 101, 150));
        manifest.segments.push(segment(BackupKind::Full, 0, 200));
        manifest.segments.push(segment(BackupKind::Incremental, 201, 250));

        let full = manifest.latest_full().unwrap();
        assert_eq!(full.end_epoch, EpochId::new(200));
    }

    #[test]
    fn test_incrementals_after_is_strict() {
        let mut manifest = BackupManifest::new();
        manifest.segments.push(segment(BackupKind::Full, 0, 100));
        manifest.segments.push(segment(BackupKind::Incremental, 100, 120));
        manifest.segments.push(segment(BackupKind::Incremental, 121, 150));

        // A segment starting exactly at `epoch` is excluded, and full segments never appear.
        let incrs = manifest.incrementals_after(EpochId::new(100));
        assert_eq!(incrs.len(), 1);
        assert_eq!(incrs[0].start_epoch, EpochId::new(121));
    }

    #[test]
    fn test_manifest_round_trip() {
        let dir = TempDir::new().unwrap();
        let mut manifest = BackupManifest::new();
        manifest.segments.push(BackupSegment {
            kind: BackupKind::Full,
            filename: "test.grafeo".to_string(),
            start_epoch: EpochId::new(0),
            end_epoch: EpochId::new(50),
            checksum: 0,
            size_bytes: 512,
            created_at_ms: 0,
        });

        write_manifest(dir.path(), &manifest).unwrap();
        let loaded = read_manifest(dir.path()).unwrap().unwrap();
        assert_eq!(loaded.segments.len(), 1);
        assert_eq!(loaded.segments[0].filename, "test.grafeo");
    }

    #[test]
    fn test_manifest_overwrite_leaves_no_temp_file() {
        let dir = TempDir::new().unwrap();
        let mut manifest = BackupManifest::new();
        write_manifest(dir.path(), &manifest).unwrap();

        manifest.segments.push(segment(BackupKind::Full, 0, 10));
        write_manifest(dir.path(), &manifest).unwrap();

        let loaded = read_manifest(dir.path()).unwrap().unwrap();
        assert_eq!(loaded.segments.len(), 1);
        assert!(!dir.path().join(format!("{MANIFEST_FILENAME}.tmp")).exists());
    }

    #[test]
    fn test_manifest_not_found() {
        let dir = TempDir::new().unwrap();
        assert!(read_manifest(dir.path()).unwrap().is_none());
    }

    #[test]
    fn test_backup_cursor_round_trip() {
        let dir = TempDir::new().unwrap();
        let cursor = BackupCursor {
            backed_up_epoch: EpochId::new(42),
            log_sequence: 7,
            timestamp_ms: 12345,
        };

        write_backup_cursor(dir.path(), &cursor).unwrap();
        let loaded = read_backup_cursor(dir.path()).unwrap().unwrap();
        assert_eq!(loaded.backed_up_epoch, EpochId::new(42));
        assert_eq!(loaded.log_sequence, 7);
        assert_eq!(loaded.timestamp_ms, 12345);
    }

    #[test]
    fn test_backup_cursor_not_found() {
        let dir = TempDir::new().unwrap();
        assert!(read_backup_cursor(dir.path()).unwrap().is_none());
    }

    #[test]
    fn test_backup_header_round_trip() {
        let mut buf = Vec::new();
        write_backup_header(&mut buf, EpochId::new(101), EpochId::new(200), 500);
        assert_eq!(buf.len(), BACKUP_HEADER_SIZE);

        let (start, end, count) = read_backup_header(&buf).unwrap();
        assert_eq!(start, EpochId::new(101));
        assert_eq!(end, EpochId::new(200));
        assert_eq!(count, 500);
    }

    #[test]
    fn test_backup_header_ignores_trailing_payload() {
        let mut buf = Vec::new();
        write_backup_header(&mut buf, EpochId::new(5), EpochId::new(9), 2);
        buf.extend_from_slice(b"wal frames");

        let (start, end, count) = read_backup_header(&buf).unwrap();
        assert_eq!(start, EpochId::new(5));
        assert_eq!(end, EpochId::new(9));
        assert_eq!(count, 2);
    }

    #[test]
    fn test_backup_header_rejects_newer_version() {
        let mut buf = Vec::new();
        write_backup_header(&mut buf, EpochId::new(1), EpochId::new(2), 3);
        buf[4..8].copy_from_slice(&(BACKUP_VERSION + 1).to_le_bytes());
        assert!(read_backup_header(&buf).is_err());
    }

    #[test]
    fn test_backup_header_invalid_magic() {
        let data = vec![
            0xFF, 0xFF, 0xFF, 0xFF, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
            0, 0, 0, 0, 0, 0, 0,
        ];
        assert!(read_backup_header(&data).is_err());
    }

    #[test]
    fn test_backup_header_too_short() {
        let data = vec![0, 0, 0, 0];
        assert!(read_backup_header(&data).is_err());
    }

    #[test]
    fn test_backup_kind_serialization() {
        let config = bincode::config::standard();
        let encoded = bincode::serde::encode_to_vec(BackupKind::Full, config).unwrap();
        let (parsed, _): (BackupKind, _) =
            bincode::serde::decode_from_slice(&encoded, config).unwrap();
        assert_eq!(parsed, BackupKind::Full);
    }
}