use std::{
borrow::Cow,
path::{Path, PathBuf},
sync::Arc,
};
use crate::{
Catalog, CheckpointGeneration, CommitSeq, DbError, ElementId, ElementRecord, GraphProjection,
HypergraphProjection, IncidenceId, IncidenceRecord, IndexId, LabelId, PreparedQuery,
ProjectionDefinition, ProjectionId, PropertyKeyId, PropertySubject, PropertyType,
PropertyValue, QueryLanguage, QueryResult, RelationId, RelationRecord, RelationTypeId, RoleId,
TransactionId,
backing::Base,
catalog::{IndexDefinition, PropertyFamily},
freeze::{self, FreezeStamps},
lock::WriterLock,
overlay::{Overlay, Snapshot, StateView, WriteOverlay},
projection,
state::NextIds,
storage,
traversal::{self, TraversalOptions, TraversalResult},
wal,
wire::SuperblockRecord,
};
/// Shape of a lookup issued against a catalog index via
/// [`ReadTransaction::lookup_index`].
///
/// Each index kind accepts exactly one lookup shape; `lookup_index` rejects a
/// mismatched pair with an "unsupported" error:
/// - label, relation-type and projection indexes expect [`IndexLookup::All`];
/// - property-equality indexes expect [`IndexLookup::Equal`];
/// - property-range indexes expect [`IndexLookup::Range`];
/// - composite-equality indexes expect [`IndexLookup::CompositeEqual`].
#[derive(Clone, Copy, Debug)]
pub enum IndexLookup<'value> {
    /// Enumerate every subject covered by the index.
    All,
    /// Match subjects whose indexed property equals the given value.
    Equal(&'value PropertyValue),
    /// Match subjects whose indexed property lies between `min` and `max`.
    // NOTE(review): bound inclusivity is decided by `typed_property_range`,
    // which is not visible here — confirm before relying on it.
    Range {
        min: &'value PropertyValue,
        max: &'value PropertyValue,
    },
    /// Match subjects whose composite key equals the given values (presumably
    /// in the same order as the index's `keys` — TODO confirm).
    CompositeEqual(&'value [PropertyValue]),
}
/// Decides when a commit should trigger an automatic checkpoint, i.e. a fold
/// of the delta log into a fresh base file.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CheckpointPolicy {
    /// Never fold automatically; only explicit `checkpoint`/`compact` calls do.
    Manual,
    /// Fold once the delta log is strictly larger than `factor` times the base
    /// size, where the base size is first raised to at least 4 KiB.
    SizeRatio { factor: u32 },
}

impl CheckpointPolicy {
    /// Ratio used by [`CheckpointPolicy::default`].
    pub const DEFAULT_FACTOR: u32 = 4;

    /// Smallest base size, in bytes, that the ratio test will use.
    const MIN_BASE_BYTES: u64 = 4 * 1024;

    /// Returns `true` when a delta log of `log_bytes` bytes on top of a base of
    /// `base_bytes` bytes should be folded under this policy.
    ///
    /// The threshold multiplication saturates instead of overflowing.
    #[must_use]
    const fn should_checkpoint(self, log_bytes: u64, base_bytes: u64) -> bool {
        let Self::SizeRatio { factor } = self else {
            return false;
        };
        // `Ord::max` is not callable in a `const fn`, so clamp by hand.
        let effective_base = if base_bytes >= Self::MIN_BASE_BYTES {
            base_bytes
        } else {
            Self::MIN_BASE_BYTES
        };
        let threshold = effective_base.saturating_mul(factor as u64);
        log_bytes > threshold
    }
}

