oxgraph_db/database.rs
1//! Embedded `OxGraph` database engine API.
2//!
3//! This is the integration layer over the base+overlay+WAL core. A [`Db`]
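//! holds the current `Arc<Snapshot>` (one immutable base generation plus the
//! frozen overlay published over it, which also carries the recovered id
//! watermark), the root directory and live base generation that locate the
//! on-disk base file and per-generation delta-log, and the recovered
//! transaction watermark. Reads pin the current snapshot in `O(1)`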
7//! (`reader` clones the `Arc`); writes layer a fresh [`WriteOverlay`] over
8//! the current snapshot, append a WAL frame on commit, and publish a new
9//! snapshot. The whole read/query/projection surface resolves through the merged
10//! [`StateView`] of the pinned snapshot.
11
12use std::{
13 borrow::Cow,
14 collections::BTreeSet,
15 path::{Path, PathBuf},
16 sync::Arc,
17};
18
19use oxgraph_algo::{
20 PageRankConfig, PageRankError, PageRankWorkspace, Uniform, longest_path_dag,
21 pagerank_graph_with_workspace,
22};
23use oxgraph_graph::{CanonicalElementIdentity, ElementIndex, LocalElementIdentity};
24
25use crate::{
26 Bound, Catalog, CheckpointGeneration, CommitSeq, DbError, Element, ElementId,
27 GraphProjectionDefinition, GraphProjectionSpec, IncidenceId, IncidenceRecord, IndexId, LabelId,
28 PreparedQuery, ProjectionDefinition, ProjectionId, Properties, PropertyKeyId, PropertySubject,
29 PropertyType, PropertyValue, QueryResult, Relation, RelationId, RelationTypeId, RoleId, Schema,
30 TransactionId,
31 backing::Base,
32 catalog::{IndexDefinition, PropertyFamily},
33 freeze::{self, FreezeStamps},
34 lock::WriterLock,
35 overlay::{Overlay, Snapshot, StateView, WriteOverlay},
36 projection::{self, GraphProjection, HypergraphProjection, ProjectionElementId},
37 state::NextIds,
38 storage,
39 traversal::{self, Direction, Subgraph, Walk},
40 typed::{Assignable, EqualityIndex, Key, ValueType},
41 wal,
42 wire::SuperblockRecord,
43};
44
45/// Lookup input for a cataloged index.
46///
47/// This type makes index lookup shape explicit: membership indexes accept
48/// [`Match::All`], single-property indexes accept scalar equality or
49/// range inputs, and composite equality indexes accept an ordered value tuple.
50///
51/// # Performance
52///
53/// Copying this value is `O(1)`.
54#[derive(Clone, Copy, Debug)]
55pub enum Match<'value> {
56 /// Lookup every subject represented by a membership-style index.
57 All,
58 /// Lookup one scalar equality value.
59 Equal(&'value PropertyValue),
60 /// Lookup one inclusive scalar range.
61 Range {
62 /// Inclusive lower bound.
63 min: &'value PropertyValue,
64 /// Inclusive upper bound.
65 max: &'value PropertyValue,
66 },
67 /// Lookup one ordered composite equality tuple.
68 Composite(&'value [PropertyValue]),
69}
70
/// Policy deciding when a dirty commit should fold the delta-log into a new
/// base generation, which caps how much log recovery has to replay.
///
/// [`CheckpointPolicy::default`] is the size-ratio policy: a fold triggers once
/// the delta-log outgrows `factor` times the live base size (`factor` is
/// configurable). [`CheckpointPolicy::Manual`] never triggers on its own; the
/// log is then folded only by an explicit [`Db::compact`].
///
/// # Performance
///
/// Copying this value is `O(1)`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CheckpointPolicy {
    /// Never fold automatically; call [`Db::compact`] to fold.
    Manual,
    /// Fold after a dirty commit once the delta-log is larger than `factor`
    /// times the live base size.
    ///
    /// Bases smaller than an internal floor are treated as floor-sized, so a
    /// near-empty gen-0 store does not fold on its first commit.
    SizeRatio {
        /// Log-to-base ratio `K`: the log may reach `K × base` bytes before the
        /// next dirty commit folds it.
        factor: u32,
    },
}

impl CheckpointPolicy {
    /// Default ratio `K` for [`CheckpointPolicy::SizeRatio`]: fold once the
    /// delta-log is more than four times the live base size.
    pub const DEFAULT_FACTOR: u32 = 4;

    /// Smallest base size (bytes) the size-ratio rule uses. Smaller bases are
    /// rounded up to this, so a freshly created store is not folded before it
    /// carries meaningful data.
    const MIN_BASE_BYTES: u64 = 4 * 1024;

    /// Reports whether a delta-log of `log_bytes` over a base of `base_bytes`
    /// should trigger an auto-checkpoint under this policy.
    ///
    /// The size-ratio threshold saturates at `u64::MAX` instead of overflowing.
    ///
    /// # Performance
    ///
    /// This method is `O(1)`.
    #[must_use]
    const fn should_checkpoint(self, log_bytes: u64, base_bytes: u64) -> bool {
        let Self::SizeRatio { factor } = self else {
            // `Manual` never folds on its own.
            return false;
        };
        let effective_base = if base_bytes >= Self::MIN_BASE_BYTES {
            base_bytes
        } else {
            Self::MIN_BASE_BYTES
        };
        // `u32 -> u64` is lossless; `u64::from` is not callable in a `const fn`.
        let threshold = effective_base.saturating_mul(factor as u64);
        log_bytes > threshold
    }
}

impl Default for CheckpointPolicy {
    /// Returns the size-ratio policy with [`CheckpointPolicy::DEFAULT_FACTOR`].
    ///
    /// # Performance
    ///
    /// This function is `O(1)`.
    fn default() -> Self {
        let factor = Self::DEFAULT_FACTOR;
        Self::SizeRatio { factor }
    }
}
141
142/// The durable result of a [`Db::write`]: whether a frame landed, and at which
143/// commit sequence.
144///
145/// # Performance
146///
147/// Copying this value is `O(1)`.
148#[derive(Clone, Copy, Debug, Eq, PartialEq)]
149#[non_exhaustive]
150pub enum CommitOutcome {
151 /// The transaction made no changes; no WAL frame was appended.
152 Empty,
153 /// A durable frame landed at this commit sequence.
154 Committed(CommitSeq),
155}
156
/// Returns the file name of the base image for `generation`, for example
/// `base-3.oxgdb` for generation 3.
///
/// # Performance
///
/// This function is `O(1)` (one short allocation).
fn base_file(generation: u64) -> String {
    const PREFIX: &str = "base-";
    const EXTENSION: &str = ".oxgdb";
    format!("{PREFIX}{generation}{EXTENSION}")
}
165
/// Returns the file name of the delta-log for `generation`, for example
/// `delta-3.log` for generation 3.
///
/// # Performance
///
/// This function is `O(1)` (one short allocation).
fn delta_file(generation: u64) -> String {
    const PREFIX: &str = "delta-";
    const EXTENSION: &str = ".log";
    format!("{PREFIX}{generation}{EXTENSION}")
}
174
175/// Open OXGDB database handle.
176///
177/// # Performance
178///
179/// Moving a handle is `O(1)`: it moves the current `Arc<Snapshot>` and the open
180/// delta-log handle.
181pub struct Db {
182 /// Root database directory.
183 root: PathBuf,
184 /// The current visible snapshot (base generation + published overlay),
185 /// shared by readers through an atomically reference-counted handle.
186 current: Arc<Snapshot>,
187 /// Live base generation named by the superblock; every delta frame and the
188 /// per-generation log filename carry it.
189 base_generation: u64,
190 /// Last writer transaction id durably recorded (the last dirty commit's id).
191 /// A rollback burns a session-local id above this but does not advance it.
192 last_transaction_id: TransactionId,
193 /// Auto-checkpoint policy consulted after each dirty commit.
194 checkpoint_policy: CheckpointPolicy,
195}
196
197impl Db {
198 /// Creates a new empty OXGDB database at `path`.
199 ///
200 /// The create order is base-0 then empty delta-0.log then the writer lock
201 /// file then the superblock (written LAST as the create-complete marker), so
202 /// a half-created store is detected on open rather than silently opened
203 /// empty.
204 ///
205 /// # Errors
206 ///
207 /// Returns [`DbError::AlreadyExists`] when a store already exists, or
208 /// [`DbError::Io`]/[`DbError::InvalidStore`] when creation fails.
209 ///
210 /// # Performance
211 ///
212 /// This function is `O(empty base bytes)`.
213 pub fn create(path: impl AsRef<Path>) -> Result<Self, DbError> {
214 let root = path.as_ref().to_path_buf();
215 if root.join(wal::SUPERBLOCK_FILE).exists() {
216 return Err(DbError::AlreadyExists);
217 }
218 // Base-0: an empty merged view (empty base under an empty overlay).
219 let empty_base = crate::overlay::BaseRecords::empty();
220 let empty_overlay = Overlay::empty(NextIds::INITIAL, Catalog::empty());
221 let view = crate::overlay::MergedState::new(&empty_base, &empty_overlay);
222 let base_bytes = freeze::freeze_view(
223 &view,
224 FreezeStamps {
225 commit_seq: 0,
226 transaction_id: 0,
227 generation: 0,
228 },
229 )?;
230 storage::atomic_write(
231 &root,
232 &root.join(format!("{}.tmp", base_file(0))),
233 &root.join(base_file(0)),
234 &base_bytes,
235 )?;
236 // Empty delta-0.log, durably created.
237 create_empty_log(&root, 0)?;
238 // Superblock is written LAST; its existence is the create-complete marker.
239 write_superblock(&root, 0, 0, 0, 0)?;
240 Self::open(&root)
241 }
242
243 /// Opens an existing OXGDB database, recovering the live frontier from the
244 /// valid prefix of the delta-log replayed over the base named by the
245 /// superblock.
246 ///
247 /// # Errors
248 ///
249 /// Returns [`DbError`] when the store is missing, malformed, or the log is
250 /// corrupt beyond a torn tail.
251 ///
252 /// # Performance
253 ///
254 /// This function is `O(base bytes + log bytes)`.
255 pub fn open(path: impl AsRef<Path>) -> Result<Self, DbError> {
256 let root = path.as_ref().to_path_buf();
257 let superblock = wal::read_superblock(&root)?;
258 let generation = superblock.base_generation.get();
259
260 let base = Arc::new(Base::open(&root.join(base_file(generation)), false)?);
261 let base_records = Arc::new(crate::overlay::BaseRecords::open(&base)?);
262 let base_header = *base.get().header();
263 let base_catalog = base.get().catalog().clone();
264 let base_next = NextIds::from_header(&base_header);
265
266 // Replay the valid prefix of the per-generation delta-log.
267 let log_path = root.join(delta_file(generation));
268 let log_bytes = read_log(&log_path)?;
269 let outcome = wal::replay(generation, &log_bytes)?;
270 // A torn tail truncates the log back to its last-good byte length.
271 if outcome.valid_len < log_bytes.len() {
272 truncate_log(&log_path, outcome.valid_len)?;
273 }
274
275 // Fold the replayed frames into a fresh overlay over the base, deriving
276 // the live frontier (commit_seq/txn_id) from the last good frame.
277 let mut write = WriteOverlay::new(base_next, base_catalog);
278 let mut recovered_next = base_next;
279 let mut last_commit_seq = superblock.commit_seq.get();
280 let mut last_txn = superblock.transaction_id.get();
281 for frame in &outcome.frames {
282 for op in &frame.ops {
283 write.apply_replay_op(&base_records, op, &frame.blob, frame.lsn)?;
284 }
285 recovered_next = recovered_next.elementwise_max(write.next_ids());
286 last_commit_seq = frame.lsn;
287 last_txn = last_txn.max(frame.txn_id);
288 }
289 // ids are never reused: the recovered watermark is the elementwise max of
290 // the base header and every replayed frame's watermark.
291 write.set_next_ids(recovered_next);
292 let overlay = Arc::new(write.freeze());
293
294 // Reuse the records already decoded for replay instead of decoding the base
295 // a second time inside `Snapshot::new`: the pinned base is byte-identical, so
296 // the records (and their derived index) match. Halves open's base-decode cost.
297 let snapshot = Arc::new(Snapshot::with_shared_base_records(
298 CheckpointGeneration::new(generation),
299 CommitSeq::new(last_commit_seq),
300 base,
301 overlay,
302 base_records,
303 ));
304
305 Ok(Self {
306 root,
307 current: snapshot,
308 base_generation: generation,
309 last_transaction_id: TransactionId::new(last_txn),
310 checkpoint_policy: CheckpointPolicy::default(),
311 })
312 }
313
314 /// Returns the live base generation named by the superblock (the count of
315 /// folds this store has undergone; gen-0 is the freshly created store).
316 ///
317 /// # Performance
318 ///
319 /// This method is `O(1)`.
320 #[must_use]
321 pub const fn live_generation(&self) -> CheckpointGeneration {
322 CheckpointGeneration::new(self.base_generation)
323 }
324
325 /// Returns the configured auto-checkpoint policy.
326 ///
327 /// # Performance
328 ///
329 /// This method is `O(1)`.
330 #[must_use]
331 pub const fn checkpoint_policy(&self) -> CheckpointPolicy {
332 self.checkpoint_policy
333 }
334
335 /// Sets the auto-checkpoint policy consulted after each dirty commit.
336 ///
337 /// # Performance
338 ///
339 /// This method is `O(1)`.
340 pub const fn set_checkpoint_policy(&mut self, policy: CheckpointPolicy) {
341 self.checkpoint_policy = policy;
342 }
343
344 /// Validates the current handle by re-reading the superblock and verifying
345 /// the live base's content CRC.
346 ///
347 /// # Errors
348 ///
349 /// Returns [`DbError`] when the superblock or base fails validation.
350 ///
351 /// # Performance
352 ///
353 /// This method is `O(base bytes)`.
354 pub fn validate(&self) -> Result<(), DbError> {
355 wal::read_superblock(&self.root)?;
356 Base::open(&self.root.join(base_file(self.base_generation)), false).map(|_base| ())
357 }
358
359 /// Validates an OXGDB database at `path`.
360 ///
361 /// # Errors
362 ///
363 /// Returns [`DbError`] when the store fails to open and recover.
364 ///
365 /// # Performance
366 ///
367 /// This function is `O(base bytes + log bytes)`.
368 pub fn validate_path(path: impl AsRef<Path>) -> Result<(), DbError> {
369 Self::open(path).map(|_database| ())
370 }
371
372 /// Folds the current base+overlay into a new base generation, rotating the
373 /// delta-log and republishing the superblock (a manual checkpoint).
374 ///
375 /// This is the checkpoint primitive, exposed here so the existing `compact`
376 /// API keeps its "rewrite the store compactly" contract. Auto-triggering is
377 /// configured separately via [`Db::set_checkpoint_policy`].
378 ///
379 /// # Errors
380 ///
381 /// Returns [`DbError`] when encoding, writing, or publishing the new
382 /// generation fails.
383 ///
384 /// # Performance
385 ///
386 /// This method is `O(visible state bytes)`.
387 pub fn compact(&mut self) -> Result<(), DbError> {
388 self.checkpoint()
389 }
390
391 /// Folds the current base+overlay into base-`{g+1}`, creates an empty
392 /// delta-`{g+1}`.log, republishes the superblock naming `g+1` (the
393 /// linearization point), then unlinks the old base and log.
394 ///
395 /// The order is crash-safe: the new base is fully durable BEFORE the
396 /// superblock names it (so a crash before the superblock leaves the OLD
397 /// superblock authoritative and the orphan new base is ignored), and the old
398 /// base/log are unlinked only AFTER the superblock names the new generation
399 /// (so a crash before the unlink leaves the NEW superblock authoritative and
400 /// the orphan old files are ignored). The
401 /// [`crate::wire::SuperblockRecord`] rename is the single linearization point.
402 ///
403 /// # Errors
404 ///
405 /// Returns [`DbError`] when encoding, writing, or publishing fails.
406 ///
407 /// # Performance
408 ///
409 /// This method is `O(visible state bytes)`.
410 pub(crate) fn checkpoint(&mut self) -> Result<(), DbError> {
411 self.checkpoint_inner(
412 #[cfg(test)]
413 CheckpointStop::Complete,
414 )
415 }
416
417 /// Crash-safe checkpoint body. Under `#[cfg(test)]` it accepts a
418 /// [`CheckpointStop`] that simulates a crash by returning early right after a
419 /// chosen fsync point, leaving the on-disk files exactly as a real crash
420 /// there would, so the crash-matrix test can reopen and assert recovery.
421 ///
422 /// # Errors
423 ///
424 /// Returns [`DbError`] when encoding, writing, or publishing fails.
425 ///
426 /// # Performance
427 ///
428 /// This method is `O(visible state bytes)`.
429 fn checkpoint_inner(&mut self, #[cfg(test)] stop: CheckpointStop) -> Result<(), DbError> {
430 let _lock = WriterLock::acquire(&self.root)?;
431 let next_generation = self
432 .base_generation
433 .checked_add(1)
434 .ok_or_else(|| DbError::invalid_store("checkpoint generation overflow"))?;
435 let view = self.current.view();
436 let commit_seq = self.current.lsn().get();
437 let base_bytes = freeze::freeze_view(
438 &view,
439 FreezeStamps {
440 commit_seq,
441 transaction_id: self.last_transaction_id.get(),
442 generation: next_generation,
443 },
444 )?;
445 // (1) write base-{g+1} (temp + fsync + rename + dir-fsync).
446 storage::atomic_write(
447 &self.root,
448 &self
449 .root
450 .join(format!("{}.tmp", base_file(next_generation))),
451 &self.root.join(base_file(next_generation)),
452 &base_bytes,
453 )?;
454 // (2) create empty delta-{g+1}.log (fsync + dir-fsync).
455 create_empty_log(&self.root, next_generation)?;
456 // Crash point A: new base + new log durable, superblock NOT yet
457 // published. The OLD superblock still names `g`, so recovery uses the old
458 // generation; the new base/log are orphans.
459 #[cfg(test)]
460 if matches!(stop, CheckpointStop::BeforeSuperblock) {
461 return Ok(());
462 }
463 // (3) publish the superblock naming g+1 — the linearization point.
464 write_superblock(
465 &self.root,
466 next_generation,
467 commit_seq,
468 commit_seq,
469 self.last_transaction_id.get(),
470 )?;
471 // Crash point B: superblock now names g+1, old base/log NOT yet unlinked.
472 // Recovery uses the new generation; the old base/log are orphans.
473 #[cfg(test)]
474 if matches!(stop, CheckpointStop::BeforeRotate) {
475 return Ok(());
476 }
477 // Re-open over the new generation, then (4) unlink the old base + log.
478 let reopened = Self::open(&self.root)?;
479 let old_generation = self.base_generation;
480 let policy = self.checkpoint_policy;
481 self.current = reopened.current;
482 self.base_generation = reopened.base_generation;
483 self.last_transaction_id = reopened.last_transaction_id;
484 // The reopen reset the policy to the default; restore the caller's.
485 self.checkpoint_policy = policy;
486 let _ = std::fs::remove_file(self.root.join(base_file(old_generation)));
487 let _ = std::fs::remove_file(self.root.join(delta_file(old_generation)));
488 let _ = storage::sync_directory(&self.root);
489 Ok(())
490 }
491
492 /// Auto-checkpoints when the configured [`CheckpointPolicy`] says the
493 /// delta-log has grown too large relative to the base. Called after a dirty
494 /// commit publishes its frame. A failed fold is surfaced so the caller can
495 /// observe it; the committed data is already durable in the log regardless.
496 ///
497 /// # Errors
498 ///
499 /// Returns [`DbError`] when the triggered fold fails.
500 ///
501 /// # Performance
502 ///
503 /// This method is `O(1)` to decide; `O(visible state bytes)` when it folds.
504 fn maybe_auto_checkpoint(&mut self) -> Result<(), DbError> {
505 let log_bytes = file_len(&self.root.join(delta_file(self.base_generation)));
506 let base_bytes = file_len(&self.root.join(base_file(self.base_generation)));
507 if self
508 .checkpoint_policy
509 .should_checkpoint(log_bytes, base_bytes)
510 {
511 self.checkpoint()?;
512 }
513 Ok(())
514 }
515
516 /// Returns operational status for this handle, including the live generation
517 /// count and the on-disk base/delta-log sizes the auto-checkpoint policy
518 /// weighs.
519 ///
520 /// # Performance
521 ///
522 /// This method is `O(visible state)` for the merged counts plus two `stat`
523 /// syscalls for the file sizes.
524 #[must_use]
525 pub fn stats(&self) -> Stats {
526 let view = self.current.view();
527 Stats {
528 visible_commit_seq: self.current.lsn(),
529 last_transaction_id: self.last_transaction_id,
530 live_generation: CheckpointGeneration::new(self.base_generation),
531 base_byte_size: file_len(&self.root.join(base_file(self.base_generation))),
532 log_byte_size: file_len(&self.root.join(delta_file(self.base_generation))),
533 element_count: view.element_count(),
534 relation_count: view.relation_count(),
535 incidence_count: view.incidence_count(),
536 catalog: self.catalog_summary(),
537 }
538 }
539
540 /// Returns a catalog-size summary.
541 ///
542 /// # Performance
543 ///
544 /// This method is `O(catalog entry count)`.
545 #[must_use]
546 pub fn catalog_summary(&self) -> CatalogSummary {
547 CatalogSummary::from_catalog(self.current.view().catalog())
548 }
549
550 /// Starts a read transaction pinned to the current visible snapshot.
551 ///
552 /// # Performance
553 ///
554 /// This method is `O(1)`: the reader clones the current `Arc<Snapshot>` and
555 /// observes a fixed state even across later commits and checkpoints.
556 #[must_use]
557 pub fn reader(&self) -> Reader {
558 Reader {
559 snapshot: Arc::clone(&self.current),
560 }
561 }
562
563 /// Starts the single writer transaction, acquiring the cross-process writer
564 /// lock for the transaction's lifetime.
565 ///
566 /// # Errors
567 ///
568 /// Returns [`DbError::WriterLockHeld`] when another writer holds the lock or
569 /// [`DbError::TransactionIdOverflow`] when writer ids are exhausted.
570 ///
571 /// # Performance
572 ///
573 /// This method is `O(1)`: the writer layers a fresh empty write overlay over
574 /// the current snapshot.
575 pub(crate) fn begin_write(&mut self) -> Result<Writer<'_>, DbError> {
576 let lock = WriterLock::acquire(&self.root)?;
577 let transaction_id = self
578 .last_transaction_id
579 .checked_next()
580 .ok_or(DbError::TransactionIdOverflow)?;
581 // Burn the id eagerly so it is session-local-visible even on rollback;
582 // it only becomes durable when a dirty commit writes its frame, and a
583 // reopen recovers the durable high-water mark from the log.
584 self.last_transaction_id = transaction_id;
585 let parent = Arc::clone(&self.current);
586 // Seed the writer delta from the parent's published overlay so the
587 // writer reads every committed record; the parent overlay is never
588 // mutated (the seed clones its maps).
589 let delta = WriteOverlay::from_overlay(parent.overlay());
590 Ok(Writer {
591 database: self,
592 parent,
593 delta,
594 transaction_id,
595 lock,
596 })
597 }
598
599 /// Runs `f` against a read transaction pinned to the current snapshot. The
600 /// primary read entry point.
601 ///
602 /// # Errors
603 ///
604 /// Propagates whatever error `f` returns.
605 ///
606 /// # Performance
607 ///
608 /// Entering is `O(1)` (an `Arc` clone); the total cost is `f`'s cost.
609 pub fn read<R>(&self, f: impl FnOnce(&Reader) -> Result<R, DbError>) -> Result<R, DbError> {
610 f(&self.reader())
611 }
612
613 /// Runs `f` against the single write transaction, committing on `Ok` and
614 /// rolling back on `Err` — control flow IS the commit protocol. Returns `f`'s
615 /// value with the [`CommitOutcome`] (whether a durable frame landed). The
616 /// primary write entry point.
617 ///
618 /// # Errors
619 ///
620 /// Returns [`DbError::WriterLockHeld`] when another writer holds the lock,
621 /// `f`'s error (after rolling back the staged delta), or a commit error.
622 ///
623 /// # Performance
624 ///
625 /// Begin is `O(1)`; commit is `O(change)`. A triggered auto-fold adds
626 /// `O(visible bytes)`.
627 pub fn write<R>(
628 &mut self,
629 f: impl FnOnce(&mut Writer<'_>) -> Result<R, DbError>,
630 ) -> Result<(R, CommitOutcome), DbError> {
631 let mut writer = self.begin_write()?;
632 // On `Err` the `?` drops `writer` here, releasing the lock and discarding
633 // the staged delta — no frame is appended (rollback).
634 let value = f(&mut writer)?;
635 let committed = !writer.delta.is_empty();
636 let lsn = writer.commit()?;
637 let outcome = if committed {
638 CommitOutcome::Committed(lsn)
639 } else {
640 CommitOutcome::Empty
641 };
642 Ok((value, outcome))
643 }
644
645 /// Resolves an already-applied [`Schema`] against the live catalog WITHOUT
646 /// writing, returning the [`Bound`] handle bag (for a store already
647 /// bootstrapped with this schema).
648 ///
649 /// # Errors
650 ///
651 /// Returns [`DbError::UnknownName`] when a declared item is absent.
652 ///
653 /// # Performance
654 ///
655 /// This method is `O(declared items × log catalog)`.
656 pub fn bind(&self, schema: &Schema) -> Result<Bound, DbError> {
657 let view = self.current.view();
658 let catalog = view.catalog();
659 let mut bound = Bound::default();
660 for name in &schema.roles {
661 let id = catalog.role_id(name).ok_or_else(|| DbError::UnknownName {
662 kind: "role",
663 name: name.clone(),
664 })?;
665 bound.roles.insert(name.clone(), id);
666 }
667 for name in &schema.labels {
668 let id = catalog.label_id(name).ok_or_else(|| DbError::UnknownName {
669 kind: "label",
670 name: name.clone(),
671 })?;
672 bound.labels.insert(name.clone(), id);
673 }
674 for name in &schema.relation_types {
675 let id = catalog
676 .relation_type_id(name)
677 .ok_or_else(|| DbError::UnknownName {
678 kind: "relation type",
679 name: name.clone(),
680 })?;
681 bound.relation_types.insert(name.clone(), id);
682 }
683 for (name, _family, value_type) in &schema.keys {
684 let id = catalog
685 .property_key_id(name)
686 .ok_or_else(|| DbError::UnknownName {
687 kind: "property key",
688 name: name.clone(),
689 })?;
690 bound.keys.insert(name.clone(), (id, *value_type));
691 }
692 for (name, key_name) in &schema.equality_indexes {
693 let (_key_id, value_type) =
694 *bound
695 .keys
696 .get(key_name)
697 .ok_or_else(|| DbError::UnknownName {
698 kind: "property key",
699 name: key_name.clone(),
700 })?;
701 let id = catalog.index_id(name).ok_or_else(|| DbError::UnknownName {
702 kind: "index",
703 name: name.clone(),
704 })?;
705 bound
706 .equality_indexes
707 .insert(name.clone(), (id, value_type));
708 }
709 for spec in &schema.graph_projections {
710 let id = catalog
711 .projection_id(&spec.name)
712 .ok_or_else(|| DbError::UnknownName {
713 kind: "projection",
714 name: spec.name.clone(),
715 })?;
716 bound.projections.insert(spec.name.clone(), id);
717 }
718 Ok(bound)
719 }
720
721 /// Prepares a query against the current catalog.
722 ///
723 /// # Errors
724 ///
725 /// Returns [`DbError`] when parsing or semantic analysis fails.
726 ///
727 /// # Performance
728 ///
729 /// This method is `O(query length + catalog lookup cost)`.
730 pub fn prepare(&self, query: &str) -> Result<PreparedQuery, DbError> {
731 PreparedQuery::prepare(query, &self.current.view())
732 }
733}
734
/// Returns the size in bytes of the file at `path`, or `0` when it is missing
/// or cannot be stat'd.
///
/// The size is advisory only: it feeds status reporting and the
/// auto-checkpoint heuristic, never a correctness decision.
///
/// # Performance
///
/// This function is `O(1)`: one `stat` syscall.
fn file_len(path: &Path) -> u64 {
    match std::fs::metadata(path) {
        Ok(metadata) => metadata.len(),
        Err(_) => 0,
    }
}
745
/// Where a test-mode [`Db::checkpoint_inner`] should stop.
///
/// Stopping simulates a crash immediately after one of the fold's fsync
/// points, so the crash-matrix test can reopen the store and check what
/// recovery sees in that window.
///
/// Only `Complete` is built under miri. The crash-matrix test that builds the
/// other variants is `#[cfg(not(miri))]`, because it reopens a real on-disk
/// store across simulated crashes, which miri's isolation cannot model. The
/// `dead_code` expectation below covers exactly that case.
///
/// # Performance
///
/// `perf: unspecified`; a test-only control tag.
#[cfg(test)]
#[cfg_attr(
    miri,
    expect(
        dead_code,
        reason = "the crash-injection variants are constructed only by the #[cfg(not(miri))] crash-matrix test"
    )
)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum CheckpointStop {
    /// Run every step of the fold, as production does.
    Complete,
    /// Return once the new base and new empty log are durable, before the
    /// superblock is rewritten; the old superblock remains authoritative.
    BeforeSuperblock,
    /// Return once the superblock names the new generation, before the old
    /// base and log are removed; the new superblock is authoritative.
    BeforeRotate,
}
777
778/// Reads the whole delta-log into memory, treating a missing file as empty.
779///
780/// # Errors
781///
782/// Returns [`DbError::Io`] when the file cannot be read.
783///
784/// # Performance
785///
786/// This function is `O(log bytes)`.
787fn read_log(path: &Path) -> Result<Vec<u8>, DbError> {
788 match std::fs::read(path) {
789 Ok(bytes) => Ok(bytes),
790 Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Vec::new()),
791 Err(error) => Err(DbError::io("read delta-log", error)),
792 }
793}
794
795/// Truncates the delta-log back to `len` (its last-good byte length) and fsyncs,
796/// discarding a torn tail under the open path.
797///
798/// # Errors
799///
800/// Returns [`DbError::Io`] when opening, truncating, or syncing fails.
801///
802/// # Performance
803///
804/// This function is `O(1)`.
805fn truncate_log(path: &Path, len: usize) -> Result<(), DbError> {
806 let file = std::fs::OpenOptions::new()
807 .write(true)
808 .open(path)
809 .map_err(|error| DbError::io("open delta-log for truncate", error))?;
810 let len = u64::try_from(len)
811 .map_err(|_overflow| DbError::invalid_store("delta-log length overflow"))?;
812 file.set_len(len)
813 .map_err(|error| DbError::io("truncate delta-log", error))?;
814 file.sync_all()
815 .map_err(|error| DbError::io("sync truncated delta-log", error))
816}
817
818/// Creates an empty per-generation delta-log, fsyncing the file and the
819/// directory entry so the new (empty) log is durable.
820///
821/// # Errors
822///
823/// Returns [`DbError::Io`] when creation or syncing fails.
824///
825/// # Performance
826///
827/// This function is `O(1)`.
828fn create_empty_log(root: &Path, generation: u64) -> Result<(), DbError> {
829 let path = root.join(delta_file(generation));
830 let file =
831 std::fs::File::create(&path).map_err(|error| DbError::io("create delta-log", error))?;
832 file.sync_all()
833 .map_err(|error| DbError::io("sync delta-log", error))?;
834 storage::sync_directory(root)
835}
836
837/// Opens the live delta-log for appending (create when absent, read+append).
838///
839/// # Errors
840///
841/// Returns [`DbError::Io`] when the log cannot be opened.
842///
843/// # Performance
844///
845/// This function is `O(1)`.
846fn open_log_for_append(root: &Path, generation: u64) -> Result<std::fs::File, DbError> {
847 std::fs::OpenOptions::new()
848 .create(true)
849 .truncate(false)
850 .read(true)
851 .append(true)
852 .open(root.join(delta_file(generation)))
853 .map_err(|error| DbError::io("open delta-log for append", error))
854}
855
856/// Writes the superblock naming `generation` with the given frontier stamps.
857///
858/// # Errors
859///
860/// Returns [`DbError::Io`] when publishing fails.
861///
862/// # Performance
863///
864/// This function is `O(1)`.
865fn write_superblock(
866 root: &Path,
867 generation: u64,
868 checkpoint_lsn: u64,
869 commit_seq: u64,
870 transaction_id: u64,
871) -> Result<(), DbError> {
872 wal::write_superblock(
873 root,
874 &SuperblockRecord {
875 magic: crate::wire::SUPERBLOCK_MAGIC,
876 base_generation: generation.into(),
877 checkpoint_lsn: checkpoint_lsn.into(),
878 log_byte_offset: 0u64.into(),
879 commit_seq: commit_seq.into(),
880 transaction_id: transaction_id.into(),
881 format_version: crate::wire::OXGDB_FORMAT_VERSION.into(),
882 flags: 0u32.into(),
883 crc32c: 0u32.into(),
884 pad: 0u32.into(),
885 },
886 )
887}
888
/// Point-in-time snapshot of database status.
///
/// Every field is a plain copied value (counts, stamps, byte sizes), so a
/// `Stats` value does not change after it has been produced.
///
/// # Performance
///
/// Copying and comparing status is `O(1)`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct Stats {
    /// Last visible commit sequence.
    pub visible_commit_seq: CommitSeq,
    /// Last writer transaction ID burned by this handle.
    ///
    /// This value is durable after a dirty commit and session-local after
    /// rollback.
    pub last_transaction_id: TransactionId,
    /// Live base generation named by the superblock (the count of folds this
    /// store has undergone; gen-0 is the freshly created store).
    pub live_generation: CheckpointGeneration,
    /// On-disk byte size of the live base file.
    pub base_byte_size: u64,
    /// On-disk byte size of the live delta-log (the tail recovery replays and
    /// the auto-checkpoint policy weighs against the base size).
    pub log_byte_size: u64,
    /// Visible element count.
    pub element_count: usize,
    /// Visible relation count.
    pub relation_count: usize,
    /// Visible incidence count.
    pub incidence_count: usize,
    /// Catalog-size summary.
    pub catalog: CatalogSummary,
}
920
/// Catalog-size summary: the number of entries in each catalog family.
///
/// Built from a [`Catalog`] by [`CatalogSummary::from_catalog`].
///
/// # Performance
///
/// Copying and comparing are `O(1)`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct CatalogSummary {
    /// Role count.
    pub role_count: usize,
    /// Label count.
    pub label_count: usize,
    /// Relation type count.
    pub relation_type_count: usize,
    /// Property key count.
    pub property_key_count: usize,
    /// Projection count.
    pub projection_count: usize,
    /// Index count.
    pub index_count: usize,
}
941
942impl CatalogSummary {
943 /// Builds a summary from a catalog.
944 ///
945 /// # Performance
946 ///
947 /// This function is `O(catalog entry count)`.
948 #[must_use]
949 pub fn from_catalog(catalog: &Catalog) -> Self {
950 Self {
951 role_count: catalog.roles().count(),
952 label_count: catalog.labels().count(),
953 relation_type_count: catalog.relation_types().count(),
954 property_key_count: catalog.property_keys().count(),
955 projection_count: catalog.projections().count(),
956 index_count: catalog.indexes().count(),
957 }
958 }
959}
960
/// Reader pin identifying the visible database generation.
///
/// Returned by [`Reader::pin`]. Two pins compare equal only when both the
/// commit sequence and the checkpoint generation match.
///
/// # Performance
///
/// Copying and comparing a pin is `O(1)`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ReadPin {
    /// Pinned visible commit sequence.
    pub visible_commit_seq: CommitSeq,
    /// Pinned checkpoint generation.
    pub generation: CheckpointGeneration,
}
973
/// Read transaction over a pinned snapshot.
///
/// A read transaction owns its own `Arc<Snapshot>` and never borrows the
/// [`Db`]. It therefore stays valid across a later `begin_write`/`checkpoint`
/// on the same handle, because it cloned the snapshot before the write
/// borrowed `&mut`. It is [`Send`] + [`Sync`]; this is asserted at compile
/// time in this module via `assert_send_sync`.
///
/// # Performance
///
/// Creating and cloning a read transaction is `O(1)`: it shares the pinned
/// snapshot through an `Arc`, not by copying.
pub struct Reader {
    /// The pinned snapshot this reader observes.
    snapshot: Arc<Snapshot>,
}
989
990/// Returns whether a [`Reader::neighbors`] walk should follow the edge from the
991/// incidence `from` (the queried element's incidence) to the incidence `to`
992/// (a candidate neighbor's incidence) under `direction`.
993///
994/// Endpoint roles are encoded by incidence-creation order: the source endpoint
995/// has the lower incidence id. `Outgoing` follows source→target (the queried
996/// element is the source, so `from < to`), `Incoming` follows target→source, and
997/// `Both` follows either side.
998///
999/// # Performance
1000///
1001/// This function is `O(1)`.
1002const fn follow_direction(direction: Direction, from: IncidenceId, to: IncidenceId) -> bool {
1003 match direction {
1004 Direction::Outgoing => from.get() < to.get(),
1005 Direction::Incoming => from.get() > to.get(),
1006 Direction::Both => true,
1007 }
1008}
1009
1010/// `Reader` MUST be `Send + Sync`: it pins only an `Arc<Snapshot>`,
1011/// which holds `Arc`-shared `Send + Sync` data (no `Rc`/`RefCell` reachable).
1012const fn assert_send_sync<T: Send + Sync>() {}
1013const _: () = assert_send_sync::<Reader>();
1014const _: () = assert_send_sync::<Arc<Snapshot>>();
1015
1016impl Reader {
1017 /// Returns this transaction's reader pin.
1018 ///
1019 /// # Performance
1020 ///
1021 /// This method is `O(1)`.
1022 #[must_use]
1023 pub fn pin(&self) -> ReadPin {
1024 ReadPin {
1025 visible_commit_seq: self.snapshot.lsn(),
1026 generation: self.snapshot.generation(),
1027 }
1028 }
1029
1030 /// Returns catalog metadata.
1031 ///
1032 /// # Performance
1033 ///
1034 /// This method is `O(1)`.
1035 #[must_use]
1036 pub fn catalog(&self) -> &Catalog {
1037 self.snapshot.view().catalog_ref()
1038 }
1039
1040 /// Returns visible element count.
1041 ///
1042 /// # Performance
1043 ///
1044 /// This method is `O(base + overlay change)`.
1045 #[must_use]
1046 pub fn element_count(&self) -> usize {
1047 self.snapshot.view().element_count()
1048 }
1049
1050 /// Returns visible relation count.
1051 ///
1052 /// # Performance
1053 ///
1054 /// This method is `O(base + overlay change)`.
1055 #[must_use]
1056 pub fn relation_count(&self) -> usize {
1057 self.snapshot.view().relation_count()
1058 }
1059
1060 /// Returns visible incidence count.
1061 ///
1062 /// # Performance
1063 ///
1064 /// This method is `O(base + overlay change)`.
1065 #[must_use]
1066 pub fn incidence_count(&self) -> usize {
1067 self.snapshot.view().incidence_count()
1068 }
1069
1070 /// Returns every visible element id in id order.
1071 ///
1072 /// # Performance
1073 ///
1074 /// This method is `O(element count)`.
1075 #[must_use]
1076 pub fn element_ids(&self) -> Vec<ElementId> {
1077 self.snapshot
1078 .view()
1079 .elements()
1080 .map(|record| record.id)
1081 .collect()
1082 }
1083
1084 /// Returns every visible relation id in id order.
1085 ///
1086 /// # Performance
1087 ///
1088 /// This method is `O(relation count)`.
1089 #[must_use]
1090 pub fn relation_ids(&self) -> Vec<RelationId> {
1091 self.snapshot
1092 .view()
1093 .relations()
1094 .map(|record| record.id)
1095 .collect()
1096 }
1097
1098 /// Returns whether an element exists.
1099 ///
1100 /// # Performance
1101 ///
1102 /// This method is `O(log change + log n)`.
1103 #[must_use]
1104 pub fn contains_element(&self, id: ElementId) -> bool {
1105 self.snapshot.view().contains_element(id)
1106 }
1107
1108 /// Returns whether a relation exists.
1109 ///
1110 /// # Performance
1111 ///
1112 /// This method is `O(log change + log n)`.
1113 #[must_use]
1114 pub fn contains_relation(&self, id: RelationId) -> bool {
1115 self.snapshot.view().contains_relation(id)
1116 }
1117
1118 /// Returns whether an incidence exists.
1119 ///
1120 /// # Performance
1121 ///
1122 /// This method is `O(log change + log n)`.
1123 #[must_use]
1124 pub fn contains_incidence(&self, id: IncidenceId) -> bool {
1125 self.snapshot.view().contains_incidence(id)
1126 }
1127
1128 /// Returns an owned element view — id, labels, and all properties read in one
1129 /// call.
1130 ///
1131 /// # Performance
1132 ///
1133 /// This method is `O(log n + label count + property count)`.
1134 #[must_use]
1135 pub fn element(&self, id: ElementId) -> Option<Element> {
1136 let view = self.snapshot.view();
1137 let record = view.element_ref(id)?;
1138 let labels = record.labels.iter().copied().collect();
1139 let properties =
1140 Properties::from_pairs(view.subject_properties(PropertySubject::Element(id)));
1141 Some(Element::new(id, labels, properties))
1142 }
1143
1144 /// Returns an owned relation view — id, type, labels, and all properties read
1145 /// in one call.
1146 ///
1147 /// # Performance
1148 ///
1149 /// This method is `O(log n + label count + property count)`.
1150 #[must_use]
1151 pub fn relation(&self, id: RelationId) -> Option<Relation> {
1152 let view = self.snapshot.view();
1153 let record = view.relation_ref(id)?;
1154 let labels = record.labels.iter().copied().collect();
1155 let properties =
1156 Properties::from_pairs(view.subject_properties(PropertySubject::Relation(id)));
1157 Some(Relation::new(id, record.relation_type, labels, properties))
1158 }
1159
1160 /// Returns an owned incidence record.
1161 ///
1162 /// # Performance
1163 ///
1164 /// This method is `O(log change + log n)`.
1165 #[must_use]
1166 pub fn incidence(&self, id: IncidenceId) -> Option<IncidenceRecord> {
1167 self.snapshot.view().incidence_ref(id).map(Cow::into_owned)
1168 }
1169
1170 /// Returns every visible incidence attached to an element, in ascending
1171 /// incidence-id order.
1172 ///
1173 /// The merged set mixes overlay-owned and base-borrowed records, so this
1174 /// returns an owned [`Vec`] ([`IncidenceRecord`] is [`Copy`], so the copy is
1175 /// cheap).
1176 ///
1177 /// # Performance
1178 ///
1179 /// This method is `O(base incidences + overlay incidence change)`.
1180 #[must_use]
1181 pub fn element_incidences(&self, id: ElementId) -> Vec<IncidenceRecord> {
1182 self.snapshot.view().element_incidences(id)
1183 }
1184
1185 /// Returns a binary relation's two endpoint elements, ordered by ascending
1186 /// incidence id.
1187 ///
1188 /// Reads the relation's incidences from the reverse-adjacency index and
1189 /// returns the elements carried by its first two incidences in id order. A
1190 /// relation with fewer than two visible incidences returns `None`. This
1191 /// reports endpoints structurally, without consulting any projection's
1192 /// source/target roles — use [`Self::neighbors`] when role direction matters.
1193 ///
1194 /// # Performance
1195 ///
1196 /// This method is `O(degree)` over the relation's incidences.
1197 #[must_use]
1198 pub fn endpoints(&self, relation: RelationId) -> Option<(ElementId, ElementId)> {
1199 let incidences = self.snapshot.view().relation_incidences(relation);
1200 match incidences.as_slice() {
1201 [first, second, ..] => Some((first.element, second.element)),
1202 _too_few => None,
1203 }
1204 }
1205
1206 /// Returns the elements reachable from `element` along relations of
1207 /// `relation_type`, in ascending element-id order.
1208 ///
1209 /// Direction selects the role `element` must play on each relation. Endpoint
1210 /// roles are encoded by incidence-creation order: a binary relation's source
1211 /// is its lower incidence id and its target the higher (see
1212 /// [`Self::endpoints`]). `Outgoing` requires `element` to be the source (and
1213 /// yields the target), `Incoming` requires it to be the target (and yields
1214 /// the source), and `Both` yields the opposite endpoint either way. Resolved
1215 /// over the reverse-adjacency index — each incidence of `element` whose
1216 /// relation has the requested type contributes that relation's other
1217 /// endpoint — so this works for any binary relation without a materialized
1218 /// projection.
1219 ///
1220 /// # Performance
1221 ///
1222 /// This method is `O(degree of element + sum of touched relation degrees)`.
1223 #[must_use]
1224 pub fn neighbors(
1225 &self,
1226 element: ElementId,
1227 relation_type: RelationTypeId,
1228 direction: Direction,
1229 ) -> Vec<ElementId> {
1230 let view = self.snapshot.view();
1231 let mut neighbors = BTreeSet::new();
1232 for incidence in view.element_incidences(element) {
1233 let matches_type = view
1234 .relation_ref(incidence.relation)
1235 .is_some_and(|record| record.relation_type == Some(relation_type));
1236 if !matches_type {
1237 continue;
1238 }
1239 // The incidence id encodes the endpoint role: the source endpoint is
1240 // created first (lower incidence id), the target second. Compare
1241 // `element`'s incidence id against each other endpoint's to decide
1242 // which side `element` is on, then follow per the requested direction.
1243 neighbors.extend(
1244 view.relation_incidences(incidence.relation)
1245 .into_iter()
1246 .filter(|other| other.element != element)
1247 .filter(|other| follow_direction(direction, incidence.id, other.id))
1248 .map(|other| other.element),
1249 );
1250 }
1251 neighbors.into_iter().collect()
1252 }
1253
1254 /// Returns one owned property value.
1255 ///
1256 /// # Performance
1257 ///
1258 /// This method is `O(log subjects + log keys)`.
1259 #[must_use]
1260 pub fn property(&self, subject: PropertySubject, key: PropertyKeyId) -> Option<PropertyValue> {
1261 self.snapshot
1262 .view()
1263 .property_ref(subject, key)
1264 .map(Cow::into_owned)
1265 }
1266
1267 /// Returns the owned element whose value in `index` equals `value`, or `None`
1268 /// when no element matches.
1269 ///
1270 /// # Errors
1271 ///
1272 /// Returns [`DbError`] when the index is unknown or is not an equality index.
1273 ///
1274 /// # Performance
1275 ///
1276 /// This method is `O(log n + label count + property count)`.
1277 pub fn element_by_key<T: ValueType>(
1278 &self,
1279 index: EqualityIndex<T>,
1280 value: impl Assignable<T>,
1281 ) -> Result<Option<Element>, DbError> {
1282 let value = value.into_value()?;
1283 let matched = self
1284 .lookup(index.id(), Match::Equal(&value))?
1285 .into_iter()
1286 .find_map(|subject| match subject {
1287 PropertySubject::Element(id) => Some(id),
1288 PropertySubject::Relation(_) | PropertySubject::Incidence(_) => None,
1289 });
1290 Ok(matched.and_then(|id| self.element(id)))
1291 }
1292
1293 /// Returns the number of subjects carried by a membership index (a label or
1294 /// relation-type index).
1295 ///
1296 /// # Errors
1297 ///
1298 /// Returns [`DbError`] when the index is unknown or does not support
1299 /// membership enumeration.
1300 ///
1301 /// # Performance
1302 ///
1303 /// This method is `O(indexed family size)`.
1304 pub fn count(&self, index: IndexId) -> Result<usize, DbError> {
1305 self.lookup(index, Match::All)
1306 .map(|subjects| subjects.len())
1307 }
1308
1309 /// Looks up subjects with a property value.
1310 ///
1311 /// # Errors
1312 ///
1313 /// Returns [`DbError`] when the property key is unknown or `value` does not
1314 /// match the key schema.
1315 ///
1316 /// # Performance
1317 ///
1318 /// This method is `O(property subject count)`.
1319 pub fn lookup_property_equal(
1320 &self,
1321 key: PropertyKeyId,
1322 value: &PropertyValue,
1323 ) -> Result<Vec<PropertySubject>, DbError> {
1324 self.snapshot.view().typed_property_equal(key, value)
1325 }
1326
1327 /// Looks up subjects with a property inside an inclusive range.
1328 ///
1329 /// # Errors
1330 ///
1331 /// Returns [`DbError`] when the property key is unknown or either bound
1332 /// does not match the key schema.
1333 ///
1334 /// # Performance
1335 ///
1336 /// This method is `O(property subject count)`.
1337 pub fn lookup_property_range(
1338 &self,
1339 key: PropertyKeyId,
1340 min: &PropertyValue,
1341 max: &PropertyValue,
1342 ) -> Result<Vec<PropertySubject>, DbError> {
1343 self.snapshot.view().typed_property_range(key, min, max)
1344 }
1345
1346 /// Executes an index lookup.
1347 ///
1348 /// # Errors
1349 ///
1350 /// Returns [`DbError`] when the index is unknown, the lookup shape does not
1351 /// match the index kind, or supplied property values do not match catalog
1352 /// schemas.
1353 ///
1354 /// # Performance
1355 ///
1356 /// This method is `O(indexed family size)`.
1357 pub fn lookup(
1358 &self,
1359 index: IndexId,
1360 lookup: Match<'_>,
1361 ) -> Result<Vec<PropertySubject>, DbError> {
1362 let view = self.snapshot.view();
1363 let entry = view
1364 .catalog()
1365 .index(index)
1366 .ok_or(DbError::UnknownIndex { id: index })?;
1367 match (&entry.definition, lookup) {
1368 (IndexDefinition::Label { label }, Match::All) => Ok(view
1369 .elements_with_label(*label)
1370 .into_iter()
1371 .map(PropertySubject::Element)
1372 .collect()),
1373 (IndexDefinition::Label { .. }, _lookup) => {
1374 Err(DbError::unsupported("label index expects all lookup"))
1375 }
1376 (IndexDefinition::RelationType { relation_type }, Match::All) => Ok(view
1377 .relations_with_type(*relation_type)
1378 .into_iter()
1379 .map(PropertySubject::Relation)
1380 .collect()),
1381 (IndexDefinition::RelationType { .. }, _lookup) => Err(DbError::unsupported(
1382 "relation type index expects all lookup",
1383 )),
1384 (IndexDefinition::PropertyEquality { key }, Match::Equal(value)) => {
1385 view.typed_property_equal(*key, value)
1386 }
1387 (IndexDefinition::PropertyEquality { .. }, _lookup) => Err(DbError::unsupported(
1388 "property equality index expects equality lookup",
1389 )),
1390 (IndexDefinition::PropertyRange { key }, Match::Range { min, max }) => {
1391 view.typed_property_range(*key, min, max)
1392 }
1393 (IndexDefinition::PropertyRange { .. }, _lookup) => Err(DbError::unsupported(
1394 "property range index expects range lookup",
1395 )),
1396 (IndexDefinition::CompositeEquality { keys }, Match::Composite(values)) => {
1397 view.typed_property_composite_equal(keys, values)
1398 }
1399 (IndexDefinition::CompositeEquality { .. }, _lookup) => Err(DbError::unsupported(
1400 "composite equality index expects composite equality lookup",
1401 )),
1402 (IndexDefinition::Projection { projection }, Match::All) => {
1403 self.projection_index_subjects(*projection)
1404 }
1405 (IndexDefinition::Projection { .. }, _lookup) => {
1406 Err(DbError::unsupported("projection index expects all lookup"))
1407 }
1408 }
1409 }
1410
1411 /// Materializes a graph projection.
1412 ///
1413 /// # Errors
1414 ///
1415 /// Returns [`DbError`] when the projection is unknown, is not a graph, or
1416 /// fails validation against current topology.
1417 ///
1418 /// # Performance
1419 ///
1420 /// This method is `O(relation count * incidence count)`.
1421 pub fn graph_projection(&self, id: ProjectionId) -> Result<GraphProjection, DbError> {
1422 let view = self.snapshot.view();
1423 let entry = view
1424 .catalog()
1425 .projection(id)
1426 .ok_or(DbError::UnknownProjection { id })?;
1427 match &entry.definition {
1428 ProjectionDefinition::Graph(definition) => {
1429 projection::GraphProjection::from_state(&view, definition.clone())
1430 }
1431 ProjectionDefinition::Hypergraph(_definition) => {
1432 Err(DbError::invalid_projection("projection is not a graph"))
1433 }
1434 }
1435 }
1436
1437 /// Materializes a graph projection by catalog name.
1438 ///
1439 /// # Errors
1440 ///
1441 /// Returns [`DbError`] when the projection is unknown, is not a graph, or
1442 /// fails validation against current topology.
1443 ///
1444 /// # Performance
1445 ///
1446 /// This method is `O(log projection count + relation count * incidence count)`.
1447 pub fn graph_projection_by_name(&self, name: &str) -> Result<GraphProjection, DbError> {
1448 let id = self
1449 .snapshot
1450 .view()
1451 .catalog()
1452 .projection_id(name)
1453 .ok_or_else(|| DbError::unsupported(format!("unknown projection {name}")))?;
1454 self.graph_projection(id)
1455 }
1456
1457 /// Walks a cataloged graph projection from canonical seed elements,
1458 /// returning the discovered nodes AND the projection edges among them.
1459 ///
1460 /// Nodes are unique canonical elements in BFS first-discovery order; depth is
1461 /// the shortest discovered hop count from any seed. Edges connect two
1462 /// discovered nodes, ordered deterministically and unique by relation, so the
1463 /// [`Subgraph`] never references a node it omitted.
1464 ///
1465 /// # Errors
1466 ///
1467 /// Returns [`DbError`] when the projection is unknown, is not a graph,
1468 /// cannot be materialized, or a seed element is not part of the projection.
1469 ///
1470 /// # Performance
1471 ///
1472 /// This method is `O(relation count * incidence count + visited edges)`.
1473 pub fn walk(
1474 &self,
1475 projection: ProjectionId,
1476 seeds: &[ElementId],
1477 walk: Walk,
1478 ) -> Result<Subgraph, DbError> {
1479 if seeds.is_empty() || walk.limit == 0 {
1480 return Ok(Subgraph::default());
1481 }
1482 let graph = self.graph_projection(projection)?;
1483 traversal::walk_graph_projection(&graph, seeds, walk)
1484 }
1485
1486 /// Ranks a cataloged graph projection by personalized `PageRank`, returning
1487 /// every projection element paired with its rank, ordered highest first.
1488 ///
1489 /// `seeds` are the restart (teleport) set: rank mass returns to them on each
1490 /// damping step, biasing the stationary distribution toward elements
1491 /// reachable from the seeds (random walk with restart). The seed weights are
1492 /// normalized internally, so passing the seed elements is sufficient. With no
1493 /// seeds this is the uniform-teleport `PageRank` over the projection.
1494 ///
1495 /// # Errors
1496 ///
1497 /// Returns [`DbError`] when the projection is unknown, is not a graph, cannot
1498 /// be materialized, or `PageRank` rejects the configuration (the
1499 /// [`PageRankConfig`] was invalid or the power iteration did not converge).
1500 /// Seeds absent from the projection are ignored rather than erroring; with no
1501 /// resolvable seed this is the uniform-teleport rank.
1502 ///
1503 /// # Performance
1504 ///
1505 /// This method is `O(relation count * incidence count + iterations *
1506 /// (visible elements + visible edges) + visible elements * log(visible
1507 /// elements))` — the trailing term is the final rank sort.
1508 pub fn personalized_pagerank(
1509 &self,
1510 projection: ProjectionId,
1511 seeds: &[ElementId],
1512 config: PageRankConfig<f64>,
1513 ) -> Result<Vec<(ElementId, f64)>, DbError> {
1514 let graph = self.graph_projection(projection)?;
1515 let bound = graph.element_bound();
1516 let element_count = u32::try_from(bound).map_err(|_| {
1517 DbError::traversal("projection exceeds the personalized pagerank index bound")
1518 })?;
1519
1520 let mut personalization = vec![0.0_f64; bound];
1521 let mut seeded = false;
1522 for &seed in seeds {
1523 if let Some(local) = graph.local_element_id(seed) {
1524 personalization[graph.element_index(local)] = 1.0;
1525 seeded = true;
1526 }
1527 }
1528
1529 let mut ranks = vec![0.0_f64; bound];
1530 let mut workspace = PageRankWorkspace::for_graph(&graph);
1531 pagerank_graph_with_workspace(
1532 &graph,
1533 &Uniform,
1534 (0..element_count).map(ProjectionElementId::new),
1535 config,
1536 seeded.then_some(personalization.as_slice()),
1537 &mut ranks,
1538 &mut workspace,
1539 )
1540 .map_err(|error| {
1541 DbError::traversal(match error {
1542 PageRankError::InvalidDamping { .. }
1543 | PageRankError::InvalidTolerance { .. }
1544 | PageRankError::InvalidMaxIterations => "invalid pagerank configuration",
1545 PageRankError::NonConverged { .. } => "personalized pagerank did not converge",
1546 _ => "personalized pagerank failed",
1547 })
1548 })?;
1549
1550 let mut ranked: Vec<(ElementId, f64)> = (0..element_count)
1551 .map(|index| {
1552 let local = ProjectionElementId::new(index);
1553 (
1554 graph.canonical_element_id(local),
1555 ranks[graph.element_index(local)],
1556 )
1557 })
1558 .collect();
1559 ranked.sort_by(|left, right| right.1.total_cmp(&left.1));
1560 Ok(ranked)
1561 }
1562
1563 /// Returns the longest chain of canonical elements along the projection's
1564 /// outgoing edges within the subgraph induced by `elements`.
1565 ///
1566 /// Only edges whose endpoints are both in `elements` participate. The path
1567 /// lists each element once from start to end; its length in edges is
1568 /// `path.len() - 1`. An empty `elements` slice yields an empty path.
1569 ///
1570 /// # Errors
1571 ///
1572 /// Returns [`DbError`] when the projection is unknown, is not a graph, cannot
1573 /// be materialized, or the induced subgraph contains a cycle. Elements absent
1574 /// from the projection are ignored, so the chain is computed over the present
1575 /// subset.
1576 ///
1577 /// # Performance
1578 ///
1579 /// This method is `O(relation count * incidence count + visible elements +
1580 /// visible edges)`.
1581 pub fn longest_path(
1582 &self,
1583 projection: ProjectionId,
1584 elements: &[ElementId],
1585 ) -> Result<Vec<ElementId>, DbError> {
1586 if elements.is_empty() {
1587 return Ok(Vec::new());
1588 }
1589 let graph = self.graph_projection(projection)?;
1590 let locals = elements
1591 .iter()
1592 .filter_map(|&element| graph.local_element_id(element))
1593 .collect::<Vec<ProjectionElementId>>();
1594 let path = longest_path_dag(&graph, &locals)
1595 .map_err(|_| DbError::traversal("longest path found a cycle"))?;
1596 Ok(path
1597 .into_iter()
1598 .map(|local| graph.canonical_element_id(local))
1599 .collect())
1600 }
1601
1602 /// Materializes a hypergraph projection.
1603 ///
1604 /// # Errors
1605 ///
1606 /// Returns [`DbError`] when the projection is unknown, is not a hypergraph,
1607 /// or fails validation against current topology.
1608 ///
1609 /// # Performance
1610 ///
1611 /// This method is `O(relation count * incidence count)`.
1612 pub fn hypergraph_projection(&self, id: ProjectionId) -> Result<HypergraphProjection, DbError> {
1613 let view = self.snapshot.view();
1614 let entry = view
1615 .catalog()
1616 .projection(id)
1617 .ok_or(DbError::UnknownProjection { id })?;
1618 match &entry.definition {
1619 ProjectionDefinition::Hypergraph(definition) => {
1620 projection::HypergraphProjection::from_state(&view, definition.clone())
1621 }
1622 ProjectionDefinition::Graph(_definition) => Err(DbError::invalid_projection(
1623 "projection is not a hypergraph",
1624 )),
1625 }
1626 }
1627
1628 /// Executes a prepared query.
1629 ///
1630 /// # Errors
1631 ///
1632 /// Returns [`DbError`] when execution cannot materialize a referenced
1633 /// projection.
1634 ///
1635 /// # Performance
1636 ///
1637 /// This method is `O(plan output + projection build cost when used)`.
1638 pub fn run(&self, query: &PreparedQuery) -> Result<QueryResult, DbError> {
1639 query.execute(&self.snapshot.view())
1640 }
1641
1642 /// Explains a prepared query.
1643 ///
1644 /// # Performance
1645 ///
1646 /// This method is `O(plan size)`.
1647 #[must_use]
1648 pub fn explain(&self, query: &PreparedQuery) -> String {
1649 query.explain()
1650 }
1651
1652 /// Materializes subjects represented by a projection index.
1653 ///
1654 /// # Errors
1655 ///
1656 /// Returns [`DbError`] when the projection is unknown or cannot be
1657 /// materialized.
1658 ///
1659 /// # Performance
1660 ///
1661 /// This method is `O(relation count * incidence count)`.
1662 fn projection_index_subjects(
1663 &self,
1664 projection: ProjectionId,
1665 ) -> Result<Vec<PropertySubject>, DbError> {
1666 let view = self.snapshot.view();
1667 let entry = view
1668 .catalog()
1669 .projection(projection)
1670 .ok_or(DbError::UnknownProjection { id: projection })?;
1671 match &entry.definition {
1672 ProjectionDefinition::Graph(definition) => {
1673 Ok(projection::GraphProjection::from_state(&view, definition.clone())?.subjects())
1674 }
1675 ProjectionDefinition::Hypergraph(definition) => Ok(
1676 projection::HypergraphProjection::from_state(&view, definition.clone())?.subjects(),
1677 ),
1678 }
1679 }
1680}
1681
/// Single writer transaction.
///
/// Mutations accumulate into a private write overlay layered over the parent
/// snapshot; reads fall through the overlay then the base. `commit` appends the
/// overlay's mutation log to the WAL (when dirty) and publishes a fresh
/// snapshot. `rollback` drops the overlay and appends nothing.
///
/// The writer mutably borrows its [`Db`] for its whole lifetime, so only one
/// writer can exist per handle at a time.
///
/// # Performance
///
/// Creating and moving a writer is `O(1)`; each mutation is `O(log change)`.
pub struct Writer<'db> {
    /// Db receiving the commit.
    database: &'db mut Db,
    /// Parent snapshot the writer layers over (its base + frozen overlay).
    parent: Arc<Snapshot>,
    /// Private mutable delta this writer accumulates.
    delta: WriteOverlay,
    /// Writer transaction id (session-local until a dirty commit makes it
    /// durable).
    transaction_id: TransactionId,
    /// Held single-writer advisory lock. Its [`Drop`] releases the lock when this
    /// transaction ends (on `rollback`, or on any early-return error path); a
    /// successful dirty [`Self::commit`] releases it explicitly with `drop` so a
    /// triggered auto-checkpoint can re-acquire it.
    lock: WriterLock,
}
1708
1709impl Writer<'_> {
1710 /// Registers a structural incidence role.
1711 ///
1712 /// # Errors
1713 ///
1714 /// Returns [`DbError`] when the name already exists or ID allocation fails.
1715 ///
1716 /// # Performance
1717 ///
1718 /// This method is `O(log role count + name length)`.
1719 pub fn register_role(&mut self, name: impl Into<String>) -> Result<RoleId, DbError> {
1720 self.delta.register_role(name.into())
1721 }
1722
1723 /// Registers an element or relation label.
1724 ///
1725 /// # Errors
1726 ///
1727 /// Returns [`DbError`] when the name already exists or ID allocation fails.
1728 ///
1729 /// # Performance
1730 ///
1731 /// This method is `O(log label count + name length)`.
1732 pub fn register_label(&mut self, name: impl Into<String>) -> Result<LabelId, DbError> {
1733 self.delta.register_label(name.into())
1734 }
1735
1736 /// Registers a relation type.
1737 ///
1738 /// # Errors
1739 ///
1740 /// Returns [`DbError`] when the name already exists or ID allocation fails.
1741 ///
1742 /// # Performance
1743 ///
1744 /// This method is `O(log relation type count + name length)`.
1745 pub fn register_relation_type(
1746 &mut self,
1747 name: impl Into<String>,
1748 ) -> Result<RelationTypeId, DbError> {
1749 self.delta.register_relation_type(name.into())
1750 }
1751
1752 /// Registers a typed property key.
1753 ///
1754 /// # Errors
1755 ///
1756 /// Returns [`DbError`] when the name already exists or ID allocation fails.
1757 ///
1758 /// # Performance
1759 ///
1760 /// This method is `O(log property key count + name length)`.
1761 pub fn register_property_key(
1762 &mut self,
1763 name: impl Into<String>,
1764 family: PropertyFamily,
1765 value_type: PropertyType,
1766 ) -> Result<PropertyKeyId, DbError> {
1767 self.delta
1768 .register_property_key(name.into(), family, value_type)
1769 }
1770
1771 /// Defines a physical projection.
1772 ///
1773 /// # Errors
1774 ///
1775 /// Returns [`DbError`] when referenced catalog IDs are unknown, the
1776 /// projection name already exists, or ID allocation fails.
1777 ///
1778 /// # Performance
1779 ///
1780 /// This method is `O(definition size + catalog lookup cost)`.
1781 pub fn define_projection(
1782 &mut self,
1783 definition: ProjectionDefinition,
1784 ) -> Result<ProjectionId, DbError> {
1785 self.validate_projection_definition(&definition)?;
1786 self.delta.register_projection(definition)
1787 }
1788
1789 /// Defines an index.
1790 ///
1791 /// # Errors
1792 ///
1793 /// Returns [`DbError`] when referenced catalog IDs are unknown, the index
1794 /// name already exists, or ID allocation fails.
1795 ///
1796 /// # Performance
1797 ///
1798 /// This method is `O(definition size + catalog lookup cost)`.
1799 pub fn define_index(
1800 &mut self,
1801 name: impl Into<String>,
1802 definition: IndexDefinition,
1803 ) -> Result<IndexId, DbError> {
1804 self.validate_index_definition(&definition)?;
1805 self.delta.register_index(name.into(), definition)
1806 }
1807
1808 /// Applies a declarative [`Schema`] idempotently (register-or-get every
1809 /// declared item), returning the resolved [`Bound`] handle bag. Re-applying
1810 /// the same schema reuses existing ids; a name that already exists with a
1811 /// conflicting shape is a [`DbError::SchemaConflict`].
1812 ///
1813 /// # Errors
1814 ///
1815 /// Returns [`DbError`] on a shape conflict, an undeclared referenced name (an
1816 /// index's key, a projection's role/type), or id-allocation failure.
1817 ///
1818 /// # Performance
1819 ///
1820 /// This method is `O(declared items × log catalog)`.
1821 pub fn apply_schema(&mut self, schema: &Schema) -> Result<Bound, DbError> {
1822 let mut bound = Bound::default();
1823 for name in &schema.roles {
1824 let id = match self.merged().catalog().role_id(name) {
1825 Some(id) => id,
1826 None => self.register_role(name.clone())?,
1827 };
1828 bound.roles.insert(name.clone(), id);
1829 }
1830 for name in &schema.labels {
1831 let id = match self.merged().catalog().label_id(name) {
1832 Some(id) => id,
1833 None => self.register_label(name.clone())?,
1834 };
1835 bound.labels.insert(name.clone(), id);
1836 }
1837 for name in &schema.relation_types {
1838 let id = match self.merged().catalog().relation_type_id(name) {
1839 Some(id) => id,
1840 None => self.register_relation_type(name.clone())?,
1841 };
1842 bound.relation_types.insert(name.clone(), id);
1843 }
1844 for (name, family, value_type) in &schema.keys {
1845 let id = self.register_key_or_get(name, *family, *value_type)?;
1846 bound.keys.insert(name.clone(), (id, *value_type));
1847 }
1848 for (name, key_name) in &schema.equality_indexes {
1849 let (key_id, value_type) =
1850 *bound
1851 .keys
1852 .get(key_name)
1853 .ok_or_else(|| DbError::UnknownName {
1854 kind: "property key",
1855 name: key_name.clone(),
1856 })?;
1857 let id = match self.merged().catalog().index_id(name) {
1858 Some(id) => id,
1859 None => self.define_index(
1860 name.clone(),
1861 IndexDefinition::PropertyEquality { key: key_id },
1862 )?,
1863 };
1864 bound
1865 .equality_indexes
1866 .insert(name.clone(), (id, value_type));
1867 }
1868 for spec in &schema.graph_projections {
1869 let id = match self.merged().catalog().projection_id(&spec.name) {
1870 Some(id) => id,
1871 None => self.define_graph_projection(spec, &bound)?,
1872 };
1873 bound.projections.insert(spec.name.clone(), id);
1874 }
1875 Ok(bound)
1876 }
1877
1878 /// Registers a property key, or returns the existing id when the name is
1879 /// already present with a matching family and value type.
1880 ///
1881 /// # Errors
1882 ///
1883 /// Returns [`DbError::SchemaConflict`] when the name exists with a different
1884 /// family or value type.
1885 ///
1886 /// # Performance
1887 ///
1888 /// This method is `O(log catalog)`.
1889 fn register_key_or_get(
1890 &mut self,
1891 name: &str,
1892 family: PropertyFamily,
1893 value_type: PropertyType,
1894 ) -> Result<PropertyKeyId, DbError> {
1895 let Some(existing) = self.merged().catalog().property_key_id(name) else {
1896 return self.register_property_key(name.to_owned(), family, value_type);
1897 };
1898 let matches = self
1899 .merged()
1900 .catalog()
1901 .property_key(existing)
1902 .is_some_and(|def| def.family == family && def.value_type == value_type);
1903 if matches {
1904 Ok(existing)
1905 } else {
1906 Err(DbError::SchemaConflict {
1907 name: name.to_owned(),
1908 reason: "property key family/value type differs from the existing catalog entry",
1909 })
1910 }
1911 }
1912
1913 /// Defines a graph projection from a spec, resolving its relation-type and
1914 /// role names through `bound`.
1915 ///
1916 /// # Errors
1917 ///
1918 /// Returns [`DbError::UnknownName`] when a referenced role/type is unbound, or
1919 /// a definition error.
1920 ///
1921 /// # Performance
1922 ///
1923 /// This method is `O(relation-type count × log catalog)`.
1924 fn define_graph_projection(
1925 &mut self,
1926 spec: &GraphProjectionSpec,
1927 bound: &Bound,
1928 ) -> Result<ProjectionId, DbError> {
1929 let mut relation_types = BTreeSet::new();
1930 for name in &spec.relation_types {
1931 relation_types.insert(bound.relation_type(name)?);
1932 }
1933 let source_role = bound.role(&spec.source_role)?;
1934 let target_role = bound.role(&spec.target_role)?;
1935 self.define_projection(ProjectionDefinition::Graph(GraphProjectionDefinition {
1936 name: spec.name.clone(),
1937 relation_types,
1938 source_role,
1939 target_role,
1940 }))
1941 }
1942
1943 /// Creates a canonical element.
1944 ///
1945 /// # Errors
1946 ///
1947 /// Returns [`DbError::IdOverflow`] when element IDs are exhausted.
1948 ///
1949 /// # Performance
1950 ///
1951 /// This method is `O(log element change)`.
1952 pub fn create_element(&mut self) -> Result<ElementId, DbError> {
1953 self.delta.create_element()
1954 }
1955
1956 /// Creates a canonical relation.
1957 ///
1958 /// # Errors
1959 ///
1960 /// Returns [`DbError::IdOverflow`] when relation IDs are exhausted.
1961 ///
1962 /// # Performance
1963 ///
1964 /// This method is `O(log relation change)`.
1965 pub fn create_relation(&mut self) -> Result<RelationId, DbError> {
1966 self.delta.create_relation()
1967 }
1968
1969 /// Creates a canonical incidence.
1970 ///
1971 /// # Errors
1972 ///
1973 /// Returns [`DbError`] when referenced IDs are unknown or incidence IDs are
1974 /// exhausted.
1975 ///
1976 /// # Performance
1977 ///
1978 /// This method is `O(log incidence change + reference lookup cost)`.
1979 pub fn create_incidence(
1980 &mut self,
1981 relation: RelationId,
1982 element: ElementId,
1983 role: RoleId,
1984 ) -> Result<IncidenceId, DbError> {
1985 self.require_relation(relation)?;
1986 self.require_element(element)?;
1987 self.require_role(role)?;
1988 self.delta.create_incidence(relation, element, role)
1989 }
1990
1991 /// Tombstones a canonical element and its incidences.
1992 ///
1993 /// # Errors
1994 ///
1995 /// Returns [`DbError::UnknownElement`] when the element is not visible.
1996 ///
1997 /// # Performance
1998 ///
1999 /// This method is `O(log n + degree)` via the reverse-adjacency index.
2000 pub(crate) fn tombstone_element(&mut self, id: ElementId) -> Result<(), DbError> {
2001 self.require_element(id)?;
2002 // Cascade: every incidence on the element — resolved in O(log n + degree)
2003 // through the reverse-adjacency index, not a full incidence scan — is
2004 // tombstoned too.
2005 let incidences: Vec<IncidenceId> = self
2006 .merged()
2007 .element_incidences(id)
2008 .into_iter()
2009 .map(|record| record.id)
2010 .collect();
2011 let base = self.parent.base_records();
2012 self.delta.tombstone_element(base, id);
2013 for incidence in incidences {
2014 self.delta
2015 .tombstone_incidence(self.parent.base_records(), incidence);
2016 }
2017 Ok(())
2018 }
2019
2020 /// Tombstones a canonical relation and its incidences.
2021 ///
2022 /// # Errors
2023 ///
2024 /// Returns [`DbError::UnknownRelation`] when the relation is not visible.
2025 ///
2026 /// # Performance
2027 ///
2028 /// This method is `O(log n + degree)` via the reverse-adjacency index.
2029 pub(crate) fn tombstone_relation(&mut self, id: RelationId) -> Result<(), DbError> {
2030 self.require_relation(id)?;
2031 // Cascade: every incidence in the relation — resolved in O(log n + degree)
2032 // through the reverse-adjacency index, not a full incidence scan.
2033 let incidences: Vec<IncidenceId> = self
2034 .merged()
2035 .relation_incidences(id)
2036 .into_iter()
2037 .map(|record| record.id)
2038 .collect();
2039 let base = self.parent.base_records();
2040 self.delta.tombstone_relation(base, id);
2041 for incidence in incidences {
2042 self.delta
2043 .tombstone_incidence(self.parent.base_records(), incidence);
2044 }
2045 Ok(())
2046 }
2047
2048 /// Tombstones a canonical incidence.
2049 ///
2050 /// # Errors
2051 ///
2052 /// Returns [`DbError::UnknownIncidence`] when the incidence is not visible.
2053 ///
2054 /// # Performance
2055 ///
2056 /// This method is `O(log incidence change)`.
2057 pub(crate) fn tombstone_incidence(&mut self, id: IncidenceId) -> Result<(), DbError> {
2058 self.require_incidence(id)?;
2059 self.delta
2060 .tombstone_incidence(self.parent.base_records(), id);
2061 Ok(())
2062 }
2063
2064 /// Adds a label to an element.
2065 ///
2066 /// # Errors
2067 ///
2068 /// Returns [`DbError`] when the element or label is unknown.
2069 ///
2070 /// # Performance
2071 ///
2072 /// This method is `O(log element change + log label count)`.
2073 pub(crate) fn add_element_label(
2074 &mut self,
2075 element: ElementId,
2076 label: LabelId,
2077 ) -> Result<(), DbError> {
2078 self.require_element(element)?;
2079 self.require_label(label)?;
2080 self.delta
2081 .add_element_label(self.parent.base_records(), element, label);
2082 Ok(())
2083 }
2084
2085 /// Adds a label to a relation.
2086 ///
2087 /// # Errors
2088 ///
2089 /// Returns [`DbError`] when the relation or label is unknown.
2090 ///
2091 /// # Performance
2092 ///
2093 /// This method is `O(log relation change + log label count)`.
2094 pub(crate) fn add_relation_label(
2095 &mut self,
2096 relation: RelationId,
2097 label: LabelId,
2098 ) -> Result<(), DbError> {
2099 self.require_relation(relation)?;
2100 self.require_label(label)?;
2101 self.delta
2102 .add_relation_label(self.parent.base_records(), relation, label);
2103 Ok(())
2104 }
2105
2106 /// Sets a relation type.
2107 ///
2108 /// # Errors
2109 ///
2110 /// Returns [`DbError`] when the relation or relation type is unknown.
2111 ///
2112 /// # Performance
2113 ///
2114 /// This method is `O(log relation change + log relation type count)`.
2115 pub fn set_relation_type(
2116 &mut self,
2117 relation: RelationId,
2118 relation_type: RelationTypeId,
2119 ) -> Result<(), DbError> {
2120 self.require_relation(relation)?;
2121 self.require_relation_type(relation_type)?;
2122 self.delta
2123 .set_relation_type(self.parent.base_records(), relation, relation_type);
2124 Ok(())
2125 }
2126
2127 /// Sets a property value.
2128 ///
2129 /// # Errors
2130 ///
2131 /// Returns [`DbError`] when the subject or key is unknown, or the value
2132 /// does not match the key schema.
2133 ///
2134 /// # Performance
2135 ///
2136 /// This method is `O(log subject change + log key count)`.
2137 pub(crate) fn set_property(
2138 &mut self,
2139 subject: PropertySubject,
2140 key: PropertyKeyId,
2141 value: PropertyValue,
2142 ) -> Result<(), DbError> {
2143 // Referential integrity: the subject must be visible (this rejects an
2144 // orphan property against a tombstoned/absent subject at the transaction
2145 // boundary — the overlay layer is permissive by design).
2146 self.require_subject(subject)?;
2147 let definition = self
2148 .merged()
2149 .catalog()
2150 .property_key(key)
2151 .cloned()
2152 .ok_or(DbError::UnknownPropertyKey { id: key })?;
2153 if definition.family != subject.family() {
2154 return Err(DbError::WrongPropertyFamily {
2155 expected: definition.family,
2156 actual: subject.family(),
2157 });
2158 }
2159 if definition.value_type != value.value_type() {
2160 return Err(DbError::PropertyTypeMismatch {
2161 expected: definition.value_type,
2162 actual: value.value_type(),
2163 });
2164 }
2165 self.delta
2166 .set_property(self.parent.base_records(), subject, key, value);
2167 Ok(())
2168 }
2169
2170 /// Removes a property value.
2171 ///
2172 /// # Errors
2173 ///
2174 /// Returns [`DbError`] when the subject or key is unknown.
2175 ///
2176 /// # Performance
2177 ///
2178 /// This method is `O(log subject change + log key count)`.
2179 pub(crate) fn remove_property(
2180 &mut self,
2181 subject: PropertySubject,
2182 key: PropertyKeyId,
2183 ) -> Result<(), DbError> {
2184 self.require_subject(subject)?;
2185 if self.merged().catalog().property_key(key).is_none() {
2186 return Err(DbError::UnknownPropertyKey { id: key });
2187 }
2188 self.delta
2189 .remove_property(self.parent.base_records(), subject, key);
2190 Ok(())
2191 }
2192
2193 /// Resolves the property key an equality index covers.
2194 ///
2195 /// # Errors
2196 ///
2197 /// Returns [`DbError::UnknownIndex`] when `index` is unknown, or an
2198 /// unsupported-query error when it is not a property-equality index.
2199 ///
2200 /// # Performance
2201 ///
2202 /// This method is `O(log index count)`.
2203 fn equality_index_key(&self, index: IndexId) -> Result<PropertyKeyId, DbError> {
2204 let view = self.merged();
2205 let entry = view
2206 .catalog()
2207 .index(index)
2208 .ok_or(DbError::UnknownIndex { id: index })?;
2209 match &entry.definition {
2210 IndexDefinition::PropertyEquality { key } => Ok(*key),
2211 _other => Err(DbError::unsupported(
2212 "reconcile requires a property-equality index",
2213 )),
2214 }
2215 }
2216
2217 /// Inserts or updates the element whose value under `index` equals `value`,
2218 /// returning its canonical id — reused when an element already carries that
2219 /// identity value (id stable across reconcile), freshly minted (a never-reused
2220 /// id, with the identity property set) otherwise.
2221 ///
2222 /// # Errors
2223 ///
2224 /// Returns [`DbError`] when `index` is not an equality index or the value
2225 /// type mismatches the key schema.
2226 ///
2227 /// # Performance
2228 ///
2229 /// This method is `O(log n + value length)` — a probe plus, on a miss, a mint.
2230 pub fn upsert_element<T: ValueType>(
2231 &mut self,
2232 index: EqualityIndex<T>,
2233 value: impl Assignable<T>,
2234 ) -> Result<ElementId, DbError> {
2235 let value = value.into_value()?;
2236 let key = self.equality_index_key(index.id())?;
2237 let existing = self
2238 .merged()
2239 .property_equal(key, &value)
2240 .into_iter()
2241 .find_map(|subject| match subject {
2242 PropertySubject::Element(id) => Some(id),
2243 PropertySubject::Relation(_) | PropertySubject::Incidence(_) => None,
2244 });
2245 if let Some(id) = existing {
2246 return Ok(id);
2247 }
2248 let element = self.create_element()?;
2249 self.set_property(PropertySubject::Element(element), key, value)?;
2250 Ok(element)
2251 }
2252
2253 /// Inserts or updates the relation whose value under `index` equals `value`,
2254 /// returning its canonical id. On a miss it mints the relation, sets its type
2255 /// and identity property, and creates one incidence per `(element, role)`
2256 /// endpoint; on a hit the existing relation (with its endpoints) is reused
2257 /// unchanged — the identity value encodes the endpoints, so they are immutable.
2258 ///
2259 /// # Errors
2260 ///
2261 /// Returns [`DbError`] when `index` is not an equality index, the value type
2262 /// mismatches, or an endpoint element does not exist.
2263 ///
2264 /// # Performance
2265 ///
2266 /// This method is `O(log n + endpoints)` — a probe plus, on a miss, a mint.
2267 pub fn upsert_relation<T: ValueType>(
2268 &mut self,
2269 index: EqualityIndex<T>,
2270 value: impl Assignable<T>,
2271 relation_type: RelationTypeId,
2272 endpoints: &[(ElementId, RoleId)],
2273 ) -> Result<RelationId, DbError> {
2274 let value = value.into_value()?;
2275 let key = self.equality_index_key(index.id())?;
2276 let existing = self
2277 .merged()
2278 .property_equal(key, &value)
2279 .into_iter()
2280 .find_map(|subject| match subject {
2281 PropertySubject::Relation(id) => Some(id),
2282 PropertySubject::Element(_) | PropertySubject::Incidence(_) => None,
2283 });
2284 if let Some(id) = existing {
2285 return Ok(id);
2286 }
2287 let relation = self.create_relation()?;
2288 self.set_relation_type(relation, relation_type)?;
2289 self.set_property(PropertySubject::Relation(relation), key, value)?;
2290 for (element, role) in endpoints {
2291 self.create_incidence(relation, *element, *role)?;
2292 }
2293 Ok(relation)
2294 }
2295
2296 /// Tombstones every subject carried by `index` whose identity value is NOT in
2297 /// `keep`, cascading each subject's incidences in `O(degree)` via the
2298 /// reverse-adjacency index. The prune half of a reconcile: after upserting
2299 /// every desired subject, `retain` removes the vanished complement.
2300 ///
2301 /// # Errors
2302 ///
2303 /// Returns [`DbError`] when `index` is not an equality index or a `keep` value
2304 /// type mismatches the key schema.
2305 ///
2306 /// # Performance
2307 ///
2308 /// This method is `O(family size + removed × degree)`.
2309 pub fn retain<T: ValueType, V: Assignable<T> + Copy>(
2310 &mut self,
2311 index: EqualityIndex<T>,
2312 keep: &[V],
2313 ) -> Result<(), DbError> {
2314 let key = self.equality_index_key(index.id())?;
2315 let mut keep_values: BTreeSet<PropertyValue> = BTreeSet::new();
2316 for value in keep {
2317 keep_values.insert((*value).into_value()?);
2318 }
2319 let stale: Vec<PropertySubject> = self
2320 .merged()
2321 .property_key_subjects(key)
2322 .into_iter()
2323 .filter(|(_subject, value)| !keep_values.contains(value))
2324 .map(|(subject, _value)| subject)
2325 .collect();
2326 for subject in stale {
2327 match subject {
2328 PropertySubject::Element(id) => self.tombstone_element(id)?,
2329 PropertySubject::Relation(id) => self.tombstone_relation(id)?,
2330 PropertySubject::Incidence(id) => self.tombstone_incidence(id)?,
2331 }
2332 }
2333 Ok(())
2334 }
2335
2336 /// Sets a typed property on a subject; the value type is checked at compile
2337 /// time against the key.
2338 ///
2339 /// # Errors
2340 ///
2341 /// Returns [`DbError`] when the subject is absent, the value is out of range,
2342 /// or the value type mismatches the key schema.
2343 ///
2344 /// # Performance
2345 ///
2346 /// This method is `O(log change + log keys)`.
2347 pub fn set<T: ValueType>(
2348 &mut self,
2349 subject: impl Into<PropertySubject>,
2350 key: Key<T>,
2351 value: impl Assignable<T>,
2352 ) -> Result<(), DbError> {
2353 self.set_property(subject.into(), key.id(), value.into_value()?)
2354 }
2355
2356 /// Removes a typed property from a subject.
2357 ///
2358 /// # Errors
2359 ///
2360 /// Returns [`DbError`] when the subject is absent or the key is unknown.
2361 ///
2362 /// # Performance
2363 ///
2364 /// This method is `O(log change + log keys)`.
2365 pub fn unset<T: ValueType>(
2366 &mut self,
2367 subject: impl Into<PropertySubject>,
2368 key: Key<T>,
2369 ) -> Result<(), DbError> {
2370 self.remove_property(subject.into(), key.id())
2371 }
2372
2373 /// Adds a label to an element or relation subject.
2374 ///
2375 /// # Errors
2376 ///
2377 /// Returns [`DbError`] when the subject is absent, the label is unknown, or
2378 /// the subject is an incidence (incidences carry no labels).
2379 ///
2380 /// # Performance
2381 ///
2382 /// This method is `O(log change + log labels)`.
2383 pub fn add_label(
2384 &mut self,
2385 subject: impl Into<PropertySubject>,
2386 label: LabelId,
2387 ) -> Result<(), DbError> {
2388 match subject.into() {
2389 PropertySubject::Element(id) => self.add_element_label(id, label),
2390 PropertySubject::Relation(id) => self.add_relation_label(id, label),
2391 PropertySubject::Incidence(_) => {
2392 Err(DbError::unsupported("incidences do not carry labels"))
2393 }
2394 }
2395 }
2396
2397 /// Tombstones any subject by id, cascading a relation's or element's
2398 /// incidences in `O(degree)` via the reverse-adjacency index.
2399 ///
2400 /// # Errors
2401 ///
2402 /// Returns [`DbError`] when the subject is not visible.
2403 ///
2404 /// # Performance
2405 ///
2406 /// This method is `O(log change + degree)`.
2407 pub fn tombstone(&mut self, subject: impl Into<PropertySubject>) -> Result<(), DbError> {
2408 match subject.into() {
2409 PropertySubject::Element(id) => self.tombstone_element(id),
2410 PropertySubject::Relation(id) => self.tombstone_relation(id),
2411 PropertySubject::Incidence(id) => self.tombstone_incidence(id),
2412 }
2413 }
2414
2415 /// Commits this write transaction durably.
2416 ///
2417 /// A non-dirty commit returns the parent's commit sequence without appending
2418 /// to the WAL or publishing. A dirty commit encodes the overlay's mutation
2419 /// log into one WAL frame (with the watermark op last), appends it with an
2420 /// fsync (truncating back to the captured EOF on any write error so no
2421 /// interior torn record survives), THEN folds the delta into a fresh
2422 /// `Arc<Overlay>` and publishes a new `Arc<Snapshot>`.
2423 ///
2424 /// After publishing, a dirty commit consults the configured
2425 /// [`CheckpointPolicy`]: it releases the writer lock FIRST (so the fold can
2426 /// re-acquire it), then folds when the delta-log has outgrown the base. The
2427 /// committed frame is already durable, so an auto-fold failure does not lose
2428 /// data; it is surfaced to the caller.
2429 ///
2430 /// # Errors
2431 ///
2432 /// Returns [`DbError`] when commit-sequence allocation, frame encoding, the
2433 /// durable append, or a triggered auto-checkpoint fold fails.
2434 ///
2435 /// # Performance
2436 ///
2437 /// This method is `O(change)` for the dirty path — flat as the base grows.
2438 /// The publish step shares the parent snapshot's already-materialized
2439 /// [`crate::overlay::BaseRecords`] and derived index by `Arc` (a commit never
2440 /// folds, so the base is byte-identical within the generation), so it neither
2441 /// re-decodes the base nor rebuilds the index. A triggered fold adds
2442 /// `O(visible state bytes)` on top.
2443 pub(crate) fn commit(self) -> Result<CommitSeq, DbError> {
2444 if self.delta.is_empty() {
2445 // Non-dirty commit: no append, no publish, no durable id advance.
2446 return Ok(self.parent.lsn());
2447 }
2448 let lsn = self
2449 .parent
2450 .lsn()
2451 .checked_next()
2452 .ok_or(DbError::CommitSeqOverflow)?;
2453 let (ops, blob) = self.delta.encode_frame();
2454 let frame = wal::encode_commit(
2455 lsn.get(),
2456 self.transaction_id.get(),
2457 self.database.base_generation,
2458 &ops,
2459 &blob,
2460 )?;
2461 let mut log = open_log_for_append(&self.database.root, self.database.base_generation)?;
2462 wal::append_commit(&mut log, &frame)?;
2463
2464 // Durable: the delta was seeded from the parent overlay and only added
2465 // this writer's changes, so freezing it directly is the full new
2466 // published overlay (parent state + this commit). The parent overlay was
2467 // never mutated — this is a brand-new frozen `Arc<Overlay>`, so a reader
2468 // pinning the parent is unaffected.
2469 let new_overlay = Arc::new(self.delta.freeze());
2470 // A commit never folds, so the new snapshot pins the SAME base generation
2471 // as the parent — the base wire bytes are byte-identical, and so are the
2472 // owned records and the derived index built from them. Share the parent's
2473 // `Arc<BaseRecords>` (and its `BaseIndex`) instead of re-decoding the base
2474 // and rebuilding the index, which keeps a single-element commit `O(change)`
2475 // rather than `O(base)` regardless of how large the base has grown.
2476 let snapshot = Snapshot::with_shared_base_records(
2477 self.parent.generation(),
2478 lsn,
2479 Arc::clone(self.parent.base()),
2480 new_overlay,
2481 Arc::clone(self.parent.base_records()),
2482 );
2483 self.database.current = Arc::new(snapshot);
2484 self.database.last_transaction_id = self.transaction_id;
2485 // Release the writer lock before any auto-fold so the fold can re-acquire
2486 // it (a partial move out of `self`, legal because `Writer` has
2487 // no `Drop` impl; the remaining `&mut Db` borrow stays live).
2488 drop(self.lock);
2489 self.database.maybe_auto_checkpoint()?;
2490 Ok(lsn)
2491 }
2492
2493 /// Returns the merged read view this writer sees (overlay over base).
2494 ///
2495 /// # Performance
2496 ///
2497 /// This method is `O(1)` to construct.
2498 fn merged(&self) -> crate::overlay::WriteMergedState<'_> {
2499 crate::overlay::WriteMergedState::new(self.parent.base_records(), &self.delta)
2500 }
2501
2502 /// Requires an element to be visible in the writer's merged view.
2503 ///
2504 /// # Errors
2505 ///
2506 /// Returns [`DbError::UnknownElement`] when absent.
2507 ///
2508 /// # Performance
2509 ///
2510 /// This method is `O(log change + log n)`.
2511 fn require_element(&self, id: ElementId) -> Result<(), DbError> {
2512 if self.merged().contains_element(id) {
2513 Ok(())
2514 } else {
2515 Err(DbError::UnknownElement { id })
2516 }
2517 }
2518
2519 /// Requires a relation to be visible.
2520 ///
2521 /// # Errors
2522 ///
2523 /// Returns [`DbError::UnknownRelation`] when absent.
2524 ///
2525 /// # Performance
2526 ///
2527 /// This method is `O(log change + log n)`.
2528 fn require_relation(&self, id: RelationId) -> Result<(), DbError> {
2529 if self.merged().contains_relation(id) {
2530 Ok(())
2531 } else {
2532 Err(DbError::UnknownRelation { id })
2533 }
2534 }
2535
2536 /// Requires an incidence to be visible.
2537 ///
2538 /// # Errors
2539 ///
2540 /// Returns [`DbError::UnknownIncidence`] when absent.
2541 ///
2542 /// # Performance
2543 ///
2544 /// This method is `O(log change + log n)`.
2545 fn require_incidence(&self, id: IncidenceId) -> Result<(), DbError> {
2546 if self.merged().contains_incidence(id) {
2547 Ok(())
2548 } else {
2549 Err(DbError::UnknownIncidence { id })
2550 }
2551 }
2552
2553 /// Requires a role to exist in the merged catalog.
2554 ///
2555 /// # Errors
2556 ///
2557 /// Returns [`DbError::UnknownRole`] when absent.
2558 ///
2559 /// # Performance
2560 ///
2561 /// This method is `O(log role count)`.
2562 fn require_role(&self, id: RoleId) -> Result<(), DbError> {
2563 if self.delta.catalog().role(id).is_some() {
2564 Ok(())
2565 } else {
2566 Err(DbError::UnknownRole { id })
2567 }
2568 }
2569
2570 /// Requires a label to exist in the merged catalog.
2571 ///
2572 /// # Errors
2573 ///
2574 /// Returns [`DbError::UnknownLabel`] when absent.
2575 ///
2576 /// # Performance
2577 ///
2578 /// This method is `O(log label count)`.
2579 fn require_label(&self, id: LabelId) -> Result<(), DbError> {
2580 if self.delta.catalog().label(id).is_some() {
2581 Ok(())
2582 } else {
2583 Err(DbError::UnknownLabel { id })
2584 }
2585 }
2586
2587 /// Requires a relation type to exist in the merged catalog.
2588 ///
2589 /// # Errors
2590 ///
2591 /// Returns [`DbError::UnknownRelationType`] when absent.
2592 ///
2593 /// # Performance
2594 ///
2595 /// This method is `O(log relation type count)`.
2596 fn require_relation_type(&self, id: RelationTypeId) -> Result<(), DbError> {
2597 if self.delta.catalog().relation_type(id).is_some() {
2598 Ok(())
2599 } else {
2600 Err(DbError::UnknownRelationType { id })
2601 }
2602 }
2603
2604 /// Requires a property subject to be visible.
2605 ///
2606 /// # Errors
2607 ///
2608 /// Returns the matching `Unknown*` error when the subject is absent.
2609 ///
2610 /// # Performance
2611 ///
2612 /// This method is `O(log change + log n)`.
2613 fn require_subject(&self, subject: PropertySubject) -> Result<(), DbError> {
2614 match subject {
2615 PropertySubject::Element(id) => self.require_element(id),
2616 PropertySubject::Relation(id) => self.require_relation(id),
2617 PropertySubject::Incidence(id) => self.require_incidence(id),
2618 }
2619 }
2620
2621 /// Validates one projection definition against the merged catalog.
2622 ///
2623 /// # Errors
2624 ///
2625 /// Returns [`DbError`] when a referenced role or relation type is unknown.
2626 ///
2627 /// # Performance
2628 ///
2629 /// This method is `O(definition size)`.
2630 fn validate_projection_definition(
2631 &self,
2632 definition: &ProjectionDefinition,
2633 ) -> Result<(), DbError> {
2634 match definition {
2635 ProjectionDefinition::Graph(graph) => {
2636 self.require_role(graph.source_role)?;
2637 self.require_role(graph.target_role)?;
2638 for relation_type in &graph.relation_types {
2639 self.require_relation_type(*relation_type)?;
2640 }
2641 Ok(())
2642 }
2643 ProjectionDefinition::Hypergraph(hyper) => {
2644 for role in &hyper.source_roles {
2645 self.require_role(*role)?;
2646 }
2647 for role in &hyper.target_roles {
2648 self.require_role(*role)?;
2649 }
2650 for relation_type in &hyper.relation_types {
2651 self.require_relation_type(*relation_type)?;
2652 }
2653 Ok(())
2654 }
2655 }
2656 }
2657
2658 /// Validates one index definition against the merged catalog.
2659 ///
2660 /// # Errors
2661 ///
2662 /// Returns [`DbError`] when a referenced catalog id is unknown or a
2663 /// composite index has no keys.
2664 ///
2665 /// # Performance
2666 ///
2667 /// This method is `O(definition size)`.
2668 fn validate_index_definition(&self, definition: &IndexDefinition) -> Result<(), DbError> {
2669 let catalog = self.delta.catalog();
2670 match definition {
2671 IndexDefinition::Label { label } => self.require_label(*label),
2672 IndexDefinition::RelationType { relation_type } => {
2673 self.require_relation_type(*relation_type)
2674 }
2675 IndexDefinition::PropertyEquality { key } | IndexDefinition::PropertyRange { key } => {
2676 self.require_property_key(*key)
2677 }
2678 IndexDefinition::CompositeEquality { keys } => {
2679 if keys.is_empty() {
2680 return Err(DbError::unsupported(
2681 "composite equality index requires at least one key",
2682 ));
2683 }
2684 for key in keys {
2685 self.require_property_key(*key)?;
2686 }
2687 Ok(())
2688 }
2689 IndexDefinition::Projection { projection } => catalog
2690 .projection(*projection)
2691 .is_some()
2692 .then_some(())
2693 .ok_or(DbError::UnknownProjection { id: *projection }),
2694 }
2695 }
2696
2697 /// Requires a property key to exist in the merged catalog.
2698 ///
2699 /// # Errors
2700 ///
2701 /// Returns [`DbError::UnknownPropertyKey`] when absent.
2702 ///
2703 /// # Performance
2704 ///
2705 /// This method is `O(log property key count)`.
2706 fn require_property_key(&self, id: PropertyKeyId) -> Result<(), DbError> {
2707 if self.delta.catalog().property_key(id).is_some() {
2708 Ok(())
2709 } else {
2710 Err(DbError::UnknownPropertyKey { id })
2711 }
2712 }
2713}
2714
2715#[cfg(test)]
2716#[cfg(not(miri))]
2717mod tests {
2718 use std::{
2719 path::PathBuf,
2720 sync::atomic::{AtomicU64, Ordering},
2721 };
2722
2723 use super::*;
2724
2725 /// Per-process path counter for unique temporary store directories.
2726 static NEXT_PATH: AtomicU64 = AtomicU64::new(0);
2727
2728 /// Returns a unique temporary store path and removes any prior contents.
2729 fn temp_store(name: &str) -> PathBuf {
2730 let id = NEXT_PATH.fetch_add(1, Ordering::Relaxed);
2731 let path =
2732 std::env::temp_dir().join(format!("oxgraph-db-cp-{name}-{}-{id}", std::process::id()));
2733 let _ = std::fs::remove_dir_all(&path);
2734 path
2735 }
2736
2737 /// Manual measurement harness (run with
2738 /// `cargo test -p oxgraph-db --release -- --ignored open_latency_large_base
2739 /// --nocapture`): builds a folded base at roughly the measured-problem scale
2740 /// (>=100k elements, >=300k relations, properties), then times `Db::open`.
2741 /// Open must be dominated by the record decode + page faults, NOT the
2742 /// `O(base)` index rebuild the prior design paid — the index is borrowed.
2743 /// Number of element/relation records the open-latency harness builds and the
2744 /// number of timed open runs it averages.
2745 #[cfg(not(debug_assertions))]
2746 const OPEN_LATENCY_ELEMENTS: usize = 100_000;
2747 /// Relations the open-latency harness builds (each with two incidences).
2748 #[cfg(not(debug_assertions))]
2749 const OPEN_LATENCY_RELATIONS: usize = 320_000;
2750 /// Timed open runs the open-latency harness averages.
2751 #[cfg(not(debug_assertions))]
2752 const OPEN_LATENCY_RUNS: u32 = 5;
2753
2754 /// Populates `database` with `OPEN_LATENCY_ELEMENTS` ranked elements and
2755 /// `OPEN_LATENCY_RELATIONS` weighted relations (two incidences each).
2756 #[cfg(not(debug_assertions))]
2757 fn populate_large_store(database: &mut Db) {
2758 database.set_checkpoint_policy(CheckpointPolicy::Manual);
2759 database
2760 .write(|writer| {
2761 let rank = writer.register_property_key(
2762 "rank",
2763 PropertyFamily::Element,
2764 PropertyType::Integer,
2765 )?;
2766 let weight = writer.register_property_key(
2767 "weight",
2768 PropertyFamily::Relation,
2769 PropertyType::Integer,
2770 )?;
2771 let role = writer.register_role("party")?;
2772 let mut elements = Vec::with_capacity(OPEN_LATENCY_ELEMENTS);
2773 for index in 0..OPEN_LATENCY_ELEMENTS {
2774 let element = writer.create_element()?;
2775 writer.set_property(
2776 PropertySubject::Element(element),
2777 rank,
2778 PropertyValue::Integer(i64::try_from(index % 997).unwrap_or(0)),
2779 )?;
2780 elements.push(element);
2781 }
2782 for index in 0..OPEN_LATENCY_RELATIONS {
2783 let relation = writer.create_relation()?;
2784 writer.set_property(
2785 PropertySubject::Relation(relation),
2786 weight,
2787 PropertyValue::Integer(i64::try_from(index % 503).unwrap_or(0)),
2788 )?;
2789 let source = elements[index % OPEN_LATENCY_ELEMENTS];
2790 let target = elements[(index + 1) % OPEN_LATENCY_ELEMENTS];
2791 writer.create_incidence(relation, source, role)?;
2792 writer.create_incidence(relation, target, role)?;
2793 }
2794 Ok(())
2795 })
2796 .expect("populate");
2797 // Fold everything into the base so open pays the base term, not log replay.
2798 database.compact().expect("compact");
2799 }
2800
2801 /// Mean elapsed time of `OPEN_LATENCY_RUNS` full `Db::open` calls on `path`.
2802 #[cfg(not(debug_assertions))]
2803 fn mean_open_ms(path: &std::path::Path) -> f64 {
2804 let mut total = std::time::Duration::ZERO;
2805 for _run in 0..OPEN_LATENCY_RUNS {
2806 let start = std::time::Instant::now();
2807 let opened = Db::open(path).expect("timed open");
2808 total += start.elapsed();
2809 drop(opened);
2810 }
2811 total.as_secs_f64() * 1000.0 / f64::from(OPEN_LATENCY_RUNS)
2812 }
2813
/// BEFORE proxy for the borrowed open: the average time, in milliseconds and
/// over `OPEN_LATENCY_RUNS` runs, of the heavy open-time work the prior design
/// paid — record decode plus the `from_records` index rebuild performed by
/// `BaseRecords::from_view`. Opening the base file itself is excluded from
/// the timed window on every run.
#[cfg(not(debug_assertions))]
fn mean_old_from_view_ms(path: &std::path::Path) -> f64 {
    let superblock = wal::read_superblock(path).expect("superblock");
    let base_path = path.join(base_file(superblock.base_generation.get()));
    let accumulated: std::time::Duration = (0..OPEN_LATENCY_RUNS)
        .map(|_| {
            let base = Base::open(&base_path, false).expect("base open");
            let clock = std::time::Instant::now();
            let rebuilt =
                crate::overlay::BaseRecords::from_view(base.get()).expect("old from_view");
            let spent = clock.elapsed();
            // Release the decoded records before the base they were read from.
            drop(rebuilt);
            drop(base);
            spent
        })
        .sum();
    accumulated.as_secs_f64() * 1000.0 / f64::from(OPEN_LATENCY_RUNS)
}
2833
/// Manual measurement harness (run with
/// `cargo test -p oxgraph-db --release -- --ignored open_latency_large_base
/// --nocapture`): builds a folded base at roughly the measured-problem scale
/// (>=100k elements, >=300k relations, properties), then times `Db::open`.
/// Open must be dominated by the record decode + page faults, NOT the
/// `O(base)` index rebuild the prior design paid — the index is borrowed.
/// Debug builds skip it (the open-time `debug_assert!` differential check
/// would itself rebuild the index and skew the timing); run in `--release`.
///
/// One untimed warm-up open is performed and closed again before any
/// measurement, so every timed open runs with no other live handle on the
/// store (matching `mean_open_ms`, which drops each handle) and the final
/// directory cleanup is not held up by a lingering handle.
#[test]
#[ignore = "manual perf measurement; run explicitly with --release --ignored --nocapture"]
#[cfg(not(debug_assertions))]
fn open_latency_large_base() {
    let path = temp_store("open-latency");
    let mut database = Db::create(&path).expect("create");
    populate_large_store(&mut database);
    drop(database);

    // Warm-up open, released immediately. Binding it to `_warm` would keep the
    // handle alive through every timed open and past `remove_dir_all` below.
    drop(Db::open(&path).expect("warm open"));
    let after_ms = mean_open_ms(&path);
    let before_ms = mean_old_from_view_ms(&path);

    println!(
        "open_latency_large_base: {OPEN_LATENCY_ELEMENTS} elements, \
         {OPEN_LATENCY_RELATIONS} relations, {} incidences, {} properties",
        // Each relation gets exactly two incidences (source + target).
        OPEN_LATENCY_RELATIONS * 2,
        // One `rank` per element plus one `weight` per relation.
        OPEN_LATENCY_ELEMENTS + OPEN_LATENCY_RELATIONS,
    );
    println!(" BEFORE open work (decode + from_records rebuild): {before_ms:.1} ms / open");
    println!(" AFTER full Db::open (decode + BORROWED index): {after_ms:.1} ms / open");

    let _ = std::fs::remove_dir_all(&path);
}
2866
#[test]
fn reconcile_upserts_reuse_or_mint_and_retain_prunes_the_complement() {
    let path = temp_store("reconcile");
    let mut database = Db::create(&path).expect("create");

    // Schema transaction: a text identity key and an equality index over it.
    let index_id = {
        let mut txn = database.begin_write().expect("begin write");
        let stable_key = txn
            .register_property_key("stable_key", PropertyFamily::Element, PropertyType::Text)
            .expect("key");
        let defined = txn
            .define_index(
                "element_stable_key_eq",
                IndexDefinition::PropertyEquality { key: stable_key },
            )
            .expect("index");
        txn.commit().expect("commit schema");
        defined
    };
    let by_key = EqualityIndex::<crate::Text>::from_id(index_id);

    // Pass one: both identities are new, so both ids are minted.
    let (first_a, first_b) = {
        let mut txn = database.begin_write().expect("begin write");
        let minted = (
            txn.upsert_element(by_key, "a").expect("upsert a"),
            txn.upsert_element(by_key, "b").expect("upsert b"),
        );
        txn.commit().expect("commit");
        minted
    };

    // Pass two: `a` is re-asserted, `c` is new, and retaining {a, c} prunes `b`.
    let (second_a, first_c) = {
        let mut txn = database.begin_write().expect("begin write");
        let reconciled = (
            txn.upsert_element(by_key, "a").expect("re-upsert a"),
            txn.upsert_element(by_key, "c").expect("upsert c"),
        );
        txn.retain(by_key, &["a", "c"]).expect("retain");
        txn.commit().expect("commit");
        reconciled
    };

    assert_eq!(first_a, second_a, "an unchanged identity reuses its element id");
    assert_ne!(first_c, first_a);
    assert_ne!(first_c, first_b);

    let view = database.reader();
    assert!(view.contains_element(first_a), "kept a");
    assert!(view.contains_element(first_c), "kept c");
    assert!(!view.contains_element(first_b), "retain tombstoned b");
    let resolved_a = view
        .element_by_key(by_key, "a")
        .expect("lookup a")
        .map(|element| element.id);
    assert_eq!(resolved_a, Some(first_a));
    assert!(
        view.element_by_key(by_key, "b").expect("lookup b").is_none(),
        "b is not resolvable after the prune"
    );
    let _ = std::fs::remove_dir_all(&path);
}
2924
#[test]
fn write_closure_commits_on_ok_rolls_back_on_err_and_reports_outcome() {
    let path = temp_store("write-closure");
    let mut database = Db::create(&path).expect("create");
    // Element count as observed through the read closure.
    let count_elements = |db: &Db| db.read(|read| Ok(read.element_count())).expect("count");

    // A closure that stages a change and returns Ok is committed, and the
    // read closure sees the new element afterwards.
    let (created, first_outcome) = database
        .write(|writer| {
            let fresh = writer.create_element()?;
            Ok(fresh)
        })
        .expect("write");
    assert!(matches!(first_outcome, CommitOutcome::Committed(_)));
    let visible = database
        .read(|read| Ok(read.contains_element(created)))
        .expect("read");
    assert!(visible);

    // Staging nothing yields Empty: no frame is appended.
    let ((), empty_outcome) = database.write(|_writer| Ok(())).expect("empty write");
    assert_eq!(empty_outcome, CommitOutcome::Empty);

    // A closure that returns Err discards whatever it staged.
    let count_before = count_elements(&database);
    let failed = database.write(|writer| {
        writer.create_element()?;
        Err::<(), DbError>(DbError::EmptyQuery)
    });
    assert!(failed.is_err());
    let count_after = count_elements(&database);
    assert_eq!(count_before, count_after, "the failed write staged nothing durable");

    let _ = std::fs::remove_dir_all(&path);
}
2965
#[test]
fn re_setting_an_unchanged_property_value_is_a_no_op_commit() {
    // Reconcile/reindex contract: writing a property's current value again must
    // log no mutation, so an incremental reconcile that re-sets every property
    // of every untouched subject costs O(change). If that no-op gate were
    // missing, each reindex would log the entire graph.
    let path = temp_store("set-noop");
    let mut database = Db::create(&path).expect("create");
    let schema = Schema::new().key::<crate::Text>("name", PropertyFamily::Element);

    // Initial write: a new element named "alpha" (a genuine change).
    let (target, _) = database
        .write(|writer| {
            let bound = writer.apply_schema(&schema)?;
            let name = bound.key::<crate::Text>("name")?;
            let id = writer.create_element()?;
            writer.set(id, name, "alpha")?;
            Ok(id)
        })
        .expect("first write");

    // One transaction that (re)applies the schema and assigns `value` to the
    // target's `name`.
    let assign_name = |db: &mut Db, value: &'static str| {
        db.write(|writer| {
            let bound = writer.apply_schema(&schema)?;
            let name = bound.key::<crate::Text>("name")?;
            writer.set(target, name, value)?;
            Ok(())
        })
    };

    // Same value again: nothing is mutated, so the commit is Empty.
    let ((), repeat_outcome) = assign_name(&mut database, "alpha").expect("idempotent set");
    assert_eq!(
        repeat_outcome,
        CommitOutcome::Empty,
        "re-setting the same property value must log no mutation"
    );

    // A different value is a genuine change: committed and readable.
    let ((), change_outcome) = assign_name(&mut database, "beta").expect("changed set");
    assert!(matches!(change_outcome, CommitOutcome::Committed(_)));
    let name = database
        .bind(&schema)
        .expect("bind")
        .key::<crate::Text>("name")
        .expect("name key");
    let stored = database
        .read(|read| {
            let text = read
                .element(target)
                .and_then(|element| element.properties().get::<crate::Text, String>(name));
            Ok(text)
        })
        .expect("read");
    assert_eq!(stored.as_deref(), Some("beta"));

    let _ = std::fs::remove_dir_all(&path);
}
3029
#[test]
fn schema_apply_is_idempotent_and_bind_resolves_typed_handles() {
    let path = temp_store("schema");
    let mut database = Db::create(&path).expect("create");
    let schema = Schema::new()
        .label("function")
        .key::<crate::Text>("name", PropertyFamily::Element)
        .equality_index("name_eq", "name");

    // Opening write: the schema registers its catalog entries, then two
    // elements are upserted by identity (the first also labelled `function`).
    let ((alpha, beta), _) = database
        .write(|writer| {
            let bound = writer.apply_schema(&schema)?;
            let by_name = bound.equality_index::<crate::Text>("name_eq")?;
            let function_label = bound.label("function")?;
            let first = writer.upsert_element(by_name, "alpha")?;
            writer.add_label(first, function_label)?;
            let second = writer.upsert_element(by_name, "beta")?;
            Ok((first, second))
        })
        .expect("apply + write");
    assert_ne!(alpha, beta);

    // Applying the identical schema a second time registers nothing, so the
    // resulting commit is empty.
    let (_reapplied, reapply_outcome) = database
        .write(|writer| writer.apply_schema(&schema))
        .expect("re-apply");
    assert_eq!(
        reapply_outcome,
        CommitOutcome::Empty,
        "re-applying a schema registers nothing new"
    );

    // On a freshly opened store, bind() resolves the schema without writing:
    // the correctly typed handle works and a mistyped request is refused.
    let reopened = Db::open(&path).expect("open");
    let resolved = reopened.bind(&schema).expect("bind");
    let by_name = resolved
        .equality_index::<crate::Text>("name_eq")
        .expect("typed index");
    assert!(
        resolved.equality_index::<crate::Int>("name_eq").is_err(),
        "a wrong-value-type handle request is a SchemaConflict"
    );
    let hit = reopened
        .read(|read| read.element_by_key(by_name, "alpha"))
        .expect("read")
        .expect("alpha present");
    assert_eq!(hit.id, alpha);

    let _ = std::fs::remove_dir_all(&path);
}
3084
3085 /// The exact logical state the crash-matrix asserts recovery preserves: the
3086 /// visible element ids, the rank-keyed property values, and the `Person`
3087 /// label membership.
3088 #[derive(Debug, Eq, PartialEq)]
3089 struct LogicalState {
3090 /// Visible element ids in ascending order.
3091 elements: Vec<ElementId>,
3092 /// Subjects whose `rank` equals each probed value, by value.
3093 rank_eq_500: Vec<PropertySubject>,
3094 /// Element ids carrying the `Person` label.
3095 person_members: Vec<ElementId>,
3096 }
3097
3098 /// Catalog/topology fixture ids returned by [`build_fixture`].
3099 struct Fixture {
3100 /// `rank` integer property key.
3101 rank: PropertyKeyId,
3102 /// `Person` label.
3103 person: LabelId,
3104 }
3105
3106 /// Builds a committed fixture: 8 elements, each ranked `index * 100`, the
3107 /// even-indexed ones labelled `Person`. Returns the fixture ids.
3108 fn build_fixture(database: &mut Db) -> Fixture {
3109 let mut writer = database.begin_write().expect("begin write");
3110 let rank = writer
3111 .register_property_key("rank", PropertyFamily::Element, PropertyType::Integer)
3112 .expect("rank key");
3113 let person = writer.register_label("Person").expect("person label");
3114 for index in 0..8u64 {
3115 let element = writer.create_element().expect("element");
3116 writer
3117 .set(
3118 element,
3119 Key::<crate::Int>::from_id(rank),
3120 i64::try_from(index).expect("index") * 100,
3121 )
3122 .expect("set rank");
3123 if index % 2 == 0 {
3124 writer.add_label(element, person).expect("add label");
3125 }
3126 }
3127 writer.commit().expect("commit fixture");
3128 Fixture { rank, person }
3129 }
3130
3131 /// Reads the logical state through the index-backed read surface.
3132 fn read_logical(database: &Db, fixture: &Fixture) -> LogicalState {
3133 let read = database.reader();
3134 let elements = read.element_ids();
3135 let rank_eq_500 = read
3136 .lookup_property_equal(fixture.rank, &PropertyValue::Integer(500))
3137 .expect("rank lookup");
3138 let person_members = read.snapshot.view().elements_with_label(fixture.person);
3139 LogicalState {
3140 elements,
3141 rank_eq_500,
3142 person_members,
3143 }
3144 }
3145
3146 /// Asserts ids are never reused across a fold BEHAVIORALLY: the next element
3147 /// `database` mints must take the id one past the current maximum visible
3148 /// element id, i.e. the recovered watermark survived the fold. A regression
3149 /// that dropped the watermark on fold (so the recovered record set is
3150 /// unchanged but the next-id counter reset) would reuse an existing id and
3151 /// fail this assertion — which the unchanged-record-set checks alone miss.
3152 ///
3153 /// The probe element is rolled back, so it does not perturb the logical state
3154 /// the surrounding test re-reads.
3155 fn assert_no_id_reuse_across_fold(database: &mut Db) {
3156 let max_existing = database
3157 .reader()
3158 .element_ids()
3159 .into_iter()
3160 .map(ElementId::get)
3161 .max()
3162 .unwrap_or(0);
3163 let expected = ElementId::new(max_existing + 1);
3164 let mut writer = database.begin_write().expect("watermark probe writer");
3165 let minted = writer.create_element().expect("watermark probe element");
3166 assert_eq!(
3167 minted, expected,
3168 "the next minted id must be one past the max existing id (watermark \
3169 survived the fold; ids are never reused)",
3170 );
3171 // Drop the probe writer so it leaves no trace in the logical state.
3172 drop(writer);
3173 }
3174
/// CHECKPOINT-CRASH-MATRIX: a crash right after each fsync point inside
/// `checkpoint` must recover EXACTLY the correct logical state. If the crash
/// precedes the new superblock, the OLD generation remains authoritative and
/// the orphaned new base is ignored; once the superblock names the new
/// generation, the NEW base is authoritative. A checkpoint that completes
/// recovers the same state from the folded base. Every window must leave the
/// index-backed lookups answering exactly as they did before the fold attempt.
#[test]
fn checkpoint_crash_matrix_recovers_exact_state() {
    let crash_points = [
        CheckpointStop::BeforeSuperblock,
        CheckpointStop::BeforeRotate,
        CheckpointStop::Complete,
    ];
    for stop in crash_points {
        let path = temp_store(&format!("crash-{stop:?}"));

        // Build, snapshot, and run the checkpoint only up to `stop`; leaving the
        // scope drops the handle exactly as a crash would, intermediate files
        // and all.
        let (fixture, expected, generation_before) = {
            let mut database = Db::create(&path).expect("create");
            let fixture = build_fixture(&mut database);
            let expected = read_logical(&database, &fixture);
            let generation_before = database.base_generation;
            database
                .checkpoint_inner(stop)
                .expect("checkpoint stop returns ok");
            (fixture, expected, generation_before)
        };

        let mut recovered = Db::open(&path).expect("reopen after crash");
        let recovered_state = read_logical(&recovered, &fixture);
        assert_eq!(
            recovered_state, expected,
            "crash at {stop:?} must recover the exact logical state",
        );

        // In every crash window the recovered watermark holds: the next minted
        // id is one past the highest recovered id, verified by minting rather
        // than inferred from the unchanged record set.
        assert_no_id_reuse_across_fold(&mut recovered);

        // Which generation is live depends on whether the superblock landed.
        match stop {
            CheckpointStop::BeforeSuperblock => {
                assert_eq!(
                    recovered.base_generation, generation_before,
                    "old superblock stays authoritative before the new one lands",
                );
            }
            CheckpointStop::BeforeRotate | CheckpointStop::Complete => {
                assert_eq!(
                    recovered.base_generation,
                    generation_before + 1,
                    "the new superblock names the folded generation",
                );
            }
        }

        // Recovering again is idempotent: leftover files from a partial
        // checkpoint must not derail a repeat open.
        let second_open = Db::open(&path).expect("second reopen");
        assert_eq!(read_logical(&second_open, &fixture), expected);

        drop(second_open);
        let _ = std::fs::remove_dir_all(&path);
    }
}
3239
/// The auto-checkpoint policy folds the delta-log into a fresh base once the
/// log outgrows the base by the configured factor: under a tiny factor, a
/// run of dirty commits advances the live generation (the log was folded),
/// and the logical state is preserved across the fold. The manual policy
/// never auto-folds.
#[test]
fn auto_checkpoint_policy_folds_when_log_outgrows_base() {
    // Manual policy: many commits, generation never advances on its own.
    let manual_path = temp_store("auto-manual");
    let mut manual = Db::create(&manual_path).expect("create manual");
    manual.set_checkpoint_policy(CheckpointPolicy::Manual);
    let _fixture = build_fixture(&mut manual);
    for _ in 0..200 {
        let mut writer = manual.begin_write().expect("writer");
        writer.create_element().expect("element");
        writer.commit().expect("commit");
    }
    assert_eq!(
        manual.live_generation(),
        CheckpointGeneration::new(0),
        "manual policy must never auto-fold",
    );
    drop(manual);
    let _ = std::fs::remove_dir_all(&manual_path);

    // Size-ratio policy with the smallest factor: the log soon outgrows the
    // tiny base floor, so a run of commits triggers at least one fold.
    let auto_path = temp_store("auto-ratio");
    let mut auto = Db::create(&auto_path).expect("create auto");
    auto.set_checkpoint_policy(CheckpointPolicy::SizeRatio { factor: 1 });
    let fixture = build_fixture(&mut auto);
    let before = read_logical(&auto, &fixture);
    for _ in 0..400 {
        let mut writer = auto.begin_write().expect("writer");
        writer.create_element().expect("element");
        writer.commit().expect("commit");
    }
    assert!(
        auto.live_generation() > CheckpointGeneration::new(0),
        "size-ratio policy must auto-fold once the log outgrows the base",
    );
    // The fixture's logical state survives every fold: the indexed rank probe
    // and the label membership are unchanged...
    let after = read_logical(&auto, &fixture);
    assert_eq!(after.rank_eq_500, before.rank_eq_500);
    assert_eq!(after.person_members, before.person_members);
    // ...and every fixture element is still visible. The extra commits only
    // add elements, so containment (not equality) is the right check here.
    assert!(
        before.elements.iter().all(|id| after.elements.contains(id)),
        "every pre-fold element must stay visible across the auto-fold",
    );
    // Ids are never reused across the auto-fold: the next minted id is one
    // past the max existing id (the watermark folded into the new base).
    assert_no_id_reuse_across_fold(&mut auto);
    // The configured policy is preserved across the fold.
    assert_eq!(
        auto.checkpoint_policy(),
        CheckpointPolicy::SizeRatio { factor: 1 },
        "the auto-fold reopen must preserve the configured policy",
    );
    // Status surfaces the live generation and a non-empty live base.
    let status = auto.stats();
    assert_eq!(status.live_generation, auto.live_generation());
    assert!(status.base_byte_size > 0, "live base has bytes");
    drop(auto);
    let _ = std::fs::remove_dir_all(&auto_path);
}
3301}