oxgraph_db/database/maintenance.rs
1//! Checkpointing: the auto-checkpoint policy, validation, and the
2//! crash-safe fold of base+overlay into a fresh base generation.
3
4use std::path::Path;
5
6use super::{
7 Db,
8 open::{base_file, create_empty_log, delta_file, file_len, write_superblock},
9};
10use crate::{
11 DbError, StorageError,
12 backing::{self, Base},
13 crc,
14 freeze::{self, FreezeStamps},
15 lock::WriterLock,
16 storage, wal,
17};
18
/// Policy deciding when a dirty commit should also fold the delta-log into a
/// fresh base generation, which bounds the log tail that recovery has to
/// replay.
///
/// [`CheckpointPolicy::SizeRatio`] (the default) fires once the delta-log has
/// outgrown a configurable multiple of the live base size.
/// [`CheckpointPolicy::Manual`] switches auto-triggering off, leaving every
/// fold to an explicit [`Db::compact`] call.
///
/// # Performance
///
/// Copying this value is `O(1)`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum CheckpointPolicy {
    /// Auto-checkpointing is off; only an explicit [`Db::compact`] folds.
    Manual,
    /// Fold after a dirty commit once the delta-log is larger than `factor`
    /// times the live base size. A tiny or empty base is first rounded up to
    /// [`CheckpointPolicy::MIN_BASE_BYTES`], so the gen-0 store does not
    /// checkpoint on its first commit.
    SizeRatio {
        /// Log-to-base size factor `K`: the log may reach `K × base` bytes
        /// before the next dirty commit folds it.
        factor: u32,
    },
}

impl CheckpointPolicy {
    /// Factor `K` used by the [`Default`] policy: fold once the delta-log is
    /// more than four times the live base size.
    pub const DEFAULT_FACTOR: u32 = 4;

    /// Minimum base size (bytes) assumed when
    /// [`CheckpointPolicy::SizeRatio`] is evaluated: a smaller base is treated
    /// as exactly this large, so a freshly created (near-empty) store folds
    /// only once its log exceeds `factor × MIN_BASE_BYTES` instead of on its
    /// first commits.
    pub const MIN_BASE_BYTES: u64 = 4 * 1024;

    /// Decides whether a delta-log of `log_bytes` sitting over a base of
    /// `base_bytes` is large enough to trigger an auto-checkpoint.
    ///
    /// [`CheckpointPolicy::Manual`] always answers `false`. For
    /// [`CheckpointPolicy::SizeRatio`] the answer is
    /// `log_bytes > max(base_bytes, MIN_BASE_BYTES) × factor`, with the
    /// product saturating at `u64::MAX`.
    ///
    /// # Performance
    ///
    /// This method is `O(1)`.
    #[must_use]
    const fn should_checkpoint(self, log_bytes: u64, base_bytes: u64) -> bool {
        let factor = match self {
            Self::Manual => return false,
            Self::SizeRatio { factor } => factor,
        };
        // Round a tiny base up to the floor before applying the ratio.
        let effective_base = if base_bytes >= Self::MIN_BASE_BYTES {
            base_bytes
        } else {
            Self::MIN_BASE_BYTES
        };
        // `u32 -> u64` is a lossless widening; saturation keeps a huge factor
        // from wrapping the threshold around to a small value.
        let threshold = effective_base.saturating_mul(factor as u64);
        log_bytes > threshold
    }
}