impl Default for CheckpointPolicy {
    /// A size-ratio policy using [`CheckpointPolicy::DEFAULT_FACTOR`].
    fn default() -> Self {
        let factor = Self::DEFAULT_FACTOR;
        Self::SizeRatio { factor }
    }
}
/// File name of the frozen base image for `generation`
/// (`base-<generation>.oxgdb`).
fn base_file(generation: u64) -> String {
    ["base-", generation.to_string().as_str(), ".oxgdb"].concat()
}
/// File name of the delta log for `generation` (`delta-<generation>.log`).
fn delta_file(generation: u64) -> String {
    ["delta-", generation.to_string().as_str(), ".log"].concat()
}
/// Handle to an on-disk store rooted at a directory.
///
/// The directory holds a superblock naming the live generation, that
/// generation's frozen base file (`base-<generation>.oxgdb`) and its delta log
/// (`delta-<generation>.log`) of commits applied on top of the base.
pub struct Database {
    /// Directory containing the superblock, base and delta-log files.
    root: PathBuf,
    /// Latest committed snapshot; shared with read transactions via `Arc`.
    current: Arc<Snapshot>,
    /// Generation of the live base/delta-log pair.
    base_generation: u64,
    /// Highest transaction id handed out so far (advanced by `begin_write`).
    last_transaction_id: TransactionId,
    /// Auto-checkpoint policy consulted after each non-empty commit. It is not
    /// persisted; `open` starts from `CheckpointPolicy::default()`.
    checkpoint_policy: CheckpointPolicy,
}
impl Database {
    /// Creates a new, empty store at `path` and then opens it.
    ///
    /// Writes an empty generation-0 base file, an empty generation-0 delta log
    /// and finally a superblock naming generation 0 (checkpoint LSN, commit
    /// sequence and transaction id all zero).
    ///
    /// # Errors
    ///
    /// Returns [`DbError::AlreadyExists`] when `path` already contains a
    /// superblock. Also propagates errors from freezing or writing the initial
    /// files and from [`Database::open`].
    pub fn create(path: impl AsRef<Path>) -> Result<Self, DbError> {
        let root = path.as_ref().to_path_buf();
        // An existing superblock marks an existing store; never overwrite it.
        // NOTE(review): check-then-write without the writer lock, so two
        // concurrent `create` calls on the same path are not excluded here.
        if root.join(wal::SUPERBLOCK_FILE).exists() {
            return Err(DbError::AlreadyExists);
        }
        // Freeze an empty merged view (no base records, empty overlay and
        // catalog) to obtain the generation-0 base image.
        let empty_base = crate::overlay::BaseRecords::empty();
        let empty_overlay = Overlay::empty(NextIds::INITIAL, Catalog::empty());
        let view = crate::overlay::MergedState::new(&empty_base, &empty_overlay);
        let base_bytes = freeze::freeze_view(
            &view,
            FreezeStamps {
                commit_seq: 0,
                transaction_id: 0,
                generation: 0,
            },
        )?;
        storage::atomic_write(
            &root,
            &root.join(format!("{}.tmp", base_file(0))),
            &root.join(base_file(0)),
            &base_bytes,
        )?;
        create_empty_log(&root, 0)?;
        // The superblock is written only after the base and log files exist.
        write_superblock(&root, 0, 0, 0, 0)?;
        Self::open(&root)
    }
    /// Opens the store at `path` and rebuilds its latest snapshot.
    ///
    /// The steps are:
    /// 1. Read the superblock to find the live generation.
    /// 2. Open that generation's base file.
    /// 3. Replay the accepted frames of its delta log on top of the base.
    /// 4. Truncate any trailing log bytes that replay did not accept.
    ///
    /// The returned handle uses [`CheckpointPolicy::default`], because the
    /// policy is not stored on disk.
    ///
    /// # Errors
    ///
    /// Propagates errors from reading the superblock, opening the base,
    /// reading or truncating the delta log, and replaying its frames.
    pub fn open(path: impl AsRef<Path>) -> Result<Self, DbError> {
        let root = path.as_ref().to_path_buf();
        let superblock = wal::read_superblock(&root)?;
        let generation = superblock.base_generation.get();
        let base = Arc::new(Base::open(&root.join(base_file(generation)), false)?);
        let base_records = Arc::new(crate::overlay::BaseRecords::from_view(base.get())?);
        let base_header = *base.get().header();
        let base_catalog = base.get().catalog().clone();
        let base_next = NextIds::from_header(&base_header);
        let log_path = root.join(delta_file(generation));
        // A missing log file reads as an empty log (see `read_log`).
        let log_bytes = read_log(&log_path)?;
        let outcome = wal::replay(generation, &log_bytes)?;
        // Bytes past `valid_len` were not accepted by replay (presumably a torn
        // or corrupt tail; TODO confirm in `wal::replay`). Cut them off so later
        // appends follow the last accepted frame.
        if outcome.valid_len < log_bytes.len() {
            truncate_log(&log_path, outcome.valid_len)?;
        }
        let mut write = WriteOverlay::new(base_next, base_catalog);
        let mut recovered_next = base_next;
        // Start from the superblock's stamps; each replayed frame advances them.
        let mut last_commit_seq = superblock.commit_seq.get();
        let mut last_txn = superblock.transaction_id.get();
        for frame in &outcome.frames {
            for op in &frame.ops {
                write.apply_replay_op(&base_records, op, &frame.blob, frame.lsn)?;
            }
            // Keep the element-wise maximum of the id watermarks seen so far.
            recovered_next = recovered_next.elementwise_max(write.next_ids());
            last_commit_seq = frame.lsn;
            last_txn = last_txn.max(frame.txn_id);
        }
        write.set_next_ids(recovered_next);
        let overlay = Arc::new(write.freeze());
        let snapshot = Arc::new(Snapshot::new(
            CheckpointGeneration::new(generation),
            CommitSeq::new(last_commit_seq),
            base,
            overlay,
        )?);
        Ok(Self {
            root,
            current: snapshot,
            base_generation: generation,
            last_transaction_id: TransactionId::new(last_txn),
            checkpoint_policy: CheckpointPolicy::default(),
        })
    }
    /// Generation of the live base/delta-log pair.
    #[must_use]
    pub const fn live_generation(&self) -> CheckpointGeneration {
        CheckpointGeneration::new(self.base_generation)
    }
    /// Auto-checkpoint policy currently configured on this handle.
    #[must_use]
    pub const fn checkpoint_policy(&self) -> CheckpointPolicy {
        self.checkpoint_policy
    }
    /// Replaces the auto-checkpoint policy for this handle (in memory only).
    pub const fn set_checkpoint_policy(&mut self, policy: CheckpointPolicy) {
        self.checkpoint_policy = policy;
    }
    /// Checks that the superblock and the live base file can be opened.
    ///
    /// The delta log is neither read nor replayed. Use
    /// [`Database::validate_path`] for a full open.
    ///
    /// # Errors
    ///
    /// Propagates the superblock or base-file error.
    pub fn validate(&self) -> Result<(), DbError> {
        wal::read_superblock(&self.root)?;
        Base::open(&self.root.join(base_file(self.base_generation)), false).map(|_base| ())
    }
    /// Validates the store at `path` by fully opening it, which includes
    /// replaying the delta log.
    ///
    /// Note that opening may truncate an unaccepted delta-log tail.
    ///
    /// # Errors
    ///
    /// Any error [`Database::open`] would return.
    pub fn validate_path(path: impl AsRef<Path>) -> Result<(), DbError> {
        Self::open(path).map(|_database| ())
    }
    /// Alias for [`Database::checkpoint`].
    ///
    /// # Errors
    ///
    /// Same as [`Database::checkpoint`].
    pub fn compact(&mut self) -> Result<(), DbError> {
        self.checkpoint()
    }
    /// Folds the current snapshot into a new base generation with an empty
    /// delta log.
    ///
    /// # Errors
    ///
    /// Propagates writer-lock, freeze, file-write and reopen errors. Fails if
    /// the generation counter would overflow.
    pub fn checkpoint(&mut self) -> Result<(), DbError> {
        self.checkpoint_inner(
            #[cfg(test)]
            CheckpointStop::Complete,
        )
    }
    /// Checkpoint implementation. Under `cfg(test)`, `stop` simulates a crash
    /// by returning early at the chosen step.
    ///
    /// The steps run in this order, while holding the writer lock:
    /// 1. Freeze the current view into `base-<next>.oxgdb` (atomic write).
    /// 2. Create an empty `delta-<next>.log`.
    /// 3. Write a superblock naming the next generation. Until this lands, a
    ///    reopen still uses the old generation.
    /// 4. Reopen from disk and adopt the reopened snapshot, generation and
    ///    transaction id.
    /// 5. Remove the old generation's base and log on a best-effort basis.
    ///
    /// NOTE(review): if the reopen in step 4 fails, `self` still refers to the
    /// old generation even though the superblock names the new one. Later
    /// commits through this handle would then append to a log the superblock
    /// no longer references. Confirm how callers are expected to recover.
    fn checkpoint_inner(&mut self, #[cfg(test)] stop: CheckpointStop) -> Result<(), DbError> {
        let _lock = WriterLock::acquire(&self.root)?;
        let next_generation = self
            .base_generation
            .checked_add(1)
            .ok_or_else(|| DbError::invalid_store("checkpoint generation overflow"))?;
        let view = self.current.view();
        let commit_seq = self.current.lsn().get();
        let base_bytes = freeze::freeze_view(
            &view,
            FreezeStamps {
                commit_seq,
                transaction_id: self.last_transaction_id.get(),
                generation: next_generation,
            },
        )?;
        storage::atomic_write(
            &self.root,
            &self
                .root
                .join(format!("{}.tmp", base_file(next_generation))),
            &self.root.join(base_file(next_generation)),
            &base_bytes,
        )?;
        create_empty_log(&self.root, next_generation)?;
        #[cfg(test)]
        if matches!(stop, CheckpointStop::BeforeSuperblock) {
            return Ok(());
        }
        // Switch point: once this superblock is written, `open` uses the new
        // generation.
        write_superblock(
            &self.root,
            next_generation,
            commit_seq,
            commit_seq,
            self.last_transaction_id.get(),
        )?;
        #[cfg(test)]
        if matches!(stop, CheckpointStop::BeforeRotate) {
            return Ok(());
        }
        let reopened = Self::open(&self.root)?;
        let old_generation = self.base_generation;
        // Only the snapshot, generation and transaction id are taken from
        // `reopened`; `self.checkpoint_policy` is never overwritten, so this
        // save/restore keeps the configured policy (and is effectively a no-op).
        let policy = self.checkpoint_policy;
        self.current = reopened.current;
        self.base_generation = reopened.base_generation;
        self.last_transaction_id = reopened.last_transaction_id;
        self.checkpoint_policy = policy;
        // Best effort: `open` only reads the generation named by the superblock,
        // so leftover old-generation files are harmless and failures are ignored.
        let _ = std::fs::remove_file(self.root.join(base_file(old_generation)));
        let _ = std::fs::remove_file(self.root.join(delta_file(old_generation)));
        let _ = storage::sync_directory(&self.root);
        Ok(())
    }
    /// Runs [`Database::checkpoint`] when the configured policy says the live
    /// delta log has outgrown the live base file.
    ///
    /// File sizes are read from disk; an unreadable file counts as 0 bytes.
    ///
    /// # Errors
    ///
    /// Propagates any checkpoint error.
    fn maybe_auto_checkpoint(&mut self) -> Result<(), DbError> {
        let log_bytes = file_len(&self.root.join(delta_file(self.base_generation)));
        let base_bytes = file_len(&self.root.join(base_file(self.base_generation)));
        if self
            .checkpoint_policy
            .should_checkpoint(log_bytes, base_bytes)
        {
            self.checkpoint()?;
        }
        Ok(())
    }
    /// Summary of the current snapshot, the live file sizes and the catalog.
    ///
    /// File sizes are read from disk at call time; unreadable files report 0.
    #[must_use]
    pub fn status(&self) -> DatabaseStatus {
        let view = self.current.view();
        DatabaseStatus {
            visible_commit_seq: self.current.lsn(),
            last_transaction_id: self.last_transaction_id,
            live_generation: CheckpointGeneration::new(self.base_generation),
            base_byte_size: file_len(&self.root.join(base_file(self.base_generation))),
            log_byte_size: file_len(&self.root.join(delta_file(self.base_generation))),
            element_count: view.element_count(),
            relation_count: view.relation_count(),
            incidence_count: view.incidence_count(),
            catalog: self.catalog_summary(),
        }
    }
    /// Per-kind counts of the current snapshot's catalog entries.
    #[must_use]
    pub fn catalog_summary(&self) -> CatalogSummary {
        CatalogSummary::from_catalog(self.current.view().catalog())
    }
    /// Starts a read transaction over the current snapshot.
    ///
    /// The transaction holds its own `Arc` to the snapshot. Later commits
    /// install a new snapshot in `self.current` and do not change what this
    /// transaction sees.
    #[must_use]
    pub fn begin_read(&self) -> ReadTransaction {
        ReadTransaction {
            snapshot: Arc::clone(&self.current),
        }
    }
    /// Starts a write transaction over the current snapshot.
    ///
    /// Acquires the writer lock, stores its guard in the transaction, and
    /// reserves the next transaction id. The id stays consumed even if the
    /// transaction is rolled back or commits nothing.
    ///
    /// # Errors
    ///
    /// Propagates lock-acquisition errors. Returns
    /// [`DbError::TransactionIdOverflow`] when the id space is exhausted.
    pub fn begin_write(&mut self) -> Result<WriteTransaction<'_>, DbError> {
        let lock = WriterLock::acquire(&self.root)?;
        let transaction_id = self
            .last_transaction_id
            .checked_next()
            .ok_or(DbError::TransactionIdOverflow)?;
        self.last_transaction_id = transaction_id;
        let parent = Arc::clone(&self.current);
        let delta = WriteOverlay::from_overlay(parent.overlay());
        Ok(WriteTransaction {
            database: self,
            parent,
            delta,
            transaction_id,
            lock,
        })
    }
    /// Prepares `query`, written in `language`, against the current
    /// snapshot's view.
    ///
    /// # Errors
    ///
    /// Propagates errors from [`PreparedQuery::prepare`].
    pub fn prepare(&self, language: QueryLanguage, query: &str) -> Result<PreparedQuery, DbError> {
        PreparedQuery::prepare(language, query, &self.current.view())
    }
}
/// Size in bytes of the file at `path`, or `0` when its metadata cannot be
/// read (for example because the file does not exist).
fn file_len(path: &Path) -> u64 {
    match std::fs::metadata(path) {
        Ok(metadata) => metadata.len(),
        Err(_unreadable) => 0,
    }
}
/// Test-only crash-injection point for `Database::checkpoint_inner`.
#[cfg(test)]
#[cfg_attr(
    miri,
    expect(
        dead_code,
        reason = "the crash-injection variants are constructed only by the #[cfg(not(miri))] crash-matrix test"
    )
)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum CheckpointStop {
    /// Run the whole checkpoint.
    Complete,
    /// Return after the new base and empty log are written, before the new
    /// superblock.
    BeforeSuperblock,
    /// Return after the new superblock is written, before reopening and
    /// deleting the old generation's files.
    BeforeRotate,
}
fn read_log(path: &Path) -> Result<Vec<u8>, DbError> {
match std::fs::read(path) {
Ok(bytes) => Ok(bytes),
Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Vec::new()),
Err(error) => Err(DbError::io("read delta-log", error)),
}
}
fn truncate_log(path: &Path, len: usize) -> Result<(), DbError> {
let file = std::fs::OpenOptions::new()
.write(true)
.open(path)
.map_err(|error| DbError::io("open delta-log for truncate", error))?;
let len = u64::try_from(len)
.map_err(|_overflow| DbError::invalid_store("delta-log length overflow"))?;
file.set_len(len)
.map_err(|error| DbError::io("truncate delta-log", error))?;
file.sync_all()
.map_err(|error| DbError::io("sync truncated delta-log", error))
}
fn create_empty_log(root: &Path, generation: u64) -> Result<(), DbError> {
let path = root.join(delta_file(generation));
let file =
std::fs::File::create(&path).map_err(|error| DbError::io("create delta-log", error))?;
file.sync_all()
.map_err(|error| DbError::io("sync delta-log", error))?;
storage::sync_directory(root)
}
/// Opens the delta log of `generation` under `root` for reading and
/// appending. The file is created if missing and existing contents are kept.
///
/// # Errors
///
/// Returns an "open delta-log for append" I/O error on failure.
fn open_log_for_append(root: &Path, generation: u64) -> Result<std::fs::File, DbError> {
    let log_path = root.join(delta_file(generation));
    let mut options = std::fs::OpenOptions::new();
    options.read(true).append(true).create(true).truncate(false);
    options
        .open(&log_path)
        .map_err(|error| DbError::io("open delta-log for append", error))
}
/// Writes a superblock under `root` naming `generation` as the live
/// generation, stamped with the given checkpoint LSN, commit sequence and
/// transaction id.
///
/// `log_byte_offset`, `flags` and `pad` are always zero. `crc32c` is also
/// left at zero here; presumably `wal::write_superblock` fills it in (TODO
/// confirm).
///
/// # Errors
///
/// Propagates errors from `wal::write_superblock`.
fn write_superblock(
    root: &Path,
    generation: u64,
    checkpoint_lsn: u64,
    commit_seq: u64,
    transaction_id: u64,
) -> Result<(), DbError> {
    let record = SuperblockRecord {
        // Format identification.
        magic: crate::wire::SUPERBLOCK_MAGIC,
        format_version: crate::wire::OXGDB_FORMAT_VERSION.into(),
        // Live generation and its stamps.
        base_generation: generation.into(),
        checkpoint_lsn: checkpoint_lsn.into(),
        commit_seq: commit_seq.into(),
        transaction_id: transaction_id.into(),
        // Constant / reserved fields.
        log_byte_offset: 0u64.into(),
        flags: 0u32.into(),
        crc32c: 0u32.into(),
        pad: 0u32.into(),
    };
    wal::write_superblock(root, &record)
}
/// Point-in-time summary returned by [`Database::status`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct DatabaseStatus {
    /// Commit sequence of the current snapshot.
    pub visible_commit_seq: CommitSeq,
    /// Highest transaction id handed out by the handle, including ids reserved
    /// by write transactions that did not commit.
    pub last_transaction_id: TransactionId,
    /// Generation of the live base/delta-log pair.
    pub live_generation: CheckpointGeneration,
    /// Size of the live base file in bytes (0 if it could not be read).
    pub base_byte_size: u64,
    /// Size of the live delta log in bytes (0 if it could not be read).
    pub log_byte_size: u64,
    /// Elements visible in the current snapshot.
    pub element_count: usize,
    /// Relations visible in the current snapshot.
    pub relation_count: usize,
    /// Incidences visible in the current snapshot.
    pub incidence_count: usize,
    /// Per-kind catalog entry counts.
    pub catalog: CatalogSummary,
}
/// Number of entries of each kind in a [`Catalog`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct CatalogSummary {
    /// Registered roles.
    pub role_count: usize,
    /// Registered labels.
    pub label_count: usize,
    /// Registered relation types.
    pub relation_type_count: usize,
    /// Registered property keys.
    pub property_key_count: usize,
    /// Defined projections.
    pub projection_count: usize,
    /// Defined indexes.
    pub index_count: usize,
}
impl CatalogSummary {
    /// Counts every kind of entry registered in `catalog`.
    #[must_use]
    pub fn from_catalog(catalog: &Catalog) -> Self {
        let roles = catalog.roles().count();
        let labels = catalog.labels().count();
        let relation_types = catalog.relation_types().count();
        let property_keys = catalog.property_keys().count();
        let projections = catalog.projections().count();
        let indexes = catalog.indexes().count();
        Self {
            role_count: roles,
            label_count: labels,
            relation_type_count: relation_types,
            property_key_count: property_keys,
            projection_count: projections,
            index_count: indexes,
        }
    }
}
/// Identifies the snapshot a [`ReadTransaction`] observes.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ReadPin {
    /// Commit sequence of the pinned snapshot.
    pub visible_commit_seq: CommitSeq,
    /// Checkpoint generation of the pinned snapshot.
    pub generation: CheckpointGeneration,
}
/// Read-only view of one committed snapshot, created by
/// [`Database::begin_read`].
///
/// It holds its own `Arc` to the snapshot, so later commits do not change
/// what it sees.
pub struct ReadTransaction {
    /// The pinned snapshot.
    snapshot: Arc<Snapshot>,
}
/// Compile-time check: instantiating this function fails to compile unless
/// `T` is `Send + Sync`.
const fn assert_send_sync<T: Send + Sync>() {}
// Read transactions and shared snapshots must be shareable across threads.
const _: () = assert_send_sync::<ReadTransaction>();
const _: () = assert_send_sync::<Arc<Snapshot>>();
impl ReadTransaction {
    /// Commit sequence and checkpoint generation of the pinned snapshot.
    #[must_use]
    pub fn pin(&self) -> ReadPin {
        ReadPin {
            visible_commit_seq: self.snapshot.lsn(),
            generation: self.snapshot.generation(),
        }
    }
    /// Catalog of the pinned snapshot.
    #[must_use]
    pub fn catalog(&self) -> &Catalog {
        self.snapshot.view().catalog_ref()
    }
    /// Number of elements visible in the pinned snapshot.
    #[must_use]
    pub fn element_count(&self) -> usize {
        self.snapshot.view().element_count()
    }
    /// Number of relations visible in the pinned snapshot.
    #[must_use]
    pub fn relation_count(&self) -> usize {
        self.snapshot.view().relation_count()
    }
    /// Number of incidences visible in the pinned snapshot.
    #[must_use]
    pub fn incidence_count(&self) -> usize {
        self.snapshot.view().incidence_count()
    }
    /// Ids of all visible elements, in the view's iteration order.
    #[must_use]
    pub fn element_ids(&self) -> Vec<ElementId> {
        self.snapshot
            .view()
            .elements()
            .map(|record| record.id)
            .collect()
    }
    /// Ids of all visible relations, in the view's iteration order.
    #[must_use]
    pub fn relation_ids(&self) -> Vec<RelationId> {
        self.snapshot
            .view()
            .relations()
            .map(|record| record.id)
            .collect()
    }
    /// Whether element `id` is visible in the pinned snapshot.
    #[must_use]
    pub fn contains_element(&self, id: ElementId) -> bool {
        self.snapshot.view().contains_element(id)
    }
    /// Whether relation `id` is visible in the pinned snapshot.
    #[must_use]
    pub fn contains_relation(&self, id: RelationId) -> bool {
        self.snapshot.view().contains_relation(id)
    }
    /// Whether incidence `id` is visible in the pinned snapshot.
    #[must_use]
    pub fn contains_incidence(&self, id: IncidenceId) -> bool {
        self.snapshot.view().contains_incidence(id)
    }
    /// Record for element `id`, if visible.
    #[must_use]
    pub fn element(&self, id: ElementId) -> Option<Cow<'_, ElementRecord>> {
        self.snapshot.view().element_ref(id)
    }
    /// Record for relation `id`, if visible.
    #[must_use]
    pub fn relation(&self, id: RelationId) -> Option<Cow<'_, RelationRecord>> {
        self.snapshot.view().relation_ref(id)
    }
    /// Record for incidence `id`, if visible.
    #[must_use]
    pub fn incidence(&self, id: IncidenceId) -> Option<Cow<'_, IncidenceRecord>> {
        self.snapshot.view().incidence_ref(id)
    }
    /// Incidence records that reference element `id`.
    #[must_use]
    pub fn element_incidences(&self, id: ElementId) -> Vec<IncidenceRecord> {
        self.snapshot.view().element_incidences(id)
    }
    /// Value of property `key` on `subject`, if set.
    #[must_use]
    pub fn property(
        &self,
        subject: PropertySubject,
        key: PropertyKeyId,
    ) -> Option<Cow<'_, PropertyValue>> {
        self.snapshot.view().property_ref(subject, key)
    }
    /// Subjects whose `key` property equals `value`.
    ///
    /// # Errors
    ///
    /// Propagates errors from the view's typed equality lookup.
    pub fn lookup_property_equal(
        &self,
        key: PropertyKeyId,
        value: &PropertyValue,
    ) -> Result<Vec<PropertySubject>, DbError> {
        self.snapshot.view().typed_property_equal(key, value)
    }
    /// Subjects whose `key` property lies between `min` and `max`.
    ///
    /// # Errors
    ///
    /// Propagates errors from the view's typed range lookup.
    pub fn lookup_property_range(
        &self,
        key: PropertyKeyId,
        min: &PropertyValue,
        max: &PropertyValue,
    ) -> Result<Vec<PropertySubject>, DbError> {
        self.snapshot.view().typed_property_range(key, min, max)
    }
    /// Runs `lookup` against catalog index `index`.
    ///
    /// Each index kind accepts exactly one lookup shape:
    /// - label, relation-type and projection indexes take [`IndexLookup::All`];
    /// - property-equality indexes take [`IndexLookup::Equal`];
    /// - property-range indexes take [`IndexLookup::Range`];
    /// - composite-equality indexes take [`IndexLookup::CompositeEqual`].
    ///
    /// # Errors
    ///
    /// Returns [`DbError::UnknownIndex`] for an unknown id and an
    /// "unsupported" error for a mismatched lookup shape. Also propagates any
    /// error from the underlying lookup.
    pub fn lookup_index(
        &self,
        index: IndexId,
        lookup: IndexLookup<'_>,
    ) -> Result<Vec<PropertySubject>, DbError> {
        let view = self.snapshot.view();
        let entry = view
            .catalog()
            .index(index)
            .ok_or(DbError::UnknownIndex { id: index })?;
        match (&entry.definition, lookup) {
            (IndexDefinition::Label { label }, IndexLookup::All) => Ok(view
                .elements_with_label(*label)
                .into_iter()
                .map(PropertySubject::Element)
                .collect()),
            (IndexDefinition::Label { .. }, _lookup) => {
                Err(DbError::unsupported("label index expects all lookup"))
            }
            (IndexDefinition::RelationType { relation_type }, IndexLookup::All) => Ok(view
                .relations_with_type(*relation_type)
                .into_iter()
                .map(PropertySubject::Relation)
                .collect()),
            (IndexDefinition::RelationType { .. }, _lookup) => Err(DbError::unsupported(
                "relation type index expects all lookup",
            )),
            (IndexDefinition::PropertyEquality { key }, IndexLookup::Equal(value)) => {
                view.typed_property_equal(*key, value)
            }
            (IndexDefinition::PropertyEquality { .. }, _lookup) => Err(DbError::unsupported(
                "property equality index expects equality lookup",
            )),
            (IndexDefinition::PropertyRange { key }, IndexLookup::Range { min, max }) => {
                view.typed_property_range(*key, min, max)
            }
            (IndexDefinition::PropertyRange { .. }, _lookup) => Err(DbError::unsupported(
                "property range index expects range lookup",
            )),
            (IndexDefinition::CompositeEquality { keys }, IndexLookup::CompositeEqual(values)) => {
                view.typed_property_composite_equal(keys, values)
            }
            (IndexDefinition::CompositeEquality { .. }, _lookup) => Err(DbError::unsupported(
                "composite equality index expects composite equality lookup",
            )),
            (IndexDefinition::Projection { projection }, IndexLookup::All) => {
                self.projection_index_subjects(*projection)
            }
            (IndexDefinition::Projection { .. }, _lookup) => {
                Err(DbError::unsupported("projection index expects all lookup"))
            }
        }
    }
    /// Builds the graph projection `id` from the pinned snapshot.
    ///
    /// # Errors
    ///
    /// Returns [`DbError::UnknownProjection`] for an unknown id and an
    /// invalid-projection error if `id` names a hypergraph projection. Also
    /// propagates projection-building errors.
    pub fn graph_projection(&self, id: ProjectionId) -> Result<GraphProjection, DbError> {
        let view = self.snapshot.view();
        let entry = view
            .catalog()
            .projection(id)
            .ok_or(DbError::UnknownProjection { id })?;
        match &entry.definition {
            ProjectionDefinition::Graph(definition) => {
                projection::GraphProjection::from_state(&view, definition.clone())
            }
            ProjectionDefinition::Hypergraph(_definition) => {
                Err(DbError::invalid_projection("projection is not a graph"))
            }
        }
    }
    /// Builds the graph projection registered under `name`.
    ///
    /// # Errors
    ///
    /// An unknown name is reported as an "unsupported" error, because no id is
    /// available for [`DbError::UnknownProjection`]. Otherwise this returns the
    /// same errors as [`ReadTransaction::graph_projection`].
    pub fn graph_projection_by_name(&self, name: &str) -> Result<GraphProjection, DbError> {
        let id = self
            .snapshot
            .view()
            .catalog()
            .projection_id(name)
            .ok_or_else(|| DbError::unsupported(format!("unknown projection {name}")))?;
        self.graph_projection(id)
    }
    /// Traverses graph projection `projection` starting from `seeds`.
    ///
    /// With no seeds or a zero limit the result is empty. The projection is
    /// still validated in that case, so an unknown or non-graph projection is
    /// reported regardless of the seed count.
    ///
    /// # Errors
    ///
    /// Same as [`ReadTransaction::graph_projection`], plus traversal errors.
    pub fn traverse_graph(
        &self,
        projection: ProjectionId,
        seeds: &[ElementId],
        options: TraversalOptions,
    ) -> Result<TraversalResult, DbError> {
        if seeds.is_empty() || options.limit == 0 {
            // Cheap catalog check only; the projection is not materialised.
            self.require_graph_projection(projection)?;
            return Ok(TraversalResult::new(Vec::new()));
        }
        let graph = self.graph_projection(projection)?;
        traversal::traverse_graph_projection(&graph, seeds, options)
    }
    /// Builds the hypergraph projection `id` from the pinned snapshot.
    ///
    /// # Errors
    ///
    /// Returns [`DbError::UnknownProjection`] for an unknown id and an
    /// invalid-projection error if `id` names a graph projection. Also
    /// propagates projection-building errors.
    pub fn hypergraph_projection(&self, id: ProjectionId) -> Result<HypergraphProjection, DbError> {
        let view = self.snapshot.view();
        let entry = view
            .catalog()
            .projection(id)
            .ok_or(DbError::UnknownProjection { id })?;
        match &entry.definition {
            ProjectionDefinition::Hypergraph(definition) => {
                projection::HypergraphProjection::from_state(&view, definition.clone())
            }
            ProjectionDefinition::Graph(_definition) => Err(DbError::invalid_projection(
                "projection is not a hypergraph",
            )),
        }
    }
    /// Executes a prepared query against the pinned snapshot.
    ///
    /// # Errors
    ///
    /// Propagates query-execution errors.
    pub fn execute(&self, query: &PreparedQuery) -> Result<QueryResult, DbError> {
        query.execute(&self.snapshot.view())
    }
    /// Returns the query's plan description; this does not depend on the snapshot.
    #[must_use]
    pub fn explain(&self, query: &PreparedQuery) -> String {
        query.explain()
    }
    /// Checks, using only the catalog, that `id` names a graph projection.
    ///
    /// Returns the same errors as [`ReadTransaction::graph_projection`] would
    /// for an unknown or hypergraph projection.
    fn require_graph_projection(&self, id: ProjectionId) -> Result<(), DbError> {
        let view = self.snapshot.view();
        let entry = view
            .catalog()
            .projection(id)
            .ok_or(DbError::UnknownProjection { id })?;
        match &entry.definition {
            ProjectionDefinition::Graph(_definition) => Ok(()),
            ProjectionDefinition::Hypergraph(_definition) => {
                Err(DbError::invalid_projection("projection is not a graph"))
            }
        }
    }
    /// Subjects covered by a projection index: the subjects of the graph or
    /// hypergraph projection built from the pinned snapshot.
    fn projection_index_subjects(
        &self,
        projection: ProjectionId,
    ) -> Result<Vec<PropertySubject>, DbError> {
        let view = self.snapshot.view();
        let entry = view
            .catalog()
            .projection(projection)
            .ok_or(DbError::UnknownProjection { id: projection })?;
        match &entry.definition {
            ProjectionDefinition::Graph(definition) => {
                Ok(projection::GraphProjection::from_state(&view, definition.clone())?.subjects())
            }
            ProjectionDefinition::Hypergraph(definition) => Ok(
                projection::HypergraphProjection::from_state(&view, definition.clone())?.subjects(),
            ),
        }
    }
}
/// An in-progress write against a [`Database`], created by
/// [`Database::begin_write`].
///
/// Changes accumulate in `delta` and are installed only by
/// [`WriteTransaction::commit`]. [`WriteTransaction::rollback`] simply drops
/// the transaction without installing anything.
pub struct WriteTransaction<'db> {
    /// Database whose current snapshot is replaced on commit.
    database: &'db mut Database,
    /// Snapshot the transaction started from.
    parent: Arc<Snapshot>,
    /// Uncommitted changes layered on `parent`.
    delta: WriteOverlay,
    /// Id reserved for this transaction by `begin_write`.
    transaction_id: TransactionId,
    /// Writer-lock guard. It is dropped explicitly in `commit`, or together
    /// with the transaction.
    lock: WriterLock,
}
impl WriteTransaction<'_> {
    /// Registers a role named `name` in this transaction's catalog.
    ///
    /// # Errors
    ///
    /// Propagates catalog registration errors.
    pub fn register_role(&mut self, name: impl Into<String>) -> Result<RoleId, DbError> {
        self.delta.register_role(name.into())
    }
    /// Registers a label named `name` in this transaction's catalog.
    ///
    /// # Errors
    ///
    /// Propagates catalog registration errors.
    pub fn register_label(&mut self, name: impl Into<String>) -> Result<LabelId, DbError> {
        self.delta.register_label(name.into())
    }
    /// Registers a relation type named `name` in this transaction's catalog.
    ///
    /// # Errors
    ///
    /// Propagates catalog registration errors.
    pub fn register_relation_type(
        &mut self,
        name: impl Into<String>,
    ) -> Result<RelationTypeId, DbError> {
        self.delta.register_relation_type(name.into())
    }
    /// Registers a property key that applies to subjects of `family` and
    /// holds values of `value_type`.
    ///
    /// # Errors
    ///
    /// Propagates catalog registration errors.
    pub fn register_property_key(
        &mut self,
        name: impl Into<String>,
        family: PropertyFamily,
        value_type: PropertyType,
    ) -> Result<PropertyKeyId, DbError> {
        self.delta
            .register_property_key(name.into(), family, value_type)
    }
    /// Defines a projection after checking that every role and relation type
    /// it references exists.
    ///
    /// # Errors
    ///
    /// Returns `UnknownRole`/`UnknownRelationType` for dangling references and
    /// propagates catalog registration errors.
    pub fn define_projection(
        &mut self,
        definition: ProjectionDefinition,
    ) -> Result<ProjectionId, DbError> {
        self.validate_projection_definition(&definition)?;
        self.delta.register_projection(definition)
    }
    /// Defines an index named `name` after validating its definition.
    ///
    /// # Errors
    ///
    /// Returns an error for dangling catalog references or an empty composite
    /// key list, and propagates catalog registration errors.
    pub fn define_index(
        &mut self,
        name: impl Into<String>,
        definition: IndexDefinition,
    ) -> Result<IndexId, DbError> {
        self.validate_index_definition(&definition)?;
        self.delta.register_index(name.into(), definition)
    }
    /// Creates a new element and returns its id.
    ///
    /// # Errors
    ///
    /// Propagates id-allocation errors from the overlay.
    pub fn create_element(&mut self) -> Result<ElementId, DbError> {
        self.delta.create_element()
    }
    /// Creates a new relation and returns its id.
    ///
    /// # Errors
    ///
    /// Propagates id-allocation errors from the overlay.
    pub fn create_relation(&mut self) -> Result<RelationId, DbError> {
        self.delta.create_relation()
    }
    /// Connects `element` to `relation` in `role`.
    ///
    /// # Errors
    ///
    /// Returns `UnknownRelation`, `UnknownElement` or `UnknownRole` when a
    /// referenced item is not visible in this transaction. Also propagates
    /// overlay errors.
    pub fn create_incidence(
        &mut self,
        relation: RelationId,
        element: ElementId,
        role: RoleId,
    ) -> Result<IncidenceId, DbError> {
        self.require_relation(relation)?;
        self.require_element(element)?;
        self.require_role(role)?;
        self.delta.create_incidence(relation, element, role)
    }
    /// Tombstones element `id` together with every incidence, visible in this
    /// transaction, that references it.
    ///
    /// # Errors
    ///
    /// Returns [`DbError::UnknownElement`] if `id` is not visible.
    pub fn tombstone_element(&mut self, id: ElementId) -> Result<(), DbError> {
        self.require_element(id)?;
        let base = self.parent.base_records();
        self.delta.tombstone_element(base, id);
        // Collect first: the merged view borrows `self.delta`, which the loop mutates.
        let incidences: Vec<IncidenceId> = self
            .merged()
            .incidences()
            .filter(|record| record.element == id)
            .map(|record| record.id)
            .collect();
        for incidence in incidences {
            self.delta
                .tombstone_incidence(self.parent.base_records(), incidence);
        }
        Ok(())
    }
    /// Tombstones relation `id` together with every incidence, visible in this
    /// transaction, that references it.
    ///
    /// # Errors
    ///
    /// Returns [`DbError::UnknownRelation`] if `id` is not visible.
    pub fn tombstone_relation(&mut self, id: RelationId) -> Result<(), DbError> {
        self.require_relation(id)?;
        let base = self.parent.base_records();
        self.delta.tombstone_relation(base, id);
        // Collect first: the merged view borrows `self.delta`, which the loop mutates.
        let incidences: Vec<IncidenceId> = self
            .merged()
            .incidences()
            .filter(|record| record.relation == id)
            .map(|record| record.id)
            .collect();
        for incidence in incidences {
            self.delta
                .tombstone_incidence(self.parent.base_records(), incidence);
        }
        Ok(())
    }
    /// Tombstones incidence `id`.
    ///
    /// # Errors
    ///
    /// Returns [`DbError::UnknownIncidence`] if `id` is not visible.
    pub fn tombstone_incidence(&mut self, id: IncidenceId) -> Result<(), DbError> {
        self.require_incidence(id)?;
        self.delta
            .tombstone_incidence(self.parent.base_records(), id);
        Ok(())
    }
    /// Adds `label` to `element`.
    ///
    /// # Errors
    ///
    /// Returns `UnknownElement` or `UnknownLabel` for dangling references.
    pub fn add_element_label(&mut self, element: ElementId, label: LabelId) -> Result<(), DbError> {
        self.require_element(element)?;
        self.require_label(label)?;
        self.delta
            .add_element_label(self.parent.base_records(), element, label);
        Ok(())
    }
    /// Adds `label` to `relation`.
    ///
    /// # Errors
    ///
    /// Returns `UnknownRelation` or `UnknownLabel` for dangling references.
    pub fn add_relation_label(
        &mut self,
        relation: RelationId,
        label: LabelId,
    ) -> Result<(), DbError> {
        self.require_relation(relation)?;
        self.require_label(label)?;
        self.delta
            .add_relation_label(self.parent.base_records(), relation, label);
        Ok(())
    }
    /// Sets the type of `relation` to `relation_type`.
    ///
    /// # Errors
    ///
    /// Returns `UnknownRelation` or `UnknownRelationType` for dangling references.
    pub fn set_relation_type(
        &mut self,
        relation: RelationId,
        relation_type: RelationTypeId,
    ) -> Result<(), DbError> {
        self.require_relation(relation)?;
        self.require_relation_type(relation_type)?;
        self.delta
            .set_relation_type(self.parent.base_records(), relation, relation_type);
        Ok(())
    }
    /// Sets property `key` on `subject` to `value`.
    ///
    /// # Errors
    ///
    /// Returns an error in these cases:
    /// - the subject is not visible;
    /// - the key is unknown ([`DbError::UnknownPropertyKey`]);
    /// - the key belongs to a different subject family
    ///   ([`DbError::WrongPropertyFamily`]);
    /// - the value's type differs from the key's declared type
    ///   ([`DbError::PropertyTypeMismatch`]).
    pub fn set_property(
        &mut self,
        subject: PropertySubject,
        key: PropertyKeyId,
        value: PropertyValue,
    ) -> Result<(), DbError> {
        self.require_subject(subject)?;
        // Cloned so the borrow of the merged view ends before `self.delta` is mutated.
        let definition = self
            .merged()
            .catalog()
            .property_key(key)
            .cloned()
            .ok_or(DbError::UnknownPropertyKey { id: key })?;
        if definition.family != subject.family() {
            return Err(DbError::WrongPropertyFamily {
                expected: definition.family,
                actual: subject.family(),
            });
        }
        if definition.value_type != value.value_type() {
            return Err(DbError::PropertyTypeMismatch {
                expected: definition.value_type,
                actual: value.value_type(),
            });
        }
        self.delta
            .set_property(self.parent.base_records(), subject, key, value);
        Ok(())
    }
    /// Removes property `key` from `subject`.
    ///
    /// # Errors
    ///
    /// Returns an error if the subject is not visible or the key is unknown.
    pub fn remove_property(
        &mut self,
        subject: PropertySubject,
        key: PropertyKeyId,
    ) -> Result<(), DbError> {
        self.require_subject(subject)?;
        if self.merged().catalog().property_key(key).is_none() {
            return Err(DbError::UnknownPropertyKey { id: key });
        }
        self.delta
            .remove_property(self.parent.base_records(), subject, key);
        Ok(())
    }
    /// Commits the transaction and returns its commit sequence.
    ///
    /// An empty transaction writes nothing and returns the parent snapshot's
    /// commit sequence. Otherwise the commit runs these steps:
    /// 1. Encode the changes as one frame stamped with the next commit
    ///    sequence.
    /// 2. Append the frame to the live delta log.
    /// 3. Install the new snapshot as the database's current snapshot.
    /// 4. Release the writer lock; this must happen before step 5, because
    ///    checkpointing acquires the lock itself.
    /// 5. Consult the auto-checkpoint policy.
    ///
    /// # Errors
    ///
    /// Returns [`DbError::CommitSeqOverflow`] when the sequence is exhausted
    /// and propagates frame-encoding and log-append errors.
    ///
    /// NOTE(review): an error from the post-commit auto-checkpoint is also
    /// returned here, even though by then the frame has been appended and the
    /// new snapshot installed. Callers cannot tell a failed commit from a
    /// failed checkpoint; confirm this is intended.
    pub fn commit(self) -> Result<CommitSeq, DbError> {
        if self.delta.is_empty() {
            return Ok(self.parent.lsn());
        }
        let lsn = self
            .parent
            .lsn()
            .checked_next()
            .ok_or(DbError::CommitSeqOverflow)?;
        let (ops, blob) = self.delta.encode_frame();
        let frame = wal::encode_commit(
            lsn.get(),
            self.transaction_id.get(),
            self.database.base_generation,
            &ops,
            &blob,
        )?;
        let mut log = open_log_for_append(&self.database.root, self.database.base_generation)?;
        wal::append_commit(&mut log, &frame)?;
        let new_overlay = Arc::new(self.delta.freeze());
        let snapshot = Snapshot::with_shared_base_records(
            self.parent.generation(),
            lsn,
            Arc::clone(self.parent.base()),
            new_overlay,
            Arc::clone(self.parent.base_records()),
        );
        self.database.current = Arc::new(snapshot);
        self.database.last_transaction_id = self.transaction_id;
        drop(self.lock);
        self.database.maybe_auto_checkpoint()?;
        Ok(lsn)
    }
    /// Discards the transaction. Its changes are never installed; the
    /// transaction id reserved by `begin_write` stays consumed.
    pub fn rollback(self) {}
    /// View of the parent's base records merged with this transaction's delta.
    fn merged(&self) -> crate::overlay::WriteMergedState<'_> {
        crate::overlay::WriteMergedState::new(self.parent.base_records(), &self.delta)
    }
    /// Errors with [`DbError::UnknownElement`] unless `id` is visible.
    fn require_element(&self, id: ElementId) -> Result<(), DbError> {
        if self.merged().contains_element(id) {
            Ok(())
        } else {
            Err(DbError::UnknownElement { id })
        }
    }
    /// Errors with [`DbError::UnknownRelation`] unless `id` is visible.
    fn require_relation(&self, id: RelationId) -> Result<(), DbError> {
        if self.merged().contains_relation(id) {
            Ok(())
        } else {
            Err(DbError::UnknownRelation { id })
        }
    }
    /// Errors with [`DbError::UnknownIncidence`] unless `id` is visible.
    fn require_incidence(&self, id: IncidenceId) -> Result<(), DbError> {
        if self.merged().contains_incidence(id) {
            Ok(())
        } else {
            Err(DbError::UnknownIncidence { id })
        }
    }
    /// Errors with [`DbError::UnknownRole`] unless `id` is in the catalog.
    fn require_role(&self, id: RoleId) -> Result<(), DbError> {
        if self.delta.catalog().role(id).is_some() {
            Ok(())
        } else {
            Err(DbError::UnknownRole { id })
        }
    }
    /// Errors with [`DbError::UnknownLabel`] unless `id` is in the catalog.
    fn require_label(&self, id: LabelId) -> Result<(), DbError> {
        if self.delta.catalog().label(id).is_some() {
            Ok(())
        } else {
            Err(DbError::UnknownLabel { id })
        }
    }
    /// Errors with [`DbError::UnknownRelationType`] unless `id` is in the catalog.
    fn require_relation_type(&self, id: RelationTypeId) -> Result<(), DbError> {
        if self.delta.catalog().relation_type(id).is_some() {
            Ok(())
        } else {
            Err(DbError::UnknownRelationType { id })
        }
    }
    /// Errors with [`DbError::UnknownProjection`] unless `id` is in the catalog.
    fn require_projection(&self, id: ProjectionId) -> Result<(), DbError> {
        if self.delta.catalog().projection(id).is_some() {
            Ok(())
        } else {
            Err(DbError::UnknownProjection { id })
        }
    }
    /// Checks that the element, relation or incidence behind `subject` is visible.
    fn require_subject(&self, subject: PropertySubject) -> Result<(), DbError> {
        match subject {
            PropertySubject::Element(id) => self.require_element(id),
            PropertySubject::Relation(id) => self.require_relation(id),
            PropertySubject::Incidence(id) => self.require_incidence(id),
        }
    }
    /// Checks that every role and relation type a projection references exists.
    fn validate_projection_definition(
        &self,
        definition: &ProjectionDefinition,
    ) -> Result<(), DbError> {
        match definition {
            ProjectionDefinition::Graph(graph) => {
                self.require_role(graph.source_role)?;
                self.require_role(graph.target_role)?;
                for relation_type in &graph.relation_types {
                    self.require_relation_type(*relation_type)?;
                }
                Ok(())
            }
            ProjectionDefinition::Hypergraph(hyper) => {
                for role in &hyper.source_roles {
                    self.require_role(*role)?;
                }
                for role in &hyper.target_roles {
                    self.require_role(*role)?;
                }
                for relation_type in &hyper.relation_types {
                    self.require_relation_type(*relation_type)?;
                }
                Ok(())
            }
        }
    }
    /// Checks that everything an index definition references exists, and that
    /// composite indexes name at least one key.
    fn validate_index_definition(&self, definition: &IndexDefinition) -> Result<(), DbError> {
        match definition {
            IndexDefinition::Label { label } => self.require_label(*label),
            IndexDefinition::RelationType { relation_type } => {
                self.require_relation_type(*relation_type)
            }
            IndexDefinition::PropertyEquality { key } | IndexDefinition::PropertyRange { key } => {
                self.require_property_key(*key)
            }
            IndexDefinition::CompositeEquality { keys } => {
                if keys.is_empty() {
                    return Err(DbError::unsupported(
                        "composite equality index requires at least one key",
                    ));
                }
                for key in keys {
                    self.require_property_key(*key)?;
                }
                Ok(())
            }
            IndexDefinition::Projection { projection } => self.require_projection(*projection),
        }
    }
    /// Errors with [`DbError::UnknownPropertyKey`] unless `id` is in the catalog.
    fn require_property_key(&self, id: PropertyKeyId) -> Result<(), DbError> {
        if self.delta.catalog().property_key(id).is_some() {
            Ok(())
        } else {
            Err(DbError::UnknownPropertyKey { id })
        }
    }
}
// Filesystem-backed tests (skipped under Miri) for checkpoint crash recovery
// and the auto-checkpoint policy.
#[cfg(test)]
#[cfg(not(miri))]
mod tests {
    use std::{
        path::PathBuf,
        sync::atomic::{AtomicU64, Ordering},
    };
    use super::*;
    // Per-process counter so each test store gets its own directory.
    static NEXT_PATH: AtomicU64 = AtomicU64::new(0);
    /// Returns a fresh path under the system temp dir, unique per process and
    /// call, after removing any leftover directory at that path.
    fn temp_store(name: &str) -> PathBuf {
        let id = NEXT_PATH.fetch_add(1, Ordering::Relaxed);
        let path =
            std::env::temp_dir().join(format!("oxgraph-db-cp-{name}-{}-{id}", std::process::id()));
        let _ = std::fs::remove_dir_all(&path);
        path
    }
    /// Observable state compared before and after a (simulated) crash.
    #[derive(Debug, Eq, PartialEq)]
    struct LogicalState {
        elements: Vec<ElementId>,
        rank_eq_500: Vec<PropertySubject>,
        person_members: Vec<ElementId>,
    }
    /// Catalog ids created by `build_fixture`.
    struct Fixture {
        rank: PropertyKeyId,
        person: LabelId,
    }
    /// Commits eight elements in one transaction. Each gets an integer
    /// `rank = index * 100`, and the even-indexed ones get the `Person` label.
    fn build_fixture(database: &mut Database) -> Fixture {
        let mut writer = database.begin_write().expect("begin write");
        let rank = writer
            .register_property_key("rank", PropertyFamily::Element, PropertyType::Integer)
            .expect("rank key");
        let person = writer.register_label("Person").expect("person label");
        for index in 0..8u64 {
            let element = writer.create_element().expect("element");
            writer
                .set_property(
                    PropertySubject::Element(element),
                    rank,
                    PropertyValue::Integer(i64::try_from(index).expect("index") * 100),
                )
                .expect("set rank");
            if index % 2 == 0 {
                writer
                    .add_element_label(element, person)
                    .expect("add label");
            }
        }
        writer.commit().expect("commit fixture");
        Fixture { rank, person }
    }
    /// Captures three things: all element ids, the subjects with
    /// `rank == 500`, and the members of `Person`.
    fn read_logical(database: &Database, fixture: &Fixture) -> LogicalState {
        let read = database.begin_read();
        let elements = read.element_ids();
        let rank_eq_500 = read
            .lookup_property_equal(fixture.rank, &PropertyValue::Integer(500))
            .expect("rank lookup");
        let person_members = read.snapshot.view().elements_with_label(fixture.person);
        LogicalState {
            elements,
            rank_eq_500,
            person_members,
        }
    }
    /// Asserts that the next minted element id is one past the largest
    /// visible id, then rolls the probe transaction back.
    fn assert_no_id_reuse_across_fold(database: &mut Database) {
        let max_existing = database
            .begin_read()
            .element_ids()
            .into_iter()
            .map(ElementId::get)
            .max()
            .unwrap_or(0);
        let expected = ElementId::new(max_existing + 1);
        let mut writer = database.begin_write().expect("watermark probe writer");
        let minted = writer.create_element().expect("watermark probe element");
        assert_eq!(
            minted, expected,
            "the next minted id must be one past the max existing id (watermark \
             survived the fold; ids are never reused)",
        );
        writer.rollback();
    }
    /// For each crash-injection point, this test does the following:
    /// 1. Run the checkpoint up to that point.
    /// 2. Drop the handle to simulate a crash.
    /// 3. Reopen the store.
    /// 4. Check the logical state, the id watermark and which generation
    ///    is live.
    #[test]
    fn checkpoint_crash_matrix_recovers_exact_state() {
        for stop in [
            CheckpointStop::BeforeSuperblock,
            CheckpointStop::BeforeRotate,
            CheckpointStop::Complete,
        ] {
            let path = temp_store(&format!("crash-{stop:?}"));
            let mut database = Database::create(&path).expect("create");
            let fixture = build_fixture(&mut database);
            let before = read_logical(&database, &fixture);
            let before_generation = database.base_generation;
            database
                .checkpoint_inner(stop)
                .expect("checkpoint stop returns ok");
            // Simulated crash: the in-memory handle is discarded.
            drop(database);
            let mut recovered = Database::open(&path).expect("reopen after crash");
            let after = read_logical(&recovered, &fixture);
            assert_eq!(
                after, before,
                "crash at {stop:?} must recover the exact logical state",
            );
            assert_no_id_reuse_across_fold(&mut recovered);
            match stop {
                CheckpointStop::BeforeSuperblock => assert_eq!(
                    recovered.base_generation, before_generation,
                    "old superblock stays authoritative before the new one lands",
                ),
                CheckpointStop::BeforeRotate | CheckpointStop::Complete => assert_eq!(
                    recovered.base_generation,
                    before_generation + 1,
                    "the new superblock names the folded generation",
                ),
            }
            // A second reopen must be stable (e.g. after any tail truncation).
            let reopened = Database::open(&path).expect("second reopen");
            assert_eq!(read_logical(&reopened, &fixture), before);
            drop(reopened);
            let _ = std::fs::remove_dir_all(&path);
        }
    }
    /// Checks both checkpoint policies over many small commits:
    /// - the manual policy never folds;
    /// - a factor-1 size-ratio policy folds, while preserving the data, the
    ///   id watermark and the configured policy.
    #[test]
    fn auto_checkpoint_policy_folds_when_log_outgrows_base() {
        let manual_path = temp_store("auto-manual");
        let mut manual = Database::create(&manual_path).expect("create manual");
        manual.set_checkpoint_policy(CheckpointPolicy::Manual);
        let _fixture = build_fixture(&mut manual);
        for _ in 0..200 {
            let mut writer = manual.begin_write().expect("writer");
            writer.create_element().expect("element");
            writer.commit().expect("commit");
        }
        assert_eq!(
            manual.live_generation(),
            CheckpointGeneration::new(0),
            "manual policy must never auto-fold",
        );
        drop(manual);
        let _ = std::fs::remove_dir_all(&manual_path);
        let auto_path = temp_store("auto-ratio");
        let mut auto = Database::create(&auto_path).expect("create auto");
        auto.set_checkpoint_policy(CheckpointPolicy::SizeRatio { factor: 1 });
        let fixture = build_fixture(&mut auto);
        let before = read_logical(&auto, &fixture);
        for _ in 0..400 {
            let mut writer = auto.begin_write().expect("writer");
            writer.create_element().expect("element");
            writer.commit().expect("commit");
        }
        assert!(
            auto.live_generation() > CheckpointGeneration::new(0),
            "size-ratio policy must auto-fold once the log outgrows the base",
        );
        // Element ids grew with the extra commits, so compare only the
        // property-lookup and label results.
        let after = read_logical(&auto, &fixture);
        assert_eq!(after.rank_eq_500, before.rank_eq_500);
        assert_eq!(after.person_members, before.person_members);
        assert_no_id_reuse_across_fold(&mut auto);
        assert_eq!(
            auto.checkpoint_policy(),
            CheckpointPolicy::SizeRatio { factor: 1 },
            "the auto-fold reopen must preserve the configured policy",
        );
        let status = auto.status();
        assert_eq!(status.live_generation, auto.live_generation());
        assert!(status.base_byte_size > 0, "live base has bytes");
        drop(auto);
        let _ = std::fs::remove_dir_all(&auto_path);
    }
}