powdb_backup/
incremental.rs1use crate::manifest::{BackupManifest, ChangedFile, IncrementManifest};
2use crate::restore::{ensure_empty_dir, verify_and_copy_full};
3use powdb_storage::catalog::Catalog;
4use powdb_storage::page::{page_lsn, PAGE_SIZE};
5use std::io;
6use std::io::{Seek, SeekFrom, Write};
7use std::path::Path;
8use std::time::{SystemTime, UNIX_EPOCH};
9
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Falls back to `0` when the system clock reports a time earlier than the
/// epoch, so callers never have to handle an error.
fn now_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
16
/// Reports whether `name` is one of the files an incremental backup considers:
/// exactly `catalog.bin`, or any name ending in `.heap` or `.idx`.
fn is_durable(name: &str) -> bool {
    if name == "catalog.bin" {
        return true;
    }
    [".heap", ".idx"]
        .iter()
        .any(|suffix| name.ends_with(suffix))
}
20
/// Reports whether `name` ends in `.heap` or `.idx`, the files that the
/// incremental backup scans page by page instead of hashing whole.
fn is_paged(name: &str) -> bool {
    const PAGED_SUFFIXES: [&str; 2] = [".heap", ".idx"];
    PAGED_SUFFIXES
        .iter()
        .any(|suffix| name.ends_with(suffix))
}
24
25pub fn incremental_backup(
30 catalog: &mut Catalog,
31 base: &BackupManifest,
32 dest: &Path,
33) -> io::Result<IncrementManifest> {
34 catalog.checkpoint()?;
35 let source_lsn = catalog.max_lsn();
36 let src = catalog.data_dir().to_path_buf();
37 std::fs::create_dir_all(dest)?;
38
39 let mut changed: Vec<ChangedFile> = Vec::new();
40
41 let mut entries: Vec<_> = std::fs::read_dir(&src)?
43 .filter_map(|e| e.ok())
44 .map(|e| e.file_name().to_string_lossy().to_string())
45 .filter(|n| is_durable(n))
46 .collect();
47 entries.sort();
48
49 for name in entries {
50 let path = src.join(&name);
51 let bytes = std::fs::read(&path)?;
52
53 if !is_paged(&name) || bytes.len() % PAGE_SIZE != 0 {
54 let hash = blake3::hash(&bytes).to_hex().to_string();
57 let unchanged = base
58 .files
59 .iter()
60 .any(|f| f.name == name && f.blake3_hex == hash);
61 if unchanged {
62 continue;
63 }
64 std::fs::write(dest.join(&name), &bytes)?;
65 changed.push(ChangedFile::Whole {
66 name,
67 len: bytes.len() as u64,
68 blake3_hex: hash,
69 });
70 continue;
71 }
72
73 let total_pages = (bytes.len() / PAGE_SIZE) as u32;
75 let mut page_indices: Vec<u32> = Vec::new();
76 let mut delta: Vec<u8> = Vec::new();
77 for i in 0..total_pages {
78 let start = i as usize * PAGE_SIZE;
79 let chunk = &bytes[start..start + PAGE_SIZE];
80 if page_lsn(chunk) > base.source_lsn {
81 page_indices.push(i);
82 delta.extend_from_slice(&i.to_le_bytes());
83 delta.extend_from_slice(chunk);
84 }
85 }
86 if page_indices.is_empty() {
87 continue;
89 }
90 let delta_file = format!("{name}.delta");
91 std::fs::write(dest.join(&delta_file), &delta)?;
92 let delta_blake3_hex = blake3::hash(&delta).to_hex().to_string();
93 changed.push(ChangedFile::Pages {
94 name,
95 total_pages,
96 page_indices,
97 delta_file,
98 delta_len: delta.len() as u64,
99 delta_blake3_hex,
100 });
101 }
102
103 let manifest = IncrementManifest {
104 format_version: IncrementManifest::FORMAT_VERSION,
105 created_unix_secs: now_secs(),
106 base_source_lsn: base.source_lsn,
107 source_lsn,
108 changed,
109 };
110 manifest.write(dest)?;
111 Ok(manifest)
112}
113
114pub fn restore_chain(full_dir: &Path, increment_dirs: &[&Path], dest: &Path) -> io::Result<()> {
121 ensure_empty_dir(dest)?;
122
123 let full_manifest = BackupManifest::read(full_dir)?;
125 verify_and_copy_full(&full_manifest, full_dir, dest)?;
126 let mut running_lsn = full_manifest.source_lsn;
127
128 for inc_dir in increment_dirs {
130 let inc = IncrementManifest::read(inc_dir)?;
131 if inc.base_source_lsn != running_lsn {
132 return Err(io::Error::other(format!(
133 "increment chain broken: expected base lsn {}, increment built on {}",
134 running_lsn, inc.base_source_lsn
135 )));
136 }
137 for cf in &inc.changed {
138 match cf {
139 ChangedFile::Whole {
140 name,
141 len: _,
142 blake3_hex,
143 } => {
144 let bytes = std::fs::read(inc_dir.join(name))?;
145 let hash = blake3::hash(&bytes).to_hex().to_string();
146 if &hash != blake3_hex {
147 return Err(io::Error::other(format!(
148 "integrity check failed for {name}: blake3 mismatch (increment is corrupt)"
149 )));
150 }
151 std::fs::write(dest.join(name), &bytes)?;
152 }
153 ChangedFile::Pages {
154 name,
155 total_pages,
156 page_indices,
157 delta_file,
158 delta_len: _,
159 delta_blake3_hex,
160 } => {
161 let delta = std::fs::read(inc_dir.join(delta_file))?;
162 let hash = blake3::hash(&delta).to_hex().to_string();
163 if &hash != delta_blake3_hex {
164 return Err(io::Error::other(format!(
165 "integrity check failed for {delta_file}: blake3 mismatch (increment is corrupt)"
166 )));
167 }
168 apply_page_delta(&dest.join(name), *total_pages, page_indices, &delta)?;
169 }
170 }
171 }
172 running_lsn = inc.source_lsn;
173 }
174
175 let cat = Catalog::open(dest)?;
177 drop(cat);
178 Ok(())
179}
180
181fn apply_page_delta(
185 path: &Path,
186 total_pages: u32,
187 page_indices: &[u32],
188 delta: &[u8],
189) -> io::Result<()> {
190 let record_len = 4 + PAGE_SIZE;
191 let expected = page_indices.len() * record_len;
192 if delta.len() != expected {
193 return Err(io::Error::other(format!(
194 "delta for {} has length {} but {} page records expected {}",
195 path.display(),
196 delta.len(),
197 page_indices.len(),
198 expected
199 )));
200 }
201
202 let mut file = std::fs::OpenOptions::new()
203 .read(true)
204 .write(true)
205 .create(true)
206 .truncate(false)
207 .open(path)?;
208 let target_len = total_pages as u64 * PAGE_SIZE as u64;
209 if file.metadata()?.len() < target_len {
210 file.set_len(target_len)?;
211 }
212
213 let mut off = 0usize;
214 while off < delta.len() {
215 let idx = u32::from_le_bytes([delta[off], delta[off + 1], delta[off + 2], delta[off + 3]]);
216 let page = &delta[off + 4..off + 4 + PAGE_SIZE];
217 file.seek(SeekFrom::Start(idx as u64 * PAGE_SIZE as u64))?;
218 file.write_all(page)?;
219 off += record_len;
220 }
221 file.flush()?;
222 Ok(())
223}