use std::{fs::File, io::Read, ops::Deref, path::Path};
use oxgraph_snapshot::Snapshot;
use yoke::Yoke;
use zerocopy::{
FromBytes,
byteorder::{LE, U64},
};
use crate::{Catalog, DbError, crc, freeze, index::BorrowedBaseIndex, wire};
/// Immutable bytes of a base file, exposed as `&[u8]` through `Deref`.
///
/// On unix (outside Miri) `open_backing` memory-maps the file unless the
/// caller forces an owned copy; on every other target the whole file is read
/// into memory.
pub(crate) enum Backing {
    /// Read-only memory map of the base file (unix, non-Miri builds only).
    #[cfg(all(unix, not(miri)))]
    Mmap(oxgraph_mmap::Mmap),
    /// The file contents copied into an owned buffer.
    Owned(Vec<u8>),
}
impl Backing {
    /// Wraps an already-loaded buffer as an owned backing.
    const fn owned(buffer: Vec<u8>) -> Self {
        Self::Owned(buffer)
    }
}
/// Lets every backing be read as one contiguous byte slice, whether it is an
/// owned buffer or a memory map.
impl Deref for Backing {
    type Target = [u8];

    fn deref(&self) -> &[u8] {
        match self {
            Self::Owned(buffer) => buffer.as_slice(),
            #[cfg(all(unix, not(miri)))]
            Self::Mmap(mapping) => &mapping[..],
        }
    }
}
pub(crate) fn open_backing(path: &Path, force_owned: bool) -> Result<Backing, DbError> {
let file = File::open(path).map_err(|error| match error.kind() {
std::io::ErrorKind::NotFound => DbError::NotFound,
_kind => DbError::io("open base file", error),
})?;
map_or_read(file, force_owned)
}
#[cfg(all(unix, not(miri)))]
fn map_or_read(file: File, force_owned: bool) -> Result<Backing, DbError> {
if force_owned {
return read_owned(file);
}
let map =
oxgraph_mmap::map_read_only(&file).map_err(|error| DbError::io("mmap base file", error))?;
Ok(Backing::Mmap(map))
}
/// Fallback for targets without the mmap backing (non-unix, or under Miri):
/// the file is always read into memory, so the `force_owned` flag is ignored.
///
/// # Errors
/// The read error from `read_owned`.
#[cfg(not(all(unix, not(miri))))]
fn map_or_read(file: File, _force_owned: bool) -> Result<Backing, DbError> {
    read_owned(file)
}
fn read_owned(mut file: File) -> Result<Backing, DbError> {
let mut bytes = Vec::new();
file.read_to_end(&mut bytes)
.map_err(|error| DbError::io("read base file", error))?;
Ok(Backing::owned(bytes))
}
/// Native-integer copy of the base's header record (`wire::DbHeaderRecord`),
/// built by `DbHeader::from_record`.
///
/// NOTE(review): the `next_*` fields look like per-namespace id allocation
/// cursors (the tests feed this header to `NextIds::from_header`) — confirm.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) struct DbHeader {
    /// On-disk format version; `attach_view` rejects any value other than
    /// `wire::OXGDB_FORMAT_VERSION`.
    pub(crate) format_version: u32,
    // Presumably stamped when the base is frozen (cf. `FreezeStamps` in the
    // tests) — confirm against the freeze path.
    pub(crate) commit_seq: u64,
    pub(crate) transaction_id: u64,
    pub(crate) checkpoint_generation: u64,
    pub(crate) next_element: u64,
    pub(crate) next_relation: u64,
    pub(crate) next_incidence: u64,
    pub(crate) next_role: u64,
    pub(crate) next_label: u64,
    pub(crate) next_relation_type: u64,
    pub(crate) next_property_key: u64,
    pub(crate) next_projection: u64,
    pub(crate) next_index: u64,
}
impl DbHeader {
    /// Copies every field of the wire header record out of its byte-order
    /// wrapper (via `.get()`) into a native integer.
    ///
    /// Performs no validation; `attach_view` checks `format_version` before
    /// calling this.
    const fn from_record(record: &wire::DbHeaderRecord) -> Self {
        Self {
            format_version: record.format_version.get(),
            commit_seq: record.commit_seq.get(),
            transaction_id: record.transaction_id.get(),
            checkpoint_generation: record.checkpoint_generation.get(),
            next_element: record.next_element.get(),
            next_relation: record.next_relation.get(),
            next_incidence: record.next_incidence.get(),
            next_role: record.next_role.get(),
            next_label: record.next_label.get(),
            next_relation_type: record.next_relation_type.get(),
            next_property_key: record.next_property_key.get(),
            next_projection: record.next_projection.get(),
            next_index: record.next_index.get(),
        }
    }
}
/// Yoke cart that owns a base file's bytes; `Base` boxes it and borrows a
/// `BaseView` from `bytes`.
pub(crate) struct BaseCart {
    /// Mapped or owned contents of the base file.
    pub(crate) bytes: Backing,
}
/// Zero-copy view of an attached base. Record slices, label runs and text
/// blobs are borrowed straight from the backing bytes; the catalog and header
/// are decoded into owned values.
///
/// Derives `Yokeable` so `Base` can store it next to the bytes it borrows.
#[derive(yoke::Yokeable)]
#[yoke(prove_covariant)]
pub(crate) struct BaseView<'a> {
    /// Records from `SECTION_ELEMENT_RECORDS`.
    elements: &'a [wire::ElementWire],
    /// Records from `SECTION_RELATION_RECORDS`.
    relations: &'a [wire::RelationWire],
    /// Records from `SECTION_INCIDENCE_RECORDS`.
    incidences: &'a [wire::IncidenceWire],
    /// Concatenated element label runs, addressed by each element record's
    /// `(label_off, label_len)`.
    element_labels: &'a [U64<LE>],
    /// Concatenated relation label runs, addressed by each relation record's
    /// `(label_off, label_len)`.
    relation_labels: &'a [U64<LE>],
    /// Property records, checked at attach time to be sorted by
    /// `(subject_kind, subject_id, key)`.
    properties: &'a [wire::PropertyWire],
    /// Text blob addressed by each property record's `(text_off, text_len)`.
    property_text: &'a [u8],
    /// Posting-list indexes borrowed from the index sections.
    index: BorrowedBaseIndex<'a>,
    /// Catalog decoded from the string table and catalog definitions.
    catalog: Catalog,
    /// Decoded header record.
    header: DbHeader,
}
impl<'a> BaseView<'a> {
    /// Resolves `record`'s label run in the element label section.
    ///
    /// Returns `None` when `label_off + label_len` overflows or the range falls
    /// outside the section.
    #[must_use]
    pub(crate) fn element_label_run(&self, record: &wire::ElementWire) -> Option<&'a [U64<LE>]> {
        let (offset, len) = (record.label_off.get(), record.label_len.get());
        label_slice(self.element_labels, offset, len)
    }

    /// Resolves `record`'s label run in the relation label section.
    ///
    /// Returns `None` when `label_off + label_len` overflows or the range falls
    /// outside the section.
    #[must_use]
    pub(crate) fn relation_label_run(&self, record: &wire::RelationWire) -> Option<&'a [U64<LE>]> {
        let (offset, len) = (record.label_off.get(), record.label_len.get());
        label_slice(self.relation_labels, offset, len)
    }

    /// Returns the property-text bytes addressed by `record`'s
    /// `(text_off, text_len)`.
    ///
    /// Returns `None` when the range overflows or falls outside the text blob.
    /// An empty in-bounds range yields `Some(&[])`, because the record's value
    /// tag is not consulted.
    #[must_use]
    pub(crate) fn property_text(&self, record: &wire::PropertyWire) -> Option<&'a [u8]> {
        let text: &'a [u8] = self.property_text;
        let offset = record.text_off.get() as usize;
        let length = record.text_len.get() as usize;
        offset
            .checked_add(length)
            .and_then(|end| text.get(offset..end))
    }

    /// Iterates the element records in section order.
    pub(crate) fn elements(&self) -> impl Iterator<Item = &'a wire::ElementWire> {
        self.elements.iter()
    }

    /// Iterates the relation records in section order.
    pub(crate) fn relations(&self) -> impl Iterator<Item = &'a wire::RelationWire> {
        self.relations.iter()
    }

    /// Iterates the incidence records in section order.
    pub(crate) fn incidences(&self) -> impl Iterator<Item = &'a wire::IncidenceWire> {
        self.incidences.iter()
    }

    /// Iterates the property records in section (sorted) order.
    pub(crate) fn properties(&self) -> impl Iterator<Item = &'a wire::PropertyWire> {
        self.properties.iter()
    }

    /// Returns a copy of the borrowed index view. It borrows the base bytes,
    /// not `self`.
    #[must_use]
    pub(crate) const fn index(&self) -> BorrowedBaseIndex<'a> {
        self.index
    }

    /// Returns the catalog decoded at attach time.
    #[must_use]
    pub(crate) const fn catalog(&self) -> &Catalog {
        &self.catalog
    }

    /// Returns the header decoded at attach time.
    #[must_use]
    pub(crate) const fn header(&self) -> &DbHeader {
        &self.header
    }
}
/// Returns `run[offset..offset + len]`.
///
/// Returns `None` if the end overflows `usize` or the range is out of bounds.
fn label_slice(run: &[U64<LE>], offset: u32, len: u32) -> Option<&[U64<LE>]> {
    let first = offset as usize;
    first
        .checked_add(len as usize)
        .and_then(|past_end| run.get(first..past_end))
}
/// An attached base file whose content CRC has been verified. The boxed cart
/// owns the backing bytes, and the `BaseView` inside the yoke borrows from
/// them.
pub(crate) struct Base {
    yoke: Yoke<BaseView<'static>, Box<BaseCart>>,
}
impl Base {
    /// Opens the base file at `path`, verifies its content CRC and attaches a view.
    ///
    /// With `force_owned` the file is read into memory. Otherwise it is
    /// memory-mapped on unix (non-Miri) builds and read into memory elsewhere.
    ///
    /// # Errors
    /// - `DbError::NotFound` when `path` does not exist.
    /// - I/O errors from opening, mapping or reading the file.
    /// - Any verification or decoding error raised while attaching.
    pub(crate) fn open(path: &Path, force_owned: bool) -> Result<Self, DbError> {
        open_backing(path, force_owned).and_then(Self::attach)
    }

