Skip to main content

oxgraph_db/database/
maintenance.rs

1//! Checkpointing: the auto-checkpoint policy, validation, and the
2//! crash-safe fold of base+overlay into a fresh base generation.
3
4use std::path::Path;
5
6use super::{
7    Db,
8    open::{base_file, create_empty_log, delta_file, file_len, write_superblock},
9};
10use crate::{
11    DbError, StorageError,
12    backing::{self, Base},
13    crc,
14    freeze::{self, FreezeStamps},
15    lock::WriterLock,
16    storage, wal,
17};
18
/// Auto-checkpoint policy: chooses when a dirty commit should fold the
/// delta-log into a fresh base generation, which caps how much log tail
/// recovery has to replay.
///
/// The default is [`CheckpointPolicy::SizeRatio`] with
/// [`CheckpointPolicy::DEFAULT_FACTOR`]: fold once the delta-log outgrows
/// `factor` times the live base size. [`CheckpointPolicy::Manual`] turns
/// auto-folding off entirely; the store is then folded only by an explicit
/// [`Db::compact`].
///
/// # Performance
///
/// Copying this value is `O(1)`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum CheckpointPolicy {
    /// Never fold automatically; folding happens only through [`Db::compact`].
    Manual,
    /// Fold after a dirty commit once the delta-log is larger than `factor`
    /// times the live base size. Bases smaller than
    /// [`CheckpointPolicy::MIN_BASE_BYTES`] are rounded up to that floor, so a
    /// fresh (near-empty) gen-0 store does not fold on its first commit.
    SizeRatio {
        /// Log-to-base size factor `K`: the log may reach
        /// `K × max(base, MIN_BASE_BYTES)` bytes before the next dirty commit
        /// folds it.
        factor: u32,
    },
}

impl CheckpointPolicy {
    /// Default factor `K` for [`CheckpointPolicy::SizeRatio`]: fold once the
    /// delta-log is more than four times the live base size.
    pub const DEFAULT_FACTOR: u32 = 4;

    /// Base-size floor, in bytes, for [`CheckpointPolicy::SizeRatio`].
    ///
    /// Any base smaller than this is treated as exactly this large when the
    /// ratio is evaluated. This keeps a freshly created store from being folded
    /// before its base carries meaningful data.
    pub const MIN_BASE_BYTES: u64 = 4 * 1024;

    /// Decides whether a delta-log of `log_bytes` on top of a base of
    /// `base_bytes` warrants an auto-checkpoint under this policy.
    ///
    /// For [`CheckpointPolicy::SizeRatio`], the comparison is strict
    /// (`log_bytes > K × effective_base`). The product saturates at `u64::MAX`
    /// instead of overflowing.
    ///
    /// # Performance
    ///
    /// This method is `O(1)`.
    #[must_use]
    const fn should_checkpoint(self, log_bytes: u64, base_bytes: u64) -> bool {
        match self {
            Self::Manual => false,
            Self::SizeRatio { factor } => {
                // Round tiny bases up to the floor before applying the ratio.
                let effective_base = if base_bytes > Self::MIN_BASE_BYTES {
                    base_bytes
                } else {
                    Self::MIN_BASE_BYTES
                };
                let budget = effective_base.saturating_mul(factor as u64);
                log_bytes > budget
            }
        }
    }
}

