Skip to main content

oxgraph_db/database/
open.rs

1//! Store lifecycle: create, open/recover, and the on-disk file helpers.
2//!
3//! `create` writes base-0, an empty delta-log, and the superblock (last, as
4//! the create-complete marker); `open` recovers the live frontier by
5//! replaying the valid delta-log prefix over the base the superblock names.
6
7use std::{path::Path, sync::Arc};
8
9use super::{CheckpointPolicy, Db};
10use crate::{
11    Catalog, CheckpointGeneration, CommitSeq, DbError, TransactionId,
12    backing::Base,
13    freeze::{self, FreezeStamps},
14    overlay::{Overlay, Snapshot, WriteOverlay},
15    state::NextIds,
16    storage, wal,
17    wire::SuperblockRecord,
18};
19
20/// Builds the base filename for generation `generation`.
21///
22/// # Performance
23///
24/// This function is `O(1)`.
25pub(super) fn base_file(generation: u64) -> String {
26    format!("base-{generation}.oxgdb")
27}
28
29/// Builds the delta-log filename for generation `generation`.
30///
31/// # Performance
32///
33/// This function is `O(1)`.
34pub(super) fn delta_file(generation: u64) -> String {
35    format!("delta-{generation}.log")
36}
37
38impl Db {
39    /// Creates a new empty OXGDB database at `path`.
40    ///
41    /// The create order is base-0 then empty delta-0.log then the writer lock
42    /// file then the superblock (written LAST as the create-complete marker), so
43    /// a half-created store is detected on open rather than silently opened
44    /// empty.
45    ///
46    /// # Errors
47    ///
48    /// Returns [`DbError::Storage(crate::error::StorageError::AlreadyExists)`] when a store already
49    /// exists, or [`DbError::Io`]/[`DbError::InvalidStore`] when creation fails.
50    ///
51    /// # Performance
52    ///
53    /// This function is `O(empty base bytes)`.
54    pub fn create(path: impl AsRef<Path>) -> Result<Self, DbError> {
55        let root = path.as_ref().to_path_buf();
56        if root.join(wal::SUPERBLOCK_FILE).exists() {
57            return Err(DbError::Storage(crate::error::StorageError::AlreadyExists));
58        }
59        // Base-0: an empty merged view (empty base under an empty overlay).
60        let empty_base = crate::overlay::BaseRecords::empty();
61        let empty_overlay = Overlay::empty(NextIds::INITIAL, Catalog::empty());
62        let view = crate::overlay::MergedState::new(&empty_base, &empty_overlay);
63        let base_bytes = freeze::freeze_view(
64            &view,
65            FreezeStamps {
66                commit_seq: 0,
67                transaction_id: 0,
68                generation: 0,
69            },
70        )?;
71        storage::atomic_write(
72            &root,
73            &root.join(format!("{}.tmp", base_file(0))),
74            &root.join(base_file(0)),
75            &base_bytes,
76        )?;
77        // Empty delta-0.log, durably created.
78        create_empty_log(&root, 0)?;
79        // Superblock is written LAST; its existence is the create-complete marker.
80        write_superblock(&root, 0, 0, 0, 0)?;
81        Self::open(&root)
82    }
83
84    /// Opens an existing OXGDB database, recovering the live frontier from the
85    /// valid prefix of the delta-log replayed over the base named by the
86    /// superblock.
87    ///
88    /// # Errors
89    ///
90    /// Returns [`DbError`] when the store is missing, malformed, or the log is
91    /// corrupt beyond a torn tail.
92    ///
93    /// # Performance
94    ///
95    /// This function is `O(base bytes + log bytes)`. Base integrity is fused
96    /// into the bind pass: the container's table checksum is verified once,
97    /// then each bound section's payload CRC-32C is verified as the section is
98    /// borrowed — there is no separate whole-base CRC scan.
99    pub fn open(path: impl AsRef<Path>) -> Result<Self, DbError> {
100        let root = path.as_ref().to_path_buf();
101        let superblock = wal::read_superblock(&root)?;
102        let generation = superblock.base_generation.get();
103
104        let base = Arc::new(Base::open(&root.join(base_file(generation)), false)?);
105        let base_records = Arc::new(crate::overlay::BaseRecords::open(&base)?);
106        let base_header = *base.get().header();
107        let base_catalog = base.get().catalog().clone();
108        let base_next = NextIds::from_header(&base_header);
109
110        // Replay the valid prefix of the per-generation delta-log.
111        let log_path = root.join(delta_file(generation));
112        let log_bytes = read_log(&log_path)?;
113        let outcome = wal::replay(generation, &log_bytes)?;
114        // A torn tail truncates the log back to its last-good byte length.
115        if outcome.valid_len < log_bytes.len() {
116            truncate_log(&log_path, outcome.valid_len)?;
117        }
118
119        // Fold the replayed frames into a fresh overlay over the base, deriving
120        // the live frontier (commit_seq/txn_id) from the last good frame.
121        let mut write = WriteOverlay::new(base_next, base_catalog);
122        let mut recovered_next = base_next;
123        let mut last_commit_seq = superblock.commit_seq.get();
124        let mut last_txn = superblock.transaction_id.get();
125        for frame in &outcome.frames {
126            for op in &frame.ops {
127                write.apply_replay_op(&base_records, op, &frame.blob, frame.lsn)?;
128            }
129            recovered_next = recovered_next.elementwise_max(write.next_ids());
130            last_commit_seq = frame.lsn;
131            last_txn = last_txn.max(frame.txn_id);
132        }
133        // ids are never reused: the recovered watermark is the elementwise max of
134        // the base header and every replayed frame's watermark.
135        write.set_next_ids(recovered_next);
136        let overlay = Arc::new(write.freeze());
137
138        // Reuse the records already decoded for replay instead of decoding the base
139        // a second time inside `Snapshot::new`: the pinned base is byte-identical, so
140        // the records (and their derived index) match. Halves open's base-decode cost.
141        let snapshot = Arc::new(Snapshot::with_shared_base_records(
142            CheckpointGeneration::new(generation),
143            CommitSeq::new(last_commit_seq),
144            base,
145            overlay,
146            base_records,
147        ));
148
149        Ok(Self {
150            root,
151            current: snapshot,
152            base_generation: generation,
153            last_transaction_id: TransactionId::new(last_txn),
154            checkpoint_policy: CheckpointPolicy::default(),
155        })
156    }
157}
158
159/// Returns the on-disk byte length of `path`, or `0` when it is absent or cannot
160/// be stat'd (size is advisory — used for status reporting and the
161/// auto-checkpoint heuristic, never for correctness).
162///
163/// # Performance
164///
165/// This function is `O(1)`: one `stat` syscall.
166pub(super) fn file_len(path: &Path) -> u64 {
167    std::fs::metadata(path).map_or(0, |meta| meta.len())
168}
169
170/// Reads the whole delta-log into memory, treating a missing file as empty.
171///
172/// # Errors
173///
174/// Returns [`DbError::Io`] when the file cannot be read.
175///
176/// # Performance
177///
178/// This function is `O(log bytes)`.
179pub(super) fn read_log(path: &Path) -> Result<Vec<u8>, DbError> {
180    match std::fs::read(path) {
181        Ok(bytes) => Ok(bytes),
182        Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Vec::new()),
183        Err(error) => Err(DbError::io("read delta-log", error)),
184    }
185}
186
187/// Truncates the delta-log back to `len` (its last-good byte length) and fsyncs,
188/// discarding a torn tail under the open path.
189///
190/// # Errors
191///
192/// Returns [`DbError::Io`] when opening, truncating, or syncing fails.
193///
194/// # Performance
195///
196/// This function is `O(1)`.
197pub(super) fn truncate_log(path: &Path, len: usize) -> Result<(), DbError> {
198    let file = std::fs::OpenOptions::new()
199        .write(true)
200        .open(path)
201        .map_err(|error| DbError::io("open delta-log for truncate", error))?;
202    let len = u64::try_from(len)
203        .map_err(|_overflow| DbError::invalid_store("delta-log length overflow"))?;
204    file.set_len(len)
205        .map_err(|error| DbError::io("truncate delta-log", error))?;
206    file.sync_all()
207        .map_err(|error| DbError::io("sync truncated delta-log", error))
208}
209
210/// Creates an empty per-generation delta-log, fsyncing the file and the
211/// directory entry so the new (empty) log is durable.
212///
213/// # Errors
214///
215/// Returns [`DbError::Io`] when creation or syncing fails.
216///
217/// # Performance
218///
219/// This function is `O(1)`.
220pub(super) fn create_empty_log(root: &Path, generation: u64) -> Result<(), DbError> {
221    let path = root.join(delta_file(generation));
222    let file =
223        std::fs::File::create(&path).map_err(|error| DbError::io("create delta-log", error))?;
224    file.sync_all()
225        .map_err(|error| DbError::io("sync delta-log", error))?;
226    Ok(storage::sync_directory(root)?)
227}
228
229/// Opens the live delta-log for appending (create when absent, read+append).
230///
231/// # Errors
232///
233/// Returns [`DbError::Io`] when the log cannot be opened.
234///
235/// # Performance
236///
237/// This function is `O(1)`.
238pub(super) fn open_log_for_append(root: &Path, generation: u64) -> Result<std::fs::File, DbError> {
239    std::fs::OpenOptions::new()
240        .create(true)
241        .truncate(false)
242        .read(true)
243        .append(true)
244        .open(root.join(delta_file(generation)))
245        .map_err(|error| DbError::io("open delta-log for append", error))
246}
247
248/// Writes the superblock naming `generation` with the given frontier stamps.
249///
250/// # Errors
251///
252/// Returns [`DbError::Io`] when publishing fails.
253///
254/// # Performance
255///
256/// This function is `O(1)`.
257pub(super) fn write_superblock(
258    root: &Path,
259    generation: u64,
260    checkpoint_lsn: u64,
261    commit_seq: u64,
262    transaction_id: u64,
263) -> Result<(), DbError> {
264    Ok(wal::write_superblock(
265        root,
266        &SuperblockRecord {
267            magic: crate::wire::SUPERBLOCK_MAGIC,
268            base_generation: generation.into(),
269            checkpoint_lsn: checkpoint_lsn.into(),
270            log_byte_offset: 0u64.into(),
271            commit_seq: commit_seq.into(),
272            transaction_id: transaction_id.into(),
273            format_version: crate::wire::OXGDB_FORMAT_VERSION.into(),
274            flags: 0u32.into(),
275            crc32c: 0u32.into(),
276            pad: 0u32.into(),
277        },
278    )?)
279}