    /// Test-only: attaches a base held entirely in memory.
    #[cfg(test)]
    pub(crate) fn open_owned_bytes(bytes: Vec<u8>) -> Result<Self, DbError> {
        let backing = Backing::owned(bytes);
        Self::attach(backing)
    }

    /// Verifies `backing`'s CRC, then moves it into a boxed cart and borrows a
    /// `BaseView` from it.
    fn attach(backing: Backing) -> Result<Self, DbError> {
        // Reject corrupt bytes before any section is decoded.
        verify_base_crc(&backing)?;
        let attached = Yoke::try_attach_to_cart(
            Box::new(BaseCart { bytes: backing }),
            |owner: &BaseCart| attach_view(&owner.bytes),
        )?;
        Ok(Self { yoke: attached })
    }

    /// Borrows the attached view.
    #[must_use]
    pub(crate) fn get(&self) -> &BaseView<'_> {
        self.yoke.get()
    }
}
/// Checks the base's content checksum before any record in it is trusted.
///
/// The CRC lives in the `wire::BaseTrailer` record at the start of the
/// `SECTION_BASE_TRAILER` payload. It covers every byte of `bytes` that
/// precedes that payload. That includes the trailer section's own framing,
/// but not the checksum itself.
///
/// # Errors
/// `DbError::invalid_store` in any of these cases:
/// - the snapshot framing cannot be opened;
/// - the trailer section is missing or its payload is truncated;
/// - the payload does not start inside `bytes`;
/// - the recomputed checksum disagrees with the stored one.
fn verify_base_crc(bytes: &[u8]) -> Result<(), DbError> {
    let snapshot =
        Snapshot::open(bytes).map_err(|error| DbError::invalid_store(error.to_string()))?;
    let trailer_section = snapshot
        .section(wire::SECTION_BASE_TRAILER)
        .ok_or_else(|| DbError::invalid_store("base is missing its content trailer"))?;
    let payload = trailer_section.bytes();
    let trailer = wire::BaseTrailer::ref_from_prefix(payload)
        .map(|(record, _rest)| record)
        .map_err(|_error| DbError::invalid_store("base trailer payload is truncated"))?;
    // `payload` is expected to borrow from `bytes`, so the address difference is
    // its offset. A checked subtraction reports a payload that does not start
    // inside `bytes` as corruption. Plain subtraction would panic in debug
    // builds and wrap in release builds.
    let covered = payload
        .as_ptr()
        .addr()
        .checked_sub(bytes.as_ptr().addr())
        .and_then(|payload_offset| bytes.get(..payload_offset))
        .ok_or_else(|| DbError::invalid_store("base trailer offset out of bounds"))?;
    let recomputed = crc::checksum(covered);
    if recomputed == trailer.crc32c.get() {
        Ok(())
    } else {
        Err(DbError::invalid_store("base content CRC mismatch"))
    }
}
/// Decodes the sections of a (CRC-verified) base into a borrowed [`BaseView`].
///
/// Sections are read in a fixed order, and the first malformed one is the
/// error returned:
/// 1. header and format version;
/// 2. catalog;
/// 3. property records, plus their sort check;
/// 4. indexes;
/// 5. element, relation and incidence records, label runs, property text.
///
/// # Errors
/// - `DbError::UnsupportedFormat` when the header's format version differs from
///   `wire::OXGDB_FORMAT_VERSION`.
/// - `DbError::invalid_store` when the snapshot cannot be opened or the header
///   section is empty.
/// - Errors propagated from the `freeze` helpers, `verify_properties_sorted`
///   and `attach_index`.
fn attach_view(bytes: &[u8]) -> Result<BaseView<'_>, DbError> {
    let snapshot =
        Snapshot::open(bytes).map_err(|error| DbError::invalid_store(error.to_string()))?;

    // Header: nothing else is interpreted until the format version is accepted.
    let headers =
        freeze::typed_records::<wire::DbHeaderRecord>(&snapshot, wire::SECTION_DB_HEADER)?;
    let Some(header_record) = headers.first() else {
        return Err(DbError::invalid_store("base is missing the header section"));
    };
    let found = header_record.format_version.get();
    if found != wire::OXGDB_FORMAT_VERSION {
        return Err(DbError::UnsupportedFormat {
            found,
            expected: wire::OXGDB_FORMAT_VERSION,
        });
    }
    let header = DbHeader::from_record(header_record);

    // Catalog: decoded from the string table plus the catalog definition words.
    let string_table = freeze::raw_blob(&snapshot, wire::SECTION_STRING_TABLE);
    let catalog_defs = freeze::typed_records::<U64<LE>>(&snapshot, wire::SECTION_CATALOG_DEFS)?;
    let catalog = freeze::decode_catalog(&snapshot, string_table, catalog_defs)?;

    // Property records are rejected up front unless sorted by `property_sort_key`.
    let properties =
        freeze::typed_records::<wire::PropertyWire>(&snapshot, wire::SECTION_PROPERTY_RECORDS)?;
    verify_properties_sorted(properties)?;

    let index = attach_index(&snapshot)?;

    // Graph records, label runs and property text.
    let elements =
        freeze::typed_records::<wire::ElementWire>(&snapshot, wire::SECTION_ELEMENT_RECORDS)?;
    let relations =
        freeze::typed_records::<wire::RelationWire>(&snapshot, wire::SECTION_RELATION_RECORDS)?;
    let incidences =
        freeze::typed_records::<wire::IncidenceWire>(&snapshot, wire::SECTION_INCIDENCE_RECORDS)?;
    let element_labels =
        freeze::typed_records::<U64<LE>>(&snapshot, wire::SECTION_ELEMENT_LABELS)?;
    let relation_labels =
        freeze::typed_records::<U64<LE>>(&snapshot, wire::SECTION_RELATION_LABELS)?;
    let property_text = freeze::raw_blob(&snapshot, wire::SECTION_PROPERTY_TEXT);

    Ok(BaseView {
        elements,
        relations,
        incidences,
        element_labels,
        relation_labels,
        properties,
        property_text,
        index,
        catalog,
        header,
    })
}
/// Borrows the posting-list sections that make up the base's indexes and
/// assembles them into a [`BorrowedBaseIndex`].
///
/// Sections are read in a fixed order, and the first malformed one is the
/// error returned. A section absent from the snapshot contributes empty slices.
///
/// # Errors
/// Errors from `posting_slices` and `BorrowedBaseIndex::from_sections`.
fn attach_index<'a>(snapshot: &Snapshot<'a>) -> Result<BorrowedBaseIndex<'a>, DbError> {
    let (labels_dir, labels_pool) =
        posting_slices(snapshot, wire::SECTION_INDEX_LABEL_POSTINGS)?;
    let (rel_types_dir, rel_types_pool) =
        posting_slices(snapshot, wire::SECTION_INDEX_RELATION_TYPE_POSTINGS)?;
    let (elem_incidences_dir, elem_incidences_pool) =
        posting_slices(snapshot, wire::SECTION_INDEX_ELEMENT_INCIDENCES)?;
    let (rel_incidences_dir, rel_incidences_pool) =
        posting_slices(snapshot, wire::SECTION_INDEX_RELATION_INCIDENCES)?;
    let (eq_dir, eq_pool) = posting_slices(snapshot, wire::SECTION_INDEX_EQUALITY)?;
    let eq_text = freeze::raw_blob(snapshot, wire::SECTION_INDEX_EQUALITY_TEXT);
    // `from_sections` takes its arguments in a different order than they are read above.
    BorrowedBaseIndex::from_sections(
        labels_dir,
        labels_pool,
        rel_types_dir,
        rel_types_pool,
        eq_dir,
        eq_pool,
        eq_text,
        elem_incidences_dir,
        elem_incidences_pool,
        rel_incidences_dir,
        rel_incidences_pool,
    )
}
/// Splits one framed posting section into its directory (`[T]`) and value pool
/// (`[U64<LE>]`), both borrowed from the snapshot bytes.
///
/// A section absent from the snapshot yields two empty slices.
///
/// # Errors
/// - Failures from `freeze::split_posting_section` are propagated.
/// - `DbError::invalid_store` when either region cannot be viewed as a whole
///   array of its element type.
#[expect(
    clippy::type_complexity,
    reason = "the directory `[T]` and value-pool `[U64<LE>]` slices are returned together as one framed section's two regions"
)]
fn posting_slices<'a, T>(
    snapshot: &Snapshot<'a>,
    kind: u32,
) -> Result<(&'a [T], &'a [U64<LE>]), DbError>
where
    T: FromBytes + zerocopy::Immutable + zerocopy::KnownLayout,
{
    let Some(framed) = snapshot.section(kind) else {
        // No section on disk: treat it as an empty directory and pool.
        return Ok((&[], &[]));
    };
    let (directory_bytes, values_bytes) = freeze::split_posting_section(framed.bytes())?;
    let directory = <[T]>::ref_from_bytes(directory_bytes)
        .map_err(|_error| DbError::invalid_store("posting directory is not a whole array"))?;
    let values = <[U64<LE>]>::ref_from_bytes(values_bytes)
        .map_err(|_error| DbError::invalid_store("posting value pool is not a whole array"))?;
    Ok((directory, values))
}
/// Sort key for property records: `(subject_kind, subject_id, key)`, compared
/// lexicographically as a tuple.
const fn property_sort_key(record: &wire::PropertyWire) -> (u32, u64, u64) {
    let subject_kind = record.subject_kind.get();
    let subject_id = record.subject_id.get();
    let key = record.key.get();
    (subject_kind, subject_id, key)
}
fn verify_properties_sorted(properties: &[wire::PropertyWire]) -> Result<(), DbError> {
let ordered = properties
.windows(2)
.all(|pair| property_sort_key(&pair[0]) <= property_sort_key(&pair[1]));
if ordered {
Ok(())
} else {
Err(DbError::invalid_store(
"base property records are not sorted by (subject_kind, subject_id, key)",
))
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::overlay::test_support::small_base;

    const fn assert_send_sync<T: Send + Sync>() {}
    // Compile-time proof that an attached base can be shared across threads.
    const _: () = assert_send_sync::<Base>();

    /// Checks the canonical contents of the `small_base` fixture through any
    /// backing. Expects:
    /// - 3 elements, 2 relations and 2 incidences;
    /// - two elements carrying the `Person` label;
    /// - a first non-empty text property of `Alice`;
    /// - a `name` property key in the catalog.
    fn assert_small_reads(base: &Base) {
        let view = base.get();
        assert_eq!(view.header().format_version, wire::OXGDB_FORMAT_VERSION);
        assert_eq!(view.elements().count(), 3);
        assert_eq!(view.relations().count(), 2);
        assert_eq!(view.incidences().count(), 2);
        let person = view.catalog().label_id("Person").expect("Person label");
        let labelled = view
            .elements()
            .filter(|record| {
                view.element_label_run(record)
                    .is_some_and(|labels| labels.iter().any(|word| word.get() == person.get()))
            })
            .count();
        assert_eq!(labelled, 2, "two elements carry the Person label");
        // `property_text` returns `Some(&[])` for any record whose text range is
        // empty and in bounds, so `is_some()` alone does not select a text
        // property. Require non-empty text.
        let alice = view
            .properties()
            .find(|record| view.property_text(record).is_some_and(|text| !text.is_empty()))
            .expect("a text property");
        assert_eq!(view.property_text(alice).expect("text"), b"Alice");
        assert!(view.catalog().property_key_id("name").is_some());
    }

    #[test]
    fn owned_backing_borrows_canonical_reads() {
        assert_small_reads(&small_base());
    }

    #[test]
    fn property_sort_check_accepts_sorted_rejects_unsorted() {
        use zerocopy::byteorder::{U32, U64};
        let make = |kind: u32, id: u64, key: u64| wire::PropertyWire {
            subject_kind: U32::new(kind),
            value_tag: U32::new(0),
            subject_id: U64::new(id),
            key: U64::new(key),
            scalar: U64::new(0),
            text_off: U32::new(0),
            text_len: U32::new(0),
        };
        let sorted = [make(0, 1, 1), make(0, 1, 2), make(0, 2, 1), make(2, 1, 1)];
        assert!(verify_properties_sorted(&sorted).is_ok());
        assert!(verify_properties_sorted(&[]).is_ok());
        assert!(verify_properties_sorted(&[make(1, 5, 9)]).is_ok());
        // Each unsorted pair breaks the order on a different key component.
        assert!(verify_properties_sorted(&[make(0, 1, 2), make(0, 1, 1)]).is_err());
        assert!(verify_properties_sorted(&[make(0, 2, 1), make(0, 1, 1)]).is_err());
        assert!(verify_properties_sorted(&[make(2, 1, 1), make(0, 1, 1)]).is_err());
    }

    #[test]
    fn unsupported_format_version_is_rejected() {
        use zerocopy::IntoBytes;
        let mut bytes = small_base_bytes();
        // Locate the header record and the trailer payload as offsets into `bytes`.
        let (header_offset, trailer_payload_offset) = {
            let snapshot = Snapshot::open(&bytes).expect("reopen frozen base");
            let header = snapshot
                .section(wire::SECTION_DB_HEADER)
                .expect("header section");
            let trailer = snapshot
                .section(wire::SECTION_BASE_TRAILER)
                .expect("trailer section");
            (
                header.bytes().as_ptr().addr() - bytes.as_ptr().addr(),
                trailer.bytes().as_ptr().addr() - bytes.as_ptr().addr(),
            )
        };
        // Overwrite the version. This relies on `format_version` being the header
        // record's leading little-endian u32.
        let bogus = wire::OXGDB_FORMAT_VERSION + 1;
        let version_field = size_of::<zerocopy::byteorder::U32<LE>>();
        bytes[header_offset..header_offset + version_field]
            .copy_from_slice(zerocopy::byteorder::U32::<LE>::new(bogus).as_bytes());
        // Re-seal the content CRC so attach passes the CRC check and fails on the
        // version instead. This relies on `crc32c` being the trailer's leading u32.
        let crc_field = size_of::<zerocopy::byteorder::U32<LE>>();
        let crc = crate::crc::checksum(&bytes[..trailer_payload_offset]);
        bytes[trailer_payload_offset..trailer_payload_offset + crc_field]
            .copy_from_slice(zerocopy::byteorder::U32::<LE>::new(crc).as_bytes());
        let result = Base::open_owned_bytes(bytes).map(|_base| ());
        assert!(
            matches!(
                result,
                Err(DbError::UnsupportedFormat { found, expected })
                    if found == bogus && expected == wire::OXGDB_FORMAT_VERSION
            ),
            "unsupported format must be rejected loudly, got {result:?}",
        );
    }

    #[test]
    fn corrupt_covered_byte_fails_attach() {
        let mut bytes = small_base_bytes();
        // Byte 16 is assumed to precede the trailer payload, i.e. to be CRC-covered.
        bytes[16] ^= 0xFF;
        let result = Base::open_owned_bytes(bytes).map(|_base| ());
        assert!(
            matches!(result, Err(DbError::InvalidStore { .. })),
            "corrupt base must fail, got {result:?}",
        );
    }

    /// Re-freezes the `small_base` fixture into raw base-file bytes so tests can
    /// corrupt them or write them to disk.
    fn small_base_bytes() -> Vec<u8> {
        use crate::{
            freeze::{FreezeStamps, freeze_view},
            overlay::{BaseRecords, MergedState, Overlay},
            state::NextIds,
        };
        let base = small_base();
        let records = BaseRecords::from_view(base.get()).expect("base records");
        let header = *base.get().header();
        let overlay = Overlay::empty(NextIds::from_header(&header), base.get().catalog().clone());
        let view = MergedState::new(&records, &overlay);
        freeze_view(
            &view,
            FreezeStamps {
                commit_seq: 1,
                transaction_id: 1,
                generation: 1,
            },
        )
        .expect("freeze small base bytes")
    }

    /// Tests that touch the real filesystem; skipped under Miri.
    #[cfg(not(miri))]
    mod file_backed {
        use std::{
            path::PathBuf,
            sync::atomic::{AtomicU64, Ordering},
        };

        use super::*;

        /// Counter that keeps temp-file names unique within this process.
        static NEXT_PATH: AtomicU64 = AtomicU64::new(0);

        /// Writes `bytes` to a fresh, process-unique file in the temp directory.
        fn write_temp_base(name: &str, bytes: &[u8]) -> PathBuf {
            let id = NEXT_PATH.fetch_add(1, Ordering::Relaxed);
            let path = std::env::temp_dir().join(format!(
                "oxgraph-db-backing-{name}-{}-{id}.oxgdb",
                std::process::id()
            ));
            std::fs::write(&path, bytes).expect("write temp base");
            path
        }

        #[test]
        fn file_open_borrows_canonical_reads() {
            let bytes = small_base_bytes();
            let path = write_temp_base("open", &bytes);
            let default_open = Base::open(&path, false).expect("open base");
            let forced_owned = Base::open(&path, true).expect("open base force_owned");
            assert_small_reads(&default_open);
            assert_small_reads(&forced_owned);
            // Both backings must expose the same records and label runs.
            let lhs = default_open.get();
            let rhs = forced_owned.get();
            assert_eq!(lhs.elements().count(), rhs.elements().count());
            for (lhs_record, rhs_record) in lhs.elements().zip(rhs.elements()) {
                assert_eq!(lhs_record.id.get(), rhs_record.id.get());
                assert_eq!(
                    lhs.element_label_run(lhs_record),
                    rhs.element_label_run(rhs_record),
                );
            }
            // Best-effort cleanup; skipped if an assertion above panics.
            let _ = std::fs::remove_file(&path);
        }

        #[test]
        fn missing_base_is_not_found() {
            let path = std::env::temp_dir().join(format!(
                "oxgraph-db-backing-missing-{}-{}.oxgdb",
                std::process::id(),
                NEXT_PATH.fetch_add(1, Ordering::Relaxed),
            ));
            let _ = std::fs::remove_file(&path);
            let result = Base::open(&path, true).map(|_base| ());
            assert!(matches!(result, Err(DbError::NotFound)), "got {result:?}");
        }
    }
}