Skip to main content

oxgraph_db/
database.rs

1//! Embedded `OxGraph` database engine API.
2//!
//! This is the integration layer over the base+overlay+WAL core. A [`Db`]
//! holds the current `Arc<Snapshot>` (one immutable base generation plus the
//! frozen overlay published over it), the live base generation that names the
//! append-only delta-log, and the recovered id/transaction watermarks. Reads
//! pin the current snapshot in `O(1)` (`reader` clones the `Arc`); writes layer
//! a fresh [`WriteOverlay`] over the current snapshot, append a WAL frame on
//! commit, and publish a new snapshot. The whole read/query/projection surface
//! resolves through the merged [`StateView`] of the pinned snapshot.
11
12use std::{
13    borrow::Cow,
14    collections::BTreeSet,
15    path::{Path, PathBuf},
16    sync::Arc,
17};
18
19use crate::{
20    Bound, Catalog, CheckpointGeneration, CommitSeq, DbError, Element, ElementId,
21    GraphProjectionDefinition, GraphProjectionSpec, IncidenceId, IncidenceRecord, IndexId, LabelId,
22    PreparedQuery, ProjectionDefinition, ProjectionId, Properties, PropertyKeyId, PropertySubject,
23    PropertyType, PropertyValue, QueryResult, Relation, RelationId, RelationTypeId, RoleId, Schema,
24    TransactionId,
25    backing::Base,
26    catalog::{IndexDefinition, PropertyFamily},
27    freeze::{self, FreezeStamps},
28    lock::WriterLock,
29    overlay::{Overlay, Snapshot, StateView, WriteOverlay},
30    projection::{self, GraphProjection, HypergraphProjection},
31    state::NextIds,
32    storage,
33    traversal::{self, Direction, Subgraph, Walk},
34    typed::{Assignable, EqualityIndex, Key, ValueType},
35    wal,
36    wire::SuperblockRecord,
37};
38
39/// Lookup input for a cataloged index.
40///
41/// This type makes index lookup shape explicit: membership indexes accept
42/// [`Match::All`], single-property indexes accept scalar equality or
43/// range inputs, and composite equality indexes accept an ordered value tuple.
44///
45/// # Performance
46///
47/// Copying this value is `O(1)`.
48#[derive(Clone, Copy, Debug)]
49pub enum Match<'value> {
50    /// Lookup every subject represented by a membership-style index.
51    All,
52    /// Lookup one scalar equality value.
53    Equal(&'value PropertyValue),
54    /// Lookup one inclusive scalar range.
55    Range {
56        /// Inclusive lower bound.
57        min: &'value PropertyValue,
58        /// Inclusive upper bound.
59        max: &'value PropertyValue,
60    },
61    /// Lookup one ordered composite equality tuple.
62    Composite(&'value [PropertyValue]),
63}
64
/// When a dirty commit should fold the delta-log into a fresh base
/// generation (an auto-checkpoint), which bounds how much log recovery has to
/// replay.
///
/// [`CheckpointPolicy::default`] is size-ratio based: a fold triggers once the
/// delta-log outgrows `factor` times the live base size. Choose
/// [`CheckpointPolicy::Manual`] to turn auto-folding off entirely; the log is
/// then folded only by an explicit [`Db::compact`].
///
/// # Performance
///
/// Copying this value is `O(1)`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CheckpointPolicy {
    /// No auto-checkpoint; folding happens only through [`Db::compact`].
    Manual,
    /// Auto-checkpoint after a dirty commit once the delta-log is larger than
    /// `factor` times the live base size. Bases smaller than a fixed floor are
    /// measured as if they were floor-sized, so a freshly created gen-0 store is
    /// not folded on its first commit.
    SizeRatio {
        /// Log-to-base ratio `K`: the log may reach `K × base` bytes before the
        /// next dirty commit folds it.
        factor: u32,
    },
}

impl CheckpointPolicy {
    /// Default ratio `K` for [`CheckpointPolicy::SizeRatio`]: fold once the
    /// delta-log is more than four times the live base size.
    pub const DEFAULT_FACTOR: u32 = 4;

    /// Smallest base size (bytes) the size-ratio rule measures against. Smaller
    /// bases are rounded up to this, so a near-empty new base is not folded on
    /// its first few commits.
    const MIN_BASE_BYTES: u64 = 4 * 1024;

    /// Decides whether `log_bytes` of delta-log on top of a `base_bytes` base
    /// calls for an auto-checkpoint under this policy.
    ///
    /// The budget multiplication saturates, so extreme sizes cannot overflow.
    ///
    /// # Performance
    ///
    /// This method is `O(1)`.
    #[must_use]
    const fn should_checkpoint(self, log_bytes: u64, base_bytes: u64) -> bool {
        if let Self::SizeRatio { factor } = self {
            // Never measure against less than the floor.
            let effective_base = if base_bytes > Self::MIN_BASE_BYTES {
                base_bytes
            } else {
                Self::MIN_BASE_BYTES
            };
            let budget = effective_base.saturating_mul(factor as u64);
            log_bytes > budget
        } else {
            false
        }
    }
}

