Skip to main content

oxgraph_db/
database.rs

1//! Embedded `OxGraph` database engine API.
2//!
//! This is the integration layer over the base+overlay+WAL core. A [`Database`]
//! holds the current `Arc<Snapshot>` (one immutable base generation plus the
//! frozen overlay published over it), the live base generation that names the
//! per-generation append-only delta-log file, and the recovered transaction
//! watermark. Reads pin the current snapshot in `O(1)` (`begin_read` clones the
//! `Arc`); writes layer a fresh [`WriteOverlay`] over the current snapshot,
//! append a WAL frame on commit, and publish a new snapshot. The whole
//! read/query/projection surface resolves through the merged [`StateView`] of
//! the pinned snapshot.
11
12use std::{
13    borrow::Cow,
14    path::{Path, PathBuf},
15    sync::Arc,
16};
17
18use crate::{
19    Catalog, CheckpointGeneration, CommitSeq, DbError, ElementId, ElementRecord, GraphProjection,
20    HypergraphProjection, IncidenceId, IncidenceRecord, IndexId, LabelId, PreparedQuery,
21    ProjectionDefinition, ProjectionId, PropertyKeyId, PropertySubject, PropertyType,
22    PropertyValue, QueryLanguage, QueryResult, RelationId, RelationRecord, RelationTypeId, RoleId,
23    TransactionId,
24    backing::Base,
25    catalog::{IndexDefinition, PropertyFamily},
26    freeze::{self, FreezeStamps},
27    lock::WriterLock,
28    overlay::{Overlay, Snapshot, StateView, WriteOverlay},
29    projection,
30    state::NextIds,
31    storage,
32    traversal::{self, TraversalOptions, TraversalResult},
33    wal,
34    wire::SuperblockRecord,
35};
36
/// Lookup input for a cataloged index.
///
/// This type makes index lookup shape explicit: membership indexes accept
/// [`IndexLookup::All`], single-property indexes accept scalar equality or
/// range inputs, and composite equality indexes accept an ordered value tuple.
///
/// Every payload is borrowed for `'value`; a lookup never owns a
/// [`PropertyValue`].
///
/// # Performance
///
/// Copying this value is `O(1)`: each variant carries at most two references
/// or one slice reference.
#[derive(Clone, Copy, Debug)]
pub enum IndexLookup<'value> {
    /// Lookup every subject represented by a membership-style index.
    All,
    /// Lookup one scalar equality value (borrowed for `'value`).
    Equal(&'value PropertyValue),
    /// Lookup one inclusive scalar range; both bounds are borrowed.
    Range {
        /// Inclusive lower bound.
        min: &'value PropertyValue,
        /// Inclusive upper bound.
        max: &'value PropertyValue,
    },
    /// Lookup one ordered composite equality tuple, given as a borrowed slice
    /// of values.
    CompositeEqual(&'value [PropertyValue]),
}
62
/// Auto-checkpoint policy: the rule that decides whether a dirty commit should
/// fold the delta-log into a fresh base generation, which bounds the log tail
/// recovery has to replay.
///
/// The default is the size-ratio rule: fold once the delta-log has grown past
/// `factor` times the live base size (`factor` is configurable).
/// [`CheckpointPolicy::Manual`] switches auto-triggering off, leaving
/// [`Database::checkpoint`]/[`Database::compact`] as the only way to fold.
///
/// # Performance
///
/// Copying this value is `O(1)`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CheckpointPolicy {
    /// No auto-checkpointing; the caller drives [`Database::checkpoint`].
    Manual,
    /// Fold after a dirty commit once the delta-log is larger than `factor`
    /// times the live base size. A small floor stands in for a tiny/empty base
    /// so the gen-0 store does not checkpoint on its first commit.
    SizeRatio {
        /// Log-to-base size factor `K`: the log may reach `K × base` bytes
        /// before the next dirty commit folds it.
        factor: u32,
    },
}

impl CheckpointPolicy {
    /// Default auto-checkpoint factor `K`: fold once the delta-log exceeds four
    /// times the live base size.
    pub const DEFAULT_FACTOR: u32 = 4;

    /// Base-size floor in bytes. A base smaller than this is treated as being
    /// this large by the size-ratio rule, so a freshly created (near-empty)
    /// base is not folded on its first commits.
    const MIN_BASE_BYTES: u64 = 4_096;

    /// Decides whether a delta-log of `log_bytes` sitting over a base of
    /// `base_bytes` should trigger an auto-checkpoint under this policy.
    ///
    /// The comparison is strict: a log exactly at the threshold does not fire.
    /// The threshold multiplication saturates instead of overflowing.
    ///
    /// # Performance
    ///
    /// This method is `O(1)`.
    #[must_use]
    const fn should_checkpoint(self, log_bytes: u64, base_bytes: u64) -> bool {
        match self {
            Self::SizeRatio { factor } => {
                // Clamp the base size up to the floor before scaling it.
                let effective_base = if base_bytes >= Self::MIN_BASE_BYTES {
                    base_bytes
                } else {
                    Self::MIN_BASE_BYTES
                };
                let threshold = effective_base.saturating_mul(factor as u64);
                threshold < log_bytes
            }
            Self::Manual => false,
        }
    }
}

