oxgraph_db/database.rs
1//! Embedded `OxGraph` database engine API.
2//!
3//! This is the integration layer over the base+overlay+WAL core. A [`Db`]
//! holds the current `Arc<Snapshot>` (one immutable base generation plus the
//! frozen overlay published over it), the live base generation naming the
//! delta-log, and the transaction-id watermark. Reads pin the current snapshot in `O(1)`
7//! (`reader` clones the `Arc`); writes layer a fresh [`WriteOverlay`] over
8//! the current snapshot, append a WAL frame on commit, and publish a new
9//! snapshot. The whole read/query/projection surface resolves through the merged
10//! [`StateView`] of the pinned snapshot.
11
12use std::{
13 borrow::Cow,
14 collections::BTreeSet,
15 path::{Path, PathBuf},
16 sync::Arc,
17};
18
19use crate::{
20 Bound, Catalog, CheckpointGeneration, CommitSeq, DbError, Element, ElementId,
21 GraphProjectionDefinition, GraphProjectionSpec, IncidenceId, IncidenceRecord, IndexId, LabelId,
22 PreparedQuery, ProjectionDefinition, ProjectionId, Properties, PropertyKeyId, PropertySubject,
23 PropertyType, PropertyValue, QueryResult, Relation, RelationId, RelationTypeId, RoleId, Schema,
24 TransactionId,
25 backing::Base,
26 catalog::{IndexDefinition, PropertyFamily},
27 freeze::{self, FreezeStamps},
28 lock::WriterLock,
29 overlay::{Overlay, Snapshot, StateView, WriteOverlay},
30 projection::{self, GraphProjection, HypergraphProjection},
31 state::NextIds,
32 storage,
33 traversal::{self, Direction, Subgraph, Walk},
34 typed::{Assignable, EqualityIndex, Key, ValueType},
35 wal,
36 wire::SuperblockRecord,
37};
38
/// Lookup input for a cataloged index.
///
/// This type makes index lookup shape explicit: membership indexes accept
/// [`Match::All`], single-property indexes accept scalar equality or
/// range inputs, and composite equality indexes accept an ordered value tuple.
///
/// Every payload is borrowed for `'value`; the enum itself never owns a
/// [`PropertyValue`].
///
/// # Performance
///
/// Copying this value is `O(1)`: each variant carries at most references.
#[derive(Clone, Copy, Debug)]
pub enum Match<'value> {
    /// Lookup every subject represented by a membership-style index.
    All,
    /// Lookup one scalar equality value.
    Equal(&'value PropertyValue),
    /// Lookup one inclusive scalar range (`min` and `max` are both included).
    Range {
        /// Inclusive lower bound.
        min: &'value PropertyValue,
        /// Inclusive upper bound.
        max: &'value PropertyValue,
    },
    /// Lookup one ordered composite equality tuple; the slice order is the
    /// tuple order.
    Composite(&'value [PropertyValue]),
}
64
/// Auto-checkpoint policy: decides when a dirty commit should fold the
/// delta-log into a fresh base generation, which bounds the log tail recovery
/// has to replay.
///
/// The default is size-ratio: fold once the delta-log grows past `factor`
/// times the live base size. [`CheckpointPolicy::Manual`] switches
/// auto-triggering off, leaving folds to an explicit [`Db::compact`].
///
/// # Performance
///
/// Copying this value is `O(1)`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CheckpointPolicy {
    /// Never auto-checkpoint; the caller folds explicitly via [`Db::compact`].
    Manual,
    /// Auto-checkpoint after a dirty commit once the delta-log exceeds `factor`
    /// times the live base size. A small base-size floor keeps a tiny or empty
    /// base (such as the gen-0 store) from folding on its first commits.
    SizeRatio {
        /// Log-to-base size factor `K`: the log may reach `K × base` bytes
        /// before the next dirty commit folds it.
        factor: u32,
    },
}

impl CheckpointPolicy {
    /// The default auto-checkpoint factor `K`: fold when the delta-log exceeds
    /// four times the live base size.
    pub const DEFAULT_FACTOR: u32 = 4;

    /// Base-size floor in bytes. A base smaller than this is treated as being
    /// this large when the size-ratio threshold is computed.
    const MIN_BASE_BYTES: u64 = 4 * 1024;

    /// Decides whether a delta-log of `log_bytes` over a base of `base_bytes`
    /// should trigger an auto-checkpoint under this policy.
    ///
    /// `Manual` never triggers. `SizeRatio` triggers when `log_bytes` is
    /// strictly greater than `max(base_bytes, MIN_BASE_BYTES) × factor`, with
    /// the product saturating at `u64::MAX`.
    ///
    /// # Performance
    ///
    /// This method is `O(1)`.
    #[must_use]
    const fn should_checkpoint(self, log_bytes: u64, base_bytes: u64) -> bool {
        let factor = match self {
            Self::Manual => return false,
            Self::SizeRatio { factor } => factor,
        };
        // Clamp the base up to the floor so a near-empty base does not fold early.
        let effective_base = if base_bytes >= Self::MIN_BASE_BYTES {
            base_bytes
        } else {
            Self::MIN_BASE_BYTES
        };
        // `u32 -> u64` is a lossless widening; the cast form is kept because
        // this is a `const fn`.
        let threshold = effective_base.saturating_mul(factor as u64);
        log_bytes > threshold
    }
}

