Skip to main content

oxgraph_db/
database.rs

1//! Embedded `OxGraph` database engine API.
2//!
3//! This is the integration layer over the base+overlay+WAL core. A [`Db`]
4//! holds the current `Arc<Snapshot>` (one immutable base generation plus the
5//! frozen overlay published over it), the open append-only delta-log, and the
6//! recovered id/transaction watermarks. Reads pin the current snapshot in `O(1)`
7//! (`reader` clones the `Arc`); writes layer a fresh [`WriteOverlay`] over
8//! the current snapshot, append a WAL frame on commit, and publish a new
9//! snapshot. The whole read/query/projection surface resolves through the merged
10//! [`StateView`] of the pinned snapshot.
11
12use std::{
13    borrow::Cow,
14    collections::BTreeSet,
15    path::{Path, PathBuf},
16    sync::Arc,
17};
18
19use oxgraph_algo::{
20    PageRankConfig, PageRankError, PageRankWorkspace, Uniform, longest_path_dag,
21    pagerank_graph_with_workspace,
22};
23use oxgraph_graph::{CanonicalElementIdentity, ElementIndex, LocalElementIdentity};
24
25use crate::{
26    Bound, Catalog, CheckpointGeneration, CommitSeq, DbError, Element, ElementId,
27    GraphProjectionDefinition, GraphProjectionSpec, IncidenceId, IncidenceRecord, IndexId, LabelId,
28    PreparedQuery, ProjectionDefinition, ProjectionId, Properties, PropertyKeyId, PropertySubject,
29    PropertyType, PropertyValue, QueryResult, Relation, RelationId, RelationTypeId, RoleId, Schema,
30    TransactionId,
31    backing::Base,
32    catalog::{IndexDefinition, PropertyFamily},
33    freeze::{self, FreezeStamps},
34    lock::WriterLock,
35    overlay::{Overlay, Snapshot, StateView, WriteOverlay},
36    projection::{self, GraphProjection, HypergraphProjection, ProjectionElementId},
37    state::NextIds,
38    storage,
39    traversal::{self, Direction, Subgraph, Walk},
40    typed::{Assignable, EqualityIndex, Key, ValueType},
41    wal,
42    wire::SuperblockRecord,
43};
44
45/// Lookup input for a cataloged index.
46///
47/// This type makes index lookup shape explicit: membership indexes accept
48/// [`Match::All`], single-property indexes accept scalar equality or
49/// range inputs, and composite equality indexes accept an ordered value tuple.
50///
51/// # Performance
52///
53/// Copying this value is `O(1)`.
54#[derive(Clone, Copy, Debug)]
55pub enum Match<'value> {
56    /// Lookup every subject represented by a membership-style index.
57    All,
58    /// Lookup one scalar equality value.
59    Equal(&'value PropertyValue),
60    /// Lookup one inclusive scalar range.
61    Range {
62        /// Inclusive lower bound.
63        min: &'value PropertyValue,
64        /// Inclusive upper bound.
65        max: &'value PropertyValue,
66    },
67    /// Lookup one ordered composite equality tuple.
68    Composite(&'value [PropertyValue]),
69}
70
/// Policy deciding when a dirty commit folds the delta-log into a fresh base
/// generation, which bounds the log tail that recovery has to replay.
///
/// The default is [`CheckpointPolicy::SizeRatio`] with
/// [`CheckpointPolicy::DEFAULT_FACTOR`]: it fires once the delta-log outgrows
/// `factor` times the live base size. [`CheckpointPolicy::Manual`] never fires,
/// leaving folding to an explicit [`Db::compact`].
///
/// # Performance
///
/// Copying this value is `O(1)`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CheckpointPolicy {
    /// Auto-checkpointing is off; only an explicit [`Db::compact`] folds.
    Manual,
    /// Fold after a dirty commit once the delta-log is larger than `factor`
    /// times the live base size. A small floor stands in for a tiny or empty
    /// base so the gen-0 store is not checkpointed on its first commit.
    SizeRatio {
        /// Log-to-base size factor `K`: the log may reach `K × base` bytes
        /// before the next dirty commit folds it.
        factor: u32,
    },
}

impl CheckpointPolicy {
    /// Factor `K` used by the default policy: fold when the delta-log is more
    /// than four times the live base size.
    pub const DEFAULT_FACTOR: u32 = 4;

    /// Smallest base size (in bytes) the size-ratio rule will reason about. A
    /// base below this is treated as this size, so a freshly created,
    /// near-empty base is not checkpointed on its first commits.
    const MIN_BASE_BYTES: u64 = 4 * 1024;

    /// Decides whether a delta-log of `log_bytes` sitting over a base of
    /// `base_bytes` should trigger an auto-checkpoint under this policy.
    ///
    /// # Performance
    ///
    /// This method is `O(1)`.
    #[must_use]
    const fn should_checkpoint(self, log_bytes: u64, base_bytes: u64) -> bool {
        match self {
            Self::SizeRatio { factor } => {
                // Clamp the base up to the floor before scaling it.
                let effective_base = if base_bytes >= Self::MIN_BASE_BYTES {
                    base_bytes
                } else {
                    Self::MIN_BASE_BYTES
                };
                // Saturate instead of overflowing for huge bases/factors.
                let threshold = effective_base.saturating_mul(factor as u64);
                log_bytes > threshold
            }
            Self::Manual => false,
        }
    }
}

