oxgraph_db/database/mod.rs
1//! Embedded `OxGraph` database engine API.
2//!
3//! This is the integration layer over the base+overlay+WAL core. A [`Db`]
4//! holds the current `Arc<Snapshot>` (one immutable base generation plus the
//! frozen overlay published over it), the root directory holding the live base
//! file and its append-only delta-log, the live base generation, and the
6//! recovered id/transaction watermarks. Reads pin the current snapshot in `O(1)`
7//! (`reader` clones the `Arc`); writes layer a fresh [`WriteOverlay`] over
8//! the current snapshot, append a WAL frame on commit, and publish a new
9//! snapshot. The whole read/query/projection surface resolves through the merged
10//! [`StateView`] of the pinned snapshot.
11
12use std::{path::PathBuf, sync::Arc};
13
14use crate::{
15 Bound, Catalog, CheckpointGeneration, CommitSeq, DbError, PreparedQuery, PropertyValue, Schema,
16 TransactionId,
17 lock::WriterLock,
18 overlay::{Snapshot, StateView, WriteOverlay},
19};
20
21mod maintenance;
22mod open;
23mod reader;
24mod writer;
25
26#[cfg(test)]
27#[cfg(not(miri))]
28mod tests;
29
30pub use maintenance::CheckpointPolicy;
31use open::{base_file, delta_file, file_len};
32pub use reader::{ReadPin, Reader};
33pub use writer::Writer;
34
35/// Lookup input for a cataloged index.
36///
37/// This type makes index lookup shape explicit: membership indexes accept
38/// [`IndexProbe::All`], single-property indexes accept scalar equality or
39/// range inputs, and composite equality indexes accept an ordered value tuple.
40///
41/// # Performance
42///
43/// Copying this value is `O(1)`.
44#[derive(Clone, Copy, Debug)]
45pub enum IndexProbe<'value> {
46 /// Lookup every subject represented by a membership-style index.
47 All,
48 /// Lookup one scalar equality value.
49 Equal(&'value PropertyValue),
50 /// Lookup one inclusive scalar range.
51 Range {
52 /// Inclusive lower bound.
53 min: &'value PropertyValue,
54 /// Inclusive upper bound.
55 max: &'value PropertyValue,
56 },
57 /// Lookup one ordered composite equality tuple.
58 Composite(&'value [PropertyValue]),
59}
60
61/// The durable result of a [`Db::write`]: whether a frame landed, and at which
62/// commit sequence.
63///
64/// # Performance
65///
66/// Copying this value is `O(1)`.
67#[derive(Clone, Copy, Debug, Eq, PartialEq)]
68#[non_exhaustive]
69pub enum CommitOutcome {
70 /// The transaction made no changes; no WAL frame was appended.
71 Empty,
72 /// A durable frame landed at this commit sequence.
73 Committed(CommitSeq),
74}
75
76/// Open OXGDB database handle.
77///
78/// # Performance
79///
80/// Moving a handle is `O(1)`: it moves the current `Arc<Snapshot>` and the open
81/// delta-log handle.
82pub struct Db {
83 /// Root database directory.
84 pub(super) root: PathBuf,
85 /// The current visible snapshot (base generation + published overlay),
86 /// shared by readers through an atomically reference-counted handle.
87 pub(super) current: Arc<Snapshot>,
88 /// Live base generation named by the superblock; every delta frame and the
89 /// per-generation log filename carry it.
90 pub(super) base_generation: u64,
91 /// Last writer transaction id durably recorded (the last dirty commit's id).
92 /// A rollback burns a session-local id above this but does not advance it.
93 pub(super) last_transaction_id: TransactionId,
94 /// Auto-checkpoint policy consulted after each dirty commit.
95 pub(super) checkpoint_policy: CheckpointPolicy,
96}
97
98impl Db {
99 /// Returns the live base generation named by the superblock (the count of
100 /// folds this store has undergone; gen-0 is the freshly created store).
101 ///
102 /// # Performance
103 ///
104 /// This method is `O(1)`.
105 #[must_use]
106 pub const fn live_generation(&self) -> CheckpointGeneration {
107 CheckpointGeneration::new(self.base_generation)
108 }
109
110 /// Returns operational status for this handle, including the live generation
111 /// count and the on-disk base/delta-log sizes the auto-checkpoint policy
112 /// weighs.
113 ///
114 /// # Performance
115 ///
116 /// This method is `O(visible state)` for the merged counts plus two `stat`
117 /// syscalls for the file sizes.
118 #[must_use]
119 pub fn stats(&self) -> Stats {
120 let view = self.current.view();
121 Stats {
122 visible_commit_seq: self.current.lsn(),
123 last_transaction_id: self.last_transaction_id,
124 live_generation: CheckpointGeneration::new(self.base_generation),
125 base_byte_size: file_len(&self.root.join(base_file(self.base_generation))),
126 log_byte_size: file_len(&self.root.join(delta_file(self.base_generation))),
127 element_count: view.element_count(),
128 relation_count: view.relation_count(),
129 incidence_count: view.incidence_count(),
130 catalog: self.catalog_summary(),
131 }
132 }
133
134 /// Returns a catalog-size summary.
135 ///
136 /// # Performance
137 ///
138 /// This method is `O(catalog entry count)`.
139 #[must_use]
140 pub fn catalog_summary(&self) -> CatalogSummary {
141 CatalogSummary::from_catalog(self.current.view().catalog())
142 }
143
144 /// Starts a read transaction pinned to the current visible snapshot.
145 ///
146 /// # Performance
147 ///
148 /// This method is `O(1)`: the reader clones the current `Arc<Snapshot>` and
149 /// observes a fixed state even across later commits and checkpoints.
150 #[must_use]
151 pub fn reader(&self) -> Reader {
152 Reader {
153 snapshot: Arc::clone(&self.current),
154 }
155 }
156
157 /// Returns the pin identifying the current visible snapshot — the commit
158 /// sequence and checkpoint generation a [`Self::reader`] started now would
159 /// observe — without starting a read transaction.
160 ///
161 /// # Performance
162 ///
163 /// This method is `O(1)`: it copies the current snapshot's identity fields.
164 #[must_use]
165 pub fn pin(&self) -> ReadPin {
166 ReadPin {
167 visible_commit_seq: self.current.lsn(),
168 generation: self.current.generation(),
169 }
170 }
171
172 /// Starts the single writer transaction, acquiring the cross-process writer
173 /// lock for the transaction's lifetime.
174 ///
175 /// # Errors
176 ///
177 /// Returns [`DbError::Txn(crate::error::TxnError::WriterLockHeld)`] when another writer holds
178 /// the lock or [`DbError::Txn(crate::error::TxnError::TransactionIdOverflow)`] when writer
179 /// ids are exhausted.
180 ///
181 /// # Performance
182 ///
183 /// This method is `O(parent change)` map entries with `O(1)` per entry:
184 /// the writer seeds from the parent's published overlay by cloning its
185 /// delta map structure, while label sets, text values, per-subject
186 /// property delta maps, and index postings are `Arc`-shared copy-on-write
187 /// — it scales with the committed-but-unfolded change count (not the base
188 /// size, and not the payload bytes).
189 pub(crate) fn begin_write(&mut self) -> Result<Writer<'_>, DbError> {
190 let lock = WriterLock::acquire(&self.root)?;
191 let transaction_id = self
192 .last_transaction_id
193 .checked_next()
194 .ok_or(DbError::Txn(crate::error::TxnError::TransactionIdOverflow))?;
195 // Burn the id eagerly so it is session-local-visible even on rollback;
196 // it only becomes durable when a dirty commit writes its frame, and a
197 // reopen recovers the durable high-water mark from the log.
198 self.last_transaction_id = transaction_id;
199 let parent = Arc::clone(&self.current);
200 // Seed the writer delta from the parent's published overlay so the
201 // writer reads every committed record; the parent overlay is never
202 // mutated (the seed clones its maps).
203 let delta = WriteOverlay::from_overlay(parent.overlay());
204 Ok(Writer {
205 database: self,
206 parent,
207 delta,
208 transaction_id,
209 lock,
210 })
211 }
212
213 /// Runs `f` against a read transaction pinned to the current snapshot. The
214 /// primary read entry point.
215 ///
216 /// # Errors
217 ///
218 /// Propagates whatever error `f` returns.
219 ///
220 /// # Performance
221 ///
222 /// Entering is `O(1)` (an `Arc` clone); the total cost is `f`'s cost.
223 pub fn read<R>(&self, f: impl FnOnce(&Reader) -> Result<R, DbError>) -> Result<R, DbError> {
224 f(&self.reader())
225 }
226
227 /// Runs `f` against the single write transaction, committing on `Ok` and
228 /// rolling back on `Err` — control flow IS the commit protocol. Returns `f`'s
229 /// value with the [`CommitOutcome`] (whether a durable frame landed). The
230 /// primary write entry point.
231 ///
232 /// # Errors
233 ///
234 /// Returns [`DbError::Txn(crate::error::TxnError::WriterLockHeld)`] when another writer holds
235 /// the lock, `f`'s error (after rolling back the staged delta), or a commit error.
236 ///
237 /// # Performance
238 ///
239 /// Begin is `O(parent change)` — the writer seeds by cloning the parent
240 /// overlay's delta map structure (label sets, text values, per-subject
241 /// property delta maps, and index postings are `Arc`-shared, so each of
242 /// the `N` committed-but-unfolded entries costs `O(1)`; folded away by a
243 /// checkpoint). Commit is `O(change)`. A triggered auto-fold adds
244 /// `O(visible bytes)`.
245 pub fn write<R>(
246 &mut self,
247 f: impl FnOnce(&mut Writer<'_>) -> Result<R, DbError>,
248 ) -> Result<(R, CommitOutcome), DbError> {
249 let mut writer = self.begin_write()?;
250 // On `Err` the `?` drops `writer` here, releasing the lock and discarding
251 // the staged delta — no frame is appended (rollback).
252 let value = f(&mut writer)?;
253 let committed = !writer.delta.is_empty();
254 let lsn = writer.commit()?;
255 let outcome = if committed {
256 CommitOutcome::Committed(lsn)
257 } else {
258 CommitOutcome::Empty
259 };
260 Ok((value, outcome))
261 }
262
263 /// Resolves an already-applied [`Schema`] against the live catalog WITHOUT
264 /// writing, returning the [`Bound`] handle bag (for a store already
265 /// bootstrapped with this schema).
266 ///
267 /// # Errors
268 ///
269 /// Returns [`DbError::UnknownName`] when a declared item is absent.
270 ///
271 /// # Performance
272 ///
273 /// This method is `O(declared items × log catalog)`.
274 pub fn bind(&self, schema: &Schema) -> Result<Bound, DbError> {
275 let view = self.current.view();
276 let catalog = view.catalog();
277 let mut bound = Bound::default();
278 for name in &schema.roles {
279 let id = catalog.role_id(name).ok_or_else(|| {
280 DbError::Catalog(crate::error::CatalogError::UnknownName {
281 kind: "role",
282 name: name.clone(),
283 })
284 })?;
285 bound.roles.insert(name.clone(), id);
286 }
287 for name in &schema.labels {
288 let id = catalog.label_id(name).ok_or_else(|| {
289 DbError::Catalog(crate::error::CatalogError::UnknownName {
290 kind: "label",
291 name: name.clone(),
292 })
293 })?;
294 bound.labels.insert(name.clone(), id);
295 }
296 for name in &schema.relation_types {
297 let id = catalog.relation_type_id(name).ok_or_else(|| {
298 DbError::Catalog(crate::error::CatalogError::UnknownName {
299 kind: "relation type",
300 name: name.clone(),
301 })
302 })?;
303 bound.relation_types.insert(name.clone(), id);
304 }
305 for (name, _family, value_type) in &schema.keys {
306 let id = catalog.property_key_id(name).ok_or_else(|| {
307 DbError::Catalog(crate::error::CatalogError::UnknownName {
308 kind: "property key",
309 name: name.clone(),
310 })
311 })?;
312 bound.keys.insert(name.clone(), (id, *value_type));
313 }
314 for (name, key_name) in &schema.equality_indexes {
315 let (_key_id, value_type) = *bound.keys.get(key_name).ok_or_else(|| {
316 DbError::Catalog(crate::error::CatalogError::UnknownName {
317 kind: "property key",
318 name: key_name.clone(),
319 })
320 })?;
321 let id = catalog.index_id(name).ok_or_else(|| {
322 DbError::Catalog(crate::error::CatalogError::UnknownName {
323 kind: "index",
324 name: name.clone(),
325 })
326 })?;
327 bound
328 .equality_indexes
329 .insert(name.clone(), (id, value_type));
330 }
331 for spec in &schema.graph_projections {
332 let id = catalog.projection_id(&spec.name).ok_or_else(|| {
333 DbError::Catalog(crate::error::CatalogError::UnknownName {
334 kind: "projection",
335 name: spec.name.clone(),
336 })
337 })?;
338 bound.projections.insert(spec.name.clone(), id);
339 }
340 Ok(bound)
341 }
342
343 /// Prepares a query against the current catalog.
344 ///
345 /// # Errors
346 ///
347 /// Returns [`DbError`] when parsing or semantic analysis fails.
348 ///
349 /// # Performance
350 ///
351 /// This method is `O(query length + catalog lookup cost)`.
352 pub fn prepare(&self, query: &str) -> Result<PreparedQuery, DbError> {
353 PreparedQuery::prepare(query, &self.current.view())
354 }
355}
356
357/// Snapshot of database status.
358///
359/// # Performance
360///
361/// Copying and comparing status is `O(1)`.
362#[derive(Clone, Copy, Debug, Eq, PartialEq)]
363pub struct Stats {
364 /// Last visible commit sequence.
365 pub visible_commit_seq: CommitSeq,
366 /// Last writer transaction ID burned by this handle.
367 ///
368 /// This value is durable after a dirty commit and session-local after
369 /// rollback.
370 pub last_transaction_id: TransactionId,
371 /// Live base generation named by the superblock (the count of folds this
372 /// store has undergone; gen-0 is the freshly created store).
373 pub live_generation: CheckpointGeneration,
374 /// On-disk byte size of the live base file.
375 pub base_byte_size: u64,
376 /// On-disk byte size of the live delta-log (the tail recovery replays and
377 /// the auto-checkpoint policy weighs against the base size).
378 pub log_byte_size: u64,
379 /// Visible element count.
380 pub element_count: usize,
381 /// Visible relation count.
382 pub relation_count: usize,
383 /// Visible incidence count.
384 pub incidence_count: usize,
385 /// Catalog-size summary.
386 pub catalog: CatalogSummary,
387}
388
/// Per-kind entry counts of a catalog.
///
/// # Performance
///
/// Copying and comparing are `O(1)`; every field is a plain count.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct CatalogSummary {
    /// Number of cataloged roles.
    pub role_count: usize,
    /// Number of cataloged labels.
    pub label_count: usize,
    /// Number of cataloged relation types.
    pub relation_type_count: usize,
    /// Number of cataloged property keys.
    pub property_key_count: usize,
    /// Number of cataloged projections.
    pub projection_count: usize,
    /// Number of cataloged indexes.
    pub index_count: usize,
}
409
410impl CatalogSummary {
411 /// Builds a summary from a catalog.
412 ///
413 /// # Performance
414 ///
415 /// This function is `O(catalog entry count)`.
416 #[must_use]
417 pub fn from_catalog(catalog: &Catalog) -> Self {
418 Self {
419 role_count: catalog.roles().count(),
420 label_count: catalog.labels().count(),
421 relation_type_count: catalog.relation_types().count(),
422 property_key_count: catalog.property_keys().count(),
423 projection_count: catalog.projections().count(),
424 index_count: catalog.indexes().count(),
425 }
426 }
427}