use std::path::Path;
use grafeo_common::types::EpochId;
use grafeo_common::utils::error::{Error, Result};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum BackupKind {
Full,
Incremental,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupSegment {
pub kind: BackupKind,
pub filename: String,
pub start_epoch: EpochId,
pub end_epoch: EpochId,
pub checksum: u32,
pub size_bytes: u64,
pub created_at_ms: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupManifest {
pub version: u32,
pub segments: Vec<BackupSegment>,
}
impl BackupManifest {
#[must_use]
pub fn new() -> Self {
Self {
version: 1,
segments: Vec::new(),
}
}
#[must_use]
pub fn latest_full(&self) -> Option<&BackupSegment> {
self.segments
.iter()
.rev()
.find(|s| s.kind == BackupKind::Full)
}
pub fn incrementals_after(&self, epoch: EpochId) -> Vec<&BackupSegment> {
self.segments
.iter()
.filter(|s| s.kind == BackupKind::Incremental && s.start_epoch > epoch)
.collect()
}
#[must_use]
pub fn epoch_range(&self) -> Option<(EpochId, EpochId)> {
let first = self.segments.first()?;
let last = self.segments.last()?;
Some((first.start_epoch, last.end_epoch))
}
}
impl Default for BackupManifest {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupCursor {
pub backed_up_epoch: EpochId,
pub log_sequence: u64,
pub timestamp_ms: u64,
}
const MANIFEST_FILENAME: &str = "backup_manifest.json";
const BACKUP_CURSOR_FILENAME: &str = "backup_cursor.meta";
pub fn read_manifest(backup_dir: &Path) -> Result<Option<BackupManifest>> {
let path = backup_dir.join(MANIFEST_FILENAME);
if !path.exists() {
return Ok(None);
}
let data = std::fs::read(&path)
.map_err(|e| Error::Internal(format!("failed to read backup manifest: {e}")))?;
let (manifest, _): (BackupManifest, _) =
bincode::serde::decode_from_slice(&data, bincode::config::standard())
.map_err(|e| Error::Internal(format!("failed to parse backup manifest: {e}")))?;
Ok(Some(manifest))
}
pub fn write_manifest(backup_dir: &Path, manifest: &BackupManifest) -> Result<()> {
std::fs::create_dir_all(backup_dir)
.map_err(|e| Error::Internal(format!("failed to create backup directory: {e}")))?;
let path = backup_dir.join(MANIFEST_FILENAME);
let temp_path = backup_dir.join(format!("{MANIFEST_FILENAME}.tmp"));
let data = bincode::serde::encode_to_vec(manifest, bincode::config::standard())
.map_err(|e| Error::Internal(format!("failed to serialize backup manifest: {e}")))?;
std::fs::write(&temp_path, data)
.map_err(|e| Error::Internal(format!("failed to write backup manifest: {e}")))?;
std::fs::rename(&temp_path, &path)
.map_err(|e| Error::Internal(format!("failed to finalize backup manifest: {e}")))?;
Ok(())
}
pub fn read_backup_cursor(wal_dir: &Path) -> Result<Option<BackupCursor>> {
let path = wal_dir.join(BACKUP_CURSOR_FILENAME);
if !path.exists() {
return Ok(None);
}
let data = std::fs::read(&path)
.map_err(|e| Error::Internal(format!("failed to read backup cursor: {e}")))?;
let cursor: BackupCursor =
bincode::serde::decode_from_slice(&data, bincode::config::standard())
.map(|(c, _)| c)
.map_err(|e| Error::Internal(format!("failed to parse backup cursor: {e}")))?;
Ok(Some(cursor))
}
pub fn write_backup_cursor(wal_dir: &Path, cursor: &BackupCursor) -> Result<()> {
let path = wal_dir.join(BACKUP_CURSOR_FILENAME);
let temp_path = wal_dir.join(format!("{BACKUP_CURSOR_FILENAME}.tmp"));
let data = bincode::serde::encode_to_vec(cursor, bincode::config::standard())
.map_err(|e| Error::Internal(format!("failed to serialize backup cursor: {e}")))?;
std::fs::write(&temp_path, &data)
.map_err(|e| Error::Internal(format!("failed to write backup cursor: {e}")))?;
std::fs::rename(&temp_path, &path)
.map_err(|e| Error::Internal(format!("failed to finalize backup cursor: {e}")))?;
Ok(())
}
pub const BACKUP_MAGIC: [u8; 4] = *b"GBAK";
pub const BACKUP_VERSION: u32 = 1;
pub const BACKUP_HEADER_SIZE: usize = 32;
pub fn write_backup_header(
buf: &mut Vec<u8>,
start_epoch: EpochId,
end_epoch: EpochId,
record_count: u64,
) {
buf.extend_from_slice(&BACKUP_MAGIC);
buf.extend_from_slice(&BACKUP_VERSION.to_le_bytes());
buf.extend_from_slice(&start_epoch.as_u64().to_le_bytes());
buf.extend_from_slice(&end_epoch.as_u64().to_le_bytes());
buf.extend_from_slice(&record_count.to_le_bytes());
}
pub fn read_backup_header(data: &[u8]) -> Result<(EpochId, EpochId, u64)> {
if data.len() < BACKUP_HEADER_SIZE {
return Err(Error::Internal(
"incremental backup file too short".to_string(),
));
}
if data[0..4] != BACKUP_MAGIC {
return Err(Error::Internal(
"invalid backup file magic bytes".to_string(),
));
}
let version = u32::from_le_bytes(data[4..8].try_into().unwrap());
if version > BACKUP_VERSION {
return Err(Error::Internal(format!(
"unsupported backup version {version}, max supported is {BACKUP_VERSION}"
)));
}
let start_epoch = EpochId::new(u64::from_le_bytes(data[8..16].try_into().unwrap()));
let end_epoch = EpochId::new(u64::from_le_bytes(data[16..24].try_into().unwrap()));
let record_count = u64::from_le_bytes(data[24..32].try_into().unwrap());
Ok((start_epoch, end_epoch, record_count))
}
pub(super) fn now_ms() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0)
}
use grafeo_storage::file::GrafeoFileManager;
use grafeo_storage::wal::LpgWal;
pub(super) fn do_backup_full(
backup_dir: &Path,
fm: &GrafeoFileManager,
wal: Option<&LpgWal>,
current_epoch: EpochId,
) -> Result<BackupSegment> {
std::fs::create_dir_all(backup_dir)
.map_err(|e| Error::Internal(format!("failed to create backup directory: {e}")))?;
let mut manifest = read_manifest(backup_dir)?.unwrap_or_default();
let segment_idx = manifest.segments.len();
let filename = format!("backup_full_{segment_idx:04}.grafeo");
let dest_path = backup_dir.join(&filename);
fm.copy_to(&dest_path)?;
let file_size = std::fs::metadata(&dest_path).map(|m| m.len()).unwrap_or(0);
let file_data = std::fs::read(&dest_path)
.map_err(|e| Error::Internal(format!("failed to read backup file for checksum: {e}")))?;
let checksum = crc32fast::hash(&file_data);
let segment = BackupSegment {
kind: BackupKind::Full,
filename,
start_epoch: EpochId::new(0),
end_epoch: current_epoch,
checksum,
size_bytes: file_size,
created_at_ms: now_ms(),
};
manifest.segments.push(segment.clone());
write_manifest(backup_dir, &manifest)?;
if let Some(wal) = wal {
let backed_up_sequence = wal.current_sequence();
wal.rotate()
.map_err(|e| Error::Internal(format!("failed to rotate WAL after full backup: {e}")))?;
let cursor = BackupCursor {
backed_up_epoch: current_epoch,
log_sequence: backed_up_sequence,
timestamp_ms: now_ms(),
};
write_backup_cursor(wal.dir(), &cursor)?;
}
Ok(segment)
}
pub(super) fn do_backup_incremental(
backup_dir: &Path,
wal: &LpgWal,
current_epoch: EpochId,
) -> Result<BackupSegment> {
let manifest = read_manifest(backup_dir)?.ok_or_else(|| {
Error::Internal("no backup manifest found; run a full backup first".to_string())
})?;
if manifest.latest_full().is_none() {
return Err(Error::Internal(
"no full backup in manifest; run a full backup first".to_string(),
));
}
let cursor = read_backup_cursor(wal.dir())?.ok_or_else(|| {
Error::Internal("no backup cursor found; run a full backup first".to_string())
})?;
let log_files = wal.log_files()?;
if log_files.is_empty() {
return Err(Error::Internal("no WAL log files to backup".to_string()));
}
let mut wal_data = Vec::new();
let mut record_count = 0u64;
for file_path in &log_files {
let seq = file_path
.file_stem()
.and_then(|s| s.to_str())
.and_then(|s| s.strip_prefix("wal_"))
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(0);
if seq <= cursor.log_sequence {
continue;
}
let file_bytes = std::fs::read(file_path).map_err(|e| {
Error::Internal(format!(
"failed to read WAL file {}: {e}",
file_path.display()
))
})?;
if !file_bytes.is_empty() {
wal_data.extend_from_slice(&file_bytes);
record_count += 1; }
}
if wal_data.is_empty() {
return Err(Error::Internal(
"no new WAL records since last backup".to_string(),
));
}
let start_epoch = EpochId::new(cursor.backed_up_epoch.as_u64() + 1);
let end_epoch = current_epoch;
let segment_idx = manifest.segments.len();
let filename = format!("backup_incr_{segment_idx:04}.wal");
let dest_path = backup_dir.join(&filename);
let mut output = Vec::new();
write_backup_header(&mut output, start_epoch, end_epoch, record_count);
output.extend_from_slice(&wal_data);
std::fs::write(&dest_path, &output)
.map_err(|e| Error::Internal(format!("failed to write incremental backup: {e}")))?;
let checksum = crc32fast::hash(&output);
let segment = BackupSegment {
kind: BackupKind::Incremental,
filename,
start_epoch,
end_epoch,
checksum,
size_bytes: output.len() as u64,
created_at_ms: now_ms(),
};
let mut manifest = manifest;
manifest.segments.push(segment.clone());
write_manifest(backup_dir, &manifest)?;
let backed_up_sequence = wal.current_sequence();
wal.rotate().map_err(|e| {
Error::Internal(format!(
"failed to rotate WAL after incremental backup: {e}"
))
})?;
let new_cursor = BackupCursor {
backed_up_epoch: current_epoch,
log_sequence: backed_up_sequence,
timestamp_ms: now_ms(),
};
write_backup_cursor(wal.dir(), &new_cursor)?;
Ok(segment)
}
pub(super) fn do_restore_to_epoch(
backup_dir: &Path,
target_epoch: EpochId,
output_path: &Path,
) -> Result<()> {
let manifest = read_manifest(backup_dir)?
.ok_or_else(|| Error::Internal("no backup manifest found".to_string()))?;
let full = manifest
.segments
.iter()
.rfind(|s| s.kind == BackupKind::Full && s.end_epoch <= target_epoch)
.ok_or_else(|| {
Error::Internal(format!(
"no full backup covers epoch {}",
target_epoch.as_u64()
))
})?;
let full_path = backup_dir.join(&full.filename);
std::fs::copy(&full_path, output_path)
.map_err(|e| Error::Internal(format!("failed to copy full backup to output: {e}")))?;
let incrementals: Vec<&BackupSegment> = manifest
.segments
.iter()
.filter(|s| {
s.kind == BackupKind::Incremental
&& s.start_epoch > full.end_epoch
&& s.start_epoch <= target_epoch
})
.collect();
if incrementals.is_empty() {
return Ok(());
}
let wal_dir = output_path.parent().unwrap_or(Path::new(".")).join(format!(
"{}.restore_wal",
output_path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("db")
));
std::fs::create_dir_all(&wal_dir)
.map_err(|e| Error::Internal(format!("failed to create restore WAL directory: {e}")))?;
for (i, incr) in incrementals.iter().enumerate() {
let incr_path = backup_dir.join(&incr.filename);
let incr_data = std::fs::read(&incr_path).map_err(|e| {
Error::Internal(format!(
"failed to read incremental backup {}: {e}",
incr.filename
))
})?;
let actual_crc = crc32fast::hash(&incr_data);
if actual_crc != incr.checksum {
return Err(Error::Internal(format!(
"incremental backup {} CRC mismatch: expected {:08x}, got {actual_crc:08x}",
incr.filename, incr.checksum,
)));
}
if incr_data.len() > BACKUP_HEADER_SIZE {
let wal_frames = &incr_data[BACKUP_HEADER_SIZE..];
let wal_file = wal_dir.join(format!("wal_{i:08}.log"));
std::fs::write(&wal_file, wal_frames).map_err(|e| {
Error::Internal(format!("failed to write WAL file for restore: {e}"))
})?;
}
}
let recovery = grafeo_storage::wal::WalRecovery::new(&wal_dir);
let records = recovery.recover_until_epoch(target_epoch)?;
let trimmed_dir = wal_dir.parent().unwrap_or(Path::new(".")).join(format!(
"{}.trimmed_wal",
wal_dir
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("wal")
));
std::fs::create_dir_all(&trimmed_dir)
.map_err(|e| Error::Internal(format!("failed to create trimmed WAL directory: {e}")))?;
if !records.is_empty() {
use grafeo_storage::wal::{LpgWal, WalConfig};
let trimmed_wal = LpgWal::with_config(&trimmed_dir, WalConfig::default())?;
for record in &records {
trimmed_wal.log(record)?;
}
trimmed_wal.flush()?;
drop(trimmed_wal);
}
std::fs::remove_dir_all(&wal_dir)
.map_err(|e| Error::Internal(format!("failed to remove restore WAL directory: {e}")))?;
let sidecar_dir = format!("{}.wal", output_path.display());
let sidecar_path = std::path::Path::new(&sidecar_dir);
if sidecar_path.exists() {
std::fs::remove_dir_all(sidecar_path)
.map_err(|e| Error::Internal(format!("failed to remove existing sidecar WAL: {e}")))?;
}
std::fs::rename(&trimmed_dir, sidecar_path)
.map_err(|e| Error::Internal(format!("failed to move WAL to sidecar location: {e}")))?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[test]
fn test_manifest_new() {
let manifest = BackupManifest::new();
assert_eq!(manifest.version, 1);
assert!(manifest.segments.is_empty());
assert!(manifest.latest_full().is_none());
assert!(manifest.epoch_range().is_none());
}
#[test]
fn test_manifest_with_segments() {
let mut manifest = BackupManifest::new();
manifest.segments.push(BackupSegment {
kind: BackupKind::Full,
filename: "backup_full_0000.grafeo".to_string(),
start_epoch: EpochId::new(0),
end_epoch: EpochId::new(100),
checksum: 12345,
size_bytes: 1024,
created_at_ms: 1000,
});
manifest.segments.push(BackupSegment {
kind: BackupKind::Incremental,
filename: "backup_incr_0001.wal".to_string(),
start_epoch: EpochId::new(101),
end_epoch: EpochId::new(200),
checksum: 67890,
size_bytes: 256,
created_at_ms: 2000,
});
let full = manifest.latest_full().unwrap();
assert_eq!(full.end_epoch, EpochId::new(100));
let incrs = manifest.incrementals_after(EpochId::new(100));
assert_eq!(incrs.len(), 1);
assert_eq!(incrs[0].start_epoch, EpochId::new(101));
let (start, end) = manifest.epoch_range().unwrap();
assert_eq!(start, EpochId::new(0));
assert_eq!(end, EpochId::new(200));
}
#[test]
fn test_manifest_round_trip() {
let dir = TempDir::new().unwrap();
let mut manifest = BackupManifest::new();
manifest.segments.push(BackupSegment {
kind: BackupKind::Full,
filename: "test.grafeo".to_string(),
start_epoch: EpochId::new(0),
end_epoch: EpochId::new(50),
checksum: 0,
size_bytes: 512,
created_at_ms: 0,
});
write_manifest(dir.path(), &manifest).unwrap();
let loaded = read_manifest(dir.path()).unwrap().unwrap();
assert_eq!(loaded.segments.len(), 1);
assert_eq!(loaded.segments[0].filename, "test.grafeo");
}
#[test]
fn test_manifest_not_found() {
let dir = TempDir::new().unwrap();
assert!(read_manifest(dir.path()).unwrap().is_none());
}
#[test]
fn test_backup_cursor_round_trip() {
let dir = TempDir::new().unwrap();
let cursor = BackupCursor {
backed_up_epoch: EpochId::new(42),
log_sequence: 7,
timestamp_ms: 12345,
};
write_backup_cursor(dir.path(), &cursor).unwrap();
let loaded = read_backup_cursor(dir.path()).unwrap().unwrap();
assert_eq!(loaded.backed_up_epoch, EpochId::new(42));
assert_eq!(loaded.log_sequence, 7);
assert_eq!(loaded.timestamp_ms, 12345);
}
#[test]
fn test_backup_cursor_not_found() {
let dir = TempDir::new().unwrap();
assert!(read_backup_cursor(dir.path()).unwrap().is_none());
}
#[test]
fn test_backup_header_round_trip() {
let mut buf = Vec::new();
write_backup_header(&mut buf, EpochId::new(101), EpochId::new(200), 500);
assert_eq!(buf.len(), BACKUP_HEADER_SIZE);
let (start, end, count) = read_backup_header(&buf).unwrap();
assert_eq!(start, EpochId::new(101));
assert_eq!(end, EpochId::new(200));
assert_eq!(count, 500);
}
#[test]
fn test_backup_header_invalid_magic() {
let data = vec![
0xFF, 0xFF, 0xFF, 0xFF, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0,
];
assert!(read_backup_header(&data).is_err());
}
#[test]
fn test_backup_header_too_short() {
let data = vec![0, 0, 0, 0];
assert!(read_backup_header(&data).is_err());
}
#[test]
fn test_backup_kind_serialization() {
let config = bincode::config::standard();
let encoded = bincode::serde::encode_to_vec(BackupKind::Full, config).unwrap();
let (parsed, _): (BackupKind, _) =
bincode::serde::decode_from_slice(&encoded, config).unwrap();
assert_eq!(parsed, BackupKind::Full);
}
}