use crate::manifest::{BackupManifest, ChangedFile, IncrementManifest};
use crate::restore::{ensure_empty_dir, verify_and_copy_full};
use powdb_storage::catalog::Catalog;
use powdb_storage::page::{page_lsn, PAGE_SIZE};
use std::io;
use std::io::{Seek, SeekFrom, Write};
use std::path::Path;
use std::time::{SystemTime, UNIX_EPOCH};
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time earlier than the epoch,
/// so callers never have to handle a failure.
fn now_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
/// Reports whether a data-directory entry name is one of the files the
/// incremental backup considers: exactly `catalog.bin`, or any name ending in
/// `.heap` or `.idx`.
fn is_durable(name: &str) -> bool {
    const DURABLE_SUFFIXES: [&str; 2] = [".heap", ".idx"];

    if name == "catalog.bin" {
        return true;
    }
    DURABLE_SUFFIXES
        .iter()
        .any(|suffix| name.ends_with(*suffix))
}
/// Reports whether a file name ends in `.heap` or `.idx`, the two suffixes the
/// backup code diffs page by page instead of copying whole.
fn is_paged(name: &str) -> bool {
    const PAGED_SUFFIXES: [&str; 2] = [".heap", ".idx"];

    PAGED_SUFFIXES
        .iter()
        .any(|suffix| name.ends_with(*suffix))
}
/// Writes an incremental backup of `catalog`'s data directory into `dest`,
/// relative to the earlier backup described by `base`.
///
/// The catalog is checkpointed first, then every durable file in the data
/// directory (see `is_durable`) is visited in sorted name order:
///
/// * Files that are not paged, or whose length is not a multiple of
///   `PAGE_SIZE`, are compared by BLAKE3 hash against `base.files`. A file
///   whose name and hash both match is skipped; otherwise it is copied whole
///   into `dest` and recorded as `ChangedFile::Whole`.
/// * Paged files are scanned page by page. Every page whose `page_lsn` is
///   greater than `base.source_lsn` is appended to `<name>.delta` as a
///   little-endian `u32` page index followed by the raw page bytes, and the
///   file is recorded as `ChangedFile::Pages`. Files with no such page are
///   skipped.
///
/// The resulting `IncrementManifest` is written into `dest` and returned.
///
/// # Errors
///
/// Returns any error from checkpointing, from enumerating or reading the data
/// directory, or from writing into `dest`. Also fails if a paged file holds
/// more pages than a `u32` page index can address.
pub fn incremental_backup(
    catalog: &mut Catalog,
    base: &BackupManifest,
    dest: &Path,
) -> io::Result<IncrementManifest> {
    catalog.checkpoint()?;
    let source_lsn = catalog.max_lsn();
    let src = catalog.data_dir().to_path_buf();
    std::fs::create_dir_all(dest)?;

    // A directory entry that cannot be read must abort the backup: silently
    // skipping it could omit a durable file while still reporting success.
    let mut entries: Vec<String> = Vec::new();
    for entry in std::fs::read_dir(&src)? {
        let name = entry?.file_name().to_string_lossy().into_owned();
        if is_durable(&name) {
            entries.push(name);
        }
    }
    // Sorted so the manifest lists files in a stable order.
    entries.sort();

    let mut changed: Vec<ChangedFile> = Vec::new();
    for name in entries {
        let bytes = std::fs::read(src.join(&name))?;

        // Whole-file path: non-paged files, and paged files whose length is
        // not page-aligned and therefore cannot be diffed page by page.
        if !is_paged(&name) || bytes.len() % PAGE_SIZE != 0 {
            let hash = blake3::hash(&bytes).to_hex().to_string();
            let unchanged = base
                .files
                .iter()
                .any(|f| f.name == name && f.blake3_hex == hash);
            if unchanged {
                continue;
            }
            std::fs::write(dest.join(&name), &bytes)?;
            changed.push(ChangedFile::Whole {
                name,
                len: bytes.len() as u64,
                blake3_hex: hash,
            });
            continue;
        }

        // Page indices are stored as u32; refuse to wrap rather than record a
        // truncated page count.
        let page_count = bytes.len() / PAGE_SIZE;
        if page_count > u32::MAX as usize {
            return Err(io::Error::other(format!(
                "{name} has {page_count} pages, more than a u32 page index can address"
            )));
        }
        let total_pages = page_count as u32;

        let mut page_indices: Vec<u32> = Vec::new();
        let mut delta: Vec<u8> = Vec::new();
        for (i, chunk) in (0..total_pages).zip(bytes.chunks_exact(PAGE_SIZE)) {
            if page_lsn(chunk) > base.source_lsn {
                page_indices.push(i);
                // Record layout: 4-byte little-endian page index, then the page.
                delta.extend_from_slice(&i.to_le_bytes());
                delta.extend_from_slice(chunk);
            }
        }
        if page_indices.is_empty() {
            continue;
        }

        let delta_file = format!("{name}.delta");
        std::fs::write(dest.join(&delta_file), &delta)?;
        let delta_blake3_hex = blake3::hash(&delta).to_hex().to_string();
        changed.push(ChangedFile::Pages {
            name,
            total_pages,
            page_indices,
            delta_file,
            delta_len: delta.len() as u64,
            delta_blake3_hex,
        });
    }

    let manifest = IncrementManifest {
        format_version: IncrementManifest::FORMAT_VERSION,
        created_unix_secs: now_secs(),
        base_source_lsn: base.source_lsn,
        source_lsn,
        changed,
    };
    manifest.write(dest)?;
    Ok(manifest)
}
/// Rebuilds a data directory in `dest` from a full backup plus an ordered
/// chain of incremental backups.
///
/// `dest` must pass `ensure_empty_dir`. The full backup in `full_dir` is
/// verified and copied first. Each directory in `increment_dirs` is then
/// applied in order: its manifest's `base_source_lsn` must equal the
/// `source_lsn` of the previously applied backup, whole files are hash-checked
/// and written over `dest`, and page deltas are hash-checked and applied via
/// `apply_page_delta`. Finally the restored directory is opened as a
/// `Catalog` and immediately dropped (presumably as a sanity check that the
/// result is loadable; confirm against `Catalog::open`).
///
/// # Errors
///
/// Returns an error if the chain is broken (LSN mismatch), if any increment
/// payload fails its BLAKE3 check, if a manifest names a file that is not a
/// plain file name, or on any underlying I/O or catalog-open failure. Entries
/// applied before the failure are not rolled back.
pub fn restore_chain(full_dir: &Path, increment_dirs: &[&Path], dest: &Path) -> io::Result<()> {
    // Manifest entries are joined onto `inc_dir` and `dest`. A name that is
    // absolute or contains `..` or separators would make those joins read or
    // write outside the backup/restore directories, so accept only a single
    // normal path component that round-trips to the same string.
    fn plain_file_name(name: &str) -> io::Result<&str> {
        let mut parts = Path::new(name).components();
        let is_plain = matches!(
            (parts.next(), parts.next()),
            (Some(std::path::Component::Normal(only)), None)
                if only == std::ffi::OsStr::new(name)
        );
        if is_plain {
            Ok(name)
        } else {
            Err(io::Error::other(format!(
                "increment manifest names {name:?}, which is not a plain file name"
            )))
        }
    }

    ensure_empty_dir(dest)?;
    let full_manifest = BackupManifest::read(full_dir)?;
    verify_and_copy_full(&full_manifest, full_dir, dest)?;

    // LSN that the next increment must have been built on.
    let mut running_lsn = full_manifest.source_lsn;
    for inc_dir in increment_dirs {
        let inc = IncrementManifest::read(inc_dir)?;
        if inc.base_source_lsn != running_lsn {
            return Err(io::Error::other(format!(
                "increment chain broken: expected base lsn {}, increment built on {}",
                running_lsn, inc.base_source_lsn
            )));
        }

        for cf in &inc.changed {
            match cf {
                ChangedFile::Whole {
                    name,
                    len: _,
                    blake3_hex,
                } => {
                    let name = plain_file_name(name)?;
                    let bytes = std::fs::read(inc_dir.join(name))?;
                    let hash = blake3::hash(&bytes).to_hex().to_string();
                    if &hash != blake3_hex {
                        return Err(io::Error::other(format!(
                            "integrity check failed for {name}: blake3 mismatch (increment is corrupt)"
                        )));
                    }
                    std::fs::write(dest.join(name), &bytes)?;
                }
                ChangedFile::Pages {
                    name,
                    total_pages,
                    page_indices,
                    delta_file,
                    delta_len: _,
                    delta_blake3_hex,
                } => {
                    let name = plain_file_name(name)?;
                    let delta_file = plain_file_name(delta_file)?;
                    let delta = std::fs::read(inc_dir.join(delta_file))?;
                    let hash = blake3::hash(&delta).to_hex().to_string();
                    if &hash != delta_blake3_hex {
                        return Err(io::Error::other(format!(
                            "integrity check failed for {delta_file}: blake3 mismatch (increment is corrupt)"
                        )));
                    }
                    apply_page_delta(&dest.join(name), *total_pages, page_indices, &delta)?;
                }
            }
        }
        running_lsn = inc.source_lsn;
    }

    // Open the restored directory once and discard the handle.
    let cat = Catalog::open(dest)?;
    drop(cat);
    Ok(())
}
/// Applies a page delta to the file at `path`.
///
/// `delta` is a sequence of fixed-size records, each a 4-byte little-endian
/// page index followed by `PAGE_SIZE` bytes of page data. `page_indices` is
/// the manifest's list of the pages the delta carries, in record order, and
/// `total_pages` is the page count the file is expected to have.
///
/// The file is created if missing and grown to `total_pages * PAGE_SIZE`
/// bytes if shorter; each record's page is then written at its index.
///
/// NOTE(review): the file is only ever grown, never truncated. If a source
/// file can shrink between backups, the restored file keeps its old trailing
/// pages. Confirm whether that can happen.
///
/// # Errors
///
/// Fails, before the file is opened or modified, if `delta`'s length does not
/// match `page_indices`, if any record's embedded index differs from the
/// corresponding `page_indices` entry, or if an index is not below
/// `total_pages`. Also returns any I/O error from opening, resizing, seeking
/// or writing the file.
fn apply_page_delta(
    path: &Path,
    total_pages: u32,
    page_indices: &[u32],
    delta: &[u8],
) -> io::Result<()> {
    let record_len = 4 + PAGE_SIZE;
    let expected = page_indices.len() * record_len;
    if delta.len() != expected {
        return Err(io::Error::other(format!(
            "delta for {} has length {} but {} page records expected {}",
            path.display(),
            delta.len(),
            page_indices.len(),
            expected
        )));
    }

    let index_of = |record: &[u8]| u32::from_le_bytes([record[0], record[1], record[2], record[3]]);

    // Validate every record up front so an inconsistent delta is rejected
    // without leaving the target file partially patched or over-extended.
    for (record, &listed) in delta.chunks_exact(record_len).zip(page_indices) {
        let idx = index_of(record);
        if idx != listed {
            return Err(io::Error::other(format!(
                "delta for {} carries page {} where the manifest lists page {}",
                path.display(),
                idx,
                listed
            )));
        }
        if idx >= total_pages {
            return Err(io::Error::other(format!(
                "delta for {} carries page {} but the file has only {} pages",
                path.display(),
                idx,
                total_pages
            )));
        }
    }

    let mut file = std::fs::OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(false)
        .open(path)?;

    // Grow (never shrink) the file so every listed page offset exists.
    let target_len = total_pages as u64 * PAGE_SIZE as u64;
    if file.metadata()?.len() < target_len {
        file.set_len(target_len)?;
    }

    for record in delta.chunks_exact(record_len) {
        let (_, page) = record.split_at(4);
        let idx = index_of(record);
        file.seek(SeekFrom::Start(idx as u64 * PAGE_SIZE as u64))?;
        file.write_all(page)?;
    }
    file.flush()?;
    Ok(())
}