oxgraph_db/database.rs
1//! Embedded `OxGraph` database engine API.
2//!
//! This is the integration layer over the base+overlay+WAL core. A [`Database`]
//! holds the current `Arc<Snapshot>` (one immutable base generation plus the
//! frozen overlay published over it), the live base generation that names the
//! per-generation base file and delta-log, and the recovered transaction
//! watermark. Reads pin the current snapshot in `O(1)` (`begin_read` clones the
//! `Arc`); writes layer a fresh [`WriteOverlay`] over the current snapshot,
//! append a WAL frame on commit, and publish a new snapshot. The whole
//! read/query/projection surface resolves through the merged [`StateView`] of
//! the pinned snapshot.
11
12use std::{
13 borrow::Cow,
14 path::{Path, PathBuf},
15 sync::Arc,
16};
17
18use crate::{
19 Catalog, CheckpointGeneration, CommitSeq, DbError, ElementId, ElementRecord, GraphProjection,
20 HypergraphProjection, IncidenceId, IncidenceRecord, IndexId, LabelId, PreparedQuery,
21 ProjectionDefinition, ProjectionId, PropertyKeyId, PropertySubject, PropertyType,
22 PropertyValue, QueryLanguage, QueryResult, RelationId, RelationRecord, RelationTypeId, RoleId,
23 TransactionId,
24 backing::Base,
25 catalog::{IndexDefinition, PropertyFamily},
26 freeze::{self, FreezeStamps},
27 lock::WriterLock,
28 overlay::{Overlay, Snapshot, StateView, WriteOverlay},
29 projection,
30 state::NextIds,
31 storage,
32 traversal::{self, TraversalOptions, TraversalResult},
33 wal,
34 wire::SuperblockRecord,
35};
36
/// Lookup input for a cataloged index.
///
/// Each variant names one lookup shape: [`IndexLookup::All`] for
/// membership-style indexes, [`IndexLookup::Equal`] and [`IndexLookup::Range`]
/// for single-property indexes, and [`IndexLookup::CompositeEqual`] for
/// composite equality indexes (an ordered value tuple).
///
/// Every payload is a borrow with lifetime `'value`; the enum never owns a
/// [`PropertyValue`].
///
/// # Performance
///
/// Copying this value is `O(1)`: it carries only references.
#[derive(Clone, Copy, Debug)]
pub enum IndexLookup<'value> {
    /// Lookup every subject represented by a membership-style index.
    All,
    /// Lookup one scalar equality value.
    Equal(&'value PropertyValue),
    /// Lookup one inclusive scalar range (`min` and `max` are both included).
    Range {
        /// Inclusive lower bound.
        min: &'value PropertyValue,
        /// Inclusive upper bound.
        max: &'value PropertyValue,
    },
    /// Lookup one ordered composite equality tuple; the slice order is the
    /// tuple order.
    CompositeEqual(&'value [PropertyValue]),
}
62
/// Policy deciding when a dirty commit should fold the delta-log into a fresh
/// base generation, which bounds the log tail that recovery has to replay.
///
/// [`CheckpointPolicy::SizeRatio`] (the default) fires once the delta-log is
/// larger than `factor` times the live base size. [`CheckpointPolicy::Manual`]
/// never fires, leaving checkpoints to explicit
/// [`Database::checkpoint`]/[`Database::compact`] calls.
///
/// # Performance
///
/// Copying this value is `O(1)`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CheckpointPolicy {
    /// Auto-checkpointing is off; the caller drives [`Database::checkpoint`].
    Manual,
    /// Auto-checkpoint after a dirty commit once the delta-log is strictly
    /// larger than `factor` times the live base size. A small base-size floor
    /// keeps a tiny/empty base (the gen-0 store) from checkpointing on its
    /// first commit.
    SizeRatio {
        /// Log-to-base size factor `K`: the log may reach `K × base` bytes
        /// before the next dirty commit folds it.
        factor: u32,
    },
}

impl CheckpointPolicy {
    /// Default factor `K`: fold when the delta-log exceeds four times the live
    /// base size.
    pub const DEFAULT_FACTOR: u32 = 4;

    /// Base-size floor in bytes. A base smaller than this is treated as this
    /// large by the size-ratio policy, so a freshly created (near-empty) base
    /// is not checkpointed before it carries meaningful data.
    const MIN_BASE_BYTES: u64 = 4 * 1024;

    /// Computes the log size (bytes) that a size-ratio policy with `factor`
    /// must be strictly exceeded by, for a base of `base_bytes`.
    ///
    /// The base size is clamped up to [`Self::MIN_BASE_BYTES`] and the product
    /// saturates at `u64::MAX` instead of overflowing.
    ///
    /// # Performance
    ///
    /// This function is `O(1)`.
    const fn fold_threshold(base_bytes: u64, factor: u32) -> u64 {
        let effective_base = if base_bytes >= Self::MIN_BASE_BYTES {
            base_bytes
        } else {
            Self::MIN_BASE_BYTES
        };
        // `as` is a lossless u32 -> u64 widening; `u64::from` is not callable
        // from a `const fn`.
        effective_base.saturating_mul(factor as u64)
    }

    /// Reports whether a delta-log of `log_bytes` over a base of `base_bytes`
    /// should trigger an auto-checkpoint under this policy.
    ///
    /// [`Self::Manual`] always answers `false`; [`Self::SizeRatio`] answers
    /// `true` only when `log_bytes` is strictly greater than the threshold.
    ///
    /// # Performance
    ///
    /// This method is `O(1)`.
    #[must_use]
    const fn should_checkpoint(self, log_bytes: u64, base_bytes: u64) -> bool {
        match self {
            Self::Manual => false,
            Self::SizeRatio { factor } => log_bytes > Self::fold_threshold(base_bytes, factor),
        }
    }
}