impl Default for CheckpointPolicy {
    /// The default policy: size-ratio with [`CheckpointPolicy::DEFAULT_FACTOR`].
    ///
    /// # Performance
    ///
    /// This function is `O(1)`.
    fn default() -> Self {
        let factor = Self::DEFAULT_FACTOR;
        Self::SizeRatio { factor }
    }
}
135
/// The durable result of a [`Db::write`]: whether a frame landed, and at which
/// commit sequence.
///
/// [`Db::write`] returns this alongside the closure's own value.
///
/// # Performance
///
/// Copying this value is `O(1)`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum CommitOutcome {
    /// The transaction made no changes; no WAL frame was appended.
    Empty,
    /// A durable frame landed at this commit sequence.
    Committed(CommitSeq),
}
150
/// Returns the file name (not a full path) of the base file for `generation`,
/// in the form `base-<generation>.oxgdb`.
///
/// # Performance
///
/// This function is `O(1)`: it formats one `u64` into a short string.
fn base_file(generation: u64) -> String {
    let mut name = String::from("base-");
    name.push_str(&generation.to_string());
    name.push_str(".oxgdb");
    name
}
159
/// Returns the file name (not a full path) of the delta-log for `generation`,
/// in the form `delta-<generation>.log`.
///
/// # Performance
///
/// This function is `O(1)`: it formats one `u64` into a short string.
fn delta_file(generation: u64) -> String {
    let mut name = String::from("delta-");
    name.push_str(&generation.to_string());
    name.push_str(".log");
    name
}
168
/// Open OXGDB database handle.
///
/// The handle stores the store's root directory, the current published
/// snapshot, the live base generation, the transaction-id watermark, and the
/// auto-checkpoint policy. It does not store an open file handle: base and
/// delta-log files are addressed by name under `root` using `base_generation`.
///
/// # Performance
///
/// Moving a handle is `O(1)`: it moves a path buffer, one `Arc<Snapshot>`, and
/// a few scalar fields.
pub struct Db {
    /// Root database directory.
    root: PathBuf,
    /// The current visible snapshot (base generation + published overlay),
    /// shared by readers through an atomically reference-counted handle.
    current: Arc<Snapshot>,
    /// Live base generation named by the superblock; every delta frame and the
    /// per-generation log filename carry it.
    base_generation: u64,
    /// Last writer transaction id durably recorded (the last dirty commit's id).
    /// A rollback burns a session-local id above this but does not advance it.
    ///
    /// NOTE(review): `begin_write` assigns the freshly burned id to this field
    /// before any commit, so between a rollback and the next reopen this field
    /// holds the session-local value rather than the durable one.
    last_transaction_id: TransactionId,
    /// Auto-checkpoint policy consulted after each dirty commit.
    checkpoint_policy: CheckpointPolicy,
}
190
191impl Db {
192 /// Creates a new empty OXGDB database at `path`.
193 ///
194 /// The create order is base-0 then empty delta-0.log then the writer lock
195 /// file then the superblock (written LAST as the create-complete marker), so
196 /// a half-created store is detected on open rather than silently opened
197 /// empty.
198 ///
199 /// # Errors
200 ///
201 /// Returns [`DbError::AlreadyExists`] when a store already exists, or
202 /// [`DbError::Io`]/[`DbError::InvalidStore`] when creation fails.
203 ///
204 /// # Performance
205 ///
206 /// This function is `O(empty base bytes)`.
207 pub fn create(path: impl AsRef<Path>) -> Result<Self, DbError> {
208 let root = path.as_ref().to_path_buf();
209 if root.join(wal::SUPERBLOCK_FILE).exists() {
210 return Err(DbError::AlreadyExists);
211 }
212 // Base-0: an empty merged view (empty base under an empty overlay).
213 let empty_base = crate::overlay::BaseRecords::empty();
214 let empty_overlay = Overlay::empty(NextIds::INITIAL, Catalog::empty());
215 let view = crate::overlay::MergedState::new(&empty_base, &empty_overlay);
216 let base_bytes = freeze::freeze_view(
217 &view,
218 FreezeStamps {
219 commit_seq: 0,
220 transaction_id: 0,
221 generation: 0,
222 },
223 )?;
224 storage::atomic_write(
225 &root,
226 &root.join(format!("{}.tmp", base_file(0))),
227 &root.join(base_file(0)),
228 &base_bytes,
229 )?;
230 // Empty delta-0.log, durably created.
231 create_empty_log(&root, 0)?;
232 // Superblock is written LAST; its existence is the create-complete marker.
233 write_superblock(&root, 0, 0, 0, 0)?;
234 Self::open(&root)
235 }
236
    /// Opens an existing OXGDB database, recovering the live frontier from the
    /// valid prefix of the delta-log replayed over the base named by the
    /// superblock.
    ///
    /// Steps, in order: read the superblock, open the base file it names, read
    /// and replay that generation's delta-log, truncate the log file when the
    /// replay accepted fewer bytes than were read, fold the replayed frames into
    /// an overlay, and publish the resulting snapshot on a handle that uses the
    /// default [`CheckpointPolicy`].
    ///
    /// # Errors
    ///
    /// Returns [`DbError`] when the store is missing, malformed, or the log is
    /// corrupt beyond a torn tail.
    ///
    /// # Performance
    ///
    /// This function is `O(base bytes + log bytes)`.
    pub fn open(path: impl AsRef<Path>) -> Result<Self, DbError> {
        let root = path.as_ref().to_path_buf();
        let superblock = wal::read_superblock(&root)?;
        let generation = superblock.base_generation.get();

        // NOTE(review): the meaning of the `false` flag passed to `Base::open`
        // is not visible in this file — confirm against `backing::Base`.
        let base = Arc::new(Base::open(&root.join(base_file(generation)), false)?);
        let base_records = Arc::new(crate::overlay::BaseRecords::open(&base)?);
        let base_header = *base.get().header();
        let base_catalog = base.get().catalog().clone();
        let base_next = NextIds::from_header(&base_header);

        // Replay the valid prefix of the per-generation delta-log.
        // A missing log file is read as an empty log (see `read_log`).
        let log_path = root.join(delta_file(generation));
        let log_bytes = read_log(&log_path)?;
        let outcome = wal::replay(generation, &log_bytes)?;
        // A torn tail truncates the log back to its last-good byte length.
        // NOTE(review): this function mutates the log file without acquiring
        // the writer lock itself — confirm opens are serialized against a live
        // writer elsewhere.
        if outcome.valid_len < log_bytes.len() {
            truncate_log(&log_path, outcome.valid_len)?;
        }

        // Fold the replayed frames into a fresh overlay over the base, deriving
        // the live frontier (commit_seq/txn_id) from the last good frame.
        // With no frames, the superblock's stamps stand as the frontier.
        let mut write = WriteOverlay::new(base_next, base_catalog);
        let mut recovered_next = base_next;
        let mut last_commit_seq = superblock.commit_seq.get();
        let mut last_txn = superblock.transaction_id.get();
        for frame in &outcome.frames {
            for op in &frame.ops {
                write.apply_replay_op(&base_records, op, &frame.blob, frame.lsn)?;
            }
            recovered_next = recovered_next.elementwise_max(write.next_ids());
            // The commit sequence takes the latest frame's LSN; the transaction
            // id only ever moves up.
            last_commit_seq = frame.lsn;
            last_txn = last_txn.max(frame.txn_id);
        }
        // ids are never reused: the recovered watermark is the elementwise max of
        // the base header and every replayed frame's watermark.
        write.set_next_ids(recovered_next);
        let overlay = Arc::new(write.freeze());

        // Reuse the records already decoded for replay instead of decoding the base
        // a second time inside `Snapshot::new`: the pinned base is byte-identical, so
        // the records (and their derived index) match. Halves open's base-decode cost.
        let snapshot = Arc::new(Snapshot::with_shared_base_records(
            CheckpointGeneration::new(generation),
            CommitSeq::new(last_commit_seq),
            base,
            overlay,
            base_records,
        ));

        // The policy is not persisted here: every open starts from the default.
        Ok(Self {
            root,
            current: snapshot,
            base_generation: generation,
            last_transaction_id: TransactionId::new(last_txn),
            checkpoint_policy: CheckpointPolicy::default(),
        })
    }
307
308 /// Returns the live base generation named by the superblock (the count of
309 /// folds this store has undergone; gen-0 is the freshly created store).
310 ///
311 /// # Performance
312 ///
313 /// This method is `O(1)`.
314 #[must_use]
315 pub const fn live_generation(&self) -> CheckpointGeneration {
316 CheckpointGeneration::new(self.base_generation)
317 }
318
319 /// Returns the configured auto-checkpoint policy.
320 ///
321 /// # Performance
322 ///
323 /// This method is `O(1)`.
324 #[must_use]
325 pub const fn checkpoint_policy(&self) -> CheckpointPolicy {
326 self.checkpoint_policy
327 }
328
329 /// Sets the auto-checkpoint policy consulted after each dirty commit.
330 ///
331 /// # Performance
332 ///
333 /// This method is `O(1)`.
334 pub const fn set_checkpoint_policy(&mut self, policy: CheckpointPolicy) {
335 self.checkpoint_policy = policy;
336 }
337
338 /// Validates the current handle by re-reading the superblock and verifying
339 /// the live base's content CRC.
340 ///
341 /// # Errors
342 ///
343 /// Returns [`DbError`] when the superblock or base fails validation.
344 ///
345 /// # Performance
346 ///
347 /// This method is `O(base bytes)`.
348 pub fn validate(&self) -> Result<(), DbError> {
349 wal::read_superblock(&self.root)?;
350 Base::open(&self.root.join(base_file(self.base_generation)), false).map(|_base| ())
351 }
352
353 /// Validates an OXGDB database at `path`.
354 ///
355 /// # Errors
356 ///
357 /// Returns [`DbError`] when the store fails to open and recover.
358 ///
359 /// # Performance
360 ///
361 /// This function is `O(base bytes + log bytes)`.
362 pub fn validate_path(path: impl AsRef<Path>) -> Result<(), DbError> {
363 Self::open(path).map(|_database| ())
364 }
365
366 /// Folds the current base+overlay into a new base generation, rotating the
367 /// delta-log and republishing the superblock (a manual checkpoint).
368 ///
369 /// This is the checkpoint primitive, exposed here so the existing `compact`
370 /// API keeps its "rewrite the store compactly" contract. Auto-triggering is
371 /// configured separately via [`Db::set_checkpoint_policy`].
372 ///
373 /// # Errors
374 ///
375 /// Returns [`DbError`] when encoding, writing, or publishing the new
376 /// generation fails.
377 ///
378 /// # Performance
379 ///
380 /// This method is `O(visible state bytes)`.
381 pub fn compact(&mut self) -> Result<(), DbError> {
382 self.checkpoint()
383 }
384
385 /// Folds the current base+overlay into base-`{g+1}`, creates an empty
386 /// delta-`{g+1}`.log, republishes the superblock naming `g+1` (the
387 /// linearization point), then unlinks the old base and log.
388 ///
389 /// The order is crash-safe: the new base is fully durable BEFORE the
390 /// superblock names it (so a crash before the superblock leaves the OLD
391 /// superblock authoritative and the orphan new base is ignored), and the old
392 /// base/log are unlinked only AFTER the superblock names the new generation
393 /// (so a crash before the unlink leaves the NEW superblock authoritative and
394 /// the orphan old files are ignored). The
395 /// [`crate::wire::SuperblockRecord`] rename is the single linearization point.
396 ///
397 /// # Errors
398 ///
399 /// Returns [`DbError`] when encoding, writing, or publishing fails.
400 ///
401 /// # Performance
402 ///
403 /// This method is `O(visible state bytes)`.
404 pub(crate) fn checkpoint(&mut self) -> Result<(), DbError> {
405 self.checkpoint_inner(
406 #[cfg(test)]
407 CheckpointStop::Complete,
408 )
409 }
410
    /// Crash-safe checkpoint body. Under `#[cfg(test)]` it accepts a
    /// [`CheckpointStop`] that simulates a crash by returning early right after a
    /// chosen fsync point, leaving the on-disk files exactly as a real crash
    /// there would, so the crash-matrix test can reopen and assert recovery.
    ///
    /// The writer lock is held for the whole body. On the early-return test
    /// paths the handle's in-memory fields are left untouched, so they still
    /// describe the old generation.
    ///
    /// # Errors
    ///
    /// Returns [`DbError`] when encoding, writing, or publishing fails, when
    /// the generation counter would overflow, or when the reopen over the new
    /// generation fails.
    ///
    /// # Performance
    ///
    /// This method is `O(visible state bytes)`.
    fn checkpoint_inner(&mut self, #[cfg(test)] stop: CheckpointStop) -> Result<(), DbError> {
        // Held until this function returns.
        let _lock = WriterLock::acquire(&self.root)?;
        let next_generation = self
            .base_generation
            .checked_add(1)
            .ok_or_else(|| DbError::invalid_store("checkpoint generation overflow"))?;
        // Freeze the currently published snapshot, stamped with its own commit
        // sequence, this handle's transaction watermark, and the new generation.
        let view = self.current.view();
        let commit_seq = self.current.lsn().get();
        let base_bytes = freeze::freeze_view(
            &view,
            FreezeStamps {
                commit_seq,
                transaction_id: self.last_transaction_id.get(),
                generation: next_generation,
            },
        )?;
        // (1) write base-{g+1} (temp + fsync + rename + dir-fsync).
        storage::atomic_write(
            &self.root,
            &self
                .root
                .join(format!("{}.tmp", base_file(next_generation))),
            &self.root.join(base_file(next_generation)),
            &base_bytes,
        )?;
        // (2) create empty delta-{g+1}.log (fsync + dir-fsync).
        create_empty_log(&self.root, next_generation)?;
        // Crash point A: new base + new log durable, superblock NOT yet
        // published. The OLD superblock still names `g`, so recovery uses the old
        // generation; the new base/log are orphans.
        #[cfg(test)]
        if matches!(stop, CheckpointStop::BeforeSuperblock) {
            return Ok(());
        }
        // (3) publish the superblock naming g+1 — the linearization point.
        // The checkpoint LSN and the commit sequence are both the frozen
        // snapshot's commit sequence.
        write_superblock(
            &self.root,
            next_generation,
            commit_seq,
            commit_seq,
            self.last_transaction_id.get(),
        )?;
        // Crash point B: superblock now names g+1, old base/log NOT yet unlinked.
        // Recovery uses the new generation; the old base/log are orphans.
        #[cfg(test)]
        if matches!(stop, CheckpointStop::BeforeRotate) {
            return Ok(());
        }
        // Re-open over the new generation, then (4) unlink the old base + log.
        // NOTE(review): if this reopen fails, the superblock already names g+1
        // while this handle's fields still describe generation g — confirm that
        // callers treat the handle as unusable after such an error.
        let reopened = Self::open(&self.root)?;
        let old_generation = self.base_generation;
        let policy = self.checkpoint_policy;
        self.current = reopened.current;
        self.base_generation = reopened.base_generation;
        self.last_transaction_id = reopened.last_transaction_id;
        // The reopen reset the policy to the default; restore the caller's.
        self.checkpoint_policy = policy;
        // Cleanup is best-effort: failures are deliberately ignored because the
        // old files are orphans once the new superblock is published.
        let _ = std::fs::remove_file(self.root.join(base_file(old_generation)));
        let _ = std::fs::remove_file(self.root.join(delta_file(old_generation)));
        let _ = storage::sync_directory(&self.root);
        Ok(())
    }
485
486 /// Auto-checkpoints when the configured [`CheckpointPolicy`] says the
487 /// delta-log has grown too large relative to the base. Called after a dirty
488 /// commit publishes its frame. A failed fold is surfaced so the caller can
489 /// observe it; the committed data is already durable in the log regardless.
490 ///
491 /// # Errors
492 ///
493 /// Returns [`DbError`] when the triggered fold fails.
494 ///
495 /// # Performance
496 ///
497 /// This method is `O(1)` to decide; `O(visible state bytes)` when it folds.
498 fn maybe_auto_checkpoint(&mut self) -> Result<(), DbError> {
499 let log_bytes = file_len(&self.root.join(delta_file(self.base_generation)));
500 let base_bytes = file_len(&self.root.join(base_file(self.base_generation)));
501 if self
502 .checkpoint_policy
503 .should_checkpoint(log_bytes, base_bytes)
504 {
505 self.checkpoint()?;
506 }
507 Ok(())
508 }
509
510 /// Returns operational status for this handle, including the live generation
511 /// count and the on-disk base/delta-log sizes the auto-checkpoint policy
512 /// weighs.
513 ///
514 /// # Performance
515 ///
516 /// This method is `O(visible state)` for the merged counts plus two `stat`
517 /// syscalls for the file sizes.
518 #[must_use]
519 pub fn stats(&self) -> Stats {
520 let view = self.current.view();
521 Stats {
522 visible_commit_seq: self.current.lsn(),
523 last_transaction_id: self.last_transaction_id,
524 live_generation: CheckpointGeneration::new(self.base_generation),
525 base_byte_size: file_len(&self.root.join(base_file(self.base_generation))),
526 log_byte_size: file_len(&self.root.join(delta_file(self.base_generation))),
527 element_count: view.element_count(),
528 relation_count: view.relation_count(),
529 incidence_count: view.incidence_count(),
530 catalog: self.catalog_summary(),
531 }
532 }
533
534 /// Returns a catalog-size summary.
535 ///
536 /// # Performance
537 ///
538 /// This method is `O(catalog entry count)`.
539 #[must_use]
540 pub fn catalog_summary(&self) -> CatalogSummary {
541 CatalogSummary::from_catalog(self.current.view().catalog())
542 }
543
544 /// Starts a read transaction pinned to the current visible snapshot.
545 ///
546 /// # Performance
547 ///
548 /// This method is `O(1)`: the reader clones the current `Arc<Snapshot>` and
549 /// observes a fixed state even across later commits and checkpoints.
550 #[must_use]
551 pub fn reader(&self) -> Reader {
552 Reader {
553 snapshot: Arc::clone(&self.current),
554 }
555 }
556
557 /// Starts the single writer transaction, acquiring the cross-process writer
558 /// lock for the transaction's lifetime.
559 ///
560 /// # Errors
561 ///
562 /// Returns [`DbError::WriterLockHeld`] when another writer holds the lock or
563 /// [`DbError::TransactionIdOverflow`] when writer ids are exhausted.
564 ///
565 /// # Performance
566 ///
567 /// This method is `O(1)`: the writer layers a fresh empty write overlay over
568 /// the current snapshot.
569 pub(crate) fn begin_write(&mut self) -> Result<Writer<'_>, DbError> {
570 let lock = WriterLock::acquire(&self.root)?;
571 let transaction_id = self
572 .last_transaction_id
573 .checked_next()
574 .ok_or(DbError::TransactionIdOverflow)?;
575 // Burn the id eagerly so it is session-local-visible even on rollback;
576 // it only becomes durable when a dirty commit writes its frame, and a
577 // reopen recovers the durable high-water mark from the log.
578 self.last_transaction_id = transaction_id;
579 let parent = Arc::clone(&self.current);
580 // Seed the writer delta from the parent's published overlay so the
581 // writer reads every committed record; the parent overlay is never
582 // mutated (the seed clones its maps).
583 let delta = WriteOverlay::from_overlay(parent.overlay());
584 Ok(Writer {
585 database: self,
586 parent,
587 delta,
588 transaction_id,
589 lock,
590 })
591 }
592
593 /// Runs `f` against a read transaction pinned to the current snapshot. The
594 /// primary read entry point.
595 ///
596 /// # Errors
597 ///
598 /// Propagates whatever error `f` returns.
599 ///
600 /// # Performance
601 ///
602 /// Entering is `O(1)` (an `Arc` clone); the total cost is `f`'s cost.
603 pub fn read<R>(&self, f: impl FnOnce(&Reader) -> Result<R, DbError>) -> Result<R, DbError> {
604 f(&self.reader())
605 }
606
607 /// Runs `f` against the single write transaction, committing on `Ok` and
608 /// rolling back on `Err` — control flow IS the commit protocol. Returns `f`'s
609 /// value with the [`CommitOutcome`] (whether a durable frame landed). The
610 /// primary write entry point.
611 ///
612 /// # Errors
613 ///
614 /// Returns [`DbError::WriterLockHeld`] when another writer holds the lock,
615 /// `f`'s error (after rolling back the staged delta), or a commit error.
616 ///
617 /// # Performance
618 ///
619 /// Begin is `O(1)`; commit is `O(change)`. A triggered auto-fold adds
620 /// `O(visible bytes)`.
621 pub fn write<R>(
622 &mut self,
623 f: impl FnOnce(&mut Writer<'_>) -> Result<R, DbError>,
624 ) -> Result<(R, CommitOutcome), DbError> {
625 let mut writer = self.begin_write()?;
626 // On `Err` the `?` drops `writer` here, releasing the lock and discarding
627 // the staged delta — no frame is appended (rollback).
628 let value = f(&mut writer)?;
629 let committed = !writer.delta.is_empty();
630 let lsn = writer.commit()?;
631 let outcome = if committed {
632 CommitOutcome::Committed(lsn)
633 } else {
634 CommitOutcome::Empty
635 };
636 Ok((value, outcome))
637 }
638
639 /// Resolves an already-applied [`Schema`] against the live catalog WITHOUT
640 /// writing, returning the [`Bound`] handle bag (for a store already
641 /// bootstrapped with this schema).
642 ///
643 /// # Errors
644 ///
645 /// Returns [`DbError::UnknownName`] when a declared item is absent.
646 ///
647 /// # Performance
648 ///
649 /// This method is `O(declared items × log catalog)`.
650 pub fn bind(&self, schema: &Schema) -> Result<Bound, DbError> {
651 let view = self.current.view();
652 let catalog = view.catalog();
653 let mut bound = Bound::default();
654 for name in &schema.roles {
655 let id = catalog.role_id(name).ok_or_else(|| DbError::UnknownName {
656 kind: "role",
657 name: name.clone(),
658 })?;
659 bound.roles.insert(name.clone(), id);
660 }
661 for name in &schema.labels {
662 let id = catalog.label_id(name).ok_or_else(|| DbError::UnknownName {
663 kind: "label",
664 name: name.clone(),
665 })?;
666 bound.labels.insert(name.clone(), id);
667 }
668 for name in &schema.relation_types {
669 let id = catalog
670 .relation_type_id(name)
671 .ok_or_else(|| DbError::UnknownName {
672 kind: "relation type",
673 name: name.clone(),
674 })?;
675 bound.relation_types.insert(name.clone(), id);
676 }
677 for (name, _family, value_type) in &schema.keys {
678 let id = catalog
679 .property_key_id(name)
680 .ok_or_else(|| DbError::UnknownName {
681 kind: "property key",
682 name: name.clone(),
683 })?;
684 bound.keys.insert(name.clone(), (id, *value_type));
685 }
686 for (name, key_name) in &schema.equality_indexes {
687 let (_key_id, value_type) =
688 *bound
689 .keys
690 .get(key_name)
691 .ok_or_else(|| DbError::UnknownName {
692 kind: "property key",
693 name: key_name.clone(),
694 })?;
695 let id = catalog.index_id(name).ok_or_else(|| DbError::UnknownName {
696 kind: "index",
697 name: name.clone(),
698 })?;
699 bound
700 .equality_indexes
701 .insert(name.clone(), (id, value_type));
702 }
703 for spec in &schema.graph_projections {
704 let id = catalog
705 .projection_id(&spec.name)
706 .ok_or_else(|| DbError::UnknownName {
707 kind: "projection",
708 name: spec.name.clone(),
709 })?;
710 bound.projections.insert(spec.name.clone(), id);
711 }
712 Ok(bound)
713 }
714
715 /// Prepares a query against the current catalog.
716 ///
717 /// # Errors
718 ///
719 /// Returns [`DbError`] when parsing or semantic analysis fails.
720 ///
721 /// # Performance
722 ///
723 /// This method is `O(query length + catalog lookup cost)`.
724 pub fn prepare(&self, query: &str) -> Result<PreparedQuery, DbError> {
725 PreparedQuery::prepare(query, &self.current.view())
726 }
727}
728
/// Returns the on-disk byte length of `path`, or `0` when the file is absent or
/// its metadata cannot be read.
///
/// The size is advisory: it feeds status reporting and the auto-checkpoint
/// heuristic, never a correctness decision, which is why every error collapses
/// to `0`.
///
/// # Performance
///
/// This function is `O(1)`: one `stat` syscall.
fn file_len(path: &Path) -> u64 {
    match std::fs::metadata(path) {
        Ok(metadata) => metadata.len(),
        Err(_) => 0,
    }
}
739
/// Test-only crash-injection point for [`Db::checkpoint_inner`]: stops the
/// fold right after a chosen fsync so the crash-matrix test can reopen and assert
/// the recovered state at each crash window.
///
/// A stop is simulated by an early `Ok(())` return; nothing on disk is rolled
/// back and the handle's in-memory fields are not updated.
///
/// The crash-matrix test that constructs the non-`Complete` variants is
/// `#[cfg(not(miri))]` (it reopens a real store across simulated crashes, which
/// miri's isolation cannot model), so under miri only `Complete` is constructed
/// and the other variants are expectedly unused.
///
/// # Performance
///
/// `perf: unspecified`; a test-only control tag.
#[cfg(test)]
#[cfg_attr(
    miri,
    expect(
        dead_code,
        reason = "the crash-injection variants are constructed only by the #[cfg(not(miri))] crash-matrix test"
    )
)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum CheckpointStop {
    /// Run the whole checkpoint (the production path).
    Complete,
    /// Stop after the new base + new log are durable, before the superblock is
    /// published (the old superblock stays authoritative).
    BeforeSuperblock,
    /// Stop after the superblock names the new generation, before the old
    /// base/log are unlinked (the new superblock is authoritative).
    BeforeRotate,
}
771
772/// Reads the whole delta-log into memory, treating a missing file as empty.
773///
774/// # Errors
775///
776/// Returns [`DbError::Io`] when the file cannot be read.
777///
778/// # Performance
779///
780/// This function is `O(log bytes)`.
781fn read_log(path: &Path) -> Result<Vec<u8>, DbError> {
782 match std::fs::read(path) {
783 Ok(bytes) => Ok(bytes),
784 Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Vec::new()),
785 Err(error) => Err(DbError::io("read delta-log", error)),
786 }
787}
788
789/// Truncates the delta-log back to `len` (its last-good byte length) and fsyncs,
790/// discarding a torn tail under the open path.
791///
792/// # Errors
793///
794/// Returns [`DbError::Io`] when opening, truncating, or syncing fails.
795///
796/// # Performance
797///
798/// This function is `O(1)`.
799fn truncate_log(path: &Path, len: usize) -> Result<(), DbError> {
800 let file = std::fs::OpenOptions::new()
801 .write(true)
802 .open(path)
803 .map_err(|error| DbError::io("open delta-log for truncate", error))?;
804 let len = u64::try_from(len)
805 .map_err(|_overflow| DbError::invalid_store("delta-log length overflow"))?;
806 file.set_len(len)
807 .map_err(|error| DbError::io("truncate delta-log", error))?;
808 file.sync_all()
809 .map_err(|error| DbError::io("sync truncated delta-log", error))
810}
811
812/// Creates an empty per-generation delta-log, fsyncing the file and the
813/// directory entry so the new (empty) log is durable.
814///
815/// # Errors
816///
817/// Returns [`DbError::Io`] when creation or syncing fails.
818///
819/// # Performance
820///
821/// This function is `O(1)`.
822fn create_empty_log(root: &Path, generation: u64) -> Result<(), DbError> {
823 let path = root.join(delta_file(generation));
824 let file =
825 std::fs::File::create(&path).map_err(|error| DbError::io("create delta-log", error))?;
826 file.sync_all()
827 .map_err(|error| DbError::io("sync delta-log", error))?;
828 storage::sync_directory(root)
829}
830
831/// Opens the live delta-log for appending (create when absent, read+append).
832///
833/// # Errors
834///
835/// Returns [`DbError::Io`] when the log cannot be opened.
836///
837/// # Performance
838///
839/// This function is `O(1)`.
840fn open_log_for_append(root: &Path, generation: u64) -> Result<std::fs::File, DbError> {
841 std::fs::OpenOptions::new()
842 .create(true)
843 .truncate(false)
844 .read(true)
845 .append(true)
846 .open(root.join(delta_file(generation)))
847 .map_err(|error| DbError::io("open delta-log for append", error))
848}
849
850/// Writes the superblock naming `generation` with the given frontier stamps.
851///
852/// # Errors
853///
854/// Returns [`DbError::Io`] when publishing fails.
855///
856/// # Performance
857///
858/// This function is `O(1)`.
859fn write_superblock(
860 root: &Path,
861 generation: u64,
862 checkpoint_lsn: u64,
863 commit_seq: u64,
864 transaction_id: u64,
865) -> Result<(), DbError> {
866 wal::write_superblock(
867 root,
868 &SuperblockRecord {
869 magic: crate::wire::SUPERBLOCK_MAGIC,
870 base_generation: generation.into(),
871 checkpoint_lsn: checkpoint_lsn.into(),
872 log_byte_offset: 0u64.into(),
873 commit_seq: commit_seq.into(),
874 transaction_id: transaction_id.into(),
875 format_version: crate::wire::OXGDB_FORMAT_VERSION.into(),
876 flags: 0u32.into(),
877 crc32c: 0u32.into(),
878 pad: 0u32.into(),
879 },
880 )
881}
882
/// Snapshot of database status, as produced by [`Db::stats`].
///
/// The values are captured at the time of the call and do not track later
/// commits or checkpoints.
///
/// # Performance
///
/// Copying and comparing status is `O(1)`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct Stats {
    /// Last visible commit sequence.
    pub visible_commit_seq: CommitSeq,
    /// Last writer transaction ID burned by this handle.
    ///
    /// This value is durable after a dirty commit and session-local after
    /// rollback.
    pub last_transaction_id: TransactionId,
    /// Live base generation named by the superblock (the count of folds this
    /// store has undergone; gen-0 is the freshly created store).
    pub live_generation: CheckpointGeneration,
    /// On-disk byte size of the live base file; `0` when the file could not be
    /// stat'd.
    pub base_byte_size: u64,
    /// On-disk byte size of the live delta-log (the tail recovery replays and
    /// the auto-checkpoint policy weighs against the base size); `0` when the
    /// file could not be stat'd.
    pub log_byte_size: u64,
    /// Visible element count.
    pub element_count: usize,
    /// Visible relation count.
    pub relation_count: usize,
    /// Visible incidence count.
    pub incidence_count: usize,
    /// Catalog-size summary.
    pub catalog: CatalogSummary,
}
914
/// Catalog-size summary: one entry count per catalog kind.
///
/// # Performance
///
/// Copying and comparing are `O(1)`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct CatalogSummary {
    /// Role count.
    pub role_count: usize,
    /// Label count.
    pub label_count: usize,
    /// Relation type count.
    pub relation_type_count: usize,
    /// Property key count.
    pub property_key_count: usize,
    /// Projection count.
    pub projection_count: usize,
    /// Index count.
    pub index_count: usize,
}
935
936impl CatalogSummary {
937 /// Builds a summary from a catalog.
938 ///
939 /// # Performance
940 ///
941 /// This function is `O(catalog entry count)`.
942 #[must_use]
943 pub fn from_catalog(catalog: &Catalog) -> Self {
944 Self {
945 role_count: catalog.roles().count(),
946 label_count: catalog.labels().count(),
947 relation_type_count: catalog.relation_types().count(),
948 property_key_count: catalog.property_keys().count(),
949 projection_count: catalog.projections().count(),
950 index_count: catalog.indexes().count(),
951 }
952 }
953}
954
955/// Reader pin identifying the visible database generation.
956///
957/// # Performance
958///
959/// Copying and comparing a pin is `O(1)`.
960#[derive(Clone, Copy, Debug, Eq, PartialEq)]
961pub struct ReadPin {
962 /// Pinned visible commit sequence.
963 pub visible_commit_seq: CommitSeq,
964 /// Pinned checkpoint generation.
965 pub generation: CheckpointGeneration,
966}
967
968/// Read transaction over a pinned snapshot.
969///
970/// A read transaction owns its own `Arc<Snapshot>` and never borrows the
971/// [`Db`], so it stays valid across a later `begin_write`/`checkpoint` on
972/// the same handle (it cloned the snapshot before the write borrowed `&mut`). It
973/// is [`Send`] + [`Sync`] (asserted below).
974///
975/// # Performance
976///
977/// Creating and cloning a read transaction is `O(1)`: it shares the pinned
978/// snapshot through an `Arc`, not by copying.
979pub struct Reader {
980 /// The pinned snapshot this reader observes.
981 snapshot: Arc<Snapshot>,
982}
983
984/// Returns whether a [`Reader::neighbors`] walk should follow the edge from the
985/// incidence `from` (the queried element's incidence) to the incidence `to`
986/// (a candidate neighbor's incidence) under `direction`.
987///
988/// Endpoint roles are encoded by incidence-creation order: the source endpoint
989/// has the lower incidence id. `Outgoing` follows source→target (the queried
990/// element is the source, so `from < to`), `Incoming` follows target→source, and
991/// `Both` follows either side.
992///
993/// # Performance
994///
995/// This function is `O(1)`.
996const fn follow_direction(direction: Direction, from: IncidenceId, to: IncidenceId) -> bool {
997 match direction {
998 Direction::Outgoing => from.get() < to.get(),
999 Direction::Incoming => from.get() > to.get(),
1000 Direction::Both => true,
1001 }
1002}
1003
1004/// `Reader` MUST be `Send + Sync`: it pins only an `Arc<Snapshot>`,
1005/// which holds `Arc`-shared `Send + Sync` data (no `Rc`/`RefCell` reachable).
1006const fn assert_send_sync<T: Send + Sync>() {}
1007const _: () = assert_send_sync::<Reader>();
1008const _: () = assert_send_sync::<Arc<Snapshot>>();
1009
1010impl Reader {
1011 /// Returns this transaction's reader pin.
1012 ///
1013 /// # Performance
1014 ///
1015 /// This method is `O(1)`.
1016 #[must_use]
1017 pub fn pin(&self) -> ReadPin {
1018 ReadPin {
1019 visible_commit_seq: self.snapshot.lsn(),
1020 generation: self.snapshot.generation(),
1021 }
1022 }
1023
1024 /// Returns catalog metadata.
1025 ///
1026 /// # Performance
1027 ///
1028 /// This method is `O(1)`.
1029 #[must_use]
1030 pub fn catalog(&self) -> &Catalog {
1031 self.snapshot.view().catalog_ref()
1032 }
1033
1034 /// Returns visible element count.
1035 ///
1036 /// # Performance
1037 ///
1038 /// This method is `O(base + overlay change)`.
1039 #[must_use]
1040 pub fn element_count(&self) -> usize {
1041 self.snapshot.view().element_count()
1042 }
1043
1044 /// Returns visible relation count.
1045 ///
1046 /// # Performance
1047 ///
1048 /// This method is `O(base + overlay change)`.
1049 #[must_use]
1050 pub fn relation_count(&self) -> usize {
1051 self.snapshot.view().relation_count()
1052 }
1053
1054 /// Returns visible incidence count.
1055 ///
1056 /// # Performance
1057 ///
1058 /// This method is `O(base + overlay change)`.
1059 #[must_use]
1060 pub fn incidence_count(&self) -> usize {
1061 self.snapshot.view().incidence_count()
1062 }
1063
1064 /// Returns every visible element id in id order.
1065 ///
1066 /// # Performance
1067 ///
1068 /// This method is `O(element count)`.
1069 #[must_use]
1070 pub fn element_ids(&self) -> Vec<ElementId> {
1071 self.snapshot
1072 .view()
1073 .elements()
1074 .map(|record| record.id)
1075 .collect()
1076 }
1077
1078 /// Returns every visible relation id in id order.
1079 ///
1080 /// # Performance
1081 ///
1082 /// This method is `O(relation count)`.
1083 #[must_use]
1084 pub fn relation_ids(&self) -> Vec<RelationId> {
1085 self.snapshot
1086 .view()
1087 .relations()
1088 .map(|record| record.id)
1089 .collect()
1090 }
1091
1092 /// Returns whether an element exists.
1093 ///
1094 /// # Performance
1095 ///
1096 /// This method is `O(log change + log n)`.
1097 #[must_use]
1098 pub fn contains_element(&self, id: ElementId) -> bool {
1099 self.snapshot.view().contains_element(id)
1100 }
1101
1102 /// Returns whether a relation exists.
1103 ///
1104 /// # Performance
1105 ///
1106 /// This method is `O(log change + log n)`.
1107 #[must_use]
1108 pub fn contains_relation(&self, id: RelationId) -> bool {
1109 self.snapshot.view().contains_relation(id)
1110 }
1111
1112 /// Returns whether an incidence exists.
1113 ///
1114 /// # Performance
1115 ///
1116 /// This method is `O(log change + log n)`.
1117 #[must_use]
1118 pub fn contains_incidence(&self, id: IncidenceId) -> bool {
1119 self.snapshot.view().contains_incidence(id)
1120 }
1121
1122 /// Returns an owned element view — id, labels, and all properties read in one
1123 /// call.
1124 ///
1125 /// # Performance
1126 ///
1127 /// This method is `O(log n + label count + property count)`.
1128 #[must_use]
1129 pub fn element(&self, id: ElementId) -> Option<Element> {
1130 let view = self.snapshot.view();
1131 let record = view.element_ref(id)?;
1132 let labels = record.labels.iter().copied().collect();
1133 let properties =
1134 Properties::from_pairs(view.subject_properties(PropertySubject::Element(id)));
1135 Some(Element::new(id, labels, properties))
1136 }
1137
1138 /// Returns an owned relation view — id, type, labels, and all properties read
1139 /// in one call.
1140 ///
1141 /// # Performance
1142 ///
1143 /// This method is `O(log n + label count + property count)`.
1144 #[must_use]
1145 pub fn relation(&self, id: RelationId) -> Option<Relation> {
1146 let view = self.snapshot.view();
1147 let record = view.relation_ref(id)?;
1148 let labels = record.labels.iter().copied().collect();
1149 let properties =
1150 Properties::from_pairs(view.subject_properties(PropertySubject::Relation(id)));
1151 Some(Relation::new(id, record.relation_type, labels, properties))
1152 }
1153
1154 /// Returns an owned incidence record.
1155 ///
1156 /// # Performance
1157 ///
1158 /// This method is `O(log change + log n)`.
1159 #[must_use]
1160 pub fn incidence(&self, id: IncidenceId) -> Option<IncidenceRecord> {
1161 self.snapshot.view().incidence_ref(id).map(Cow::into_owned)
1162 }
1163
1164 /// Returns every visible incidence attached to an element, in ascending
1165 /// incidence-id order.
1166 ///
1167 /// The merged set mixes overlay-owned and base-borrowed records, so this
1168 /// returns an owned [`Vec`] ([`IncidenceRecord`] is [`Copy`], so the copy is
1169 /// cheap).
1170 ///
1171 /// # Performance
1172 ///
1173 /// This method is `O(base incidences + overlay incidence change)`.
1174 #[must_use]
1175 pub fn element_incidences(&self, id: ElementId) -> Vec<IncidenceRecord> {
1176 self.snapshot.view().element_incidences(id)
1177 }
1178
1179 /// Returns a binary relation's two endpoint elements, ordered by ascending
1180 /// incidence id.
1181 ///
1182 /// Reads the relation's incidences from the reverse-adjacency index and
1183 /// returns the elements carried by its first two incidences in id order. A
1184 /// relation with fewer than two visible incidences returns `None`. This
1185 /// reports endpoints structurally, without consulting any projection's
1186 /// source/target roles — use [`Self::neighbors`] when role direction matters.
1187 ///
1188 /// # Performance
1189 ///
1190 /// This method is `O(degree)` over the relation's incidences.
1191 #[must_use]
1192 pub fn endpoints(&self, relation: RelationId) -> Option<(ElementId, ElementId)> {
1193 let incidences = self.snapshot.view().relation_incidences(relation);
1194 match incidences.as_slice() {
1195 [first, second, ..] => Some((first.element, second.element)),
1196 _too_few => None,
1197 }
1198 }
1199
1200 /// Returns the elements reachable from `element` along relations of
1201 /// `relation_type`, in ascending element-id order.
1202 ///
1203 /// Direction selects the role `element` must play on each relation. Endpoint
1204 /// roles are encoded by incidence-creation order: a binary relation's source
1205 /// is its lower incidence id and its target the higher (see
1206 /// [`Self::endpoints`]). `Outgoing` requires `element` to be the source (and
1207 /// yields the target), `Incoming` requires it to be the target (and yields
1208 /// the source), and `Both` yields the opposite endpoint either way. Resolved
1209 /// over the reverse-adjacency index — each incidence of `element` whose
1210 /// relation has the requested type contributes that relation's other
1211 /// endpoint — so this works for any binary relation without a materialized
1212 /// projection.
1213 ///
1214 /// # Performance
1215 ///
1216 /// This method is `O(degree of element + sum of touched relation degrees)`.
1217 #[must_use]
1218 pub fn neighbors(
1219 &self,
1220 element: ElementId,
1221 relation_type: RelationTypeId,
1222 direction: Direction,
1223 ) -> Vec<ElementId> {
1224 let view = self.snapshot.view();
1225 let mut neighbors = BTreeSet::new();
1226 for incidence in view.element_incidences(element) {
1227 let matches_type = view
1228 .relation_ref(incidence.relation)
1229 .is_some_and(|record| record.relation_type == Some(relation_type));
1230 if !matches_type {
1231 continue;
1232 }
1233 // The incidence id encodes the endpoint role: the source endpoint is
1234 // created first (lower incidence id), the target second. Compare
1235 // `element`'s incidence id against each other endpoint's to decide
1236 // which side `element` is on, then follow per the requested direction.
1237 neighbors.extend(
1238 view.relation_incidences(incidence.relation)
1239 .into_iter()
1240 .filter(|other| other.element != element)
1241 .filter(|other| follow_direction(direction, incidence.id, other.id))
1242 .map(|other| other.element),
1243 );
1244 }
1245 neighbors.into_iter().collect()
1246 }
1247
1248 /// Returns one owned property value.
1249 ///
1250 /// # Performance
1251 ///
1252 /// This method is `O(log subjects + log keys)`.
1253 #[must_use]
1254 pub fn property(&self, subject: PropertySubject, key: PropertyKeyId) -> Option<PropertyValue> {
1255 self.snapshot
1256 .view()
1257 .property_ref(subject, key)
1258 .map(Cow::into_owned)
1259 }
1260
1261 /// Returns the owned element whose value in `index` equals `value`, or `None`
1262 /// when no element matches.
1263 ///
1264 /// # Errors
1265 ///
1266 /// Returns [`DbError`] when the index is unknown or is not an equality index.
1267 ///
1268 /// # Performance
1269 ///
1270 /// This method is `O(log n + label count + property count)`.
1271 pub fn element_by_key<T: ValueType>(
1272 &self,
1273 index: EqualityIndex<T>,
1274 value: impl Assignable<T>,
1275 ) -> Result<Option<Element>, DbError> {
1276 let value = value.into_value()?;
1277 let matched = self
1278 .lookup(index.id(), Match::Equal(&value))?
1279 .into_iter()
1280 .find_map(|subject| match subject {
1281 PropertySubject::Element(id) => Some(id),
1282 PropertySubject::Relation(_) | PropertySubject::Incidence(_) => None,
1283 });
1284 Ok(matched.and_then(|id| self.element(id)))
1285 }
1286
1287 /// Returns the number of subjects carried by a membership index (a label or
1288 /// relation-type index).
1289 ///
1290 /// # Errors
1291 ///
1292 /// Returns [`DbError`] when the index is unknown or does not support
1293 /// membership enumeration.
1294 ///
1295 /// # Performance
1296 ///
1297 /// This method is `O(indexed family size)`.
1298 pub fn count(&self, index: IndexId) -> Result<usize, DbError> {
1299 self.lookup(index, Match::All)
1300 .map(|subjects| subjects.len())
1301 }
1302
1303 /// Looks up subjects with a property value.
1304 ///
1305 /// # Errors
1306 ///
1307 /// Returns [`DbError`] when the property key is unknown or `value` does not
1308 /// match the key schema.
1309 ///
1310 /// # Performance
1311 ///
1312 /// This method is `O(property subject count)`.
1313 pub fn lookup_property_equal(
1314 &self,
1315 key: PropertyKeyId,
1316 value: &PropertyValue,
1317 ) -> Result<Vec<PropertySubject>, DbError> {
1318 self.snapshot.view().typed_property_equal(key, value)
1319 }
1320
1321 /// Looks up subjects with a property inside an inclusive range.
1322 ///
1323 /// # Errors
1324 ///
1325 /// Returns [`DbError`] when the property key is unknown or either bound
1326 /// does not match the key schema.
1327 ///
1328 /// # Performance
1329 ///
1330 /// This method is `O(property subject count)`.
1331 pub fn lookup_property_range(
1332 &self,
1333 key: PropertyKeyId,
1334 min: &PropertyValue,
1335 max: &PropertyValue,
1336 ) -> Result<Vec<PropertySubject>, DbError> {
1337 self.snapshot.view().typed_property_range(key, min, max)
1338 }
1339
1340 /// Executes an index lookup.
1341 ///
1342 /// # Errors
1343 ///
1344 /// Returns [`DbError`] when the index is unknown, the lookup shape does not
1345 /// match the index kind, or supplied property values do not match catalog
1346 /// schemas.
1347 ///
1348 /// # Performance
1349 ///
1350 /// This method is `O(indexed family size)`.
1351 pub fn lookup(
1352 &self,
1353 index: IndexId,
1354 lookup: Match<'_>,
1355 ) -> Result<Vec<PropertySubject>, DbError> {
1356 let view = self.snapshot.view();
1357 let entry = view
1358 .catalog()
1359 .index(index)
1360 .ok_or(DbError::UnknownIndex { id: index })?;
1361 match (&entry.definition, lookup) {
1362 (IndexDefinition::Label { label }, Match::All) => Ok(view
1363 .elements_with_label(*label)
1364 .into_iter()
1365 .map(PropertySubject::Element)
1366 .collect()),
1367 (IndexDefinition::Label { .. }, _lookup) => {
1368 Err(DbError::unsupported("label index expects all lookup"))
1369 }
1370 (IndexDefinition::RelationType { relation_type }, Match::All) => Ok(view
1371 .relations_with_type(*relation_type)
1372 .into_iter()
1373 .map(PropertySubject::Relation)
1374 .collect()),
1375 (IndexDefinition::RelationType { .. }, _lookup) => Err(DbError::unsupported(
1376 "relation type index expects all lookup",
1377 )),
1378 (IndexDefinition::PropertyEquality { key }, Match::Equal(value)) => {
1379 view.typed_property_equal(*key, value)
1380 }
1381 (IndexDefinition::PropertyEquality { .. }, _lookup) => Err(DbError::unsupported(
1382 "property equality index expects equality lookup",
1383 )),
1384 (IndexDefinition::PropertyRange { key }, Match::Range { min, max }) => {
1385 view.typed_property_range(*key, min, max)
1386 }
1387 (IndexDefinition::PropertyRange { .. }, _lookup) => Err(DbError::unsupported(
1388 "property range index expects range lookup",
1389 )),
1390 (IndexDefinition::CompositeEquality { keys }, Match::Composite(values)) => {
1391 view.typed_property_composite_equal(keys, values)
1392 }
1393 (IndexDefinition::CompositeEquality { .. }, _lookup) => Err(DbError::unsupported(
1394 "composite equality index expects composite equality lookup",
1395 )),
1396 (IndexDefinition::Projection { projection }, Match::All) => {
1397 self.projection_index_subjects(*projection)
1398 }
1399 (IndexDefinition::Projection { .. }, _lookup) => {
1400 Err(DbError::unsupported("projection index expects all lookup"))
1401 }
1402 }
1403 }
1404
1405 /// Materializes a graph projection.
1406 ///
1407 /// # Errors
1408 ///
1409 /// Returns [`DbError`] when the projection is unknown, is not a graph, or
1410 /// fails validation against current topology.
1411 ///
1412 /// # Performance
1413 ///
1414 /// This method is `O(relation count * incidence count)`.
1415 pub fn graph_projection(&self, id: ProjectionId) -> Result<GraphProjection, DbError> {
1416 let view = self.snapshot.view();
1417 let entry = view
1418 .catalog()
1419 .projection(id)
1420 .ok_or(DbError::UnknownProjection { id })?;
1421 match &entry.definition {
1422 ProjectionDefinition::Graph(definition) => {
1423 projection::GraphProjection::from_state(&view, definition.clone())
1424 }
1425 ProjectionDefinition::Hypergraph(_definition) => {
1426 Err(DbError::invalid_projection("projection is not a graph"))
1427 }
1428 }
1429 }
1430
1431 /// Materializes a graph projection by catalog name.
1432 ///
1433 /// # Errors
1434 ///
1435 /// Returns [`DbError`] when the projection is unknown, is not a graph, or
1436 /// fails validation against current topology.
1437 ///
1438 /// # Performance
1439 ///
1440 /// This method is `O(log projection count + relation count * incidence count)`.
1441 pub fn graph_projection_by_name(&self, name: &str) -> Result<GraphProjection, DbError> {
1442 let id = self
1443 .snapshot
1444 .view()
1445 .catalog()
1446 .projection_id(name)
1447 .ok_or_else(|| DbError::unsupported(format!("unknown projection {name}")))?;
1448 self.graph_projection(id)
1449 }
1450
1451 /// Walks a cataloged graph projection from canonical seed elements,
1452 /// returning the discovered nodes AND the projection edges among them.
1453 ///
1454 /// Nodes are unique canonical elements in BFS first-discovery order; depth is
1455 /// the shortest discovered hop count from any seed. Edges connect two
1456 /// discovered nodes, ordered deterministically and unique by relation, so the
1457 /// [`Subgraph`] never references a node it omitted.
1458 ///
1459 /// # Errors
1460 ///
1461 /// Returns [`DbError`] when the projection is unknown, is not a graph,
1462 /// cannot be materialized, or a seed element is not part of the projection.
1463 ///
1464 /// # Performance
1465 ///
1466 /// This method is `O(relation count * incidence count + visited edges)`.
1467 pub fn walk(
1468 &self,
1469 projection: ProjectionId,
1470 seeds: &[ElementId],
1471 walk: Walk,
1472 ) -> Result<Subgraph, DbError> {
1473 if seeds.is_empty() || walk.limit == 0 {
1474 return Ok(Subgraph::default());
1475 }
1476 let graph = self.graph_projection(projection)?;
1477 traversal::walk_graph_projection(&graph, seeds, walk)
1478 }
1479
1480 /// Materializes a hypergraph projection.
1481 ///
1482 /// # Errors
1483 ///
1484 /// Returns [`DbError`] when the projection is unknown, is not a hypergraph,
1485 /// or fails validation against current topology.
1486 ///
1487 /// # Performance
1488 ///
1489 /// This method is `O(relation count * incidence count)`.
1490 pub fn hypergraph_projection(&self, id: ProjectionId) -> Result<HypergraphProjection, DbError> {
1491 let view = self.snapshot.view();
1492 let entry = view
1493 .catalog()
1494 .projection(id)
1495 .ok_or(DbError::UnknownProjection { id })?;
1496 match &entry.definition {
1497 ProjectionDefinition::Hypergraph(definition) => {
1498 projection::HypergraphProjection::from_state(&view, definition.clone())
1499 }
1500 ProjectionDefinition::Graph(_definition) => Err(DbError::invalid_projection(
1501 "projection is not a hypergraph",
1502 )),
1503 }
1504 }
1505
1506 /// Executes a prepared query.
1507 ///
1508 /// # Errors
1509 ///
1510 /// Returns [`DbError`] when execution cannot materialize a referenced
1511 /// projection.
1512 ///
1513 /// # Performance
1514 ///
1515 /// This method is `O(plan output + projection build cost when used)`.
1516 pub fn run(&self, query: &PreparedQuery) -> Result<QueryResult, DbError> {
1517 query.execute(&self.snapshot.view())
1518 }
1519
1520 /// Explains a prepared query.
1521 ///
1522 /// # Performance
1523 ///
1524 /// This method is `O(plan size)`.
1525 #[must_use]
1526 pub fn explain(&self, query: &PreparedQuery) -> String {
1527 query.explain()
1528 }
1529
1530 /// Materializes subjects represented by a projection index.
1531 ///
1532 /// # Errors
1533 ///
1534 /// Returns [`DbError`] when the projection is unknown or cannot be
1535 /// materialized.
1536 ///
1537 /// # Performance
1538 ///
1539 /// This method is `O(relation count * incidence count)`.
1540 fn projection_index_subjects(
1541 &self,
1542 projection: ProjectionId,
1543 ) -> Result<Vec<PropertySubject>, DbError> {
1544 let view = self.snapshot.view();
1545 let entry = view
1546 .catalog()
1547 .projection(projection)
1548 .ok_or(DbError::UnknownProjection { id: projection })?;
1549 match &entry.definition {
1550 ProjectionDefinition::Graph(definition) => {
1551 Ok(projection::GraphProjection::from_state(&view, definition.clone())?.subjects())
1552 }
1553 ProjectionDefinition::Hypergraph(definition) => Ok(
1554 projection::HypergraphProjection::from_state(&view, definition.clone())?.subjects(),
1555 ),
1556 }
1557 }
1558}
1559
1560/// Single writer transaction.
1561///
1562/// Mutations accumulate into a private write overlay layered over the parent
1563/// snapshot; reads fall through the overlay then the base. `commit` appends the
1564/// overlay's mutation log to the WAL (when dirty) and publishes a fresh snapshot;
1565/// `rollback` drops the overlay and appends nothing.
1566///
1567/// # Performance
1568///
1569/// Creating and moving a writer is `O(1)`; each mutation is `O(log change)`.
1570pub struct Writer<'db> {
1571 /// Db receiving the commit.
1572 database: &'db mut Db,
1573 /// Parent snapshot the writer layers over (its base + frozen overlay).
1574 parent: Arc<Snapshot>,
1575 /// Private mutable delta this writer accumulates.
1576 delta: WriteOverlay,
1577 /// Writer transaction id (session-local until a dirty commit makes it
1578 /// durable).
1579 transaction_id: TransactionId,
1580 /// Held single-writer advisory lock. Its [`Drop`] releases the lock when this
1581 /// transaction ends (on `rollback`, or on any early-return error path); a
1582 /// successful dirty [`Self::commit`] releases it explicitly with `drop` so a
1583 /// triggered auto-checkpoint can re-acquire it.
1584 lock: WriterLock,
1585}
1586
1587impl Writer<'_> {
1588 /// Registers a structural incidence role.
1589 ///
1590 /// # Errors
1591 ///
1592 /// Returns [`DbError`] when the name already exists or ID allocation fails.
1593 ///
1594 /// # Performance
1595 ///
1596 /// This method is `O(log role count + name length)`.
1597 pub fn register_role(&mut self, name: impl Into<String>) -> Result<RoleId, DbError> {
1598 self.delta.register_role(name.into())
1599 }
1600
1601 /// Registers an element or relation label.
1602 ///
1603 /// # Errors
1604 ///
1605 /// Returns [`DbError`] when the name already exists or ID allocation fails.
1606 ///
1607 /// # Performance
1608 ///
1609 /// This method is `O(log label count + name length)`.
1610 pub fn register_label(&mut self, name: impl Into<String>) -> Result<LabelId, DbError> {
1611 self.delta.register_label(name.into())
1612 }
1613
1614 /// Registers a relation type.
1615 ///
1616 /// # Errors
1617 ///
1618 /// Returns [`DbError`] when the name already exists or ID allocation fails.
1619 ///
1620 /// # Performance
1621 ///
1622 /// This method is `O(log relation type count + name length)`.
1623 pub fn register_relation_type(
1624 &mut self,
1625 name: impl Into<String>,
1626 ) -> Result<RelationTypeId, DbError> {
1627 self.delta.register_relation_type(name.into())
1628 }
1629
1630 /// Registers a typed property key.
1631 ///
1632 /// # Errors
1633 ///
1634 /// Returns [`DbError`] when the name already exists or ID allocation fails.
1635 ///
1636 /// # Performance
1637 ///
1638 /// This method is `O(log property key count + name length)`.
1639 pub fn register_property_key(
1640 &mut self,
1641 name: impl Into<String>,
1642 family: PropertyFamily,
1643 value_type: PropertyType,
1644 ) -> Result<PropertyKeyId, DbError> {
1645 self.delta
1646 .register_property_key(name.into(), family, value_type)
1647 }
1648
1649 /// Defines a physical projection.
1650 ///
1651 /// # Errors
1652 ///
1653 /// Returns [`DbError`] when referenced catalog IDs are unknown, the
1654 /// projection name already exists, or ID allocation fails.
1655 ///
1656 /// # Performance
1657 ///
1658 /// This method is `O(definition size + catalog lookup cost)`.
1659 pub fn define_projection(
1660 &mut self,
1661 definition: ProjectionDefinition,
1662 ) -> Result<ProjectionId, DbError> {
1663 self.validate_projection_definition(&definition)?;
1664 self.delta.register_projection(definition)
1665 }
1666
1667 /// Defines an index.
1668 ///
1669 /// # Errors
1670 ///
1671 /// Returns [`DbError`] when referenced catalog IDs are unknown, the index
1672 /// name already exists, or ID allocation fails.
1673 ///
1674 /// # Performance
1675 ///
1676 /// This method is `O(definition size + catalog lookup cost)`.
1677 pub fn define_index(
1678 &mut self,
1679 name: impl Into<String>,
1680 definition: IndexDefinition,
1681 ) -> Result<IndexId, DbError> {
1682 self.validate_index_definition(&definition)?;
1683 self.delta.register_index(name.into(), definition)
1684 }
1685
1686 /// Applies a declarative [`Schema`] idempotently (register-or-get every
1687 /// declared item), returning the resolved [`Bound`] handle bag. Re-applying
1688 /// the same schema reuses existing ids; a name that already exists with a
1689 /// conflicting shape is a [`DbError::SchemaConflict`].
1690 ///
1691 /// # Errors
1692 ///
1693 /// Returns [`DbError`] on a shape conflict, an undeclared referenced name (an
1694 /// index's key, a projection's role/type), or id-allocation failure.
1695 ///
1696 /// # Performance
1697 ///
1698 /// This method is `O(declared items × log catalog)`.
1699 pub fn apply_schema(&mut self, schema: &Schema) -> Result<Bound, DbError> {
1700 let mut bound = Bound::default();
1701 for name in &schema.roles {
1702 let id = match self.merged().catalog().role_id(name) {
1703 Some(id) => id,
1704 None => self.register_role(name.clone())?,
1705 };
1706 bound.roles.insert(name.clone(), id);
1707 }
1708 for name in &schema.labels {
1709 let id = match self.merged().catalog().label_id(name) {
1710 Some(id) => id,
1711 None => self.register_label(name.clone())?,
1712 };
1713 bound.labels.insert(name.clone(), id);
1714 }
1715 for name in &schema.relation_types {
1716 let id = match self.merged().catalog().relation_type_id(name) {
1717 Some(id) => id,
1718 None => self.register_relation_type(name.clone())?,
1719 };
1720 bound.relation_types.insert(name.clone(), id);
1721 }
1722 for (name, family, value_type) in &schema.keys {
1723 let id = self.register_key_or_get(name, *family, *value_type)?;
1724 bound.keys.insert(name.clone(), (id, *value_type));
1725 }
1726 for (name, key_name) in &schema.equality_indexes {
1727 let (key_id, value_type) =
1728 *bound
1729 .keys
1730 .get(key_name)
1731 .ok_or_else(|| DbError::UnknownName {
1732 kind: "property key",
1733 name: key_name.clone(),
1734 })?;
1735 let id = match self.merged().catalog().index_id(name) {
1736 Some(id) => id,
1737 None => self.define_index(
1738 name.clone(),
1739 IndexDefinition::PropertyEquality { key: key_id },
1740 )?,
1741 };
1742 bound
1743 .equality_indexes
1744 .insert(name.clone(), (id, value_type));
1745 }
1746 for spec in &schema.graph_projections {
1747 let id = match self.merged().catalog().projection_id(&spec.name) {
1748 Some(id) => id,
1749 None => self.define_graph_projection(spec, &bound)?,
1750 };
1751 bound.projections.insert(spec.name.clone(), id);
1752 }
1753 Ok(bound)
1754 }
1755
1756 /// Registers a property key, or returns the existing id when the name is
1757 /// already present with a matching family and value type.
1758 ///
1759 /// # Errors
1760 ///
1761 /// Returns [`DbError::SchemaConflict`] when the name exists with a different
1762 /// family or value type.
1763 ///
1764 /// # Performance
1765 ///
1766 /// This method is `O(log catalog)`.
1767 fn register_key_or_get(
1768 &mut self,
1769 name: &str,
1770 family: PropertyFamily,
1771 value_type: PropertyType,
1772 ) -> Result<PropertyKeyId, DbError> {
1773 let Some(existing) = self.merged().catalog().property_key_id(name) else {
1774 return self.register_property_key(name.to_owned(), family, value_type);
1775 };
1776 let matches = self
1777 .merged()
1778 .catalog()
1779 .property_key(existing)
1780 .is_some_and(|def| def.family == family && def.value_type == value_type);
1781 if matches {
1782 Ok(existing)
1783 } else {
1784 Err(DbError::SchemaConflict {
1785 name: name.to_owned(),
1786 reason: "property key family/value type differs from the existing catalog entry",
1787 })
1788 }
1789 }
1790
1791 /// Defines a graph projection from a spec, resolving its relation-type and
1792 /// role names through `bound`.
1793 ///
1794 /// # Errors
1795 ///
1796 /// Returns [`DbError::UnknownName`] when a referenced role/type is unbound, or
1797 /// a definition error.
1798 ///
1799 /// # Performance
1800 ///
1801 /// This method is `O(relation-type count × log catalog)`.
1802 fn define_graph_projection(
1803 &mut self,
1804 spec: &GraphProjectionSpec,
1805 bound: &Bound,
1806 ) -> Result<ProjectionId, DbError> {
1807 let mut relation_types = BTreeSet::new();
1808 for name in &spec.relation_types {
1809 relation_types.insert(bound.relation_type(name)?);
1810 }
1811 let source_role = bound.role(&spec.source_role)?;
1812 let target_role = bound.role(&spec.target_role)?;
1813 self.define_projection(ProjectionDefinition::Graph(GraphProjectionDefinition {
1814 name: spec.name.clone(),
1815 relation_types,
1816 source_role,
1817 target_role,
1818 }))
1819 }
1820
1821 /// Creates a canonical element.
1822 ///
1823 /// # Errors
1824 ///
1825 /// Returns [`DbError::IdOverflow`] when element IDs are exhausted.
1826 ///
1827 /// # Performance
1828 ///
1829 /// This method is `O(log element change)`.
1830 pub fn create_element(&mut self) -> Result<ElementId, DbError> {
1831 self.delta.create_element()
1832 }
1833
1834 /// Creates a canonical relation.
1835 ///
1836 /// # Errors
1837 ///
1838 /// Returns [`DbError::IdOverflow`] when relation IDs are exhausted.
1839 ///
1840 /// # Performance
1841 ///
1842 /// This method is `O(log relation change)`.
1843 pub fn create_relation(&mut self) -> Result<RelationId, DbError> {
1844 self.delta.create_relation()
1845 }
1846
1847 /// Creates a canonical incidence.
1848 ///
1849 /// # Errors
1850 ///
1851 /// Returns [`DbError`] when referenced IDs are unknown or incidence IDs are
1852 /// exhausted.
1853 ///
1854 /// # Performance
1855 ///
1856 /// This method is `O(log incidence change + reference lookup cost)`.
1857 pub fn create_incidence(
1858 &mut self,
1859 relation: RelationId,
1860 element: ElementId,
1861 role: RoleId,
1862 ) -> Result<IncidenceId, DbError> {
1863 self.require_relation(relation)?;
1864 self.require_element(element)?;
1865 self.require_role(role)?;
1866 self.delta.create_incidence(relation, element, role)
1867 }
1868
1869 /// Tombstones a canonical element and its incidences.
1870 ///
1871 /// # Errors
1872 ///
1873 /// Returns [`DbError::UnknownElement`] when the element is not visible.
1874 ///
1875 /// # Performance
1876 ///
1877 /// This method is `O(log n + degree)` via the reverse-adjacency index.
1878 pub(crate) fn tombstone_element(&mut self, id: ElementId) -> Result<(), DbError> {
1879 self.require_element(id)?;
1880 // Cascade: every incidence on the element — resolved in O(log n + degree)
1881 // through the reverse-adjacency index, not a full incidence scan — is
1882 // tombstoned too.
1883 let incidences: Vec<IncidenceId> = self
1884 .merged()
1885 .element_incidences(id)
1886 .into_iter()
1887 .map(|record| record.id)
1888 .collect();
1889 let base = self.parent.base_records();
1890 self.delta.tombstone_element(base, id);
1891 for incidence in incidences {
1892 self.delta
1893 .tombstone_incidence(self.parent.base_records(), incidence);
1894 }
1895 Ok(())
1896 }
1897
1898 /// Tombstones a canonical relation and its incidences.
1899 ///
1900 /// # Errors
1901 ///
1902 /// Returns [`DbError::UnknownRelation`] when the relation is not visible.
1903 ///
1904 /// # Performance
1905 ///
1906 /// This method is `O(log n + degree)` via the reverse-adjacency index.
1907 pub(crate) fn tombstone_relation(&mut self, id: RelationId) -> Result<(), DbError> {
1908 self.require_relation(id)?;
1909 // Cascade: every incidence in the relation — resolved in O(log n + degree)
1910 // through the reverse-adjacency index, not a full incidence scan.
1911 let incidences: Vec<IncidenceId> = self
1912 .merged()
1913 .relation_incidences(id)
1914 .into_iter()
1915 .map(|record| record.id)
1916 .collect();
1917 let base = self.parent.base_records();
1918 self.delta.tombstone_relation(base, id);
1919 for incidence in incidences {
1920 self.delta
1921 .tombstone_incidence(self.parent.base_records(), incidence);
1922 }
1923 Ok(())
1924 }
1925
1926 /// Tombstones a canonical incidence.
1927 ///
1928 /// # Errors
1929 ///
1930 /// Returns [`DbError::UnknownIncidence`] when the incidence is not visible.
1931 ///
1932 /// # Performance
1933 ///
1934 /// This method is `O(log incidence change)`.
1935 pub(crate) fn tombstone_incidence(&mut self, id: IncidenceId) -> Result<(), DbError> {
1936 self.require_incidence(id)?;
1937 self.delta
1938 .tombstone_incidence(self.parent.base_records(), id);
1939 Ok(())
1940 }
1941
1942 /// Adds a label to an element.
1943 ///
1944 /// # Errors
1945 ///
1946 /// Returns [`DbError`] when the element or label is unknown.
1947 ///
1948 /// # Performance
1949 ///
1950 /// This method is `O(log element change + log label count)`.
1951 pub(crate) fn add_element_label(
1952 &mut self,
1953 element: ElementId,
1954 label: LabelId,
1955 ) -> Result<(), DbError> {
1956 self.require_element(element)?;
1957 self.require_label(label)?;
1958 self.delta
1959 .add_element_label(self.parent.base_records(), element, label);
1960 Ok(())
1961 }
1962
1963 /// Adds a label to a relation.
1964 ///
1965 /// # Errors
1966 ///
1967 /// Returns [`DbError`] when the relation or label is unknown.
1968 ///
1969 /// # Performance
1970 ///
1971 /// This method is `O(log relation change + log label count)`.
1972 pub(crate) fn add_relation_label(
1973 &mut self,
1974 relation: RelationId,
1975 label: LabelId,
1976 ) -> Result<(), DbError> {
1977 self.require_relation(relation)?;
1978 self.require_label(label)?;
1979 self.delta
1980 .add_relation_label(self.parent.base_records(), relation, label);
1981 Ok(())
1982 }
1983
1984 /// Sets a relation type.
1985 ///
1986 /// # Errors
1987 ///
1988 /// Returns [`DbError`] when the relation or relation type is unknown.
1989 ///
1990 /// # Performance
1991 ///
1992 /// This method is `O(log relation change + log relation type count)`.
1993 pub fn set_relation_type(
1994 &mut self,
1995 relation: RelationId,
1996 relation_type: RelationTypeId,
1997 ) -> Result<(), DbError> {
1998 self.require_relation(relation)?;
1999 self.require_relation_type(relation_type)?;
2000 self.delta
2001 .set_relation_type(self.parent.base_records(), relation, relation_type);
2002 Ok(())
2003 }
2004
2005 /// Sets a property value.
2006 ///
2007 /// # Errors
2008 ///
2009 /// Returns [`DbError`] when the subject or key is unknown, or the value
2010 /// does not match the key schema.
2011 ///
2012 /// # Performance
2013 ///
2014 /// This method is `O(log subject change + log key count)`.
2015 pub(crate) fn set_property(
2016 &mut self,
2017 subject: PropertySubject,
2018 key: PropertyKeyId,
2019 value: PropertyValue,
2020 ) -> Result<(), DbError> {
2021 // Referential integrity: the subject must be visible (this rejects an
2022 // orphan property against a tombstoned/absent subject at the transaction
2023 // boundary — the overlay layer is permissive by design).
2024 self.require_subject(subject)?;
2025 let definition = self
2026 .merged()
2027 .catalog()
2028 .property_key(key)
2029 .cloned()
2030 .ok_or(DbError::UnknownPropertyKey { id: key })?;
2031 if definition.family != subject.family() {
2032 return Err(DbError::WrongPropertyFamily {
2033 expected: definition.family,
2034 actual: subject.family(),
2035 });
2036 }
2037 if definition.value_type != value.value_type() {
2038 return Err(DbError::PropertyTypeMismatch {
2039 expected: definition.value_type,
2040 actual: value.value_type(),
2041 });
2042 }
2043 self.delta
2044 .set_property(self.parent.base_records(), subject, key, value);
2045 Ok(())
2046 }
2047
2048 /// Removes a property value.
2049 ///
2050 /// # Errors
2051 ///
2052 /// Returns [`DbError`] when the subject or key is unknown.
2053 ///
2054 /// # Performance
2055 ///
2056 /// This method is `O(log subject change + log key count)`.
2057 pub(crate) fn remove_property(
2058 &mut self,
2059 subject: PropertySubject,
2060 key: PropertyKeyId,
2061 ) -> Result<(), DbError> {
2062 self.require_subject(subject)?;
2063 if self.merged().catalog().property_key(key).is_none() {
2064 return Err(DbError::UnknownPropertyKey { id: key });
2065 }
2066 self.delta
2067 .remove_property(self.parent.base_records(), subject, key);
2068 Ok(())
2069 }
2070
2071 /// Resolves the property key an equality index covers.
2072 ///
2073 /// # Errors
2074 ///
2075 /// Returns [`DbError::UnknownIndex`] when `index` is unknown, or an
2076 /// unsupported-query error when it is not a property-equality index.
2077 ///
2078 /// # Performance
2079 ///
2080 /// This method is `O(log index count)`.
2081 fn equality_index_key(&self, index: IndexId) -> Result<PropertyKeyId, DbError> {
2082 let view = self.merged();
2083 let entry = view
2084 .catalog()
2085 .index(index)
2086 .ok_or(DbError::UnknownIndex { id: index })?;
2087 match &entry.definition {
2088 IndexDefinition::PropertyEquality { key } => Ok(*key),
2089 _other => Err(DbError::unsupported(
2090 "reconcile requires a property-equality index",
2091 )),
2092 }
2093 }
2094
2095 /// Inserts or updates the element whose value under `index` equals `value`,
2096 /// returning its canonical id — reused when an element already carries that
2097 /// identity value (id stable across reconcile), freshly minted (a never-reused
2098 /// id, with the identity property set) otherwise.
2099 ///
2100 /// # Errors
2101 ///
2102 /// Returns [`DbError`] when `index` is not an equality index or the value
2103 /// type mismatches the key schema.
2104 ///
2105 /// # Performance
2106 ///
2107 /// This method is `O(log n + value length)` — a probe plus, on a miss, a mint.
2108 pub fn upsert_element<T: ValueType>(
2109 &mut self,
2110 index: EqualityIndex<T>,
2111 value: impl Assignable<T>,
2112 ) -> Result<ElementId, DbError> {
2113 let value = value.into_value()?;
2114 let key = self.equality_index_key(index.id())?;
2115 let existing = self
2116 .merged()
2117 .property_equal(key, &value)
2118 .into_iter()
2119 .find_map(|subject| match subject {
2120 PropertySubject::Element(id) => Some(id),
2121 PropertySubject::Relation(_) | PropertySubject::Incidence(_) => None,
2122 });
2123 if let Some(id) = existing {
2124 return Ok(id);
2125 }
2126 let element = self.create_element()?;
2127 self.set_property(PropertySubject::Element(element), key, value)?;
2128 Ok(element)
2129 }
2130
2131 /// Inserts or updates the relation whose value under `index` equals `value`,
2132 /// returning its canonical id. On a miss it mints the relation, sets its type
2133 /// and identity property, and creates one incidence per `(element, role)`
2134 /// endpoint; on a hit the existing relation (with its endpoints) is reused
2135 /// unchanged — the identity value encodes the endpoints, so they are immutable.
2136 ///
2137 /// # Errors
2138 ///
2139 /// Returns [`DbError`] when `index` is not an equality index, the value type
2140 /// mismatches, or an endpoint element does not exist.
2141 ///
2142 /// # Performance
2143 ///
2144 /// This method is `O(log n + endpoints)` — a probe plus, on a miss, a mint.
2145 pub fn upsert_relation<T: ValueType>(
2146 &mut self,
2147 index: EqualityIndex<T>,
2148 value: impl Assignable<T>,
2149 relation_type: RelationTypeId,
2150 endpoints: &[(ElementId, RoleId)],
2151 ) -> Result<RelationId, DbError> {
2152 let value = value.into_value()?;
2153 let key = self.equality_index_key(index.id())?;
2154 let existing = self
2155 .merged()
2156 .property_equal(key, &value)
2157 .into_iter()
2158 .find_map(|subject| match subject {
2159 PropertySubject::Relation(id) => Some(id),
2160 PropertySubject::Element(_) | PropertySubject::Incidence(_) => None,
2161 });
2162 if let Some(id) = existing {
2163 return Ok(id);
2164 }
2165 let relation = self.create_relation()?;
2166 self.set_relation_type(relation, relation_type)?;
2167 self.set_property(PropertySubject::Relation(relation), key, value)?;
2168 for (element, role) in endpoints {
2169 self.create_incidence(relation, *element, *role)?;
2170 }
2171 Ok(relation)
2172 }
2173
2174 /// Tombstones every subject carried by `index` whose identity value is NOT in
2175 /// `keep`, cascading each subject's incidences in `O(degree)` via the
2176 /// reverse-adjacency index. The prune half of a reconcile: after upserting
2177 /// every desired subject, `retain` removes the vanished complement.
2178 ///
2179 /// # Errors
2180 ///
2181 /// Returns [`DbError`] when `index` is not an equality index or a `keep` value
2182 /// type mismatches the key schema.
2183 ///
2184 /// # Performance
2185 ///
2186 /// This method is `O(family size + removed × degree)`.
2187 pub fn retain<T: ValueType, V: Assignable<T> + Copy>(
2188 &mut self,
2189 index: EqualityIndex<T>,
2190 keep: &[V],
2191 ) -> Result<(), DbError> {
2192 let key = self.equality_index_key(index.id())?;
2193 let mut keep_values: BTreeSet<PropertyValue> = BTreeSet::new();
2194 for value in keep {
2195 keep_values.insert((*value).into_value()?);
2196 }
2197 let stale: Vec<PropertySubject> = self
2198 .merged()
2199 .property_key_subjects(key)
2200 .into_iter()
2201 .filter(|(_subject, value)| !keep_values.contains(value))
2202 .map(|(subject, _value)| subject)
2203 .collect();
2204 for subject in stale {
2205 match subject {
2206 PropertySubject::Element(id) => self.tombstone_element(id)?,
2207 PropertySubject::Relation(id) => self.tombstone_relation(id)?,
2208 PropertySubject::Incidence(id) => self.tombstone_incidence(id)?,
2209 }
2210 }
2211 Ok(())
2212 }
2213
2214 /// Sets a typed property on a subject; the value type is checked at compile
2215 /// time against the key.
2216 ///
2217 /// # Errors
2218 ///
2219 /// Returns [`DbError`] when the subject is absent, the value is out of range,
2220 /// or the value type mismatches the key schema.
2221 ///
2222 /// # Performance
2223 ///
2224 /// This method is `O(log change + log keys)`.
2225 pub fn set<T: ValueType>(
2226 &mut self,
2227 subject: impl Into<PropertySubject>,
2228 key: Key<T>,
2229 value: impl Assignable<T>,
2230 ) -> Result<(), DbError> {
2231 self.set_property(subject.into(), key.id(), value.into_value()?)
2232 }
2233
2234 /// Removes a typed property from a subject.
2235 ///
2236 /// # Errors
2237 ///
2238 /// Returns [`DbError`] when the subject is absent or the key is unknown.
2239 ///
2240 /// # Performance
2241 ///
2242 /// This method is `O(log change + log keys)`.
2243 pub fn unset<T: ValueType>(
2244 &mut self,
2245 subject: impl Into<PropertySubject>,
2246 key: Key<T>,
2247 ) -> Result<(), DbError> {
2248 self.remove_property(subject.into(), key.id())
2249 }
2250
2251 /// Adds a label to an element or relation subject.
2252 ///
2253 /// # Errors
2254 ///
2255 /// Returns [`DbError`] when the subject is absent, the label is unknown, or
2256 /// the subject is an incidence (incidences carry no labels).
2257 ///
2258 /// # Performance
2259 ///
2260 /// This method is `O(log change + log labels)`.
2261 pub fn add_label(
2262 &mut self,
2263 subject: impl Into<PropertySubject>,
2264 label: LabelId,
2265 ) -> Result<(), DbError> {
2266 match subject.into() {
2267 PropertySubject::Element(id) => self.add_element_label(id, label),
2268 PropertySubject::Relation(id) => self.add_relation_label(id, label),
2269 PropertySubject::Incidence(_) => {
2270 Err(DbError::unsupported("incidences do not carry labels"))
2271 }
2272 }
2273 }
2274
2275 /// Tombstones any subject by id, cascading a relation's or element's
2276 /// incidences in `O(degree)` via the reverse-adjacency index.
2277 ///
2278 /// # Errors
2279 ///
2280 /// Returns [`DbError`] when the subject is not visible.
2281 ///
2282 /// # Performance
2283 ///
2284 /// This method is `O(log change + degree)`.
2285 pub fn tombstone(&mut self, subject: impl Into<PropertySubject>) -> Result<(), DbError> {
2286 match subject.into() {
2287 PropertySubject::Element(id) => self.tombstone_element(id),
2288 PropertySubject::Relation(id) => self.tombstone_relation(id),
2289 PropertySubject::Incidence(id) => self.tombstone_incidence(id),
2290 }
2291 }
2292
2293 /// Commits this write transaction durably.
2294 ///
2295 /// A non-dirty commit returns the parent's commit sequence without appending
2296 /// to the WAL or publishing. A dirty commit encodes the overlay's mutation
2297 /// log into one WAL frame (with the watermark op last), appends it with an
2298 /// fsync (truncating back to the captured EOF on any write error so no
2299 /// interior torn record survives), THEN folds the delta into a fresh
2300 /// `Arc<Overlay>` and publishes a new `Arc<Snapshot>`.
2301 ///
2302 /// After publishing, a dirty commit consults the configured
2303 /// [`CheckpointPolicy`]: it releases the writer lock FIRST (so the fold can
2304 /// re-acquire it), then folds when the delta-log has outgrown the base. The
2305 /// committed frame is already durable, so an auto-fold failure does not lose
2306 /// data; it is surfaced to the caller.
2307 ///
2308 /// # Errors
2309 ///
2310 /// Returns [`DbError`] when commit-sequence allocation, frame encoding, the
2311 /// durable append, or a triggered auto-checkpoint fold fails.
2312 ///
2313 /// # Performance
2314 ///
2315 /// This method is `O(change)` for the dirty path — flat as the base grows.
2316 /// The publish step shares the parent snapshot's already-materialized
2317 /// [`crate::overlay::BaseRecords`] and derived index by `Arc` (a commit never
2318 /// folds, so the base is byte-identical within the generation), so it neither
2319 /// re-decodes the base nor rebuilds the index. A triggered fold adds
2320 /// `O(visible state bytes)` on top.
2321 pub(crate) fn commit(self) -> Result<CommitSeq, DbError> {
2322 if self.delta.is_empty() {
2323 // Non-dirty commit: no append, no publish, no durable id advance.
2324 return Ok(self.parent.lsn());
2325 }
2326 let lsn = self
2327 .parent
2328 .lsn()
2329 .checked_next()
2330 .ok_or(DbError::CommitSeqOverflow)?;
2331 let (ops, blob) = self.delta.encode_frame();
2332 let frame = wal::encode_commit(
2333 lsn.get(),
2334 self.transaction_id.get(),
2335 self.database.base_generation,
2336 &ops,
2337 &blob,
2338 )?;
2339 let mut log = open_log_for_append(&self.database.root, self.database.base_generation)?;
2340 wal::append_commit(&mut log, &frame)?;
2341
2342 // Durable: the delta was seeded from the parent overlay and only added
2343 // this writer's changes, so freezing it directly is the full new
2344 // published overlay (parent state + this commit). The parent overlay was
2345 // never mutated — this is a brand-new frozen `Arc<Overlay>`, so a reader
2346 // pinning the parent is unaffected.
2347 let new_overlay = Arc::new(self.delta.freeze());
2348 // A commit never folds, so the new snapshot pins the SAME base generation
2349 // as the parent — the base wire bytes are byte-identical, and so are the
2350 // owned records and the derived index built from them. Share the parent's
2351 // `Arc<BaseRecords>` (and its `BaseIndex`) instead of re-decoding the base
2352 // and rebuilding the index, which keeps a single-element commit `O(change)`
2353 // rather than `O(base)` regardless of how large the base has grown.
2354 let snapshot = Snapshot::with_shared_base_records(
2355 self.parent.generation(),
2356 lsn,
2357 Arc::clone(self.parent.base()),
2358 new_overlay,
2359 Arc::clone(self.parent.base_records()),
2360 );
2361 self.database.current = Arc::new(snapshot);
2362 self.database.last_transaction_id = self.transaction_id;
2363 // Release the writer lock before any auto-fold so the fold can re-acquire
2364 // it (a partial move out of `self`, legal because `Writer` has
2365 // no `Drop` impl; the remaining `&mut Db` borrow stays live).
2366 drop(self.lock);
2367 self.database.maybe_auto_checkpoint()?;
2368 Ok(lsn)
2369 }
2370
2371 /// Returns the merged read view this writer sees (overlay over base).
2372 ///
2373 /// # Performance
2374 ///
2375 /// This method is `O(1)` to construct.
2376 fn merged(&self) -> crate::overlay::WriteMergedState<'_> {
2377 crate::overlay::WriteMergedState::new(self.parent.base_records(), &self.delta)
2378 }
2379
2380 /// Requires an element to be visible in the writer's merged view.
2381 ///
2382 /// # Errors
2383 ///
2384 /// Returns [`DbError::UnknownElement`] when absent.
2385 ///
2386 /// # Performance
2387 ///
2388 /// This method is `O(log change + log n)`.
2389 fn require_element(&self, id: ElementId) -> Result<(), DbError> {
2390 if self.merged().contains_element(id) {
2391 Ok(())
2392 } else {
2393 Err(DbError::UnknownElement { id })
2394 }
2395 }
2396
2397 /// Requires a relation to be visible.
2398 ///
2399 /// # Errors
2400 ///
2401 /// Returns [`DbError::UnknownRelation`] when absent.
2402 ///
2403 /// # Performance
2404 ///
2405 /// This method is `O(log change + log n)`.
2406 fn require_relation(&self, id: RelationId) -> Result<(), DbError> {
2407 if self.merged().contains_relation(id) {
2408 Ok(())
2409 } else {
2410 Err(DbError::UnknownRelation { id })
2411 }
2412 }
2413
2414 /// Requires an incidence to be visible.
2415 ///
2416 /// # Errors
2417 ///
2418 /// Returns [`DbError::UnknownIncidence`] when absent.
2419 ///
2420 /// # Performance
2421 ///
2422 /// This method is `O(log change + log n)`.
2423 fn require_incidence(&self, id: IncidenceId) -> Result<(), DbError> {
2424 if self.merged().contains_incidence(id) {
2425 Ok(())
2426 } else {
2427 Err(DbError::UnknownIncidence { id })
2428 }
2429 }
2430
2431 /// Requires a role to exist in the merged catalog.
2432 ///
2433 /// # Errors
2434 ///
2435 /// Returns [`DbError::UnknownRole`] when absent.
2436 ///
2437 /// # Performance
2438 ///
2439 /// This method is `O(log role count)`.
2440 fn require_role(&self, id: RoleId) -> Result<(), DbError> {
2441 if self.delta.catalog().role(id).is_some() {
2442 Ok(())
2443 } else {
2444 Err(DbError::UnknownRole { id })
2445 }
2446 }
2447
2448 /// Requires a label to exist in the merged catalog.
2449 ///
2450 /// # Errors
2451 ///
2452 /// Returns [`DbError::UnknownLabel`] when absent.
2453 ///
2454 /// # Performance
2455 ///
2456 /// This method is `O(log label count)`.
2457 fn require_label(&self, id: LabelId) -> Result<(), DbError> {
2458 if self.delta.catalog().label(id).is_some() {
2459 Ok(())
2460 } else {
2461 Err(DbError::UnknownLabel { id })
2462 }
2463 }
2464
2465 /// Requires a relation type to exist in the merged catalog.
2466 ///
2467 /// # Errors
2468 ///
2469 /// Returns [`DbError::UnknownRelationType`] when absent.
2470 ///
2471 /// # Performance
2472 ///
2473 /// This method is `O(log relation type count)`.
2474 fn require_relation_type(&self, id: RelationTypeId) -> Result<(), DbError> {
2475 if self.delta.catalog().relation_type(id).is_some() {
2476 Ok(())
2477 } else {
2478 Err(DbError::UnknownRelationType { id })
2479 }
2480 }
2481
2482 /// Requires a property subject to be visible.
2483 ///
2484 /// # Errors
2485 ///
2486 /// Returns the matching `Unknown*` error when the subject is absent.
2487 ///
2488 /// # Performance
2489 ///
2490 /// This method is `O(log change + log n)`.
2491 fn require_subject(&self, subject: PropertySubject) -> Result<(), DbError> {
2492 match subject {
2493 PropertySubject::Element(id) => self.require_element(id),
2494 PropertySubject::Relation(id) => self.require_relation(id),
2495 PropertySubject::Incidence(id) => self.require_incidence(id),
2496 }
2497 }
2498
2499 /// Validates one projection definition against the merged catalog.
2500 ///
2501 /// # Errors
2502 ///
2503 /// Returns [`DbError`] when a referenced role or relation type is unknown.
2504 ///
2505 /// # Performance
2506 ///
2507 /// This method is `O(definition size)`.
2508 fn validate_projection_definition(
2509 &self,
2510 definition: &ProjectionDefinition,
2511 ) -> Result<(), DbError> {
2512 match definition {
2513 ProjectionDefinition::Graph(graph) => {
2514 self.require_role(graph.source_role)?;
2515 self.require_role(graph.target_role)?;
2516 for relation_type in &graph.relation_types {
2517 self.require_relation_type(*relation_type)?;
2518 }
2519 Ok(())
2520 }
2521 ProjectionDefinition::Hypergraph(hyper) => {
2522 for role in &hyper.source_roles {
2523 self.require_role(*role)?;
2524 }
2525 for role in &hyper.target_roles {
2526 self.require_role(*role)?;
2527 }
2528 for relation_type in &hyper.relation_types {
2529 self.require_relation_type(*relation_type)?;
2530 }
2531 Ok(())
2532 }
2533 }
2534 }
2535
2536 /// Validates one index definition against the merged catalog.
2537 ///
2538 /// # Errors
2539 ///
2540 /// Returns [`DbError`] when a referenced catalog id is unknown or a
2541 /// composite index has no keys.
2542 ///
2543 /// # Performance
2544 ///
2545 /// This method is `O(definition size)`.
2546 fn validate_index_definition(&self, definition: &IndexDefinition) -> Result<(), DbError> {
2547 let catalog = self.delta.catalog();
2548 match definition {
2549 IndexDefinition::Label { label } => self.require_label(*label),
2550 IndexDefinition::RelationType { relation_type } => {
2551 self.require_relation_type(*relation_type)
2552 }
2553 IndexDefinition::PropertyEquality { key } | IndexDefinition::PropertyRange { key } => {
2554 self.require_property_key(*key)
2555 }
2556 IndexDefinition::CompositeEquality { keys } => {
2557 if keys.is_empty() {
2558 return Err(DbError::unsupported(
2559 "composite equality index requires at least one key",
2560 ));
2561 }
2562 for key in keys {
2563 self.require_property_key(*key)?;
2564 }
2565 Ok(())
2566 }
2567 IndexDefinition::Projection { projection } => catalog
2568 .projection(*projection)
2569 .is_some()
2570 .then_some(())
2571 .ok_or(DbError::UnknownProjection { id: *projection }),
2572 }
2573 }
2574
2575 /// Requires a property key to exist in the merged catalog.
2576 ///
2577 /// # Errors
2578 ///
2579 /// Returns [`DbError::UnknownPropertyKey`] when absent.
2580 ///
2581 /// # Performance
2582 ///
2583 /// This method is `O(log property key count)`.
2584 fn require_property_key(&self, id: PropertyKeyId) -> Result<(), DbError> {
2585 if self.delta.catalog().property_key(id).is_some() {
2586 Ok(())
2587 } else {
2588 Err(DbError::UnknownPropertyKey { id })
2589 }
2590 }
2591}
2592
2593#[cfg(test)]
2594#[cfg(not(miri))]
2595mod tests {
2596 use std::{
2597 path::PathBuf,
2598 sync::atomic::{AtomicU64, Ordering},
2599 };
2600
2601 use super::*;
2602
2603 /// Per-process path counter for unique temporary store directories.
2604 static NEXT_PATH: AtomicU64 = AtomicU64::new(0);
2605
2606 /// Returns a unique temporary store path and removes any prior contents.
2607 fn temp_store(name: &str) -> PathBuf {
2608 let id = NEXT_PATH.fetch_add(1, Ordering::Relaxed);
2609 let path =
2610 std::env::temp_dir().join(format!("oxgraph-db-cp-{name}-{}-{id}", std::process::id()));
2611 let _ = std::fs::remove_dir_all(&path);
2612 path
2613 }
2614
    /// Elements the open-latency harness builds. Compiled only when debug
    /// assertions are off, like the rest of the harness.
    #[cfg(not(debug_assertions))]
    const OPEN_LATENCY_ELEMENTS: usize = 100_000;
    /// Relations the open-latency harness builds (each with two incidences).
    #[cfg(not(debug_assertions))]
    const OPEN_LATENCY_RELATIONS: usize = 320_000;
    /// Timed open runs the open-latency harness averages.
    #[cfg(not(debug_assertions))]
    const OPEN_LATENCY_RUNS: u32 = 5;
2631
2632 /// Populates `database` with `OPEN_LATENCY_ELEMENTS` ranked elements and
2633 /// `OPEN_LATENCY_RELATIONS` weighted relations (two incidences each).
2634 #[cfg(not(debug_assertions))]
2635 fn populate_large_store(database: &mut Db) {
2636 database.set_checkpoint_policy(CheckpointPolicy::Manual);
2637 database
2638 .write(|writer| {
2639 let rank = writer.register_property_key(
2640 "rank",
2641 PropertyFamily::Element,
2642 PropertyType::Integer,
2643 )?;
2644 let weight = writer.register_property_key(
2645 "weight",
2646 PropertyFamily::Relation,
2647 PropertyType::Integer,
2648 )?;
2649 let role = writer.register_role("party")?;
2650 let mut elements = Vec::with_capacity(OPEN_LATENCY_ELEMENTS);
2651 for index in 0..OPEN_LATENCY_ELEMENTS {
2652 let element = writer.create_element()?;
2653 writer.set_property(
2654 PropertySubject::Element(element),
2655 rank,
2656 PropertyValue::Integer(i64::try_from(index % 997).unwrap_or(0)),
2657 )?;
2658 elements.push(element);
2659 }
2660 for index in 0..OPEN_LATENCY_RELATIONS {
2661 let relation = writer.create_relation()?;
2662 writer.set_property(
2663 PropertySubject::Relation(relation),
2664 weight,
2665 PropertyValue::Integer(i64::try_from(index % 503).unwrap_or(0)),
2666 )?;
2667 let source = elements[index % OPEN_LATENCY_ELEMENTS];
2668 let target = elements[(index + 1) % OPEN_LATENCY_ELEMENTS];
2669 writer.create_incidence(relation, source, role)?;
2670 writer.create_incidence(relation, target, role)?;
2671 }
2672 Ok(())
2673 })
2674 .expect("populate");
2675 // Fold everything into the base so open pays the base term, not log replay.
2676 database.compact().expect("compact");
2677 }
2678
2679 /// Mean elapsed time of `OPEN_LATENCY_RUNS` full `Db::open` calls on `path`.
2680 #[cfg(not(debug_assertions))]
2681 fn mean_open_ms(path: &std::path::Path) -> f64 {
2682 let mut total = std::time::Duration::ZERO;
2683 for _run in 0..OPEN_LATENCY_RUNS {
2684 let start = std::time::Instant::now();
2685 let opened = Db::open(path).expect("timed open");
2686 total += start.elapsed();
2687 drop(opened);
2688 }
2689 total.as_secs_f64() * 1000.0 / f64::from(OPEN_LATENCY_RUNS)
2690 }
2691
2692 /// Mean elapsed time of the prior design's open-time heavy work — record
2693 /// decode + `from_records` index rebuild (`BaseRecords::from_view`) — over
2694 /// `OPEN_LATENCY_RUNS` runs, the BEFORE proxy for the borrowed open.
2695 #[cfg(not(debug_assertions))]
2696 fn mean_old_from_view_ms(path: &std::path::Path) -> f64 {
2697 let superblock = wal::read_superblock(path).expect("superblock");
2698 let base_path = path.join(base_file(superblock.base_generation.get()));
2699 let mut total = std::time::Duration::ZERO;
2700 for _run in 0..OPEN_LATENCY_RUNS {
2701 let base = Base::open(&base_path, false).expect("base open");
2702 let start = std::time::Instant::now();
2703 let records =
2704 crate::overlay::BaseRecords::from_view(base.get()).expect("old from_view");
2705 total += start.elapsed();
2706 drop(records);
2707 drop(base);
2708 }
2709 total.as_secs_f64() * 1000.0 / f64::from(OPEN_LATENCY_RUNS)
2710 }
2711
2712 /// Manual measurement harness (run with
2713 /// `cargo test -p oxgraph-db --release -- --ignored open_latency_large_base
2714 /// --nocapture`): builds a folded base at roughly the measured-problem scale
2715 /// (>=100k elements, >=300k relations, properties), then times `Db::open`.
2716 /// Open must be dominated by the record decode + page faults, NOT the
2717 /// `O(base)` index rebuild the prior design paid — the index is borrowed.
2718 /// Debug builds skip it (the open-time `debug_assert!` differential check
2719 /// would itself rebuild the index and skew the timing); run in `--release`.
2720 #[test]
2721 #[ignore = "manual perf measurement; run explicitly with --release --ignored --nocapture"]
2722 #[cfg(not(debug_assertions))]
2723 fn open_latency_large_base() {
2724 let path = temp_store("open-latency");
2725 let mut database = Db::create(&path).expect("create");
2726 populate_large_store(&mut database);
2727 drop(database);
2728
2729 let _warm = Db::open(&path).expect("warm open");
2730 let after_ms = mean_open_ms(&path);
2731 let before_ms = mean_old_from_view_ms(&path);
2732
2733 println!(
2734 "open_latency_large_base: {OPEN_LATENCY_ELEMENTS} elements, \
2735 {OPEN_LATENCY_RELATIONS} relations, {} incidences, {} properties",
2736 OPEN_LATENCY_RELATIONS * 2,
2737 OPEN_LATENCY_ELEMENTS + OPEN_LATENCY_RELATIONS,
2738 );
2739 println!(" BEFORE open work (decode + from_records rebuild): {before_ms:.1} ms / open");
2740 println!(" AFTER full Db::open (decode + BORROWED index): {after_ms:.1} ms / open");
2741
2742 let _ = std::fs::remove_dir_all(&path);
2743 }
2744
2745 #[test]
2746 fn reconcile_upserts_reuse_or_mint_and_retain_prunes_the_complement() {
2747 let path = temp_store("reconcile");
2748 let mut database = Db::create(&path).expect("create");
2749 let index = {
2750 let mut writer = database.begin_write().expect("begin write");
2751 let key = writer
2752 .register_property_key("stable_key", PropertyFamily::Element, PropertyType::Text)
2753 .expect("key");
2754 let index = writer
2755 .define_index(
2756 "element_stable_key_eq",
2757 IndexDefinition::PropertyEquality { key },
2758 )
2759 .expect("index");
2760 writer.commit().expect("commit schema");
2761 index
2762 };
2763 let eq = EqualityIndex::<crate::Text>::from_id(index);
2764
2765 let (a1, b1) = {
2766 let mut writer = database.begin_write().expect("begin write");
2767 let a = writer.upsert_element(eq, "a").expect("upsert a");
2768 let b = writer.upsert_element(eq, "b").expect("upsert b");
2769 writer.commit().expect("commit");
2770 (a, b)
2771 };
2772
2773 let (a2, c1) = {
2774 let mut writer = database.begin_write().expect("begin write");
2775 let a = writer.upsert_element(eq, "a").expect("re-upsert a");
2776 let c = writer.upsert_element(eq, "c").expect("upsert c");
2777 writer.retain(eq, &["a", "c"]).expect("retain");
2778 writer.commit().expect("commit");
2779 (a, c)
2780 };
2781
2782 assert_eq!(a1, a2, "an unchanged identity reuses its element id");
2783 assert_ne!(c1, a1);
2784 assert_ne!(c1, b1);
2785
2786 let read = database.reader();
2787 assert!(read.contains_element(a1), "kept a");
2788 assert!(read.contains_element(c1), "kept c");
2789 assert!(!read.contains_element(b1), "retain tombstoned b");
2790 assert_eq!(
2791 read.element_by_key(eq, "a")
2792 .expect("lookup a")
2793 .map(|element| element.id),
2794 Some(a1)
2795 );
2796 assert!(
2797 read.element_by_key(eq, "b").expect("lookup b").is_none(),
2798 "b is not resolvable after the prune"
2799 );
2800 let _ = std::fs::remove_dir_all(&path);
2801 }
2802
2803 #[test]
2804 fn write_closure_commits_on_ok_rolls_back_on_err_and_reports_outcome() {
2805 let path = temp_store("write-closure");
2806 let mut database = Db::create(&path).expect("create");
2807
2808 // Ok with a change → committed; the read closure observes it.
2809 let (id, outcome) = database
2810 .write(|writer| {
2811 let id = writer.create_element()?;
2812 Ok(id)
2813 })
2814 .expect("write");
2815 assert!(matches!(outcome, CommitOutcome::Committed(_)));
2816 database
2817 .read(|read| {
2818 assert!(read.contains_element(id));
2819 Ok(())
2820 })
2821 .expect("read");
2822
2823 // A no-op write reports Empty (no frame appended).
2824 let ((), outcome) = database.write(|_writer| Ok(())).expect("empty write");
2825 assert_eq!(outcome, CommitOutcome::Empty);
2826
2827 // An Err from the closure rolls back the staged delta.
2828 let before = database
2829 .read(|read| Ok(read.element_count()))
2830 .expect("count");
2831 let result = database.write(|writer| {
2832 writer.create_element()?;
2833 Err::<(), DbError>(DbError::EmptyQuery)
2834 });
2835 assert!(result.is_err());
2836 let after = database
2837 .read(|read| Ok(read.element_count()))
2838 .expect("count");
2839 assert_eq!(before, after, "the failed write staged nothing durable");
2840
2841 let _ = std::fs::remove_dir_all(&path);
2842 }
2843
2844 #[test]
2845 fn re_setting_an_unchanged_property_value_is_a_no_op_commit() {
2846 // The reconcile/reindex contract: re-asserting a property's existing value
2847 // must log NO mutation, so an incremental reconcile that re-sets every
2848 // property of every unchanged subject stays O(change). Without the no-op
2849 // gate the commit logs the whole graph every reindex.
2850 let path = temp_store("set-noop");
2851 let mut database = Db::create(&path).expect("create");
2852 let schema = Schema::new().key::<crate::Text>("name", PropertyFamily::Element);
2853
2854 // Create an element and set its name (a real change → committed).
2855 let id = database
2856 .write(|writer| {
2857 let bound = writer.apply_schema(&schema)?;
2858 let name = bound.key::<crate::Text>("name")?;
2859 let id = writer.create_element()?;
2860 writer.set(id, name, "alpha")?;
2861 Ok(id)
2862 })
2863 .expect("first write")
2864 .0;
2865
2866 // Re-asserting the SAME value mutates nothing → the commit is Empty.
2867 let ((), outcome) = database
2868 .write(|writer| {
2869 let bound = writer.apply_schema(&schema)?;
2870 let name = bound.key::<crate::Text>("name")?;
2871 writer.set(id, name, "alpha")?;
2872 Ok(())
2873 })
2874 .expect("idempotent set");
2875 assert_eq!(
2876 outcome,
2877 CommitOutcome::Empty,
2878 "re-setting the same property value must log no mutation"
2879 );
2880
2881 // Setting a DIFFERENT value is a real change → committed, and visible.
2882 let ((), outcome) = database
2883 .write(|writer| {
2884 let bound = writer.apply_schema(&schema)?;
2885 let name = bound.key::<crate::Text>("name")?;
2886 writer.set(id, name, "beta")?;
2887 Ok(())
2888 })
2889 .expect("changed set");
2890 assert!(matches!(outcome, CommitOutcome::Committed(_)));
2891 let name = database
2892 .bind(&schema)
2893 .expect("bind")
2894 .key::<crate::Text>("name")
2895 .expect("name key");
2896 let value = database
2897 .read(|read| {
2898 Ok(read
2899 .element(id)
2900 .and_then(|element| element.properties().get::<crate::Text, String>(name)))
2901 })
2902 .expect("read");
2903 assert_eq!(value.as_deref(), Some("beta"));
2904
2905 let _ = std::fs::remove_dir_all(&path);
2906 }
2907
2908 #[test]
2909 fn schema_apply_is_idempotent_and_bind_resolves_typed_handles() {
2910 let path = temp_store("schema");
2911 let mut database = Db::create(&path).expect("create");
2912 let schema = Schema::new()
2913 .label("function")
2914 .key::<crate::Text>("name", PropertyFamily::Element)
2915 .equality_index("name_eq", "name");
2916
2917 // First apply registers the catalog and upserts two elements by identity.
2918 let (alpha, beta) = database
2919 .write(|writer| {
2920 let bound = writer.apply_schema(&schema)?;
2921 let name_eq = bound.equality_index::<crate::Text>("name_eq")?;
2922 let function = bound.label("function")?;
2923 let alpha = writer.upsert_element(name_eq, "alpha")?;
2924 writer.add_label(alpha, function)?;
2925 let beta = writer.upsert_element(name_eq, "beta")?;
2926 Ok((alpha, beta))
2927 })
2928 .expect("apply + write")
2929 .0;
2930 assert_ne!(alpha, beta);
2931
2932 // Re-applying the same schema is idempotent: nothing new registers, so the
2933 // commit is empty.
2934 let (_bound, outcome) = database
2935 .write(|writer| writer.apply_schema(&schema))
2936 .expect("re-apply");
2937 assert_eq!(
2938 outcome,
2939 CommitOutcome::Empty,
2940 "re-applying a schema registers nothing new"
2941 );
2942
2943 // bind() resolves the schema read-only on a reopened store; the typed
2944 // handle round-trips, and a wrong value type is rejected.
2945 let reopened = Db::open(&path).expect("open");
2946 let bound = reopened.bind(&schema).expect("bind");
2947 let name_eq = bound
2948 .equality_index::<crate::Text>("name_eq")
2949 .expect("typed index");
2950 assert!(
2951 bound.equality_index::<crate::Int>("name_eq").is_err(),
2952 "a wrong-value-type handle request is a SchemaConflict"
2953 );
2954 let found = reopened
2955 .read(|read| read.element_by_key(name_eq, "alpha"))
2956 .expect("read")
2957 .expect("alpha present");
2958 assert_eq!(found.id, alpha);
2959
2960 let _ = std::fs::remove_dir_all(&path);
2961 }
2962
    /// The logical state the checkpoint tests compare before and after a fold or
    /// recovery: the visible element ids, the subjects matching one probed
    /// `rank` value (500), and the `Person` label membership.
    ///
    /// Equality is derived, so two states match only when all three vectors are
    /// equal element-for-element, order included.
    #[derive(Debug, Eq, PartialEq)]
    struct LogicalState {
        /// Visible element ids as reported by the reader (expected ascending).
        elements: Vec<ElementId>,
        /// Subjects whose `rank` property equals the single probed value, 500.
        rank_eq_500: Vec<PropertySubject>,
        /// Element ids carrying the `Person` label.
        person_members: Vec<ElementId>,
    }
2975
    /// Catalog ids returned by [`build_fixture`], kept so later reads can probe
    /// the same property key and label the fixture registered.
    struct Fixture {
        /// Id of the `rank` property key, registered as an integer element
        /// property.
        rank: PropertyKeyId,
        /// Id of the `Person` label.
        person: LabelId,
    }
2983
2984 /// Builds a committed fixture: 8 elements, each ranked `index * 100`, the
2985 /// even-indexed ones labelled `Person`. Returns the fixture ids.
2986 fn build_fixture(database: &mut Db) -> Fixture {
2987 let mut writer = database.begin_write().expect("begin write");
2988 let rank = writer
2989 .register_property_key("rank", PropertyFamily::Element, PropertyType::Integer)
2990 .expect("rank key");
2991 let person = writer.register_label("Person").expect("person label");
2992 for index in 0..8u64 {
2993 let element = writer.create_element().expect("element");
2994 writer
2995 .set(
2996 element,
2997 Key::<crate::Int>::from_id(rank),
2998 i64::try_from(index).expect("index") * 100,
2999 )
3000 .expect("set rank");
3001 if index % 2 == 0 {
3002 writer.add_label(element, person).expect("add label");
3003 }
3004 }
3005 writer.commit().expect("commit fixture");
3006 Fixture { rank, person }
3007 }
3008
3009 /// Reads the logical state through the index-backed read surface.
3010 fn read_logical(database: &Db, fixture: &Fixture) -> LogicalState {
3011 let read = database.reader();
3012 let elements = read.element_ids();
3013 let rank_eq_500 = read
3014 .lookup_property_equal(fixture.rank, &PropertyValue::Integer(500))
3015 .expect("rank lookup");
3016 let person_members = read.snapshot.view().elements_with_label(fixture.person);
3017 LogicalState {
3018 elements,
3019 rank_eq_500,
3020 person_members,
3021 }
3022 }
3023
3024 /// Asserts ids are never reused across a fold BEHAVIORALLY: the next element
3025 /// `database` mints must take the id one past the current maximum visible
3026 /// element id, i.e. the recovered watermark survived the fold. A regression
3027 /// that dropped the watermark on fold (so the recovered record set is
3028 /// unchanged but the next-id counter reset) would reuse an existing id and
3029 /// fail this assertion — which the unchanged-record-set checks alone miss.
3030 ///
3031 /// The probe element is rolled back, so it does not perturb the logical state
3032 /// the surrounding test re-reads.
3033 fn assert_no_id_reuse_across_fold(database: &mut Db) {
3034 let max_existing = database
3035 .reader()
3036 .element_ids()
3037 .into_iter()
3038 .map(ElementId::get)
3039 .max()
3040 .unwrap_or(0);
3041 let expected = ElementId::new(max_existing + 1);
3042 let mut writer = database.begin_write().expect("watermark probe writer");
3043 let minted = writer.create_element().expect("watermark probe element");
3044 assert_eq!(
3045 minted, expected,
3046 "the next minted id must be one past the max existing id (watermark \
3047 survived the fold; ids are never reused)",
3048 );
3049 // Drop the probe writer so it leaves no trace in the logical state.
3050 drop(writer);
3051 }
3052
3053 /// CHECKPOINT-CRASH-MATRIX: a crash after each fsync point in `checkpoint`
3054 /// recovers EXACTLY the correct logical state. After a crash before the
3055 /// superblock lands, the OLD generation stays authoritative (the orphan new
3056 /// base is ignored); after a crash once the superblock names the new
3057 /// generation, the NEW base is authoritative. The completed checkpoint
3058 /// recovers the same logical state from the folded base. In every case the
3059 /// index-backed lookups return the same answers as before the (attempted)
3060 /// fold.
3061 #[test]
3062 fn checkpoint_crash_matrix_recovers_exact_state() {
3063 for stop in [
3064 CheckpointStop::BeforeSuperblock,
3065 CheckpointStop::BeforeRotate,
3066 CheckpointStop::Complete,
3067 ] {
3068 let path = temp_store(&format!("crash-{stop:?}"));
3069 let mut database = Db::create(&path).expect("create");
3070 let fixture = build_fixture(&mut database);
3071 let before = read_logical(&database, &fixture);
3072 let before_generation = database.base_generation;
3073
3074 // Simulate a crash at `stop`: the checkpoint returns right after the
3075 // chosen fsync, leaving the intermediate files in place. We then drop
3076 // the handle (as a crash would) and reopen from disk.
3077 database
3078 .checkpoint_inner(stop)
3079 .expect("checkpoint stop returns ok");
3080 drop(database);
3081
3082 let mut recovered = Db::open(&path).expect("reopen after crash");
3083 let after = read_logical(&recovered, &fixture);
3084 assert_eq!(
3085 after, before,
3086 "crash at {stop:?} must recover the exact logical state",
3087 );
3088
3089 // The recovered watermark survives every crash window: the next minted
3090 // id is one past the max recovered element id, so ids are never reused
3091 // across the (attempted) fold — asserted behaviorally, not merely
3092 // inferred from the unchanged record set.
3093 assert_no_id_reuse_across_fold(&mut recovered);
3094
3095 // Generation expectation per crash window.
3096 match stop {
3097 CheckpointStop::BeforeSuperblock => assert_eq!(
3098 recovered.base_generation, before_generation,
3099 "old superblock stays authoritative before the new one lands",
3100 ),
3101 CheckpointStop::BeforeRotate | CheckpointStop::Complete => assert_eq!(
3102 recovered.base_generation,
3103 before_generation + 1,
3104 "the new superblock names the folded generation",
3105 ),
3106 }
3107
3108 // A second open is idempotent (orphan files from a partial crash do
3109 // not derail a repeat recovery).
3110 let reopened = Db::open(&path).expect("second reopen");
3111 assert_eq!(read_logical(&reopened, &fixture), before);
3112
3113 drop(reopened);
3114 let _ = std::fs::remove_dir_all(&path);
3115 }
3116 }
3117
3118 /// The auto-checkpoint policy folds the delta-log into a fresh base once the
3119 /// log outgrows the base by the configured factor: under a tiny factor, a
3120 /// run of dirty commits advances the live generation (the log was folded),
3121 /// and the logical state is preserved across the fold. The manual policy
3122 /// never auto-folds.
3123 #[test]
3124 fn auto_checkpoint_policy_folds_when_log_outgrows_base() {
3125 // Manual policy: many commits, generation never advances on its own.
3126 let manual_path = temp_store("auto-manual");
3127 let mut manual = Db::create(&manual_path).expect("create manual");
3128 manual.set_checkpoint_policy(CheckpointPolicy::Manual);
3129 let _fixture = build_fixture(&mut manual);
3130 for _ in 0..200 {
3131 let mut writer = manual.begin_write().expect("writer");
3132 writer.create_element().expect("element");
3133 writer.commit().expect("commit");
3134 }
3135 assert_eq!(
3136 manual.live_generation(),
3137 CheckpointGeneration::new(0),
3138 "manual policy must never auto-fold",
3139 );
3140 drop(manual);
3141 let _ = std::fs::remove_dir_all(&manual_path);
3142
3143 // Size-ratio policy with the smallest factor: the log soon outgrows the
3144 // tiny base floor, so a run of commits triggers at least one fold.
3145 let auto_path = temp_store("auto-ratio");
3146 let mut auto = Db::create(&auto_path).expect("create auto");
3147 auto.set_checkpoint_policy(CheckpointPolicy::SizeRatio { factor: 1 });
3148 let fixture = build_fixture(&mut auto);
3149 let before = read_logical(&auto, &fixture);
3150 for _ in 0..400 {
3151 let mut writer = auto.begin_write().expect("writer");
3152 writer.create_element().expect("element");
3153 writer.commit().expect("commit");
3154 }
3155 assert!(
3156 auto.live_generation() > CheckpointGeneration::new(0),
3157 "size-ratio policy must auto-fold once the log outgrows the base",
3158 );
3159 // The pre-existing logical state survives every fold; the policy is also
3160 // surfaced in status and preserved across the fold.
3161 let after = read_logical(&auto, &fixture);
3162 assert_eq!(after.rank_eq_500, before.rank_eq_500);
3163 assert_eq!(after.person_members, before.person_members);
3164 // Ids are never reused across the auto-fold: the next minted id is one
3165 // past the max existing id (the watermark folded into the new base).
3166 assert_no_id_reuse_across_fold(&mut auto);
3167 assert_eq!(
3168 auto.checkpoint_policy(),
3169 CheckpointPolicy::SizeRatio { factor: 1 },
3170 "the auto-fold reopen must preserve the configured policy",
3171 );
3172 // Status surfaces the live generation and the (now small) log size.
3173 let status = auto.stats();
3174 assert_eq!(status.live_generation, auto.live_generation());
3175 assert!(status.base_byte_size > 0, "live base has bytes");
3176 drop(auto);
3177 let _ = std::fs::remove_dir_all(&auto_path);
3178 }
3179}