impl Default for CheckpointPolicy {
    /// Size-ratio policy using [`CheckpointPolicy::DEFAULT_FACTOR`].
    ///
    /// # Performance
    ///
    /// This function is `O(1)`.
    fn default() -> Self {
        let factor = Self::DEFAULT_FACTOR;
        Self::SizeRatio { factor }
    }
}
133
/// Returns the file name of the base for generation `generation`, in the form
/// `base-<generation>.oxgdb`.
///
/// # Performance
///
/// This function is `O(1)`: the decimal rendering of a `u64` is bounded.
fn base_file(generation: u64) -> String {
    let digits = generation.to_string();
    ["base-", digits.as_str(), ".oxgdb"].concat()
}
142
/// Returns the file name of the delta-log for generation `generation`, in the
/// form `delta-<generation>.log`.
///
/// # Performance
///
/// This function is `O(1)`: the decimal rendering of a `u64` is bounded.
fn delta_file(generation: u64) -> String {
    let digits = generation.to_string();
    ["delta-", digits.as_str(), ".log"].concat()
}
151
/// Open OXGDB database handle.
///
/// The handle stores the store's root directory and live generation rather
/// than an open file: the base and delta-log file names are derived from
/// `base_generation` wherever they are needed.
///
/// # Performance
///
/// Moving a handle is `O(1)`: it moves the root path, the current
/// `Arc<Snapshot>`, and a few scalar fields.
pub struct Database {
    /// Root database directory.
    root: PathBuf,
    /// The current visible snapshot (base generation + published overlay),
    /// shared by readers through an atomically reference-counted handle.
    current: Arc<Snapshot>,
    /// Live base generation named by the superblock; every delta frame and the
    /// per-generation log filename carry it.
    base_generation: u64,
    /// Last writer transaction id handed out by this handle. `begin_write`
    /// advances it eagerly, so after a rollback it can sit above the last
    /// durably recorded id; `open` recovers the durable value from the
    /// superblock and the replayed frames.
    last_transaction_id: TransactionId,
    /// Auto-checkpoint policy consulted after each dirty commit. It lives only
    /// on the handle: `open` always starts from [`CheckpointPolicy::default`].
    checkpoint_policy: CheckpointPolicy,
}
173
174impl Database {
175    /// Creates a new empty OXGDB database at `path`.
176    ///
177    /// The create order is base-0 then empty delta-0.log then the writer lock
178    /// file then the superblock (written LAST as the create-complete marker), so
179    /// a half-created store is detected on open rather than silently opened
180    /// empty.
181    ///
182    /// # Errors
183    ///
184    /// Returns [`DbError::AlreadyExists`] when a store already exists, or
185    /// [`DbError::Io`]/[`DbError::InvalidStore`] when creation fails.
186    ///
187    /// # Performance
188    ///
189    /// This function is `O(empty base bytes)`.
190    pub fn create(path: impl AsRef<Path>) -> Result<Self, DbError> {
191        let root = path.as_ref().to_path_buf();
192        if root.join(wal::SUPERBLOCK_FILE).exists() {
193            return Err(DbError::AlreadyExists);
194        }
195        // Base-0: an empty merged view (empty base under an empty overlay).
196        let empty_base = crate::overlay::BaseRecords::empty();
197        let empty_overlay = Overlay::empty(NextIds::INITIAL, Catalog::empty());
198        let view = crate::overlay::MergedState::new(&empty_base, &empty_overlay);
199        let base_bytes = freeze::freeze_view(
200            &view,
201            FreezeStamps {
202                commit_seq: 0,
203                transaction_id: 0,
204                generation: 0,
205            },
206        )?;
207        storage::atomic_write(
208            &root,
209            &root.join(format!("{}.tmp", base_file(0))),
210            &root.join(base_file(0)),
211            &base_bytes,
212        )?;
213        // Empty delta-0.log, durably created.
214        create_empty_log(&root, 0)?;
215        // Superblock is written LAST; its existence is the create-complete marker.
216        write_superblock(&root, 0, 0, 0, 0)?;
217        Self::open(&root)
218    }
219
    /// Opens an existing OXGDB database, recovering the live frontier from the
    /// valid prefix of the delta-log replayed over the base named by the
    /// superblock.
    ///
    /// Recovery steps, in order: read the superblock, open the base it names,
    /// replay the matching delta-log, truncate the log when its valid prefix
    /// is shorter than the file, fold the replayed frames into a fresh overlay,
    /// and publish the result as the handle's current snapshot. The returned
    /// handle always starts with [`CheckpointPolicy::default`].
    ///
    /// NOTE(review): the torn-tail truncation below runs without acquiring
    /// `WriterLock` in this function — confirm opens are serialized against a
    /// concurrent writer elsewhere.
    ///
    /// # Errors
    ///
    /// Returns [`DbError`] when the store is missing, malformed, or the log is
    /// corrupt beyond a torn tail.
    ///
    /// # Performance
    ///
    /// This function is `O(base bytes + log bytes)`.
    pub fn open(path: impl AsRef<Path>) -> Result<Self, DbError> {
        let root = path.as_ref().to_path_buf();
        // The superblock names the live generation; everything else is keyed
        // off it.
        let superblock = wal::read_superblock(&root)?;
        let generation = superblock.base_generation.get();

        let base = Arc::new(Base::open(&root.join(base_file(generation)), false)?);
        let base_records = Arc::new(crate::overlay::BaseRecords::from_view(base.get())?);
        let base_header = *base.get().header();
        let base_catalog = base.get().catalog().clone();
        let base_next = NextIds::from_header(&base_header);

        // Replay the valid prefix of the per-generation delta-log.
        // (`read_log` treats a missing log file as an empty log.)
        let log_path = root.join(delta_file(generation));
        let log_bytes = read_log(&log_path)?;
        let outcome = wal::replay(generation, &log_bytes)?;
        // A torn tail truncates the log back to its last-good byte length.
        if outcome.valid_len < log_bytes.len() {
            truncate_log(&log_path, outcome.valid_len)?;
        }

        // Fold the replayed frames into a fresh overlay over the base, deriving
        // the live frontier (commit_seq/txn_id) from the last good frame.
        // With no frames to replay, the superblock's stamps stand as-is.
        let mut write = WriteOverlay::new(base_next, base_catalog);
        let mut recovered_next = base_next;
        let mut last_commit_seq = superblock.commit_seq.get();
        let mut last_txn = superblock.transaction_id.get();
        for frame in &outcome.frames {
            for op in &frame.ops {
                write.apply_replay_op(&base_records, op, &frame.blob, frame.lsn)?;
            }
            recovered_next = recovered_next.elementwise_max(write.next_ids());
            // The commit sequence takes the latest frame's LSN, while the
            // transaction id keeps a running maximum.
            last_commit_seq = frame.lsn;
            last_txn = last_txn.max(frame.txn_id);
        }
        // ids are never reused: the recovered watermark is the elementwise max of
        // the base header and every replayed frame's watermark.
        write.set_next_ids(recovered_next);
        let overlay = Arc::new(write.freeze());

        let snapshot = Arc::new(Snapshot::new(
            CheckpointGeneration::new(generation),
            CommitSeq::new(last_commit_seq),
            base,
            overlay,
        )?);

        Ok(Self {
            root,
            current: snapshot,
            base_generation: generation,
            last_transaction_id: TransactionId::new(last_txn),
            // The policy is not read from disk here; every open starts from
            // the default.
            checkpoint_policy: CheckpointPolicy::default(),
        })
    }
286
    /// Returns the live base generation named by the superblock (the count of
    /// folds this store has undergone; gen-0 is the freshly created store).
    ///
    /// The value is the handle's cached `base_generation` wrapped in a
    /// [`CheckpointGeneration`]; no file is read.
    ///
    /// # Performance
    ///
    /// This method is `O(1)`.
    #[must_use]
    pub const fn live_generation(&self) -> CheckpointGeneration {
        CheckpointGeneration::new(self.base_generation)
    }
297
    /// Returns the configured auto-checkpoint policy.
    ///
    /// The policy is returned by value (it is `Copy`).
    ///
    /// # Performance
    ///
    /// This method is `O(1)`.
    #[must_use]
    pub const fn checkpoint_policy(&self) -> CheckpointPolicy {
        self.checkpoint_policy
    }
307
    /// Sets the auto-checkpoint policy consulted after each dirty commit.
    ///
    /// The setting lives on this handle only: a handle returned by
    /// [`Database::open`] always starts from [`CheckpointPolicy::default`].
    ///
    /// # Performance
    ///
    /// This method is `O(1)`.
    pub const fn set_checkpoint_policy(&mut self, policy: CheckpointPolicy) {
        self.checkpoint_policy = policy;
    }
316
317    /// Validates the current handle by re-reading the superblock and verifying
318    /// the live base's content CRC.
319    ///
320    /// # Errors
321    ///
322    /// Returns [`DbError`] when the superblock or base fails validation.
323    ///
324    /// # Performance
325    ///
326    /// This method is `O(base bytes)`.
327    pub fn validate(&self) -> Result<(), DbError> {
328        wal::read_superblock(&self.root)?;
329        Base::open(&self.root.join(base_file(self.base_generation)), false).map(|_base| ())
330    }
331
332    /// Validates an OXGDB database at `path`.
333    ///
334    /// # Errors
335    ///
336    /// Returns [`DbError`] when the store fails to open and recover.
337    ///
338    /// # Performance
339    ///
340    /// This function is `O(base bytes + log bytes)`.
341    pub fn validate_path(path: impl AsRef<Path>) -> Result<(), DbError> {
342        Self::open(path).map(|_database| ())
343    }
344
    /// Folds the current base+overlay into a new base generation, rotating the
    /// delta-log and republishing the superblock (a manual checkpoint).
    ///
    /// This is the checkpoint primitive, exposed here so the existing `compact`
    /// API keeps its "rewrite the store compactly" contract. Auto-triggering is
    /// configured separately via [`Database::set_checkpoint_policy`].
    ///
    /// This method forwards directly to [`Database::checkpoint`].
    ///
    /// # Errors
    ///
    /// Returns [`DbError`] when encoding, writing, or publishing the new
    /// generation fails.
    ///
    /// # Performance
    ///
    /// This method is `O(visible state bytes)`.
    pub fn compact(&mut self) -> Result<(), DbError> {
        self.checkpoint()
    }
363
    /// Folds the current base+overlay into base-`{g+1}`, creates an empty
    /// delta-`{g+1}`.log, republishes the superblock naming `g+1` (the
    /// linearization point), then unlinks the old base and log.
    ///
    /// The order is crash-safe: the new base is fully durable BEFORE the
    /// superblock names it (so a crash before the superblock leaves the OLD
    /// superblock authoritative and the orphan new base is ignored), and the old
    /// base/log are unlinked only AFTER the superblock names the new generation
    /// (so a crash before the unlink leaves the NEW superblock authoritative and
    /// the orphan old files are ignored). The
    /// [`crate::wire::SuperblockRecord`] rename is the single linearization point.
    ///
    /// The work happens in `checkpoint_inner`; this wrapper only supplies the
    /// run-to-completion stop argument that exists in test builds.
    ///
    /// # Errors
    ///
    /// Returns [`DbError`] when encoding, writing, or publishing fails.
    ///
    /// # Performance
    ///
    /// This method is `O(visible state bytes)`.
    pub fn checkpoint(&mut self) -> Result<(), DbError> {
        self.checkpoint_inner(
            // The stop argument only exists under `cfg(test)`; production
            // builds call `checkpoint_inner` with no arguments.
            #[cfg(test)]
            CheckpointStop::Complete,
        )
    }
389
    /// Crash-safe checkpoint body. Under `#[cfg(test)]` it accepts a
    /// [`CheckpointStop`] that simulates a crash by returning early right after a
    /// chosen fsync point, leaving the on-disk files exactly as a real crash
    /// there would, so the crash-matrix test can reopen and assert recovery.
    ///
    /// The writer lock is held for the whole body. An early test-only return
    /// leaves the in-memory handle untouched (still on generation `g`).
    ///
    /// NOTE(review): if the reopen after step (3) fails, the superblock already
    /// names `g+1` while this handle still carries generation `g` — confirm
    /// callers treat the handle as unusable after such an error.
    ///
    /// # Errors
    ///
    /// Returns [`DbError`] when encoding, writing, or publishing fails.
    ///
    /// # Performance
    ///
    /// This method is `O(visible state bytes)`.
    fn checkpoint_inner(&mut self, #[cfg(test)] stop: CheckpointStop) -> Result<(), DbError> {
        // Held until the function returns (the guard is bound, not discarded).
        let _lock = WriterLock::acquire(&self.root)?;
        let next_generation = self
            .base_generation
            .checked_add(1)
            .ok_or_else(|| DbError::invalid_store("checkpoint generation overflow"))?;
        // Freeze the currently published state, stamped with the current
        // commit sequence, the handle's transaction watermark, and `g+1`.
        let view = self.current.view();
        let commit_seq = self.current.lsn().get();
        let base_bytes = freeze::freeze_view(
            &view,
            FreezeStamps {
                commit_seq,
                transaction_id: self.last_transaction_id.get(),
                generation: next_generation,
            },
        )?;
        // (1) write base-{g+1} (temp + fsync + rename + dir-fsync).
        storage::atomic_write(
            &self.root,
            &self
                .root
                .join(format!("{}.tmp", base_file(next_generation))),
            &self.root.join(base_file(next_generation)),
            &base_bytes,
        )?;
        // (2) create empty delta-{g+1}.log (fsync + dir-fsync).
        create_empty_log(&self.root, next_generation)?;
        // Crash point A: new base + new log durable, superblock NOT yet
        // published. The OLD superblock still names `g`, so recovery uses the old
        // generation; the new base/log are orphans.
        #[cfg(test)]
        if matches!(stop, CheckpointStop::BeforeSuperblock) {
            return Ok(());
        }
        // (3) publish the superblock naming g+1 — the linearization point.
        // The same commit sequence is passed as both the checkpoint LSN and
        // the commit sequence.
        write_superblock(
            &self.root,
            next_generation,
            commit_seq,
            commit_seq,
            self.last_transaction_id.get(),
        )?;
        // Crash point B: superblock now names g+1, old base/log NOT yet unlinked.
        // Recovery uses the new generation; the old base/log are orphans.
        #[cfg(test)]
        if matches!(stop, CheckpointStop::BeforeRotate) {
            return Ok(());
        }
        // Re-open over the new generation, then (4) unlink the old base + log.
        let reopened = Self::open(&self.root)?;
        let old_generation = self.base_generation;
        let policy = self.checkpoint_policy;
        self.current = reopened.current;
        self.base_generation = reopened.base_generation;
        self.last_transaction_id = reopened.last_transaction_id;
        // The reopen reset the policy to the default; restore the caller's.
        self.checkpoint_policy = policy;
        // Best-effort cleanup: the results are explicitly discarded, so a
        // failed unlink or directory sync does not fail the checkpoint.
        let _ = std::fs::remove_file(self.root.join(base_file(old_generation)));
        let _ = std::fs::remove_file(self.root.join(delta_file(old_generation)));
        let _ = storage::sync_directory(&self.root);
        Ok(())
    }
464
465    /// Auto-checkpoints when the configured [`CheckpointPolicy`] says the
466    /// delta-log has grown too large relative to the base. Called after a dirty
467    /// commit publishes its frame. A failed fold is surfaced so the caller can
468    /// observe it; the committed data is already durable in the log regardless.
469    ///
470    /// # Errors
471    ///
472    /// Returns [`DbError`] when the triggered fold fails.
473    ///
474    /// # Performance
475    ///
476    /// This method is `O(1)` to decide; `O(visible state bytes)` when it folds.
477    fn maybe_auto_checkpoint(&mut self) -> Result<(), DbError> {
478        let log_bytes = file_len(&self.root.join(delta_file(self.base_generation)));
479        let base_bytes = file_len(&self.root.join(base_file(self.base_generation)));
480        if self
481            .checkpoint_policy
482            .should_checkpoint(log_bytes, base_bytes)
483        {
484            self.checkpoint()?;
485        }
486        Ok(())
487    }
488
489    /// Returns operational status for this handle, including the live generation
490    /// count and the on-disk base/delta-log sizes the auto-checkpoint policy
491    /// weighs.
492    ///
493    /// # Performance
494    ///
495    /// This method is `O(visible state)` for the merged counts plus two `stat`
496    /// syscalls for the file sizes.
497    #[must_use]
498    pub fn status(&self) -> DatabaseStatus {
499        let view = self.current.view();
500        DatabaseStatus {
501            visible_commit_seq: self.current.lsn(),
502            last_transaction_id: self.last_transaction_id,
503            live_generation: CheckpointGeneration::new(self.base_generation),
504            base_byte_size: file_len(&self.root.join(base_file(self.base_generation))),
505            log_byte_size: file_len(&self.root.join(delta_file(self.base_generation))),
506            element_count: view.element_count(),
507            relation_count: view.relation_count(),
508            incidence_count: view.incidence_count(),
509            catalog: self.catalog_summary(),
510        }
511    }
512
    /// Returns a catalog-size summary.
    ///
    /// The summary is built from the catalog of the current snapshot's view.
    ///
    /// # Performance
    ///
    /// This method is `O(catalog entry count)`.
    #[must_use]
    pub fn catalog_summary(&self) -> CatalogSummary {
        CatalogSummary::from_catalog(self.current.view().catalog())
    }
522
    /// Starts a read transaction pinned to the current visible snapshot.
    ///
    /// The returned transaction owns its own `Arc<Snapshot>` and holds no
    /// borrow of this handle.
    ///
    /// # Performance
    ///
    /// This method is `O(1)`: the reader clones the current `Arc<Snapshot>` and
    /// observes a fixed state even across later commits and checkpoints.
    #[must_use]
    pub fn begin_read(&self) -> ReadTransaction {
        ReadTransaction {
            // Reference-count bump only; the snapshot itself is shared.
            snapshot: Arc::clone(&self.current),
        }
    }
535
536    /// Starts the single writer transaction, acquiring the cross-process writer
537    /// lock for the transaction's lifetime.
538    ///
539    /// # Errors
540    ///
541    /// Returns [`DbError::WriterLockHeld`] when another writer holds the lock or
542    /// [`DbError::TransactionIdOverflow`] when writer ids are exhausted.
543    ///
544    /// # Performance
545    ///
546    /// This method is `O(1)`: the writer layers a fresh empty write overlay over
547    /// the current snapshot.
548    pub fn begin_write(&mut self) -> Result<WriteTransaction<'_>, DbError> {
549        let lock = WriterLock::acquire(&self.root)?;
550        let transaction_id = self
551            .last_transaction_id
552            .checked_next()
553            .ok_or(DbError::TransactionIdOverflow)?;
554        // Burn the id eagerly so it is session-local-visible even on rollback;
555        // it only becomes durable when a dirty commit writes its frame, and a
556        // reopen recovers the durable high-water mark from the log.
557        self.last_transaction_id = transaction_id;
558        let parent = Arc::clone(&self.current);
559        // Seed the writer delta from the parent's published overlay so the
560        // writer reads every committed record; the parent overlay is never
561        // mutated (the seed clones its maps).
562        let delta = WriteOverlay::from_overlay(parent.overlay());
563        Ok(WriteTransaction {
564            database: self,
565            parent,
566            delta,
567            transaction_id,
568            lock,
569        })
570    }
571
    /// Prepares a query against the current catalog.
    ///
    /// Preparation runs against the view of the handle's current snapshot and
    /// is delegated to [`PreparedQuery::prepare`].
    ///
    /// # Errors
    ///
    /// Returns [`DbError`] when parsing or semantic analysis fails.
    ///
    /// # Performance
    ///
    /// This method is `O(query length + catalog lookup cost)`.
    pub fn prepare(&self, language: QueryLanguage, query: &str) -> Result<PreparedQuery, DbError> {
        PreparedQuery::prepare(language, query, &self.current.view())
    }
