Skip to main content

powdb_backup/
incremental.rs

1use crate::manifest::{BackupManifest, ChangedFile, IncrementManifest};
2use crate::restore::{ensure_empty_dir, verify_and_copy_full};
3use powdb_storage::catalog::Catalog;
4use powdb_storage::page::{page_lsn, PAGE_SIZE};
5use std::io;
6use std::io::{Seek, SeekFrom, Write};
7use std::path::Path;
8use std::time::{SystemTime, UNIX_EPOCH};
9
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Falls back to `0` when the system clock reports a time before the epoch.
fn now_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
16
/// Whether `name` is a file this module backs up: exactly `catalog.bin`, or
/// any name ending in `.heap` or `.idx`.
fn is_durable(name: &str) -> bool {
    const DATA_SUFFIXES: [&str; 2] = [".heap", ".idx"];
    match name {
        "catalog.bin" => true,
        other => DATA_SUFFIXES.iter().any(|&suffix| other.ends_with(suffix)),
    }
}
20
/// Whether `name` ends in `.heap` or `.idx`, i.e. is a file that
/// `incremental_backup` may diff page by page.
fn is_paged(name: &str) -> bool {
    [".heap", ".idx"]
        .iter()
        .any(|&suffix| name.ends_with(suffix))
}
24
25/// Take an incremental backup: copy only the pages of each heap/idx file whose
26/// page LSN is greater than `base.source_lsn`, plus any non-paged file (e.g.
27/// catalog.bin) that changed. Builds on `base` (a full backup or a prior
28/// increment's effective state).
29pub fn incremental_backup(
30    catalog: &mut Catalog,
31    base: &BackupManifest,
32    dest: &Path,
33) -> io::Result<IncrementManifest> {
34    catalog.checkpoint()?;
35    let source_lsn = catalog.max_lsn();
36    let src = catalog.data_dir().to_path_buf();
37    std::fs::create_dir_all(dest)?;
38
39    let mut changed: Vec<ChangedFile> = Vec::new();
40
41    // Stable iteration order so manifests are deterministic.
42    let mut entries: Vec<_> = std::fs::read_dir(&src)?
43        .filter_map(|e| e.ok())
44        .map(|e| e.file_name().to_string_lossy().to_string())
45        .filter(|n| is_durable(n))
46        .collect();
47    entries.sort();
48
49    for name in entries {
50        let path = src.join(&name);
51        let bytes = std::fs::read(&path)?;
52
53        if !is_paged(&name) || bytes.len() % PAGE_SIZE != 0 {
54            // Whole-file path: catalog.bin, or a defensively-handled
55            // non-page-aligned paged file. Only record if changed vs base.
56            let hash = blake3::hash(&bytes).to_hex().to_string();
57            let unchanged = base
58                .files
59                .iter()
60                .any(|f| f.name == name && f.blake3_hex == hash);
61            if unchanged {
62                continue;
63            }
64            std::fs::write(dest.join(&name), &bytes)?;
65            changed.push(ChangedFile::Whole {
66                name,
67                len: bytes.len() as u64,
68                blake3_hex: hash,
69            });
70            continue;
71        }
72
73        // Paged file: diff by page LSN.
74        let total_pages = (bytes.len() / PAGE_SIZE) as u32;
75        let mut page_indices: Vec<u32> = Vec::new();
76        let mut delta: Vec<u8> = Vec::new();
77        for i in 0..total_pages {
78            let start = i as usize * PAGE_SIZE;
79            let chunk = &bytes[start..start + PAGE_SIZE];
80            if page_lsn(chunk) > base.source_lsn {
81                page_indices.push(i);
82                delta.extend_from_slice(&i.to_le_bytes());
83                delta.extend_from_slice(chunk);
84            }
85        }
86        if page_indices.is_empty() {
87            // Nothing changed for this file; omit entirely.
88            continue;
89        }
90        let delta_file = format!("{name}.delta");
91        std::fs::write(dest.join(&delta_file), &delta)?;
92        let delta_blake3_hex = blake3::hash(&delta).to_hex().to_string();
93        changed.push(ChangedFile::Pages {
94            name,
95            total_pages,
96            page_indices,
97            delta_file,
98            delta_len: delta.len() as u64,
99            delta_blake3_hex,
100        });
101    }
102
103    let manifest = IncrementManifest {
104        format_version: IncrementManifest::FORMAT_VERSION,
105        created_unix_secs: now_secs(),
106        base_source_lsn: base.source_lsn,
107        source_lsn,
108        changed,
109    };
110    manifest.write(dest)?;
111    Ok(manifest)
112}
113
114/// Rebuild a data dir from a full base backup plus an ordered chain of
115/// increments. Verifies chain continuity (each increment's `base_source_lsn`
116/// must equal the running high-water LSN) and blake3-checks every file/delta,
117/// then validates the result by reopening the catalog.
118///
119/// Coarse PITR = choosing which prefix of the chain to pass here.
120pub fn restore_chain(full_dir: &Path, increment_dirs: &[&Path], dest: &Path) -> io::Result<()> {
121    ensure_empty_dir(dest)?;
122
123    // 1. Lay down the full base.
124    let full_manifest = BackupManifest::read(full_dir)?;
125    verify_and_copy_full(&full_manifest, full_dir, dest)?;
126    let mut running_lsn = full_manifest.source_lsn;
127
128    // 2. Apply each increment in order.
129    for inc_dir in increment_dirs {
130        let inc = IncrementManifest::read(inc_dir)?;
131        if inc.base_source_lsn != running_lsn {
132            return Err(io::Error::other(format!(
133                "increment chain broken: expected base lsn {}, increment built on {}",
134                running_lsn, inc.base_source_lsn
135            )));
136        }
137        for cf in &inc.changed {
138            match cf {
139                ChangedFile::Whole {
140                    name,
141                    len: _,
142                    blake3_hex,
143                } => {
144                    let bytes = std::fs::read(inc_dir.join(name))?;
145                    let hash = blake3::hash(&bytes).to_hex().to_string();
146                    if &hash != blake3_hex {
147                        return Err(io::Error::other(format!(
148                            "integrity check failed for {name}: blake3 mismatch (increment is corrupt)"
149                        )));
150                    }
151                    std::fs::write(dest.join(name), &bytes)?;
152                }
153                ChangedFile::Pages {
154                    name,
155                    total_pages,
156                    page_indices,
157                    delta_file,
158                    delta_len: _,
159                    delta_blake3_hex,
160                } => {
161                    let delta = std::fs::read(inc_dir.join(delta_file))?;
162                    let hash = blake3::hash(&delta).to_hex().to_string();
163                    if &hash != delta_blake3_hex {
164                        return Err(io::Error::other(format!(
165                            "integrity check failed for {delta_file}: blake3 mismatch (increment is corrupt)"
166                        )));
167                    }
168                    apply_page_delta(&dest.join(name), *total_pages, page_indices, &delta)?;
169                }
170            }
171        }
172        running_lsn = inc.source_lsn;
173    }
174
175    // 3. Validate the reconstructed DB opens (LSN invariant).
176    let cat = Catalog::open(dest)?;
177    drop(cat);
178    Ok(())
179}
180
181/// Open/extend `path` to `total_pages * PAGE_SIZE` bytes, then write each
182/// page record from the delta at its target offset. Pages not in the delta are
183/// left untouched (already correct from the base / prior increments).
184fn apply_page_delta(
185    path: &Path,
186    total_pages: u32,
187    page_indices: &[u32],
188    delta: &[u8],
189) -> io::Result<()> {
190    let record_len = 4 + PAGE_SIZE;
191    let expected = page_indices.len() * record_len;
192    if delta.len() != expected {
193        return Err(io::Error::other(format!(
194            "delta for {} has length {} but {} page records expected {}",
195            path.display(),
196            delta.len(),
197            page_indices.len(),
198            expected
199        )));
200    }
201
202    let mut file = std::fs::OpenOptions::new()
203        .read(true)
204        .write(true)
205        .create(true)
206        .truncate(false)
207        .open(path)?;
208    let target_len = total_pages as u64 * PAGE_SIZE as u64;
209    if file.metadata()?.len() < target_len {
210        file.set_len(target_len)?;
211    }
212
213    let mut off = 0usize;
214    while off < delta.len() {
215        let idx = u32::from_le_bytes([delta[off], delta[off + 1], delta[off + 2], delta[off + 3]]);
216        let page = &delta[off + 4..off + 4 + PAGE_SIZE];
217        file.seek(SeekFrom::Start(idx as u64 * PAGE_SIZE as u64))?;
218        file.write_all(page)?;
219        off += record_len;
220    }
221    file.flush()?;
222    Ok(())
223}