Skip to main content

oxgraph_db/database/
mod.rs

1//! Embedded `OxGraph` database engine API.
2//!
3//! This is the integration layer over the base+overlay+WAL core. A [`Db`]
4//! holds the current `Arc<Snapshot>` (one immutable base generation plus the
5//! frozen overlay published over it), the open append-only delta-log, and the
6//! recovered id/transaction watermarks. Reads pin the current snapshot in `O(1)`
7//! (`reader` clones the `Arc`); writes layer a fresh [`WriteOverlay`] over
8//! the current snapshot, append a WAL frame on commit, and publish a new
9//! snapshot. The whole read/query/projection surface resolves through the merged
10//! [`StateView`] of the pinned snapshot.
11
12use std::{path::PathBuf, sync::Arc};
13
14use crate::{
15    Bound, Catalog, CheckpointGeneration, CommitSeq, DbError, PreparedQuery, PropertyValue, Schema,
16    TransactionId,
17    lock::WriterLock,
18    overlay::{Snapshot, StateView, WriteOverlay},
19};
20
21mod maintenance;
22mod open;
23mod reader;
24mod writer;
25
26#[cfg(test)]
27#[cfg(not(miri))]
28mod tests;
29
30pub use maintenance::CheckpointPolicy;
31use open::{base_file, delta_file, file_len};
32pub use reader::{ReadPin, Reader};
33pub use writer::Writer;
34
35/// Lookup input for a cataloged index.
36///
37/// This type makes index lookup shape explicit: membership indexes accept
38/// [`IndexProbe::All`], single-property indexes accept scalar equality or
39/// range inputs, and composite equality indexes accept an ordered value tuple.
40///
41/// # Performance
42///
43/// Copying this value is `O(1)`.
44#[derive(Clone, Copy, Debug)]
45pub enum IndexProbe<'value> {
46    /// Lookup every subject represented by a membership-style index.
47    All,
48    /// Lookup one scalar equality value.
49    Equal(&'value PropertyValue),
50    /// Lookup one inclusive scalar range.
51    Range {
52        /// Inclusive lower bound.
53        min: &'value PropertyValue,
54        /// Inclusive upper bound.
55        max: &'value PropertyValue,
56    },
57    /// Lookup one ordered composite equality tuple.
58    Composite(&'value [PropertyValue]),
59}
60
61/// The durable result of a [`Db::write`]: whether a frame landed, and at which
62/// commit sequence.
63///
64/// # Performance
65///
66/// Copying this value is `O(1)`.
67#[derive(Clone, Copy, Debug, Eq, PartialEq)]
68#[non_exhaustive]
69pub enum CommitOutcome {
70    /// The transaction made no changes; no WAL frame was appended.
71    Empty,
72    /// A durable frame landed at this commit sequence.
73    Committed(CommitSeq),
74}
75
76/// Open OXGDB database handle.
77///
78/// # Performance
79///
80/// Moving a handle is `O(1)`: it moves the current `Arc<Snapshot>` and the open
81/// delta-log handle.
82pub struct Db {
83    /// Root database directory.
84    pub(super) root: PathBuf,
85    /// The current visible snapshot (base generation + published overlay),
86    /// shared by readers through an atomically reference-counted handle.
87    pub(super) current: Arc<Snapshot>,
88    /// Live base generation named by the superblock; every delta frame and the
89    /// per-generation log filename carry it.
90    pub(super) base_generation: u64,
91    /// Last writer transaction id durably recorded (the last dirty commit's id).
92    /// A rollback burns a session-local id above this but does not advance it.
93    pub(super) last_transaction_id: TransactionId,
94    /// Auto-checkpoint policy consulted after each dirty commit.
95    pub(super) checkpoint_policy: CheckpointPolicy,
96}
97
98impl Db {
99    /// Returns the live base generation named by the superblock (the count of
100    /// folds this store has undergone; gen-0 is the freshly created store).
101    ///
102    /// # Performance
103    ///
104    /// This method is `O(1)`.
105    #[must_use]
106    pub const fn live_generation(&self) -> CheckpointGeneration {
107        CheckpointGeneration::new(self.base_generation)
108    }
109
110    /// Returns operational status for this handle, including the live generation
111    /// count and the on-disk base/delta-log sizes the auto-checkpoint policy
112    /// weighs.
113    ///
114    /// # Performance
115    ///
116    /// This method is `O(visible state)` for the merged counts plus two `stat`
117    /// syscalls for the file sizes.
118    #[must_use]
119    pub fn stats(&self) -> Stats {
120        let view = self.current.view();
121        Stats {
122            visible_commit_seq: self.current.lsn(),
123            last_transaction_id: self.last_transaction_id,
124            live_generation: CheckpointGeneration::new(self.base_generation),
125            base_byte_size: file_len(&self.root.join(base_file(self.base_generation))),
126            log_byte_size: file_len(&self.root.join(delta_file(self.base_generation))),
127            element_count: view.element_count(),
128            relation_count: view.relation_count(),
129            incidence_count: view.incidence_count(),
130            catalog: self.catalog_summary(),
131        }
132    }
133
134    /// Returns a catalog-size summary.
135    ///
136    /// # Performance
137    ///
138    /// This method is `O(catalog entry count)`.
139    #[must_use]
140    pub fn catalog_summary(&self) -> CatalogSummary {
141        CatalogSummary::from_catalog(self.current.view().catalog())
142    }
143
144    /// Starts a read transaction pinned to the current visible snapshot.
145    ///
146    /// # Performance
147    ///
148    /// This method is `O(1)`: the reader clones the current `Arc<Snapshot>` and
149    /// observes a fixed state even across later commits and checkpoints.
150    #[must_use]
151    pub fn reader(&self) -> Reader {
152        Reader {
153            snapshot: Arc::clone(&self.current),
154        }
155    }
156
157    /// Returns the pin identifying the current visible snapshot — the commit
158    /// sequence and checkpoint generation a [`Self::reader`] started now would
159    /// observe — without starting a read transaction.
160    ///
161    /// # Performance
162    ///
163    /// This method is `O(1)`: it copies the current snapshot's identity fields.
164    #[must_use]
165    pub fn pin(&self) -> ReadPin {
166        ReadPin {
167            visible_commit_seq: self.current.lsn(),
168            generation: self.current.generation(),
169        }
170    }
171
172    /// Starts the single writer transaction, acquiring the cross-process writer
173    /// lock for the transaction's lifetime.
174    ///
175    /// # Errors
176    ///
177    /// Returns [`DbError::Txn(crate::error::TxnError::WriterLockHeld)`] when another writer holds
178    /// the lock or [`DbError::Txn(crate::error::TxnError::TransactionIdOverflow)`] when writer
179    /// ids are exhausted.
180    ///
181    /// # Performance
182    ///
183    /// This method is `O(parent change)` map entries with `O(1)` per entry:
184    /// the writer seeds from the parent's published overlay by cloning its
185    /// delta map structure, while label sets, text values, per-subject
186    /// property delta maps, and index postings are `Arc`-shared copy-on-write
187    /// — it scales with the committed-but-unfolded change count (not the base
188    /// size, and not the payload bytes).
189    pub(crate) fn begin_write(&mut self) -> Result<Writer<'_>, DbError> {
190        let lock = WriterLock::acquire(&self.root)?;
191        let transaction_id = self
192            .last_transaction_id
193            .checked_next()
194            .ok_or(DbError::Txn(crate::error::TxnError::TransactionIdOverflow))?;
195        // Burn the id eagerly so it is session-local-visible even on rollback;
196        // it only becomes durable when a dirty commit writes its frame, and a
197        // reopen recovers the durable high-water mark from the log.
198        self.last_transaction_id = transaction_id;
199        let parent = Arc::clone(&self.current);
200        // Seed the writer delta from the parent's published overlay so the
201        // writer reads every committed record; the parent overlay is never
202        // mutated (the seed clones its maps).
203        let delta = WriteOverlay::from_overlay(parent.overlay());
204        Ok(Writer {
205            database: self,
206            parent,
207            delta,
208            transaction_id,
209            lock,
210        })
211    }
212
213    /// Runs `f` against a read transaction pinned to the current snapshot. The
214    /// primary read entry point.
215    ///
216    /// # Errors
217    ///
218    /// Propagates whatever error `f` returns.
219    ///
220    /// # Performance
221    ///
222    /// Entering is `O(1)` (an `Arc` clone); the total cost is `f`'s cost.
223    pub fn read<R>(&self, f: impl FnOnce(&Reader) -> Result<R, DbError>) -> Result<R, DbError> {
224        f(&self.reader())
225    }
226
227    /// Runs `f` against the single write transaction, committing on `Ok` and
228    /// rolling back on `Err` — control flow IS the commit protocol. Returns `f`'s
229    /// value with the [`CommitOutcome`] (whether a durable frame landed). The
230    /// primary write entry point.
231    ///
232    /// # Errors
233    ///
234    /// Returns [`DbError::Txn(crate::error::TxnError::WriterLockHeld)`] when another writer holds
235    /// the lock, `f`'s error (after rolling back the staged delta), or a commit error.
236    ///
237    /// # Performance
238    ///
239    /// Begin is `O(parent change)` — the writer seeds by cloning the parent
240    /// overlay's delta map structure (label sets, text values, per-subject
241    /// property delta maps, and index postings are `Arc`-shared, so each of
242    /// the `N` committed-but-unfolded entries costs `O(1)`; folded away by a
243    /// checkpoint). Commit is `O(change)`. A triggered auto-fold adds
244    /// `O(visible bytes)`.
245    pub fn write<R>(
246        &mut self,
247        f: impl FnOnce(&mut Writer<'_>) -> Result<R, DbError>,
248    ) -> Result<(R, CommitOutcome), DbError> {
249        let mut writer = self.begin_write()?;
250        // On `Err` the `?` drops `writer` here, releasing the lock and discarding
251        // the staged delta — no frame is appended (rollback).
252        let value = f(&mut writer)?;
253        let committed = !writer.delta.is_empty();
254        let lsn = writer.commit()?;
255        let outcome = if committed {
256            CommitOutcome::Committed(lsn)
257        } else {
258            CommitOutcome::Empty
259        };
260        Ok((value, outcome))
261    }
262
263    /// Resolves an already-applied [`Schema`] against the live catalog WITHOUT
264    /// writing, returning the [`Bound`] handle bag (for a store already
265    /// bootstrapped with this schema).
266    ///
267    /// # Errors
268    ///
269    /// Returns [`DbError::UnknownName`] when a declared item is absent.
270    ///
271    /// # Performance
272    ///
273    /// This method is `O(declared items × log catalog)`.
274    pub fn bind(&self, schema: &Schema) -> Result<Bound, DbError> {
275        let view = self.current.view();
276        let catalog = view.catalog();
277        let mut bound = Bound::default();
278        for name in &schema.roles {
279            let id = catalog.role_id(name).ok_or_else(|| {
280                DbError::Catalog(crate::error::CatalogError::UnknownName {
281                    kind: "role",
282                    name: name.clone(),
283                })
284            })?;
285            bound.roles.insert(name.clone(), id);
286        }
287        for name in &schema.labels {
288            let id = catalog.label_id(name).ok_or_else(|| {
289                DbError::Catalog(crate::error::CatalogError::UnknownName {
290                    kind: "label",
291                    name: name.clone(),
292                })
293            })?;
294            bound.labels.insert(name.clone(), id);
295        }
296        for name in &schema.relation_types {
297            let id = catalog.relation_type_id(name).ok_or_else(|| {
298                DbError::Catalog(crate::error::CatalogError::UnknownName {
299                    kind: "relation type",
300                    name: name.clone(),
301                })
302            })?;
303            bound.relation_types.insert(name.clone(), id);
304        }
305        for (name, _family, value_type) in &schema.keys {
306            let id = catalog.property_key_id(name).ok_or_else(|| {
307                DbError::Catalog(crate::error::CatalogError::UnknownName {
308                    kind: "property key",
309                    name: name.clone(),
310                })
311            })?;
312            bound.keys.insert(name.clone(), (id, *value_type));
313        }
314        for (name, key_name) in &schema.equality_indexes {
315            let (_key_id, value_type) = *bound.keys.get(key_name).ok_or_else(|| {
316                DbError::Catalog(crate::error::CatalogError::UnknownName {
317                    kind: "property key",
318                    name: key_name.clone(),
319                })
320            })?;
321            let id = catalog.index_id(name).ok_or_else(|| {
322                DbError::Catalog(crate::error::CatalogError::UnknownName {
323                    kind: "index",
324                    name: name.clone(),
325                })
326            })?;
327            bound
328                .equality_indexes
329                .insert(name.clone(), (id, value_type));
330        }
331        for spec in &schema.graph_projections {
332            let id = catalog.projection_id(&spec.name).ok_or_else(|| {
333                DbError::Catalog(crate::error::CatalogError::UnknownName {
334                    kind: "projection",
335                    name: spec.name.clone(),
336                })
337            })?;
338            bound.projections.insert(spec.name.clone(), id);
339        }
340        Ok(bound)
341    }
342
343    /// Prepares a query against the current catalog.
344    ///
345    /// # Errors
346    ///
347    /// Returns [`DbError`] when parsing or semantic analysis fails.
348    ///
349    /// # Performance
350    ///
351    /// This method is `O(query length + catalog lookup cost)`.
352    pub fn prepare(&self, query: &str) -> Result<PreparedQuery, DbError> {
353        PreparedQuery::prepare(query, &self.current.view())
354    }
355}
356
357/// Snapshot of database status.
358///
359/// # Performance
360///
361/// Copying and comparing status is `O(1)`.
362#[derive(Clone, Copy, Debug, Eq, PartialEq)]
363pub struct Stats {
364    /// Last visible commit sequence.
365    pub visible_commit_seq: CommitSeq,
366    /// Last writer transaction ID burned by this handle.
367    ///
368    /// This value is durable after a dirty commit and session-local after
369    /// rollback.
370    pub last_transaction_id: TransactionId,
371    /// Live base generation named by the superblock (the count of folds this
372    /// store has undergone; gen-0 is the freshly created store).
373    pub live_generation: CheckpointGeneration,
374    /// On-disk byte size of the live base file.
375    pub base_byte_size: u64,
376    /// On-disk byte size of the live delta-log (the tail recovery replays and
377    /// the auto-checkpoint policy weighs against the base size).
378    pub log_byte_size: u64,
379    /// Visible element count.
380    pub element_count: usize,
381    /// Visible relation count.
382    pub relation_count: usize,
383    /// Visible incidence count.
384    pub incidence_count: usize,
385    /// Catalog-size summary.
386    pub catalog: CatalogSummary,
387}
388
/// Per-kind entry counts of a catalog.
///
/// # Performance
///
/// All fields are plain counters, so copying and comparing are `O(1)`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct CatalogSummary {
    /// How many roles the catalog defines.
    pub role_count: usize,
    /// How many labels the catalog defines.
    pub label_count: usize,
    /// How many relation types the catalog defines.
    pub relation_type_count: usize,
    /// How many property keys the catalog defines.
    pub property_key_count: usize,
    /// How many projections the catalog defines.
    pub projection_count: usize,
    /// How many indexes the catalog defines.
    pub index_count: usize,
}
409
410impl CatalogSummary {
411    /// Builds a summary from a catalog.
412    ///
413    /// # Performance
414    ///
415    /// This function is `O(catalog entry count)`.
416    #[must_use]
417    pub fn from_catalog(catalog: &Catalog) -> Self {
418        Self {
419            role_count: catalog.roles().count(),
420            label_count: catalog.labels().count(),
421            relation_type_count: catalog.relation_types().count(),
422            property_key_count: catalog.property_keys().count(),
423            projection_count: catalog.projections().count(),
424            index_count: catalog.indexes().count(),
425        }
426    }
427}