oxgraph_db/database/open.rs
1//! Store lifecycle: create, open/recover, and the on-disk file helpers.
2//!
3//! `create` writes base-0, an empty delta-log, and the superblock (last, as
4//! the create-complete marker); `open` recovers the live frontier by
5//! replaying the valid delta-log prefix over the base the superblock names.
6
7use std::{path::Path, sync::Arc};
8
9use super::{CheckpointPolicy, Db};
10use crate::{
11 Catalog, CheckpointGeneration, CommitSeq, DbError, TransactionId,
12 backing::Base,
13 freeze::{self, FreezeStamps},
14 overlay::{Overlay, Snapshot, WriteOverlay},
15 state::NextIds,
16 storage, wal,
17 wire::SuperblockRecord,
18};
19
20/// Builds the base filename for generation `generation`.
21///
22/// # Performance
23///
24/// This function is `O(1)`.
25pub(super) fn base_file(generation: u64) -> String {
26 format!("base-{generation}.oxgdb")
27}
28
29/// Builds the delta-log filename for generation `generation`.
30///
31/// # Performance
32///
33/// This function is `O(1)`.
34pub(super) fn delta_file(generation: u64) -> String {
35 format!("delta-{generation}.log")
36}
37
38impl Db {
39 /// Creates a new empty OXGDB database at `path`.
40 ///
41 /// The create order is base-0 then empty delta-0.log then the writer lock
42 /// file then the superblock (written LAST as the create-complete marker), so
43 /// a half-created store is detected on open rather than silently opened
44 /// empty.
45 ///
46 /// # Errors
47 ///
48 /// Returns [`DbError::Storage(crate::error::StorageError::AlreadyExists)`] when a store already
49 /// exists, or [`DbError::Io`]/[`DbError::InvalidStore`] when creation fails.
50 ///
51 /// # Performance
52 ///
53 /// This function is `O(empty base bytes)`.
54 pub fn create(path: impl AsRef<Path>) -> Result<Self, DbError> {
55 let root = path.as_ref().to_path_buf();
56 if root.join(wal::SUPERBLOCK_FILE).exists() {
57 return Err(DbError::Storage(crate::error::StorageError::AlreadyExists));
58 }
59 // Base-0: an empty merged view (empty base under an empty overlay).
60 let empty_base = crate::overlay::BaseRecords::empty();
61 let empty_overlay = Overlay::empty(NextIds::INITIAL, Catalog::empty());
62 let view = crate::overlay::MergedState::new(&empty_base, &empty_overlay);
63 let base_bytes = freeze::freeze_view(
64 &view,
65 FreezeStamps {
66 commit_seq: 0,
67 transaction_id: 0,
68 generation: 0,
69 },
70 )?;
71 storage::atomic_write(
72 &root,
73 &root.join(format!("{}.tmp", base_file(0))),
74 &root.join(base_file(0)),
75 &base_bytes,
76 )?;
77 // Empty delta-0.log, durably created.
78 create_empty_log(&root, 0)?;
79 // Superblock is written LAST; its existence is the create-complete marker.
80 write_superblock(&root, 0, 0, 0, 0)?;
81 Self::open(&root)
82 }
83
84 /// Opens an existing OXGDB database, recovering the live frontier from the
85 /// valid prefix of the delta-log replayed over the base named by the
86 /// superblock.
87 ///
88 /// # Errors
89 ///
90 /// Returns [`DbError`] when the store is missing, malformed, or the log is
91 /// corrupt beyond a torn tail.
92 ///
93 /// # Performance
94 ///
95 /// This function is `O(base bytes + log bytes)`. Base integrity is fused
96 /// into the bind pass: the container's table checksum is verified once,
97 /// then each bound section's payload CRC-32C is verified as the section is
98 /// borrowed — there is no separate whole-base CRC scan.
99 pub fn open(path: impl AsRef<Path>) -> Result<Self, DbError> {
100 let root = path.as_ref().to_path_buf();
101 let superblock = wal::read_superblock(&root)?;
102 let generation = superblock.base_generation.get();
103
104 let base = Arc::new(Base::open(&root.join(base_file(generation)), false)?);
105 let base_records = Arc::new(crate::overlay::BaseRecords::open(&base)?);
106 let base_header = *base.get().header();
107 let base_catalog = base.get().catalog().clone();
108 let base_next = NextIds::from_header(&base_header);
109
110 // Replay the valid prefix of the per-generation delta-log.
111 let log_path = root.join(delta_file(generation));
112 let log_bytes = read_log(&log_path)?;
113 let outcome = wal::replay(generation, &log_bytes)?;
114 // A torn tail truncates the log back to its last-good byte length.
115 if outcome.valid_len < log_bytes.len() {
116 truncate_log(&log_path, outcome.valid_len)?;
117 }
118
119 // Fold the replayed frames into a fresh overlay over the base, deriving
120 // the live frontier (commit_seq/txn_id) from the last good frame.
121 let mut write = WriteOverlay::new(base_next, base_catalog);
122 let mut recovered_next = base_next;
123 let mut last_commit_seq = superblock.commit_seq.get();
124 let mut last_txn = superblock.transaction_id.get();
125 for frame in &outcome.frames {
126 for op in &frame.ops {
127 write.apply_replay_op(&base_records, op, &frame.blob, frame.lsn)?;
128 }
129 recovered_next = recovered_next.elementwise_max(write.next_ids());
130 last_commit_seq = frame.lsn;
131 last_txn = last_txn.max(frame.txn_id);
132 }
133 // ids are never reused: the recovered watermark is the elementwise max of
134 // the base header and every replayed frame's watermark.
135 write.set_next_ids(recovered_next);
136 let overlay = Arc::new(write.freeze());
137
138 // Reuse the records already decoded for replay instead of decoding the base
139 // a second time inside `Snapshot::new`: the pinned base is byte-identical, so
140 // the records (and their derived index) match. Halves open's base-decode cost.
141 let snapshot = Arc::new(Snapshot::with_shared_base_records(
142 CheckpointGeneration::new(generation),
143 CommitSeq::new(last_commit_seq),
144 base,
145 overlay,
146 base_records,
147 ));
148
149 Ok(Self {
150 root,
151 current: snapshot,
152 base_generation: generation,
153 last_transaction_id: TransactionId::new(last_txn),
154 checkpoint_policy: CheckpointPolicy::default(),
155 })
156 }
157}
158
159/// Returns the on-disk byte length of `path`, or `0` when it is absent or cannot
160/// be stat'd (size is advisory — used for status reporting and the
161/// auto-checkpoint heuristic, never for correctness).
162///
163/// # Performance
164///
165/// This function is `O(1)`: one `stat` syscall.
166pub(super) fn file_len(path: &Path) -> u64 {
167 std::fs::metadata(path).map_or(0, |meta| meta.len())
168}
169
170/// Reads the whole delta-log into memory, treating a missing file as empty.
171///
172/// # Errors
173///
174/// Returns [`DbError::Io`] when the file cannot be read.
175///
176/// # Performance
177///
178/// This function is `O(log bytes)`.
179pub(super) fn read_log(path: &Path) -> Result<Vec<u8>, DbError> {
180 match std::fs::read(path) {
181 Ok(bytes) => Ok(bytes),
182 Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Vec::new()),
183 Err(error) => Err(DbError::io("read delta-log", error)),
184 }
185}
186
187/// Truncates the delta-log back to `len` (its last-good byte length) and fsyncs,
188/// discarding a torn tail under the open path.
189///
190/// # Errors
191///
192/// Returns [`DbError::Io`] when opening, truncating, or syncing fails.
193///
194/// # Performance
195///
196/// This function is `O(1)`.
197pub(super) fn truncate_log(path: &Path, len: usize) -> Result<(), DbError> {
198 let file = std::fs::OpenOptions::new()
199 .write(true)
200 .open(path)
201 .map_err(|error| DbError::io("open delta-log for truncate", error))?;
202 let len = u64::try_from(len)
203 .map_err(|_overflow| DbError::invalid_store("delta-log length overflow"))?;
204 file.set_len(len)
205 .map_err(|error| DbError::io("truncate delta-log", error))?;
206 file.sync_all()
207 .map_err(|error| DbError::io("sync truncated delta-log", error))
208}
209
210/// Creates an empty per-generation delta-log, fsyncing the file and the
211/// directory entry so the new (empty) log is durable.
212///
213/// # Errors
214///
215/// Returns [`DbError::Io`] when creation or syncing fails.
216///
217/// # Performance
218///
219/// This function is `O(1)`.
220pub(super) fn create_empty_log(root: &Path, generation: u64) -> Result<(), DbError> {
221 let path = root.join(delta_file(generation));
222 let file =
223 std::fs::File::create(&path).map_err(|error| DbError::io("create delta-log", error))?;
224 file.sync_all()
225 .map_err(|error| DbError::io("sync delta-log", error))?;
226 Ok(storage::sync_directory(root)?)
227}
228
229/// Opens the live delta-log for appending (create when absent, read+append).
230///
231/// # Errors
232///
233/// Returns [`DbError::Io`] when the log cannot be opened.
234///
235/// # Performance
236///
237/// This function is `O(1)`.
238pub(super) fn open_log_for_append(root: &Path, generation: u64) -> Result<std::fs::File, DbError> {
239 std::fs::OpenOptions::new()
240 .create(true)
241 .truncate(false)
242 .read(true)
243 .append(true)
244 .open(root.join(delta_file(generation)))
245 .map_err(|error| DbError::io("open delta-log for append", error))
246}
247
248/// Writes the superblock naming `generation` with the given frontier stamps.
249///
250/// # Errors
251///
252/// Returns [`DbError::Io`] when publishing fails.
253///
254/// # Performance
255///
256/// This function is `O(1)`.
257pub(super) fn write_superblock(
258 root: &Path,
259 generation: u64,
260 checkpoint_lsn: u64,
261 commit_seq: u64,
262 transaction_id: u64,
263) -> Result<(), DbError> {
264 Ok(wal::write_superblock(
265 root,
266 &SuperblockRecord {
267 magic: crate::wire::SUPERBLOCK_MAGIC,
268 base_generation: generation.into(),
269 checkpoint_lsn: checkpoint_lsn.into(),
270 log_byte_offset: 0u64.into(),
271 commit_seq: commit_seq.into(),
272 transaction_id: transaction_id.into(),
273 format_version: crate::wire::OXGDB_FORMAT_VERSION.into(),
274 flags: 0u32.into(),
275 crc32c: 0u32.into(),
276 pad: 0u32.into(),
277 },
278 )?)
279}