impl Default for CheckpointPolicy {
    /// Builds the default policy: [`CheckpointPolicy::SizeRatio`] using
    /// [`CheckpointPolicy::DEFAULT_FACTOR`].
    ///
    /// # Performance
    ///
    /// This function is `O(1)`.
    fn default() -> Self {
        let factor = Self::DEFAULT_FACTOR;
        Self::SizeRatio { factor }
    }
}
92
93impl Db {
94 /// Returns the configured auto-checkpoint policy.
95 ///
96 /// # Performance
97 ///
98 /// This method is `O(1)`.
99 #[must_use]
100 pub const fn checkpoint_policy(&self) -> CheckpointPolicy {
101 self.checkpoint_policy
102 }
103
104 /// Sets the auto-checkpoint policy consulted after each dirty commit.
105 ///
106 /// # Performance
107 ///
108 /// This method is `O(1)`.
109 pub const fn set_checkpoint_policy(&mut self, policy: CheckpointPolicy) {
110 self.checkpoint_policy = policy;
111 }
112
113 /// Validates the current handle with the strongest offline check:
114 /// re-reads the superblock, verifies the live base's container integrity
115 /// in full ([`oxgraph_snapshot::Snapshot::open_checked`] for the table
116 /// checksum, then [`oxgraph_snapshot::Snapshot::verify_all`] over EVERY
117 /// section — a mismatch names the failing section kind), and finally
118 /// attaches the base to run the structural bind checks (format version,
119 /// property sort order, posting bounds).
120 ///
121 /// # Errors
122 ///
123 /// Returns [`DbError`] when the superblock or base fails validation; a
124 /// checksum failure names the failing section.
125 ///
126 /// # Performance
127 ///
128 /// This method is `O(base bytes)`: one `verify_all` sweep plus one
129 /// verify-at-bind attach (two checksum passes over the base).
130 pub fn validate(&self) -> Result<(), DbError> {
131 wal::read_superblock(&self.root)?;
132 let backing =
133 backing::open_backing(&self.root.join(base_file(self.base_generation)), false)?;
134 let snapshot = oxgraph_snapshot::Snapshot::open_checked(&backing, crc::checksum_append)
135 .map_err(|error| StorageError::invalid_store(error.to_string()))?;
136 snapshot
137 .verify_all(crc::checksum_append)
138 .map_err(|error| StorageError::invalid_store(error.to_string()))?;
139 Base::attach(backing).map(|_base| ()).map_err(DbError::from)
140 }
141
142 /// Validates an OXGDB database at `path`.
143 ///
144 /// # Errors
145 ///
146 /// Returns [`DbError`] when the store fails to open and recover.
147 ///
148 /// # Performance
149 ///
150 /// This function is `O(base bytes + log bytes)`.
151 pub fn validate_path(path: impl AsRef<Path>) -> Result<(), DbError> {
152 Self::open(path).map(|_database| ())
153 }
154
155 /// Folds the current base+overlay into a new base generation, rotating the
156 /// delta-log and republishing the superblock (a manual checkpoint).
157 ///
158 /// This is the checkpoint primitive, exposed here so the existing `compact`
159 /// API keeps its "rewrite the store compactly" contract. Auto-triggering is
160 /// configured separately via [`Db::set_checkpoint_policy`].
161 ///
162 /// # Errors
163 ///
164 /// Returns [`DbError`] when encoding, writing, or publishing the new
165 /// generation fails.
166 ///
167 /// # Performance
168 ///
169 /// This method is `O(visible state bytes)`.
170 pub fn compact(&mut self) -> Result<(), DbError> {
171 self.checkpoint()
172 }
173
174 /// Folds the current base+overlay into base-`{g+1}`, creates an empty
175 /// delta-`{g+1}`.log, republishes the superblock naming `g+1` (the
176 /// linearization point), then unlinks the old base and log.
177 ///
178 /// The order is crash-safe: the new base is fully durable BEFORE the
179 /// superblock names it (so a crash before the superblock leaves the OLD
180 /// superblock authoritative and the orphan new base is ignored), and the old
181 /// base/log are unlinked only AFTER the superblock names the new generation
182 /// (so a crash before the unlink leaves the NEW superblock authoritative and
183 /// the orphan old files are ignored). The
184 /// [`crate::wire::SuperblockRecord`] rename is the single linearization point.
185 ///
186 /// # Errors
187 ///
188 /// Returns [`DbError`] when encoding, writing, or publishing fails.
189 ///
190 /// # Performance
191 ///
192 /// This method is `O(visible state bytes)`.
193 pub(crate) fn checkpoint(&mut self) -> Result<(), DbError> {
194 self.checkpoint_inner(
195 #[cfg(test)]
196 CheckpointStop::Complete,
197 )
198 }
199
200 /// Crash-safe checkpoint body. Under `#[cfg(test)]` it accepts a
201 /// [`CheckpointStop`] that simulates a crash by returning early right after a
202 /// chosen fsync point, leaving the on-disk files exactly as a real crash
203 /// there would, so the crash-matrix test can reopen and assert recovery.
204 ///
205 /// # Errors
206 ///
207 /// Returns [`DbError`] when encoding, writing, or publishing fails.
208 ///
209 /// # Performance
210 ///
211 /// This method is `O(visible state bytes)`.
212 pub(super) fn checkpoint_inner(
213 &mut self,
214 #[cfg(test)] stop: CheckpointStop,
215 ) -> Result<(), DbError> {
216 let _lock = WriterLock::acquire(&self.root)?;
217 let next_generation = self
218 .base_generation
219 .checked_add(1)
220 .ok_or_else(|| DbError::invalid_store("checkpoint generation overflow"))?;
221 let view = self.current.view();
222 let commit_seq = self.current.lsn().get();
223 let base_bytes = freeze::freeze_view(
224 &view,
225 FreezeStamps {
226 commit_seq,
227 transaction_id: self.last_transaction_id.get(),
228 generation: next_generation,
229 },
230 )?;
231 // (1) write base-{g+1} (temp + fsync + rename + dir-fsync).
232 storage::atomic_write(
233 &self.root,
234 &self
235 .root
236 .join(format!("{}.tmp", base_file(next_generation))),
237 &self.root.join(base_file(next_generation)),
238 &base_bytes,
239 )?;
240 // (2) create empty delta-{g+1}.log (fsync + dir-fsync).
241 create_empty_log(&self.root, next_generation)?;
242 // Crash point A: new base + new log durable, superblock NOT yet
243 // published. The OLD superblock still names `g`, so recovery uses the old
244 // generation; the new base/log are orphans.
245 #[cfg(test)]
246 if matches!(stop, CheckpointStop::BeforeSuperblock) {
247 return Ok(());
248 }
249 // (3) publish the superblock naming g+1 — the linearization point.
250 write_superblock(
251 &self.root,
252 next_generation,
253 commit_seq,
254 commit_seq,
255 self.last_transaction_id.get(),
256 )?;
257 // Crash point B: superblock now names g+1, old base/log NOT yet unlinked.
258 // Recovery uses the new generation; the old base/log are orphans.
259 #[cfg(test)]
260 if matches!(stop, CheckpointStop::BeforeRotate) {
261 return Ok(());
262 }
263 // Re-open over the new generation, then (4) unlink the old base + log.
264 let reopened = Self::open(&self.root)?;
265 let old_generation = self.base_generation;
266 let policy = self.checkpoint_policy;
267 self.current = reopened.current;
268 self.base_generation = reopened.base_generation;
269 self.last_transaction_id = reopened.last_transaction_id;
270 // The reopen reset the policy to the default; restore the caller's.
271 self.checkpoint_policy = policy;
272 let _ = std::fs::remove_file(self.root.join(base_file(old_generation)));
273 let _ = std::fs::remove_file(self.root.join(delta_file(old_generation)));
274 let _ = storage::sync_directory(&self.root);
275 Ok(())
276 }
277
278 /// Auto-checkpoints when the configured [`CheckpointPolicy`] says the
279 /// delta-log has grown too large relative to the base. Called after a dirty
280 /// commit publishes its frame. A failed fold is surfaced so the caller can
281 /// observe it; the committed data is already durable in the log regardless.
282 ///
283 /// # Errors
284 ///
285 /// Returns [`DbError`] when the triggered fold fails.
286 ///
287 /// # Performance
288 ///
289 /// This method is `O(1)` to decide; `O(visible state bytes)` when it folds.
290 pub(super) fn maybe_auto_checkpoint(&mut self) -> Result<(), DbError> {
291 let log_bytes = file_len(&self.root.join(delta_file(self.base_generation)));
292 let base_bytes = file_len(&self.root.join(base_file(self.base_generation)));
293 if self
294 .checkpoint_policy
295 .should_checkpoint(log_bytes, base_bytes)
296 {
297 self.checkpoint()?;
298 }
299 Ok(())
300 }
301}
302
/// Crash-injection switch for [`Db::checkpoint_inner`], compiled into test
/// builds only. Each non-`Complete` variant makes the fold return right after
/// one particular fsync point, so the crash-matrix test can reopen the store
/// and assert what recovery sees in that crash window.
///
/// The crash-matrix test that builds the non-`Complete` variants is itself
/// `#[cfg(not(miri))]` (it reopens a real store across simulated crashes, which
/// miri's isolation cannot model). Under miri only `Complete` is therefore
/// constructed, and the remaining variants are expected to be unused.
///
/// # Performance
///
/// `perf: unspecified`; a test-only control tag.
#[cfg(test)]
#[cfg_attr(
    miri,
    expect(
        dead_code,
        reason = "the crash-injection variants are constructed only by the #[cfg(not(miri))] crash-matrix test"
    )
)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(super) enum CheckpointStop {
    /// No injected crash: run the checkpoint to the end (the production path).
    Complete,
    /// Return once the new base and new log are durable but the superblock
    /// has not been published yet, so the old superblock stays authoritative.
    BeforeSuperblock,
    /// Return once the superblock names the new generation but the old
    /// base/log have not been unlinked yet, so the new superblock is
    /// authoritative.
    BeforeRotate,
}