impl Default for CheckpointPolicy {
    /// Size-ratio with [`CheckpointPolicy::DEFAULT_FACTOR`].
    ///
    /// # Performance
    ///
    /// This function is `O(1)`.
    fn default() -> Self {
        let factor = Self::DEFAULT_FACTOR;
        Self::SizeRatio { factor }
    }
}
135
136/// The durable result of a [`Db::write`]: whether a frame landed, and at which
137/// commit sequence.
138///
139/// # Performance
140///
141/// Copying this value is `O(1)`.
142#[derive(Clone, Copy, Debug, Eq, PartialEq)]
143#[non_exhaustive]
144pub enum CommitOutcome {
145    /// The transaction made no changes; no WAL frame was appended.
146    Empty,
147    /// A durable frame landed at this commit sequence.
148    Committed(CommitSeq),
149}
150
/// Returns the file name (relative to the store root) of the base image for
/// `generation`, e.g. generation `3` maps to `base-3.oxgdb`.
///
/// # Performance
///
/// This function is `O(1)`.
fn base_file(generation: u64) -> String {
    /// Leading part shared by every base file name.
    const STEM: &str = "base-";
    /// Trailing extension shared by every base file name.
    const EXTENSION: &str = ".oxgdb";
    format!("{STEM}{generation}{EXTENSION}")
}
159
/// Returns the file name (relative to the store root) of the delta-log that
/// belongs to `generation`, e.g. generation `3` maps to `delta-3.log`.
///
/// # Performance
///
/// This function is `O(1)`.
fn delta_file(generation: u64) -> String {
    /// Leading part shared by every delta-log file name.
    const STEM: &str = "delta-";
    /// Trailing extension shared by every delta-log file name.
    const EXTENSION: &str = ".log";
    format!("{STEM}{generation}{EXTENSION}")
}
168
169/// Open OXGDB database handle.
170///
171/// # Performance
172///
173/// Moving a handle is `O(1)`: it moves the current `Arc<Snapshot>` and the open
174/// delta-log handle.
175pub struct Db {
176    /// Root database directory.
177    root: PathBuf,
178    /// The current visible snapshot (base generation + published overlay),
179    /// shared by readers through an atomically reference-counted handle.
180    current: Arc<Snapshot>,
181    /// Live base generation named by the superblock; every delta frame and the
182    /// per-generation log filename carry it.
183    base_generation: u64,
184    /// Last writer transaction id durably recorded (the last dirty commit's id).
185    /// A rollback burns a session-local id above this but does not advance it.
186    last_transaction_id: TransactionId,
187    /// Auto-checkpoint policy consulted after each dirty commit.
188    checkpoint_policy: CheckpointPolicy,
189}
190
191impl Db {
192    /// Creates a new empty OXGDB database at `path`.
193    ///
194    /// The create order is base-0 then empty delta-0.log then the writer lock
195    /// file then the superblock (written LAST as the create-complete marker), so
196    /// a half-created store is detected on open rather than silently opened
197    /// empty.
198    ///
199    /// # Errors
200    ///
201    /// Returns [`DbError::AlreadyExists`] when a store already exists, or
202    /// [`DbError::Io`]/[`DbError::InvalidStore`] when creation fails.
203    ///
204    /// # Performance
205    ///
206    /// This function is `O(empty base bytes)`.
207    pub fn create(path: impl AsRef<Path>) -> Result<Self, DbError> {
208        let root = path.as_ref().to_path_buf();
209        if root.join(wal::SUPERBLOCK_FILE).exists() {
210            return Err(DbError::AlreadyExists);
211        }
212        // Base-0: an empty merged view (empty base under an empty overlay).
213        let empty_base = crate::overlay::BaseRecords::empty();
214        let empty_overlay = Overlay::empty(NextIds::INITIAL, Catalog::empty());
215        let view = crate::overlay::MergedState::new(&empty_base, &empty_overlay);
216        let base_bytes = freeze::freeze_view(
217            &view,
218            FreezeStamps {
219                commit_seq: 0,
220                transaction_id: 0,
221                generation: 0,
222            },
223        )?;
224        storage::atomic_write(
225            &root,
226            &root.join(format!("{}.tmp", base_file(0))),
227            &root.join(base_file(0)),
228            &base_bytes,
229        )?;
230        // Empty delta-0.log, durably created.
231        create_empty_log(&root, 0)?;
232        // Superblock is written LAST; its existence is the create-complete marker.
233        write_superblock(&root, 0, 0, 0, 0)?;
234        Self::open(&root)
235    }
236
237    /// Opens an existing OXGDB database, recovering the live frontier from the
238    /// valid prefix of the delta-log replayed over the base named by the
239    /// superblock.
240    ///
241    /// # Errors
242    ///
243    /// Returns [`DbError`] when the store is missing, malformed, or the log is
244    /// corrupt beyond a torn tail.
245    ///
246    /// # Performance
247    ///
248    /// This function is `O(base bytes + log bytes)`.
249    pub fn open(path: impl AsRef<Path>) -> Result<Self, DbError> {
250        let root = path.as_ref().to_path_buf();
251        let superblock = wal::read_superblock(&root)?;
252        let generation = superblock.base_generation.get();
253
254        let base = Arc::new(Base::open(&root.join(base_file(generation)), false)?);
255        let base_records = Arc::new(crate::overlay::BaseRecords::open(&base)?);
256        let base_header = *base.get().header();
257        let base_catalog = base.get().catalog().clone();
258        let base_next = NextIds::from_header(&base_header);
259
260        // Replay the valid prefix of the per-generation delta-log.
261        let log_path = root.join(delta_file(generation));
262        let log_bytes = read_log(&log_path)?;
263        let outcome = wal::replay(generation, &log_bytes)?;
264        // A torn tail truncates the log back to its last-good byte length.
265        if outcome.valid_len < log_bytes.len() {
266            truncate_log(&log_path, outcome.valid_len)?;
267        }
268
269        // Fold the replayed frames into a fresh overlay over the base, deriving
270        // the live frontier (commit_seq/txn_id) from the last good frame.
271        let mut write = WriteOverlay::new(base_next, base_catalog);
272        let mut recovered_next = base_next;
273        let mut last_commit_seq = superblock.commit_seq.get();
274        let mut last_txn = superblock.transaction_id.get();
275        for frame in &outcome.frames {
276            for op in &frame.ops {
277                write.apply_replay_op(&base_records, op, &frame.blob, frame.lsn)?;
278            }
279            recovered_next = recovered_next.elementwise_max(write.next_ids());
280            last_commit_seq = frame.lsn;
281            last_txn = last_txn.max(frame.txn_id);
282        }
283        // ids are never reused: the recovered watermark is the elementwise max of
284        // the base header and every replayed frame's watermark.
285        write.set_next_ids(recovered_next);
286        let overlay = Arc::new(write.freeze());
287
288        // Reuse the records already decoded for replay instead of decoding the base
289        // a second time inside `Snapshot::new`: the pinned base is byte-identical, so
290        // the records (and their derived index) match. Halves open's base-decode cost.
291        let snapshot = Arc::new(Snapshot::with_shared_base_records(
292            CheckpointGeneration::new(generation),
293            CommitSeq::new(last_commit_seq),
294            base,
295            overlay,
296            base_records,
297        ));
298
299        Ok(Self {
300            root,
301            current: snapshot,
302            base_generation: generation,
303            last_transaction_id: TransactionId::new(last_txn),
304            checkpoint_policy: CheckpointPolicy::default(),
305        })
306    }
307
308    /// Returns the live base generation named by the superblock (the count of
309    /// folds this store has undergone; gen-0 is the freshly created store).
310    ///
311    /// # Performance
312    ///
313    /// This method is `O(1)`.
314    #[must_use]
315    pub const fn live_generation(&self) -> CheckpointGeneration {
316        CheckpointGeneration::new(self.base_generation)
317    }
318
319    /// Returns the configured auto-checkpoint policy.
320    ///
321    /// # Performance
322    ///
323    /// This method is `O(1)`.
324    #[must_use]
325    pub const fn checkpoint_policy(&self) -> CheckpointPolicy {
326        self.checkpoint_policy
327    }
328
329    /// Sets the auto-checkpoint policy consulted after each dirty commit.
330    ///
331    /// # Performance
332    ///
333    /// This method is `O(1)`.
334    pub const fn set_checkpoint_policy(&mut self, policy: CheckpointPolicy) {
335        self.checkpoint_policy = policy;
336    }
337
338    /// Validates the current handle by re-reading the superblock and verifying
339    /// the live base's content CRC.
340    ///
341    /// # Errors
342    ///
343    /// Returns [`DbError`] when the superblock or base fails validation.
344    ///
345    /// # Performance
346    ///
347    /// This method is `O(base bytes)`.
348    pub fn validate(&self) -> Result<(), DbError> {
349        wal::read_superblock(&self.root)?;
350        Base::open(&self.root.join(base_file(self.base_generation)), false).map(|_base| ())
351    }
352
353    /// Validates an OXGDB database at `path`.
354    ///
355    /// # Errors
356    ///
357    /// Returns [`DbError`] when the store fails to open and recover.
358    ///
359    /// # Performance
360    ///
361    /// This function is `O(base bytes + log bytes)`.
362    pub fn validate_path(path: impl AsRef<Path>) -> Result<(), DbError> {
363        Self::open(path).map(|_database| ())
364    }
365
366    /// Folds the current base+overlay into a new base generation, rotating the
367    /// delta-log and republishing the superblock (a manual checkpoint).
368    ///
369    /// This is the checkpoint primitive, exposed here so the existing `compact`
370    /// API keeps its "rewrite the store compactly" contract. Auto-triggering is
371    /// configured separately via [`Db::set_checkpoint_policy`].
372    ///
373    /// # Errors
374    ///
375    /// Returns [`DbError`] when encoding, writing, or publishing the new
376    /// generation fails.
377    ///
378    /// # Performance
379    ///
380    /// This method is `O(visible state bytes)`.
381    pub fn compact(&mut self) -> Result<(), DbError> {
382        self.checkpoint()
383    }
384
385    /// Folds the current base+overlay into base-`{g+1}`, creates an empty
386    /// delta-`{g+1}`.log, republishes the superblock naming `g+1` (the
387    /// linearization point), then unlinks the old base and log.
388    ///
389    /// The order is crash-safe: the new base is fully durable BEFORE the
390    /// superblock names it (so a crash before the superblock leaves the OLD
391    /// superblock authoritative and the orphan new base is ignored), and the old
392    /// base/log are unlinked only AFTER the superblock names the new generation
393    /// (so a crash before the unlink leaves the NEW superblock authoritative and
394    /// the orphan old files are ignored). The
395    /// [`crate::wire::SuperblockRecord`] rename is the single linearization point.
396    ///
397    /// # Errors
398    ///
399    /// Returns [`DbError`] when encoding, writing, or publishing fails.
400    ///
401    /// # Performance
402    ///
403    /// This method is `O(visible state bytes)`.
404    pub(crate) fn checkpoint(&mut self) -> Result<(), DbError> {
405        self.checkpoint_inner(
406            #[cfg(test)]
407            CheckpointStop::Complete,
408        )
409    }
410
411    /// Crash-safe checkpoint body. Under `#[cfg(test)]` it accepts a
412    /// [`CheckpointStop`] that simulates a crash by returning early right after a
413    /// chosen fsync point, leaving the on-disk files exactly as a real crash
414    /// there would, so the crash-matrix test can reopen and assert recovery.
415    ///
416    /// # Errors
417    ///
418    /// Returns [`DbError`] when encoding, writing, or publishing fails.
419    ///
420    /// # Performance
421    ///
422    /// This method is `O(visible state bytes)`.
423    fn checkpoint_inner(&mut self, #[cfg(test)] stop: CheckpointStop) -> Result<(), DbError> {
424        let _lock = WriterLock::acquire(&self.root)?;
425        let next_generation = self
426            .base_generation
427            .checked_add(1)
428            .ok_or_else(|| DbError::invalid_store("checkpoint generation overflow"))?;
429        let view = self.current.view();
430        let commit_seq = self.current.lsn().get();
431        let base_bytes = freeze::freeze_view(
432            &view,
433            FreezeStamps {
434                commit_seq,
435                transaction_id: self.last_transaction_id.get(),
436                generation: next_generation,
437            },
438        )?;
439        // (1) write base-{g+1} (temp + fsync + rename + dir-fsync).
440        storage::atomic_write(
441            &self.root,
442            &self
443                .root
444                .join(format!("{}.tmp", base_file(next_generation))),
445            &self.root.join(base_file(next_generation)),
446            &base_bytes,
447        )?;
448        // (2) create empty delta-{g+1}.log (fsync + dir-fsync).
449        create_empty_log(&self.root, next_generation)?;
450        // Crash point A: new base + new log durable, superblock NOT yet
451        // published. The OLD superblock still names `g`, so recovery uses the old
452        // generation; the new base/log are orphans.
453        #[cfg(test)]
454        if matches!(stop, CheckpointStop::BeforeSuperblock) {
455            return Ok(());
456        }
457        // (3) publish the superblock naming g+1 — the linearization point.
458        write_superblock(
459            &self.root,
460            next_generation,
461            commit_seq,
462            commit_seq,
463            self.last_transaction_id.get(),
464        )?;
465        // Crash point B: superblock now names g+1, old base/log NOT yet unlinked.
466        // Recovery uses the new generation; the old base/log are orphans.
467        #[cfg(test)]
468        if matches!(stop, CheckpointStop::BeforeRotate) {
469            return Ok(());
470        }
471        // Re-open over the new generation, then (4) unlink the old base + log.
472        let reopened = Self::open(&self.root)?;
473        let old_generation = self.base_generation;
474        let policy = self.checkpoint_policy;
475        self.current = reopened.current;
476        self.base_generation = reopened.base_generation;
477        self.last_transaction_id = reopened.last_transaction_id;
478        // The reopen reset the policy to the default; restore the caller's.
479        self.checkpoint_policy = policy;
480        let _ = std::fs::remove_file(self.root.join(base_file(old_generation)));
481        let _ = std::fs::remove_file(self.root.join(delta_file(old_generation)));
482        let _ = storage::sync_directory(&self.root);
483        Ok(())
484    }
485
486    /// Auto-checkpoints when the configured [`CheckpointPolicy`] says the
487    /// delta-log has grown too large relative to the base. Called after a dirty
488    /// commit publishes its frame. A failed fold is surfaced so the caller can
489    /// observe it; the committed data is already durable in the log regardless.
490    ///
491    /// # Errors
492    ///
493    /// Returns [`DbError`] when the triggered fold fails.
494    ///
495    /// # Performance
496    ///
497    /// This method is `O(1)` to decide; `O(visible state bytes)` when it folds.
498    fn maybe_auto_checkpoint(&mut self) -> Result<(), DbError> {
499        let log_bytes = file_len(&self.root.join(delta_file(self.base_generation)));
500        let base_bytes = file_len(&self.root.join(base_file(self.base_generation)));
501        if self
502            .checkpoint_policy
503            .should_checkpoint(log_bytes, base_bytes)
504        {
505            self.checkpoint()?;
506        }
507        Ok(())
508    }
509
510    /// Returns operational status for this handle, including the live generation
511    /// count and the on-disk base/delta-log sizes the auto-checkpoint policy
512    /// weighs.
513    ///
514    /// # Performance
515    ///
516    /// This method is `O(visible state)` for the merged counts plus two `stat`
517    /// syscalls for the file sizes.
518    #[must_use]
519    pub fn stats(&self) -> Stats {
520        let view = self.current.view();
521        Stats {
522            visible_commit_seq: self.current.lsn(),
523            last_transaction_id: self.last_transaction_id,
524            live_generation: CheckpointGeneration::new(self.base_generation),
525            base_byte_size: file_len(&self.root.join(base_file(self.base_generation))),
526            log_byte_size: file_len(&self.root.join(delta_file(self.base_generation))),
527            element_count: view.element_count(),
528            relation_count: view.relation_count(),
529            incidence_count: view.incidence_count(),
530            catalog: self.catalog_summary(),
531        }
532    }
533
534    /// Returns a catalog-size summary.
535    ///
536    /// # Performance
537    ///
538    /// This method is `O(catalog entry count)`.
539    #[must_use]
540    pub fn catalog_summary(&self) -> CatalogSummary {
541        CatalogSummary::from_catalog(self.current.view().catalog())
542    }
543
544    /// Starts a read transaction pinned to the current visible snapshot.
545    ///
546    /// # Performance
547    ///
548    /// This method is `O(1)`: the reader clones the current `Arc<Snapshot>` and
549    /// observes a fixed state even across later commits and checkpoints.
550    #[must_use]
551    pub fn reader(&self) -> Reader {
552        Reader {
553            snapshot: Arc::clone(&self.current),
554        }
555    }
556
557    /// Starts the single writer transaction, acquiring the cross-process writer
558    /// lock for the transaction's lifetime.
559    ///
560    /// # Errors
561    ///
562    /// Returns [`DbError::WriterLockHeld`] when another writer holds the lock or
563    /// [`DbError::TransactionIdOverflow`] when writer ids are exhausted.
564    ///
565    /// # Performance
566    ///
567    /// This method is `O(1)`: the writer layers a fresh empty write overlay over
568    /// the current snapshot.
569    pub(crate) fn begin_write(&mut self) -> Result<Writer<'_>, DbError> {
570        let lock = WriterLock::acquire(&self.root)?;
571        let transaction_id = self
572            .last_transaction_id
573            .checked_next()
574            .ok_or(DbError::TransactionIdOverflow)?;
575        // Burn the id eagerly so it is session-local-visible even on rollback;
576        // it only becomes durable when a dirty commit writes its frame, and a
577        // reopen recovers the durable high-water mark from the log.
578        self.last_transaction_id = transaction_id;
579        let parent = Arc::clone(&self.current);
580        // Seed the writer delta from the parent's published overlay so the
581        // writer reads every committed record; the parent overlay is never
582        // mutated (the seed clones its maps).
583        let delta = WriteOverlay::from_overlay(parent.overlay());
584        Ok(Writer {
585            database: self,
586            parent,
587            delta,
588            transaction_id,
589            lock,
590        })
591    }
592
593    /// Runs `f` against a read transaction pinned to the current snapshot. The
594    /// primary read entry point.
595    ///
596    /// # Errors
597    ///
598    /// Propagates whatever error `f` returns.
599    ///
600    /// # Performance
601    ///
602    /// Entering is `O(1)` (an `Arc` clone); the total cost is `f`'s cost.
603    pub fn read<R>(&self, f: impl FnOnce(&Reader) -> Result<R, DbError>) -> Result<R, DbError> {
604        f(&self.reader())
605    }
606
607    /// Runs `f` against the single write transaction, committing on `Ok` and
608    /// rolling back on `Err` — control flow IS the commit protocol. Returns `f`'s
609    /// value with the [`CommitOutcome`] (whether a durable frame landed). The
610    /// primary write entry point.
611    ///
612    /// # Errors
613    ///
614    /// Returns [`DbError::WriterLockHeld`] when another writer holds the lock,
615    /// `f`'s error (after rolling back the staged delta), or a commit error.
616    ///
617    /// # Performance
618    ///
619    /// Begin is `O(1)`; commit is `O(change)`. A triggered auto-fold adds
620    /// `O(visible bytes)`.
621    pub fn write<R>(
622        &mut self,
623        f: impl FnOnce(&mut Writer<'_>) -> Result<R, DbError>,
624    ) -> Result<(R, CommitOutcome), DbError> {
625        let mut writer = self.begin_write()?;
626        // On `Err` the `?` drops `writer` here, releasing the lock and discarding
627        // the staged delta — no frame is appended (rollback).
628        let value = f(&mut writer)?;
629        let committed = !writer.delta.is_empty();
630        let lsn = writer.commit()?;
631        let outcome = if committed {
632            CommitOutcome::Committed(lsn)
633        } else {
634            CommitOutcome::Empty
635        };
636        Ok((value, outcome))
637    }
638
639    /// Resolves an already-applied [`Schema`] against the live catalog WITHOUT
640    /// writing, returning the [`Bound`] handle bag (for a store already
641    /// bootstrapped with this schema).
642    ///
643    /// # Errors
644    ///
645    /// Returns [`DbError::UnknownName`] when a declared item is absent.
646    ///
647    /// # Performance
648    ///
649    /// This method is `O(declared items × log catalog)`.
650    pub fn bind(&self, schema: &Schema) -> Result<Bound, DbError> {
651        let view = self.current.view();
652        let catalog = view.catalog();
653        let mut bound = Bound::default();
654        for name in &schema.roles {
655            let id = catalog.role_id(name).ok_or_else(|| DbError::UnknownName {
656                kind: "role",
657                name: name.clone(),
658            })?;
659            bound.roles.insert(name.clone(), id);
660        }
661        for name in &schema.labels {
662            let id = catalog.label_id(name).ok_or_else(|| DbError::UnknownName {
663                kind: "label",
664                name: name.clone(),
665            })?;
666            bound.labels.insert(name.clone(), id);
667        }
668        for name in &schema.relation_types {
669            let id = catalog
670                .relation_type_id(name)
671                .ok_or_else(|| DbError::UnknownName {
672                    kind: "relation type",
673                    name: name.clone(),
674                })?;
675            bound.relation_types.insert(name.clone(), id);
676        }
677        for (name, _family, value_type) in &schema.keys {
678            let id = catalog
679                .property_key_id(name)
680                .ok_or_else(|| DbError::UnknownName {
681                    kind: "property key",
682                    name: name.clone(),
683                })?;
684            bound.keys.insert(name.clone(), (id, *value_type));
685        }
686        for (name, key_name) in &schema.equality_indexes {
687            let (_key_id, value_type) =
688                *bound
689                    .keys
690                    .get(key_name)
691                    .ok_or_else(|| DbError::UnknownName {
692                        kind: "property key",
693                        name: key_name.clone(),
694                    })?;
695            let id = catalog.index_id(name).ok_or_else(|| DbError::UnknownName {
696                kind: "index",
697                name: name.clone(),
698            })?;
699            bound
700                .equality_indexes
701                .insert(name.clone(), (id, value_type));
702        }
703        for spec in &schema.graph_projections {
704            let id = catalog
705                .projection_id(&spec.name)
706                .ok_or_else(|| DbError::UnknownName {
707                    kind: "projection",
708                    name: spec.name.clone(),
709                })?;
710            bound.projections.insert(spec.name.clone(), id);
711        }
712        Ok(bound)
713    }
714
715    /// Prepares a query against the current catalog.
716    ///
717    /// # Errors
718    ///
719    /// Returns [`DbError`] when parsing or semantic analysis fails.
720    ///
721    /// # Performance
722    ///
723    /// This method is `O(query length + catalog lookup cost)`.
724    pub fn prepare(&self, query: &str) -> Result<PreparedQuery, DbError> {
725        PreparedQuery::prepare(query, &self.current.view())
726    }
727}
728
/// Size in bytes of the file at `path`. Returns `0` when the file is missing
/// or its metadata cannot be read.
///
/// The value is advisory only: it feeds status reporting and the
/// auto-checkpoint heuristic, never a correctness decision.
///
/// # Performance
///
/// This function is `O(1)`: one `stat` syscall.
fn file_len(path: &Path) -> u64 {
    match std::fs::metadata(path) {
        Ok(metadata) => metadata.len(),
        // Absent or unreadable: report zero instead of failing.
        Err(_) => 0,
    }
}
739
/// Test-only crash-injection switch for [`Db::checkpoint_inner`]. It halts the
/// fold immediately after a chosen fsync, so the crash-matrix test can reopen
/// the store and check what each crash window recovers to.
///
/// Only `Complete` is built outside the crash-matrix test. That test is
/// `#[cfg(not(miri))]`, because miri's isolation cannot model reopening a real
/// store across simulated crashes. Under miri the other variants are therefore
/// expected to be unused.
///
/// # Performance
///
/// `perf: unspecified`; a test-only control tag.
#[cfg(test)]
#[cfg_attr(
    miri,
    expect(
        dead_code,
        reason = "the crash-injection variants are constructed only by the #[cfg(not(miri))] crash-matrix test"
    )
)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum CheckpointStop {
    /// No injected crash: the full production checkpoint runs.
    Complete,
    /// Halt once the new base and new log are durable but before the
    /// superblock is rewritten; the old superblock remains authoritative.
    BeforeSuperblock,
    /// Halt once the superblock names the new generation but before the old
    /// base/log are removed; the new superblock is authoritative.
    BeforeRotate,
}
771
772/// Reads the whole delta-log into memory, treating a missing file as empty.
773///
774/// # Errors
775///
776/// Returns [`DbError::Io`] when the file cannot be read.
777///
778/// # Performance
779///
780/// This function is `O(log bytes)`.
781fn read_log(path: &Path) -> Result<Vec<u8>, DbError> {
782    match std::fs::read(path) {
783        Ok(bytes) => Ok(bytes),
784        Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Vec::new()),
785        Err(error) => Err(DbError::io("read delta-log", error)),
786    }
787}
788
789/// Truncates the delta-log back to `len` (its last-good byte length) and fsyncs,
790/// discarding a torn tail under the open path.
791///
792/// # Errors
793///
794/// Returns [`DbError::Io`] when opening, truncating, or syncing fails.
795///
796/// # Performance
797///
798/// This function is `O(1)`.
799fn truncate_log(path: &Path, len: usize) -> Result<(), DbError> {
800    let file = std::fs::OpenOptions::new()
801        .write(true)
802        .open(path)
803        .map_err(|error| DbError::io("open delta-log for truncate", error))?;
804    let len = u64::try_from(len)
805        .map_err(|_overflow| DbError::invalid_store("delta-log length overflow"))?;
806    file.set_len(len)
807        .map_err(|error| DbError::io("truncate delta-log", error))?;
808    file.sync_all()
809        .map_err(|error| DbError::io("sync truncated delta-log", error))
810}
811
812/// Creates an empty per-generation delta-log, fsyncing the file and the
813/// directory entry so the new (empty) log is durable.
814///
815/// # Errors
816///
817/// Returns [`DbError::Io`] when creation or syncing fails.
818///
819/// # Performance
820///
821/// This function is `O(1)`.
822fn create_empty_log(root: &Path, generation: u64) -> Result<(), DbError> {
823    let path = root.join(delta_file(generation));
824    let file =
825        std::fs::File::create(&path).map_err(|error| DbError::io("create delta-log", error))?;
826    file.sync_all()
827        .map_err(|error| DbError::io("sync delta-log", error))?;
828    storage::sync_directory(root)
829}
830
831/// Opens the live delta-log for appending (create when absent, read+append).
832///
833/// # Errors
834///
835/// Returns [`DbError::Io`] when the log cannot be opened.
836///
837/// # Performance
838///
839/// This function is `O(1)`.
840fn open_log_for_append(root: &Path, generation: u64) -> Result<std::fs::File, DbError> {
841    std::fs::OpenOptions::new()
842        .create(true)
843        .truncate(false)
844        .read(true)
845        .append(true)
846        .open(root.join(delta_file(generation)))
847        .map_err(|error| DbError::io("open delta-log for append", error))
848}
849
850/// Writes the superblock naming `generation` with the given frontier stamps.
851///
852/// # Errors
853///
854/// Returns [`DbError::Io`] when publishing fails.
855///
856/// # Performance
857///
858/// This function is `O(1)`.
859fn write_superblock(
860    root: &Path,
861    generation: u64,
862    checkpoint_lsn: u64,
863    commit_seq: u64,
864    transaction_id: u64,
865) -> Result<(), DbError> {
866    wal::write_superblock(
867        root,
868        &SuperblockRecord {
869            magic: crate::wire::SUPERBLOCK_MAGIC,
870            base_generation: generation.into(),
871            checkpoint_lsn: checkpoint_lsn.into(),
872            log_byte_offset: 0u64.into(),
873            commit_seq: commit_seq.into(),
874            transaction_id: transaction_id.into(),
875            format_version: crate::wire::OXGDB_FORMAT_VERSION.into(),
876            flags: 0u32.into(),
877            crc32c: 0u32.into(),
878            pad: 0u32.into(),
879        },
880    )
881}
882
883/// Snapshot of database status.
884///
885/// # Performance
886///
887/// Copying and comparing status is `O(1)`.
888#[derive(Clone, Copy, Debug, Eq, PartialEq)]
889pub struct Stats {
890    /// Last visible commit sequence.
891    pub visible_commit_seq: CommitSeq,
892    /// Last writer transaction ID burned by this handle.
893    ///
894    /// This value is durable after a dirty commit and session-local after
895    /// rollback.
896    pub last_transaction_id: TransactionId,
897    /// Live base generation named by the superblock (the count of folds this
898    /// store has undergone; gen-0 is the freshly created store).
899    pub live_generation: CheckpointGeneration,
900    /// On-disk byte size of the live base file.
901    pub base_byte_size: u64,
902    /// On-disk byte size of the live delta-log (the tail recovery replays and
903    /// the auto-checkpoint policy weighs against the base size).
904    pub log_byte_size: u64,
905    /// Visible element count.
906    pub element_count: usize,
907    /// Visible relation count.
908    pub relation_count: usize,
909    /// Visible incidence count.
910    pub incidence_count: usize,
911    /// Catalog-size summary.
912    pub catalog: CatalogSummary,
913}
914
915/// Catalog-size summary.
916///
917/// # Performance
918///
919/// Copying and comparing are `O(1)`.
920#[derive(Clone, Copy, Debug, Eq, PartialEq)]
921pub struct CatalogSummary {
922    /// Role count.
923    pub role_count: usize,
924    /// Label count.
925    pub label_count: usize,
926    /// Relation type count.
927    pub relation_type_count: usize,
928    /// Property key count.
929    pub property_key_count: usize,
930    /// Projection count.
931    pub projection_count: usize,
932    /// Index count.
933    pub index_count: usize,
934}
935
936impl CatalogSummary {
937    /// Builds a summary from a catalog.
938    ///
939    /// # Performance
940    ///
941    /// This function is `O(catalog entry count)`.
942    #[must_use]
943    pub fn from_catalog(catalog: &Catalog) -> Self {
944        Self {
945            role_count: catalog.roles().count(),
946            label_count: catalog.labels().count(),
947            relation_type_count: catalog.relation_types().count(),
948            property_key_count: catalog.property_keys().count(),
949            projection_count: catalog.projections().count(),
950            index_count: catalog.indexes().count(),
951        }
952    }
953}
954
955/// Reader pin identifying the visible database generation.
956///
957/// # Performance
958///
959/// Copying and comparing a pin is `O(1)`.
960#[derive(Clone, Copy, Debug, Eq, PartialEq)]
961pub struct ReadPin {
962    /// Pinned visible commit sequence.
963    pub visible_commit_seq: CommitSeq,
964    /// Pinned checkpoint generation.
965    pub generation: CheckpointGeneration,
966}
967
968/// Read transaction over a pinned snapshot.
969///
970/// A read transaction owns its own `Arc<Snapshot>` and never borrows the
971/// [`Db`], so it stays valid across a later `begin_write`/`checkpoint` on
972/// the same handle (it cloned the snapshot before the write borrowed `&mut`). It
973/// is [`Send`] + [`Sync`] (asserted below).
974///
975/// # Performance
976///
977/// Creating and cloning a read transaction is `O(1)`: it shares the pinned
978/// snapshot through an `Arc`, not by copying.
979pub struct Reader {
980    /// The pinned snapshot this reader observes.
981    snapshot: Arc<Snapshot>,
982}
983
984/// Returns whether a [`Reader::neighbors`] walk should follow the edge from the
985/// incidence `from` (the queried element's incidence) to the incidence `to`
986/// (a candidate neighbor's incidence) under `direction`.
987///
988/// Endpoint roles are encoded by incidence-creation order: the source endpoint
989/// has the lower incidence id. `Outgoing` follows source→target (the queried
990/// element is the source, so `from < to`), `Incoming` follows target→source, and
991/// `Both` follows either side.
992///
993/// # Performance
994///
995/// This function is `O(1)`.
996const fn follow_direction(direction: Direction, from: IncidenceId, to: IncidenceId) -> bool {
997    match direction {
998        Direction::Outgoing => from.get() < to.get(),
999        Direction::Incoming => from.get() > to.get(),
1000        Direction::Both => true,
1001    }
1002}
1003
1004/// `Reader` MUST be `Send + Sync`: it pins only an `Arc<Snapshot>`,
1005/// which holds `Arc`-shared `Send + Sync` data (no `Rc`/`RefCell` reachable).
1006const fn assert_send_sync<T: Send + Sync>() {}
1007const _: () = assert_send_sync::<Reader>();
1008const _: () = assert_send_sync::<Arc<Snapshot>>();
1009
1010impl Reader {
1011    /// Returns this transaction's reader pin.
1012    ///
1013    /// # Performance
1014    ///
1015    /// This method is `O(1)`.
1016    #[must_use]
1017    pub fn pin(&self) -> ReadPin {
1018        ReadPin {
1019            visible_commit_seq: self.snapshot.lsn(),
1020            generation: self.snapshot.generation(),
1021        }
1022    }
1023
1024    /// Returns catalog metadata.
1025    ///
1026    /// # Performance
1027    ///
1028    /// This method is `O(1)`.
1029    #[must_use]
1030    pub fn catalog(&self) -> &Catalog {
1031        self.snapshot.view().catalog_ref()
1032    }
1033
1034    /// Returns visible element count.
1035    ///
1036    /// # Performance
1037    ///
1038    /// This method is `O(base + overlay change)`.
1039    #[must_use]
1040    pub fn element_count(&self) -> usize {
1041        self.snapshot.view().element_count()
1042    }
1043
1044    /// Returns visible relation count.
1045    ///
1046    /// # Performance
1047    ///
1048    /// This method is `O(base + overlay change)`.
1049    #[must_use]
1050    pub fn relation_count(&self) -> usize {
1051        self.snapshot.view().relation_count()
1052    }
1053
1054    /// Returns visible incidence count.
1055    ///
1056    /// # Performance
1057    ///
1058    /// This method is `O(base + overlay change)`.
1059    #[must_use]
1060    pub fn incidence_count(&self) -> usize {
1061        self.snapshot.view().incidence_count()
1062    }
1063
1064    /// Returns every visible element id in id order.
1065    ///
1066    /// # Performance
1067    ///
1068    /// This method is `O(element count)`.
1069    #[must_use]
1070    pub fn element_ids(&self) -> Vec<ElementId> {
1071        self.snapshot
1072            .view()
1073            .elements()
1074            .map(|record| record.id)
1075            .collect()
1076    }
1077
1078    /// Returns every visible relation id in id order.
1079    ///
1080    /// # Performance
1081    ///
1082    /// This method is `O(relation count)`.
1083    #[must_use]
1084    pub fn relation_ids(&self) -> Vec<RelationId> {
1085        self.snapshot
1086            .view()
1087            .relations()
1088            .map(|record| record.id)
1089            .collect()
1090    }
1091
1092    /// Returns whether an element exists.
1093    ///
1094    /// # Performance
1095    ///
1096    /// This method is `O(log change + log n)`.
1097    #[must_use]
1098    pub fn contains_element(&self, id: ElementId) -> bool {
1099        self.snapshot.view().contains_element(id)
1100    }
1101
1102    /// Returns whether a relation exists.
1103    ///
1104    /// # Performance
1105    ///
1106    /// This method is `O(log change + log n)`.
1107    #[must_use]
1108    pub fn contains_relation(&self, id: RelationId) -> bool {
1109        self.snapshot.view().contains_relation(id)
1110    }
1111
1112    /// Returns whether an incidence exists.
1113    ///
1114    /// # Performance
1115    ///
1116    /// This method is `O(log change + log n)`.
1117    #[must_use]
1118    pub fn contains_incidence(&self, id: IncidenceId) -> bool {
1119        self.snapshot.view().contains_incidence(id)
1120    }
1121
1122    /// Returns an owned element view — id, labels, and all properties read in one
1123    /// call.
1124    ///
1125    /// # Performance
1126    ///
1127    /// This method is `O(log n + label count + property count)`.
1128    #[must_use]
1129    pub fn element(&self, id: ElementId) -> Option<Element> {
1130        let view = self.snapshot.view();
1131        let record = view.element_ref(id)?;
1132        let labels = record.labels.iter().copied().collect();
1133        let properties =
1134            Properties::from_pairs(view.subject_properties(PropertySubject::Element(id)));
1135        Some(Element::new(id, labels, properties))
1136    }
1137
1138    /// Returns an owned relation view — id, type, labels, and all properties read
1139    /// in one call.
1140    ///
1141    /// # Performance
1142    ///
1143    /// This method is `O(log n + label count + property count)`.
1144    #[must_use]
1145    pub fn relation(&self, id: RelationId) -> Option<Relation> {
1146        let view = self.snapshot.view();
1147        let record = view.relation_ref(id)?;
1148        let labels = record.labels.iter().copied().collect();
1149        let properties =
1150            Properties::from_pairs(view.subject_properties(PropertySubject::Relation(id)));
1151        Some(Relation::new(id, record.relation_type, labels, properties))
1152    }
1153
1154    /// Returns an owned incidence record.
1155    ///
1156    /// # Performance
1157    ///
1158    /// This method is `O(log change + log n)`.
1159    #[must_use]
1160    pub fn incidence(&self, id: IncidenceId) -> Option<IncidenceRecord> {
1161        self.snapshot.view().incidence_ref(id).map(Cow::into_owned)
1162    }
1163
1164    /// Returns every visible incidence attached to an element, in ascending
1165    /// incidence-id order.
1166    ///
1167    /// The merged set mixes overlay-owned and base-borrowed records, so this
1168    /// returns an owned [`Vec`] ([`IncidenceRecord`] is [`Copy`], so the copy is
1169    /// cheap).
1170    ///
1171    /// # Performance
1172    ///
1173    /// This method is `O(base incidences + overlay incidence change)`.
1174    #[must_use]
1175    pub fn element_incidences(&self, id: ElementId) -> Vec<IncidenceRecord> {
1176        self.snapshot.view().element_incidences(id)
1177    }
1178
1179    /// Returns a binary relation's two endpoint elements, ordered by ascending
1180    /// incidence id.
1181    ///
1182    /// Reads the relation's incidences from the reverse-adjacency index and
1183    /// returns the elements carried by its first two incidences in id order. A
1184    /// relation with fewer than two visible incidences returns `None`. This
1185    /// reports endpoints structurally, without consulting any projection's
1186    /// source/target roles — use [`Self::neighbors`] when role direction matters.
1187    ///
1188    /// # Performance
1189    ///
1190    /// This method is `O(degree)` over the relation's incidences.
1191    #[must_use]
1192    pub fn endpoints(&self, relation: RelationId) -> Option<(ElementId, ElementId)> {
1193        let incidences = self.snapshot.view().relation_incidences(relation);
1194        match incidences.as_slice() {
1195            [first, second, ..] => Some((first.element, second.element)),
1196            _too_few => None,
1197        }
1198    }
1199
1200    /// Returns the elements reachable from `element` along relations of
1201    /// `relation_type`, in ascending element-id order.
1202    ///
1203    /// Direction selects the role `element` must play on each relation. Endpoint
1204    /// roles are encoded by incidence-creation order: a binary relation's source
1205    /// is its lower incidence id and its target the higher (see
1206    /// [`Self::endpoints`]). `Outgoing` requires `element` to be the source (and
1207    /// yields the target), `Incoming` requires it to be the target (and yields
1208    /// the source), and `Both` yields the opposite endpoint either way. Resolved
1209    /// over the reverse-adjacency index — each incidence of `element` whose
1210    /// relation has the requested type contributes that relation's other
1211    /// endpoint — so this works for any binary relation without a materialized
1212    /// projection.
1213    ///
1214    /// # Performance
1215    ///
1216    /// This method is `O(degree of element + sum of touched relation degrees)`.
1217    #[must_use]
1218    pub fn neighbors(
1219        &self,
1220        element: ElementId,
1221        relation_type: RelationTypeId,
1222        direction: Direction,
1223    ) -> Vec<ElementId> {
1224        let view = self.snapshot.view();
1225        let mut neighbors = BTreeSet::new();
1226        for incidence in view.element_incidences(element) {
1227            let matches_type = view
1228                .relation_ref(incidence.relation)
1229                .is_some_and(|record| record.relation_type == Some(relation_type));
1230            if !matches_type {
1231                continue;
1232            }
1233            // The incidence id encodes the endpoint role: the source endpoint is
1234            // created first (lower incidence id), the target second. Compare
1235            // `element`'s incidence id against each other endpoint's to decide
1236            // which side `element` is on, then follow per the requested direction.
1237            neighbors.extend(
1238                view.relation_incidences(incidence.relation)
1239                    .into_iter()
1240                    .filter(|other| other.element != element)
1241                    .filter(|other| follow_direction(direction, incidence.id, other.id))
1242                    .map(|other| other.element),
1243            );
1244        }
1245        neighbors.into_iter().collect()
1246    }
1247
1248    /// Returns one owned property value.
1249    ///
1250    /// # Performance
1251    ///
1252    /// This method is `O(log subjects + log keys)`.
1253    #[must_use]
1254    pub fn property(&self, subject: PropertySubject, key: PropertyKeyId) -> Option<PropertyValue> {
1255        self.snapshot
1256            .view()
1257            .property_ref(subject, key)
1258            .map(Cow::into_owned)
1259    }
1260
1261    /// Returns the owned element whose value in `index` equals `value`, or `None`
1262    /// when no element matches.
1263    ///
1264    /// # Errors
1265    ///
1266    /// Returns [`DbError`] when the index is unknown or is not an equality index.
1267    ///
1268    /// # Performance
1269    ///
1270    /// This method is `O(log n + label count + property count)`.
1271    pub fn element_by_key<T: ValueType>(
1272        &self,
1273        index: EqualityIndex<T>,
1274        value: impl Assignable<T>,
1275    ) -> Result<Option<Element>, DbError> {
1276        let value = value.into_value()?;
1277        let matched = self
1278            .lookup(index.id(), Match::Equal(&value))?
1279            .into_iter()
1280            .find_map(|subject| match subject {
1281                PropertySubject::Element(id) => Some(id),
1282                PropertySubject::Relation(_) | PropertySubject::Incidence(_) => None,
1283            });
1284        Ok(matched.and_then(|id| self.element(id)))
1285    }
1286
1287    /// Returns the number of subjects carried by a membership index (a label or
1288    /// relation-type index).
1289    ///
1290    /// # Errors
1291    ///
1292    /// Returns [`DbError`] when the index is unknown or does not support
1293    /// membership enumeration.
1294    ///
1295    /// # Performance
1296    ///
1297    /// This method is `O(indexed family size)`.
1298    pub fn count(&self, index: IndexId) -> Result<usize, DbError> {
1299        self.lookup(index, Match::All)
1300            .map(|subjects| subjects.len())
1301    }
1302
1303    /// Looks up subjects with a property value.
1304    ///
1305    /// # Errors
1306    ///
1307    /// Returns [`DbError`] when the property key is unknown or `value` does not
1308    /// match the key schema.
1309    ///
1310    /// # Performance
1311    ///
1312    /// This method is `O(property subject count)`.
1313    pub fn lookup_property_equal(
1314        &self,
1315        key: PropertyKeyId,
1316        value: &PropertyValue,
1317    ) -> Result<Vec<PropertySubject>, DbError> {
1318        self.snapshot.view().typed_property_equal(key, value)
1319    }
1320
1321    /// Looks up subjects with a property inside an inclusive range.
1322    ///
1323    /// # Errors
1324    ///
1325    /// Returns [`DbError`] when the property key is unknown or either bound
1326    /// does not match the key schema.
1327    ///
1328    /// # Performance
1329    ///
1330    /// This method is `O(property subject count)`.
1331    pub fn lookup_property_range(
1332        &self,
1333        key: PropertyKeyId,
1334        min: &PropertyValue,
1335        max: &PropertyValue,
1336    ) -> Result<Vec<PropertySubject>, DbError> {
1337        self.snapshot.view().typed_property_range(key, min, max)
1338    }
1339
1340    /// Executes an index lookup.
1341    ///
1342    /// # Errors
1343    ///
1344    /// Returns [`DbError`] when the index is unknown, the lookup shape does not
1345    /// match the index kind, or supplied property values do not match catalog
1346    /// schemas.
1347    ///
1348    /// # Performance
1349    ///
1350    /// This method is `O(indexed family size)`.
1351    pub fn lookup(
1352        &self,
1353        index: IndexId,
1354        lookup: Match<'_>,
1355    ) -> Result<Vec<PropertySubject>, DbError> {
1356        let view = self.snapshot.view();
1357        let entry = view
1358            .catalog()
1359            .index(index)
1360            .ok_or(DbError::UnknownIndex { id: index })?;
1361        match (&entry.definition, lookup) {
1362            (IndexDefinition::Label { label }, Match::All) => Ok(view
1363                .elements_with_label(*label)
1364                .into_iter()
1365                .map(PropertySubject::Element)
1366                .collect()),
1367            (IndexDefinition::Label { .. }, _lookup) => {
1368                Err(DbError::unsupported("label index expects all lookup"))
1369            }
1370            (IndexDefinition::RelationType { relation_type }, Match::All) => Ok(view
1371                .relations_with_type(*relation_type)
1372                .into_iter()
1373                .map(PropertySubject::Relation)
1374                .collect()),
1375            (IndexDefinition::RelationType { .. }, _lookup) => Err(DbError::unsupported(
1376                "relation type index expects all lookup",
1377            )),
1378            (IndexDefinition::PropertyEquality { key }, Match::Equal(value)) => {
1379                view.typed_property_equal(*key, value)
1380            }
1381            (IndexDefinition::PropertyEquality { .. }, _lookup) => Err(DbError::unsupported(
1382                "property equality index expects equality lookup",
1383            )),
1384            (IndexDefinition::PropertyRange { key }, Match::Range { min, max }) => {
1385                view.typed_property_range(*key, min, max)
1386            }
1387            (IndexDefinition::PropertyRange { .. }, _lookup) => Err(DbError::unsupported(
1388                "property range index expects range lookup",
1389            )),
1390            (IndexDefinition::CompositeEquality { keys }, Match::Composite(values)) => {
1391                view.typed_property_composite_equal(keys, values)
1392            }
1393            (IndexDefinition::CompositeEquality { .. }, _lookup) => Err(DbError::unsupported(
1394                "composite equality index expects composite equality lookup",
1395            )),
1396            (IndexDefinition::Projection { projection }, Match::All) => {
1397                self.projection_index_subjects(*projection)
1398            }
1399            (IndexDefinition::Projection { .. }, _lookup) => {
1400                Err(DbError::unsupported("projection index expects all lookup"))
1401            }
1402        }
1403    }
1404
1405    /// Materializes a graph projection.
1406    ///
1407    /// # Errors
1408    ///
1409    /// Returns [`DbError`] when the projection is unknown, is not a graph, or
1410    /// fails validation against current topology.
1411    ///
1412    /// # Performance
1413    ///
1414    /// This method is `O(relation count * incidence count)`.
1415    pub fn graph_projection(&self, id: ProjectionId) -> Result<GraphProjection, DbError> {
1416        let view = self.snapshot.view();
1417        let entry = view
1418            .catalog()
1419            .projection(id)
1420            .ok_or(DbError::UnknownProjection { id })?;
1421        match &entry.definition {
1422            ProjectionDefinition::Graph(definition) => {
1423                projection::GraphProjection::from_state(&view, definition.clone())
1424            }
1425            ProjectionDefinition::Hypergraph(_definition) => {
1426                Err(DbError::invalid_projection("projection is not a graph"))
1427            }
1428        }
1429    }
1430
1431    /// Materializes a graph projection by catalog name.
1432    ///
1433    /// # Errors
1434    ///
1435    /// Returns [`DbError`] when the projection is unknown, is not a graph, or
1436    /// fails validation against current topology.
1437    ///
1438    /// # Performance
1439    ///
1440    /// This method is `O(log projection count + relation count * incidence count)`.
1441    pub fn graph_projection_by_name(&self, name: &str) -> Result<GraphProjection, DbError> {
1442        let id = self
1443            .snapshot
1444            .view()
1445            .catalog()
1446            .projection_id(name)
1447            .ok_or_else(|| DbError::unsupported(format!("unknown projection {name}")))?;
1448        self.graph_projection(id)
1449    }
1450
1451    /// Walks a cataloged graph projection from canonical seed elements,
1452    /// returning the discovered nodes AND the projection edges among them.
1453    ///
1454    /// Nodes are unique canonical elements in BFS first-discovery order; depth is
1455    /// the shortest discovered hop count from any seed. Edges connect two
1456    /// discovered nodes, ordered deterministically and unique by relation, so the
1457    /// [`Subgraph`] never references a node it omitted.
1458    ///
1459    /// # Errors
1460    ///
1461    /// Returns [`DbError`] when the projection is unknown, is not a graph,
1462    /// cannot be materialized, or a seed element is not part of the projection.
1463    ///
1464    /// # Performance
1465    ///
1466    /// This method is `O(relation count * incidence count + visited edges)`.
1467    pub fn walk(
1468        &self,
1469        projection: ProjectionId,
1470        seeds: &[ElementId],
1471        walk: Walk,
1472    ) -> Result<Subgraph, DbError> {
1473        if seeds.is_empty() || walk.limit == 0 {
1474            return Ok(Subgraph::default());
1475        }
1476        let graph = self.graph_projection(projection)?;
1477        traversal::walk_graph_projection(&graph, seeds, walk)
1478    }
1479
1480    /// Materializes a hypergraph projection.
1481    ///
1482    /// # Errors
1483    ///
1484    /// Returns [`DbError`] when the projection is unknown, is not a hypergraph,
1485    /// or fails validation against current topology.
1486    ///
1487    /// # Performance
1488    ///
1489    /// This method is `O(relation count * incidence count)`.
1490    pub fn hypergraph_projection(&self, id: ProjectionId) -> Result<HypergraphProjection, DbError> {
1491        let view = self.snapshot.view();
1492        let entry = view
1493            .catalog()
1494            .projection(id)
1495            .ok_or(DbError::UnknownProjection { id })?;
1496        match &entry.definition {
1497            ProjectionDefinition::Hypergraph(definition) => {
1498                projection::HypergraphProjection::from_state(&view, definition.clone())
1499            }
1500            ProjectionDefinition::Graph(_definition) => Err(DbError::invalid_projection(
1501                "projection is not a hypergraph",
1502            )),
1503        }
1504    }
1505
1506    /// Executes a prepared query.
1507    ///
1508    /// # Errors
1509    ///
1510    /// Returns [`DbError`] when execution cannot materialize a referenced
1511    /// projection.
1512    ///
1513    /// # Performance
1514    ///
1515    /// This method is `O(plan output + projection build cost when used)`.
1516    pub fn run(&self, query: &PreparedQuery) -> Result<QueryResult, DbError> {
1517        query.execute(&self.snapshot.view())
1518    }
1519
1520    /// Explains a prepared query.
1521    ///
1522    /// # Performance
1523    ///
1524    /// This method is `O(plan size)`.
1525    #[must_use]
1526    pub fn explain(&self, query: &PreparedQuery) -> String {
1527        query.explain()
1528    }
1529
1530    /// Materializes subjects represented by a projection index.
1531    ///
1532    /// # Errors
1533    ///
1534    /// Returns [`DbError`] when the projection is unknown or cannot be
1535    /// materialized.
1536    ///
1537    /// # Performance
1538    ///
1539    /// This method is `O(relation count * incidence count)`.
1540    fn projection_index_subjects(
1541        &self,
1542        projection: ProjectionId,
1543    ) -> Result<Vec<PropertySubject>, DbError> {
1544        let view = self.snapshot.view();
1545        let entry = view
1546            .catalog()
1547            .projection(projection)
1548            .ok_or(DbError::UnknownProjection { id: projection })?;
1549        match &entry.definition {
1550            ProjectionDefinition::Graph(definition) => {
1551                Ok(projection::GraphProjection::from_state(&view, definition.clone())?.subjects())
1552            }
1553            ProjectionDefinition::Hypergraph(definition) => Ok(
1554                projection::HypergraphProjection::from_state(&view, definition.clone())?.subjects(),
1555            ),
1556        }
1557    }
1558}
1559
1560/// Single writer transaction.
1561///
1562/// Mutations accumulate into a private write overlay layered over the parent
1563/// snapshot; reads fall through the overlay then the base. `commit` appends the
1564/// overlay's mutation log to the WAL (when dirty) and publishes a fresh snapshot;
1565/// `rollback` drops the overlay and appends nothing.
1566///
1567/// # Performance
1568///
1569/// Creating and moving a writer is `O(1)`; each mutation is `O(log change)`.
1570pub struct Writer<'db> {
1571    /// Db receiving the commit.
1572    database: &'db mut Db,
1573    /// Parent snapshot the writer layers over (its base + frozen overlay).
1574    parent: Arc<Snapshot>,
1575    /// Private mutable delta this writer accumulates.
1576    delta: WriteOverlay,
1577    /// Writer transaction id (session-local until a dirty commit makes it
1578    /// durable).
1579    transaction_id: TransactionId,
1580    /// Held single-writer advisory lock. Its [`Drop`] releases the lock when this
1581    /// transaction ends (on `rollback`, or on any early-return error path); a
1582    /// successful dirty [`Self::commit`] releases it explicitly with `drop` so a
1583    /// triggered auto-checkpoint can re-acquire it.
1584    lock: WriterLock,
1585}
1586
1587impl Writer<'_> {
1588    /// Registers a structural incidence role.
1589    ///
1590    /// # Errors
1591    ///
1592    /// Returns [`DbError`] when the name already exists or ID allocation fails.
1593    ///
1594    /// # Performance
1595    ///
1596    /// This method is `O(log role count + name length)`.
1597    pub fn register_role(&mut self, name: impl Into<String>) -> Result<RoleId, DbError> {
1598        self.delta.register_role(name.into())
1599    }
1600
1601    /// Registers an element or relation label.
1602    ///
1603    /// # Errors
1604    ///
1605    /// Returns [`DbError`] when the name already exists or ID allocation fails.
1606    ///
1607    /// # Performance
1608    ///
1609    /// This method is `O(log label count + name length)`.
1610    pub fn register_label(&mut self, name: impl Into<String>) -> Result<LabelId, DbError> {
1611        self.delta.register_label(name.into())
1612    }
1613
1614    /// Registers a relation type.
1615    ///
1616    /// # Errors
1617    ///
1618    /// Returns [`DbError`] when the name already exists or ID allocation fails.
1619    ///
1620    /// # Performance
1621    ///
1622    /// This method is `O(log relation type count + name length)`.
1623    pub fn register_relation_type(
1624        &mut self,
1625        name: impl Into<String>,
1626    ) -> Result<RelationTypeId, DbError> {
1627        self.delta.register_relation_type(name.into())
1628    }
1629
1630    /// Registers a typed property key.
1631    ///
1632    /// # Errors
1633    ///
1634    /// Returns [`DbError`] when the name already exists or ID allocation fails.
1635    ///
1636    /// # Performance
1637    ///
1638    /// This method is `O(log property key count + name length)`.
1639    pub fn register_property_key(
1640        &mut self,
1641        name: impl Into<String>,
1642        family: PropertyFamily,
1643        value_type: PropertyType,
1644    ) -> Result<PropertyKeyId, DbError> {
1645        self.delta
1646            .register_property_key(name.into(), family, value_type)
1647    }
1648
1649    /// Defines a physical projection.
1650    ///
1651    /// # Errors
1652    ///
1653    /// Returns [`DbError`] when referenced catalog IDs are unknown, the
1654    /// projection name already exists, or ID allocation fails.
1655    ///
1656    /// # Performance
1657    ///
1658    /// This method is `O(definition size + catalog lookup cost)`.
1659    pub fn define_projection(
1660        &mut self,
1661        definition: ProjectionDefinition,
1662    ) -> Result<ProjectionId, DbError> {
1663        self.validate_projection_definition(&definition)?;
1664        self.delta.register_projection(definition)
1665    }
1666
1667    /// Defines an index.
1668    ///
1669    /// # Errors
1670    ///
1671    /// Returns [`DbError`] when referenced catalog IDs are unknown, the index
1672    /// name already exists, or ID allocation fails.
1673    ///
1674    /// # Performance
1675    ///
1676    /// This method is `O(definition size + catalog lookup cost)`.
1677    pub fn define_index(
1678        &mut self,
1679        name: impl Into<String>,
1680        definition: IndexDefinition,
1681    ) -> Result<IndexId, DbError> {
1682        self.validate_index_definition(&definition)?;
1683        self.delta.register_index(name.into(), definition)
1684    }
1685
1686    /// Applies a declarative [`Schema`] idempotently (register-or-get every
1687    /// declared item), returning the resolved [`Bound`] handle bag. Re-applying
1688    /// the same schema reuses existing ids; a name that already exists with a
1689    /// conflicting shape is a [`DbError::SchemaConflict`].
1690    ///
1691    /// # Errors
1692    ///
1693    /// Returns [`DbError`] on a shape conflict, an undeclared referenced name (an
1694    /// index's key, a projection's role/type), or id-allocation failure.
1695    ///
1696    /// # Performance
1697    ///
1698    /// This method is `O(declared items × log catalog)`.
1699    pub fn apply_schema(&mut self, schema: &Schema) -> Result<Bound, DbError> {
1700        let mut bound = Bound::default();
1701        for name in &schema.roles {
1702            let id = match self.merged().catalog().role_id(name) {
1703                Some(id) => id,
1704                None => self.register_role(name.clone())?,
1705            };
1706            bound.roles.insert(name.clone(), id);
1707        }
1708        for name in &schema.labels {
1709            let id = match self.merged().catalog().label_id(name) {
1710                Some(id) => id,
1711                None => self.register_label(name.clone())?,
1712            };
1713            bound.labels.insert(name.clone(), id);
1714        }
1715        for name in &schema.relation_types {
1716            let id = match self.merged().catalog().relation_type_id(name) {
1717                Some(id) => id,
1718                None => self.register_relation_type(name.clone())?,
1719            };
1720            bound.relation_types.insert(name.clone(), id);
1721        }
1722        for (name, family, value_type) in &schema.keys {
1723            let id = self.register_key_or_get(name, *family, *value_type)?;
1724            bound.keys.insert(name.clone(), (id, *value_type));
1725        }
1726        for (name, key_name) in &schema.equality_indexes {
1727            let (key_id, value_type) =
1728                *bound
1729                    .keys
1730                    .get(key_name)
1731                    .ok_or_else(|| DbError::UnknownName {
1732                        kind: "property key",
1733                        name: key_name.clone(),
1734                    })?;
1735            let id = match self.merged().catalog().index_id(name) {
1736                Some(id) => id,
1737                None => self.define_index(
1738                    name.clone(),
1739                    IndexDefinition::PropertyEquality { key: key_id },
1740                )?,
1741            };
1742            bound
1743                .equality_indexes
1744                .insert(name.clone(), (id, value_type));
1745        }
1746        for spec in &schema.graph_projections {
1747            let id = match self.merged().catalog().projection_id(&spec.name) {
1748                Some(id) => id,
1749                None => self.define_graph_projection(spec, &bound)?,
1750            };
1751            bound.projections.insert(spec.name.clone(), id);
1752        }
1753        Ok(bound)
1754    }
1755
1756    /// Registers a property key, or returns the existing id when the name is
1757    /// already present with a matching family and value type.
1758    ///
1759    /// # Errors
1760    ///
1761    /// Returns [`DbError::SchemaConflict`] when the name exists with a different
1762    /// family or value type.
1763    ///
1764    /// # Performance
1765    ///
1766    /// This method is `O(log catalog)`.
1767    fn register_key_or_get(
1768        &mut self,
1769        name: &str,
1770        family: PropertyFamily,
1771        value_type: PropertyType,
1772    ) -> Result<PropertyKeyId, DbError> {
1773        let Some(existing) = self.merged().catalog().property_key_id(name) else {
1774            return self.register_property_key(name.to_owned(), family, value_type);
1775        };
1776        let matches = self
1777            .merged()
1778            .catalog()
1779            .property_key(existing)
1780            .is_some_and(|def| def.family == family && def.value_type == value_type);
1781        if matches {
1782            Ok(existing)
1783        } else {
1784            Err(DbError::SchemaConflict {
1785                name: name.to_owned(),
1786                reason: "property key family/value type differs from the existing catalog entry",
1787            })
1788        }
1789    }
1790
1791    /// Defines a graph projection from a spec, resolving its relation-type and
1792    /// role names through `bound`.
1793    ///
1794    /// # Errors
1795    ///
1796    /// Returns [`DbError::UnknownName`] when a referenced role/type is unbound, or
1797    /// a definition error.
1798    ///
1799    /// # Performance
1800    ///
1801    /// This method is `O(relation-type count × log catalog)`.
1802    fn define_graph_projection(
1803        &mut self,
1804        spec: &GraphProjectionSpec,
1805        bound: &Bound,
1806    ) -> Result<ProjectionId, DbError> {
1807        let mut relation_types = BTreeSet::new();
1808        for name in &spec.relation_types {
1809            relation_types.insert(bound.relation_type(name)?);
1810        }
1811        let source_role = bound.role(&spec.source_role)?;
1812        let target_role = bound.role(&spec.target_role)?;
1813        self.define_projection(ProjectionDefinition::Graph(GraphProjectionDefinition {
1814            name: spec.name.clone(),
1815            relation_types,
1816            source_role,
1817            target_role,
1818        }))
1819    }
1820
1821    /// Creates a canonical element.
1822    ///
1823    /// # Errors
1824    ///
1825    /// Returns [`DbError::IdOverflow`] when element IDs are exhausted.
1826    ///
1827    /// # Performance
1828    ///
1829    /// This method is `O(log element change)`.
1830    pub fn create_element(&mut self) -> Result<ElementId, DbError> {
1831        self.delta.create_element()
1832    }
1833
1834    /// Creates a canonical relation.
1835    ///
1836    /// # Errors
1837    ///
1838    /// Returns [`DbError::IdOverflow`] when relation IDs are exhausted.
1839    ///
1840    /// # Performance
1841    ///
1842    /// This method is `O(log relation change)`.
1843    pub fn create_relation(&mut self) -> Result<RelationId, DbError> {
1844        self.delta.create_relation()
1845    }
1846
1847    /// Creates a canonical incidence.
1848    ///
1849    /// # Errors
1850    ///
1851    /// Returns [`DbError`] when referenced IDs are unknown or incidence IDs are
1852    /// exhausted.
1853    ///
1854    /// # Performance
1855    ///
1856    /// This method is `O(log incidence change + reference lookup cost)`.
1857    pub fn create_incidence(
1858        &mut self,
1859        relation: RelationId,
1860        element: ElementId,
1861        role: RoleId,
1862    ) -> Result<IncidenceId, DbError> {
1863        self.require_relation(relation)?;
1864        self.require_element(element)?;
1865        self.require_role(role)?;
1866        self.delta.create_incidence(relation, element, role)
1867    }
1868
1869    /// Tombstones a canonical element and its incidences.
1870    ///
1871    /// # Errors
1872    ///
1873    /// Returns [`DbError::UnknownElement`] when the element is not visible.
1874    ///
1875    /// # Performance
1876    ///
1877    /// This method is `O(log n + degree)` via the reverse-adjacency index.
1878    pub(crate) fn tombstone_element(&mut self, id: ElementId) -> Result<(), DbError> {
1879        self.require_element(id)?;
1880        // Cascade: every incidence on the element — resolved in O(log n + degree)
1881        // through the reverse-adjacency index, not a full incidence scan — is
1882        // tombstoned too.
1883        let incidences: Vec<IncidenceId> = self
1884            .merged()
1885            .element_incidences(id)
1886            .into_iter()
1887            .map(|record| record.id)
1888            .collect();
1889        let base = self.parent.base_records();
1890        self.delta.tombstone_element(base, id);
1891        for incidence in incidences {
1892            self.delta
1893                .tombstone_incidence(self.parent.base_records(), incidence);
1894        }
1895        Ok(())
1896    }
1897
1898    /// Tombstones a canonical relation and its incidences.
1899    ///
1900    /// # Errors
1901    ///
1902    /// Returns [`DbError::UnknownRelation`] when the relation is not visible.
1903    ///
1904    /// # Performance
1905    ///
1906    /// This method is `O(log n + degree)` via the reverse-adjacency index.
1907    pub(crate) fn tombstone_relation(&mut self, id: RelationId) -> Result<(), DbError> {
1908        self.require_relation(id)?;
1909        // Cascade: every incidence in the relation — resolved in O(log n + degree)
1910        // through the reverse-adjacency index, not a full incidence scan.
1911        let incidences: Vec<IncidenceId> = self
1912            .merged()
1913            .relation_incidences(id)
1914            .into_iter()
1915            .map(|record| record.id)
1916            .collect();
1917        let base = self.parent.base_records();
1918        self.delta.tombstone_relation(base, id);
1919        for incidence in incidences {
1920            self.delta
1921                .tombstone_incidence(self.parent.base_records(), incidence);
1922        }
1923        Ok(())
1924    }
1925
1926    /// Tombstones a canonical incidence.
1927    ///
1928    /// # Errors
1929    ///
1930    /// Returns [`DbError::UnknownIncidence`] when the incidence is not visible.
1931    ///
1932    /// # Performance
1933    ///
1934    /// This method is `O(log incidence change)`.
1935    pub(crate) fn tombstone_incidence(&mut self, id: IncidenceId) -> Result<(), DbError> {
1936        self.require_incidence(id)?;
1937        self.delta
1938            .tombstone_incidence(self.parent.base_records(), id);
1939        Ok(())
1940    }
1941
1942    /// Adds a label to an element.
1943    ///
1944    /// # Errors
1945    ///
1946    /// Returns [`DbError`] when the element or label is unknown.
1947    ///
1948    /// # Performance
1949    ///
1950    /// This method is `O(log element change + log label count)`.
1951    pub(crate) fn add_element_label(
1952        &mut self,
1953        element: ElementId,
1954        label: LabelId,
1955    ) -> Result<(), DbError> {
1956        self.require_element(element)?;
1957        self.require_label(label)?;
1958        self.delta
1959            .add_element_label(self.parent.base_records(), element, label);
1960        Ok(())
1961    }
1962
1963    /// Adds a label to a relation.
1964    ///
1965    /// # Errors
1966    ///
1967    /// Returns [`DbError`] when the relation or label is unknown.
1968    ///
1969    /// # Performance
1970    ///
1971    /// This method is `O(log relation change + log label count)`.
1972    pub(crate) fn add_relation_label(
1973        &mut self,
1974        relation: RelationId,
1975        label: LabelId,
1976    ) -> Result<(), DbError> {
1977        self.require_relation(relation)?;
1978        self.require_label(label)?;
1979        self.delta
1980            .add_relation_label(self.parent.base_records(), relation, label);
1981        Ok(())
1982    }
1983
1984    /// Sets a relation type.
1985    ///
1986    /// # Errors
1987    ///
1988    /// Returns [`DbError`] when the relation or relation type is unknown.
1989    ///
1990    /// # Performance
1991    ///
1992    /// This method is `O(log relation change + log relation type count)`.
1993    pub fn set_relation_type(
1994        &mut self,
1995        relation: RelationId,
1996        relation_type: RelationTypeId,
1997    ) -> Result<(), DbError> {
1998        self.require_relation(relation)?;
1999        self.require_relation_type(relation_type)?;
2000        self.delta
2001            .set_relation_type(self.parent.base_records(), relation, relation_type);
2002        Ok(())
2003    }
2004
2005    /// Sets a property value.
2006    ///
2007    /// # Errors
2008    ///
2009    /// Returns [`DbError`] when the subject or key is unknown, or the value
2010    /// does not match the key schema.
2011    ///
2012    /// # Performance
2013    ///
2014    /// This method is `O(log subject change + log key count)`.
2015    pub(crate) fn set_property(
2016        &mut self,
2017        subject: PropertySubject,
2018        key: PropertyKeyId,
2019        value: PropertyValue,
2020    ) -> Result<(), DbError> {
2021        // Referential integrity: the subject must be visible (this rejects an
2022        // orphan property against a tombstoned/absent subject at the transaction
2023        // boundary — the overlay layer is permissive by design).
2024        self.require_subject(subject)?;
2025        let definition = self
2026            .merged()
2027            .catalog()
2028            .property_key(key)
2029            .cloned()
2030            .ok_or(DbError::UnknownPropertyKey { id: key })?;
2031        if definition.family != subject.family() {
2032            return Err(DbError::WrongPropertyFamily {
2033                expected: definition.family,
2034                actual: subject.family(),
2035            });
2036        }
2037        if definition.value_type != value.value_type() {
2038            return Err(DbError::PropertyTypeMismatch {
2039                expected: definition.value_type,
2040                actual: value.value_type(),
2041            });
2042        }
2043        self.delta
2044            .set_property(self.parent.base_records(), subject, key, value);
2045        Ok(())
2046    }
2047
2048    /// Removes a property value.
2049    ///
2050    /// # Errors
2051    ///
2052    /// Returns [`DbError`] when the subject or key is unknown.
2053    ///
2054    /// # Performance
2055    ///
2056    /// This method is `O(log subject change + log key count)`.
2057    pub(crate) fn remove_property(
2058        &mut self,
2059        subject: PropertySubject,
2060        key: PropertyKeyId,
2061    ) -> Result<(), DbError> {
2062        self.require_subject(subject)?;
2063        if self.merged().catalog().property_key(key).is_none() {
2064            return Err(DbError::UnknownPropertyKey { id: key });
2065        }
2066        self.delta
2067            .remove_property(self.parent.base_records(), subject, key);
2068        Ok(())
2069    }
2070
2071    /// Resolves the property key an equality index covers.
2072    ///
2073    /// # Errors
2074    ///
2075    /// Returns [`DbError::UnknownIndex`] when `index` is unknown, or an
2076    /// unsupported-query error when it is not a property-equality index.
2077    ///
2078    /// # Performance
2079    ///
2080    /// This method is `O(log index count)`.
2081    fn equality_index_key(&self, index: IndexId) -> Result<PropertyKeyId, DbError> {
2082        let view = self.merged();
2083        let entry = view
2084            .catalog()
2085            .index(index)
2086            .ok_or(DbError::UnknownIndex { id: index })?;
2087        match &entry.definition {
2088            IndexDefinition::PropertyEquality { key } => Ok(*key),
2089            _other => Err(DbError::unsupported(
2090                "reconcile requires a property-equality index",
2091            )),
2092        }
2093    }
2094
2095    /// Inserts or updates the element whose value under `index` equals `value`,
2096    /// returning its canonical id — reused when an element already carries that
2097    /// identity value (id stable across reconcile), freshly minted (a never-reused
2098    /// id, with the identity property set) otherwise.
2099    ///
2100    /// # Errors
2101    ///
2102    /// Returns [`DbError`] when `index` is not an equality index or the value
2103    /// type mismatches the key schema.
2104    ///
2105    /// # Performance
2106    ///
2107    /// This method is `O(log n + value length)` — a probe plus, on a miss, a mint.
2108    pub fn upsert_element<T: ValueType>(
2109        &mut self,
2110        index: EqualityIndex<T>,
2111        value: impl Assignable<T>,
2112    ) -> Result<ElementId, DbError> {
2113        let value = value.into_value()?;
2114        let key = self.equality_index_key(index.id())?;
2115        let existing = self
2116            .merged()
2117            .property_equal(key, &value)
2118            .into_iter()
2119            .find_map(|subject| match subject {
2120                PropertySubject::Element(id) => Some(id),
2121                PropertySubject::Relation(_) | PropertySubject::Incidence(_) => None,
2122            });
2123        if let Some(id) = existing {
2124            return Ok(id);
2125        }
2126        let element = self.create_element()?;
2127        self.set_property(PropertySubject::Element(element), key, value)?;
2128        Ok(element)
2129    }
2130
2131    /// Inserts or updates the relation whose value under `index` equals `value`,
2132    /// returning its canonical id. On a miss it mints the relation, sets its type
2133    /// and identity property, and creates one incidence per `(element, role)`
2134    /// endpoint; on a hit the existing relation (with its endpoints) is reused
2135    /// unchanged — the identity value encodes the endpoints, so they are immutable.
2136    ///
2137    /// # Errors
2138    ///
2139    /// Returns [`DbError`] when `index` is not an equality index, the value type
2140    /// mismatches, or an endpoint element does not exist.
2141    ///
2142    /// # Performance
2143    ///
2144    /// This method is `O(log n + endpoints)` — a probe plus, on a miss, a mint.
2145    pub fn upsert_relation<T: ValueType>(
2146        &mut self,
2147        index: EqualityIndex<T>,
2148        value: impl Assignable<T>,
2149        relation_type: RelationTypeId,
2150        endpoints: &[(ElementId, RoleId)],
2151    ) -> Result<RelationId, DbError> {
2152        let value = value.into_value()?;
2153        let key = self.equality_index_key(index.id())?;
2154        let existing = self
2155            .merged()
2156            .property_equal(key, &value)
2157            .into_iter()
2158            .find_map(|subject| match subject {
2159                PropertySubject::Relation(id) => Some(id),
2160                PropertySubject::Element(_) | PropertySubject::Incidence(_) => None,
2161            });
2162        if let Some(id) = existing {
2163            return Ok(id);
2164        }
2165        let relation = self.create_relation()?;
2166        self.set_relation_type(relation, relation_type)?;
2167        self.set_property(PropertySubject::Relation(relation), key, value)?;
2168        for (element, role) in endpoints {
2169            self.create_incidence(relation, *element, *role)?;
2170        }
2171        Ok(relation)
2172    }
2173
2174    /// Tombstones every subject carried by `index` whose identity value is NOT in
2175    /// `keep`, cascading each subject's incidences in `O(degree)` via the
2176    /// reverse-adjacency index. The prune half of a reconcile: after upserting
2177    /// every desired subject, `retain` removes the vanished complement.
2178    ///
2179    /// # Errors
2180    ///
2181    /// Returns [`DbError`] when `index` is not an equality index or a `keep` value
2182    /// type mismatches the key schema.
2183    ///
2184    /// # Performance
2185    ///
2186    /// This method is `O(family size + removed × degree)`.
2187    pub fn retain<T: ValueType, V: Assignable<T> + Copy>(
2188        &mut self,
2189        index: EqualityIndex<T>,
2190        keep: &[V],
2191    ) -> Result<(), DbError> {
2192        let key = self.equality_index_key(index.id())?;
2193        let mut keep_values: BTreeSet<PropertyValue> = BTreeSet::new();
2194        for value in keep {
2195            keep_values.insert((*value).into_value()?);
2196        }
2197        let stale: Vec<PropertySubject> = self
2198            .merged()
2199            .property_key_subjects(key)
2200            .into_iter()
2201            .filter(|(_subject, value)| !keep_values.contains(value))
2202            .map(|(subject, _value)| subject)
2203            .collect();
2204        for subject in stale {
2205            match subject {
2206                PropertySubject::Element(id) => self.tombstone_element(id)?,
2207                PropertySubject::Relation(id) => self.tombstone_relation(id)?,
2208                PropertySubject::Incidence(id) => self.tombstone_incidence(id)?,
2209            }
2210        }
2211        Ok(())
2212    }
2213
2214    /// Sets a typed property on a subject; the value type is checked at compile
2215    /// time against the key.
2216    ///
2217    /// # Errors
2218    ///
2219    /// Returns [`DbError`] when the subject is absent, the value is out of range,
2220    /// or the value type mismatches the key schema.
2221    ///
2222    /// # Performance
2223    ///
2224    /// This method is `O(log change + log keys)`.
2225    pub fn set<T: ValueType>(
2226        &mut self,
2227        subject: impl Into<PropertySubject>,
2228        key: Key<T>,
2229        value: impl Assignable<T>,
2230    ) -> Result<(), DbError> {
2231        self.set_property(subject.into(), key.id(), value.into_value()?)
2232    }
2233
2234    /// Removes a typed property from a subject.
2235    ///
2236    /// # Errors
2237    ///
2238    /// Returns [`DbError`] when the subject is absent or the key is unknown.
2239    ///
2240    /// # Performance
2241    ///
2242    /// This method is `O(log change + log keys)`.
2243    pub fn unset<T: ValueType>(
2244        &mut self,
2245        subject: impl Into<PropertySubject>,
2246        key: Key<T>,
2247    ) -> Result<(), DbError> {
2248        self.remove_property(subject.into(), key.id())
2249    }
2250
2251    /// Adds a label to an element or relation subject.
2252    ///
2253    /// # Errors
2254    ///
2255    /// Returns [`DbError`] when the subject is absent, the label is unknown, or
2256    /// the subject is an incidence (incidences carry no labels).
2257    ///
2258    /// # Performance
2259    ///
2260    /// This method is `O(log change + log labels)`.
2261    pub fn add_label(
2262        &mut self,
2263        subject: impl Into<PropertySubject>,
2264        label: LabelId,
2265    ) -> Result<(), DbError> {
2266        match subject.into() {
2267            PropertySubject::Element(id) => self.add_element_label(id, label),
2268            PropertySubject::Relation(id) => self.add_relation_label(id, label),
2269            PropertySubject::Incidence(_) => {
2270                Err(DbError::unsupported("incidences do not carry labels"))
2271            }
2272        }
2273    }
2274
2275    /// Tombstones any subject by id, cascading a relation's or element's
2276    /// incidences in `O(degree)` via the reverse-adjacency index.
2277    ///
2278    /// # Errors
2279    ///
2280    /// Returns [`DbError`] when the subject is not visible.
2281    ///
2282    /// # Performance
2283    ///
2284    /// This method is `O(log change + degree)`.
2285    pub fn tombstone(&mut self, subject: impl Into<PropertySubject>) -> Result<(), DbError> {
2286        match subject.into() {
2287            PropertySubject::Element(id) => self.tombstone_element(id),
2288            PropertySubject::Relation(id) => self.tombstone_relation(id),
2289            PropertySubject::Incidence(id) => self.tombstone_incidence(id),
2290        }
2291    }
2292
    /// Commits this write transaction durably.
    ///
    /// A non-dirty commit returns the parent's commit sequence without appending
    /// to the WAL or publishing. A dirty commit encodes the overlay's mutation
    /// log into one WAL frame (with the watermark op last), appends it with an
    /// fsync (truncating back to the captured EOF on any write error so no
    /// interior torn record survives), THEN folds the delta into a fresh
    /// `Arc<Overlay>` and publishes a new `Arc<Snapshot>`.
    ///
    /// The ordering matters. If encoding or the WAL append fails, the error is
    /// returned before `self.database.current` is replaced, so nothing is
    /// published.
    ///
    /// After publishing, a dirty commit consults the configured
    /// [`CheckpointPolicy`]: it releases the writer lock FIRST (so the fold can
    /// re-acquire it), then folds when the delta-log has outgrown the base. The
    /// committed frame is already durable, so an auto-fold failure does not lose
    /// data; it is surfaced to the caller. An `Err` from that step therefore does
    /// NOT mean the commit was rolled back: the new snapshot has already been
    /// published.
    ///
    /// # Errors
    ///
    /// Returns [`DbError`] when commit-sequence allocation, frame encoding, the
    /// durable append, or a triggered auto-checkpoint fold fails.
    ///
    /// # Performance
    ///
    /// This method is `O(change)` for the dirty path — flat as the base grows.
    /// The publish step shares the parent snapshot's already-materialized
    /// [`crate::overlay::BaseRecords`] and derived index by `Arc` (a commit never
    /// folds, so the base is byte-identical within the generation), so it neither
    /// re-decodes the base nor rebuilds the index. A triggered fold adds
    /// `O(visible state bytes)` on top.
    pub(crate) fn commit(self) -> Result<CommitSeq, DbError> {
        if self.delta.is_empty() {
            // Non-dirty commit: no append, no publish, no durable id advance.
            return Ok(self.parent.lsn());
        }
        // The new commit sequence is the parent's plus one; exhausting the
        // sequence space is a hard error rather than a wrap-around.
        let lsn = self
            .parent
            .lsn()
            .checked_next()
            .ok_or(DbError::CommitSeqOverflow)?;
        // Encode the whole frame before opening the log, so an encoding failure
        // never touches the delta-log file.
        let (ops, blob) = self.delta.encode_frame();
        let frame = wal::encode_commit(
            lsn.get(),
            self.transaction_id.get(),
            self.database.base_generation,
            &ops,
            &blob,
        )?;
        let mut log = open_log_for_append(&self.database.root, self.database.base_generation)?;
        wal::append_commit(&mut log, &frame)?;

        // Durable: the delta was seeded from the parent overlay and only added
        // this writer's changes, so freezing it directly is the full new
        // published overlay (parent state + this commit). The parent overlay was
        // never mutated — this is a brand-new frozen `Arc<Overlay>`, so a reader
        // pinning the parent is unaffected.
        let new_overlay = Arc::new(self.delta.freeze());
        // A commit never folds, so the new snapshot pins the SAME base generation
        // as the parent — the base wire bytes are byte-identical, and so are the
        // owned records and the derived index built from them. Share the parent's
        // `Arc<BaseRecords>` (and its `BaseIndex`) instead of re-decoding the base
        // and rebuilding the index, which keeps a single-element commit `O(change)`
        // rather than `O(base)` regardless of how large the base has grown.
        let snapshot = Snapshot::with_shared_base_records(
            self.parent.generation(),
            lsn,
            Arc::clone(self.parent.base()),
            new_overlay,
            Arc::clone(self.parent.base_records()),
        );
        // Publish: readers that call `reader` from here on pin the new snapshot.
        self.database.current = Arc::new(snapshot);
        // Advance the in-memory transaction watermark only after the frame is
        // durable and the snapshot is published.
        self.database.last_transaction_id = self.transaction_id;
        // Release the writer lock before any auto-fold so the fold can re-acquire
        // it (a partial move out of `self`, legal because `Writer` has
        // no `Drop` impl; the remaining `&mut Db` borrow stays live).
        drop(self.lock);
        self.database.maybe_auto_checkpoint()?;
        Ok(lsn)
    }