584}
585
/// Reports the on-disk byte length of `path`. Any failure to stat the path
/// (including the file being absent) yields `0`: the size is advisory, feeding
/// status reporting and the auto-checkpoint heuristic, never correctness.
///
/// # Performance
///
/// This function is `O(1)`: one `stat` syscall.
fn file_len(path: &Path) -> u64 {
    match std::fs::metadata(path) {
        Ok(metadata) => metadata.len(),
        Err(_unreadable) => 0,
    }
}
596
/// Test-only crash-injection point for [`Database::checkpoint_inner`]: stops the
/// fold right after a chosen fsync so the crash-matrix test can reopen and assert
/// the recovered state at each crash window.
///
/// The crash-matrix test that constructs the non-`Complete` variants is
/// `#[cfg(not(miri))]` (it reopens a real store across simulated crashes, which
/// miri's isolation cannot model), so under miri only `Complete` is constructed
/// and the other variants are expectedly unused.
///
/// The type exists only in test builds; production builds compile
/// `checkpoint_inner` without the stop parameter.
///
/// # Performance
///
/// `perf: unspecified`; a test-only control tag.
#[cfg(test)]
#[cfg_attr(
    miri,
    expect(
        dead_code,
        reason = "the crash-injection variants are constructed only by the #[cfg(not(miri))] crash-matrix test"
    )
)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum CheckpointStop {
    /// Run the whole checkpoint (the production path).
    Complete,
    /// Stop after the new base + new log are durable, before the superblock is
    /// published (the old superblock stays authoritative).
    BeforeSuperblock,
    /// Stop after the superblock names the new generation, before the old
    /// base/log are unlinked (the new superblock is authoritative).
    BeforeRotate,
}
628
629/// Reads the whole delta-log into memory, treating a missing file as empty.
630///
631/// # Errors
632///
633/// Returns [`DbError::Io`] when the file cannot be read.
634///
635/// # Performance
636///
637/// This function is `O(log bytes)`.
638fn read_log(path: &Path) -> Result<Vec<u8>, DbError> {
639    match std::fs::read(path) {
640        Ok(bytes) => Ok(bytes),
641        Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Vec::new()),
642        Err(error) => Err(DbError::io("read delta-log", error)),
643    }
644}
645
646/// Truncates the delta-log back to `len` (its last-good byte length) and fsyncs,
647/// discarding a torn tail under the open path.
648///
649/// # Errors
650///
651/// Returns [`DbError::Io`] when opening, truncating, or syncing fails.
652///
653/// # Performance
654///
655/// This function is `O(1)`.
656fn truncate_log(path: &Path, len: usize) -> Result<(), DbError> {
657    let file = std::fs::OpenOptions::new()
658        .write(true)
659        .open(path)
660        .map_err(|error| DbError::io("open delta-log for truncate", error))?;
661    let len = u64::try_from(len)
662        .map_err(|_overflow| DbError::invalid_store("delta-log length overflow"))?;
663    file.set_len(len)
664        .map_err(|error| DbError::io("truncate delta-log", error))?;
665    file.sync_all()
666        .map_err(|error| DbError::io("sync truncated delta-log", error))
667}
668
669/// Creates an empty per-generation delta-log, fsyncing the file and the
670/// directory entry so the new (empty) log is durable.
671///
672/// # Errors
673///
674/// Returns [`DbError::Io`] when creation or syncing fails.
675///
676/// # Performance
677///
678/// This function is `O(1)`.
679fn create_empty_log(root: &Path, generation: u64) -> Result<(), DbError> {
680    let path = root.join(delta_file(generation));
681    let file =
682        std::fs::File::create(&path).map_err(|error| DbError::io("create delta-log", error))?;
683    file.sync_all()
684        .map_err(|error| DbError::io("sync delta-log", error))?;
685    storage::sync_directory(root)
686}
687
688/// Opens the live delta-log for appending (create when absent, read+append).
689///
690/// # Errors
691///
692/// Returns [`DbError::Io`] when the log cannot be opened.
693///
694/// # Performance
695///
696/// This function is `O(1)`.
697fn open_log_for_append(root: &Path, generation: u64) -> Result<std::fs::File, DbError> {
698    std::fs::OpenOptions::new()
699        .create(true)
700        .truncate(false)
701        .read(true)
702        .append(true)
703        .open(root.join(delta_file(generation)))
704        .map_err(|error| DbError::io("open delta-log for append", error))
705}
706
707/// Writes the superblock naming `generation` with the given frontier stamps.
708///
709/// # Errors
710///
711/// Returns [`DbError::Io`] when publishing fails.
712///
713/// # Performance
714///
715/// This function is `O(1)`.
716fn write_superblock(
717    root: &Path,
718    generation: u64,
719    checkpoint_lsn: u64,
720    commit_seq: u64,
721    transaction_id: u64,
722) -> Result<(), DbError> {
723    wal::write_superblock(
724        root,
725        &SuperblockRecord {
726            magic: crate::wire::SUPERBLOCK_MAGIC,
727            base_generation: generation.into(),
728            checkpoint_lsn: checkpoint_lsn.into(),
729            log_byte_offset: 0u64.into(),
730            commit_seq: commit_seq.into(),
731            transaction_id: transaction_id.into(),
732            format_version: crate::wire::OXGDB_FORMAT_VERSION.into(),
733            flags: 0u32.into(),
734            crc32c: 0u32.into(),
735            pad: 0u32.into(),
736        },
737    )
738}
739
/// Snapshot of database status.
///
/// Produced by [`Database::status`]; every field is a plain value captured at
/// the time of the call.
///
/// # Performance
///
/// Copying and comparing status is `O(1)`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct DatabaseStatus {
    /// Last visible commit sequence.
    pub visible_commit_seq: CommitSeq,
    /// Last writer transaction ID burned by this handle.
    ///
    /// This value is durable after a dirty commit and session-local after
    /// rollback.
    pub last_transaction_id: TransactionId,
    /// Live base generation named by the superblock (the count of folds this
    /// store has undergone; gen-0 is the freshly created store).
    pub live_generation: CheckpointGeneration,
    /// On-disk byte size of the live base file (`0` when the file is absent
    /// or cannot be stat'd).
    pub base_byte_size: u64,
    /// On-disk byte size of the live delta-log (the tail recovery replays and
    /// the auto-checkpoint policy weighs against the base size); `0` when the
    /// file is absent or cannot be stat'd.
    pub log_byte_size: u64,
    /// Visible element count.
    pub element_count: usize,
    /// Visible relation count.
    pub relation_count: usize,
    /// Visible incidence count.
    pub incidence_count: usize,
    /// Catalog-size summary.
    pub catalog: CatalogSummary,
}
771
/// Catalog-size summary.
///
/// Each field is the number of entries of one kind in a [`Catalog`], as
/// counted by [`CatalogSummary::from_catalog`].
///
/// # Performance
///
/// Copying and comparing are `O(1)`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct CatalogSummary {
    /// Role count.
    pub role_count: usize,
    /// Label count.
    pub label_count: usize,
    /// Relation type count.
    pub relation_type_count: usize,
    /// Property key count.
    pub property_key_count: usize,
    /// Projection count.
    pub projection_count: usize,
    /// Index count.
    pub index_count: usize,
}
792
793impl CatalogSummary {
794    /// Builds a summary from a catalog.
795    ///
796    /// # Performance
797    ///
798    /// This function is `O(catalog entry count)`.
799    #[must_use]
800    pub fn from_catalog(catalog: &Catalog) -> Self {
801        Self {
802            role_count: catalog.roles().count(),
803            label_count: catalog.labels().count(),
804            relation_type_count: catalog.relation_types().count(),
805            property_key_count: catalog.property_keys().count(),
806            projection_count: catalog.projections().count(),
807            index_count: catalog.indexes().count(),
808        }
809    }
810}
811
/// Reader pin identifying the visible database generation.
///
/// A pin is a pair of plain values: the commit sequence and the checkpoint
/// generation of the snapshot a reader observes.
///
/// # Performance
///
/// Copying and comparing a pin is `O(1)`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ReadPin {
    /// Pinned visible commit sequence.
    pub visible_commit_seq: CommitSeq,
    /// Pinned checkpoint generation.
    pub generation: CheckpointGeneration,
}
824
/// Read transaction over a pinned snapshot.
///
/// A read transaction owns its own `Arc<Snapshot>` and never borrows the
/// [`Database`], so it stays valid across a later `begin_write`/`checkpoint` on
/// the same handle (it cloned the snapshot before the write borrowed `&mut`). It
/// is [`Send`] + [`Sync`] (asserted below).
///
/// Every accessor on this type resolves through `self.snapshot.view()`, the
/// merged view of the pinned snapshot.
///
/// # Performance
///
/// Creating and cloning a read transaction is `O(1)`: it shares the pinned
/// snapshot through an `Arc`, not by copying.
pub struct ReadTransaction {
    /// The pinned snapshot this reader observes; shared, never copied.
    snapshot: Arc<Snapshot>,
}
840
841/// `ReadTransaction` MUST be `Send + Sync`: it pins only an `Arc<Snapshot>`,
842/// which holds `Arc`-shared `Send + Sync` data (no `Rc`/`RefCell` reachable).
843const fn assert_send_sync<T: Send + Sync>() {}
844const _: () = assert_send_sync::<ReadTransaction>();
845const _: () = assert_send_sync::<Arc<Snapshot>>();
846
847impl ReadTransaction {
848    /// Returns this transaction's reader pin.
849    ///
850    /// # Performance
851    ///
852    /// This method is `O(1)`.
853    #[must_use]
854    pub fn pin(&self) -> ReadPin {
855        ReadPin {
856            visible_commit_seq: self.snapshot.lsn(),
857            generation: self.snapshot.generation(),
858        }
859    }
860
861    /// Returns catalog metadata.
862    ///
863    /// # Performance
864    ///
865    /// This method is `O(1)`.
866    #[must_use]
867    pub fn catalog(&self) -> &Catalog {
868        self.snapshot.view().catalog_ref()
869    }
870
871    /// Returns visible element count.
872    ///
873    /// # Performance
874    ///
875    /// This method is `O(base + overlay change)`.
876    #[must_use]
877    pub fn element_count(&self) -> usize {
878        self.snapshot.view().element_count()
879    }
880
881    /// Returns visible relation count.
882    ///
883    /// # Performance
884    ///
885    /// This method is `O(base + overlay change)`.
886    #[must_use]
887    pub fn relation_count(&self) -> usize {
888        self.snapshot.view().relation_count()
889    }
890
891    /// Returns visible incidence count.
892    ///
893    /// # Performance
894    ///
895    /// This method is `O(base + overlay change)`.
896    #[must_use]
897    pub fn incidence_count(&self) -> usize {
898        self.snapshot.view().incidence_count()
899    }
900
901    /// Returns every visible element id in id order.
902    ///
903    /// # Performance
904    ///
905    /// This method is `O(element count)`.
906    #[must_use]
907    pub fn element_ids(&self) -> Vec<ElementId> {
908        self.snapshot
909            .view()
910            .elements()
911            .map(|record| record.id)
912            .collect()
913    }
914
915    /// Returns every visible relation id in id order.
916    ///
917    /// # Performance
918    ///
919    /// This method is `O(relation count)`.
920    #[must_use]
921    pub fn relation_ids(&self) -> Vec<RelationId> {
922        self.snapshot
923            .view()
924            .relations()
925            .map(|record| record.id)
926            .collect()
927    }
928
929    /// Returns whether an element exists.
930    ///
931    /// # Performance
932    ///
933    /// This method is `O(log change + log n)`.
934    #[must_use]
935    pub fn contains_element(&self, id: ElementId) -> bool {
936        self.snapshot.view().contains_element(id)
937    }
938
939    /// Returns whether a relation exists.
940    ///
941    /// # Performance
942    ///
943    /// This method is `O(log change + log n)`.
944    #[must_use]
945    pub fn contains_relation(&self, id: RelationId) -> bool {
946        self.snapshot.view().contains_relation(id)
947    }
948
949    /// Returns whether an incidence exists.
950    ///
951    /// # Performance
952    ///
953    /// This method is `O(log change + log n)`.
954    #[must_use]
955    pub fn contains_incidence(&self, id: IncidenceId) -> bool {
956        self.snapshot.view().contains_incidence(id)
957    }
958
959    /// Returns an element record, borrowed from the base for a base-only id and
960    /// owned for an overlay-supplied id.
961    ///
962    /// # Performance
963    ///
964    /// This method is `O(log change + log n)`.
965    #[must_use]
966    pub fn element(&self, id: ElementId) -> Option<Cow<'_, ElementRecord>> {
967        self.snapshot.view().element_ref(id)
968    }
969
970    /// Returns a relation record (see [`Self::element`] for the borrow contract).
971    ///
972    /// # Performance
973    ///
974    /// This method is `O(log change + log n)`.
975    #[must_use]
976    pub fn relation(&self, id: RelationId) -> Option<Cow<'_, RelationRecord>> {
977        self.snapshot.view().relation_ref(id)
978    }
979
980    /// Returns an incidence record (see [`Self::element`] for the borrow
981    /// contract).
982    ///
983    /// # Performance
984    ///
985    /// This method is `O(log change + log n)`.
986    #[must_use]
987    pub fn incidence(&self, id: IncidenceId) -> Option<Cow<'_, IncidenceRecord>> {
988        self.snapshot.view().incidence_ref(id)
989    }
990
991    /// Returns every visible incidence attached to an element, in ascending
992    /// incidence-id order.
993    ///
994    /// The merged set mixes overlay-owned and base-borrowed records, so this
995    /// returns an owned [`Vec`] ([`IncidenceRecord`] is [`Copy`], so the copy is
996    /// cheap).
997    ///
998    /// # Performance
999    ///
1000    /// This method is `O(base incidences + overlay incidence change)`.
1001    #[must_use]
1002    pub fn element_incidences(&self, id: ElementId) -> Vec<IncidenceRecord> {
1003        self.snapshot.view().element_incidences(id)
1004    }
1005
1006    /// Returns one property value (see [`Self::element`] for the borrow
1007    /// contract).
1008    ///
1009    /// # Performance
1010    ///
1011    /// This method is `O(log subjects + log keys)`.
1012    #[must_use]
1013    pub fn property(
1014        &self,
1015        subject: PropertySubject,
1016        key: PropertyKeyId,
1017    ) -> Option<Cow<'_, PropertyValue>> {
1018        self.snapshot.view().property_ref(subject, key)
1019    }
1020
1021    /// Looks up subjects with a property value.
1022    ///
1023    /// # Errors
1024    ///
1025    /// Returns [`DbError`] when the property key is unknown or `value` does not
1026    /// match the key schema.
1027    ///
1028    /// # Performance
1029    ///
1030    /// This method is `O(property subject count)`.
1031    pub fn lookup_property_equal(
1032        &self,
1033        key: PropertyKeyId,
1034        value: &PropertyValue,
1035    ) -> Result<Vec<PropertySubject>, DbError> {
1036        self.snapshot.view().typed_property_equal(key, value)
1037    }
1038
1039    /// Looks up subjects with a property inside an inclusive range.
1040    ///
1041    /// # Errors
1042    ///
1043    /// Returns [`DbError`] when the property key is unknown or either bound
1044    /// does not match the key schema.
1045    ///
1046    /// # Performance
1047    ///
1048    /// This method is `O(property subject count)`.
1049    pub fn lookup_property_range(
1050        &self,
1051        key: PropertyKeyId,
1052        min: &PropertyValue,
1053        max: &PropertyValue,
1054    ) -> Result<Vec<PropertySubject>, DbError> {
1055        self.snapshot.view().typed_property_range(key, min, max)
1056    }
1057
1058    /// Executes an index lookup.
1059    ///
1060    /// # Errors
1061    ///
1062    /// Returns [`DbError`] when the index is unknown, the lookup shape does not
1063    /// match the index kind, or supplied property values do not match catalog
1064    /// schemas.
1065    ///
1066    /// # Performance
1067    ///
1068    /// This method is `O(indexed family size)`.
1069    pub fn lookup_index(
1070        &self,
1071        index: IndexId,
1072        lookup: IndexLookup<'_>,
1073    ) -> Result<Vec<PropertySubject>, DbError> {
1074        let view = self.snapshot.view();
1075        let entry = view
1076            .catalog()
1077            .index(index)
1078            .ok_or(DbError::UnknownIndex { id: index })?;
1079        match (&entry.definition, lookup) {
1080            (IndexDefinition::Label { label }, IndexLookup::All) => Ok(view
1081                .elements_with_label(*label)
1082                .into_iter()
1083                .map(PropertySubject::Element)
1084                .collect()),
1085            (IndexDefinition::Label { .. }, _lookup) => {
1086                Err(DbError::unsupported("label index expects all lookup"))
1087            }
1088            (IndexDefinition::RelationType { relation_type }, IndexLookup::All) => Ok(view
1089                .relations_with_type(*relation_type)
1090                .into_iter()
1091                .map(PropertySubject::Relation)
1092                .collect()),
1093            (IndexDefinition::RelationType { .. }, _lookup) => Err(DbError::unsupported(
1094                "relation type index expects all lookup",
1095            )),
1096            (IndexDefinition::PropertyEquality { key }, IndexLookup::Equal(value)) => {
1097                view.typed_property_equal(*key, value)
1098            }
1099            (IndexDefinition::PropertyEquality { .. }, _lookup) => Err(DbError::unsupported(
1100                "property equality index expects equality lookup",
1101            )),
1102            (IndexDefinition::PropertyRange { key }, IndexLookup::Range { min, max }) => {
1103                view.typed_property_range(*key, min, max)
1104            }
1105            (IndexDefinition::PropertyRange { .. }, _lookup) => Err(DbError::unsupported(
1106                "property range index expects range lookup",
1107            )),
1108            (IndexDefinition::CompositeEquality { keys }, IndexLookup::CompositeEqual(values)) => {
1109                view.typed_property_composite_equal(keys, values)
1110            }
1111            (IndexDefinition::CompositeEquality { .. }, _lookup) => Err(DbError::unsupported(
1112                "composite equality index expects composite equality lookup",
1113            )),
1114            (IndexDefinition::Projection { projection }, IndexLookup::All) => {
1115                self.projection_index_subjects(*projection)
1116            }
1117            (IndexDefinition::Projection { .. }, _lookup) => {
1118                Err(DbError::unsupported("projection index expects all lookup"))
1119            }
1120        }
1121    }
1122
1123    /// Materializes a graph projection.
1124    ///
1125    /// # Errors
1126    ///
1127    /// Returns [`DbError`] when the projection is unknown, is not a graph, or
1128    /// fails validation against current topology.
1129    ///
1130    /// # Performance
1131    ///
1132    /// This method is `O(relation count * incidence count)`.
1133    pub fn graph_projection(&self, id: ProjectionId) -> Result<GraphProjection, DbError> {
1134        let view = self.snapshot.view();
1135        let entry = view
1136            .catalog()
1137            .projection(id)
1138            .ok_or(DbError::UnknownProjection { id })?;
1139        match &entry.definition {
1140            ProjectionDefinition::Graph(definition) => {
1141                projection::GraphProjection::from_state(&view, definition.clone())
1142            }
1143            ProjectionDefinition::Hypergraph(_definition) => {
1144                Err(DbError::invalid_projection("projection is not a graph"))
1145            }
1146        }
1147    }
1148
1149    /// Materializes a graph projection by catalog name.
1150    ///
1151    /// # Errors
1152    ///
1153    /// Returns [`DbError`] when the projection is unknown, is not a graph, or
1154    /// fails validation against current topology.
1155    ///
1156    /// # Performance
1157    ///
1158    /// This method is `O(log projection count + relation count * incidence count)`.
1159    pub fn graph_projection_by_name(&self, name: &str) -> Result<GraphProjection, DbError> {
1160        let id = self
1161            .snapshot
1162            .view()
1163            .catalog()
1164            .projection_id(name)
1165            .ok_or_else(|| DbError::unsupported(format!("unknown projection {name}")))?;
1166        self.graph_projection(id)
1167    }
1168
1169    /// Traverses a cataloged graph projection from canonical seed elements.
1170    ///
1171    /// Rows are unique canonical elements in BFS first-discovery order. Depth is
1172    /// the shortest discovered hop count from any seed.
1173    ///
1174    /// # Errors
1175    ///
1176    /// Returns [`DbError`] when the projection is unknown, is not a graph,
1177    /// cannot be materialized, or a seed element is not part of the projection.
1178    ///
1179    /// # Performance
1180    ///
1181    /// This method is `O(relation count * incidence count + visited edges)`.
1182    pub fn traverse_graph(
1183        &self,
1184        projection: ProjectionId,
1185        seeds: &[ElementId],
1186        options: TraversalOptions,
1187    ) -> Result<TraversalResult, DbError> {
1188        if seeds.is_empty() || options.limit == 0 {
1189            return Ok(TraversalResult::new(Vec::new()));
1190        }
1191        let graph = self.graph_projection(projection)?;
1192        traversal::traverse_graph_projection(&graph, seeds, options)
1193    }
1194
1195    /// Materializes a hypergraph projection.
1196    ///
1197    /// # Errors
1198    ///
1199    /// Returns [`DbError`] when the projection is unknown, is not a hypergraph,
1200    /// or fails validation against current topology.
1201    ///
1202    /// # Performance
1203    ///
1204    /// This method is `O(relation count * incidence count)`.
1205    pub fn hypergraph_projection(&self, id: ProjectionId) -> Result<HypergraphProjection, DbError> {
1206        let view = self.snapshot.view();
1207        let entry = view
1208            .catalog()
1209            .projection(id)
1210            .ok_or(DbError::UnknownProjection { id })?;
1211        match &entry.definition {
1212            ProjectionDefinition::Hypergraph(definition) => {
1213                projection::HypergraphProjection::from_state(&view, definition.clone())
1214            }
1215            ProjectionDefinition::Graph(_definition) => Err(DbError::invalid_projection(
1216                "projection is not a hypergraph",
1217            )),
1218        }
1219    }
1220
1221    /// Executes a prepared query.
1222    ///
1223    /// # Errors
1224    ///
1225    /// Returns [`DbError`] when execution cannot materialize a referenced
1226    /// projection.
1227    ///
1228    /// # Performance
1229    ///
1230    /// This method is `O(plan output + projection build cost when used)`.
1231    pub fn execute(&self, query: &PreparedQuery) -> Result<QueryResult, DbError> {
1232        query.execute(&self.snapshot.view())
1233    }
1234
1235    /// Explains a prepared query.
1236    ///
1237    /// # Performance
1238    ///
1239    /// This method is `O(plan size)`.
1240    #[must_use]
1241    pub fn explain(&self, query: &PreparedQuery) -> String {
1242        query.explain()
1243    }
1244
1245    /// Materializes subjects represented by a projection index.
1246    ///
1247    /// # Errors
1248    ///
1249    /// Returns [`DbError`] when the projection is unknown or cannot be
1250    /// materialized.
1251    ///
1252    /// # Performance
1253    ///
1254    /// This method is `O(relation count * incidence count)`.
1255    fn projection_index_subjects(
1256        &self,
1257        projection: ProjectionId,
1258    ) -> Result<Vec<PropertySubject>, DbError> {
1259        let view = self.snapshot.view();
1260        let entry = view
1261            .catalog()
1262            .projection(projection)
1263            .ok_or(DbError::UnknownProjection { id: projection })?;
1264        match &entry.definition {
1265            ProjectionDefinition::Graph(definition) => {
1266                Ok(projection::GraphProjection::from_state(&view, definition.clone())?.subjects())
1267            }
1268            ProjectionDefinition::Hypergraph(definition) => Ok(
1269                projection::HypergraphProjection::from_state(&view, definition.clone())?.subjects(),
1270            ),
1271        }
1272    }
1273}
1274
/// Single writer transaction.
///
/// Mutations accumulate into a private write overlay layered over the parent
/// snapshot; reads fall through the overlay then the base. `commit` appends the
/// overlay's mutation log to the WAL (when dirty) and publishes a fresh snapshot;
/// `rollback` drops the overlay and appends nothing.
///
/// The transaction holds `&'db mut Database` for its whole life, so the borrow
/// checker allows at most one writer per database handle at a time.
///
/// # Performance
///
/// Creating and moving a writer is `O(1)`; each mutation is `O(log change)`.
pub struct WriteTransaction<'db> {
    /// Database receiving the commit; its current snapshot and last
    /// transaction id are updated only by a dirty [`Self::commit`].
    database: &'db mut Database,
    /// Parent snapshot the writer layers over (its base + frozen overlay).
    parent: Arc<Snapshot>,
    /// Private mutable delta this writer accumulates.
    delta: WriteOverlay,
    /// Writer transaction id (session-local until a dirty commit makes it
    /// durable).
    transaction_id: TransactionId,
    /// Held single-writer advisory lock. Its [`Drop`] releases the lock when this
    /// transaction ends (on `rollback`, or on any early-return error path); a
    /// successful dirty [`Self::commit`] releases it explicitly with `drop` so a
    /// triggered auto-checkpoint can re-acquire it.
    lock: WriterLock,
}
1301
1302impl WriteTransaction<'_> {
1303    /// Registers a structural incidence role.
1304    ///
1305    /// # Errors
1306    ///
1307    /// Returns [`DbError`] when the name already exists or ID allocation fails.
1308    ///
1309    /// # Performance
1310    ///
1311    /// This method is `O(log role count + name length)`.
1312    pub fn register_role(&mut self, name: impl Into<String>) -> Result<RoleId, DbError> {
1313        self.delta.register_role(name.into())
1314    }
1315
1316    /// Registers an element or relation label.
1317    ///
1318    /// # Errors
1319    ///
1320    /// Returns [`DbError`] when the name already exists or ID allocation fails.
1321    ///
1322    /// # Performance
1323    ///
1324    /// This method is `O(log label count + name length)`.
1325    pub fn register_label(&mut self, name: impl Into<String>) -> Result<LabelId, DbError> {
1326        self.delta.register_label(name.into())
1327    }
1328
1329    /// Registers a relation type.
1330    ///
1331    /// # Errors
1332    ///
1333    /// Returns [`DbError`] when the name already exists or ID allocation fails.
1334    ///
1335    /// # Performance
1336    ///
1337    /// This method is `O(log relation type count + name length)`.
1338    pub fn register_relation_type(
1339        &mut self,
1340        name: impl Into<String>,
1341    ) -> Result<RelationTypeId, DbError> {
1342        self.delta.register_relation_type(name.into())
1343    }
1344
1345    /// Registers a typed property key.
1346    ///
1347    /// # Errors
1348    ///
1349    /// Returns [`DbError`] when the name already exists or ID allocation fails.
1350    ///
1351    /// # Performance
1352    ///
1353    /// This method is `O(log property key count + name length)`.
1354    pub fn register_property_key(
1355        &mut self,
1356        name: impl Into<String>,
1357        family: PropertyFamily,
1358        value_type: PropertyType,
1359    ) -> Result<PropertyKeyId, DbError> {
1360        self.delta
1361            .register_property_key(name.into(), family, value_type)
1362    }
1363
1364    /// Defines a physical projection.
1365    ///
1366    /// # Errors
1367    ///
1368    /// Returns [`DbError`] when referenced catalog IDs are unknown, the
1369    /// projection name already exists, or ID allocation fails.
1370    ///
1371    /// # Performance
1372    ///
1373    /// This method is `O(definition size + catalog lookup cost)`.
1374    pub fn define_projection(
1375        &mut self,
1376        definition: ProjectionDefinition,
1377    ) -> Result<ProjectionId, DbError> {
1378        self.validate_projection_definition(&definition)?;
1379        self.delta.register_projection(definition)
1380    }
1381
1382    /// Defines an index.
1383    ///
1384    /// # Errors
1385    ///
1386    /// Returns [`DbError`] when referenced catalog IDs are unknown, the index
1387    /// name already exists, or ID allocation fails.
1388    ///
1389    /// # Performance
1390    ///
1391    /// This method is `O(definition size + catalog lookup cost)`.
1392    pub fn define_index(
1393        &mut self,
1394        name: impl Into<String>,
1395        definition: IndexDefinition,
1396    ) -> Result<IndexId, DbError> {
1397        self.validate_index_definition(&definition)?;
1398        self.delta.register_index(name.into(), definition)
1399    }
1400
1401    /// Creates a canonical element.
1402    ///
1403    /// # Errors
1404    ///
1405    /// Returns [`DbError::IdOverflow`] when element IDs are exhausted.
1406    ///
1407    /// # Performance
1408    ///
1409    /// This method is `O(log element change)`.
1410    pub fn create_element(&mut self) -> Result<ElementId, DbError> {
1411        self.delta.create_element()
1412    }
1413
1414    /// Creates a canonical relation.
1415    ///
1416    /// # Errors
1417    ///
1418    /// Returns [`DbError::IdOverflow`] when relation IDs are exhausted.
1419    ///
1420    /// # Performance
1421    ///
1422    /// This method is `O(log relation change)`.
1423    pub fn create_relation(&mut self) -> Result<RelationId, DbError> {
1424        self.delta.create_relation()
1425    }
1426
1427    /// Creates a canonical incidence.
1428    ///
1429    /// # Errors
1430    ///
1431    /// Returns [`DbError`] when referenced IDs are unknown or incidence IDs are
1432    /// exhausted.
1433    ///
1434    /// # Performance
1435    ///
1436    /// This method is `O(log incidence change + reference lookup cost)`.
1437    pub fn create_incidence(
1438        &mut self,
1439        relation: RelationId,
1440        element: ElementId,
1441        role: RoleId,
1442    ) -> Result<IncidenceId, DbError> {
1443        self.require_relation(relation)?;
1444        self.require_element(element)?;
1445        self.require_role(role)?;
1446        self.delta.create_incidence(relation, element, role)
1447    }
1448
1449    /// Tombstones a canonical element and its incidences.
1450    ///
1451    /// # Errors
1452    ///
1453    /// Returns [`DbError::UnknownElement`] when the element is not visible.
1454    ///
1455    /// # Performance
1456    ///
1457    /// This method is `O(incidence count)`.
1458    pub fn tombstone_element(&mut self, id: ElementId) -> Result<(), DbError> {
1459        self.require_element(id)?;
1460        let base = self.parent.base_records();
1461        self.delta.tombstone_element(base, id);
1462        // Cascade: every incidence referencing the element is tombstoned too.
1463        let incidences: Vec<IncidenceId> = self
1464            .merged()
1465            .incidences()
1466            .filter(|record| record.element == id)
1467            .map(|record| record.id)
1468            .collect();
1469        for incidence in incidences {
1470            self.delta
1471                .tombstone_incidence(self.parent.base_records(), incidence);
1472        }
1473        Ok(())
1474    }
1475
1476    /// Tombstones a canonical relation and its incidences.
1477    ///
1478    /// # Errors
1479    ///
1480    /// Returns [`DbError::UnknownRelation`] when the relation is not visible.
1481    ///
1482    /// # Performance
1483    ///
1484    /// This method is `O(incidence count)`.
1485    pub fn tombstone_relation(&mut self, id: RelationId) -> Result<(), DbError> {
1486        self.require_relation(id)?;
1487        let base = self.parent.base_records();
1488        self.delta.tombstone_relation(base, id);
1489        let incidences: Vec<IncidenceId> = self
1490            .merged()
1491            .incidences()
1492            .filter(|record| record.relation == id)
1493            .map(|record| record.id)
1494            .collect();
1495        for incidence in incidences {
1496            self.delta
1497                .tombstone_incidence(self.parent.base_records(), incidence);
1498        }
1499        Ok(())
1500    }
1501
1502    /// Tombstones a canonical incidence.
1503    ///
1504    /// # Errors
1505    ///
1506    /// Returns [`DbError::UnknownIncidence`] when the incidence is not visible.
1507    ///
1508    /// # Performance
1509    ///
1510    /// This method is `O(log incidence change)`.
1511    pub fn tombstone_incidence(&mut self, id: IncidenceId) -> Result<(), DbError> {
1512        self.require_incidence(id)?;
1513        self.delta
1514            .tombstone_incidence(self.parent.base_records(), id);
1515        Ok(())
1516    }
1517
1518    /// Adds a label to an element.
1519    ///
1520    /// # Errors
1521    ///
1522    /// Returns [`DbError`] when the element or label is unknown.
1523    ///
1524    /// # Performance
1525    ///
1526    /// This method is `O(log element change + log label count)`.
1527    pub fn add_element_label(&mut self, element: ElementId, label: LabelId) -> Result<(), DbError> {
1528        self.require_element(element)?;
1529        self.require_label(label)?;
1530        self.delta
1531            .add_element_label(self.parent.base_records(), element, label);
1532        Ok(())
1533    }
1534
1535    /// Adds a label to a relation.
1536    ///
1537    /// # Errors
1538    ///
1539    /// Returns [`DbError`] when the relation or label is unknown.
1540    ///
1541    /// # Performance
1542    ///
1543    /// This method is `O(log relation change + log label count)`.
1544    pub fn add_relation_label(
1545        &mut self,
1546        relation: RelationId,
1547        label: LabelId,
1548    ) -> Result<(), DbError> {
1549        self.require_relation(relation)?;
1550        self.require_label(label)?;
1551        self.delta
1552            .add_relation_label(self.parent.base_records(), relation, label);
1553        Ok(())
1554    }
1555
1556    /// Sets a relation type.
1557    ///
1558    /// # Errors
1559    ///
1560    /// Returns [`DbError`] when the relation or relation type is unknown.
1561    ///
1562    /// # Performance
1563    ///
1564    /// This method is `O(log relation change + log relation type count)`.
1565    pub fn set_relation_type(
1566        &mut self,
1567        relation: RelationId,
1568        relation_type: RelationTypeId,
1569    ) -> Result<(), DbError> {
1570        self.require_relation(relation)?;
1571        self.require_relation_type(relation_type)?;
1572        self.delta
1573            .set_relation_type(self.parent.base_records(), relation, relation_type);
1574        Ok(())
1575    }
1576
1577    /// Sets a property value.
1578    ///
1579    /// # Errors
1580    ///
1581    /// Returns [`DbError`] when the subject or key is unknown, or the value
1582    /// does not match the key schema.
1583    ///
1584    /// # Performance
1585    ///
1586    /// This method is `O(log subject change + log key count)`.
1587    pub fn set_property(
1588        &mut self,
1589        subject: PropertySubject,
1590        key: PropertyKeyId,
1591        value: PropertyValue,
1592    ) -> Result<(), DbError> {
1593        // Referential integrity: the subject must be visible (this rejects an
1594        // orphan property against a tombstoned/absent subject at the transaction
1595        // boundary — the overlay layer is permissive by design).
1596        self.require_subject(subject)?;
1597        let definition = self
1598            .merged()
1599            .catalog()
1600            .property_key(key)
1601            .cloned()
1602            .ok_or(DbError::UnknownPropertyKey { id: key })?;
1603        if definition.family != subject.family() {
1604            return Err(DbError::WrongPropertyFamily {
1605                expected: definition.family,
1606                actual: subject.family(),
1607            });
1608        }
1609        if definition.value_type != value.value_type() {
1610            return Err(DbError::PropertyTypeMismatch {
1611                expected: definition.value_type,
1612                actual: value.value_type(),
1613            });
1614        }
1615        self.delta
1616            .set_property(self.parent.base_records(), subject, key, value);
1617        Ok(())
1618    }
1619
1620    /// Removes a property value.
1621    ///
1622    /// # Errors
1623    ///
1624    /// Returns [`DbError`] when the subject or key is unknown.
1625    ///
1626    /// # Performance
1627    ///
1628    /// This method is `O(log subject change + log key count)`.
1629    pub fn remove_property(
1630        &mut self,
1631        subject: PropertySubject,
1632        key: PropertyKeyId,
1633    ) -> Result<(), DbError> {
1634        self.require_subject(subject)?;
1635        if self.merged().catalog().property_key(key).is_none() {
1636            return Err(DbError::UnknownPropertyKey { id: key });
1637        }
1638        self.delta
1639            .remove_property(self.parent.base_records(), subject, key);
1640        Ok(())
1641    }
1642
    /// Commits this write transaction durably.
    ///
    /// A non-dirty commit returns the parent's commit sequence without appending
    /// to the WAL or publishing. A dirty commit encodes the overlay's mutation
    /// log into one WAL frame (with the watermark op last), appends it with an
    /// fsync (truncating back to the captured EOF on any write error so no
    /// interior torn record survives), THEN folds the delta into a fresh
    /// `Arc<Overlay>` and publishes a new `Arc<Snapshot>`.
    ///
    /// After publishing, a dirty commit consults the configured
    /// [`CheckpointPolicy`]: it releases the writer lock FIRST (so the fold can
    /// re-acquire it), then folds when the delta-log has outgrown the base. The
    /// committed frame is already durable, so an auto-fold failure does not lose
    /// data; it is surfaced to the caller.
    ///
    /// The statement order is load-bearing: the frame is appended before the
    /// database handle is touched, so every `?` up to and including the append
    /// returns with the published snapshot and `last_transaction_id` unchanged.
    ///
    /// # Errors
    ///
    /// Returns [`DbError`] when commit-sequence allocation, frame encoding, the
    /// durable append, or a triggered auto-checkpoint fold fails. In the last
    /// case the commit itself has already been published.
    ///
    /// # Performance
    ///
    /// This method is `O(change)` for the dirty path — flat as the base grows.
    /// The publish step shares the parent snapshot's already-materialized
    /// [`crate::overlay::BaseRecords`] and derived index by `Arc` (a commit never
    /// folds, so the base is byte-identical within the generation), so it neither
    /// re-decodes the base nor rebuilds the index. A triggered fold adds
    /// `O(visible state bytes)` on top.
    pub fn commit(self) -> Result<CommitSeq, DbError> {
        if self.delta.is_empty() {
            // Non-dirty commit: no append, no publish, no durable id advance.
            // `self` (including the writer lock) is dropped on return.
            return Ok(self.parent.lsn());
        }
        // The new commit sequence is the parent's successor; overflow aborts
        // before anything is written.
        let lsn = self
            .parent
            .lsn()
            .checked_next()
            .ok_or(DbError::CommitSeqOverflow)?;
        // Encode the whole delta as a single WAL frame stamped with the commit
        // sequence, the transaction id, and the base generation it applies to.
        let (ops, blob) = self.delta.encode_frame();
        let frame = wal::encode_commit(
            lsn.get(),
            self.transaction_id.get(),
            self.database.base_generation,
            &ops,
            &blob,
        )?;
        // Durability point: once `append_commit` returns `Ok`, the commit
        // survives a crash. Nothing on the database handle has changed yet.
        let mut log = open_log_for_append(&self.database.root, self.database.base_generation)?;
        wal::append_commit(&mut log, &frame)?;

        // Durable: the delta was seeded from the parent overlay and only added
        // this writer's changes, so freezing it directly is the full new
        // published overlay (parent state + this commit). The parent overlay was
        // never mutated — this is a brand-new frozen `Arc<Overlay>`, so a reader
        // pinning the parent is unaffected.
        let new_overlay = Arc::new(self.delta.freeze());
        // A commit never folds, so the new snapshot pins the SAME base generation
        // as the parent — the base wire bytes are byte-identical, and so are the
        // owned records and the derived index built from them. Share the parent's
        // `Arc<BaseRecords>` (and its `BaseIndex`) instead of re-decoding the base
        // and rebuilding the index, which keeps a single-element commit `O(change)`
        // rather than `O(base)` regardless of how large the base has grown.
        let snapshot = Snapshot::with_shared_base_records(
            self.parent.generation(),
            lsn,
            Arc::clone(self.parent.base()),
            new_overlay,
            Arc::clone(self.parent.base_records()),
        );
        // Publish: later readers pin the new snapshot; the transaction id is
        // recorded as the last one made durable.
        self.database.current = Arc::new(snapshot);
        self.database.last_transaction_id = self.transaction_id;
        // Release the writer lock before any auto-fold so the fold can re-acquire
        // it (a partial move out of `self`, legal because `WriteTransaction` has
        // no `Drop` impl; the remaining `&mut Database` borrow stays live).
        drop(self.lock);
        // A fold failure is reported, but the commit above is already durable
        // and published.
        self.database.maybe_auto_checkpoint()?;
        Ok(lsn)
    }