impl Default for CheckpointPolicy {
    /// Returns the size-ratio policy using [`CheckpointPolicy::DEFAULT_FACTOR`].
    ///
    /// # Performance
    ///
    /// This function is `O(1)`.
    fn default() -> Self {
        let factor = Self::DEFAULT_FACTOR;
        Self::SizeRatio { factor }
    }
}
133
/// Returns the file name of the base image for `generation`, in the form
/// `base-<generation>.oxgdb`.
///
/// # Performance
///
/// This function is `O(1)`: it formats one `u64`.
fn base_file(generation: u64) -> String {
    ["base-", generation.to_string().as_str(), ".oxgdb"].concat()
}
142
/// Returns the file name of the delta-log for `generation`, in the form
/// `delta-<generation>.log`.
///
/// # Performance
///
/// This function is `O(1)`: it formats one `u64`.
fn delta_file(generation: u64) -> String {
    ["delta-", generation.to_string().as_str(), ".log"].concat()
}
151
/// Open OXGDB database handle.
///
/// The handle owns the store's root path, the currently published
/// `Arc<Snapshot>`, the live base generation, the transaction-id watermark and
/// the auto-checkpoint policy. It does not keep a delta-log file handle: the
/// base file and delta-log are addressed by path from `root` and the live
/// generation.
///
/// # Performance
///
/// Moving a handle is `O(1)`: it moves a path buffer, one `Arc<Snapshot>` and
/// a few scalar fields.
pub struct Database {
    /// Root database directory.
    root: PathBuf,
    /// The current visible snapshot (base generation + published overlay),
    /// shared by readers through an atomically reference-counted handle.
    current: Arc<Snapshot>,
    /// Live base generation named by the superblock; it selects both the base
    /// filename and the per-generation delta-log filename.
    base_generation: u64,
    /// Transaction-id watermark. On open it is recovered as the maximum of the
    /// superblock's id and every replayed frame's id. `begin_write` advances it
    /// eagerly, so within a session it can sit above the last durably
    /// committed id (for example after a rollback).
    last_transaction_id: TransactionId,
    /// Auto-checkpoint policy consulted after each dirty commit.
    checkpoint_policy: CheckpointPolicy,
}
173
174impl Database {
175 /// Creates a new empty OXGDB database at `path`.
176 ///
177 /// The create order is base-0 then empty delta-0.log then the writer lock
178 /// file then the superblock (written LAST as the create-complete marker), so
179 /// a half-created store is detected on open rather than silently opened
180 /// empty.
181 ///
182 /// # Errors
183 ///
184 /// Returns [`DbError::AlreadyExists`] when a store already exists, or
185 /// [`DbError::Io`]/[`DbError::InvalidStore`] when creation fails.
186 ///
187 /// # Performance
188 ///
189 /// This function is `O(empty base bytes)`.
190 pub fn create(path: impl AsRef<Path>) -> Result<Self, DbError> {
191 let root = path.as_ref().to_path_buf();
192 if root.join(wal::SUPERBLOCK_FILE).exists() {
193 return Err(DbError::AlreadyExists);
194 }
195 // Base-0: an empty merged view (empty base under an empty overlay).
196 let empty_base = crate::overlay::BaseRecords::empty();
197 let empty_overlay = Overlay::empty(NextIds::INITIAL, Catalog::empty());
198 let view = crate::overlay::MergedState::new(&empty_base, &empty_overlay);
199 let base_bytes = freeze::freeze_view(
200 &view,
201 FreezeStamps {
202 commit_seq: 0,
203 transaction_id: 0,
204 generation: 0,
205 },
206 )?;
207 storage::atomic_write(
208 &root,
209 &root.join(format!("{}.tmp", base_file(0))),
210 &root.join(base_file(0)),
211 &base_bytes,
212 )?;
213 // Empty delta-0.log, durably created.
214 create_empty_log(&root, 0)?;
215 // Superblock is written LAST; its existence is the create-complete marker.
216 write_superblock(&root, 0, 0, 0, 0)?;
217 Self::open(&root)
218 }
219
220 /// Opens an existing OXGDB database, recovering the live frontier from the
221 /// valid prefix of the delta-log replayed over the base named by the
222 /// superblock.
223 ///
224 /// # Errors
225 ///
226 /// Returns [`DbError`] when the store is missing, malformed, or the log is
227 /// corrupt beyond a torn tail.
228 ///
229 /// # Performance
230 ///
231 /// This function is `O(base bytes + log bytes)`.
232 pub fn open(path: impl AsRef<Path>) -> Result<Self, DbError> {
233 let root = path.as_ref().to_path_buf();
234 let superblock = wal::read_superblock(&root)?;
235 let generation = superblock.base_generation.get();
236
237 let base = Arc::new(Base::open(&root.join(base_file(generation)), false)?);
238 let base_records = Arc::new(crate::overlay::BaseRecords::from_view(base.get())?);
239 let base_header = *base.get().header();
240 let base_catalog = base.get().catalog().clone();
241 let base_next = NextIds::from_header(&base_header);
242
243 // Replay the valid prefix of the per-generation delta-log.
244 let log_path = root.join(delta_file(generation));
245 let log_bytes = read_log(&log_path)?;
246 let outcome = wal::replay(generation, &log_bytes)?;
247 // A torn tail truncates the log back to its last-good byte length.
248 if outcome.valid_len < log_bytes.len() {
249 truncate_log(&log_path, outcome.valid_len)?;
250 }
251
252 // Fold the replayed frames into a fresh overlay over the base, deriving
253 // the live frontier (commit_seq/txn_id) from the last good frame.
254 let mut write = WriteOverlay::new(base_next, base_catalog);
255 let mut recovered_next = base_next;
256 let mut last_commit_seq = superblock.commit_seq.get();
257 let mut last_txn = superblock.transaction_id.get();
258 for frame in &outcome.frames {
259 for op in &frame.ops {
260 write.apply_replay_op(&base_records, op, &frame.blob, frame.lsn)?;
261 }
262 recovered_next = recovered_next.elementwise_max(write.next_ids());
263 last_commit_seq = frame.lsn;
264 last_txn = last_txn.max(frame.txn_id);
265 }
266 // ids are never reused: the recovered watermark is the elementwise max of
267 // the base header and every replayed frame's watermark.
268 write.set_next_ids(recovered_next);
269 let overlay = Arc::new(write.freeze());
270
271 let snapshot = Arc::new(Snapshot::new(
272 CheckpointGeneration::new(generation),
273 CommitSeq::new(last_commit_seq),
274 base,
275 overlay,
276 )?);
277
278 Ok(Self {
279 root,
280 current: snapshot,
281 base_generation: generation,
282 last_transaction_id: TransactionId::new(last_txn),
283 checkpoint_policy: CheckpointPolicy::default(),
284 })
285 }
286
287 /// Returns the live base generation named by the superblock (the count of
288 /// folds this store has undergone; gen-0 is the freshly created store).
289 ///
290 /// # Performance
291 ///
292 /// This method is `O(1)`.
293 #[must_use]
294 pub const fn live_generation(&self) -> CheckpointGeneration {
295 CheckpointGeneration::new(self.base_generation)
296 }
297
298 /// Returns the configured auto-checkpoint policy.
299 ///
300 /// # Performance
301 ///
302 /// This method is `O(1)`.
303 #[must_use]
304 pub const fn checkpoint_policy(&self) -> CheckpointPolicy {
305 self.checkpoint_policy
306 }
307
308 /// Sets the auto-checkpoint policy consulted after each dirty commit.
309 ///
310 /// # Performance
311 ///
312 /// This method is `O(1)`.
313 pub const fn set_checkpoint_policy(&mut self, policy: CheckpointPolicy) {
314 self.checkpoint_policy = policy;
315 }
316
317 /// Validates the current handle by re-reading the superblock and verifying
318 /// the live base's content CRC.
319 ///
320 /// # Errors
321 ///
322 /// Returns [`DbError`] when the superblock or base fails validation.
323 ///
324 /// # Performance
325 ///
326 /// This method is `O(base bytes)`.
327 pub fn validate(&self) -> Result<(), DbError> {
328 wal::read_superblock(&self.root)?;
329 Base::open(&self.root.join(base_file(self.base_generation)), false).map(|_base| ())
330 }
331
332 /// Validates an OXGDB database at `path`.
333 ///
334 /// # Errors
335 ///
336 /// Returns [`DbError`] when the store fails to open and recover.
337 ///
338 /// # Performance
339 ///
340 /// This function is `O(base bytes + log bytes)`.
341 pub fn validate_path(path: impl AsRef<Path>) -> Result<(), DbError> {
342 Self::open(path).map(|_database| ())
343 }
344
345 /// Folds the current base+overlay into a new base generation, rotating the
346 /// delta-log and republishing the superblock (a manual checkpoint).
347 ///
348 /// This is the checkpoint primitive, exposed here so the existing `compact`
349 /// API keeps its "rewrite the store compactly" contract. Auto-triggering is
350 /// configured separately via [`Database::set_checkpoint_policy`].
351 ///
352 /// # Errors
353 ///
354 /// Returns [`DbError`] when encoding, writing, or publishing the new
355 /// generation fails.
356 ///
357 /// # Performance
358 ///
359 /// This method is `O(visible state bytes)`.
360 pub fn compact(&mut self) -> Result<(), DbError> {
361 self.checkpoint()
362 }
363
364 /// Folds the current base+overlay into base-`{g+1}`, creates an empty
365 /// delta-`{g+1}`.log, republishes the superblock naming `g+1` (the
366 /// linearization point), then unlinks the old base and log.
367 ///
368 /// The order is crash-safe: the new base is fully durable BEFORE the
369 /// superblock names it (so a crash before the superblock leaves the OLD
370 /// superblock authoritative and the orphan new base is ignored), and the old
371 /// base/log are unlinked only AFTER the superblock names the new generation
372 /// (so a crash before the unlink leaves the NEW superblock authoritative and
373 /// the orphan old files are ignored). The
374 /// [`crate::wire::SuperblockRecord`] rename is the single linearization point.
375 ///
376 /// # Errors
377 ///
378 /// Returns [`DbError`] when encoding, writing, or publishing fails.
379 ///
380 /// # Performance
381 ///
382 /// This method is `O(visible state bytes)`.
383 pub fn checkpoint(&mut self) -> Result<(), DbError> {
384 self.checkpoint_inner(
385 #[cfg(test)]
386 CheckpointStop::Complete,
387 )
388 }
389
    /// Crash-safe checkpoint body shared by [`Database::checkpoint`] and the
    /// crash-matrix test.
    ///
    /// Steps, in order: acquire the writer lock; freeze the current view into
    /// base-`{g+1}` and write it atomically; create the empty
    /// delta-`{g+1}`.log; publish the superblock naming `g+1`; reopen the store
    /// to refresh this handle; best-effort unlink the old base and log.
    ///
    /// Under `#[cfg(test)]` it accepts a [`CheckpointStop`] that simulates a
    /// crash by returning `Ok(())` early right after a chosen durability point,
    /// leaving the on-disk files as a crash there would and leaving this
    /// handle's in-memory state untouched, so the crash-matrix test can reopen
    /// and assert recovery.
    ///
    /// # Errors
    ///
    /// Returns [`DbError`] when the lock cannot be acquired, the generation
    /// counter overflows, or encoding, writing, publishing, or the reopen fails.
    ///
    /// # Performance
    ///
    /// This method is `O(visible state bytes)`.
    fn checkpoint_inner(&mut self, #[cfg(test)] stop: CheckpointStop) -> Result<(), DbError> {
        // Held until the function returns, covering the whole fold.
        let _lock = WriterLock::acquire(&self.root)?;
        let next_generation = self
            .base_generation
            .checked_add(1)
            .ok_or_else(|| DbError::invalid_store("checkpoint generation overflow"))?;
        let view = self.current.view();
        let commit_seq = self.current.lsn().get();
        // NOTE(review): `last_transaction_id` is advanced eagerly by
        // `begin_write`, so the id stamped here may belong to a writer that
        // never committed — confirm that is the intended watermark.
        let base_bytes = freeze::freeze_view(
            &view,
            FreezeStamps {
                commit_seq,
                transaction_id: self.last_transaction_id.get(),
                generation: next_generation,
            },
        )?;
        // (1) write base-{g+1} (temp + fsync + rename + dir-fsync).
        storage::atomic_write(
            &self.root,
            &self
                .root
                .join(format!("{}.tmp", base_file(next_generation))),
            &self.root.join(base_file(next_generation)),
            &base_bytes,
        )?;
        // (2) create empty delta-{g+1}.log (fsync + dir-fsync).
        create_empty_log(&self.root, next_generation)?;
        // Crash point A: new base + new log durable, superblock NOT yet
        // published. The OLD superblock still names `g`, so recovery uses the old
        // generation; the new base/log are orphans.
        #[cfg(test)]
        if matches!(stop, CheckpointStop::BeforeSuperblock) {
            return Ok(());
        }
        // (3) publish the superblock naming g+1 — the linearization point.
        // The same `commit_seq` is recorded as both the checkpoint LSN and the
        // commit sequence.
        write_superblock(
            &self.root,
            next_generation,
            commit_seq,
            commit_seq,
            self.last_transaction_id.get(),
        )?;
        // Crash point B: superblock now names g+1, old base/log NOT yet unlinked.
        // Recovery uses the new generation; the old base/log are orphans.
        #[cfg(test)]
        if matches!(stop, CheckpointStop::BeforeRotate) {
            return Ok(());
        }
        // Re-open over the new generation, then (4) unlink the old base + log.
        // NOTE(review): if this reopen fails, the error is returned while the
        // superblock already names g+1 and this handle still refers to
        // generation g — confirm callers discard the handle on error.
        let reopened = Self::open(&self.root)?;
        let old_generation = self.base_generation;
        let policy = self.checkpoint_policy;
        self.current = reopened.current;
        self.base_generation = reopened.base_generation;
        self.last_transaction_id = reopened.last_transaction_id;
        // The reopen reset the policy to the default; restore the caller's.
        self.checkpoint_policy = policy;
        // Cleanup is best-effort: errors are ignored because the old files are
        // already orphans once the superblock names the new generation.
        let _ = std::fs::remove_file(self.root.join(base_file(old_generation)));
        let _ = std::fs::remove_file(self.root.join(delta_file(old_generation)));
        let _ = storage::sync_directory(&self.root);
        Ok(())
    }
464
465 /// Auto-checkpoints when the configured [`CheckpointPolicy`] says the
466 /// delta-log has grown too large relative to the base. Called after a dirty
467 /// commit publishes its frame. A failed fold is surfaced so the caller can
468 /// observe it; the committed data is already durable in the log regardless.
469 ///
470 /// # Errors
471 ///
472 /// Returns [`DbError`] when the triggered fold fails.
473 ///
474 /// # Performance
475 ///
476 /// This method is `O(1)` to decide; `O(visible state bytes)` when it folds.
477 fn maybe_auto_checkpoint(&mut self) -> Result<(), DbError> {
478 let log_bytes = file_len(&self.root.join(delta_file(self.base_generation)));
479 let base_bytes = file_len(&self.root.join(base_file(self.base_generation)));
480 if self
481 .checkpoint_policy
482 .should_checkpoint(log_bytes, base_bytes)
483 {
484 self.checkpoint()?;
485 }
486 Ok(())
487 }
488
489 /// Returns operational status for this handle, including the live generation
490 /// count and the on-disk base/delta-log sizes the auto-checkpoint policy
491 /// weighs.
492 ///
493 /// # Performance
494 ///
495 /// This method is `O(visible state)` for the merged counts plus two `stat`
496 /// syscalls for the file sizes.
497 #[must_use]
498 pub fn status(&self) -> DatabaseStatus {
499 let view = self.current.view();
500 DatabaseStatus {
501 visible_commit_seq: self.current.lsn(),
502 last_transaction_id: self.last_transaction_id,
503 live_generation: CheckpointGeneration::new(self.base_generation),
504 base_byte_size: file_len(&self.root.join(base_file(self.base_generation))),
505 log_byte_size: file_len(&self.root.join(delta_file(self.base_generation))),
506 element_count: view.element_count(),
507 relation_count: view.relation_count(),
508 incidence_count: view.incidence_count(),
509 catalog: self.catalog_summary(),
510 }
511 }
512
513 /// Returns a catalog-size summary.
514 ///
515 /// # Performance
516 ///
517 /// This method is `O(catalog entry count)`.
518 #[must_use]
519 pub fn catalog_summary(&self) -> CatalogSummary {
520 CatalogSummary::from_catalog(self.current.view().catalog())
521 }
522
523 /// Starts a read transaction pinned to the current visible snapshot.
524 ///
525 /// # Performance
526 ///
527 /// This method is `O(1)`: the reader clones the current `Arc<Snapshot>` and
528 /// observes a fixed state even across later commits and checkpoints.
529 #[must_use]
530 pub fn begin_read(&self) -> ReadTransaction {
531 ReadTransaction {
532 snapshot: Arc::clone(&self.current),
533 }
534 }
535
536 /// Starts the single writer transaction, acquiring the cross-process writer
537 /// lock for the transaction's lifetime.
538 ///
539 /// # Errors
540 ///
541 /// Returns [`DbError::WriterLockHeld`] when another writer holds the lock or
542 /// [`DbError::TransactionIdOverflow`] when writer ids are exhausted.
543 ///
544 /// # Performance
545 ///
546 /// This method is `O(1)`: the writer layers a fresh empty write overlay over
547 /// the current snapshot.
548 pub fn begin_write(&mut self) -> Result<WriteTransaction<'_>, DbError> {
549 let lock = WriterLock::acquire(&self.root)?;
550 let transaction_id = self
551 .last_transaction_id
552 .checked_next()
553 .ok_or(DbError::TransactionIdOverflow)?;
554 // Burn the id eagerly so it is session-local-visible even on rollback;
555 // it only becomes durable when a dirty commit writes its frame, and a
556 // reopen recovers the durable high-water mark from the log.
557 self.last_transaction_id = transaction_id;
558 let parent = Arc::clone(&self.current);
559 // Seed the writer delta from the parent's published overlay so the
560 // writer reads every committed record; the parent overlay is never
561 // mutated (the seed clones its maps).
562 let delta = WriteOverlay::from_overlay(parent.overlay());
563 Ok(WriteTransaction {
564 database: self,
565 parent,
566 delta,
567 transaction_id,
568 lock,
569 })
570 }
571
572 /// Prepares a query against the current catalog.
573 ///
574 /// # Errors
575 ///
576 /// Returns [`DbError`] when parsing or semantic analysis fails.
577 ///
578 /// # Performance
579 ///
580 /// This method is `O(query length + catalog lookup cost)`.
581 pub fn prepare(&self, language: QueryLanguage, query: &str) -> Result<PreparedQuery, DbError> {
582 PreparedQuery::prepare(language, query, &self.current.view())
583 }
584}
585
/// Reports the on-disk byte length of `path`.
///
/// Any `stat` failure — including a missing file — yields `0`. The size is
/// advisory: it feeds status reporting and the auto-checkpoint heuristic, never
/// a correctness decision.
///
/// # Performance
///
/// This function is `O(1)`: one `stat` syscall.
fn file_len(path: &Path) -> u64 {
    match std::fs::metadata(path) {
        Ok(metadata) => metadata.len(),
        Err(_) => 0,
    }
}
596
/// Test-only crash-injection point for [`Database::checkpoint_inner`]: selects
/// where the fold returns early so the crash-matrix test can reopen the store
/// and assert the recovered state at each crash window.
///
/// The crash-matrix test that constructs the non-`Complete` variants is
/// `#[cfg(not(miri))]` (it reopens a real store across simulated crashes, which
/// miri's isolation cannot model), so under miri only `Complete` is constructed
/// and the other variants are expectedly unused — hence the miri-only
/// `expect(dead_code)` below.
///
/// # Performance
///
/// `perf: unspecified`; a test-only control tag.
#[cfg(test)]
#[cfg_attr(
    miri,
    expect(
        dead_code,
        reason = "the crash-injection variants are constructed only by the #[cfg(not(miri))] crash-matrix test"
    )
)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum CheckpointStop {
    /// Run the whole checkpoint (the production path).
    Complete,
    /// Stop after the new base + new log are durable, before the superblock is
    /// published (the old superblock stays authoritative).
    BeforeSuperblock,
    /// Stop after the superblock names the new generation, before the old
    /// base/log are unlinked (the new superblock is authoritative).
    BeforeRotate,
}
628
629/// Reads the whole delta-log into memory, treating a missing file as empty.
630///
631/// # Errors
632///
633/// Returns [`DbError::Io`] when the file cannot be read.
634///
635/// # Performance
636///
637/// This function is `O(log bytes)`.
638fn read_log(path: &Path) -> Result<Vec<u8>, DbError> {
639 match std::fs::read(path) {
640 Ok(bytes) => Ok(bytes),
641 Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Vec::new()),
642 Err(error) => Err(DbError::io("read delta-log", error)),
643 }
644}
645
646/// Truncates the delta-log back to `len` (its last-good byte length) and fsyncs,
647/// discarding a torn tail under the open path.
648///
649/// # Errors
650///
651/// Returns [`DbError::Io`] when opening, truncating, or syncing fails.
652///
653/// # Performance
654///
655/// This function is `O(1)`.
656fn truncate_log(path: &Path, len: usize) -> Result<(), DbError> {
657 let file = std::fs::OpenOptions::new()
658 .write(true)
659 .open(path)
660 .map_err(|error| DbError::io("open delta-log for truncate", error))?;
661 let len = u64::try_from(len)
662 .map_err(|_overflow| DbError::invalid_store("delta-log length overflow"))?;
663 file.set_len(len)
664 .map_err(|error| DbError::io("truncate delta-log", error))?;
665 file.sync_all()
666 .map_err(|error| DbError::io("sync truncated delta-log", error))
667}
668
669/// Creates an empty per-generation delta-log, fsyncing the file and the
670/// directory entry so the new (empty) log is durable.
671///
672/// # Errors
673///
674/// Returns [`DbError::Io`] when creation or syncing fails.
675///
676/// # Performance
677///
678/// This function is `O(1)`.
679fn create_empty_log(root: &Path, generation: u64) -> Result<(), DbError> {
680 let path = root.join(delta_file(generation));
681 let file =
682 std::fs::File::create(&path).map_err(|error| DbError::io("create delta-log", error))?;
683 file.sync_all()
684 .map_err(|error| DbError::io("sync delta-log", error))?;
685 storage::sync_directory(root)
686}
687
688/// Opens the live delta-log for appending (create when absent, read+append).
689///
690/// # Errors
691///
692/// Returns [`DbError::Io`] when the log cannot be opened.
693///
694/// # Performance
695///
696/// This function is `O(1)`.
697fn open_log_for_append(root: &Path, generation: u64) -> Result<std::fs::File, DbError> {
698 std::fs::OpenOptions::new()
699 .create(true)
700 .truncate(false)
701 .read(true)
702 .append(true)
703 .open(root.join(delta_file(generation)))
704 .map_err(|error| DbError::io("open delta-log for append", error))
705}
706
707/// Writes the superblock naming `generation` with the given frontier stamps.
708///
709/// # Errors
710///
711/// Returns [`DbError::Io`] when publishing fails.
712///
713/// # Performance
714///
715/// This function is `O(1)`.
716fn write_superblock(
717 root: &Path,
718 generation: u64,
719 checkpoint_lsn: u64,
720 commit_seq: u64,
721 transaction_id: u64,
722) -> Result<(), DbError> {
723 wal::write_superblock(
724 root,
725 &SuperblockRecord {
726 magic: crate::wire::SUPERBLOCK_MAGIC,
727 base_generation: generation.into(),
728 checkpoint_lsn: checkpoint_lsn.into(),
729 log_byte_offset: 0u64.into(),
730 commit_seq: commit_seq.into(),
731 transaction_id: transaction_id.into(),
732 format_version: crate::wire::OXGDB_FORMAT_VERSION.into(),
733 flags: 0u32.into(),
734 crc32c: 0u32.into(),
735 pad: 0u32.into(),
736 },
737 )
738}
739
/// Point-in-time operational status of a [`Database`] handle, as produced by
/// [`Database::status`].
///
/// # Performance
///
/// Copying and comparing status is `O(1)`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct DatabaseStatus {
    /// Last visible commit sequence (the LSN of the current snapshot).
    pub visible_commit_seq: CommitSeq,
    /// Last writer transaction ID burned by this handle.
    ///
    /// This value is durable after a dirty commit and session-local after
    /// rollback.
    pub last_transaction_id: TransactionId,
    /// Live base generation named by the superblock (the count of folds this
    /// store has undergone; gen-0 is the freshly created store).
    pub live_generation: CheckpointGeneration,
    /// On-disk byte size of the live base file (`0` when it cannot be stat'd).
    pub base_byte_size: u64,
    /// On-disk byte size of the live delta-log (the tail recovery replays and
    /// the auto-checkpoint policy weighs against the base size); `0` when it
    /// cannot be stat'd.
    pub log_byte_size: u64,
    /// Visible element count.
    pub element_count: usize,
    /// Visible relation count.
    pub relation_count: usize,
    /// Visible incidence count.
    pub incidence_count: usize,
    /// Catalog-size summary.
    pub catalog: CatalogSummary,
}
771
772/// Catalog-size summary.
773///
774/// # Performance
775///
776/// Copying and comparing are `O(1)`.
777#[derive(Clone, Copy, Debug, Eq, PartialEq)]
778pub struct CatalogSummary {
779 /// Role count.
780 pub role_count: usize,
781 /// Label count.
782 pub label_count: usize,
783 /// Relation type count.
784 pub relation_type_count: usize,
785 /// Property key count.
786 pub property_key_count: usize,
787 /// Projection count.
788 pub projection_count: usize,
789 /// Index count.
790 pub index_count: usize,
791}
792
793impl CatalogSummary {
794 /// Builds a summary from a catalog.
795 ///
796 /// # Performance
797 ///
798 /// This function is `O(catalog entry count)`.
799 #[must_use]
800 pub fn from_catalog(catalog: &Catalog) -> Self {
801 Self {
802 role_count: catalog.roles().count(),
803 label_count: catalog.labels().count(),
804 relation_type_count: catalog.relation_types().count(),
805 property_key_count: catalog.property_keys().count(),
806 projection_count: catalog.projections().count(),
807 index_count: catalog.indexes().count(),
808 }
809 }
810}
811
/// Reader pin identifying the database state a read transaction observes: the
/// visible commit sequence together with the checkpoint generation.
///
/// # Performance
///
/// Copying and comparing a pin is `O(1)`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ReadPin {
    /// Pinned visible commit sequence.
    pub visible_commit_seq: CommitSeq,
    /// Pinned checkpoint generation.
    pub generation: CheckpointGeneration,
}
824
/// Read transaction over a pinned snapshot.
///
/// A read transaction owns its own `Arc<Snapshot>` and never borrows the
/// [`Database`], so it stays valid across a later `begin_write`/`checkpoint` on
/// the same handle (it cloned the snapshot before the write borrowed `&mut`). It
/// is [`Send`] + [`Sync`] (asserted below).
///
/// # Performance
///
/// Creating a read transaction is `O(1)`: it shares the pinned snapshot
/// through an `Arc` rather than copying state.
pub struct ReadTransaction {
    /// The pinned snapshot this reader observes.
    snapshot: Arc<Snapshot>,
}
840
/// Compile-time check helper: instantiating it for `T` only type-checks when
/// `T: Send + Sync`, and it does nothing at run time.
///
/// `ReadTransaction` MUST be `Send + Sync`: it pins only an `Arc<Snapshot>`,
/// which holds `Arc`-shared `Send + Sync` data (no `Rc`/`RefCell` reachable).
const fn assert_send_sync<T: Send + Sync>() {}
// Evaluated at compile time; the build fails if either bound stops holding.
const _: () = assert_send_sync::<ReadTransaction>();
const _: () = assert_send_sync::<Arc<Snapshot>>();
846
847impl ReadTransaction {
848 /// Returns this transaction's reader pin.
849 ///
850 /// # Performance
851 ///
852 /// This method is `O(1)`.
853 #[must_use]
854 pub fn pin(&self) -> ReadPin {
855 ReadPin {
856 visible_commit_seq: self.snapshot.lsn(),
857 generation: self.snapshot.generation(),
858 }
859 }
860
861 /// Returns catalog metadata.
862 ///
863 /// # Performance
864 ///
865 /// This method is `O(1)`.
866 #[must_use]
867 pub fn catalog(&self) -> &Catalog {
868 self.snapshot.view().catalog_ref()
869 }
870
871 /// Returns visible element count.
872 ///
873 /// # Performance
874 ///
875 /// This method is `O(base + overlay change)`.
876 #[must_use]
877 pub fn element_count(&self) -> usize {
878 self.snapshot.view().element_count()
879 }
880
881 /// Returns visible relation count.
882 ///
883 /// # Performance
884 ///
885 /// This method is `O(base + overlay change)`.
886 #[must_use]
887 pub fn relation_count(&self) -> usize {
888 self.snapshot.view().relation_count()
889 }
890
891 /// Returns visible incidence count.
892 ///
893 /// # Performance
894 ///
895 /// This method is `O(base + overlay change)`.
896 #[must_use]
897 pub fn incidence_count(&self) -> usize {
898 self.snapshot.view().incidence_count()
899 }
900
901 /// Returns every visible element id in id order.
902 ///
903 /// # Performance
904 ///
905 /// This method is `O(element count)`.
906 #[must_use]
907 pub fn element_ids(&self) -> Vec<ElementId> {
908 self.snapshot
909 .view()
910 .elements()
911 .map(|record| record.id)
912 .collect()
913 }
914
915 /// Returns every visible relation id in id order.
916 ///
917 /// # Performance
918 ///
919 /// This method is `O(relation count)`.
920 #[must_use]
921 pub fn relation_ids(&self) -> Vec<RelationId> {
922 self.snapshot
923 .view()
924 .relations()
925 .map(|record| record.id)
926 .collect()
927 }
928
929 /// Returns whether an element exists.
930 ///
931 /// # Performance
932 ///
933 /// This method is `O(log change + log n)`.
934 #[must_use]
935 pub fn contains_element(&self, id: ElementId) -> bool {
936 self.snapshot.view().contains_element(id)
937 }
938
939 /// Returns whether a relation exists.
940 ///
941 /// # Performance
942 ///
943 /// This method is `O(log change + log n)`.
944 #[must_use]
945 pub fn contains_relation(&self, id: RelationId) -> bool {
946 self.snapshot.view().contains_relation(id)
947 }
948
949 /// Returns whether an incidence exists.
950 ///
951 /// # Performance
952 ///
953 /// This method is `O(log change + log n)`.
954 #[must_use]
955 pub fn contains_incidence(&self, id: IncidenceId) -> bool {
956 self.snapshot.view().contains_incidence(id)
957 }
958
959 /// Returns an element record, borrowed from the base for a base-only id and
960 /// owned for an overlay-supplied id.
961 ///
962 /// # Performance
963 ///
964 /// This method is `O(log change + log n)`.
965 #[must_use]
966 pub fn element(&self, id: ElementId) -> Option<Cow<'_, ElementRecord>> {
967 self.snapshot.view().element_ref(id)
968 }
969
970 /// Returns a relation record (see [`Self::element`] for the borrow contract).
971 ///
972 /// # Performance
973 ///
974 /// This method is `O(log change + log n)`.
975 #[must_use]
976 pub fn relation(&self, id: RelationId) -> Option<Cow<'_, RelationRecord>> {
977 self.snapshot.view().relation_ref(id)
978 }
979
980 /// Returns an incidence record (see [`Self::element`] for the borrow
981 /// contract).
982 ///
983 /// # Performance
984 ///
985 /// This method is `O(log change + log n)`.
986 #[must_use]
987 pub fn incidence(&self, id: IncidenceId) -> Option<Cow<'_, IncidenceRecord>> {
988 self.snapshot.view().incidence_ref(id)
989 }
990
991 /// Returns every visible incidence attached to an element, in ascending
992 /// incidence-id order.
993 ///
994 /// The merged set mixes overlay-owned and base-borrowed records, so this
995 /// returns an owned [`Vec`] ([`IncidenceRecord`] is [`Copy`], so the copy is
996 /// cheap).
997 ///
998 /// # Performance
999 ///
1000 /// This method is `O(base incidences + overlay incidence change)`.
1001 #[must_use]
1002 pub fn element_incidences(&self, id: ElementId) -> Vec<IncidenceRecord> {
1003 self.snapshot.view().element_incidences(id)
1004 }
1005
1006 /// Returns one property value (see [`Self::element`] for the borrow
1007 /// contract).
1008 ///
1009 /// # Performance
1010 ///
1011 /// This method is `O(log subjects + log keys)`.
1012 #[must_use]
1013 pub fn property(
1014 &self,
1015 subject: PropertySubject,
1016 key: PropertyKeyId,
1017 ) -> Option<Cow<'_, PropertyValue>> {
1018 self.snapshot.view().property_ref(subject, key)
1019 }
1020
1021 /// Looks up subjects with a property value.
1022 ///
1023 /// # Errors
1024 ///
1025 /// Returns [`DbError`] when the property key is unknown or `value` does not
1026 /// match the key schema.
1027 ///
1028 /// # Performance
1029 ///
1030 /// This method is `O(property subject count)`.
1031 pub fn lookup_property_equal(
1032 &self,
1033 key: PropertyKeyId,
1034 value: &PropertyValue,
1035 ) -> Result<Vec<PropertySubject>, DbError> {
1036 self.snapshot.view().typed_property_equal(key, value)
1037 }
1038
1039 /// Looks up subjects with a property inside an inclusive range.
1040 ///
1041 /// # Errors
1042 ///
1043 /// Returns [`DbError`] when the property key is unknown or either bound
1044 /// does not match the key schema.
1045 ///
1046 /// # Performance
1047 ///
1048 /// This method is `O(property subject count)`.
1049 pub fn lookup_property_range(
1050 &self,
1051 key: PropertyKeyId,
1052 min: &PropertyValue,
1053 max: &PropertyValue,
1054 ) -> Result<Vec<PropertySubject>, DbError> {
1055 self.snapshot.view().typed_property_range(key, min, max)
1056 }
1057
1058 /// Executes an index lookup.
1059 ///
1060 /// # Errors
1061 ///
1062 /// Returns [`DbError`] when the index is unknown, the lookup shape does not
1063 /// match the index kind, or supplied property values do not match catalog
1064 /// schemas.
1065 ///
1066 /// # Performance
1067 ///
1068 /// This method is `O(indexed family size)`.
1069 pub fn lookup_index(
1070 &self,
1071 index: IndexId,
1072 lookup: IndexLookup<'_>,
1073 ) -> Result<Vec<PropertySubject>, DbError> {
1074 let view = self.snapshot.view();
1075 let entry = view
1076 .catalog()
1077 .index(index)
1078 .ok_or(DbError::UnknownIndex { id: index })?;
1079 match (&entry.definition, lookup) {
1080 (IndexDefinition::Label { label }, IndexLookup::All) => Ok(view
1081 .elements_with_label(*label)
1082 .into_iter()
1083 .map(PropertySubject::Element)
1084 .collect()),
1085 (IndexDefinition::Label { .. }, _lookup) => {
1086 Err(DbError::unsupported("label index expects all lookup"))
1087 }
1088 (IndexDefinition::RelationType { relation_type }, IndexLookup::All) => Ok(view
1089 .relations_with_type(*relation_type)
1090 .into_iter()
1091 .map(PropertySubject::Relation)
1092 .collect()),
1093 (IndexDefinition::RelationType { .. }, _lookup) => Err(DbError::unsupported(
1094 "relation type index expects all lookup",
1095 )),
1096 (IndexDefinition::PropertyEquality { key }, IndexLookup::Equal(value)) => {
1097 view.typed_property_equal(*key, value)
1098 }
1099 (IndexDefinition::PropertyEquality { .. }, _lookup) => Err(DbError::unsupported(
1100 "property equality index expects equality lookup",
1101 )),
1102 (IndexDefinition::PropertyRange { key }, IndexLookup::Range { min, max }) => {
1103 view.typed_property_range(*key, min, max)
1104 }
1105 (IndexDefinition::PropertyRange { .. }, _lookup) => Err(DbError::unsupported(
1106 "property range index expects range lookup",
1107 )),
1108 (IndexDefinition::CompositeEquality { keys }, IndexLookup::CompositeEqual(values)) => {
1109 view.typed_property_composite_equal(keys, values)
1110 }
1111 (IndexDefinition::CompositeEquality { .. }, _lookup) => Err(DbError::unsupported(
1112 "composite equality index expects composite equality lookup",
1113 )),
1114 (IndexDefinition::Projection { projection }, IndexLookup::All) => {
1115 self.projection_index_subjects(*projection)
1116 }
1117 (IndexDefinition::Projection { .. }, _lookup) => {
1118 Err(DbError::unsupported("projection index expects all lookup"))
1119 }
1120 }
1121 }
1122
1123 /// Materializes a graph projection.
1124 ///
1125 /// # Errors
1126 ///
1127 /// Returns [`DbError`] when the projection is unknown, is not a graph, or
1128 /// fails validation against current topology.
1129 ///
1130 /// # Performance
1131 ///
1132 /// This method is `O(relation count * incidence count)`.
1133 pub fn graph_projection(&self, id: ProjectionId) -> Result<GraphProjection, DbError> {
1134 let view = self.snapshot.view();
1135 let entry = view
1136 .catalog()
1137 .projection(id)
1138 .ok_or(DbError::UnknownProjection { id })?;
1139 match &entry.definition {
1140 ProjectionDefinition::Graph(definition) => {
1141 projection::GraphProjection::from_state(&view, definition.clone())
1142 }
1143 ProjectionDefinition::Hypergraph(_definition) => {
1144 Err(DbError::invalid_projection("projection is not a graph"))
1145 }
1146 }
1147 }
1148
1149 /// Materializes a graph projection by catalog name.
1150 ///
1151 /// # Errors
1152 ///
1153 /// Returns [`DbError`] when the projection is unknown, is not a graph, or
1154 /// fails validation against current topology.
1155 ///
1156 /// # Performance
1157 ///
1158 /// This method is `O(log projection count + relation count * incidence count)`.
1159 pub fn graph_projection_by_name(&self, name: &str) -> Result<GraphProjection, DbError> {
1160 let id = self
1161 .snapshot
1162 .view()
1163 .catalog()
1164 .projection_id(name)
1165 .ok_or_else(|| DbError::unsupported(format!("unknown projection {name}")))?;
1166 self.graph_projection(id)
1167 }
1168
1169 /// Traverses a cataloged graph projection from canonical seed elements.
1170 ///
1171 /// Rows are unique canonical elements in BFS first-discovery order. Depth is
1172 /// the shortest discovered hop count from any seed.
1173 ///
1174 /// # Errors
1175 ///
1176 /// Returns [`DbError`] when the projection is unknown, is not a graph,
1177 /// cannot be materialized, or a seed element is not part of the projection.
1178 ///
1179 /// # Performance
1180 ///
1181 /// This method is `O(relation count * incidence count + visited edges)`.
1182 pub fn traverse_graph(
1183 &self,
1184 projection: ProjectionId,
1185 seeds: &[ElementId],
1186 options: TraversalOptions,
1187 ) -> Result<TraversalResult, DbError> {
1188 if seeds.is_empty() || options.limit == 0 {
1189 return Ok(TraversalResult::new(Vec::new()));
1190 }
1191 let graph = self.graph_projection(projection)?;
1192 traversal::traverse_graph_projection(&graph, seeds, options)
1193 }
1194
1195 /// Materializes a hypergraph projection.
1196 ///
1197 /// # Errors
1198 ///
1199 /// Returns [`DbError`] when the projection is unknown, is not a hypergraph,
1200 /// or fails validation against current topology.
1201 ///
1202 /// # Performance
1203 ///
1204 /// This method is `O(relation count * incidence count)`.
1205 pub fn hypergraph_projection(&self, id: ProjectionId) -> Result<HypergraphProjection, DbError> {
1206 let view = self.snapshot.view();
1207 let entry = view
1208 .catalog()
1209 .projection(id)
1210 .ok_or(DbError::UnknownProjection { id })?;
1211 match &entry.definition {
1212 ProjectionDefinition::Hypergraph(definition) => {
1213 projection::HypergraphProjection::from_state(&view, definition.clone())
1214 }
1215 ProjectionDefinition::Graph(_definition) => Err(DbError::invalid_projection(
1216 "projection is not a hypergraph",
1217 )),
1218 }
1219 }
1220
1221 /// Executes a prepared query.
1222 ///
1223 /// # Errors
1224 ///
1225 /// Returns [`DbError`] when execution cannot materialize a referenced
1226 /// projection.
1227 ///
1228 /// # Performance
1229 ///
1230 /// This method is `O(plan output + projection build cost when used)`.
1231 pub fn execute(&self, query: &PreparedQuery) -> Result<QueryResult, DbError> {
1232 query.execute(&self.snapshot.view())
1233 }
1234
1235 /// Explains a prepared query.
1236 ///
1237 /// # Performance
1238 ///
1239 /// This method is `O(plan size)`.
1240 #[must_use]
1241 pub fn explain(&self, query: &PreparedQuery) -> String {
1242 query.explain()
1243 }
1244
1245 /// Materializes subjects represented by a projection index.
1246 ///
1247 /// # Errors
1248 ///
1249 /// Returns [`DbError`] when the projection is unknown or cannot be
1250 /// materialized.
1251 ///
1252 /// # Performance
1253 ///
1254 /// This method is `O(relation count * incidence count)`.
1255 fn projection_index_subjects(
1256 &self,
1257 projection: ProjectionId,
1258 ) -> Result<Vec<PropertySubject>, DbError> {
1259 let view = self.snapshot.view();
1260 let entry = view
1261 .catalog()
1262 .projection(projection)
1263 .ok_or(DbError::UnknownProjection { id: projection })?;
1264 match &entry.definition {
1265 ProjectionDefinition::Graph(definition) => {
1266 Ok(projection::GraphProjection::from_state(&view, definition.clone())?.subjects())
1267 }
1268 ProjectionDefinition::Hypergraph(definition) => Ok(
1269 projection::HypergraphProjection::from_state(&view, definition.clone())?.subjects(),
1270 ),
1271 }
1272 }
1273}
1274
/// Single writer transaction.
///
/// Mutations accumulate into a private write overlay layered over the parent
/// snapshot; reads fall through the overlay then the base. `commit` appends the
/// overlay's mutation log to the WAL (when dirty) and publishes a fresh snapshot;
/// `rollback` drops the overlay and appends nothing.
///
/// The transaction holds an exclusive borrow of its [`Database`] for the
/// lifetime `'db`, so the borrow checker rules out a second writer or any
/// other use of the same handle while this one is alive.
///
/// # Performance
///
/// Creating and moving a writer is `O(1)`; each mutation is `O(log change)`.
pub struct WriteTransaction<'db> {
    /// Database receiving the commit.
    database: &'db mut Database,
    /// Parent snapshot the writer layers over (its base + frozen overlay).
    parent: Arc<Snapshot>,
    /// Private mutable delta this writer accumulates.
    delta: WriteOverlay,
    /// Writer transaction id (session-local until a dirty commit makes it
    /// durable).
    transaction_id: TransactionId,
    /// Held single-writer advisory lock. Its [`Drop`] releases the lock when this
    /// transaction ends (on `rollback`, or on any early-return error path); a
    /// successful dirty [`Self::commit`] releases it explicitly with `drop` so a
    /// triggered auto-checkpoint can re-acquire it.
    ///
    /// Declared last: Rust drops fields in declaration order, so on an implicit
    /// drop the lock is released only after the other fields are gone.
    lock: WriterLock,
}
1301
1302impl WriteTransaction<'_> {
1303 /// Registers a structural incidence role.
1304 ///
1305 /// # Errors
1306 ///
1307 /// Returns [`DbError`] when the name already exists or ID allocation fails.
1308 ///
1309 /// # Performance
1310 ///
1311 /// This method is `O(log role count + name length)`.
1312 pub fn register_role(&mut self, name: impl Into<String>) -> Result<RoleId, DbError> {
1313 self.delta.register_role(name.into())
1314 }
1315
1316 /// Registers an element or relation label.
1317 ///
1318 /// # Errors
1319 ///
1320 /// Returns [`DbError`] when the name already exists or ID allocation fails.
1321 ///
1322 /// # Performance
1323 ///
1324 /// This method is `O(log label count + name length)`.
1325 pub fn register_label(&mut self, name: impl Into<String>) -> Result<LabelId, DbError> {
1326 self.delta.register_label(name.into())
1327 }
1328
1329 /// Registers a relation type.
1330 ///
1331 /// # Errors
1332 ///
1333 /// Returns [`DbError`] when the name already exists or ID allocation fails.
1334 ///
1335 /// # Performance
1336 ///
1337 /// This method is `O(log relation type count + name length)`.
1338 pub fn register_relation_type(
1339 &mut self,
1340 name: impl Into<String>,
1341 ) -> Result<RelationTypeId, DbError> {
1342 self.delta.register_relation_type(name.into())
1343 }
1344
1345 /// Registers a typed property key.
1346 ///
1347 /// # Errors
1348 ///
1349 /// Returns [`DbError`] when the name already exists or ID allocation fails.
1350 ///
1351 /// # Performance
1352 ///
1353 /// This method is `O(log property key count + name length)`.
1354 pub fn register_property_key(
1355 &mut self,
1356 name: impl Into<String>,
1357 family: PropertyFamily,
1358 value_type: PropertyType,
1359 ) -> Result<PropertyKeyId, DbError> {
1360 self.delta
1361 .register_property_key(name.into(), family, value_type)
1362 }
1363
1364 /// Defines a physical projection.
1365 ///
1366 /// # Errors
1367 ///
1368 /// Returns [`DbError`] when referenced catalog IDs are unknown, the
1369 /// projection name already exists, or ID allocation fails.
1370 ///
1371 /// # Performance
1372 ///
1373 /// This method is `O(definition size + catalog lookup cost)`.
1374 pub fn define_projection(
1375 &mut self,
1376 definition: ProjectionDefinition,
1377 ) -> Result<ProjectionId, DbError> {
1378 self.validate_projection_definition(&definition)?;
1379 self.delta.register_projection(definition)
1380 }
1381
1382 /// Defines an index.
1383 ///
1384 /// # Errors
1385 ///
1386 /// Returns [`DbError`] when referenced catalog IDs are unknown, the index
1387 /// name already exists, or ID allocation fails.
1388 ///
1389 /// # Performance
1390 ///
1391 /// This method is `O(definition size + catalog lookup cost)`.
1392 pub fn define_index(
1393 &mut self,
1394 name: impl Into<String>,
1395 definition: IndexDefinition,
1396 ) -> Result<IndexId, DbError> {
1397 self.validate_index_definition(&definition)?;
1398 self.delta.register_index(name.into(), definition)
1399 }
1400
1401 /// Creates a canonical element.
1402 ///
1403 /// # Errors
1404 ///
1405 /// Returns [`DbError::IdOverflow`] when element IDs are exhausted.
1406 ///
1407 /// # Performance
1408 ///
1409 /// This method is `O(log element change)`.
1410 pub fn create_element(&mut self) -> Result<ElementId, DbError> {
1411 self.delta.create_element()
1412 }
1413
1414 /// Creates a canonical relation.
1415 ///
1416 /// # Errors
1417 ///
1418 /// Returns [`DbError::IdOverflow`] when relation IDs are exhausted.
1419 ///
1420 /// # Performance
1421 ///
1422 /// This method is `O(log relation change)`.
1423 pub fn create_relation(&mut self) -> Result<RelationId, DbError> {
1424 self.delta.create_relation()
1425 }
1426
1427 /// Creates a canonical incidence.
1428 ///
1429 /// # Errors
1430 ///
1431 /// Returns [`DbError`] when referenced IDs are unknown or incidence IDs are
1432 /// exhausted.
1433 ///
1434 /// # Performance
1435 ///
1436 /// This method is `O(log incidence change + reference lookup cost)`.
1437 pub fn create_incidence(
1438 &mut self,
1439 relation: RelationId,
1440 element: ElementId,
1441 role: RoleId,
1442 ) -> Result<IncidenceId, DbError> {
1443 self.require_relation(relation)?;
1444 self.require_element(element)?;
1445 self.require_role(role)?;
1446 self.delta.create_incidence(relation, element, role)
1447 }
1448
1449 /// Tombstones a canonical element and its incidences.
1450 ///
1451 /// # Errors
1452 ///
1453 /// Returns [`DbError::UnknownElement`] when the element is not visible.
1454 ///
1455 /// # Performance
1456 ///
1457 /// This method is `O(incidence count)`.
1458 pub fn tombstone_element(&mut self, id: ElementId) -> Result<(), DbError> {
1459 self.require_element(id)?;
1460 let base = self.parent.base_records();
1461 self.delta.tombstone_element(base, id);
1462 // Cascade: every incidence referencing the element is tombstoned too.
1463 let incidences: Vec<IncidenceId> = self
1464 .merged()
1465 .incidences()
1466 .filter(|record| record.element == id)
1467 .map(|record| record.id)
1468 .collect();
1469 for incidence in incidences {
1470 self.delta
1471 .tombstone_incidence(self.parent.base_records(), incidence);
1472 }
1473 Ok(())
1474 }
1475
1476 /// Tombstones a canonical relation and its incidences.
1477 ///
1478 /// # Errors
1479 ///
1480 /// Returns [`DbError::UnknownRelation`] when the relation is not visible.
1481 ///
1482 /// # Performance
1483 ///
1484 /// This method is `O(incidence count)`.
1485 pub fn tombstone_relation(&mut self, id: RelationId) -> Result<(), DbError> {
1486 self.require_relation(id)?;
1487 let base = self.parent.base_records();
1488 self.delta.tombstone_relation(base, id);
1489 let incidences: Vec<IncidenceId> = self
1490 .merged()
1491 .incidences()
1492 .filter(|record| record.relation == id)
1493 .map(|record| record.id)
1494 .collect();
1495 for incidence in incidences {
1496 self.delta
1497 .tombstone_incidence(self.parent.base_records(), incidence);
1498 }
1499 Ok(())
1500 }
1501
1502 /// Tombstones a canonical incidence.
1503 ///
1504 /// # Errors
1505 ///
1506 /// Returns [`DbError::UnknownIncidence`] when the incidence is not visible.
1507 ///
1508 /// # Performance
1509 ///
1510 /// This method is `O(log incidence change)`.
1511 pub fn tombstone_incidence(&mut self, id: IncidenceId) -> Result<(), DbError> {
1512 self.require_incidence(id)?;
1513 self.delta
1514 .tombstone_incidence(self.parent.base_records(), id);
1515 Ok(())
1516 }
1517
1518 /// Adds a label to an element.
1519 ///
1520 /// # Errors
1521 ///
1522 /// Returns [`DbError`] when the element or label is unknown.
1523 ///
1524 /// # Performance
1525 ///
1526 /// This method is `O(log element change + log label count)`.
1527 pub fn add_element_label(&mut self, element: ElementId, label: LabelId) -> Result<(), DbError> {
1528 self.require_element(element)?;
1529 self.require_label(label)?;
1530 self.delta
1531 .add_element_label(self.parent.base_records(), element, label);
1532 Ok(())
1533 }
1534
1535 /// Adds a label to a relation.
1536 ///
1537 /// # Errors
1538 ///
1539 /// Returns [`DbError`] when the relation or label is unknown.
1540 ///
1541 /// # Performance
1542 ///
1543 /// This method is `O(log relation change + log label count)`.
1544 pub fn add_relation_label(
1545 &mut self,
1546 relation: RelationId,
1547 label: LabelId,
1548 ) -> Result<(), DbError> {
1549 self.require_relation(relation)?;
1550 self.require_label(label)?;
1551 self.delta
1552 .add_relation_label(self.parent.base_records(), relation, label);
1553 Ok(())
1554 }
1555
1556 /// Sets a relation type.
1557 ///
1558 /// # Errors
1559 ///
1560 /// Returns [`DbError`] when the relation or relation type is unknown.
1561 ///
1562 /// # Performance
1563 ///
1564 /// This method is `O(log relation change + log relation type count)`.
1565 pub fn set_relation_type(
1566 &mut self,
1567 relation: RelationId,
1568 relation_type: RelationTypeId,
1569 ) -> Result<(), DbError> {
1570 self.require_relation(relation)?;
1571 self.require_relation_type(relation_type)?;
1572 self.delta
1573 .set_relation_type(self.parent.base_records(), relation, relation_type);
1574 Ok(())
1575 }
1576
1577 /// Sets a property value.
1578 ///
1579 /// # Errors
1580 ///
1581 /// Returns [`DbError`] when the subject or key is unknown, or the value
1582 /// does not match the key schema.
1583 ///
1584 /// # Performance
1585 ///
1586 /// This method is `O(log subject change + log key count)`.
1587 pub fn set_property(
1588 &mut self,
1589 subject: PropertySubject,
1590 key: PropertyKeyId,
1591 value: PropertyValue,
1592 ) -> Result<(), DbError> {
1593 // Referential integrity: the subject must be visible (this rejects an
1594 // orphan property against a tombstoned/absent subject at the transaction
1595 // boundary — the overlay layer is permissive by design).
1596 self.require_subject(subject)?;
1597 let definition = self
1598 .merged()
1599 .catalog()
1600 .property_key(key)
1601 .cloned()
1602 .ok_or(DbError::UnknownPropertyKey { id: key })?;
1603 if definition.family != subject.family() {
1604 return Err(DbError::WrongPropertyFamily {
1605 expected: definition.family,
1606 actual: subject.family(),
1607 });
1608 }
1609 if definition.value_type != value.value_type() {
1610 return Err(DbError::PropertyTypeMismatch {
1611 expected: definition.value_type,
1612 actual: value.value_type(),
1613 });
1614 }
1615 self.delta
1616 .set_property(self.parent.base_records(), subject, key, value);
1617 Ok(())
1618 }
1619
1620 /// Removes a property value.
1621 ///
1622 /// # Errors
1623 ///
1624 /// Returns [`DbError`] when the subject or key is unknown.
1625 ///
1626 /// # Performance
1627 ///
1628 /// This method is `O(log subject change + log key count)`.
1629 pub fn remove_property(
1630 &mut self,
1631 subject: PropertySubject,
1632 key: PropertyKeyId,
1633 ) -> Result<(), DbError> {
1634 self.require_subject(subject)?;
1635 if self.merged().catalog().property_key(key).is_none() {
1636 return Err(DbError::UnknownPropertyKey { id: key });
1637 }
1638 self.delta
1639 .remove_property(self.parent.base_records(), subject, key);
1640 Ok(())
1641 }
1642
/// Commits this write transaction durably.
///
/// A non-dirty commit returns the parent's commit sequence without appending
/// to the WAL or publishing. A dirty commit encodes the overlay's mutation
/// log into one WAL frame (with the watermark op last), appends it with an
/// fsync (truncating back to the captured EOF on any write error so no
/// interior torn record survives), THEN folds the delta into a fresh
/// `Arc<Overlay>` and publishes a new `Arc<Snapshot>`.
///
/// After publishing, a dirty commit consults the configured
/// [`CheckpointPolicy`]: it releases the writer lock FIRST (so the fold can
/// re-acquire it), then folds when the delta-log has outgrown the base. The
/// committed frame is already durable, so an auto-fold failure does not lose
/// data; it is surfaced to the caller.
///
/// Ordering matters: every fallible step up to and including the WAL append
/// runs before the database's current snapshot is replaced, so an `Err` from
/// any of them leaves the published snapshot untouched. An `Err` from the
/// auto-checkpoint step, by contrast, is returned AFTER the new snapshot and
/// transaction id have been published.
///
/// # Errors
///
/// Returns [`DbError`] when commit-sequence allocation, frame encoding, the
/// durable append, or a triggered auto-checkpoint fold fails.
///
/// # Performance
///
/// This method is `O(change)` for the dirty path — flat as the base grows.
/// The publish step shares the parent snapshot's already-materialized
/// [`crate::overlay::BaseRecords`] and derived index by `Arc` (a commit never
/// folds, so the base is byte-identical within the generation), so it neither
/// re-decodes the base nor rebuilds the index. A triggered fold adds
/// `O(visible state bytes)` on top.
pub fn commit(self) -> Result<CommitSeq, DbError> {
    if self.delta.is_empty() {
        // Non-dirty commit: no append, no publish, no durable id advance.
        return Ok(self.parent.lsn());
    }
    // The new commit sequence is the parent's successor; overflow aborts the
    // commit before anything is written.
    let lsn = self
        .parent
        .lsn()
        .checked_next()
        .ok_or(DbError::CommitSeqOverflow)?;
    let (ops, blob) = self.delta.encode_frame();
    let frame = wal::encode_commit(
        lsn.get(),
        self.transaction_id.get(),
        self.database.base_generation,
        &ops,
        &blob,
    )?;
    // The log is opened for the database's current base generation, the same
    // generation stamped into the frame above.
    let mut log = open_log_for_append(&self.database.root, self.database.base_generation)?;
    wal::append_commit(&mut log, &frame)?;

    // Durable: the delta was seeded from the parent overlay and only added
    // this writer's changes, so freezing it directly is the full new
    // published overlay (parent state + this commit). The parent overlay was
    // never mutated — this is a brand-new frozen `Arc<Overlay>`, so a reader
    // pinning the parent is unaffected.
    let new_overlay = Arc::new(self.delta.freeze());
    // A commit never folds, so the new snapshot pins the SAME base generation
    // as the parent — the base wire bytes are byte-identical, and so are the
    // owned records and the derived index built from them. Share the parent's
    // `Arc<BaseRecords>` (and its `BaseIndex`) instead of re-decoding the base
    // and rebuilding the index, which keeps a single-element commit `O(change)`
    // rather than `O(base)` regardless of how large the base has grown.
    let snapshot = Snapshot::with_shared_base_records(
        self.parent.generation(),
        lsn,
        Arc::clone(self.parent.base()),
        new_overlay,
        Arc::clone(self.parent.base_records()),
    );
    // Publish: from here on the database hands out the new snapshot.
    self.database.current = Arc::new(snapshot);
    self.database.last_transaction_id = self.transaction_id;
    // Release the writer lock before any auto-fold so the fold can re-acquire
    // it (a partial move out of `self`, legal because `WriteTransaction` has
    // no `Drop` impl; the remaining `&mut Database` borrow stays live).
    drop(self.lock);
    self.database.maybe_auto_checkpoint()?;
    Ok(lsn)
}
1720
1721 /// Drops this write transaction without committing.
1722 ///
1723 /// # Performance
1724 ///
1725 /// This method is `O(1)` excluding staged-delta drop cost.
1726 pub fn rollback(self) {}
1727
1728 /// Returns the merged read view this writer sees (overlay over base).
1729 ///
1730 /// # Performance
1731 ///
1732 /// This method is `O(1)` to construct.
1733 fn merged(&self) -> crate::overlay::WriteMergedState<'_> {
1734 crate::overlay::WriteMergedState::new(self.parent.base_records(), &self.delta)
1735 }
1736
1737 /// Requires an element to be visible in the writer's merged view.
1738 ///
1739 /// # Errors
1740 ///
1741 /// Returns [`DbError::UnknownElement`] when absent.
1742 ///
1743 /// # Performance
1744 ///
1745 /// This method is `O(log change + log n)`.
1746 fn require_element(&self, id: ElementId) -> Result<(), DbError> {
1747 if self.merged().contains_element(id) {
1748 Ok(())
1749 } else {
1750 Err(DbError::UnknownElement { id })
1751 }
1752 }
1753
1754 /// Requires a relation to be visible.
1755 ///
1756 /// # Errors
1757 ///
1758 /// Returns [`DbError::UnknownRelation`] when absent.
1759 ///
1760 /// # Performance
1761 ///
1762 /// This method is `O(log change + log n)`.
1763 fn require_relation(&self, id: RelationId) -> Result<(), DbError> {
1764 if self.merged().contains_relation(id) {
1765 Ok(())
1766 } else {
1767 Err(DbError::UnknownRelation { id })
1768 }
1769 }
1770
1771 /// Requires an incidence to be visible.
1772 ///
1773 /// # Errors
1774 ///
1775 /// Returns [`DbError::UnknownIncidence`] when absent.
1776 ///
1777 /// # Performance
1778 ///
1779 /// This method is `O(log change + log n)`.
1780 fn require_incidence(&self, id: IncidenceId) -> Result<(), DbError> {
1781 if self.merged().contains_incidence(id) {
1782 Ok(())
1783 } else {
1784 Err(DbError::UnknownIncidence { id })
1785 }
1786 }
1787
1788 /// Requires a role to exist in the merged catalog.
1789 ///
1790 /// # Errors
1791 ///
1792 /// Returns [`DbError::UnknownRole`] when absent.
1793 ///
1794 /// # Performance
1795 ///
1796 /// This method is `O(log role count)`.
1797 fn require_role(&self, id: RoleId) -> Result<(), DbError> {
1798 if self.delta.catalog().role(id).is_some() {
1799 Ok(())
1800 } else {
1801 Err(DbError::UnknownRole { id })
1802 }
1803 }
1804
1805 /// Requires a label to exist in the merged catalog.
1806 ///
1807 /// # Errors
1808 ///
1809 /// Returns [`DbError::UnknownLabel`] when absent.
1810 ///
1811 /// # Performance
1812 ///
1813 /// This method is `O(log label count)`.
1814 fn require_label(&self, id: LabelId) -> Result<(), DbError> {
1815 if self.delta.catalog().label(id).is_some() {
1816 Ok(())
1817 } else {
1818 Err(DbError::UnknownLabel { id })
1819 }
1820 }
1821
1822 /// Requires a relation type to exist in the merged catalog.
1823 ///
1824 /// # Errors
1825 ///
1826 /// Returns [`DbError::UnknownRelationType`] when absent.
1827 ///
1828 /// # Performance
1829 ///
1830 /// This method is `O(log relation type count)`.
1831 fn require_relation_type(&self, id: RelationTypeId) -> Result<(), DbError> {
1832 if self.delta.catalog().relation_type(id).is_some() {
1833 Ok(())
1834 } else {
1835 Err(DbError::UnknownRelationType { id })
1836 }
1837 }
1838
1839 /// Requires a property subject to be visible.
1840 ///
1841 /// # Errors
1842 ///
1843 /// Returns the matching `Unknown*` error when the subject is absent.
1844 ///
1845 /// # Performance
1846 ///
1847 /// This method is `O(log change + log n)`.
1848 fn require_subject(&self, subject: PropertySubject) -> Result<(), DbError> {
1849 match subject {
1850 PropertySubject::Element(id) => self.require_element(id),
1851 PropertySubject::Relation(id) => self.require_relation(id),
1852 PropertySubject::Incidence(id) => self.require_incidence(id),
1853 }
1854 }
1855
1856 /// Validates one projection definition against the merged catalog.
1857 ///
1858 /// # Errors
1859 ///
1860 /// Returns [`DbError`] when a referenced role or relation type is unknown.
1861 ///
1862 /// # Performance
1863 ///
1864 /// This method is `O(definition size)`.
1865 fn validate_projection_definition(
1866 &self,
1867 definition: &ProjectionDefinition,
1868 ) -> Result<(), DbError> {
1869 match definition {
1870 ProjectionDefinition::Graph(graph) => {
1871 self.require_role(graph.source_role)?;
1872 self.require_role(graph.target_role)?;
1873 for relation_type in &graph.relation_types {
1874 self.require_relation_type(*relation_type)?;
1875 }
1876 Ok(())
1877 }
1878 ProjectionDefinition::Hypergraph(hyper) => {
1879 for role in &hyper.source_roles {
1880 self.require_role(*role)?;
1881 }
1882 for role in &hyper.target_roles {
1883 self.require_role(*role)?;
1884 }
1885 for relation_type in &hyper.relation_types {
1886 self.require_relation_type(*relation_type)?;
1887 }
1888 Ok(())
1889 }
1890 }
1891 }
1892
1893 /// Validates one index definition against the merged catalog.
1894 ///
1895 /// # Errors
1896 ///
1897 /// Returns [`DbError`] when a referenced catalog id is unknown or a
1898 /// composite index has no keys.
1899 ///
1900 /// # Performance
1901 ///
1902 /// This method is `O(definition size)`.
1903 fn validate_index_definition(&self, definition: &IndexDefinition) -> Result<(), DbError> {
1904 let catalog = self.delta.catalog();
1905 match definition {
1906 IndexDefinition::Label { label } => self.require_label(*label),
1907 IndexDefinition::RelationType { relation_type } => {
1908 self.require_relation_type(*relation_type)
1909 }
1910 IndexDefinition::PropertyEquality { key } | IndexDefinition::PropertyRange { key } => {
1911 self.require_property_key(*key)
1912 }
1913 IndexDefinition::CompositeEquality { keys } => {
1914 if keys.is_empty() {
1915 return Err(DbError::unsupported(
1916 "composite equality index requires at least one key",
1917 ));
1918 }
1919 for key in keys {
1920 self.require_property_key(*key)?;
1921 }
1922 Ok(())
1923 }
1924 IndexDefinition::Projection { projection } => catalog
1925 .projection(*projection)
1926 .is_some()
1927 .then_some(())
1928 .ok_or(DbError::UnknownProjection { id: *projection }),
1929 }
1930 }
1931
1932 /// Requires a property key to exist in the merged catalog.
1933 ///
1934 /// # Errors
1935 ///
1936 /// Returns [`DbError::UnknownPropertyKey`] when absent.
1937 ///
1938 /// # Performance
1939 ///
1940 /// This method is `O(log property key count)`.
1941 fn require_property_key(&self, id: PropertyKeyId) -> Result<(), DbError> {
1942 if self.delta.catalog().property_key(id).is_some() {
1943 Ok(())
1944 } else {
1945 Err(DbError::UnknownPropertyKey { id })
1946 }
1947 }
1948}
1949
1950#[cfg(test)]
1951#[cfg(not(miri))]
1952mod tests {
1953 use std::{
1954 path::PathBuf,
1955 sync::atomic::{AtomicU64, Ordering},
1956 };
1957
1958 use super::*;
1959
/// Builds a fresh store path under the system temporary directory and clears
/// whatever a previous run may have left there.
///
/// The directory name combines `name`, the current process id, and a
/// per-process sequence number, so repeated calls never return the same path
/// within one process.
fn temp_store(name: &str) -> PathBuf {
    /// Per-process sequence number that keeps successive store paths distinct.
    static NEXT_PATH: AtomicU64 = AtomicU64::new(0);

    let sequence = NEXT_PATH.fetch_add(1, Ordering::Relaxed);
    let pid = std::process::id();
    let mut path = std::env::temp_dir();
    path.push(format!("oxgraph-db-cp-{name}-{pid}-{sequence}"));
    // Best effort: the directory usually does not exist, so the error is ignored.
    let _ = std::fs::remove_dir_all(&path);
    path
}
1971
/// The exact logical state the crash-matrix asserts recovery preserves: the
/// visible element ids, the subjects matching the `rank == 500` equality
/// probe, and the `Person` label membership.
///
/// Two states compare equal only when all three vectors are equal, so the
/// order each read call reports its results in is part of the comparison.
#[derive(Debug, Eq, PartialEq)]
struct LogicalState {
    /// Visible element ids, as reported by the read transaction's
    /// `element_ids()`.
    elements: Vec<ElementId>,
    /// Subjects whose `rank` property equals the integer `500`.
    rank_eq_500: Vec<PropertySubject>,
    /// Element ids carrying the `Person` label.
    person_members: Vec<ElementId>,
}
1984
/// Catalog ids registered by [`build_fixture`]; tests use them to query the
/// fixture data back through the read surface.
struct Fixture {
    /// Id of the integer-typed `rank` element property key.
    rank: PropertyKeyId,
    /// Id of the `Person` label.
    person: LabelId,
}
1992
1993 /// Builds a committed fixture: 8 elements, each ranked `index * 100`, the
1994 /// even-indexed ones labelled `Person`. Returns the fixture ids.
1995 fn build_fixture(database: &mut Database) -> Fixture {
1996 let mut writer = database.begin_write().expect("begin write");
1997 let rank = writer
1998 .register_property_key("rank", PropertyFamily::Element, PropertyType::Integer)
1999 .expect("rank key");
2000 let person = writer.register_label("Person").expect("person label");
2001 for index in 0..8u64 {
2002 let element = writer.create_element().expect("element");
2003 writer
2004 .set_property(
2005 PropertySubject::Element(element),
2006 rank,
2007 PropertyValue::Integer(i64::try_from(index).expect("index") * 100),
2008 )
2009 .expect("set rank");
2010 if index % 2 == 0 {
2011 writer
2012 .add_element_label(element, person)
2013 .expect("add label");
2014 }
2015 }
2016 writer.commit().expect("commit fixture");
2017 Fixture { rank, person }
2018 }
2019
2020 /// Reads the logical state through the index-backed read surface.
2021 fn read_logical(database: &Database, fixture: &Fixture) -> LogicalState {
2022 let read = database.begin_read();
2023 let elements = read.element_ids();
2024 let rank_eq_500 = read
2025 .lookup_property_equal(fixture.rank, &PropertyValue::Integer(500))
2026 .expect("rank lookup");
2027 let person_members = read.snapshot.view().elements_with_label(fixture.person);
2028 LogicalState {
2029 elements,
2030 rank_eq_500,
2031 person_members,
2032 }
2033 }
2034
2035 /// Asserts ids are never reused across a fold BEHAVIORALLY: the next element
2036 /// `database` mints must take the id one past the current maximum visible
2037 /// element id, i.e. the recovered watermark survived the fold. A regression
2038 /// that dropped the watermark on fold (so the recovered record set is
2039 /// unchanged but the next-id counter reset) would reuse an existing id and
2040 /// fail this assertion — which the unchanged-record-set checks alone miss.
2041 ///
2042 /// The probe element is rolled back, so it does not perturb the logical state
2043 /// the surrounding test re-reads.
2044 fn assert_no_id_reuse_across_fold(database: &mut Database) {
2045 let max_existing = database
2046 .begin_read()
2047 .element_ids()
2048 .into_iter()
2049 .map(ElementId::get)
2050 .max()
2051 .unwrap_or(0);
2052 let expected = ElementId::new(max_existing + 1);
2053 let mut writer = database.begin_write().expect("watermark probe writer");
2054 let minted = writer.create_element().expect("watermark probe element");
2055 assert_eq!(
2056 minted, expected,
2057 "the next minted id must be one past the max existing id (watermark \
2058 survived the fold; ids are never reused)",
2059 );
2060 // Roll the probe back so it leaves no trace in the logical state.
2061 writer.rollback();
2062 }
2063
/// CHECKPOINT-CRASH-MATRIX: stopping `checkpoint` at each of its stop points
/// and reopening from disk recovers EXACTLY the pre-checkpoint logical state.
///
/// * Stopped before the superblock lands, the OLD generation stays
///   authoritative and the orphaned new base is ignored.
/// * Stopped once the superblock names the new generation, or after a
///   completed checkpoint, the NEW base is authoritative.
///
/// In every window the index-backed lookups answer as they did before the
/// (attempted) fold, the id watermark survives, and a repeat open sees the
/// same state.
#[test]
fn checkpoint_crash_matrix_recovers_exact_state() {
    let stops = [
        CheckpointStop::BeforeSuperblock,
        CheckpointStop::BeforeRotate,
        CheckpointStop::Complete,
    ];

    for stop in stops {
        let path = temp_store(&format!("crash-{stop:?}"));
        let mut database = Database::create(&path).expect("create");
        let fixture = build_fixture(&mut database);
        let before = read_logical(&database, &fixture);
        let before_generation = database.base_generation;

        // Simulated crash: the checkpoint returns at `stop`, leaving whatever
        // intermediate files exist at that point on disk. Dropping the handle
        // stands in for the process dying; recovery then starts from disk.
        database
            .checkpoint_inner(stop)
            .expect("checkpoint stop returns ok");
        drop(database);

        let mut recovered = Database::open(&path).expect("reopen after crash");
        assert_eq!(
            read_logical(&recovered, &fixture),
            before,
            "crash at {stop:?} must recover the exact logical state",
        );

        // Checked behaviorally rather than inferred from the unchanged record
        // set: the next minted id is one past the max recovered element id.
        assert_no_id_reuse_across_fold(&mut recovered);

        // Which generation is authoritative depends on the crash window.
        let (expected_generation, expectation) = match stop {
            CheckpointStop::BeforeSuperblock => (
                before_generation,
                "old superblock stays authoritative before the new one lands",
            ),
            CheckpointStop::BeforeRotate | CheckpointStop::Complete => (
                before_generation + 1,
                "the new superblock names the folded generation",
            ),
        };
        assert_eq!(
            recovered.base_generation, expected_generation,
            "{expectation}"
        );

        // Recovery is idempotent: files orphaned by a partial checkpoint must
        // not derail a second open. The second handle is released at the end
        // of this block, before the store directory is removed.
        {
            let reopened = Database::open(&path).expect("second reopen");
            assert_eq!(read_logical(&reopened, &fixture), before);
        }

        let _ = std::fs::remove_dir_all(&path);
    }
}
2128
/// The auto-checkpoint policy folds the delta-log into a fresh base once the
/// log outgrows the base by the configured factor.
///
/// Under the manual policy a long run of commits never advances the live
/// generation. Under the size-ratio policy with factor `1`, a run of commits
/// advances it at least once, while the fixture's lookups, the id watermark,
/// and the configured policy all survive the fold.
#[test]
fn auto_checkpoint_policy_folds_when_log_outgrows_base() {
    /// Commits `count` transactions, each creating exactly one element.
    fn commit_single_elements(database: &mut Database, count: u32) {
        for _ in 0..count {
            let mut writer = database.begin_write().expect("writer");
            writer.create_element().expect("element");
            writer.commit().expect("commit");
        }
    }

    // Manual policy: however many commits land, nothing folds on its own.
    let manual_path = temp_store("auto-manual");
    let mut manual = Database::create(&manual_path).expect("create manual");
    manual.set_checkpoint_policy(CheckpointPolicy::Manual);
    let _manual_fixture = build_fixture(&mut manual);
    commit_single_elements(&mut manual, 200);
    assert_eq!(
        manual.live_generation(),
        CheckpointGeneration::new(0),
        "manual policy must never auto-fold",
    );
    drop(manual);
    let _ = std::fs::remove_dir_all(&manual_path);

    // Size-ratio policy at factor 1: the log is expected to outgrow the base
    // during the run, so at least one fold must have happened by the end.
    let auto_path = temp_store("auto-ratio");
    let mut auto = Database::create(&auto_path).expect("create auto");
    auto.set_checkpoint_policy(CheckpointPolicy::SizeRatio { factor: 1 });
    let fixture = build_fixture(&mut auto);
    let before = read_logical(&auto, &fixture);
    commit_single_elements(&mut auto, 400);
    assert!(
        auto.live_generation() > CheckpointGeneration::new(0),
        "size-ratio policy must auto-fold once the log outgrows the base",
    );

    // The fixture's lookups answer as before. The element list is not
    // compared because the run above added new elements.
    let after = read_logical(&auto, &fixture);
    assert_eq!(after.rank_eq_500, before.rank_eq_500);
    assert_eq!(after.person_members, before.person_members);

    // Ids are never reused across the auto-fold: the next minted id is one
    // past the max existing id.
    assert_no_id_reuse_across_fold(&mut auto);

    assert_eq!(
        auto.checkpoint_policy(),
        CheckpointPolicy::SizeRatio { factor: 1 },
        "the auto-fold reopen must preserve the configured policy",
    );

    // Status reports the same live generation and a non-empty live base.
    let status = auto.status();
    assert_eq!(status.live_generation, auto.live_generation());
    assert!(status.base_byte_size > 0, "live base has bytes");

    drop(auto);
    let _ = std::fs::remove_dir_all(&auto_path);
}
2190}