impl Default for CheckpointPolicy {
    /// Size-ratio policy using [`CheckpointPolicy::DEFAULT_FACTOR`].
    ///
    /// # Performance
    ///
    /// This function is `O(1)`.
    fn default() -> Self {
        let factor = Self::DEFAULT_FACTOR;
        Self::SizeRatio { factor }
    }
}
141
142/// The durable result of a [`Db::write`]: whether a frame landed, and at which
143/// commit sequence.
144///
145/// # Performance
146///
147/// Copying this value is `O(1)`.
148#[derive(Clone, Copy, Debug, Eq, PartialEq)]
149#[non_exhaustive]
150pub enum CommitOutcome {
151    /// The transaction made no changes; no WAL frame was appended.
152    Empty,
153    /// A durable frame landed at this commit sequence.
154    Committed(CommitSeq),
155}
156
/// Returns the filename of the base file for `generation`
/// (`base-<generation>.oxgdb`).
///
/// # Performance
///
/// This function is `O(1)`.
fn base_file(generation: u64) -> String {
    let number = generation.to_string();
    ["base-", number.as_str(), ".oxgdb"].concat()
}
165
/// Returns the filename of the delta-log for `generation`
/// (`delta-<generation>.log`).
///
/// # Performance
///
/// This function is `O(1)`.
fn delta_file(generation: u64) -> String {
    let number = generation.to_string();
    ["delta-", number.as_str(), ".log"].concat()
}
174
/// Open OXGDB database handle.
///
/// The handle stores the store's root directory, the currently visible
/// snapshot, the live base generation, the last durably recorded transaction
/// id, and the auto-checkpoint policy. It does not store an open file handle:
/// the per-generation base and delta-log paths are derived from `root` and
/// `base_generation` when needed.
///
/// # Performance
///
/// Moving a handle is `O(1)`: it moves the root path, the current
/// `Arc<Snapshot>`, and a few scalar fields.
pub struct Db {
    /// Root database directory.
    root: PathBuf,
    /// The current visible snapshot (base generation + published overlay),
    /// shared by readers through an atomically reference-counted handle.
    current: Arc<Snapshot>,
    /// Live base generation named by the superblock; every delta frame and the
    /// per-generation log filename carry it.
    base_generation: u64,
    /// Last writer transaction id durably recorded (the last dirty commit's id).
    /// A rollback burns a session-local id above this but does not advance it.
    last_transaction_id: TransactionId,
    /// Auto-checkpoint policy consulted after each dirty commit.
    checkpoint_policy: CheckpointPolicy,
}
196
197impl Db {
198    /// Creates a new empty OXGDB database at `path`.
199    ///
200    /// The create order is base-0 then empty delta-0.log then the writer lock
201    /// file then the superblock (written LAST as the create-complete marker), so
202    /// a half-created store is detected on open rather than silently opened
203    /// empty.
204    ///
205    /// # Errors
206    ///
207    /// Returns [`DbError::AlreadyExists`] when a store already exists, or
208    /// [`DbError::Io`]/[`DbError::InvalidStore`] when creation fails.
209    ///
210    /// # Performance
211    ///
212    /// This function is `O(empty base bytes)`.
213    pub fn create(path: impl AsRef<Path>) -> Result<Self, DbError> {
214        let root = path.as_ref().to_path_buf();
215        if root.join(wal::SUPERBLOCK_FILE).exists() {
216            return Err(DbError::AlreadyExists);
217        }
218        // Base-0: an empty merged view (empty base under an empty overlay).
219        let empty_base = crate::overlay::BaseRecords::empty();
220        let empty_overlay = Overlay::empty(NextIds::INITIAL, Catalog::empty());
221        let view = crate::overlay::MergedState::new(&empty_base, &empty_overlay);
222        let base_bytes = freeze::freeze_view(
223            &view,
224            FreezeStamps {
225                commit_seq: 0,
226                transaction_id: 0,
227                generation: 0,
228            },
229        )?;
230        storage::atomic_write(
231            &root,
232            &root.join(format!("{}.tmp", base_file(0))),
233            &root.join(base_file(0)),
234            &base_bytes,
235        )?;
236        // Empty delta-0.log, durably created.
237        create_empty_log(&root, 0)?;
238        // Superblock is written LAST; its existence is the create-complete marker.
239        write_superblock(&root, 0, 0, 0, 0)?;
240        Self::open(&root)
241    }
242
243    /// Opens an existing OXGDB database, recovering the live frontier from the
244    /// valid prefix of the delta-log replayed over the base named by the
245    /// superblock.
246    ///
247    /// # Errors
248    ///
249    /// Returns [`DbError`] when the store is missing, malformed, or the log is
250    /// corrupt beyond a torn tail.
251    ///
252    /// # Performance
253    ///
254    /// This function is `O(base bytes + log bytes)`.
255    pub fn open(path: impl AsRef<Path>) -> Result<Self, DbError> {
256        let root = path.as_ref().to_path_buf();
257        let superblock = wal::read_superblock(&root)?;
258        let generation = superblock.base_generation.get();
259
260        let base = Arc::new(Base::open(&root.join(base_file(generation)), false)?);
261        let base_records = Arc::new(crate::overlay::BaseRecords::open(&base)?);
262        let base_header = *base.get().header();
263        let base_catalog = base.get().catalog().clone();
264        let base_next = NextIds::from_header(&base_header);
265
266        // Replay the valid prefix of the per-generation delta-log.
267        let log_path = root.join(delta_file(generation));
268        let log_bytes = read_log(&log_path)?;
269        let outcome = wal::replay(generation, &log_bytes)?;
270        // A torn tail truncates the log back to its last-good byte length.
271        if outcome.valid_len < log_bytes.len() {
272            truncate_log(&log_path, outcome.valid_len)?;
273        }
274
275        // Fold the replayed frames into a fresh overlay over the base, deriving
276        // the live frontier (commit_seq/txn_id) from the last good frame.
277        let mut write = WriteOverlay::new(base_next, base_catalog);
278        let mut recovered_next = base_next;
279        let mut last_commit_seq = superblock.commit_seq.get();
280        let mut last_txn = superblock.transaction_id.get();
281        for frame in &outcome.frames {
282            for op in &frame.ops {
283                write.apply_replay_op(&base_records, op, &frame.blob, frame.lsn)?;
284            }
285            recovered_next = recovered_next.elementwise_max(write.next_ids());
286            last_commit_seq = frame.lsn;
287            last_txn = last_txn.max(frame.txn_id);
288        }
289        // ids are never reused: the recovered watermark is the elementwise max of
290        // the base header and every replayed frame's watermark.
291        write.set_next_ids(recovered_next);
292        let overlay = Arc::new(write.freeze());
293
294        // Reuse the records already decoded for replay instead of decoding the base
295        // a second time inside `Snapshot::new`: the pinned base is byte-identical, so
296        // the records (and their derived index) match. Halves open's base-decode cost.
297        let snapshot = Arc::new(Snapshot::with_shared_base_records(
298            CheckpointGeneration::new(generation),
299            CommitSeq::new(last_commit_seq),
300            base,
301            overlay,
302            base_records,
303        ));
304
305        Ok(Self {
306            root,
307            current: snapshot,
308            base_generation: generation,
309            last_transaction_id: TransactionId::new(last_txn),
310            checkpoint_policy: CheckpointPolicy::default(),
311        })
312    }
313
314    /// Returns the live base generation named by the superblock (the count of
315    /// folds this store has undergone; gen-0 is the freshly created store).
316    ///
317    /// # Performance
318    ///
319    /// This method is `O(1)`.
320    #[must_use]
321    pub const fn live_generation(&self) -> CheckpointGeneration {
322        CheckpointGeneration::new(self.base_generation)
323    }
324
325    /// Returns the configured auto-checkpoint policy.
326    ///
327    /// # Performance
328    ///
329    /// This method is `O(1)`.
330    #[must_use]
331    pub const fn checkpoint_policy(&self) -> CheckpointPolicy {
332        self.checkpoint_policy
333    }
334
335    /// Sets the auto-checkpoint policy consulted after each dirty commit.
336    ///
337    /// # Performance
338    ///
339    /// This method is `O(1)`.
340    pub const fn set_checkpoint_policy(&mut self, policy: CheckpointPolicy) {
341        self.checkpoint_policy = policy;
342    }
343
344    /// Validates the current handle by re-reading the superblock and verifying
345    /// the live base's content CRC.
346    ///
347    /// # Errors
348    ///
349    /// Returns [`DbError`] when the superblock or base fails validation.
350    ///
351    /// # Performance
352    ///
353    /// This method is `O(base bytes)`.
354    pub fn validate(&self) -> Result<(), DbError> {
355        wal::read_superblock(&self.root)?;
356        Base::open(&self.root.join(base_file(self.base_generation)), false).map(|_base| ())
357    }
358
359    /// Validates an OXGDB database at `path`.
360    ///
361    /// # Errors
362    ///
363    /// Returns [`DbError`] when the store fails to open and recover.
364    ///
365    /// # Performance
366    ///
367    /// This function is `O(base bytes + log bytes)`.
368    pub fn validate_path(path: impl AsRef<Path>) -> Result<(), DbError> {
369        Self::open(path).map(|_database| ())
370    }
371
372    /// Folds the current base+overlay into a new base generation, rotating the
373    /// delta-log and republishing the superblock (a manual checkpoint).
374    ///
375    /// This is the checkpoint primitive, exposed here so the existing `compact`
376    /// API keeps its "rewrite the store compactly" contract. Auto-triggering is
377    /// configured separately via [`Db::set_checkpoint_policy`].
378    ///
379    /// # Errors
380    ///
381    /// Returns [`DbError`] when encoding, writing, or publishing the new
382    /// generation fails.
383    ///
384    /// # Performance
385    ///
386    /// This method is `O(visible state bytes)`.
387    pub fn compact(&mut self) -> Result<(), DbError> {
388        self.checkpoint()
389    }
390
391    /// Folds the current base+overlay into base-`{g+1}`, creates an empty
392    /// delta-`{g+1}`.log, republishes the superblock naming `g+1` (the
393    /// linearization point), then unlinks the old base and log.
394    ///
395    /// The order is crash-safe: the new base is fully durable BEFORE the
396    /// superblock names it (so a crash before the superblock leaves the OLD
397    /// superblock authoritative and the orphan new base is ignored), and the old
398    /// base/log are unlinked only AFTER the superblock names the new generation
399    /// (so a crash before the unlink leaves the NEW superblock authoritative and
400    /// the orphan old files are ignored). The
401    /// [`crate::wire::SuperblockRecord`] rename is the single linearization point.
402    ///
403    /// # Errors
404    ///
405    /// Returns [`DbError`] when encoding, writing, or publishing fails.
406    ///
407    /// # Performance
408    ///
409    /// This method is `O(visible state bytes)`.
410    pub(crate) fn checkpoint(&mut self) -> Result<(), DbError> {
411        self.checkpoint_inner(
412            #[cfg(test)]
413            CheckpointStop::Complete,
414        )
415    }
416
417    /// Crash-safe checkpoint body. Under `#[cfg(test)]` it accepts a
418    /// [`CheckpointStop`] that simulates a crash by returning early right after a
419    /// chosen fsync point, leaving the on-disk files exactly as a real crash
420    /// there would, so the crash-matrix test can reopen and assert recovery.
421    ///
422    /// # Errors
423    ///
424    /// Returns [`DbError`] when encoding, writing, or publishing fails.
425    ///
426    /// # Performance
427    ///
428    /// This method is `O(visible state bytes)`.
429    fn checkpoint_inner(&mut self, #[cfg(test)] stop: CheckpointStop) -> Result<(), DbError> {
430        let _lock = WriterLock::acquire(&self.root)?;
431        let next_generation = self
432            .base_generation
433            .checked_add(1)
434            .ok_or_else(|| DbError::invalid_store("checkpoint generation overflow"))?;
435        let view = self.current.view();
436        let commit_seq = self.current.lsn().get();
437        let base_bytes = freeze::freeze_view(
438            &view,
439            FreezeStamps {
440                commit_seq,
441                transaction_id: self.last_transaction_id.get(),
442                generation: next_generation,
443            },
444        )?;
445        // (1) write base-{g+1} (temp + fsync + rename + dir-fsync).
446        storage::atomic_write(
447            &self.root,
448            &self
449                .root
450                .join(format!("{}.tmp", base_file(next_generation))),
451            &self.root.join(base_file(next_generation)),
452            &base_bytes,
453        )?;
454        // (2) create empty delta-{g+1}.log (fsync + dir-fsync).
455        create_empty_log(&self.root, next_generation)?;
456        // Crash point A: new base + new log durable, superblock NOT yet
457        // published. The OLD superblock still names `g`, so recovery uses the old
458        // generation; the new base/log are orphans.
459        #[cfg(test)]
460        if matches!(stop, CheckpointStop::BeforeSuperblock) {
461            return Ok(());
462        }
463        // (3) publish the superblock naming g+1 — the linearization point.
464        write_superblock(
465            &self.root,
466            next_generation,
467            commit_seq,
468            commit_seq,
469            self.last_transaction_id.get(),
470        )?;
471        // Crash point B: superblock now names g+1, old base/log NOT yet unlinked.
472        // Recovery uses the new generation; the old base/log are orphans.
473        #[cfg(test)]
474        if matches!(stop, CheckpointStop::BeforeRotate) {
475            return Ok(());
476        }
477        // Re-open over the new generation, then (4) unlink the old base + log.
478        let reopened = Self::open(&self.root)?;
479        let old_generation = self.base_generation;
480        let policy = self.checkpoint_policy;
481        self.current = reopened.current;
482        self.base_generation = reopened.base_generation;
483        self.last_transaction_id = reopened.last_transaction_id;
484        // The reopen reset the policy to the default; restore the caller's.
485        self.checkpoint_policy = policy;
486        let _ = std::fs::remove_file(self.root.join(base_file(old_generation)));
487        let _ = std::fs::remove_file(self.root.join(delta_file(old_generation)));
488        let _ = storage::sync_directory(&self.root);
489        Ok(())
490    }
491
492    /// Auto-checkpoints when the configured [`CheckpointPolicy`] says the
493    /// delta-log has grown too large relative to the base. Called after a dirty
494    /// commit publishes its frame. A failed fold is surfaced so the caller can
495    /// observe it; the committed data is already durable in the log regardless.
496    ///
497    /// # Errors
498    ///
499    /// Returns [`DbError`] when the triggered fold fails.
500    ///
501    /// # Performance
502    ///
503    /// This method is `O(1)` to decide; `O(visible state bytes)` when it folds.
504    fn maybe_auto_checkpoint(&mut self) -> Result<(), DbError> {
505        let log_bytes = file_len(&self.root.join(delta_file(self.base_generation)));
506        let base_bytes = file_len(&self.root.join(base_file(self.base_generation)));
507        if self
508            .checkpoint_policy
509            .should_checkpoint(log_bytes, base_bytes)
510        {
511            self.checkpoint()?;
512        }
513        Ok(())
514    }
515
516    /// Returns operational status for this handle, including the live generation
517    /// count and the on-disk base/delta-log sizes the auto-checkpoint policy
518    /// weighs.
519    ///
520    /// # Performance
521    ///
522    /// This method is `O(visible state)` for the merged counts plus two `stat`
523    /// syscalls for the file sizes.
524    #[must_use]
525    pub fn stats(&self) -> Stats {
526        let view = self.current.view();
527        Stats {
528            visible_commit_seq: self.current.lsn(),
529            last_transaction_id: self.last_transaction_id,
530            live_generation: CheckpointGeneration::new(self.base_generation),
531            base_byte_size: file_len(&self.root.join(base_file(self.base_generation))),
532            log_byte_size: file_len(&self.root.join(delta_file(self.base_generation))),
533            element_count: view.element_count(),
534            relation_count: view.relation_count(),
535            incidence_count: view.incidence_count(),
536            catalog: self.catalog_summary(),
537        }
538    }
539
540    /// Returns a catalog-size summary.
541    ///
542    /// # Performance
543    ///
544    /// This method is `O(catalog entry count)`.
545    #[must_use]
546    pub fn catalog_summary(&self) -> CatalogSummary {
547        CatalogSummary::from_catalog(self.current.view().catalog())
548    }
549
550    /// Starts a read transaction pinned to the current visible snapshot.
551    ///
552    /// # Performance
553    ///
554    /// This method is `O(1)`: the reader clones the current `Arc<Snapshot>` and
555    /// observes a fixed state even across later commits and checkpoints.
556    #[must_use]
557    pub fn reader(&self) -> Reader {
558        Reader {
559            snapshot: Arc::clone(&self.current),
560        }
561    }
562
563    /// Starts the single writer transaction, acquiring the cross-process writer
564    /// lock for the transaction's lifetime.
565    ///
566    /// # Errors
567    ///
568    /// Returns [`DbError::WriterLockHeld`] when another writer holds the lock or
569    /// [`DbError::TransactionIdOverflow`] when writer ids are exhausted.
570    ///
571    /// # Performance
572    ///
573    /// This method is `O(1)`: the writer layers a fresh empty write overlay over
574    /// the current snapshot.
575    pub(crate) fn begin_write(&mut self) -> Result<Writer<'_>, DbError> {
576        let lock = WriterLock::acquire(&self.root)?;
577        let transaction_id = self
578            .last_transaction_id
579            .checked_next()
580            .ok_or(DbError::TransactionIdOverflow)?;
581        // Burn the id eagerly so it is session-local-visible even on rollback;
582        // it only becomes durable when a dirty commit writes its frame, and a
583        // reopen recovers the durable high-water mark from the log.
584        self.last_transaction_id = transaction_id;
585        let parent = Arc::clone(&self.current);
586        // Seed the writer delta from the parent's published overlay so the
587        // writer reads every committed record; the parent overlay is never
588        // mutated (the seed clones its maps).
589        let delta = WriteOverlay::from_overlay(parent.overlay());
590        Ok(Writer {
591            database: self,
592            parent,
593            delta,
594            transaction_id,
595            lock,
596        })
597    }
598
599    /// Runs `f` against a read transaction pinned to the current snapshot. The
600    /// primary read entry point.
601    ///
602    /// # Errors
603    ///
604    /// Propagates whatever error `f` returns.
605    ///
606    /// # Performance
607    ///
608    /// Entering is `O(1)` (an `Arc` clone); the total cost is `f`'s cost.
609    pub fn read<R>(&self, f: impl FnOnce(&Reader) -> Result<R, DbError>) -> Result<R, DbError> {
610        f(&self.reader())
611    }
612
613    /// Runs `f` against the single write transaction, committing on `Ok` and
614    /// rolling back on `Err` — control flow IS the commit protocol. Returns `f`'s
615    /// value with the [`CommitOutcome`] (whether a durable frame landed). The
616    /// primary write entry point.
617    ///
618    /// # Errors
619    ///
620    /// Returns [`DbError::WriterLockHeld`] when another writer holds the lock,
621    /// `f`'s error (after rolling back the staged delta), or a commit error.
622    ///
623    /// # Performance
624    ///
625    /// Begin is `O(1)`; commit is `O(change)`. A triggered auto-fold adds
626    /// `O(visible bytes)`.
627    pub fn write<R>(
628        &mut self,
629        f: impl FnOnce(&mut Writer<'_>) -> Result<R, DbError>,
630    ) -> Result<(R, CommitOutcome), DbError> {
631        let mut writer = self.begin_write()?;
632        // On `Err` the `?` drops `writer` here, releasing the lock and discarding
633        // the staged delta — no frame is appended (rollback).
634        let value = f(&mut writer)?;
635        let committed = !writer.delta.is_empty();
636        let lsn = writer.commit()?;
637        let outcome = if committed {
638            CommitOutcome::Committed(lsn)
639        } else {
640            CommitOutcome::Empty
641        };
642        Ok((value, outcome))
643    }
644
645    /// Resolves an already-applied [`Schema`] against the live catalog WITHOUT
646    /// writing, returning the [`Bound`] handle bag (for a store already
647    /// bootstrapped with this schema).
648    ///
649    /// # Errors
650    ///
651    /// Returns [`DbError::UnknownName`] when a declared item is absent.
652    ///
653    /// # Performance
654    ///
655    /// This method is `O(declared items × log catalog)`.
656    pub fn bind(&self, schema: &Schema) -> Result<Bound, DbError> {
657        let view = self.current.view();
658        let catalog = view.catalog();
659        let mut bound = Bound::default();
660        for name in &schema.roles {
661            let id = catalog.role_id(name).ok_or_else(|| DbError::UnknownName {
662                kind: "role",
663                name: name.clone(),
664            })?;
665            bound.roles.insert(name.clone(), id);
666        }
667        for name in &schema.labels {
668            let id = catalog.label_id(name).ok_or_else(|| DbError::UnknownName {
669                kind: "label",
670                name: name.clone(),
671            })?;
672            bound.labels.insert(name.clone(), id);
673        }
674        for name in &schema.relation_types {
675            let id = catalog
676                .relation_type_id(name)
677                .ok_or_else(|| DbError::UnknownName {
678                    kind: "relation type",
679                    name: name.clone(),
680                })?;
681            bound.relation_types.insert(name.clone(), id);
682        }
683        for (name, _family, value_type) in &schema.keys {
684            let id = catalog
685                .property_key_id(name)
686                .ok_or_else(|| DbError::UnknownName {
687                    kind: "property key",
688                    name: name.clone(),
689                })?;
690            bound.keys.insert(name.clone(), (id, *value_type));
691        }
692        for (name, key_name) in &schema.equality_indexes {
693            let (_key_id, value_type) =
694                *bound
695                    .keys
696                    .get(key_name)
697                    .ok_or_else(|| DbError::UnknownName {
698                        kind: "property key",
699                        name: key_name.clone(),
700                    })?;
701            let id = catalog.index_id(name).ok_or_else(|| DbError::UnknownName {
702                kind: "index",
703                name: name.clone(),
704            })?;
705            bound
706                .equality_indexes
707                .insert(name.clone(), (id, value_type));
708        }
709        for spec in &schema.graph_projections {
710            let id = catalog
711                .projection_id(&spec.name)
712                .ok_or_else(|| DbError::UnknownName {
713                    kind: "projection",
714                    name: spec.name.clone(),
715                })?;
716            bound.projections.insert(spec.name.clone(), id);
717        }
718        Ok(bound)
719    }
720
721    /// Prepares a query against the current catalog.
722    ///
723    /// # Errors
724    ///
725    /// Returns [`DbError`] when parsing or semantic analysis fails.
726    ///
727    /// # Performance
728    ///
729    /// This method is `O(query length + catalog lookup cost)`.
730    pub fn prepare(&self, query: &str) -> Result<PreparedQuery, DbError> {
731        PreparedQuery::prepare(query, &self.current.view())
732    }
733}
734
/// Reports how many bytes `path` occupies on disk. A file that is absent or
/// cannot be stat'd counts as `0`: the size is advisory (status reporting and
/// the auto-checkpoint heuristic) and is never relied on for correctness.
///
/// # Performance
///
/// This function is `O(1)`: one `stat` syscall.
fn file_len(path: &Path) -> u64 {
    match std::fs::metadata(path) {
        Ok(meta) => meta.len(),
        Err(_) => 0,
    }
}
745
/// Crash-injection tag for [`Db::checkpoint_inner`], compiled only for tests.
/// It tells the fold where to stop, so the crash-matrix test can reopen the
/// store and check what is recovered at each crash window.
///
/// The crash-matrix test that builds the non-`Complete` variants is
/// `#[cfg(not(miri))]` (it reopens a real store across simulated crashes, which
/// miri's isolation cannot model). Under miri only `Complete` is therefore
/// constructed, and the other variants are expected to be unused.
///
/// # Performance
///
/// `perf: unspecified`; a test-only control tag.
#[cfg(test)]
#[cfg_attr(
    miri,
    expect(
        dead_code,
        reason = "the crash-injection variants are constructed only by the #[cfg(not(miri))] crash-matrix test"
    )
)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CheckpointStop {
    /// No injected crash: the checkpoint runs to the end, as in production.
    Complete,
    /// Stop once the new base and new log are durable but before the
    /// superblock is published, so the old superblock remains authoritative.
    BeforeSuperblock,
    /// Stop once the superblock names the new generation but before the old
    /// base/log are unlinked, so the new superblock is authoritative.
    BeforeRotate,
}
777
778/// Reads the whole delta-log into memory, treating a missing file as empty.
779///
780/// # Errors
781///
782/// Returns [`DbError::Io`] when the file cannot be read.
783///
784/// # Performance
785///
786/// This function is `O(log bytes)`.
787fn read_log(path: &Path) -> Result<Vec<u8>, DbError> {
788    match std::fs::read(path) {
789        Ok(bytes) => Ok(bytes),
790        Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Vec::new()),
791        Err(error) => Err(DbError::io("read delta-log", error)),
792    }
793}
794
795/// Truncates the delta-log back to `len` (its last-good byte length) and fsyncs,
796/// discarding a torn tail under the open path.
797///
798/// # Errors
799///
800/// Returns [`DbError::Io`] when opening, truncating, or syncing fails.
801///
802/// # Performance
803///
804/// This function is `O(1)`.
805fn truncate_log(path: &Path, len: usize) -> Result<(), DbError> {
806    let file = std::fs::OpenOptions::new()
807        .write(true)
808        .open(path)
809        .map_err(|error| DbError::io("open delta-log for truncate", error))?;
810    let len = u64::try_from(len)
811        .map_err(|_overflow| DbError::invalid_store("delta-log length overflow"))?;
812    file.set_len(len)
813        .map_err(|error| DbError::io("truncate delta-log", error))?;
814    file.sync_all()
815        .map_err(|error| DbError::io("sync truncated delta-log", error))
816}
817
818/// Creates an empty per-generation delta-log, fsyncing the file and the
819/// directory entry so the new (empty) log is durable.
820///
821/// # Errors
822///
823/// Returns [`DbError::Io`] when creation or syncing fails.
824///
825/// # Performance
826///
827/// This function is `O(1)`.
828fn create_empty_log(root: &Path, generation: u64) -> Result<(), DbError> {
829    let path = root.join(delta_file(generation));
830    let file =
831        std::fs::File::create(&path).map_err(|error| DbError::io("create delta-log", error))?;
832    file.sync_all()
833        .map_err(|error| DbError::io("sync delta-log", error))?;
834    storage::sync_directory(root)
835}
836
837/// Opens the live delta-log for appending (create when absent, read+append).
838///
839/// # Errors
840///
841/// Returns [`DbError::Io`] when the log cannot be opened.
842///
843/// # Performance
844///
845/// This function is `O(1)`.
846fn open_log_for_append(root: &Path, generation: u64) -> Result<std::fs::File, DbError> {
847    std::fs::OpenOptions::new()
848        .create(true)
849        .truncate(false)
850        .read(true)
851        .append(true)
852        .open(root.join(delta_file(generation)))
853        .map_err(|error| DbError::io("open delta-log for append", error))
854}
855
856/// Writes the superblock naming `generation` with the given frontier stamps.
857///
858/// # Errors
859///
860/// Returns [`DbError::Io`] when publishing fails.
861///
862/// # Performance
863///
864/// This function is `O(1)`.
865fn write_superblock(
866    root: &Path,
867    generation: u64,
868    checkpoint_lsn: u64,
869    commit_seq: u64,
870    transaction_id: u64,
871) -> Result<(), DbError> {
872    wal::write_superblock(
873        root,
874        &SuperblockRecord {
875            magic: crate::wire::SUPERBLOCK_MAGIC,
876            base_generation: generation.into(),
877            checkpoint_lsn: checkpoint_lsn.into(),
878            log_byte_offset: 0u64.into(),
879            commit_seq: commit_seq.into(),
880            transaction_id: transaction_id.into(),
881            format_version: crate::wire::OXGDB_FORMAT_VERSION.into(),
882            flags: 0u32.into(),
883            crc32c: 0u32.into(),
884            pad: 0u32.into(),
885        },
886    )
887}
888
889/// Snapshot of database status.
890///
891/// # Performance
892///
893/// Copying and comparing status is `O(1)`.
894#[derive(Clone, Copy, Debug, Eq, PartialEq)]
895pub struct Stats {
896    /// Last visible commit sequence.
897    pub visible_commit_seq: CommitSeq,
898    /// Last writer transaction ID burned by this handle.
899    ///
900    /// This value is durable after a dirty commit and session-local after
901    /// rollback.
902    pub last_transaction_id: TransactionId,
903    /// Live base generation named by the superblock (the count of folds this
904    /// store has undergone; gen-0 is the freshly created store).
905    pub live_generation: CheckpointGeneration,
906    /// On-disk byte size of the live base file.
907    pub base_byte_size: u64,
908    /// On-disk byte size of the live delta-log (the tail recovery replays and
909    /// the auto-checkpoint policy weighs against the base size).
910    pub log_byte_size: u64,
911    /// Visible element count.
912    pub element_count: usize,
913    /// Visible relation count.
914    pub relation_count: usize,
915    /// Visible incidence count.
916    pub incidence_count: usize,
917    /// Catalog-size summary.
918    pub catalog: CatalogSummary,
919}
920
/// Entry counts for each kind of catalog object.
///
/// # Performance
///
/// Copying and comparing are `O(1)`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CatalogSummary {
    /// Number of roles.
    pub role_count: usize,
    /// Number of labels.
    pub label_count: usize,
    /// Number of relation types.
    pub relation_type_count: usize,
    /// Number of property keys.
    pub property_key_count: usize,
    /// Number of projections.
    pub projection_count: usize,
    /// Number of indexes.
    pub index_count: usize,
}
941
942impl CatalogSummary {
943    /// Builds a summary from a catalog.
944    ///
945    /// # Performance
946    ///
947    /// This function is `O(catalog entry count)`.
948    #[must_use]
949    pub fn from_catalog(catalog: &Catalog) -> Self {
950        Self {
951            role_count: catalog.roles().count(),
952            label_count: catalog.labels().count(),
953            relation_type_count: catalog.relation_types().count(),
954            property_key_count: catalog.property_keys().count(),
955            projection_count: catalog.projections().count(),
956            index_count: catalog.indexes().count(),
957        }
958    }
959}
960
961/// Reader pin identifying the visible database generation.
962///
963/// # Performance
964///
965/// Copying and comparing a pin is `O(1)`.
966#[derive(Clone, Copy, Debug, Eq, PartialEq)]
967pub struct ReadPin {
968    /// Pinned visible commit sequence.
969    pub visible_commit_seq: CommitSeq,
970    /// Pinned checkpoint generation.
971    pub generation: CheckpointGeneration,
972}
973
974/// Read transaction over a pinned snapshot.
975///
976/// A read transaction owns its own `Arc<Snapshot>` and never borrows the
977/// [`Db`], so it stays valid across a later `begin_write`/`checkpoint` on
978/// the same handle (it cloned the snapshot before the write borrowed `&mut`). It
979/// is [`Send`] + [`Sync`] (asserted below).
980///
981/// # Performance
982///
983/// Creating and cloning a read transaction is `O(1)`: it shares the pinned
984/// snapshot through an `Arc`, not by copying.
985pub struct Reader {
986    /// The pinned snapshot this reader observes.
987    snapshot: Arc<Snapshot>,
988}
989
990/// Returns whether a [`Reader::neighbors`] walk should follow the edge from the
991/// incidence `from` (the queried element's incidence) to the incidence `to`
992/// (a candidate neighbor's incidence) under `direction`.
993///
994/// Endpoint roles are encoded by incidence-creation order: the source endpoint
995/// has the lower incidence id. `Outgoing` follows source→target (the queried
996/// element is the source, so `from < to`), `Incoming` follows target→source, and
997/// `Both` follows either side.
998///
999/// # Performance
1000///
1001/// This function is `O(1)`.
1002const fn follow_direction(direction: Direction, from: IncidenceId, to: IncidenceId) -> bool {
1003    match direction {
1004        Direction::Outgoing => from.get() < to.get(),
1005        Direction::Incoming => from.get() > to.get(),
1006        Direction::Both => true,
1007    }
1008}
1009
1010/// `Reader` MUST be `Send + Sync`: it pins only an `Arc<Snapshot>`,
1011/// which holds `Arc`-shared `Send + Sync` data (no `Rc`/`RefCell` reachable).
1012const fn assert_send_sync<T: Send + Sync>() {}
1013const _: () = assert_send_sync::<Reader>();
1014const _: () = assert_send_sync::<Arc<Snapshot>>();
1015
1016impl Reader {
1017    /// Returns this transaction's reader pin.
1018    ///
1019    /// # Performance
1020    ///
1021    /// This method is `O(1)`.
1022    #[must_use]
1023    pub fn pin(&self) -> ReadPin {
1024        ReadPin {
1025            visible_commit_seq: self.snapshot.lsn(),
1026            generation: self.snapshot.generation(),
1027        }
1028    }
1029
1030    /// Returns catalog metadata.
1031    ///
1032    /// # Performance
1033    ///
1034    /// This method is `O(1)`.
1035    #[must_use]
1036    pub fn catalog(&self) -> &Catalog {
1037        self.snapshot.view().catalog_ref()
1038    }
1039
1040    /// Returns visible element count.
1041    ///
1042    /// # Performance
1043    ///
1044    /// This method is `O(base + overlay change)`.
1045    #[must_use]
1046    pub fn element_count(&self) -> usize {
1047        self.snapshot.view().element_count()
1048    }
1049
1050    /// Returns visible relation count.
1051    ///
1052    /// # Performance
1053    ///
1054    /// This method is `O(base + overlay change)`.
1055    #[must_use]
1056    pub fn relation_count(&self) -> usize {
1057        self.snapshot.view().relation_count()
1058    }
1059
1060    /// Returns visible incidence count.
1061    ///
1062    /// # Performance
1063    ///
1064    /// This method is `O(base + overlay change)`.
1065    #[must_use]
1066    pub fn incidence_count(&self) -> usize {
1067        self.snapshot.view().incidence_count()
1068    }
1069
1070    /// Returns every visible element id in id order.
1071    ///
1072    /// # Performance
1073    ///
1074    /// This method is `O(element count)`.
1075    #[must_use]
1076    pub fn element_ids(&self) -> Vec<ElementId> {
1077        self.snapshot
1078            .view()
1079            .elements()
1080            .map(|record| record.id)
1081            .collect()
1082    }
1083
1084    /// Returns every visible relation id in id order.
1085    ///
1086    /// # Performance
1087    ///
1088    /// This method is `O(relation count)`.
1089    #[must_use]
1090    pub fn relation_ids(&self) -> Vec<RelationId> {
1091        self.snapshot
1092            .view()
1093            .relations()
1094            .map(|record| record.id)
1095            .collect()
1096    }
1097
1098    /// Returns whether an element exists.
1099    ///
1100    /// # Performance
1101    ///
1102    /// This method is `O(log change + log n)`.
1103    #[must_use]
1104    pub fn contains_element(&self, id: ElementId) -> bool {
1105        self.snapshot.view().contains_element(id)
1106    }
1107
1108    /// Returns whether a relation exists.
1109    ///
1110    /// # Performance
1111    ///
1112    /// This method is `O(log change + log n)`.
1113    #[must_use]
1114    pub fn contains_relation(&self, id: RelationId) -> bool {
1115        self.snapshot.view().contains_relation(id)
1116    }
1117
1118    /// Returns whether an incidence exists.
1119    ///
1120    /// # Performance
1121    ///
1122    /// This method is `O(log change + log n)`.
1123    #[must_use]
1124    pub fn contains_incidence(&self, id: IncidenceId) -> bool {
1125        self.snapshot.view().contains_incidence(id)
1126    }
1127
1128    /// Returns an owned element view — id, labels, and all properties read in one
1129    /// call.
1130    ///
1131    /// # Performance
1132    ///
1133    /// This method is `O(log n + label count + property count)`.
1134    #[must_use]
1135    pub fn element(&self, id: ElementId) -> Option<Element> {
1136        let view = self.snapshot.view();
1137        let record = view.element_ref(id)?;
1138        let labels = record.labels.iter().copied().collect();
1139        let properties =
1140            Properties::from_pairs(view.subject_properties(PropertySubject::Element(id)));
1141        Some(Element::new(id, labels, properties))
1142    }
1143
1144    /// Returns an owned relation view — id, type, labels, and all properties read
1145    /// in one call.
1146    ///
1147    /// # Performance
1148    ///
1149    /// This method is `O(log n + label count + property count)`.
1150    #[must_use]
1151    pub fn relation(&self, id: RelationId) -> Option<Relation> {
1152        let view = self.snapshot.view();
1153        let record = view.relation_ref(id)?;
1154        let labels = record.labels.iter().copied().collect();
1155        let properties =
1156            Properties::from_pairs(view.subject_properties(PropertySubject::Relation(id)));
1157        Some(Relation::new(id, record.relation_type, labels, properties))
1158    }
1159
1160    /// Returns an owned incidence record.
1161    ///
1162    /// # Performance
1163    ///
1164    /// This method is `O(log change + log n)`.
1165    #[must_use]
1166    pub fn incidence(&self, id: IncidenceId) -> Option<IncidenceRecord> {
1167        self.snapshot.view().incidence_ref(id).map(Cow::into_owned)
1168    }
1169
1170    /// Returns every visible incidence attached to an element, in ascending
1171    /// incidence-id order.
1172    ///
1173    /// The merged set mixes overlay-owned and base-borrowed records, so this
1174    /// returns an owned [`Vec`] ([`IncidenceRecord`] is [`Copy`], so the copy is
1175    /// cheap).
1176    ///
1177    /// # Performance
1178    ///
1179    /// This method is `O(base incidences + overlay incidence change)`.
1180    #[must_use]
1181    pub fn element_incidences(&self, id: ElementId) -> Vec<IncidenceRecord> {
1182        self.snapshot.view().element_incidences(id)
1183    }
1184
1185    /// Returns a binary relation's two endpoint elements, ordered by ascending
1186    /// incidence id.
1187    ///
1188    /// Reads the relation's incidences from the reverse-adjacency index and
1189    /// returns the elements carried by its first two incidences in id order. A
1190    /// relation with fewer than two visible incidences returns `None`. This
1191    /// reports endpoints structurally, without consulting any projection's
1192    /// source/target roles — use [`Self::neighbors`] when role direction matters.
1193    ///
1194    /// # Performance
1195    ///
1196    /// This method is `O(degree)` over the relation's incidences.
1197    #[must_use]
1198    pub fn endpoints(&self, relation: RelationId) -> Option<(ElementId, ElementId)> {
1199        let incidences = self.snapshot.view().relation_incidences(relation);
1200        match incidences.as_slice() {
1201            [first, second, ..] => Some((first.element, second.element)),
1202            _too_few => None,
1203        }
1204    }
1205
1206    /// Returns the elements reachable from `element` along relations of
1207    /// `relation_type`, in ascending element-id order.
1208    ///
1209    /// Direction selects the role `element` must play on each relation. Endpoint
1210    /// roles are encoded by incidence-creation order: a binary relation's source
1211    /// is its lower incidence id and its target the higher (see
1212    /// [`Self::endpoints`]). `Outgoing` requires `element` to be the source (and
1213    /// yields the target), `Incoming` requires it to be the target (and yields
1214    /// the source), and `Both` yields the opposite endpoint either way. Resolved
1215    /// over the reverse-adjacency index — each incidence of `element` whose
1216    /// relation has the requested type contributes that relation's other
1217    /// endpoint — so this works for any binary relation without a materialized
1218    /// projection.
1219    ///
1220    /// # Performance
1221    ///
1222    /// This method is `O(degree of element + sum of touched relation degrees)`.
1223    #[must_use]
1224    pub fn neighbors(
1225        &self,
1226        element: ElementId,
1227        relation_type: RelationTypeId,
1228        direction: Direction,
1229    ) -> Vec<ElementId> {
1230        let view = self.snapshot.view();
1231        let mut neighbors = BTreeSet::new();
1232        for incidence in view.element_incidences(element) {
1233            let matches_type = view
1234                .relation_ref(incidence.relation)
1235                .is_some_and(|record| record.relation_type == Some(relation_type));
1236            if !matches_type {
1237                continue;
1238            }
1239            // The incidence id encodes the endpoint role: the source endpoint is
1240            // created first (lower incidence id), the target second. Compare
1241            // `element`'s incidence id against each other endpoint's to decide
1242            // which side `element` is on, then follow per the requested direction.
1243            neighbors.extend(
1244                view.relation_incidences(incidence.relation)
1245                    .into_iter()
1246                    .filter(|other| other.element != element)
1247                    .filter(|other| follow_direction(direction, incidence.id, other.id))
1248                    .map(|other| other.element),
1249            );
1250        }
1251        neighbors.into_iter().collect()
1252    }
1253
1254    /// Returns one owned property value.
1255    ///
1256    /// # Performance
1257    ///
1258    /// This method is `O(log subjects + log keys)`.
1259    #[must_use]
1260    pub fn property(&self, subject: PropertySubject, key: PropertyKeyId) -> Option<PropertyValue> {
1261        self.snapshot
1262            .view()
1263            .property_ref(subject, key)
1264            .map(Cow::into_owned)
1265    }
1266
1267    /// Returns the owned element whose value in `index` equals `value`, or `None`
1268    /// when no element matches.
1269    ///
1270    /// # Errors
1271    ///
1272    /// Returns [`DbError`] when the index is unknown or is not an equality index.
1273    ///
1274    /// # Performance
1275    ///
1276    /// This method is `O(log n + label count + property count)`.
1277    pub fn element_by_key<T: ValueType>(
1278        &self,
1279        index: EqualityIndex<T>,
1280        value: impl Assignable<T>,
1281    ) -> Result<Option<Element>, DbError> {
1282        let value = value.into_value()?;
1283        let matched = self
1284            .lookup(index.id(), Match::Equal(&value))?
1285            .into_iter()
1286            .find_map(|subject| match subject {
1287                PropertySubject::Element(id) => Some(id),
1288                PropertySubject::Relation(_) | PropertySubject::Incidence(_) => None,
1289            });
1290        Ok(matched.and_then(|id| self.element(id)))
1291    }
1292
1293    /// Returns the number of subjects carried by a membership index (a label or
1294    /// relation-type index).
1295    ///
1296    /// # Errors
1297    ///
1298    /// Returns [`DbError`] when the index is unknown or does not support
1299    /// membership enumeration.
1300    ///
1301    /// # Performance
1302    ///
1303    /// This method is `O(indexed family size)`.
1304    pub fn count(&self, index: IndexId) -> Result<usize, DbError> {
1305        self.lookup(index, Match::All)
1306            .map(|subjects| subjects.len())
1307    }
1308
1309    /// Looks up subjects with a property value.
1310    ///
1311    /// # Errors
1312    ///
1313    /// Returns [`DbError`] when the property key is unknown or `value` does not
1314    /// match the key schema.
1315    ///
1316    /// # Performance
1317    ///
1318    /// This method is `O(property subject count)`.
1319    pub fn lookup_property_equal(
1320        &self,
1321        key: PropertyKeyId,
1322        value: &PropertyValue,
1323    ) -> Result<Vec<PropertySubject>, DbError> {
1324        self.snapshot.view().typed_property_equal(key, value)
1325    }
1326
1327    /// Looks up subjects with a property inside an inclusive range.
1328    ///
1329    /// # Errors
1330    ///
1331    /// Returns [`DbError`] when the property key is unknown or either bound
1332    /// does not match the key schema.
1333    ///
1334    /// # Performance
1335    ///
1336    /// This method is `O(property subject count)`.
1337    pub fn lookup_property_range(
1338        &self,
1339        key: PropertyKeyId,
1340        min: &PropertyValue,
1341        max: &PropertyValue,
1342    ) -> Result<Vec<PropertySubject>, DbError> {
1343        self.snapshot.view().typed_property_range(key, min, max)
1344    }
1345
1346    /// Executes an index lookup.
1347    ///
1348    /// # Errors
1349    ///
1350    /// Returns [`DbError`] when the index is unknown, the lookup shape does not
1351    /// match the index kind, or supplied property values do not match catalog
1352    /// schemas.
1353    ///
1354    /// # Performance
1355    ///
1356    /// This method is `O(indexed family size)`.
1357    pub fn lookup(
1358        &self,
1359        index: IndexId,
1360        lookup: Match<'_>,
1361    ) -> Result<Vec<PropertySubject>, DbError> {
1362        let view = self.snapshot.view();
1363        let entry = view
1364            .catalog()
1365            .index(index)
1366            .ok_or(DbError::UnknownIndex { id: index })?;
1367        match (&entry.definition, lookup) {
1368            (IndexDefinition::Label { label }, Match::All) => Ok(view
1369                .elements_with_label(*label)
1370                .into_iter()
1371                .map(PropertySubject::Element)
1372                .collect()),
1373            (IndexDefinition::Label { .. }, _lookup) => {
1374                Err(DbError::unsupported("label index expects all lookup"))
1375            }
1376            (IndexDefinition::RelationType { relation_type }, Match::All) => Ok(view
1377                .relations_with_type(*relation_type)
1378                .into_iter()
1379                .map(PropertySubject::Relation)
1380                .collect()),
1381            (IndexDefinition::RelationType { .. }, _lookup) => Err(DbError::unsupported(
1382                "relation type index expects all lookup",
1383            )),
1384            (IndexDefinition::PropertyEquality { key }, Match::Equal(value)) => {
1385                view.typed_property_equal(*key, value)
1386            }
1387            (IndexDefinition::PropertyEquality { .. }, _lookup) => Err(DbError::unsupported(
1388                "property equality index expects equality lookup",
1389            )),
1390            (IndexDefinition::PropertyRange { key }, Match::Range { min, max }) => {
1391                view.typed_property_range(*key, min, max)
1392            }
1393            (IndexDefinition::PropertyRange { .. }, _lookup) => Err(DbError::unsupported(
1394                "property range index expects range lookup",
1395            )),
1396            (IndexDefinition::CompositeEquality { keys }, Match::Composite(values)) => {
1397                view.typed_property_composite_equal(keys, values)
1398            }
1399            (IndexDefinition::CompositeEquality { .. }, _lookup) => Err(DbError::unsupported(
1400                "composite equality index expects composite equality lookup",
1401            )),
1402            (IndexDefinition::Projection { projection }, Match::All) => {
1403                self.projection_index_subjects(*projection)
1404            }
1405            (IndexDefinition::Projection { .. }, _lookup) => {
1406                Err(DbError::unsupported("projection index expects all lookup"))
1407            }
1408        }
1409    }
1410
1411    /// Materializes a graph projection.
1412    ///
1413    /// # Errors
1414    ///
1415    /// Returns [`DbError`] when the projection is unknown, is not a graph, or
1416    /// fails validation against current topology.
1417    ///
1418    /// # Performance
1419    ///
1420    /// This method is `O(relation count * incidence count)`.
1421    pub fn graph_projection(&self, id: ProjectionId) -> Result<GraphProjection, DbError> {
1422        let view = self.snapshot.view();
1423        let entry = view
1424            .catalog()
1425            .projection(id)
1426            .ok_or(DbError::UnknownProjection { id })?;
1427        match &entry.definition {
1428            ProjectionDefinition::Graph(definition) => {
1429                projection::GraphProjection::from_state(&view, definition.clone())
1430            }
1431            ProjectionDefinition::Hypergraph(_definition) => {
1432                Err(DbError::invalid_projection("projection is not a graph"))
1433            }
1434        }
1435    }
1436
1437    /// Materializes a graph projection by catalog name.
1438    ///
1439    /// # Errors
1440    ///
1441    /// Returns [`DbError`] when the projection is unknown, is not a graph, or
1442    /// fails validation against current topology.
1443    ///
1444    /// # Performance
1445    ///
1446    /// This method is `O(log projection count + relation count * incidence count)`.
1447    pub fn graph_projection_by_name(&self, name: &str) -> Result<GraphProjection, DbError> {
1448        let id = self
1449            .snapshot
1450            .view()
1451            .catalog()
1452            .projection_id(name)
1453            .ok_or_else(|| DbError::unsupported(format!("unknown projection {name}")))?;
1454        self.graph_projection(id)
1455    }
1456
1457    /// Walks a cataloged graph projection from canonical seed elements,
1458    /// returning the discovered nodes AND the projection edges among them.
1459    ///
1460    /// Nodes are unique canonical elements in BFS first-discovery order; depth is
1461    /// the shortest discovered hop count from any seed. Edges connect two
1462    /// discovered nodes, ordered deterministically and unique by relation, so the
1463    /// [`Subgraph`] never references a node it omitted.
1464    ///
1465    /// # Errors
1466    ///
1467    /// Returns [`DbError`] when the projection is unknown, is not a graph,
1468    /// cannot be materialized, or a seed element is not part of the projection.
1469    ///
1470    /// # Performance
1471    ///
1472    /// This method is `O(relation count * incidence count + visited edges)`.
1473    pub fn walk(
1474        &self,
1475        projection: ProjectionId,
1476        seeds: &[ElementId],
1477        walk: Walk,
1478    ) -> Result<Subgraph, DbError> {
1479        if seeds.is_empty() || walk.limit == 0 {
1480            return Ok(Subgraph::default());
1481        }
1482        let graph = self.graph_projection(projection)?;
1483        traversal::walk_graph_projection(&graph, seeds, walk)
1484    }
1485
1486    /// Ranks a cataloged graph projection by personalized `PageRank`, returning
1487    /// every projection element paired with its rank, ordered highest first.
1488    ///
1489    /// `seeds` are the restart (teleport) set: rank mass returns to them on each
1490    /// damping step, biasing the stationary distribution toward elements
1491    /// reachable from the seeds (random walk with restart). The seed weights are
1492    /// normalized internally, so passing the seed elements is sufficient. With no
1493    /// seeds this is the uniform-teleport `PageRank` over the projection.
1494    ///
1495    /// # Errors
1496    ///
1497    /// Returns [`DbError`] when the projection is unknown, is not a graph, cannot
1498    /// be materialized, or `PageRank` rejects the configuration (the
1499    /// [`PageRankConfig`] was invalid or the power iteration did not converge).
1500    /// Seeds absent from the projection are ignored rather than erroring; with no
1501    /// resolvable seed this is the uniform-teleport rank.
1502    ///
1503    /// # Performance
1504    ///
1505    /// This method is `O(relation count * incidence count + iterations *
1506    /// (visible elements + visible edges) + visible elements * log(visible
1507    /// elements))` — the trailing term is the final rank sort.
1508    pub fn personalized_pagerank(
1509        &self,
1510        projection: ProjectionId,
1511        seeds: &[ElementId],
1512        config: PageRankConfig<f64>,
1513    ) -> Result<Vec<(ElementId, f64)>, DbError> {
1514        let graph = self.graph_projection(projection)?;
1515        let bound = graph.element_bound();
1516        let element_count = u32::try_from(bound).map_err(|_| {
1517            DbError::traversal("projection exceeds the personalized pagerank index bound")
1518        })?;
1519
1520        let mut personalization = vec![0.0_f64; bound];
1521        let mut seeded = false;
1522        for &seed in seeds {
1523            if let Some(local) = graph.local_element_id(seed) {
1524                personalization[graph.element_index(local)] = 1.0;
1525                seeded = true;
1526            }
1527        }
1528
1529        let mut ranks = vec![0.0_f64; bound];
1530        let mut workspace = PageRankWorkspace::for_graph(&graph);
1531        pagerank_graph_with_workspace(
1532            &graph,
1533            &Uniform,
1534            (0..element_count).map(ProjectionElementId::new),
1535            config,
1536            seeded.then_some(personalization.as_slice()),
1537            &mut ranks,
1538            &mut workspace,
1539        )
1540        .map_err(|error| {
1541            DbError::traversal(match error {
1542                PageRankError::InvalidDamping { .. }
1543                | PageRankError::InvalidTolerance { .. }
1544                | PageRankError::InvalidMaxIterations => "invalid pagerank configuration",
1545                PageRankError::NonConverged { .. } => "personalized pagerank did not converge",
1546                _ => "personalized pagerank failed",
1547            })
1548        })?;
1549
1550        let mut ranked: Vec<(ElementId, f64)> = (0..element_count)
1551            .map(|index| {
1552                let local = ProjectionElementId::new(index);
1553                (
1554                    graph.canonical_element_id(local),
1555                    ranks[graph.element_index(local)],
1556                )
1557            })
1558            .collect();
1559        ranked.sort_by(|left, right| right.1.total_cmp(&left.1));
1560        Ok(ranked)
1561    }
1562
1563    /// Returns the longest chain of canonical elements along the projection's
1564    /// outgoing edges within the subgraph induced by `elements`.
1565    ///
1566    /// Only edges whose endpoints are both in `elements` participate. The path
1567    /// lists each element once from start to end; its length in edges is
1568    /// `path.len() - 1`. An empty `elements` slice yields an empty path.
1569    ///
1570    /// # Errors
1571    ///
1572    /// Returns [`DbError`] when the projection is unknown, is not a graph, cannot
1573    /// be materialized, or the induced subgraph contains a cycle. Elements absent
1574    /// from the projection are ignored, so the chain is computed over the present
1575    /// subset.
1576    ///
1577    /// # Performance
1578    ///
1579    /// This method is `O(relation count * incidence count + visible elements +
1580    /// visible edges)`.
1581    pub fn longest_path(
1582        &self,
1583        projection: ProjectionId,
1584        elements: &[ElementId],
1585    ) -> Result<Vec<ElementId>, DbError> {
1586        if elements.is_empty() {
1587            return Ok(Vec::new());
1588        }
1589        let graph = self.graph_projection(projection)?;
1590        let locals = elements
1591            .iter()
1592            .filter_map(|&element| graph.local_element_id(element))
1593            .collect::<Vec<ProjectionElementId>>();
1594        let path = longest_path_dag(&graph, &locals)
1595            .map_err(|_| DbError::traversal("longest path found a cycle"))?;
1596        Ok(path
1597            .into_iter()
1598            .map(|local| graph.canonical_element_id(local))
1599            .collect())
1600    }
1601
1602    /// Materializes a hypergraph projection.
1603    ///
1604    /// # Errors
1605    ///
1606    /// Returns [`DbError`] when the projection is unknown, is not a hypergraph,
1607    /// or fails validation against current topology.
1608    ///
1609    /// # Performance
1610    ///
1611    /// This method is `O(relation count * incidence count)`.
1612    pub fn hypergraph_projection(&self, id: ProjectionId) -> Result<HypergraphProjection, DbError> {
1613        let view = self.snapshot.view();
1614        let entry = view
1615            .catalog()
1616            .projection(id)
1617            .ok_or(DbError::UnknownProjection { id })?;
1618        match &entry.definition {
1619            ProjectionDefinition::Hypergraph(definition) => {
1620                projection::HypergraphProjection::from_state(&view, definition.clone())
1621            }
1622            ProjectionDefinition::Graph(_definition) => Err(DbError::invalid_projection(
1623                "projection is not a hypergraph",
1624            )),
1625        }
1626    }
1627
1628    /// Executes a prepared query.
1629    ///
1630    /// # Errors
1631    ///
1632    /// Returns [`DbError`] when execution cannot materialize a referenced
1633    /// projection.
1634    ///
1635    /// # Performance
1636    ///
1637    /// This method is `O(plan output + projection build cost when used)`.
1638    pub fn run(&self, query: &PreparedQuery) -> Result<QueryResult, DbError> {
1639        query.execute(&self.snapshot.view())
1640    }
1641
1642    /// Explains a prepared query.
1643    ///
1644    /// # Performance
1645    ///
1646    /// This method is `O(plan size)`.
1647    #[must_use]
1648    pub fn explain(&self, query: &PreparedQuery) -> String {
1649        query.explain()
1650    }
1651
1652    /// Materializes subjects represented by a projection index.
1653    ///
1654    /// # Errors
1655    ///
1656    /// Returns [`DbError`] when the projection is unknown or cannot be
1657    /// materialized.
1658    ///
1659    /// # Performance
1660    ///
1661    /// This method is `O(relation count * incidence count)`.
1662    fn projection_index_subjects(
1663        &self,
1664        projection: ProjectionId,
1665    ) -> Result<Vec<PropertySubject>, DbError> {
1666        let view = self.snapshot.view();
1667        let entry = view
1668            .catalog()
1669            .projection(projection)
1670            .ok_or(DbError::UnknownProjection { id: projection })?;
1671        match &entry.definition {
1672            ProjectionDefinition::Graph(definition) => {
1673                Ok(projection::GraphProjection::from_state(&view, definition.clone())?.subjects())
1674            }
1675            ProjectionDefinition::Hypergraph(definition) => Ok(
1676                projection::HypergraphProjection::from_state(&view, definition.clone())?.subjects(),
1677            ),
1678        }
1679    }
1680}
1681
1682/// Single writer transaction.
1683///
1684/// Mutations accumulate into a private write overlay layered over the parent
1685/// snapshot; reads fall through the overlay then the base. `commit` appends the
1686/// overlay's mutation log to the WAL (when dirty) and publishes a fresh snapshot;
1687/// `rollback` drops the overlay and appends nothing.
1688///
1689/// # Performance
1690///
1691/// Creating and moving a writer is `O(1)`; each mutation is `O(log change)`.
1692pub struct Writer<'db> {
1693    /// Db receiving the commit.
1694    database: &'db mut Db,
1695    /// Parent snapshot the writer layers over (its base + frozen overlay).
1696    parent: Arc<Snapshot>,
1697    /// Private mutable delta this writer accumulates.
1698    delta: WriteOverlay,
1699    /// Writer transaction id (session-local until a dirty commit makes it
1700    /// durable).
1701    transaction_id: TransactionId,
1702    /// Held single-writer advisory lock. Its [`Drop`] releases the lock when this
1703    /// transaction ends (on `rollback`, or on any early-return error path); a
1704    /// successful dirty [`Self::commit`] releases it explicitly with `drop` so a
1705    /// triggered auto-checkpoint can re-acquire it.
1706    lock: WriterLock,
1707}
1708
1709impl Writer<'_> {
1710    /// Registers a structural incidence role.
1711    ///
1712    /// # Errors
1713    ///
1714    /// Returns [`DbError`] when the name already exists or ID allocation fails.
1715    ///
1716    /// # Performance
1717    ///
1718    /// This method is `O(log role count + name length)`.
1719    pub fn register_role(&mut self, name: impl Into<String>) -> Result<RoleId, DbError> {
1720        self.delta.register_role(name.into())
1721    }
1722
1723    /// Registers an element or relation label.
1724    ///
1725    /// # Errors
1726    ///
1727    /// Returns [`DbError`] when the name already exists or ID allocation fails.
1728    ///
1729    /// # Performance
1730    ///
1731    /// This method is `O(log label count + name length)`.
1732    pub fn register_label(&mut self, name: impl Into<String>) -> Result<LabelId, DbError> {
1733        self.delta.register_label(name.into())
1734    }
1735
1736    /// Registers a relation type.
1737    ///
1738    /// # Errors
1739    ///
1740    /// Returns [`DbError`] when the name already exists or ID allocation fails.
1741    ///
1742    /// # Performance
1743    ///
1744    /// This method is `O(log relation type count + name length)`.
1745    pub fn register_relation_type(
1746        &mut self,
1747        name: impl Into<String>,
1748    ) -> Result<RelationTypeId, DbError> {
1749        self.delta.register_relation_type(name.into())
1750    }
1751
1752    /// Registers a typed property key.
1753    ///
1754    /// # Errors
1755    ///
1756    /// Returns [`DbError`] when the name already exists or ID allocation fails.
1757    ///
1758    /// # Performance
1759    ///
1760    /// This method is `O(log property key count + name length)`.
1761    pub fn register_property_key(
1762        &mut self,
1763        name: impl Into<String>,
1764        family: PropertyFamily,
1765        value_type: PropertyType,
1766    ) -> Result<PropertyKeyId, DbError> {
1767        self.delta
1768            .register_property_key(name.into(), family, value_type)
1769    }
1770
1771    /// Defines a physical projection.
1772    ///
1773    /// # Errors
1774    ///
1775    /// Returns [`DbError`] when referenced catalog IDs are unknown, the
1776    /// projection name already exists, or ID allocation fails.
1777    ///
1778    /// # Performance
1779    ///
1780    /// This method is `O(definition size + catalog lookup cost)`.
1781    pub fn define_projection(
1782        &mut self,
1783        definition: ProjectionDefinition,
1784    ) -> Result<ProjectionId, DbError> {
1785        self.validate_projection_definition(&definition)?;
1786        self.delta.register_projection(definition)
1787    }
1788
1789    /// Defines an index.
1790    ///
1791    /// # Errors
1792    ///
1793    /// Returns [`DbError`] when referenced catalog IDs are unknown, the index
1794    /// name already exists, or ID allocation fails.
1795    ///
1796    /// # Performance
1797    ///
1798    /// This method is `O(definition size + catalog lookup cost)`.
1799    pub fn define_index(
1800        &mut self,
1801        name: impl Into<String>,
1802        definition: IndexDefinition,
1803    ) -> Result<IndexId, DbError> {
1804        self.validate_index_definition(&definition)?;
1805        self.delta.register_index(name.into(), definition)
1806    }
1807
1808    /// Applies a declarative [`Schema`] idempotently (register-or-get every
1809    /// declared item), returning the resolved [`Bound`] handle bag. Re-applying
1810    /// the same schema reuses existing ids; a name that already exists with a
1811    /// conflicting shape is a [`DbError::SchemaConflict`].
1812    ///
1813    /// # Errors
1814    ///
1815    /// Returns [`DbError`] on a shape conflict, an undeclared referenced name (an
1816    /// index's key, a projection's role/type), or id-allocation failure.
1817    ///
1818    /// # Performance
1819    ///
1820    /// This method is `O(declared items × log catalog)`.
1821    pub fn apply_schema(&mut self, schema: &Schema) -> Result<Bound, DbError> {
1822        let mut bound = Bound::default();
1823        for name in &schema.roles {
1824            let id = match self.merged().catalog().role_id(name) {
1825                Some(id) => id,
1826                None => self.register_role(name.clone())?,
1827            };
1828            bound.roles.insert(name.clone(), id);
1829        }
1830        for name in &schema.labels {
1831            let id = match self.merged().catalog().label_id(name) {
1832                Some(id) => id,
1833                None => self.register_label(name.clone())?,
1834            };
1835            bound.labels.insert(name.clone(), id);
1836        }
1837        for name in &schema.relation_types {
1838            let id = match self.merged().catalog().relation_type_id(name) {
1839                Some(id) => id,
1840                None => self.register_relation_type(name.clone())?,
1841            };
1842            bound.relation_types.insert(name.clone(), id);
1843        }
1844        for (name, family, value_type) in &schema.keys {
1845            let id = self.register_key_or_get(name, *family, *value_type)?;
1846            bound.keys.insert(name.clone(), (id, *value_type));
1847        }
1848        for (name, key_name) in &schema.equality_indexes {
1849            let (key_id, value_type) =
1850                *bound
1851                    .keys
1852                    .get(key_name)
1853                    .ok_or_else(|| DbError::UnknownName {
1854                        kind: "property key",
1855                        name: key_name.clone(),
1856                    })?;
1857            let id = match self.merged().catalog().index_id(name) {
1858                Some(id) => id,
1859                None => self.define_index(
1860                    name.clone(),
1861                    IndexDefinition::PropertyEquality { key: key_id },
1862                )?,
1863            };
1864            bound
1865                .equality_indexes
1866                .insert(name.clone(), (id, value_type));
1867        }
1868        for spec in &schema.graph_projections {
1869            let id = match self.merged().catalog().projection_id(&spec.name) {
1870                Some(id) => id,
1871                None => self.define_graph_projection(spec, &bound)?,
1872            };
1873            bound.projections.insert(spec.name.clone(), id);
1874        }
1875        Ok(bound)
1876    }
1877
1878    /// Registers a property key, or returns the existing id when the name is
1879    /// already present with a matching family and value type.
1880    ///
1881    /// # Errors
1882    ///
1883    /// Returns [`DbError::SchemaConflict`] when the name exists with a different
1884    /// family or value type.
1885    ///
1886    /// # Performance
1887    ///
1888    /// This method is `O(log catalog)`.
1889    fn register_key_or_get(
1890        &mut self,
1891        name: &str,
1892        family: PropertyFamily,
1893        value_type: PropertyType,
1894    ) -> Result<PropertyKeyId, DbError> {
1895        let Some(existing) = self.merged().catalog().property_key_id(name) else {
1896            return self.register_property_key(name.to_owned(), family, value_type);
1897        };
1898        let matches = self
1899            .merged()
1900            .catalog()
1901            .property_key(existing)
1902            .is_some_and(|def| def.family == family && def.value_type == value_type);
1903        if matches {
1904            Ok(existing)
1905        } else {
1906            Err(DbError::SchemaConflict {
1907                name: name.to_owned(),
1908                reason: "property key family/value type differs from the existing catalog entry",
1909            })
1910        }
1911    }
1912
1913    /// Defines a graph projection from a spec, resolving its relation-type and
1914    /// role names through `bound`.
1915    ///
1916    /// # Errors
1917    ///
1918    /// Returns [`DbError::UnknownName`] when a referenced role/type is unbound, or
1919    /// a definition error.
1920    ///
1921    /// # Performance
1922    ///
1923    /// This method is `O(relation-type count × log catalog)`.
1924    fn define_graph_projection(
1925        &mut self,
1926        spec: &GraphProjectionSpec,
1927        bound: &Bound,
1928    ) -> Result<ProjectionId, DbError> {
1929        let mut relation_types = BTreeSet::new();
1930        for name in &spec.relation_types {
1931            relation_types.insert(bound.relation_type(name)?);
1932        }
1933        let source_role = bound.role(&spec.source_role)?;
1934        let target_role = bound.role(&spec.target_role)?;
1935        self.define_projection(ProjectionDefinition::Graph(GraphProjectionDefinition {
1936            name: spec.name.clone(),
1937            relation_types,
1938            source_role,
1939            target_role,
1940        }))
1941    }
1942
1943    /// Creates a canonical element.
1944    ///
1945    /// # Errors
1946    ///
1947    /// Returns [`DbError::IdOverflow`] when element IDs are exhausted.
1948    ///
1949    /// # Performance
1950    ///
1951    /// This method is `O(log element change)`.
1952    pub fn create_element(&mut self) -> Result<ElementId, DbError> {
1953        self.delta.create_element()
1954    }
1955
1956    /// Creates a canonical relation.
1957    ///
1958    /// # Errors
1959    ///
1960    /// Returns [`DbError::IdOverflow`] when relation IDs are exhausted.
1961    ///
1962    /// # Performance
1963    ///
1964    /// This method is `O(log relation change)`.
1965    pub fn create_relation(&mut self) -> Result<RelationId, DbError> {
1966        self.delta.create_relation()
1967    }
1968
1969    /// Creates a canonical incidence.
1970    ///
1971    /// # Errors
1972    ///
1973    /// Returns [`DbError`] when referenced IDs are unknown or incidence IDs are
1974    /// exhausted.
1975    ///
1976    /// # Performance
1977    ///
1978    /// This method is `O(log incidence change + reference lookup cost)`.
1979    pub fn create_incidence(
1980        &mut self,
1981        relation: RelationId,
1982        element: ElementId,
1983        role: RoleId,
1984    ) -> Result<IncidenceId, DbError> {
1985        self.require_relation(relation)?;
1986        self.require_element(element)?;
1987        self.require_role(role)?;
1988        self.delta.create_incidence(relation, element, role)
1989    }
1990
1991    /// Tombstones a canonical element and its incidences.
1992    ///
1993    /// # Errors
1994    ///
1995    /// Returns [`DbError::UnknownElement`] when the element is not visible.
1996    ///
1997    /// # Performance
1998    ///
1999    /// This method is `O(log n + degree)` via the reverse-adjacency index.
2000    pub(crate) fn tombstone_element(&mut self, id: ElementId) -> Result<(), DbError> {
2001        self.require_element(id)?;
2002        // Cascade: every incidence on the element — resolved in O(log n + degree)
2003        // through the reverse-adjacency index, not a full incidence scan — is
2004        // tombstoned too.
2005        let incidences: Vec<IncidenceId> = self
2006            .merged()
2007            .element_incidences(id)
2008            .into_iter()
2009            .map(|record| record.id)
2010            .collect();
2011        let base = self.parent.base_records();
2012        self.delta.tombstone_element(base, id);
2013        for incidence in incidences {
2014            self.delta
2015                .tombstone_incidence(self.parent.base_records(), incidence);
2016        }
2017        Ok(())
2018    }
2019
2020    /// Tombstones a canonical relation and its incidences.
2021    ///
2022    /// # Errors
2023    ///
2024    /// Returns [`DbError::UnknownRelation`] when the relation is not visible.
2025    ///
2026    /// # Performance
2027    ///
2028    /// This method is `O(log n + degree)` via the reverse-adjacency index.
2029    pub(crate) fn tombstone_relation(&mut self, id: RelationId) -> Result<(), DbError> {
2030        self.require_relation(id)?;
2031        // Cascade: every incidence in the relation — resolved in O(log n + degree)
2032        // through the reverse-adjacency index, not a full incidence scan.
2033        let incidences: Vec<IncidenceId> = self
2034            .merged()
2035            .relation_incidences(id)
2036            .into_iter()
2037            .map(|record| record.id)
2038            .collect();
2039        let base = self.parent.base_records();
2040        self.delta.tombstone_relation(base, id);
2041        for incidence in incidences {
2042            self.delta
2043                .tombstone_incidence(self.parent.base_records(), incidence);
2044        }
2045        Ok(())
2046    }
2047
2048    /// Tombstones a canonical incidence.
2049    ///
2050    /// # Errors
2051    ///
2052    /// Returns [`DbError::UnknownIncidence`] when the incidence is not visible.
2053    ///
2054    /// # Performance
2055    ///
2056    /// This method is `O(log incidence change)`.
2057    pub(crate) fn tombstone_incidence(&mut self, id: IncidenceId) -> Result<(), DbError> {
2058        self.require_incidence(id)?;
2059        self.delta
2060            .tombstone_incidence(self.parent.base_records(), id);
2061        Ok(())
2062    }
2063
2064    /// Adds a label to an element.
2065    ///
2066    /// # Errors
2067    ///
2068    /// Returns [`DbError`] when the element or label is unknown.
2069    ///
2070    /// # Performance
2071    ///
2072    /// This method is `O(log element change + log label count)`.
2073    pub(crate) fn add_element_label(
2074        &mut self,
2075        element: ElementId,
2076        label: LabelId,
2077    ) -> Result<(), DbError> {
2078        self.require_element(element)?;
2079        self.require_label(label)?;
2080        self.delta
2081            .add_element_label(self.parent.base_records(), element, label);
2082        Ok(())
2083    }
2084
2085    /// Adds a label to a relation.
2086    ///
2087    /// # Errors
2088    ///
2089    /// Returns [`DbError`] when the relation or label is unknown.
2090    ///
2091    /// # Performance
2092    ///
2093    /// This method is `O(log relation change + log label count)`.
2094    pub(crate) fn add_relation_label(
2095        &mut self,
2096        relation: RelationId,
2097        label: LabelId,
2098    ) -> Result<(), DbError> {
2099        self.require_relation(relation)?;
2100        self.require_label(label)?;
2101        self.delta
2102            .add_relation_label(self.parent.base_records(), relation, label);
2103        Ok(())
2104    }
2105
2106    /// Sets a relation type.
2107    ///
2108    /// # Errors
2109    ///
2110    /// Returns [`DbError`] when the relation or relation type is unknown.
2111    ///
2112    /// # Performance
2113    ///
2114    /// This method is `O(log relation change + log relation type count)`.
2115    pub fn set_relation_type(
2116        &mut self,
2117        relation: RelationId,
2118        relation_type: RelationTypeId,
2119    ) -> Result<(), DbError> {
2120        self.require_relation(relation)?;
2121        self.require_relation_type(relation_type)?;
2122        self.delta
2123            .set_relation_type(self.parent.base_records(), relation, relation_type);
2124        Ok(())
2125    }
2126
2127    /// Sets a property value.
2128    ///
2129    /// # Errors
2130    ///
2131    /// Returns [`DbError`] when the subject or key is unknown, or the value
2132    /// does not match the key schema.
2133    ///
2134    /// # Performance
2135    ///
2136    /// This method is `O(log subject change + log key count)`.
2137    pub(crate) fn set_property(
2138        &mut self,
2139        subject: PropertySubject,
2140        key: PropertyKeyId,
2141        value: PropertyValue,
2142    ) -> Result<(), DbError> {
2143        // Referential integrity: the subject must be visible (this rejects an
2144        // orphan property against a tombstoned/absent subject at the transaction
2145        // boundary — the overlay layer is permissive by design).
2146        self.require_subject(subject)?;
2147        let definition = self
2148            .merged()
2149            .catalog()
2150            .property_key(key)
2151            .cloned()
2152            .ok_or(DbError::UnknownPropertyKey { id: key })?;
2153        if definition.family != subject.family() {
2154            return Err(DbError::WrongPropertyFamily {
2155                expected: definition.family,
2156                actual: subject.family(),
2157            });
2158        }
2159        if definition.value_type != value.value_type() {
2160            return Err(DbError::PropertyTypeMismatch {
2161                expected: definition.value_type,
2162                actual: value.value_type(),
2163            });
2164        }
2165        self.delta
2166            .set_property(self.parent.base_records(), subject, key, value);
2167        Ok(())
2168    }
2169
2170    /// Removes a property value.
2171    ///
2172    /// # Errors
2173    ///
2174    /// Returns [`DbError`] when the subject or key is unknown.
2175    ///
2176    /// # Performance
2177    ///
2178    /// This method is `O(log subject change + log key count)`.
2179    pub(crate) fn remove_property(
2180        &mut self,
2181        subject: PropertySubject,
2182        key: PropertyKeyId,
2183    ) -> Result<(), DbError> {
2184        self.require_subject(subject)?;
2185        if self.merged().catalog().property_key(key).is_none() {
2186            return Err(DbError::UnknownPropertyKey { id: key });
2187        }
2188        self.delta
2189            .remove_property(self.parent.base_records(), subject, key);
2190        Ok(())
2191    }
2192
2193    /// Resolves the property key an equality index covers.
2194    ///
2195    /// # Errors
2196    ///
2197    /// Returns [`DbError::UnknownIndex`] when `index` is unknown, or an
2198    /// unsupported-query error when it is not a property-equality index.
2199    ///
2200    /// # Performance
2201    ///
2202    /// This method is `O(log index count)`.
2203    fn equality_index_key(&self, index: IndexId) -> Result<PropertyKeyId, DbError> {
2204        let view = self.merged();
2205        let entry = view
2206            .catalog()
2207            .index(index)
2208            .ok_or(DbError::UnknownIndex { id: index })?;
2209        match &entry.definition {
2210            IndexDefinition::PropertyEquality { key } => Ok(*key),
2211            _other => Err(DbError::unsupported(
2212                "reconcile requires a property-equality index",
2213            )),
2214        }
2215    }
2216
2217    /// Inserts or updates the element whose value under `index` equals `value`,
2218    /// returning its canonical id — reused when an element already carries that
2219    /// identity value (id stable across reconcile), freshly minted (a never-reused
2220    /// id, with the identity property set) otherwise.
2221    ///
2222    /// # Errors
2223    ///
2224    /// Returns [`DbError`] when `index` is not an equality index or the value
2225    /// type mismatches the key schema.
2226    ///
2227    /// # Performance
2228    ///
2229    /// This method is `O(log n + value length)` — a probe plus, on a miss, a mint.
2230    pub fn upsert_element<T: ValueType>(
2231        &mut self,
2232        index: EqualityIndex<T>,
2233        value: impl Assignable<T>,
2234    ) -> Result<ElementId, DbError> {
2235        let value = value.into_value()?;
2236        let key = self.equality_index_key(index.id())?;
2237        let existing = self
2238            .merged()
2239            .property_equal(key, &value)
2240            .into_iter()
2241            .find_map(|subject| match subject {
2242                PropertySubject::Element(id) => Some(id),
2243                PropertySubject::Relation(_) | PropertySubject::Incidence(_) => None,
2244            });
2245        if let Some(id) = existing {
2246            return Ok(id);
2247        }
2248        let element = self.create_element()?;
2249        self.set_property(PropertySubject::Element(element), key, value)?;
2250        Ok(element)
2251    }
2252
2253    /// Inserts or updates the relation whose value under `index` equals `value`,
2254    /// returning its canonical id. On a miss it mints the relation, sets its type
2255    /// and identity property, and creates one incidence per `(element, role)`
2256    /// endpoint; on a hit the existing relation (with its endpoints) is reused
2257    /// unchanged — the identity value encodes the endpoints, so they are immutable.
2258    ///
2259    /// # Errors
2260    ///
2261    /// Returns [`DbError`] when `index` is not an equality index, the value type
2262    /// mismatches, or an endpoint element does not exist.
2263    ///
2264    /// # Performance
2265    ///
2266    /// This method is `O(log n + endpoints)` — a probe plus, on a miss, a mint.
2267    pub fn upsert_relation<T: ValueType>(
2268        &mut self,
2269        index: EqualityIndex<T>,
2270        value: impl Assignable<T>,
2271        relation_type: RelationTypeId,
2272        endpoints: &[(ElementId, RoleId)],
2273    ) -> Result<RelationId, DbError> {
2274        let value = value.into_value()?;
2275        let key = self.equality_index_key(index.id())?;
2276        let existing = self
2277            .merged()
2278            .property_equal(key, &value)
2279            .into_iter()
2280            .find_map(|subject| match subject {
2281                PropertySubject::Relation(id) => Some(id),
2282                PropertySubject::Element(_) | PropertySubject::Incidence(_) => None,
2283            });
2284        if let Some(id) = existing {
2285            return Ok(id);
2286        }
2287        let relation = self.create_relation()?;
2288        self.set_relation_type(relation, relation_type)?;
2289        self.set_property(PropertySubject::Relation(relation), key, value)?;
2290        for (element, role) in endpoints {
2291            self.create_incidence(relation, *element, *role)?;
2292        }
2293        Ok(relation)
2294    }
2295
2296    /// Tombstones every subject carried by `index` whose identity value is NOT in
2297    /// `keep`, cascading each subject's incidences in `O(degree)` via the
2298    /// reverse-adjacency index. The prune half of a reconcile: after upserting
2299    /// every desired subject, `retain` removes the vanished complement.
2300    ///
2301    /// # Errors
2302    ///
2303    /// Returns [`DbError`] when `index` is not an equality index or a `keep` value
2304    /// type mismatches the key schema.
2305    ///
2306    /// # Performance
2307    ///
2308    /// This method is `O(family size + removed × degree)`.
2309    pub fn retain<T: ValueType, V: Assignable<T> + Copy>(
2310        &mut self,
2311        index: EqualityIndex<T>,
2312        keep: &[V],
2313    ) -> Result<(), DbError> {
2314        let key = self.equality_index_key(index.id())?;
2315        let mut keep_values: BTreeSet<PropertyValue> = BTreeSet::new();
2316        for value in keep {
2317            keep_values.insert((*value).into_value()?);
2318        }
2319        let stale: Vec<PropertySubject> = self
2320            .merged()
2321            .property_key_subjects(key)
2322            .into_iter()
2323            .filter(|(_subject, value)| !keep_values.contains(value))
2324            .map(|(subject, _value)| subject)
2325            .collect();
2326        for subject in stale {
2327            match subject {
2328                PropertySubject::Element(id) => self.tombstone_element(id)?,
2329                PropertySubject::Relation(id) => self.tombstone_relation(id)?,
2330                PropertySubject::Incidence(id) => self.tombstone_incidence(id)?,
2331            }
2332        }
2333        Ok(())
2334    }
2335
2336    /// Sets a typed property on a subject; the value type is checked at compile
2337    /// time against the key.
2338    ///
2339    /// # Errors
2340    ///
2341    /// Returns [`DbError`] when the subject is absent, the value is out of range,
2342    /// or the value type mismatches the key schema.
2343    ///
2344    /// # Performance
2345    ///
2346    /// This method is `O(log change + log keys)`.
2347    pub fn set<T: ValueType>(
2348        &mut self,
2349        subject: impl Into<PropertySubject>,
2350        key: Key<T>,
2351        value: impl Assignable<T>,
2352    ) -> Result<(), DbError> {
2353        self.set_property(subject.into(), key.id(), value.into_value()?)
2354    }
2355
2356    /// Removes a typed property from a subject.
2357    ///
2358    /// # Errors
2359    ///
2360    /// Returns [`DbError`] when the subject is absent or the key is unknown.
2361    ///
2362    /// # Performance
2363    ///
2364    /// This method is `O(log change + log keys)`.
2365    pub fn unset<T: ValueType>(
2366        &mut self,
2367        subject: impl Into<PropertySubject>,
2368        key: Key<T>,
2369    ) -> Result<(), DbError> {
2370        self.remove_property(subject.into(), key.id())
2371    }
2372
2373    /// Adds a label to an element or relation subject.
2374    ///
2375    /// # Errors
2376    ///
2377    /// Returns [`DbError`] when the subject is absent, the label is unknown, or
2378    /// the subject is an incidence (incidences carry no labels).
2379    ///
2380    /// # Performance
2381    ///
2382    /// This method is `O(log change + log labels)`.
2383    pub fn add_label(
2384        &mut self,
2385        subject: impl Into<PropertySubject>,
2386        label: LabelId,
2387    ) -> Result<(), DbError> {
2388        match subject.into() {
2389            PropertySubject::Element(id) => self.add_element_label(id, label),
2390            PropertySubject::Relation(id) => self.add_relation_label(id, label),
2391            PropertySubject::Incidence(_) => {
2392                Err(DbError::unsupported("incidences do not carry labels"))
2393            }
2394        }
2395    }
2396
2397    /// Tombstones any subject by id, cascading a relation's or element's
2398    /// incidences in `O(degree)` via the reverse-adjacency index.
2399    ///
2400    /// # Errors
2401    ///
2402    /// Returns [`DbError`] when the subject is not visible.
2403    ///
2404    /// # Performance
2405    ///
2406    /// This method is `O(log change + degree)`.
2407    pub fn tombstone(&mut self, subject: impl Into<PropertySubject>) -> Result<(), DbError> {
2408        match subject.into() {
2409            PropertySubject::Element(id) => self.tombstone_element(id),
2410            PropertySubject::Relation(id) => self.tombstone_relation(id),
2411            PropertySubject::Incidence(id) => self.tombstone_incidence(id),
2412        }
2413    }
2414
2415    /// Commits this write transaction durably.
2416    ///
2417    /// A non-dirty commit returns the parent's commit sequence without appending
2418    /// to the WAL or publishing. A dirty commit encodes the overlay's mutation
2419    /// log into one WAL frame (with the watermark op last), appends it with an
2420    /// fsync (truncating back to the captured EOF on any write error so no
2421    /// interior torn record survives), THEN folds the delta into a fresh
2422    /// `Arc<Overlay>` and publishes a new `Arc<Snapshot>`.
2423    ///
2424    /// After publishing, a dirty commit consults the configured
2425    /// [`CheckpointPolicy`]: it releases the writer lock FIRST (so the fold can
2426    /// re-acquire it), then folds when the delta-log has outgrown the base. The
2427    /// committed frame is already durable, so an auto-fold failure does not lose
2428    /// data; it is surfaced to the caller.
2429    ///
2430    /// # Errors
2431    ///
2432    /// Returns [`DbError`] when commit-sequence allocation, frame encoding, the
2433    /// durable append, or a triggered auto-checkpoint fold fails.
2434    ///
2435    /// # Performance
2436    ///
2437    /// This method is `O(change)` for the dirty path — flat as the base grows.
2438    /// The publish step shares the parent snapshot's already-materialized
2439    /// [`crate::overlay::BaseRecords`] and derived index by `Arc` (a commit never
2440    /// folds, so the base is byte-identical within the generation), so it neither
2441    /// re-decodes the base nor rebuilds the index. A triggered fold adds
2442    /// `O(visible state bytes)` on top.
2443    pub(crate) fn commit(self) -> Result<CommitSeq, DbError> {
2444        if self.delta.is_empty() {
2445            // Non-dirty commit: no append, no publish, no durable id advance.
2446            return Ok(self.parent.lsn());
2447        }
2448        let lsn = self
2449            .parent
2450            .lsn()
2451            .checked_next()
2452            .ok_or(DbError::CommitSeqOverflow)?;
2453        let (ops, blob) = self.delta.encode_frame();
2454        let frame = wal::encode_commit(
2455            lsn.get(),
2456            self.transaction_id.get(),
2457            self.database.base_generation,
2458            &ops,
2459            &blob,
2460        )?;
2461        let mut log = open_log_for_append(&self.database.root, self.database.base_generation)?;
2462        wal::append_commit(&mut log, &frame)?;
2463
2464        // Durable: the delta was seeded from the parent overlay and only added
2465        // this writer's changes, so freezing it directly is the full new
2466        // published overlay (parent state + this commit). The parent overlay was
2467        // never mutated — this is a brand-new frozen `Arc<Overlay>`, so a reader
2468        // pinning the parent is unaffected.
2469        let new_overlay = Arc::new(self.delta.freeze());
2470        // A commit never folds, so the new snapshot pins the SAME base generation
2471        // as the parent — the base wire bytes are byte-identical, and so are the
2472        // owned records and the derived index built from them. Share the parent's
2473        // `Arc<BaseRecords>` (and its `BaseIndex`) instead of re-decoding the base
2474        // and rebuilding the index, which keeps a single-element commit `O(change)`
2475        // rather than `O(base)` regardless of how large the base has grown.
2476        let snapshot = Snapshot::with_shared_base_records(
2477            self.parent.generation(),
2478            lsn,
2479            Arc::clone(self.parent.base()),
2480            new_overlay,
2481            Arc::clone(self.parent.base_records()),
2482        );
2483        self.database.current = Arc::new(snapshot);
2484        self.database.last_transaction_id = self.transaction_id;
2485        // Release the writer lock before any auto-fold so the fold can re-acquire
2486        // it (a partial move out of `self`, legal because `Writer` has
2487        // no `Drop` impl; the remaining `&mut Db` borrow stays live).
2488        drop(self.lock);
2489        self.database.maybe_auto_checkpoint()?;
2490        Ok(lsn)
2491    }
2492
2493    /// Returns the merged read view this writer sees (overlay over base).
2494    ///
2495    /// # Performance
2496    ///
2497    /// This method is `O(1)` to construct.
2498    fn merged(&self) -> crate::overlay::WriteMergedState<'_> {
2499        crate::overlay::WriteMergedState::new(self.parent.base_records(), &self.delta)
2500    }
2501
2502    /// Requires an element to be visible in the writer's merged view.
2503    ///
2504    /// # Errors
2505    ///
2506    /// Returns [`DbError::UnknownElement`] when absent.
2507    ///
2508    /// # Performance
2509    ///
2510    /// This method is `O(log change + log n)`.
2511    fn require_element(&self, id: ElementId) -> Result<(), DbError> {
2512        if self.merged().contains_element(id) {
2513            Ok(())
2514        } else {
2515            Err(DbError::UnknownElement { id })
2516        }
2517    }
2518
2519    /// Requires a relation to be visible.
2520    ///
2521    /// # Errors
2522    ///
2523    /// Returns [`DbError::UnknownRelation`] when absent.
2524    ///
2525    /// # Performance
2526    ///
2527    /// This method is `O(log change + log n)`.
2528    fn require_relation(&self, id: RelationId) -> Result<(), DbError> {
2529        if self.merged().contains_relation(id) {
2530            Ok(())
2531        } else {
2532            Err(DbError::UnknownRelation { id })
2533        }
2534    }
2535
2536    /// Requires an incidence to be visible.
2537    ///
2538    /// # Errors
2539    ///
2540    /// Returns [`DbError::UnknownIncidence`] when absent.
2541    ///
2542    /// # Performance
2543    ///
2544    /// This method is `O(log change + log n)`.
2545    fn require_incidence(&self, id: IncidenceId) -> Result<(), DbError> {
2546        if self.merged().contains_incidence(id) {
2547            Ok(())
2548        } else {
2549            Err(DbError::UnknownIncidence { id })
2550        }
2551    }
2552
2553    /// Requires a role to exist in the merged catalog.
2554    ///
2555    /// # Errors
2556    ///
2557    /// Returns [`DbError::UnknownRole`] when absent.
2558    ///
2559    /// # Performance
2560    ///
2561    /// This method is `O(log role count)`.
2562    fn require_role(&self, id: RoleId) -> Result<(), DbError> {
2563        if self.delta.catalog().role(id).is_some() {
2564            Ok(())
2565        } else {
2566            Err(DbError::UnknownRole { id })
2567        }
2568    }
2569
2570    /// Requires a label to exist in the merged catalog.
2571    ///
2572    /// # Errors
2573    ///
2574    /// Returns [`DbError::UnknownLabel`] when absent.
2575    ///
2576    /// # Performance
2577    ///
2578    /// This method is `O(log label count)`.
2579    fn require_label(&self, id: LabelId) -> Result<(), DbError> {
2580        if self.delta.catalog().label(id).is_some() {
2581            Ok(())
2582        } else {
2583            Err(DbError::UnknownLabel { id })
2584        }
2585    }
2586
2587    /// Requires a relation type to exist in the merged catalog.
2588    ///
2589    /// # Errors
2590    ///
2591    /// Returns [`DbError::UnknownRelationType`] when absent.
2592    ///
2593    /// # Performance
2594    ///
2595    /// This method is `O(log relation type count)`.
2596    fn require_relation_type(&self, id: RelationTypeId) -> Result<(), DbError> {
2597        if self.delta.catalog().relation_type(id).is_some() {
2598            Ok(())
2599        } else {
2600            Err(DbError::UnknownRelationType { id })
2601        }
2602    }
2603
2604    /// Requires a property subject to be visible.
2605    ///
2606    /// # Errors
2607    ///
2608    /// Returns the matching `Unknown*` error when the subject is absent.
2609    ///
2610    /// # Performance
2611    ///
2612    /// This method is `O(log change + log n)`.
2613    fn require_subject(&self, subject: PropertySubject) -> Result<(), DbError> {
2614        match subject {
2615            PropertySubject::Element(id) => self.require_element(id),
2616            PropertySubject::Relation(id) => self.require_relation(id),
2617            PropertySubject::Incidence(id) => self.require_incidence(id),
2618        }
2619    }
2620
2621    /// Validates one projection definition against the merged catalog.
2622    ///
2623    /// # Errors
2624    ///
2625    /// Returns [`DbError`] when a referenced role or relation type is unknown.
2626    ///
2627    /// # Performance
2628    ///
2629    /// This method is `O(definition size)`.
2630    fn validate_projection_definition(
2631        &self,
2632        definition: &ProjectionDefinition,
2633    ) -> Result<(), DbError> {
2634        match definition {
2635            ProjectionDefinition::Graph(graph) => {
2636                self.require_role(graph.source_role)?;
2637                self.require_role(graph.target_role)?;
2638                for relation_type in &graph.relation_types {
2639                    self.require_relation_type(*relation_type)?;
2640                }
2641                Ok(())
2642            }
2643            ProjectionDefinition::Hypergraph(hyper) => {
2644                for role in &hyper.source_roles {
2645                    self.require_role(*role)?;
2646                }
2647                for role in &hyper.target_roles {
2648                    self.require_role(*role)?;
2649                }
2650                for relation_type in &hyper.relation_types {
2651                    self.require_relation_type(*relation_type)?;
2652                }
2653                Ok(())
2654            }
2655        }
2656    }
2657
2658    /// Validates one index definition against the merged catalog.
2659    ///
2660    /// # Errors
2661    ///
2662    /// Returns [`DbError`] when a referenced catalog id is unknown or a
2663    /// composite index has no keys.
2664    ///
2665    /// # Performance
2666    ///
2667    /// This method is `O(definition size)`.
2668    fn validate_index_definition(&self, definition: &IndexDefinition) -> Result<(), DbError> {
2669        let catalog = self.delta.catalog();
2670        match definition {
2671            IndexDefinition::Label { label } => self.require_label(*label),
2672            IndexDefinition::RelationType { relation_type } => {
2673                self.require_relation_type(*relation_type)
2674            }
2675            IndexDefinition::PropertyEquality { key } | IndexDefinition::PropertyRange { key } => {
2676                self.require_property_key(*key)
2677            }
2678            IndexDefinition::CompositeEquality { keys } => {
2679                if keys.is_empty() {
2680                    return Err(DbError::unsupported(
2681                        "composite equality index requires at least one key",
2682                    ));
2683                }
2684                for key in keys {
2685                    self.require_property_key(*key)?;
2686                }
2687                Ok(())
2688            }
2689            IndexDefinition::Projection { projection } => catalog
2690                .projection(*projection)
2691                .is_some()
2692                .then_some(())
2693                .ok_or(DbError::UnknownProjection { id: *projection }),
2694        }
2695    }
2696
2697    /// Requires a property key to exist in the merged catalog.
2698    ///
2699    /// # Errors
2700    ///
2701    /// Returns [`DbError::UnknownPropertyKey`] when absent.
2702    ///
2703    /// # Performance
2704    ///
2705    /// This method is `O(log property key count)`.
2706    fn require_property_key(&self, id: PropertyKeyId) -> Result<(), DbError> {
2707        if self.delta.catalog().property_key(id).is_some() {
2708            Ok(())
2709        } else {
2710            Err(DbError::UnknownPropertyKey { id })
2711        }
2712    }
2713}
2714
2715#[cfg(test)]
2716#[cfg(not(miri))]
2717mod tests {
2718    use std::{
2719        path::PathBuf,
2720        sync::atomic::{AtomicU64, Ordering},
2721    };
2722
2723    use super::*;
2724
2725    /// Per-process path counter for unique temporary store directories.
2726    static NEXT_PATH: AtomicU64 = AtomicU64::new(0);
2727
2728    /// Returns a unique temporary store path and removes any prior contents.
2729    fn temp_store(name: &str) -> PathBuf {
2730        let id = NEXT_PATH.fetch_add(1, Ordering::Relaxed);
2731        let path =
2732            std::env::temp_dir().join(format!("oxgraph-db-cp-{name}-{}-{id}", std::process::id()));
2733        let _ = std::fs::remove_dir_all(&path);
2734        path
2735    }
2736
    /// Elements the open-latency harness builds (each with one `rank`
    /// property). Only compiled in release builds, like the harness itself.
    #[cfg(not(debug_assertions))]
    const OPEN_LATENCY_ELEMENTS: usize = 100_000;
    /// Relations the open-latency harness builds (each with two incidences).
    #[cfg(not(debug_assertions))]
    const OPEN_LATENCY_RELATIONS: usize = 320_000;
    /// Timed open runs the open-latency harness averages.
    #[cfg(not(debug_assertions))]
    const OPEN_LATENCY_RUNS: u32 = 5;
2753
2754    /// Populates `database` with `OPEN_LATENCY_ELEMENTS` ranked elements and
2755    /// `OPEN_LATENCY_RELATIONS` weighted relations (two incidences each).
2756    #[cfg(not(debug_assertions))]
2757    fn populate_large_store(database: &mut Db) {
2758        database.set_checkpoint_policy(CheckpointPolicy::Manual);
2759        database
2760            .write(|writer| {
2761                let rank = writer.register_property_key(
2762                    "rank",
2763                    PropertyFamily::Element,
2764                    PropertyType::Integer,
2765                )?;
2766                let weight = writer.register_property_key(
2767                    "weight",
2768                    PropertyFamily::Relation,
2769                    PropertyType::Integer,
2770                )?;
2771                let role = writer.register_role("party")?;
2772                let mut elements = Vec::with_capacity(OPEN_LATENCY_ELEMENTS);
2773                for index in 0..OPEN_LATENCY_ELEMENTS {
2774                    let element = writer.create_element()?;
2775                    writer.set_property(
2776                        PropertySubject::Element(element),
2777                        rank,
2778                        PropertyValue::Integer(i64::try_from(index % 997).unwrap_or(0)),
2779                    )?;
2780                    elements.push(element);
2781                }
2782                for index in 0..OPEN_LATENCY_RELATIONS {
2783                    let relation = writer.create_relation()?;
2784                    writer.set_property(
2785                        PropertySubject::Relation(relation),
2786                        weight,
2787                        PropertyValue::Integer(i64::try_from(index % 503).unwrap_or(0)),
2788                    )?;
2789                    let source = elements[index % OPEN_LATENCY_ELEMENTS];
2790                    let target = elements[(index + 1) % OPEN_LATENCY_ELEMENTS];
2791                    writer.create_incidence(relation, source, role)?;
2792                    writer.create_incidence(relation, target, role)?;
2793                }
2794                Ok(())
2795            })
2796            .expect("populate");
2797        // Fold everything into the base so open pays the base term, not log replay.
2798        database.compact().expect("compact");
2799    }
2800
2801    /// Mean elapsed time of `OPEN_LATENCY_RUNS` full `Db::open` calls on `path`.
2802    #[cfg(not(debug_assertions))]
2803    fn mean_open_ms(path: &std::path::Path) -> f64 {
2804        let mut total = std::time::Duration::ZERO;
2805        for _run in 0..OPEN_LATENCY_RUNS {
2806            let start = std::time::Instant::now();
2807            let opened = Db::open(path).expect("timed open");
2808            total += start.elapsed();
2809            drop(opened);
2810        }
2811        total.as_secs_f64() * 1000.0 / f64::from(OPEN_LATENCY_RUNS)
2812    }
2813
2814    /// Mean elapsed time of the prior design's open-time heavy work — record
2815    /// decode + `from_records` index rebuild (`BaseRecords::from_view`) — over
2816    /// `OPEN_LATENCY_RUNS` runs, the BEFORE proxy for the borrowed open.
2817    #[cfg(not(debug_assertions))]
2818    fn mean_old_from_view_ms(path: &std::path::Path) -> f64 {
2819        let superblock = wal::read_superblock(path).expect("superblock");
2820        let base_path = path.join(base_file(superblock.base_generation.get()));
2821        let mut total = std::time::Duration::ZERO;
2822        for _run in 0..OPEN_LATENCY_RUNS {
2823            let base = Base::open(&base_path, false).expect("base open");
2824            let start = std::time::Instant::now();
2825            let records =
2826                crate::overlay::BaseRecords::from_view(base.get()).expect("old from_view");
2827            total += start.elapsed();
2828            drop(records);
2829            drop(base);
2830        }
2831        total.as_secs_f64() * 1000.0 / f64::from(OPEN_LATENCY_RUNS)
2832    }
2833
2834    /// Manual measurement harness (run with
2835    /// `cargo test -p oxgraph-db --release -- --ignored open_latency_large_base
2836    /// --nocapture`): builds a folded base at roughly the measured-problem scale
2837    /// (>=100k elements, >=300k relations, properties), then times `Db::open`.
2838    /// Open must be dominated by the record decode + page faults, NOT the
2839    /// `O(base)` index rebuild the prior design paid — the index is borrowed.
2840    /// Debug builds skip it (the open-time `debug_assert!` differential check
2841    /// would itself rebuild the index and skew the timing); run in `--release`.
2842    #[test]
2843    #[ignore = "manual perf measurement; run explicitly with --release --ignored --nocapture"]
2844    #[cfg(not(debug_assertions))]
2845    fn open_latency_large_base() {
2846        let path = temp_store("open-latency");
2847        let mut database = Db::create(&path).expect("create");
2848        populate_large_store(&mut database);
2849        drop(database);
2850
2851        let _warm = Db::open(&path).expect("warm open");
2852        let after_ms = mean_open_ms(&path);
2853        let before_ms = mean_old_from_view_ms(&path);
2854
2855        println!(
2856            "open_latency_large_base: {OPEN_LATENCY_ELEMENTS} elements, \
2857             {OPEN_LATENCY_RELATIONS} relations, {} incidences, {} properties",
2858            OPEN_LATENCY_RELATIONS * 2,
2859            OPEN_LATENCY_ELEMENTS + OPEN_LATENCY_RELATIONS,
2860        );
2861        println!("  BEFORE open work (decode + from_records rebuild): {before_ms:.1} ms / open");
2862        println!("  AFTER  full Db::open (decode + BORROWED index):   {after_ms:.1} ms / open");
2863
2864        let _ = std::fs::remove_dir_all(&path);
2865    }
2866
2867    #[test]
2868    fn reconcile_upserts_reuse_or_mint_and_retain_prunes_the_complement() {
2869        let path = temp_store("reconcile");
2870        let mut database = Db::create(&path).expect("create");
2871        let index = {
2872            let mut writer = database.begin_write().expect("begin write");
2873            let key = writer
2874                .register_property_key("stable_key", PropertyFamily::Element, PropertyType::Text)
2875                .expect("key");
2876            let index = writer
2877                .define_index(
2878                    "element_stable_key_eq",
2879                    IndexDefinition::PropertyEquality { key },
2880                )
2881                .expect("index");
2882            writer.commit().expect("commit schema");
2883            index
2884        };
2885        let eq = EqualityIndex::<crate::Text>::from_id(index);
2886
2887        let (a1, b1) = {
2888            let mut writer = database.begin_write().expect("begin write");
2889            let a = writer.upsert_element(eq, "a").expect("upsert a");
2890            let b = writer.upsert_element(eq, "b").expect("upsert b");
2891            writer.commit().expect("commit");
2892            (a, b)
2893        };
2894
2895        let (a2, c1) = {
2896            let mut writer = database.begin_write().expect("begin write");
2897            let a = writer.upsert_element(eq, "a").expect("re-upsert a");
2898            let c = writer.upsert_element(eq, "c").expect("upsert c");
2899            writer.retain(eq, &["a", "c"]).expect("retain");
2900            writer.commit().expect("commit");
2901            (a, c)
2902        };
2903
2904        assert_eq!(a1, a2, "an unchanged identity reuses its element id");
2905        assert_ne!(c1, a1);
2906        assert_ne!(c1, b1);
2907
2908        let read = database.reader();
2909        assert!(read.contains_element(a1), "kept a");
2910        assert!(read.contains_element(c1), "kept c");
2911        assert!(!read.contains_element(b1), "retain tombstoned b");
2912        assert_eq!(
2913            read.element_by_key(eq, "a")
2914                .expect("lookup a")
2915                .map(|element| element.id),
2916            Some(a1)
2917        );
2918        assert!(
2919            read.element_by_key(eq, "b").expect("lookup b").is_none(),
2920            "b is not resolvable after the prune"
2921        );
2922        let _ = std::fs::remove_dir_all(&path);
2923    }
2924
2925    #[test]
2926    fn write_closure_commits_on_ok_rolls_back_on_err_and_reports_outcome() {
2927        let path = temp_store("write-closure");
2928        let mut database = Db::create(&path).expect("create");
2929
2930        // Ok with a change → committed; the read closure observes it.
2931        let (id, outcome) = database
2932            .write(|writer| {
2933                let id = writer.create_element()?;
2934                Ok(id)
2935            })
2936            .expect("write");
2937        assert!(matches!(outcome, CommitOutcome::Committed(_)));
2938        database
2939            .read(|read| {
2940                assert!(read.contains_element(id));
2941                Ok(())
2942            })
2943            .expect("read");
2944
2945        // A no-op write reports Empty (no frame appended).
2946        let ((), outcome) = database.write(|_writer| Ok(())).expect("empty write");
2947        assert_eq!(outcome, CommitOutcome::Empty);
2948
2949        // An Err from the closure rolls back the staged delta.
2950        let before = database
2951            .read(|read| Ok(read.element_count()))
2952            .expect("count");
2953        let result = database.write(|writer| {
2954            writer.create_element()?;
2955            Err::<(), DbError>(DbError::EmptyQuery)
2956        });
2957        assert!(result.is_err());
2958        let after = database
2959            .read(|read| Ok(read.element_count()))
2960            .expect("count");
2961        assert_eq!(before, after, "the failed write staged nothing durable");
2962
2963        let _ = std::fs::remove_dir_all(&path);
2964    }
2965
2966    #[test]
2967    fn re_setting_an_unchanged_property_value_is_a_no_op_commit() {
2968        // The reconcile/reindex contract: re-asserting a property's existing value
2969        // must log NO mutation, so an incremental reconcile that re-sets every
2970        // property of every unchanged subject stays O(change). Without the no-op
2971        // gate the commit logs the whole graph every reindex.
2972        let path = temp_store("set-noop");
2973        let mut database = Db::create(&path).expect("create");
2974        let schema = Schema::new().key::<crate::Text>("name", PropertyFamily::Element);
2975
2976        // Create an element and set its name (a real change → committed).
2977        let id = database
2978            .write(|writer| {
2979                let bound = writer.apply_schema(&schema)?;
2980                let name = bound.key::<crate::Text>("name")?;
2981                let id = writer.create_element()?;
2982                writer.set(id, name, "alpha")?;
2983                Ok(id)
2984            })
2985            .expect("first write")
2986            .0;
2987
2988        // Re-asserting the SAME value mutates nothing → the commit is Empty.
2989        let ((), outcome) = database
2990            .write(|writer| {
2991                let bound = writer.apply_schema(&schema)?;
2992                let name = bound.key::<crate::Text>("name")?;
2993                writer.set(id, name, "alpha")?;
2994                Ok(())
2995            })
2996            .expect("idempotent set");
2997        assert_eq!(
2998            outcome,
2999            CommitOutcome::Empty,
3000            "re-setting the same property value must log no mutation"
3001        );
3002
3003        // Setting a DIFFERENT value is a real change → committed, and visible.
3004        let ((), outcome) = database
3005            .write(|writer| {
3006                let bound = writer.apply_schema(&schema)?;
3007                let name = bound.key::<crate::Text>("name")?;
3008                writer.set(id, name, "beta")?;
3009                Ok(())
3010            })
3011            .expect("changed set");
3012        assert!(matches!(outcome, CommitOutcome::Committed(_)));
3013        let name = database
3014            .bind(&schema)
3015            .expect("bind")
3016            .key::<crate::Text>("name")
3017            .expect("name key");
3018        let value = database
3019            .read(|read| {
3020                Ok(read
3021                    .element(id)
3022                    .and_then(|element| element.properties().get::<crate::Text, String>(name)))
3023            })
3024            .expect("read");
3025        assert_eq!(value.as_deref(), Some("beta"));
3026
3027        let _ = std::fs::remove_dir_all(&path);
3028    }
3029
3030    #[test]
3031    fn schema_apply_is_idempotent_and_bind_resolves_typed_handles() {
3032        let path = temp_store("schema");
3033        let mut database = Db::create(&path).expect("create");
3034        let schema = Schema::new()
3035            .label("function")
3036            .key::<crate::Text>("name", PropertyFamily::Element)
3037            .equality_index("name_eq", "name");
3038
3039        // First apply registers the catalog and upserts two elements by identity.
3040        let (alpha, beta) = database
3041            .write(|writer| {
3042                let bound = writer.apply_schema(&schema)?;
3043                let name_eq = bound.equality_index::<crate::Text>("name_eq")?;
3044                let function = bound.label("function")?;
3045                let alpha = writer.upsert_element(name_eq, "alpha")?;
3046                writer.add_label(alpha, function)?;
3047                let beta = writer.upsert_element(name_eq, "beta")?;
3048                Ok((alpha, beta))
3049            })
3050            .expect("apply + write")
3051            .0;
3052        assert_ne!(alpha, beta);
3053
3054        // Re-applying the same schema is idempotent: nothing new registers, so the
3055        // commit is empty.
3056        let (_bound, outcome) = database
3057            .write(|writer| writer.apply_schema(&schema))
3058            .expect("re-apply");
3059        assert_eq!(
3060            outcome,
3061            CommitOutcome::Empty,
3062            "re-applying a schema registers nothing new"
3063        );
3064
3065        // bind() resolves the schema read-only on a reopened store; the typed
3066        // handle round-trips, and a wrong value type is rejected.
3067        let reopened = Db::open(&path).expect("open");
3068        let bound = reopened.bind(&schema).expect("bind");
3069        let name_eq = bound
3070            .equality_index::<crate::Text>("name_eq")
3071            .expect("typed index");
3072        assert!(
3073            bound.equality_index::<crate::Int>("name_eq").is_err(),
3074            "a wrong-value-type handle request is a SchemaConflict"
3075        );
3076        let found = reopened
3077            .read(|read| read.element_by_key(name_eq, "alpha"))
3078            .expect("read")
3079            .expect("alpha present");
3080        assert_eq!(found.id, alpha);
3081
3082        let _ = std::fs::remove_dir_all(&path);
3083    }
3084
    /// The exact logical state the crash-matrix asserts recovery preserves: the
    /// visible element ids, the subjects matched by the `rank == 500` equality
    /// lookup, and the `Person` label membership.
    ///
    /// Two states compare equal only when all three collections match,
    /// including their order.
    #[derive(Debug, Eq, PartialEq)]
    struct LogicalState {
        /// Visible element ids, in the order the reader returns them
        /// (presumably ascending — confirm against `element_ids`).
        elements: Vec<ElementId>,
        /// Subjects whose `rank` property equals `500`, as returned by the
        /// property-equality lookup.
        rank_eq_500: Vec<PropertySubject>,
        /// Element ids carrying the `Person` label.
        person_members: Vec<ElementId>,
    }
3097
    /// Catalog ids registered by [`build_fixture`], kept so later reads can
    /// query the same property key and label.
    struct Fixture {
        /// Id of the integer `rank` element property key.
        rank: PropertyKeyId,
        /// Id of the `Person` label.
        person: LabelId,
    }
3105
3106    /// Builds a committed fixture: 8 elements, each ranked `index * 100`, the
3107    /// even-indexed ones labelled `Person`. Returns the fixture ids.
3108    fn build_fixture(database: &mut Db) -> Fixture {
3109        let mut writer = database.begin_write().expect("begin write");
3110        let rank = writer
3111            .register_property_key("rank", PropertyFamily::Element, PropertyType::Integer)
3112            .expect("rank key");
3113        let person = writer.register_label("Person").expect("person label");
3114        for index in 0..8u64 {
3115            let element = writer.create_element().expect("element");
3116            writer
3117                .set(
3118                    element,
3119                    Key::<crate::Int>::from_id(rank),
3120                    i64::try_from(index).expect("index") * 100,
3121                )
3122                .expect("set rank");
3123            if index % 2 == 0 {
3124                writer.add_label(element, person).expect("add label");
3125            }
3126        }
3127        writer.commit().expect("commit fixture");
3128        Fixture { rank, person }
3129    }
3130
3131    /// Reads the logical state through the index-backed read surface.
3132    fn read_logical(database: &Db, fixture: &Fixture) -> LogicalState {
3133        let read = database.reader();
3134        let elements = read.element_ids();
3135        let rank_eq_500 = read
3136            .lookup_property_equal(fixture.rank, &PropertyValue::Integer(500))
3137            .expect("rank lookup");
3138        let person_members = read.snapshot.view().elements_with_label(fixture.person);
3139        LogicalState {
3140            elements,
3141            rank_eq_500,
3142            person_members,
3143        }
3144    }
3145
3146    /// Asserts ids are never reused across a fold BEHAVIORALLY: the next element
3147    /// `database` mints must take the id one past the current maximum visible
3148    /// element id, i.e. the recovered watermark survived the fold. A regression
3149    /// that dropped the watermark on fold (so the recovered record set is
3150    /// unchanged but the next-id counter reset) would reuse an existing id and
3151    /// fail this assertion — which the unchanged-record-set checks alone miss.
3152    ///
3153    /// The probe element is rolled back, so it does not perturb the logical state
3154    /// the surrounding test re-reads.
3155    fn assert_no_id_reuse_across_fold(database: &mut Db) {
3156        let max_existing = database
3157            .reader()
3158            .element_ids()
3159            .into_iter()
3160            .map(ElementId::get)
3161            .max()
3162            .unwrap_or(0);
3163        let expected = ElementId::new(max_existing + 1);
3164        let mut writer = database.begin_write().expect("watermark probe writer");
3165        let minted = writer.create_element().expect("watermark probe element");
3166        assert_eq!(
3167            minted, expected,
3168            "the next minted id must be one past the max existing id (watermark \
3169             survived the fold; ids are never reused)",
3170        );
3171        // Drop the probe writer so it leaves no trace in the logical state.
3172        drop(writer);
3173    }
3174
3175    /// CHECKPOINT-CRASH-MATRIX: a crash after each fsync point in `checkpoint`
3176    /// recovers EXACTLY the correct logical state. After a crash before the
3177    /// superblock lands, the OLD generation stays authoritative (the orphan new
3178    /// base is ignored); after a crash once the superblock names the new
3179    /// generation, the NEW base is authoritative. The completed checkpoint
3180    /// recovers the same logical state from the folded base. In every case the
3181    /// index-backed lookups return the same answers as before the (attempted)
3182    /// fold.
3183    #[test]
3184    fn checkpoint_crash_matrix_recovers_exact_state() {
3185        for stop in [
3186            CheckpointStop::BeforeSuperblock,
3187            CheckpointStop::BeforeRotate,
3188            CheckpointStop::Complete,
3189        ] {
3190            let path = temp_store(&format!("crash-{stop:?}"));
3191            let mut database = Db::create(&path).expect("create");
3192            let fixture = build_fixture(&mut database);
3193            let before = read_logical(&database, &fixture);
3194            let before_generation = database.base_generation;
3195
3196            // Simulate a crash at `stop`: the checkpoint returns right after the
3197            // chosen fsync, leaving the intermediate files in place. We then drop
3198            // the handle (as a crash would) and reopen from disk.
3199            database
3200                .checkpoint_inner(stop)
3201                .expect("checkpoint stop returns ok");
3202            drop(database);
3203
3204            let mut recovered = Db::open(&path).expect("reopen after crash");
3205            let after = read_logical(&recovered, &fixture);
3206            assert_eq!(
3207                after, before,
3208                "crash at {stop:?} must recover the exact logical state",
3209            );
3210
3211            // The recovered watermark survives every crash window: the next minted
3212            // id is one past the max recovered element id, so ids are never reused
3213            // across the (attempted) fold — asserted behaviorally, not merely
3214            // inferred from the unchanged record set.
3215            assert_no_id_reuse_across_fold(&mut recovered);
3216
3217            // Generation expectation per crash window.
3218            match stop {
3219                CheckpointStop::BeforeSuperblock => assert_eq!(
3220                    recovered.base_generation, before_generation,
3221                    "old superblock stays authoritative before the new one lands",
3222                ),
3223                CheckpointStop::BeforeRotate | CheckpointStop::Complete => assert_eq!(
3224                    recovered.base_generation,
3225                    before_generation + 1,
3226                    "the new superblock names the folded generation",
3227                ),
3228            }
3229
3230            // A second open is idempotent (orphan files from a partial crash do
3231            // not derail a repeat recovery).
3232            let reopened = Db::open(&path).expect("second reopen");
3233            assert_eq!(read_logical(&reopened, &fixture), before);
3234
3235            drop(reopened);
3236            let _ = std::fs::remove_dir_all(&path);
3237        }
3238    }
3239
3240    /// The auto-checkpoint policy folds the delta-log into a fresh base once the
3241    /// log outgrows the base by the configured factor: under a tiny factor, a
3242    /// run of dirty commits advances the live generation (the log was folded),
3243    /// and the logical state is preserved across the fold. The manual policy
3244    /// never auto-folds.
3245    #[test]
3246    fn auto_checkpoint_policy_folds_when_log_outgrows_base() {
3247        // Manual policy: many commits, generation never advances on its own.
3248        let manual_path = temp_store("auto-manual");
3249        let mut manual = Db::create(&manual_path).expect("create manual");
3250        manual.set_checkpoint_policy(CheckpointPolicy::Manual);
3251        let _fixture = build_fixture(&mut manual);
3252        for _ in 0..200 {
3253            let mut writer = manual.begin_write().expect("writer");
3254            writer.create_element().expect("element");
3255            writer.commit().expect("commit");
3256        }
3257        assert_eq!(
3258            manual.live_generation(),
3259            CheckpointGeneration::new(0),
3260            "manual policy must never auto-fold",
3261        );
3262        drop(manual);
3263        let _ = std::fs::remove_dir_all(&manual_path);
3264
3265        // Size-ratio policy with the smallest factor: the log soon outgrows the
3266        // tiny base floor, so a run of commits triggers at least one fold.
3267        let auto_path = temp_store("auto-ratio");
3268        let mut auto = Db::create(&auto_path).expect("create auto");
3269        auto.set_checkpoint_policy(CheckpointPolicy::SizeRatio { factor: 1 });
3270        let fixture = build_fixture(&mut auto);
3271        let before = read_logical(&auto, &fixture);
3272        for _ in 0..400 {
3273            let mut writer = auto.begin_write().expect("writer");
3274            writer.create_element().expect("element");
3275            writer.commit().expect("commit");
3276        }
3277        assert!(
3278            auto.live_generation() > CheckpointGeneration::new(0),
3279            "size-ratio policy must auto-fold once the log outgrows the base",
3280        );
3281        // The pre-existing logical state survives every fold; the policy is also
3282        // surfaced in status and preserved across the fold.
3283        let after = read_logical(&auto, &fixture);
3284        assert_eq!(after.rank_eq_500, before.rank_eq_500);
3285        assert_eq!(after.person_members, before.person_members);
3286        // Ids are never reused across the auto-fold: the next minted id is one
3287        // past the max existing id (the watermark folded into the new base).
3288        assert_no_id_reuse_across_fold(&mut auto);
3289        assert_eq!(
3290            auto.checkpoint_policy(),
3291            CheckpointPolicy::SizeRatio { factor: 1 },
3292            "the auto-fold reopen must preserve the configured policy",
3293        );
3294        // Status surfaces the live generation and the (now small) log size.
3295        let status = auto.stats();
3296        assert_eq!(status.live_generation, auto.live_generation());
3297        assert!(status.base_byte_size > 0, "live base has bytes");
3298        drop(auto);
3299        let _ = std::fs::remove_dir_all(&auto_path);
3300    }
3301}