1720
1721    /// Drops this write transaction without committing.
1722    ///
1723    /// # Performance
1724    ///
1725    /// This method is `O(1)` excluding staged-delta drop cost.
1726    pub fn rollback(self) {}
1727
1728    /// Returns the merged read view this writer sees (overlay over base).
1729    ///
1730    /// # Performance
1731    ///
1732    /// This method is `O(1)` to construct.
1733    fn merged(&self) -> crate::overlay::WriteMergedState<'_> {
1734        crate::overlay::WriteMergedState::new(self.parent.base_records(), &self.delta)
1735    }
1736
1737    /// Requires an element to be visible in the writer's merged view.
1738    ///
1739    /// # Errors
1740    ///
1741    /// Returns [`DbError::UnknownElement`] when absent.
1742    ///
1743    /// # Performance
1744    ///
1745    /// This method is `O(log change + log n)`.
1746    fn require_element(&self, id: ElementId) -> Result<(), DbError> {
1747        if self.merged().contains_element(id) {
1748            Ok(())
1749        } else {
1750            Err(DbError::UnknownElement { id })
1751        }
1752    }
1753
1754    /// Requires a relation to be visible.
1755    ///
1756    /// # Errors
1757    ///
1758    /// Returns [`DbError::UnknownRelation`] when absent.
1759    ///
1760    /// # Performance
1761    ///
1762    /// This method is `O(log change + log n)`.
1763    fn require_relation(&self, id: RelationId) -> Result<(), DbError> {
1764        if self.merged().contains_relation(id) {
1765            Ok(())
1766        } else {
1767            Err(DbError::UnknownRelation { id })
1768        }
1769    }
1770
1771    /// Requires an incidence to be visible.
1772    ///
1773    /// # Errors
1774    ///
1775    /// Returns [`DbError::UnknownIncidence`] when absent.
1776    ///
1777    /// # Performance
1778    ///
1779    /// This method is `O(log change + log n)`.
1780    fn require_incidence(&self, id: IncidenceId) -> Result<(), DbError> {
1781        if self.merged().contains_incidence(id) {
1782            Ok(())
1783        } else {
1784            Err(DbError::UnknownIncidence { id })
1785        }
1786    }
1787
1788    /// Requires a role to exist in the merged catalog.
1789    ///
1790    /// # Errors
1791    ///
1792    /// Returns [`DbError::UnknownRole`] when absent.
1793    ///
1794    /// # Performance
1795    ///
1796    /// This method is `O(log role count)`.
1797    fn require_role(&self, id: RoleId) -> Result<(), DbError> {
1798        if self.delta.catalog().role(id).is_some() {
1799            Ok(())
1800        } else {
1801            Err(DbError::UnknownRole { id })
1802        }
1803    }
1804
1805    /// Requires a label to exist in the merged catalog.
1806    ///
1807    /// # Errors
1808    ///
1809    /// Returns [`DbError::UnknownLabel`] when absent.
1810    ///
1811    /// # Performance
1812    ///
1813    /// This method is `O(log label count)`.
1814    fn require_label(&self, id: LabelId) -> Result<(), DbError> {
1815        if self.delta.catalog().label(id).is_some() {
1816            Ok(())
1817        } else {
1818            Err(DbError::UnknownLabel { id })
1819        }
1820    }
1821
1822    /// Requires a relation type to exist in the merged catalog.
1823    ///
1824    /// # Errors
1825    ///
1826    /// Returns [`DbError::UnknownRelationType`] when absent.
1827    ///
1828    /// # Performance
1829    ///
1830    /// This method is `O(log relation type count)`.
1831    fn require_relation_type(&self, id: RelationTypeId) -> Result<(), DbError> {
1832        if self.delta.catalog().relation_type(id).is_some() {
1833            Ok(())
1834        } else {
1835            Err(DbError::UnknownRelationType { id })
1836        }
1837    }
1838
1839    /// Requires a property subject to be visible.
1840    ///
1841    /// # Errors
1842    ///
1843    /// Returns the matching `Unknown*` error when the subject is absent.
1844    ///
1845    /// # Performance
1846    ///
1847    /// This method is `O(log change + log n)`.
1848    fn require_subject(&self, subject: PropertySubject) -> Result<(), DbError> {
1849        match subject {
1850            PropertySubject::Element(id) => self.require_element(id),
1851            PropertySubject::Relation(id) => self.require_relation(id),
1852            PropertySubject::Incidence(id) => self.require_incidence(id),
1853        }
1854    }
1855
1856    /// Validates one projection definition against the merged catalog.
1857    ///
1858    /// # Errors
1859    ///
1860    /// Returns [`DbError`] when a referenced role or relation type is unknown.
1861    ///
1862    /// # Performance
1863    ///
1864    /// This method is `O(definition size)`.
1865    fn validate_projection_definition(
1866        &self,
1867        definition: &ProjectionDefinition,
1868    ) -> Result<(), DbError> {
1869        match definition {
1870            ProjectionDefinition::Graph(graph) => {
1871                self.require_role(graph.source_role)?;
1872                self.require_role(graph.target_role)?;
1873                for relation_type in &graph.relation_types {
1874                    self.require_relation_type(*relation_type)?;
1875                }
1876                Ok(())
1877            }
1878            ProjectionDefinition::Hypergraph(hyper) => {
1879                for role in &hyper.source_roles {
1880                    self.require_role(*role)?;
1881                }
1882                for role in &hyper.target_roles {
1883                    self.require_role(*role)?;
1884                }
1885                for relation_type in &hyper.relation_types {
1886                    self.require_relation_type(*relation_type)?;
1887                }
1888                Ok(())
1889            }
1890        }
1891    }
1892
1893    /// Validates one index definition against the merged catalog.
1894    ///
1895    /// # Errors
1896    ///
1897    /// Returns [`DbError`] when a referenced catalog id is unknown or a
1898    /// composite index has no keys.
1899    ///
1900    /// # Performance
1901    ///
1902    /// This method is `O(definition size)`.
1903    fn validate_index_definition(&self, definition: &IndexDefinition) -> Result<(), DbError> {
1904        let catalog = self.delta.catalog();
1905        match definition {
1906            IndexDefinition::Label { label } => self.require_label(*label),
1907            IndexDefinition::RelationType { relation_type } => {
1908                self.require_relation_type(*relation_type)
1909            }
1910            IndexDefinition::PropertyEquality { key } | IndexDefinition::PropertyRange { key } => {
1911                self.require_property_key(*key)
1912            }
1913            IndexDefinition::CompositeEquality { keys } => {
1914                if keys.is_empty() {
1915                    return Err(DbError::unsupported(
1916                        "composite equality index requires at least one key",
1917                    ));
1918                }
1919                for key in keys {
1920                    self.require_property_key(*key)?;
1921                }
1922                Ok(())
1923            }
1924            IndexDefinition::Projection { projection } => catalog
1925                .projection(*projection)
1926                .is_some()
1927                .then_some(())
1928                .ok_or(DbError::UnknownProjection { id: *projection }),
1929        }
1930    }
1931
1932    /// Requires a property key to exist in the merged catalog.
1933    ///
1934    /// # Errors
1935    ///
1936    /// Returns [`DbError::UnknownPropertyKey`] when absent.
1937    ///
1938    /// # Performance
1939    ///
1940    /// This method is `O(log property key count)`.
1941    fn require_property_key(&self, id: PropertyKeyId) -> Result<(), DbError> {
1942        if self.delta.catalog().property_key(id).is_some() {
1943            Ok(())
1944        } else {
1945            Err(DbError::UnknownPropertyKey { id })
1946        }
1947    }
1948}
1949
1950#[cfg(test)]
1951#[cfg(not(miri))]
1952mod tests {
1953    use std::{
1954        path::PathBuf,
1955        sync::atomic::{AtomicU64, Ordering},
1956    };
1957
1958    use super::*;
1959
1960    /// Per-process path counter for unique temporary store directories.
1961    static NEXT_PATH: AtomicU64 = AtomicU64::new(0);
1962
1963    /// Returns a unique temporary store path and removes any prior contents.
1964    fn temp_store(name: &str) -> PathBuf {
1965        let id = NEXT_PATH.fetch_add(1, Ordering::Relaxed);
1966        let path =
1967            std::env::temp_dir().join(format!("oxgraph-db-cp-{name}-{}-{id}", std::process::id()));
1968        let _ = std::fs::remove_dir_all(&path);
1969        path
1970    }
1971
    /// The exact logical state the crash-matrix asserts recovery preserves: the
    /// visible element ids, the subjects matching one probed `rank` value, and
    /// the `Person` label membership. Two states compare equal only when all
    /// three fields match.
    #[derive(Debug, Eq, PartialEq)]
    struct LogicalState {
        /// Visible element ids, in the order the read transaction reports them.
        elements: Vec<ElementId>,
        /// Subjects returned by the equality lookup for `rank == 500` (the single
        /// value this fixture probes).
        rank_eq_500: Vec<PropertySubject>,
        /// Element ids carrying the `Person` label.
        person_members: Vec<ElementId>,
    }
1984
    /// Catalog ids returned by [`build_fixture`]; later reads need them to query
    /// the fixture data.
    struct Fixture {
        /// Id of the `rank` integer property key.
        rank: PropertyKeyId,
        /// Id of the `Person` label.
        person: LabelId,
    }
1992
1993    /// Builds a committed fixture: 8 elements, each ranked `index * 100`, the
1994    /// even-indexed ones labelled `Person`. Returns the fixture ids.
1995    fn build_fixture(database: &mut Database) -> Fixture {
1996        let mut writer = database.begin_write().expect("begin write");
1997        let rank = writer
1998            .register_property_key("rank", PropertyFamily::Element, PropertyType::Integer)
1999            .expect("rank key");
2000        let person = writer.register_label("Person").expect("person label");
2001        for index in 0..8u64 {
2002            let element = writer.create_element().expect("element");
2003            writer
2004                .set_property(
2005                    PropertySubject::Element(element),
2006                    rank,
2007                    PropertyValue::Integer(i64::try_from(index).expect("index") * 100),
2008                )
2009                .expect("set rank");
2010            if index % 2 == 0 {
2011                writer
2012                    .add_element_label(element, person)
2013                    .expect("add label");
2014            }
2015        }
2016        writer.commit().expect("commit fixture");
2017        Fixture { rank, person }
2018    }
2019
2020    /// Reads the logical state through the index-backed read surface.
2021    fn read_logical(database: &Database, fixture: &Fixture) -> LogicalState {
2022        let read = database.begin_read();
2023        let elements = read.element_ids();
2024        let rank_eq_500 = read
2025            .lookup_property_equal(fixture.rank, &PropertyValue::Integer(500))
2026            .expect("rank lookup");
2027        let person_members = read.snapshot.view().elements_with_label(fixture.person);
2028        LogicalState {
2029            elements,
2030            rank_eq_500,
2031            person_members,
2032        }
2033    }
2034
2035    /// Asserts ids are never reused across a fold BEHAVIORALLY: the next element
2036    /// `database` mints must take the id one past the current maximum visible
2037    /// element id, i.e. the recovered watermark survived the fold. A regression
2038    /// that dropped the watermark on fold (so the recovered record set is
2039    /// unchanged but the next-id counter reset) would reuse an existing id and
2040    /// fail this assertion — which the unchanged-record-set checks alone miss.
2041    ///
2042    /// The probe element is rolled back, so it does not perturb the logical state
2043    /// the surrounding test re-reads.
2044    fn assert_no_id_reuse_across_fold(database: &mut Database) {
2045        let max_existing = database
2046            .begin_read()
2047            .element_ids()
2048            .into_iter()
2049            .map(ElementId::get)
2050            .max()
2051            .unwrap_or(0);
2052        let expected = ElementId::new(max_existing + 1);
2053        let mut writer = database.begin_write().expect("watermark probe writer");
2054        let minted = writer.create_element().expect("watermark probe element");
2055        assert_eq!(
2056            minted, expected,
2057            "the next minted id must be one past the max existing id (watermark \
2058             survived the fold; ids are never reused)",
2059        );
2060        // Roll the probe back so it leaves no trace in the logical state.
2061        writer.rollback();
2062    }
2063
2064    /// CHECKPOINT-CRASH-MATRIX: a crash after each fsync point in `checkpoint`
2065    /// recovers EXACTLY the correct logical state. After a crash before the
2066    /// superblock lands, the OLD generation stays authoritative (the orphan new
2067    /// base is ignored); after a crash once the superblock names the new
2068    /// generation, the NEW base is authoritative. The completed checkpoint
2069    /// recovers the same logical state from the folded base. In every case the
2070    /// index-backed lookups return the same answers as before the (attempted)
2071    /// fold.
2072    #[test]
2073    fn checkpoint_crash_matrix_recovers_exact_state() {
2074        for stop in [
2075            CheckpointStop::BeforeSuperblock,
2076            CheckpointStop::BeforeRotate,
2077            CheckpointStop::Complete,
2078        ] {
2079            let path = temp_store(&format!("crash-{stop:?}"));
2080            let mut database = Database::create(&path).expect("create");
2081            let fixture = build_fixture(&mut database);
2082            let before = read_logical(&database, &fixture);
2083            let before_generation = database.base_generation;
2084
2085            // Simulate a crash at `stop`: the checkpoint returns right after the
2086            // chosen fsync, leaving the intermediate files in place. We then drop
2087            // the handle (as a crash would) and reopen from disk.
2088            database
2089                .checkpoint_inner(stop)
2090                .expect("checkpoint stop returns ok");
2091            drop(database);
2092
2093            let mut recovered = Database::open(&path).expect("reopen after crash");
2094            let after = read_logical(&recovered, &fixture);
2095            assert_eq!(
2096                after, before,
2097                "crash at {stop:?} must recover the exact logical state",
2098            );
2099
2100            // The recovered watermark survives every crash window: the next minted
2101            // id is one past the max recovered element id, so ids are never reused
2102            // across the (attempted) fold — asserted behaviorally, not merely
2103            // inferred from the unchanged record set.
2104            assert_no_id_reuse_across_fold(&mut recovered);
2105
2106            // Generation expectation per crash window.
2107            match stop {
2108                CheckpointStop::BeforeSuperblock => assert_eq!(
2109                    recovered.base_generation, before_generation,
2110                    "old superblock stays authoritative before the new one lands",
2111                ),
2112                CheckpointStop::BeforeRotate | CheckpointStop::Complete => assert_eq!(
2113                    recovered.base_generation,
2114                    before_generation + 1,
2115                    "the new superblock names the folded generation",
2116                ),
2117            }
2118
2119            // A second open is idempotent (orphan files from a partial crash do
2120            // not derail a repeat recovery).
2121            let reopened = Database::open(&path).expect("second reopen");
2122            assert_eq!(read_logical(&reopened, &fixture), before);
2123
2124            drop(reopened);
2125            let _ = std::fs::remove_dir_all(&path);
2126        }
2127    }
2128
2129    /// The auto-checkpoint policy folds the delta-log into a fresh base once the
2130    /// log outgrows the base by the configured factor: under a tiny factor, a
2131    /// run of dirty commits advances the live generation (the log was folded),
2132    /// and the logical state is preserved across the fold. The manual policy
2133    /// never auto-folds.
2134    #[test]
2135    fn auto_checkpoint_policy_folds_when_log_outgrows_base() {
2136        // Manual policy: many commits, generation never advances on its own.
2137        let manual_path = temp_store("auto-manual");
2138        let mut manual = Database::create(&manual_path).expect("create manual");
2139        manual.set_checkpoint_policy(CheckpointPolicy::Manual);
2140        let _fixture = build_fixture(&mut manual);
2141        for _ in 0..200 {
2142            let mut writer = manual.begin_write().expect("writer");
2143            writer.create_element().expect("element");
2144            writer.commit().expect("commit");
2145        }
2146        assert_eq!(
2147            manual.live_generation(),
2148            CheckpointGeneration::new(0),
2149            "manual policy must never auto-fold",
2150        );
2151        drop(manual);
2152        let _ = std::fs::remove_dir_all(&manual_path);
2153
2154        // Size-ratio policy with the smallest factor: the log soon outgrows the
2155        // tiny base floor, so a run of commits triggers at least one fold.
2156        let auto_path = temp_store("auto-ratio");
2157        let mut auto = Database::create(&auto_path).expect("create auto");
2158        auto.set_checkpoint_policy(CheckpointPolicy::SizeRatio { factor: 1 });
2159        let fixture = build_fixture(&mut auto);
2160        let before = read_logical(&auto, &fixture);
2161        for _ in 0..400 {
2162            let mut writer = auto.begin_write().expect("writer");
2163            writer.create_element().expect("element");
2164            writer.commit().expect("commit");
2165        }
2166        assert!(
2167            auto.live_generation() > CheckpointGeneration::new(0),
2168            "size-ratio policy must auto-fold once the log outgrows the base",
2169        );
2170        // The pre-existing logical state survives every fold; the policy is also
2171        // surfaced in status and preserved across the fold.
2172        let after = read_logical(&auto, &fixture);
2173        assert_eq!(after.rank_eq_500, before.rank_eq_500);
2174        assert_eq!(after.person_members, before.person_members);
2175        // Ids are never reused across the auto-fold: the next minted id is one
2176        // past the max existing id (the watermark folded into the new base).
2177        assert_no_id_reuse_across_fold(&mut auto);
2178        assert_eq!(
2179            auto.checkpoint_policy(),
2180            CheckpointPolicy::SizeRatio { factor: 1 },
2181            "the auto-fold reopen must preserve the configured policy",
2182        );
2183        // Status surfaces the live generation and the (now small) log size.
2184        let status = auto.status();
2185        assert_eq!(status.live_generation, auto.live_generation());
2186        assert!(status.base_byte_size > 0, "live base has bytes");
2187        drop(auto);
2188        let _ = std::fs::remove_dir_all(&auto_path);
2189    }
2190}