impl Default for CheckpointPolicy {
    /// Size-ratio folding with [`CheckpointPolicy::DEFAULT_FACTOR`].
    ///
    /// # Performance
    ///
    /// This function is `O(1)`.
    fn default() -> Self {
        let factor = Self::DEFAULT_FACTOR;
        Self::SizeRatio { factor }
    }
}
92
93impl Db {
94    /// Returns the configured auto-checkpoint policy.
95    ///
96    /// # Performance
97    ///
98    /// This method is `O(1)`.
99    #[must_use]
100    pub const fn checkpoint_policy(&self) -> CheckpointPolicy {
101        self.checkpoint_policy
102    }
103
104    /// Sets the auto-checkpoint policy consulted after each dirty commit.
105    ///
106    /// # Performance
107    ///
108    /// This method is `O(1)`.
109    pub const fn set_checkpoint_policy(&mut self, policy: CheckpointPolicy) {
110        self.checkpoint_policy = policy;
111    }
112
113    /// Validates the current handle with the strongest offline check:
114    /// re-reads the superblock, verifies the live base's container integrity
115    /// in full ([`oxgraph_snapshot::Snapshot::open_checked`] for the table
116    /// checksum, then [`oxgraph_snapshot::Snapshot::verify_all`] over EVERY
117    /// section — a mismatch names the failing section kind), and finally
118    /// attaches the base to run the structural bind checks (format version,
119    /// property sort order, posting bounds).
120    ///
121    /// # Errors
122    ///
123    /// Returns [`DbError`] when the superblock or base fails validation; a
124    /// checksum failure names the failing section.
125    ///
126    /// # Performance
127    ///
128    /// This method is `O(base bytes)`: one `verify_all` sweep plus one
129    /// verify-at-bind attach (two checksum passes over the base).
130    pub fn validate(&self) -> Result<(), DbError> {
131        wal::read_superblock(&self.root)?;
132        let backing =
133            backing::open_backing(&self.root.join(base_file(self.base_generation)), false)?;
134        let snapshot = oxgraph_snapshot::Snapshot::open_checked(&backing, crc::checksum_append)
135            .map_err(|error| StorageError::invalid_store(error.to_string()))?;
136        snapshot
137            .verify_all(crc::checksum_append)
138            .map_err(|error| StorageError::invalid_store(error.to_string()))?;
139        Base::attach(backing).map(|_base| ()).map_err(DbError::from)
140    }
141
142    /// Validates an OXGDB database at `path`.
143    ///
144    /// # Errors
145    ///
146    /// Returns [`DbError`] when the store fails to open and recover.
147    ///
148    /// # Performance
149    ///
150    /// This function is `O(base bytes + log bytes)`.
151    pub fn validate_path(path: impl AsRef<Path>) -> Result<(), DbError> {
152        Self::open(path).map(|_database| ())
153    }
154
155    /// Folds the current base+overlay into a new base generation, rotating the
156    /// delta-log and republishing the superblock (a manual checkpoint).
157    ///
158    /// This is the checkpoint primitive, exposed here so the existing `compact`
159    /// API keeps its "rewrite the store compactly" contract. Auto-triggering is
160    /// configured separately via [`Db::set_checkpoint_policy`].
161    ///
162    /// # Errors
163    ///
164    /// Returns [`DbError`] when encoding, writing, or publishing the new
165    /// generation fails.
166    ///
167    /// # Performance
168    ///
169    /// This method is `O(visible state bytes)`.
170    pub fn compact(&mut self) -> Result<(), DbError> {
171        self.checkpoint()
172    }
173
174    /// Folds the current base+overlay into base-`{g+1}`, creates an empty
175    /// delta-`{g+1}`.log, republishes the superblock naming `g+1` (the
176    /// linearization point), then unlinks the old base and log.
177    ///
178    /// The order is crash-safe: the new base is fully durable BEFORE the
179    /// superblock names it (so a crash before the superblock leaves the OLD
180    /// superblock authoritative and the orphan new base is ignored), and the old
181    /// base/log are unlinked only AFTER the superblock names the new generation
182    /// (so a crash before the unlink leaves the NEW superblock authoritative and
183    /// the orphan old files are ignored). The
184    /// [`crate::wire::SuperblockRecord`] rename is the single linearization point.
185    ///
186    /// # Errors
187    ///
188    /// Returns [`DbError`] when encoding, writing, or publishing fails.
189    ///
190    /// # Performance
191    ///
192    /// This method is `O(visible state bytes)`.
193    pub(crate) fn checkpoint(&mut self) -> Result<(), DbError> {
194        self.checkpoint_inner(
195            #[cfg(test)]
196            CheckpointStop::Complete,
197        )
198    }
199
200    /// Crash-safe checkpoint body. Under `#[cfg(test)]` it accepts a
201    /// [`CheckpointStop`] that simulates a crash by returning early right after a
202    /// chosen fsync point, leaving the on-disk files exactly as a real crash
203    /// there would, so the crash-matrix test can reopen and assert recovery.
204    ///
205    /// # Errors
206    ///
207    /// Returns [`DbError`] when encoding, writing, or publishing fails.
208    ///
209    /// # Performance
210    ///
211    /// This method is `O(visible state bytes)`.
212    pub(super) fn checkpoint_inner(
213        &mut self,
214        #[cfg(test)] stop: CheckpointStop,
215    ) -> Result<(), DbError> {
216        let _lock = WriterLock::acquire(&self.root)?;
217        let next_generation = self
218            .base_generation
219            .checked_add(1)
220            .ok_or_else(|| DbError::invalid_store("checkpoint generation overflow"))?;
221        let view = self.current.view();
222        let commit_seq = self.current.lsn().get();
223        let base_bytes = freeze::freeze_view(
224            &view,
225            FreezeStamps {
226                commit_seq,
227                transaction_id: self.last_transaction_id.get(),
228                generation: next_generation,
229            },
230        )?;
231        // (1) write base-{g+1} (temp + fsync + rename + dir-fsync).
232        storage::atomic_write(
233            &self.root,
234            &self
235                .root
236                .join(format!("{}.tmp", base_file(next_generation))),
237            &self.root.join(base_file(next_generation)),
238            &base_bytes,
239        )?;
240        // (2) create empty delta-{g+1}.log (fsync + dir-fsync).
241        create_empty_log(&self.root, next_generation)?;
242        // Crash point A: new base + new log durable, superblock NOT yet
243        // published. The OLD superblock still names `g`, so recovery uses the old
244        // generation; the new base/log are orphans.
245        #[cfg(test)]
246        if matches!(stop, CheckpointStop::BeforeSuperblock) {
247            return Ok(());
248        }
249        // (3) publish the superblock naming g+1 — the linearization point.
250        write_superblock(
251            &self.root,
252            next_generation,
253            commit_seq,
254            commit_seq,
255            self.last_transaction_id.get(),
256        )?;
257        // Crash point B: superblock now names g+1, old base/log NOT yet unlinked.
258        // Recovery uses the new generation; the old base/log are orphans.
259        #[cfg(test)]
260        if matches!(stop, CheckpointStop::BeforeRotate) {
261            return Ok(());
262        }
263        // Re-open over the new generation, then (4) unlink the old base + log.
264        let reopened = Self::open(&self.root)?;
265        let old_generation = self.base_generation;
266        let policy = self.checkpoint_policy;
267        self.current = reopened.current;
268        self.base_generation = reopened.base_generation;
269        self.last_transaction_id = reopened.last_transaction_id;
270        // The reopen reset the policy to the default; restore the caller's.
271        self.checkpoint_policy = policy;
272        let _ = std::fs::remove_file(self.root.join(base_file(old_generation)));
273        let _ = std::fs::remove_file(self.root.join(delta_file(old_generation)));
274        let _ = storage::sync_directory(&self.root);
275        Ok(())
276    }
277
278    /// Auto-checkpoints when the configured [`CheckpointPolicy`] says the
279    /// delta-log has grown too large relative to the base. Called after a dirty
280    /// commit publishes its frame. A failed fold is surfaced so the caller can
281    /// observe it; the committed data is already durable in the log regardless.
282    ///
283    /// # Errors
284    ///
285    /// Returns [`DbError`] when the triggered fold fails.
286    ///
287    /// # Performance
288    ///
289    /// This method is `O(1)` to decide; `O(visible state bytes)` when it folds.
290    pub(super) fn maybe_auto_checkpoint(&mut self) -> Result<(), DbError> {
291        let log_bytes = file_len(&self.root.join(delta_file(self.base_generation)));
292        let base_bytes = file_len(&self.root.join(base_file(self.base_generation)));
293        if self
294            .checkpoint_policy
295            .should_checkpoint(log_bytes, base_bytes)
296        {
297            self.checkpoint()?;
298        }
299        Ok(())
300    }
301}
302
/// Test-only crash-injection point for [`Db::checkpoint_inner`].
///
/// The fold returns early right after the selected fsync. The crash-matrix
/// test can then reopen the store and assert what recovery sees in each crash
/// window.
///
/// Only the `#[cfg(not(miri))]` crash-matrix test builds the non-`Complete`
/// variants, because it reopens a real on-disk store across simulated crashes
/// and miri's isolation cannot model that. Under miri only `Complete` is
/// constructed, so the remaining variants are expected to be unused there.
///
/// # Performance
///
/// `perf: unspecified`; a test-only control tag.
#[cfg(test)]
#[cfg_attr(
    miri,
    expect(
        dead_code,
        reason = "the crash-injection variants are constructed only by the #[cfg(not(miri))] crash-matrix test"
    )
)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(super) enum CheckpointStop {
    /// Run every step of the checkpoint (the production path).
    Complete,
    /// Return once the new base and new log are durable but before the
    /// superblock is published, leaving the old superblock authoritative.
    BeforeSuperblock,
    /// Return once the superblock names the new generation but before the
    /// old base/log are unlinked, leaving the new superblock authoritative.
    BeforeRotate,
}