2370
2371    /// Returns the merged read view this writer sees (overlay over base).
2372    ///
2373    /// # Performance
2374    ///
2375    /// This method is `O(1)` to construct.
2376    fn merged(&self) -> crate::overlay::WriteMergedState<'_> {
2377        crate::overlay::WriteMergedState::new(self.parent.base_records(), &self.delta)
2378    }
2379
2380    /// Requires an element to be visible in the writer's merged view.
2381    ///
2382    /// # Errors
2383    ///
2384    /// Returns [`DbError::UnknownElement`] when absent.
2385    ///
2386    /// # Performance
2387    ///
2388    /// This method is `O(log change + log n)`.
2389    fn require_element(&self, id: ElementId) -> Result<(), DbError> {
2390        if self.merged().contains_element(id) {
2391            Ok(())
2392        } else {
2393            Err(DbError::UnknownElement { id })
2394        }
2395    }
2396
2397    /// Requires a relation to be visible.
2398    ///
2399    /// # Errors
2400    ///
2401    /// Returns [`DbError::UnknownRelation`] when absent.
2402    ///
2403    /// # Performance
2404    ///
2405    /// This method is `O(log change + log n)`.
2406    fn require_relation(&self, id: RelationId) -> Result<(), DbError> {
2407        if self.merged().contains_relation(id) {
2408            Ok(())
2409        } else {
2410            Err(DbError::UnknownRelation { id })
2411        }
2412    }
2413
2414    /// Requires an incidence to be visible.
2415    ///
2416    /// # Errors
2417    ///
2418    /// Returns [`DbError::UnknownIncidence`] when absent.
2419    ///
2420    /// # Performance
2421    ///
2422    /// This method is `O(log change + log n)`.
2423    fn require_incidence(&self, id: IncidenceId) -> Result<(), DbError> {
2424        if self.merged().contains_incidence(id) {
2425            Ok(())
2426        } else {
2427            Err(DbError::UnknownIncidence { id })
2428        }
2429    }
2430
2431    /// Requires a role to exist in the merged catalog.
2432    ///
2433    /// # Errors
2434    ///
2435    /// Returns [`DbError::UnknownRole`] when absent.
2436    ///
2437    /// # Performance
2438    ///
2439    /// This method is `O(log role count)`.
2440    fn require_role(&self, id: RoleId) -> Result<(), DbError> {
2441        if self.delta.catalog().role(id).is_some() {
2442            Ok(())
2443        } else {
2444            Err(DbError::UnknownRole { id })
2445        }
2446    }
2447
2448    /// Requires a label to exist in the merged catalog.
2449    ///
2450    /// # Errors
2451    ///
2452    /// Returns [`DbError::UnknownLabel`] when absent.
2453    ///
2454    /// # Performance
2455    ///
2456    /// This method is `O(log label count)`.
2457    fn require_label(&self, id: LabelId) -> Result<(), DbError> {
2458        if self.delta.catalog().label(id).is_some() {
2459            Ok(())
2460        } else {
2461            Err(DbError::UnknownLabel { id })
2462        }
2463    }
2464
2465    /// Requires a relation type to exist in the merged catalog.
2466    ///
2467    /// # Errors
2468    ///
2469    /// Returns [`DbError::UnknownRelationType`] when absent.
2470    ///
2471    /// # Performance
2472    ///
2473    /// This method is `O(log relation type count)`.
2474    fn require_relation_type(&self, id: RelationTypeId) -> Result<(), DbError> {
2475        if self.delta.catalog().relation_type(id).is_some() {
2476            Ok(())
2477        } else {
2478            Err(DbError::UnknownRelationType { id })
2479        }
2480    }
2481
2482    /// Requires a property subject to be visible.
2483    ///
2484    /// # Errors
2485    ///
2486    /// Returns the matching `Unknown*` error when the subject is absent.
2487    ///
2488    /// # Performance
2489    ///
2490    /// This method is `O(log change + log n)`.
2491    fn require_subject(&self, subject: PropertySubject) -> Result<(), DbError> {
2492        match subject {
2493            PropertySubject::Element(id) => self.require_element(id),
2494            PropertySubject::Relation(id) => self.require_relation(id),
2495            PropertySubject::Incidence(id) => self.require_incidence(id),
2496        }
2497    }
2498
2499    /// Validates one projection definition against the merged catalog.
2500    ///
2501    /// # Errors
2502    ///
2503    /// Returns [`DbError`] when a referenced role or relation type is unknown.
2504    ///
2505    /// # Performance
2506    ///
2507    /// This method is `O(definition size)`.
2508    fn validate_projection_definition(
2509        &self,
2510        definition: &ProjectionDefinition,
2511    ) -> Result<(), DbError> {
2512        match definition {
2513            ProjectionDefinition::Graph(graph) => {
2514                self.require_role(graph.source_role)?;
2515                self.require_role(graph.target_role)?;
2516                for relation_type in &graph.relation_types {
2517                    self.require_relation_type(*relation_type)?;
2518                }
2519                Ok(())
2520            }
2521            ProjectionDefinition::Hypergraph(hyper) => {
2522                for role in &hyper.source_roles {
2523                    self.require_role(*role)?;
2524                }
2525                for role in &hyper.target_roles {
2526                    self.require_role(*role)?;
2527                }
2528                for relation_type in &hyper.relation_types {
2529                    self.require_relation_type(*relation_type)?;
2530                }
2531                Ok(())
2532            }
2533        }
2534    }
2535
2536    /// Validates one index definition against the merged catalog.
2537    ///
2538    /// # Errors
2539    ///
2540    /// Returns [`DbError`] when a referenced catalog id is unknown or a
2541    /// composite index has no keys.
2542    ///
2543    /// # Performance
2544    ///
2545    /// This method is `O(definition size)`.
2546    fn validate_index_definition(&self, definition: &IndexDefinition) -> Result<(), DbError> {
2547        let catalog = self.delta.catalog();
2548        match definition {
2549            IndexDefinition::Label { label } => self.require_label(*label),
2550            IndexDefinition::RelationType { relation_type } => {
2551                self.require_relation_type(*relation_type)
2552            }
2553            IndexDefinition::PropertyEquality { key } | IndexDefinition::PropertyRange { key } => {
2554                self.require_property_key(*key)
2555            }
2556            IndexDefinition::CompositeEquality { keys } => {
2557                if keys.is_empty() {
2558                    return Err(DbError::unsupported(
2559                        "composite equality index requires at least one key",
2560                    ));
2561                }
2562                for key in keys {
2563                    self.require_property_key(*key)?;
2564                }
2565                Ok(())
2566            }
2567            IndexDefinition::Projection { projection } => catalog
2568                .projection(*projection)
2569                .is_some()
2570                .then_some(())
2571                .ok_or(DbError::UnknownProjection { id: *projection }),
2572        }
2573    }
2574
2575    /// Requires a property key to exist in the merged catalog.
2576    ///
2577    /// # Errors
2578    ///
2579    /// Returns [`DbError::UnknownPropertyKey`] when absent.
2580    ///
2581    /// # Performance
2582    ///
2583    /// This method is `O(log property key count)`.
2584    fn require_property_key(&self, id: PropertyKeyId) -> Result<(), DbError> {
2585        if self.delta.catalog().property_key(id).is_some() {
2586            Ok(())
2587        } else {
2588            Err(DbError::UnknownPropertyKey { id })
2589        }
2590    }
2591}
2592
2593#[cfg(test)]
2594#[cfg(not(miri))]
2595mod tests {
2596    use std::{
2597        path::PathBuf,
2598        sync::atomic::{AtomicU64, Ordering},
2599    };
2600
2601    use super::*;
2602
2603    /// Per-process path counter for unique temporary store directories.
2604    static NEXT_PATH: AtomicU64 = AtomicU64::new(0);
2605
2606    /// Returns a unique temporary store path and removes any prior contents.
2607    fn temp_store(name: &str) -> PathBuf {
2608        let id = NEXT_PATH.fetch_add(1, Ordering::Relaxed);
2609        let path =
2610            std::env::temp_dir().join(format!("oxgraph-db-cp-{name}-{}-{id}", std::process::id()));
2611        let _ = std::fs::remove_dir_all(&path);
2612        path
2613    }
2614
2615    /// Manual measurement harness (run with
2616    /// `cargo test -p oxgraph-db --release -- --ignored open_latency_large_base
2617    /// --nocapture`): builds a folded base at roughly the measured-problem scale
2618    /// (>=100k elements, >=300k relations, properties), then times `Db::open`.
2619    /// Open must be dominated by the record decode + page faults, NOT the
2620    /// `O(base)` index rebuild the prior design paid — the index is borrowed.
2621    /// Number of element/relation records the open-latency harness builds and the
2622    /// number of timed open runs it averages.
2623    #[cfg(not(debug_assertions))]
2624    const OPEN_LATENCY_ELEMENTS: usize = 100_000;
2625    /// Relations the open-latency harness builds (each with two incidences).
2626    #[cfg(not(debug_assertions))]
2627    const OPEN_LATENCY_RELATIONS: usize = 320_000;
2628    /// Timed open runs the open-latency harness averages.
2629    #[cfg(not(debug_assertions))]
2630    const OPEN_LATENCY_RUNS: u32 = 5;
2631
2632    /// Populates `database` with `OPEN_LATENCY_ELEMENTS` ranked elements and
2633    /// `OPEN_LATENCY_RELATIONS` weighted relations (two incidences each).
2634    #[cfg(not(debug_assertions))]
2635    fn populate_large_store(database: &mut Db) {
2636        database.set_checkpoint_policy(CheckpointPolicy::Manual);
2637        database
2638            .write(|writer| {
2639                let rank = writer.register_property_key(
2640                    "rank",
2641                    PropertyFamily::Element,
2642                    PropertyType::Integer,
2643                )?;
2644                let weight = writer.register_property_key(
2645                    "weight",
2646                    PropertyFamily::Relation,
2647                    PropertyType::Integer,
2648                )?;
2649                let role = writer.register_role("party")?;
2650                let mut elements = Vec::with_capacity(OPEN_LATENCY_ELEMENTS);
2651                for index in 0..OPEN_LATENCY_ELEMENTS {
2652                    let element = writer.create_element()?;
2653                    writer.set_property(
2654                        PropertySubject::Element(element),
2655                        rank,
2656                        PropertyValue::Integer(i64::try_from(index % 997).unwrap_or(0)),
2657                    )?;
2658                    elements.push(element);
2659                }
2660                for index in 0..OPEN_LATENCY_RELATIONS {
2661                    let relation = writer.create_relation()?;
2662                    writer.set_property(
2663                        PropertySubject::Relation(relation),
2664                        weight,
2665                        PropertyValue::Integer(i64::try_from(index % 503).unwrap_or(0)),
2666                    )?;
2667                    let source = elements[index % OPEN_LATENCY_ELEMENTS];
2668                    let target = elements[(index + 1) % OPEN_LATENCY_ELEMENTS];
2669                    writer.create_incidence(relation, source, role)?;
2670                    writer.create_incidence(relation, target, role)?;
2671                }
2672                Ok(())
2673            })
2674            .expect("populate");
2675        // Fold everything into the base so open pays the base term, not log replay.
2676        database.compact().expect("compact");
2677    }
2678
2679    /// Mean elapsed time of `OPEN_LATENCY_RUNS` full `Db::open` calls on `path`.
2680    #[cfg(not(debug_assertions))]
2681    fn mean_open_ms(path: &std::path::Path) -> f64 {
2682        let mut total = std::time::Duration::ZERO;
2683        for _run in 0..OPEN_LATENCY_RUNS {
2684            let start = std::time::Instant::now();
2685            let opened = Db::open(path).expect("timed open");
2686            total += start.elapsed();
2687            drop(opened);
2688        }
2689        total.as_secs_f64() * 1000.0 / f64::from(OPEN_LATENCY_RUNS)
2690    }
2691
2692    /// Mean elapsed time of the prior design's open-time heavy work — record
2693    /// decode + `from_records` index rebuild (`BaseRecords::from_view`) — over
2694    /// `OPEN_LATENCY_RUNS` runs, the BEFORE proxy for the borrowed open.
2695    #[cfg(not(debug_assertions))]
2696    fn mean_old_from_view_ms(path: &std::path::Path) -> f64 {
2697        let superblock = wal::read_superblock(path).expect("superblock");
2698        let base_path = path.join(base_file(superblock.base_generation.get()));
2699        let mut total = std::time::Duration::ZERO;
2700        for _run in 0..OPEN_LATENCY_RUNS {
2701            let base = Base::open(&base_path, false).expect("base open");
2702            let start = std::time::Instant::now();
2703            let records =
2704                crate::overlay::BaseRecords::from_view(base.get()).expect("old from_view");
2705            total += start.elapsed();
2706            drop(records);
2707            drop(base);
2708        }
2709        total.as_secs_f64() * 1000.0 / f64::from(OPEN_LATENCY_RUNS)
2710    }
2711
2712    /// Manual measurement harness (run with
2713    /// `cargo test -p oxgraph-db --release -- --ignored open_latency_large_base
2714    /// --nocapture`): builds a folded base at roughly the measured-problem scale
2715    /// (>=100k elements, >=300k relations, properties), then times `Db::open`.
2716    /// Open must be dominated by the record decode + page faults, NOT the
2717    /// `O(base)` index rebuild the prior design paid — the index is borrowed.
2718    /// Debug builds skip it (the open-time `debug_assert!` differential check
2719    /// would itself rebuild the index and skew the timing); run in `--release`.
2720    #[test]
2721    #[ignore = "manual perf measurement; run explicitly with --release --ignored --nocapture"]
2722    #[cfg(not(debug_assertions))]
2723    fn open_latency_large_base() {
2724        let path = temp_store("open-latency");
2725        let mut database = Db::create(&path).expect("create");
2726        populate_large_store(&mut database);
2727        drop(database);
2728
2729        let _warm = Db::open(&path).expect("warm open");
2730        let after_ms = mean_open_ms(&path);
2731        let before_ms = mean_old_from_view_ms(&path);
2732
2733        println!(
2734            "open_latency_large_base: {OPEN_LATENCY_ELEMENTS} elements, \
2735             {OPEN_LATENCY_RELATIONS} relations, {} incidences, {} properties",
2736            OPEN_LATENCY_RELATIONS * 2,
2737            OPEN_LATENCY_ELEMENTS + OPEN_LATENCY_RELATIONS,
2738        );
2739        println!("  BEFORE open work (decode + from_records rebuild): {before_ms:.1} ms / open");
2740        println!("  AFTER  full Db::open (decode + BORROWED index):   {after_ms:.1} ms / open");
2741
2742        let _ = std::fs::remove_dir_all(&path);
2743    }
2744
2745    #[test]
2746    fn reconcile_upserts_reuse_or_mint_and_retain_prunes_the_complement() {
2747        let path = temp_store("reconcile");
2748        let mut database = Db::create(&path).expect("create");
2749        let index = {
2750            let mut writer = database.begin_write().expect("begin write");
2751            let key = writer
2752                .register_property_key("stable_key", PropertyFamily::Element, PropertyType::Text)
2753                .expect("key");
2754            let index = writer
2755                .define_index(
2756                    "element_stable_key_eq",
2757                    IndexDefinition::PropertyEquality { key },
2758                )
2759                .expect("index");
2760            writer.commit().expect("commit schema");
2761            index
2762        };
2763        let eq = EqualityIndex::<crate::Text>::from_id(index);
2764
2765        let (a1, b1) = {
2766            let mut writer = database.begin_write().expect("begin write");
2767            let a = writer.upsert_element(eq, "a").expect("upsert a");
2768            let b = writer.upsert_element(eq, "b").expect("upsert b");
2769            writer.commit().expect("commit");
2770            (a, b)
2771        };
2772
2773        let (a2, c1) = {
2774            let mut writer = database.begin_write().expect("begin write");
2775            let a = writer.upsert_element(eq, "a").expect("re-upsert a");
2776            let c = writer.upsert_element(eq, "c").expect("upsert c");
2777            writer.retain(eq, &["a", "c"]).expect("retain");
2778            writer.commit().expect("commit");
2779            (a, c)
2780        };
2781
2782        assert_eq!(a1, a2, "an unchanged identity reuses its element id");
2783        assert_ne!(c1, a1);
2784        assert_ne!(c1, b1);
2785
2786        let read = database.reader();
2787        assert!(read.contains_element(a1), "kept a");
2788        assert!(read.contains_element(c1), "kept c");
2789        assert!(!read.contains_element(b1), "retain tombstoned b");
2790        assert_eq!(
2791            read.element_by_key(eq, "a")
2792                .expect("lookup a")
2793                .map(|element| element.id),
2794            Some(a1)
2795        );
2796        assert!(
2797            read.element_by_key(eq, "b").expect("lookup b").is_none(),
2798            "b is not resolvable after the prune"
2799        );
2800        let _ = std::fs::remove_dir_all(&path);
2801    }
2802
2803    #[test]
2804    fn write_closure_commits_on_ok_rolls_back_on_err_and_reports_outcome() {
2805        let path = temp_store("write-closure");
2806        let mut database = Db::create(&path).expect("create");
2807
2808        // Ok with a change → committed; the read closure observes it.
2809        let (id, outcome) = database
2810            .write(|writer| {
2811                let id = writer.create_element()?;
2812                Ok(id)
2813            })
2814            .expect("write");
2815        assert!(matches!(outcome, CommitOutcome::Committed(_)));
2816        database
2817            .read(|read| {
2818                assert!(read.contains_element(id));
2819                Ok(())
2820            })
2821            .expect("read");
2822
2823        // A no-op write reports Empty (no frame appended).
2824        let ((), outcome) = database.write(|_writer| Ok(())).expect("empty write");
2825        assert_eq!(outcome, CommitOutcome::Empty);
2826
2827        // An Err from the closure rolls back the staged delta.
2828        let before = database
2829            .read(|read| Ok(read.element_count()))
2830            .expect("count");
2831        let result = database.write(|writer| {
2832            writer.create_element()?;
2833            Err::<(), DbError>(DbError::EmptyQuery)
2834        });
2835        assert!(result.is_err());
2836        let after = database
2837            .read(|read| Ok(read.element_count()))
2838            .expect("count");
2839        assert_eq!(before, after, "the failed write staged nothing durable");
2840
2841        let _ = std::fs::remove_dir_all(&path);
2842    }
2843
2844    #[test]
2845    fn re_setting_an_unchanged_property_value_is_a_no_op_commit() {
2846        // The reconcile/reindex contract: re-asserting a property's existing value
2847        // must log NO mutation, so an incremental reconcile that re-sets every
2848        // property of every unchanged subject stays O(change). Without the no-op
2849        // gate the commit logs the whole graph every reindex.
2850        let path = temp_store("set-noop");
2851        let mut database = Db::create(&path).expect("create");
2852        let schema = Schema::new().key::<crate::Text>("name", PropertyFamily::Element);
2853
2854        // Create an element and set its name (a real change → committed).
2855        let id = database
2856            .write(|writer| {
2857                let bound = writer.apply_schema(&schema)?;
2858                let name = bound.key::<crate::Text>("name")?;
2859                let id = writer.create_element()?;
2860                writer.set(id, name, "alpha")?;
2861                Ok(id)
2862            })
2863            .expect("first write")
2864            .0;
2865
2866        // Re-asserting the SAME value mutates nothing → the commit is Empty.
2867        let ((), outcome) = database
2868            .write(|writer| {
2869                let bound = writer.apply_schema(&schema)?;
2870                let name = bound.key::<crate::Text>("name")?;
2871                writer.set(id, name, "alpha")?;
2872                Ok(())
2873            })
2874            .expect("idempotent set");
2875        assert_eq!(
2876            outcome,
2877            CommitOutcome::Empty,
2878            "re-setting the same property value must log no mutation"
2879        );
2880
2881        // Setting a DIFFERENT value is a real change → committed, and visible.
2882        let ((), outcome) = database
2883            .write(|writer| {
2884                let bound = writer.apply_schema(&schema)?;
2885                let name = bound.key::<crate::Text>("name")?;
2886                writer.set(id, name, "beta")?;
2887                Ok(())
2888            })
2889            .expect("changed set");
2890        assert!(matches!(outcome, CommitOutcome::Committed(_)));
2891        let name = database
2892            .bind(&schema)
2893            .expect("bind")
2894            .key::<crate::Text>("name")
2895            .expect("name key");
2896        let value = database
2897            .read(|read| {
2898                Ok(read
2899                    .element(id)
2900                    .and_then(|element| element.properties().get::<crate::Text, String>(name)))
2901            })
2902            .expect("read");
2903        assert_eq!(value.as_deref(), Some("beta"));
2904
2905        let _ = std::fs::remove_dir_all(&path);
2906    }
2907
2908    #[test]
2909    fn schema_apply_is_idempotent_and_bind_resolves_typed_handles() {
2910        let path = temp_store("schema");
2911        let mut database = Db::create(&path).expect("create");
2912        let schema = Schema::new()
2913            .label("function")
2914            .key::<crate::Text>("name", PropertyFamily::Element)
2915            .equality_index("name_eq", "name");
2916
2917        // First apply registers the catalog and upserts two elements by identity.
2918        let (alpha, beta) = database
2919            .write(|writer| {
2920                let bound = writer.apply_schema(&schema)?;
2921                let name_eq = bound.equality_index::<crate::Text>("name_eq")?;
2922                let function = bound.label("function")?;
2923                let alpha = writer.upsert_element(name_eq, "alpha")?;
2924                writer.add_label(alpha, function)?;
2925                let beta = writer.upsert_element(name_eq, "beta")?;
2926                Ok((alpha, beta))
2927            })
2928            .expect("apply + write")
2929            .0;
2930        assert_ne!(alpha, beta);
2931
2932        // Re-applying the same schema is idempotent: nothing new registers, so the
2933        // commit is empty.
2934        let (_bound, outcome) = database
2935            .write(|writer| writer.apply_schema(&schema))
2936            .expect("re-apply");
2937        assert_eq!(
2938            outcome,
2939            CommitOutcome::Empty,
2940            "re-applying a schema registers nothing new"
2941        );
2942
2943        // bind() resolves the schema read-only on a reopened store; the typed
2944        // handle round-trips, and a wrong value type is rejected.
2945        let reopened = Db::open(&path).expect("open");
2946        let bound = reopened.bind(&schema).expect("bind");
2947        let name_eq = bound
2948            .equality_index::<crate::Text>("name_eq")
2949            .expect("typed index");
2950        assert!(
2951            bound.equality_index::<crate::Int>("name_eq").is_err(),
2952            "a wrong-value-type handle request is a SchemaConflict"
2953        );
2954        let found = reopened
2955            .read(|read| read.element_by_key(name_eq, "alpha"))
2956            .expect("read")
2957            .expect("alpha present");
2958        assert_eq!(found.id, alpha);
2959
2960        let _ = std::fs::remove_dir_all(&path);
2961    }
2962
2963    /// The exact logical state the crash-matrix asserts recovery preserves: the
2964    /// visible element ids, the rank-keyed property values, and the `Person`
2965    /// label membership.
2966    #[derive(Debug, Eq, PartialEq)]
2967    struct LogicalState {
2968        /// Visible element ids in ascending order.
2969        elements: Vec<ElementId>,
2970        /// Subjects whose `rank` equals each probed value, by value.
2971        rank_eq_500: Vec<PropertySubject>,
2972        /// Element ids carrying the `Person` label.
2973        person_members: Vec<ElementId>,
2974    }
2975
2976    /// Catalog/topology fixture ids returned by [`build_fixture`].
2977    struct Fixture {
2978        /// `rank` integer property key.
2979        rank: PropertyKeyId,
2980        /// `Person` label.
2981        person: LabelId,
2982    }
2983
2984    /// Builds a committed fixture: 8 elements, each ranked `index * 100`, the
2985    /// even-indexed ones labelled `Person`. Returns the fixture ids.
2986    fn build_fixture(database: &mut Db) -> Fixture {
2987        let mut writer = database.begin_write().expect("begin write");
2988        let rank = writer
2989            .register_property_key("rank", PropertyFamily::Element, PropertyType::Integer)
2990            .expect("rank key");
2991        let person = writer.register_label("Person").expect("person label");
2992        for index in 0..8u64 {
2993            let element = writer.create_element().expect("element");
2994            writer
2995                .set(
2996                    element,
2997                    Key::<crate::Int>::from_id(rank),
2998                    i64::try_from(index).expect("index") * 100,
2999                )
3000                .expect("set rank");
3001            if index % 2 == 0 {
3002                writer.add_label(element, person).expect("add label");
3003            }
3004        }
3005        writer.commit().expect("commit fixture");
3006        Fixture { rank, person }
3007    }
3008
3009    /// Reads the logical state through the index-backed read surface.
3010    fn read_logical(database: &Db, fixture: &Fixture) -> LogicalState {
3011        let read = database.reader();
3012        let elements = read.element_ids();
3013        let rank_eq_500 = read
3014            .lookup_property_equal(fixture.rank, &PropertyValue::Integer(500))
3015            .expect("rank lookup");
3016        let person_members = read.snapshot.view().elements_with_label(fixture.person);
3017        LogicalState {
3018            elements,
3019            rank_eq_500,
3020            person_members,
3021        }
3022    }
3023
3024    /// Asserts ids are never reused across a fold BEHAVIORALLY: the next element
3025    /// `database` mints must take the id one past the current maximum visible
3026    /// element id, i.e. the recovered watermark survived the fold. A regression
3027    /// that dropped the watermark on fold (so the recovered record set is
3028    /// unchanged but the next-id counter reset) would reuse an existing id and
3029    /// fail this assertion — which the unchanged-record-set checks alone miss.
3030    ///
3031    /// The probe element is rolled back, so it does not perturb the logical state
3032    /// the surrounding test re-reads.
3033    fn assert_no_id_reuse_across_fold(database: &mut Db) {
3034        let max_existing = database
3035            .reader()
3036            .element_ids()
3037            .into_iter()
3038            .map(ElementId::get)
3039            .max()
3040            .unwrap_or(0);
3041        let expected = ElementId::new(max_existing + 1);
3042        let mut writer = database.begin_write().expect("watermark probe writer");
3043        let minted = writer.create_element().expect("watermark probe element");
3044        assert_eq!(
3045            minted, expected,
3046            "the next minted id must be one past the max existing id (watermark \
3047             survived the fold; ids are never reused)",
3048        );
3049        // Drop the probe writer so it leaves no trace in the logical state.
3050        drop(writer);
3051    }
3052
3053    /// CHECKPOINT-CRASH-MATRIX: a crash after each fsync point in `checkpoint`
3054    /// recovers EXACTLY the correct logical state. After a crash before the
3055    /// superblock lands, the OLD generation stays authoritative (the orphan new
3056    /// base is ignored); after a crash once the superblock names the new
3057    /// generation, the NEW base is authoritative. The completed checkpoint
3058    /// recovers the same logical state from the folded base. In every case the
3059    /// index-backed lookups return the same answers as before the (attempted)
3060    /// fold.
3061    #[test]
3062    fn checkpoint_crash_matrix_recovers_exact_state() {
3063        for stop in [
3064            CheckpointStop::BeforeSuperblock,
3065            CheckpointStop::BeforeRotate,
3066            CheckpointStop::Complete,
3067        ] {
3068            let path = temp_store(&format!("crash-{stop:?}"));
3069            let mut database = Db::create(&path).expect("create");
3070            let fixture = build_fixture(&mut database);
3071            let before = read_logical(&database, &fixture);
3072            let before_generation = database.base_generation;
3073
3074            // Simulate a crash at `stop`: the checkpoint returns right after the
3075            // chosen fsync, leaving the intermediate files in place. We then drop
3076            // the handle (as a crash would) and reopen from disk.
3077            database
3078                .checkpoint_inner(stop)
3079                .expect("checkpoint stop returns ok");
3080            drop(database);
3081
3082            let mut recovered = Db::open(&path).expect("reopen after crash");
3083            let after = read_logical(&recovered, &fixture);
3084            assert_eq!(
3085                after, before,
3086                "crash at {stop:?} must recover the exact logical state",
3087            );
3088
3089            // The recovered watermark survives every crash window: the next minted
3090            // id is one past the max recovered element id, so ids are never reused
3091            // across the (attempted) fold — asserted behaviorally, not merely
3092            // inferred from the unchanged record set.
3093            assert_no_id_reuse_across_fold(&mut recovered);
3094
3095            // Generation expectation per crash window.
3096            match stop {
3097                CheckpointStop::BeforeSuperblock => assert_eq!(
3098                    recovered.base_generation, before_generation,
3099                    "old superblock stays authoritative before the new one lands",
3100                ),
3101                CheckpointStop::BeforeRotate | CheckpointStop::Complete => assert_eq!(
3102                    recovered.base_generation,
3103                    before_generation + 1,
3104                    "the new superblock names the folded generation",
3105                ),
3106            }
3107
3108            // A second open is idempotent (orphan files from a partial crash do
3109            // not derail a repeat recovery).
3110            let reopened = Db::open(&path).expect("second reopen");
3111            assert_eq!(read_logical(&reopened, &fixture), before);
3112
3113            drop(reopened);
3114            let _ = std::fs::remove_dir_all(&path);
3115        }
3116    }
3117
3118    /// The auto-checkpoint policy folds the delta-log into a fresh base once the
3119    /// log outgrows the base by the configured factor: under a tiny factor, a
3120    /// run of dirty commits advances the live generation (the log was folded),
3121    /// and the logical state is preserved across the fold. The manual policy
3122    /// never auto-folds.
3123    #[test]
3124    fn auto_checkpoint_policy_folds_when_log_outgrows_base() {
3125        // Manual policy: many commits, generation never advances on its own.
3126        let manual_path = temp_store("auto-manual");
3127        let mut manual = Db::create(&manual_path).expect("create manual");
3128        manual.set_checkpoint_policy(CheckpointPolicy::Manual);
3129        let _fixture = build_fixture(&mut manual);
3130        for _ in 0..200 {
3131            let mut writer = manual.begin_write().expect("writer");
3132            writer.create_element().expect("element");
3133            writer.commit().expect("commit");
3134        }
3135        assert_eq!(
3136            manual.live_generation(),
3137            CheckpointGeneration::new(0),
3138            "manual policy must never auto-fold",
3139        );
3140        drop(manual);
3141        let _ = std::fs::remove_dir_all(&manual_path);
3142
3143        // Size-ratio policy with the smallest factor: the log soon outgrows the
3144        // tiny base floor, so a run of commits triggers at least one fold.
3145        let auto_path = temp_store("auto-ratio");
3146        let mut auto = Db::create(&auto_path).expect("create auto");
3147        auto.set_checkpoint_policy(CheckpointPolicy::SizeRatio { factor: 1 });
3148        let fixture = build_fixture(&mut auto);
3149        let before = read_logical(&auto, &fixture);
3150        for _ in 0..400 {
3151            let mut writer = auto.begin_write().expect("writer");
3152            writer.create_element().expect("element");
3153            writer.commit().expect("commit");
3154        }
3155        assert!(
3156            auto.live_generation() > CheckpointGeneration::new(0),
3157            "size-ratio policy must auto-fold once the log outgrows the base",
3158        );
3159        // The pre-existing logical state survives every fold; the policy is also
3160        // surfaced in status and preserved across the fold.
3161        let after = read_logical(&auto, &fixture);
3162        assert_eq!(after.rank_eq_500, before.rank_eq_500);
3163        assert_eq!(after.person_members, before.person_members);
3164        // Ids are never reused across the auto-fold: the next minted id is one
3165        // past the max existing id (the watermark folded into the new base).
3166        assert_no_id_reuse_across_fold(&mut auto);
3167        assert_eq!(
3168            auto.checkpoint_policy(),
3169            CheckpointPolicy::SizeRatio { factor: 1 },
3170            "the auto-fold reopen must preserve the configured policy",
3171        );
3172        // Status surfaces the live generation and the (now small) log size.
3173        let status = auto.stats();
3174        assert_eq!(status.live_generation, auto.live_generation());
3175        assert!(status.base_byte_size > 0, "live base has bytes");
3176        drop(auto);
3177        let _ = std::fs::remove_dir_all(&auto_path);
3178    }
3179}