use std::{
borrow::Cow,
collections::BTreeSet,
path::{Path, PathBuf},
sync::Arc,
};
use oxgraph_algo::{
PageRankConfig, PageRankError, PageRankWorkspace, Uniform, longest_path_dag,
pagerank_graph_with_workspace,
};
use oxgraph_graph::{CanonicalElementIdentity, ElementIndex, LocalElementIdentity};
use crate::{
Bound, Catalog, CheckpointGeneration, CommitSeq, DbError, Element, ElementId,
GraphProjectionDefinition, GraphProjectionSpec, IncidenceId, IncidenceRecord, IndexId, LabelId,
PreparedQuery, ProjectionDefinition, ProjectionId, Properties, PropertyKeyId, PropertySubject,
PropertyType, PropertyValue, QueryResult, Relation, RelationId, RelationTypeId, RoleId, Schema,
TransactionId,
backing::Base,
catalog::{IndexDefinition, PropertyFamily},
freeze::{self, FreezeStamps},
lock::WriterLock,
overlay::{Overlay, Snapshot, StateView, WriteOverlay},
projection::{self, GraphProjection, HypergraphProjection, ProjectionElementId},
state::NextIds,
storage,
traversal::{self, Direction, Subgraph, Walk},
typed::{Assignable, EqualityIndex, Key, ValueType},
wal,
wire::SuperblockRecord,
};
/// Lookup shape passed to [`Reader::lookup`].
///
/// Each index kind accepts exactly one shape; `Reader::lookup` returns an "unsupported"
/// error for any other combination:
/// - label, relation-type and projection indexes accept [`Match::All`];
/// - property-equality indexes accept [`Match::Equal`];
/// - property-range indexes accept [`Match::Range`];
/// - composite-equality indexes accept [`Match::Composite`].
#[derive(Clone, Copy, Debug)]
pub enum Match<'value> {
    /// Every subject the index covers.
    All,
    /// Subjects whose indexed property equals the given value.
    Equal(&'value PropertyValue),
    /// Subjects whose indexed property lies between `min` and `max`.
    ///
    /// NOTE(review): bound inclusivity is decided by `typed_property_range`, which is not
    /// visible here — confirm before relying on it.
    Range {
        /// Lower bound of the range.
        min: &'value PropertyValue,
        /// Upper bound of the range.
        max: &'value PropertyValue,
    },
    /// Values for a composite-equality index.
    ///
    /// NOTE(review): presumably one value per index key, in the index's key order; confirm
    /// against `typed_property_composite_equal`.
    Composite(&'value [PropertyValue]),
}
/// Decides when a [`Db`] folds its delta log into a fresh base file on its own.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CheckpointPolicy {
    /// Never checkpoint automatically; only an explicit compaction does.
    Manual,
    /// Checkpoint once the delta log is strictly larger than `factor` times the base file
    /// size. Base files smaller than 4 KiB are treated as 4 KiB.
    SizeRatio {
        /// Multiplier applied to the (clamped) base size to get the log-size threshold.
        factor: u32,
    },
}
impl CheckpointPolicy {
    /// Ratio used by [`CheckpointPolicy::default`].
    pub const DEFAULT_FACTOR: u32 = 4;
    /// Smallest base size the ratio is measured against, so that a tiny or empty base does
    /// not make every commit trigger a checkpoint.
    const MIN_BASE_BYTES: u64 = 4 * 1024;
    /// Returns `true` when a log of `log_bytes` next to a base of `base_bytes` is due for a
    /// checkpoint.
    ///
    /// `Manual` never is. `SizeRatio { factor }` is due when
    /// `log_bytes > factor * max(base_bytes, 4 KiB)`; the product saturates at `u64::MAX`.
    #[must_use]
    const fn should_checkpoint(self, log_bytes: u64, base_bytes: u64) -> bool {
        let Self::SizeRatio { factor } = self else {
            return false;
        };
        let clamped_base = if base_bytes > Self::MIN_BASE_BYTES {
            base_bytes
        } else {
            Self::MIN_BASE_BYTES
        };
        let threshold = clamped_base.saturating_mul(factor as u64);
        log_bytes > threshold
    }
}
impl Default for CheckpointPolicy {
    /// `SizeRatio` with [`CheckpointPolicy::DEFAULT_FACTOR`].
    fn default() -> Self {
        let factor = Self::DEFAULT_FACTOR;
        Self::SizeRatio { factor }
    }
}
/// What [`Db::write`] reports after the transaction closure returned successfully.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum CommitOutcome {
    /// The transaction's write overlay was empty when the closure returned.
    Empty,
    /// The overlay held changes; carries the commit sequence returned by the commit.
    Committed(CommitSeq),
}
/// File name of the base image for checkpoint `generation`, e.g. `base-3.oxgdb`.
fn base_file(generation: u64) -> String {
    let digits = generation.to_string();
    ["base-", digits.as_str(), ".oxgdb"].concat()
}
/// File name of the delta log that belongs to checkpoint `generation`, e.g. `delta-3.log`.
fn delta_file(generation: u64) -> String {
    let digits = generation.to_string();
    ["delta-", digits.as_str(), ".log"].concat()
}
/// Handle to an on-disk database directory.
///
/// The directory holds a superblock (`wal::SUPERBLOCK_FILE`), and for the live checkpoint
/// generation a base file (`base-<generation>.oxgdb`) and a delta log
/// (`delta-<generation>.log`). The latest state is kept in memory as a shared snapshot that
/// readers clone via `Arc`.
pub struct Db {
    /// Database directory.
    root: PathBuf,
    /// Most recent snapshot; [`Db::reader`] hands out clones of this `Arc`.
    current: Arc<Snapshot>,
    /// Generation of the base file and delta log in use.
    base_generation: u64,
    /// Last transaction id allocated by this handle, or recovered by `Db::open` from the
    /// superblock and replayed log frames.
    last_transaction_id: TransactionId,
    /// Automatic checkpoint policy; `Db::open` starts it at the default.
    checkpoint_policy: CheckpointPolicy,
}
impl Db {
/// Initialises a new, empty database in the directory `path` and opens it.
///
/// Writes the following, then delegates to [`Db::open`]:
/// - an empty generation-0 base image (staged through a `.tmp` file);
/// - an empty generation-0 delta log;
/// - a superblock naming generation 0.
///
/// # Errors
///
/// Returns [`DbError::AlreadyExists`] if a superblock is already present. Otherwise
/// propagates freeze, storage and open failures.
pub fn create(path: impl AsRef<Path>) -> Result<Self, DbError> {
    let root = path.as_ref().to_path_buf();
    let superblock_path = root.join(wal::SUPERBLOCK_FILE);
    if superblock_path.exists() {
        return Err(DbError::AlreadyExists);
    }
    let base_name = base_file(0);
    let staging_path = root.join(format!("{base_name}.tmp"));
    let final_path = root.join(&base_name);
    let no_records = crate::overlay::BaseRecords::empty();
    let no_changes = Overlay::empty(NextIds::INITIAL, Catalog::empty());
    let empty_view = crate::overlay::MergedState::new(&no_records, &no_changes);
    let genesis_stamps = FreezeStamps {
        commit_seq: 0,
        transaction_id: 0,
        generation: 0,
    };
    let image = freeze::freeze_view(&empty_view, genesis_stamps)?;
    storage::atomic_write(&root, &staging_path, &final_path, &image)?;
    create_empty_log(&root, 0)?;
    write_superblock(&root, 0, 0, 0, 0)?;
    Self::open(&root)
}
/// Opens an existing database directory and recovers its latest state.
///
/// Recovery proceeds as follows:
/// - read the superblock to learn the live generation;
/// - open that generation's base file;
/// - replay the generation's delta log on top of the base (a missing log reads as empty).
///
/// If the log ends with bytes replay did not accept (`outcome.valid_len < log_bytes.len()`),
/// the file is truncated to the accepted prefix, so opening can modify the directory. The
/// checkpoint policy starts at [`CheckpointPolicy::default`].
///
/// NOTE(review): the truncation runs without acquiring `WriterLock`; confirm that no other
/// writer can be appending to this log at the same time.
///
/// # Errors
///
/// Propagates failures from reading the superblock, opening the base, reading, replaying or
/// truncating the log, and applying replayed operations.
pub fn open(path: impl AsRef<Path>) -> Result<Self, DbError> {
    let root = path.as_ref().to_path_buf();
    let superblock = wal::read_superblock(&root)?;
    // The superblock names the live generation; its base and log are the ones to load.
    let generation = superblock.base_generation.get();
    let base = Arc::new(Base::open(&root.join(base_file(generation)), false)?);
    let base_records = Arc::new(crate::overlay::BaseRecords::open(&base)?);
    let base_header = *base.get().header();
    let base_catalog = base.get().catalog().clone();
    let base_next = NextIds::from_header(&base_header);
    let log_path = root.join(delta_file(generation));
    let log_bytes = read_log(&log_path)?;
    let outcome = wal::replay(generation, &log_bytes)?;
    // Drop any trailing bytes replay rejected so later appends follow the valid prefix.
    if outcome.valid_len < log_bytes.len() {
        truncate_log(&log_path, outcome.valid_len)?;
    }
    let mut write = WriteOverlay::new(base_next, base_catalog);
    let mut recovered_next = base_next;
    // Start from the values recorded at the last checkpoint; replayed frames advance them.
    let mut last_commit_seq = superblock.commit_seq.get();
    let mut last_txn = superblock.transaction_id.get();
    for frame in &outcome.frames {
        for op in &frame.ops {
            write.apply_replay_op(&base_records, op, &frame.blob, frame.lsn)?;
        }
        // Keep the element-wise maximum of the id counters seen so far.
        recovered_next = recovered_next.elementwise_max(write.next_ids());
        // Frames are visited in log order, so the last frame's LSN becomes the visible one.
        last_commit_seq = frame.lsn;
        last_txn = last_txn.max(frame.txn_id);
    }
    write.set_next_ids(recovered_next);
    let overlay = Arc::new(write.freeze());
    let snapshot = Arc::new(Snapshot::with_shared_base_records(
        CheckpointGeneration::new(generation),
        CommitSeq::new(last_commit_seq),
        base,
        overlay,
        base_records,
    ));
    Ok(Self {
        root,
        current: snapshot,
        base_generation: generation,
        last_transaction_id: TransactionId::new(last_txn),
        checkpoint_policy: CheckpointPolicy::default(),
    })
}
/// Generation of the base file and delta log this handle currently uses.
#[must_use]
pub const fn live_generation(&self) -> CheckpointGeneration {
    let generation = self.base_generation;
    CheckpointGeneration::new(generation)
}
/// Current automatic-checkpoint policy.
#[must_use]
pub const fn checkpoint_policy(&self) -> CheckpointPolicy {
    self.checkpoint_policy
}
/// Replaces the automatic-checkpoint policy for this handle; it is not persisted, and
/// `Db::open` starts again from the default.
pub const fn set_checkpoint_policy(&mut self, policy: CheckpointPolicy) {
    self.checkpoint_policy = policy;
}
/// Re-reads the superblock and reopens the live base file, discarding both.
///
/// Only those two artifacts are checked; the delta log is not examined.
///
/// # Errors
///
/// Propagates the failure of `wal::read_superblock` or `Base::open`.
pub fn validate(&self) -> Result<(), DbError> {
    wal::read_superblock(&self.root)?;
    let base_path = self.root.join(base_file(self.base_generation));
    let _base = Base::open(&base_path, false)?;
    Ok(())
}
/// Checks that `path` can be opened as a database by fully opening it with [`Db::open`].
///
/// Because this is a real open, it replays the log and may truncate a rejected log tail.
///
/// # Errors
///
/// Propagates any failure of [`Db::open`].
pub fn validate_path(path: impl AsRef<Path>) -> Result<(), DbError> {
    let _database = Self::open(path)?;
    Ok(())
}
/// Folds the current state into a new base generation and starts an empty delta log.
///
/// Public entry point for the crate-internal checkpoint.
///
/// # Errors
///
/// Propagates any checkpoint failure.
pub fn compact(&mut self) -> Result<(), DbError> {
    self.checkpoint()
}
/// Runs a complete checkpoint.
///
/// In test builds the crash-injection stop point is fixed to `CheckpointStop::Complete`, so
/// the whole sequence always runs here.
pub(crate) fn checkpoint(&mut self) -> Result<(), DbError> {
    self.checkpoint_inner(
        // Only test builds take a stop point; release builds call with no arguments.
        #[cfg(test)]
        CheckpointStop::Complete,
    )
}
/// Writes the in-memory snapshot as base generation `base_generation + 1` and switches this
/// handle to it.
///
/// Steps, in order (the writer lock is held throughout):
/// 1. Freeze the current view, stamped with the current commit sequence, the last
///    transaction id and the new generation.
/// 2. Atomically write `base-<next>.oxgdb` via a `.tmp` file.
/// 3. Create an empty `delta-<next>.log`.
/// 4. Write a superblock naming the new generation (both `checkpoint_lsn` and `commit_seq`
///    are the current commit sequence).
/// 5. Reopen the directory and adopt the reopened snapshot, generation and transaction id.
/// 6. Best-effort removal of the old base and log, then a directory sync; failures in this
///    step are ignored.
///
/// Test builds can stop early: `BeforeSuperblock` returns after step 3 (the superblock
/// still names the old generation); `BeforeRotate` returns after step 4 (disk names the
/// new generation but this handle has not switched).
///
/// # Errors
///
/// Fails on generation overflow and propagates lock, freeze, storage and reopen failures.
fn checkpoint_inner(&mut self, #[cfg(test)] stop: CheckpointStop) -> Result<(), DbError> {
    let _lock = WriterLock::acquire(&self.root)?;
    let next_generation = self
        .base_generation
        .checked_add(1)
        .ok_or_else(|| DbError::invalid_store("checkpoint generation overflow"))?;
    let view = self.current.view();
    let commit_seq = self.current.lsn().get();
    let base_bytes = freeze::freeze_view(
        &view,
        FreezeStamps {
            commit_seq,
            transaction_id: self.last_transaction_id.get(),
            generation: next_generation,
        },
    )?;
    storage::atomic_write(
        &self.root,
        &self
            .root
            .join(format!("{}.tmp", base_file(next_generation))),
        &self.root.join(base_file(next_generation)),
        &base_bytes,
    )?;
    create_empty_log(&self.root, next_generation)?;
    #[cfg(test)]
    if matches!(stop, CheckpointStop::BeforeSuperblock) {
        return Ok(());
    }
    // Switching the superblock is the step that makes the new generation the live one.
    write_superblock(
        &self.root,
        next_generation,
        commit_seq,
        commit_seq,
        self.last_transaction_id.get(),
    )?;
    #[cfg(test)]
    if matches!(stop, CheckpointStop::BeforeRotate) {
        return Ok(());
    }
    let reopened = Self::open(&self.root)?;
    let old_generation = self.base_generation;
    // NOTE(review): `reopened.checkpoint_policy` is never copied into `self`, so this
    // save/restore is a no-op.
    let policy = self.checkpoint_policy;
    self.current = reopened.current;
    self.base_generation = reopened.base_generation;
    self.last_transaction_id = reopened.last_transaction_id;
    self.checkpoint_policy = policy;
    // Best effort: once the superblock names the new generation the old files are no longer
    // read by `Db::open`, so removal failures are deliberately ignored.
    let _ = std::fs::remove_file(self.root.join(base_file(old_generation)));
    let _ = std::fs::remove_file(self.root.join(delta_file(old_generation)));
    let _ = storage::sync_directory(&self.root);
    Ok(())
}
/// Checkpoints if, under the current policy, the live delta log has outgrown the live base.
///
/// Sizes come from file metadata; a file whose size cannot be read counts as 0 bytes.
///
/// # Errors
///
/// Propagates checkpoint failures.
fn maybe_auto_checkpoint(&mut self) -> Result<(), DbError> {
    let generation = self.base_generation;
    let log_bytes = file_len(&self.root.join(delta_file(generation)));
    let base_bytes = file_len(&self.root.join(base_file(generation)));
    if !self.checkpoint_policy.should_checkpoint(log_bytes, base_bytes) {
        return Ok(());
    }
    self.checkpoint()
}
/// Counters describing the visible snapshot and the live on-disk files.
///
/// File sizes are read from metadata; unreadable or missing files report 0.
#[must_use]
pub fn stats(&self) -> Stats {
    let generation = self.base_generation;
    let view = self.current.view();
    let base_byte_size = file_len(&self.root.join(base_file(generation)));
    let log_byte_size = file_len(&self.root.join(delta_file(generation)));
    Stats {
        visible_commit_seq: self.current.lsn(),
        last_transaction_id: self.last_transaction_id,
        live_generation: CheckpointGeneration::new(generation),
        base_byte_size,
        log_byte_size,
        element_count: view.element_count(),
        relation_count: view.relation_count(),
        incidence_count: view.incidence_count(),
        catalog: self.catalog_summary(),
    }
}
/// Per-kind counts of the catalog entries in the current snapshot.
#[must_use]
pub fn catalog_summary(&self) -> CatalogSummary {
    let view = self.current.view();
    CatalogSummary::from_catalog(view.catalog())
}
/// Returns a reader over the snapshot that is current at the time of the call; the reader
/// shares that snapshot through an `Arc`.
#[must_use]
pub fn reader(&self) -> Reader {
    let snapshot = Arc::clone(&self.current);
    Reader { snapshot }
}
/// Starts a write transaction layered on the current snapshot.
///
/// Acquires the directory's writer lock, which the returned [`Writer`] owns. It then
/// allocates the next transaction id and records it as this handle's last id. Nothing here
/// rolls the counter back if the transaction is later abandoned.
///
/// # Errors
///
/// Propagates lock-acquisition failures. Returns [`DbError::TransactionIdOverflow`] when
/// no further transaction id is available.
pub(crate) fn begin_write(&mut self) -> Result<Writer<'_>, DbError> {
    let lock = WriterLock::acquire(&self.root)?;
    let Some(transaction_id) = self.last_transaction_id.checked_next() else {
        return Err(DbError::TransactionIdOverflow);
    };
    self.last_transaction_id = transaction_id;
    let parent = Arc::clone(&self.current);
    let delta = WriteOverlay::from_overlay(parent.overlay());
    Ok(Writer {
        database: self,
        parent,
        delta,
        transaction_id,
        lock,
    })
}
/// Runs `f` with a reader over the current snapshot and returns what it returns.
///
/// # Errors
///
/// Propagates the error returned by `f`.
pub fn read<R>(&self, f: impl FnOnce(&Reader) -> Result<R, DbError>) -> Result<R, DbError> {
    let reader = self.reader();
    f(&reader)
}
/// Runs `f` inside a write transaction, then commits.
///
/// If `f` fails, its error is returned before any commit is attempted. The writer,
/// including its `WriterLock`, is dropped. Otherwise the transaction is committed. The
/// outcome is `Empty` when the write overlay held no changes and `Committed(seq)` when it did.
///
/// # Errors
///
/// Propagates failures from starting the transaction, from `f`, and from the commit.
pub fn write<R>(
    &mut self,
    f: impl FnOnce(&mut Writer<'_>) -> Result<R, DbError>,
) -> Result<(R, CommitOutcome), DbError> {
    let mut writer = self.begin_write()?;
    let value = f(&mut writer)?;
    // Inspect the overlay before `commit` takes the writer.
    let nothing_staged = writer.delta.is_empty();
    let lsn = writer.commit()?;
    let outcome = if nothing_staged {
        CommitOutcome::Empty
    } else {
        CommitOutcome::Committed(lsn)
    };
    Ok((value, outcome))
}
pub fn bind(&self, schema: &Schema) -> Result<Bound, DbError> {
let view = self.current.view();
let catalog = view.catalog();
let mut bound = Bound::default();
for name in &schema.roles {
let id = catalog.role_id(name).ok_or_else(|| DbError::UnknownName {
kind: "role",
name: name.clone(),
})?;
bound.roles.insert(name.clone(), id);
}
for name in &schema.labels {
let id = catalog.label_id(name).ok_or_else(|| DbError::UnknownName {
kind: "label",
name: name.clone(),
})?;
bound.labels.insert(name.clone(), id);
}
for name in &schema.relation_types {
let id = catalog
.relation_type_id(name)
.ok_or_else(|| DbError::UnknownName {
kind: "relation type",
name: name.clone(),
})?;
bound.relation_types.insert(name.clone(), id);
}
for (name, _family, value_type) in &schema.keys {
let id = catalog
.property_key_id(name)
.ok_or_else(|| DbError::UnknownName {
kind: "property key",
name: name.clone(),
})?;
bound.keys.insert(name.clone(), (id, *value_type));
}
for (name, key_name) in &schema.equality_indexes {
let (_key_id, value_type) =
*bound
.keys
.get(key_name)
.ok_or_else(|| DbError::UnknownName {
kind: "property key",
name: key_name.clone(),
})?;
let id = catalog.index_id(name).ok_or_else(|| DbError::UnknownName {
kind: "index",
name: name.clone(),
})?;
bound
.equality_indexes
.insert(name.clone(), (id, value_type));
}
for spec in &schema.graph_projections {
let id = catalog
.projection_id(&spec.name)
.ok_or_else(|| DbError::UnknownName {
kind: "projection",
name: spec.name.clone(),
})?;
bound.projections.insert(spec.name.clone(), id);
}
Ok(bound)
}
/// Prepares `query` against the current snapshot's view (see `PreparedQuery::prepare`).
///
/// # Errors
///
/// Propagates whatever `PreparedQuery::prepare` rejects.
pub fn prepare(&self, query: &str) -> Result<PreparedQuery, DbError> {
    let view = self.current.view();
    PreparedQuery::prepare(query, &view)
}
}
/// Size in bytes of the file at `path`, or 0 when its metadata cannot be read (for example,
/// because it does not exist).
fn file_len(path: &Path) -> u64 {
    match std::fs::metadata(path) {
        Ok(meta) => meta.len(),
        Err(_unreadable) => 0,
    }
}
/// Test-only crash-injection points for `Db::checkpoint_inner`.
#[cfg(test)]
#[cfg_attr(
    miri,
    expect(
        dead_code,
        reason = "the crash-injection variants are constructed only by the #[cfg(not(miri))] crash-matrix test"
    )
)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum CheckpointStop {
    /// Run the whole checkpoint.
    Complete,
    /// Return after the new base file and empty delta log exist, before the superblock is
    /// switched to the new generation.
    BeforeSuperblock,
    /// Return after the superblock names the new generation, before the handle reloads it
    /// and removes the old files.
    BeforeRotate,
}
fn read_log(path: &Path) -> Result<Vec<u8>, DbError> {
match std::fs::read(path) {
Ok(bytes) => Ok(bytes),
Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Vec::new()),
Err(error) => Err(DbError::io("read delta-log", error)),
}
}
fn truncate_log(path: &Path, len: usize) -> Result<(), DbError> {
let file = std::fs::OpenOptions::new()
.write(true)
.open(path)
.map_err(|error| DbError::io("open delta-log for truncate", error))?;
let len = u64::try_from(len)
.map_err(|_overflow| DbError::invalid_store("delta-log length overflow"))?;
file.set_len(len)
.map_err(|error| DbError::io("truncate delta-log", error))?;
file.sync_all()
.map_err(|error| DbError::io("sync truncated delta-log", error))
}
fn create_empty_log(root: &Path, generation: u64) -> Result<(), DbError> {
let path = root.join(delta_file(generation));
let file =
std::fs::File::create(&path).map_err(|error| DbError::io("create delta-log", error))?;
file.sync_all()
.map_err(|error| DbError::io("sync delta-log", error))?;
storage::sync_directory(root)
}
/// Opens the delta log for `generation` for reading and appending.
///
/// The file is created if it is missing; existing contents are kept.
///
/// # Errors
///
/// Reports the open failure as `DbError::io("open delta-log for append", ..)`.
fn open_log_for_append(root: &Path, generation: u64) -> Result<std::fs::File, DbError> {
    let path = root.join(delta_file(generation));
    let mut options = std::fs::OpenOptions::new();
    options.read(true).append(true).create(true).truncate(false);
    options
        .open(path)
        .map_err(|error| DbError::io("open delta-log for append", error))
}
/// Writes a superblock under `root` that names `generation` as the live generation.
///
/// `log_byte_offset`, `flags`, `crc32c` and `pad` are all written as 0.
/// NOTE(review): presumably `wal::write_superblock` computes the real checksum — confirm.
///
/// # Errors
///
/// Propagates `wal::write_superblock` failures.
fn write_superblock(
    root: &Path,
    generation: u64,
    checkpoint_lsn: u64,
    commit_seq: u64,
    transaction_id: u64,
) -> Result<(), DbError> {
    let record = SuperblockRecord {
        magic: crate::wire::SUPERBLOCK_MAGIC,
        format_version: crate::wire::OXGDB_FORMAT_VERSION.into(),
        base_generation: generation.into(),
        checkpoint_lsn: checkpoint_lsn.into(),
        commit_seq: commit_seq.into(),
        transaction_id: transaction_id.into(),
        log_byte_offset: 0u64.into(),
        flags: 0u32.into(),
        crc32c: 0u32.into(),
        pad: 0u32.into(),
    };
    wal::write_superblock(root, &record)
}
/// Point-in-time counters returned by [`Db::stats`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct Stats {
    /// Commit sequence of the snapshot being described.
    pub visible_commit_seq: CommitSeq,
    /// Last transaction id allocated or recovered by the handle.
    pub last_transaction_id: TransactionId,
    /// Generation of the base file and delta log in use.
    pub live_generation: CheckpointGeneration,
    /// Size of the live base file in bytes (0 if it could not be read).
    pub base_byte_size: u64,
    /// Size of the live delta log in bytes (0 if it could not be read).
    pub log_byte_size: u64,
    /// Number of elements in the snapshot.
    pub element_count: usize,
    /// Number of relations in the snapshot.
    pub relation_count: usize,
    /// Number of incidences in the snapshot.
    pub incidence_count: usize,
    /// Per-kind catalog entry counts.
    pub catalog: CatalogSummary,
}
/// Number of entries of each kind in a [`Catalog`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct CatalogSummary {
    /// Registered roles.
    pub role_count: usize,
    /// Registered labels.
    pub label_count: usize,
    /// Registered relation types.
    pub relation_type_count: usize,
    /// Registered property keys.
    pub property_key_count: usize,
    /// Defined projections.
    pub projection_count: usize,
    /// Defined indexes.
    pub index_count: usize,
}
impl CatalogSummary {
    /// Counts the roles, labels, relation types, property keys, projections and indexes in
    /// `catalog`.
    #[must_use]
    pub fn from_catalog(catalog: &Catalog) -> Self {
        let role_count = catalog.roles().count();
        let label_count = catalog.labels().count();
        let relation_type_count = catalog.relation_types().count();
        let property_key_count = catalog.property_keys().count();
        let projection_count = catalog.projections().count();
        let index_count = catalog.indexes().count();
        Self {
            role_count,
            label_count,
            relation_type_count,
            property_key_count,
            projection_count,
            index_count,
        }
    }
}
/// Identity of the snapshot a [`Reader`] observes, returned by [`Reader::pin`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ReadPin {
    /// Commit sequence visible to the reader.
    pub visible_commit_seq: CommitSeq,
    /// Checkpoint generation of the reader's snapshot.
    pub generation: CheckpointGeneration,
}
/// Read-only access to one snapshot, obtained from [`Db::reader`] or [`Db::read`].
///
/// The snapshot is held through an `Arc`, so a reader is cheap to create and keeps its
/// snapshot alive for as long as the reader exists.
pub struct Reader {
    /// Snapshot every query on this reader is answered from.
    snapshot: Arc<Snapshot>,
}
/// Decides whether a traversal arriving at a relation through incidence `from` may leave
/// through incidence `to` under `direction`.
///
/// Direction comes from incidence-id order: `Outgoing` moves to higher ids, `Incoming` to
/// lower ids, and `Both` to any id.
const fn follow_direction(direction: Direction, from: IncidenceId, to: IncidenceId) -> bool {
    let (from, to) = (from.get(), to.get());
    match direction {
        Direction::Both => true,
        Direction::Outgoing => to > from,
        Direction::Incoming => to < from,
    }
}
/// Compile-time check: instantiating this for a type that is not `Send + Sync` fails the build.
const fn assert_send_sync<T: Send + Sync>() {}
// Readers and the snapshots they share are required to be shareable across threads.
const _: () = assert_send_sync::<Reader>();
const _: () = assert_send_sync::<Arc<Snapshot>>();
impl Reader {
/// Commit sequence and checkpoint generation of the snapshot this reader observes.
#[must_use]
pub fn pin(&self) -> ReadPin {
    let snapshot = &self.snapshot;
    ReadPin {
        generation: snapshot.generation(),
        visible_commit_seq: snapshot.lsn(),
    }
}
/// Catalog as of this reader's snapshot.
#[must_use]
pub fn catalog(&self) -> &Catalog {
    self.snapshot.view().catalog_ref()
}
/// Number of elements in this snapshot.
#[must_use]
pub fn element_count(&self) -> usize {
    let view = self.snapshot.view();
    view.element_count()
}
/// Number of relations in this snapshot.
#[must_use]
pub fn relation_count(&self) -> usize {
    let view = self.snapshot.view();
    view.relation_count()
}
/// Number of incidences in this snapshot.
#[must_use]
pub fn incidence_count(&self) -> usize {
    let view = self.snapshot.view();
    view.incidence_count()
}
/// Ids of all elements in this snapshot, in the order the view yields them.
#[must_use]
pub fn element_ids(&self) -> Vec<ElementId> {
    let view = self.snapshot.view();
    view.elements().map(|element| element.id).collect()
}
/// Ids of all relations in this snapshot, in the order the view yields them.
#[must_use]
pub fn relation_ids(&self) -> Vec<RelationId> {
    let view = self.snapshot.view();
    view.relations().map(|relation| relation.id).collect()
}
/// Whether element `id` is present in this snapshot.
#[must_use]
pub fn contains_element(&self, id: ElementId) -> bool {
    let view = self.snapshot.view();
    view.contains_element(id)
}
/// Whether relation `id` is present in this snapshot.
#[must_use]
pub fn contains_relation(&self, id: RelationId) -> bool {
    let view = self.snapshot.view();
    view.contains_relation(id)
}
/// Whether incidence `id` is present in this snapshot.
#[must_use]
pub fn contains_incidence(&self, id: IncidenceId) -> bool {
    let view = self.snapshot.view();
    view.contains_incidence(id)
}
/// Materialises element `id` (labels and properties), or `None` if it is not in this
/// snapshot.
#[must_use]
pub fn element(&self, id: ElementId) -> Option<Element> {
    let view = self.snapshot.view();
    let labels = view.element_ref(id)?.labels.iter().copied().collect();
    let pairs = view.subject_properties(PropertySubject::Element(id));
    Some(Element::new(id, labels, Properties::from_pairs(pairs)))
}
/// Materialises relation `id` (type, labels and properties), or `None` if it is not in this
/// snapshot.
#[must_use]
pub fn relation(&self, id: RelationId) -> Option<Relation> {
    let view = self.snapshot.view();
    let record = view.relation_ref(id)?;
    let relation_type = record.relation_type;
    let labels = record.labels.iter().copied().collect();
    let pairs = view.subject_properties(PropertySubject::Relation(id));
    Some(Relation::new(
        id,
        relation_type,
        labels,
        Properties::from_pairs(pairs),
    ))
}
/// Owned copy of incidence `id`, or `None` if it is not in this snapshot.
#[must_use]
pub fn incidence(&self, id: IncidenceId) -> Option<IncidenceRecord> {
    let view = self.snapshot.view();
    view.incidence_ref(id).map(Cow::into_owned)
}
/// All incidences that attach element `id` to relations.
#[must_use]
pub fn element_incidences(&self, id: ElementId) -> Vec<IncidenceRecord> {
    let view = self.snapshot.view();
    view.element_incidences(id)
}
/// Elements of the first two incidences of `relation`, or `None` if it has fewer than two.
///
/// Any further incidences are ignored.
#[must_use]
pub fn endpoints(&self, relation: RelationId) -> Option<(ElementId, ElementId)> {
    let incidences = self.snapshot.view().relation_incidences(relation);
    let mut elements = incidences.iter().map(|incidence| incidence.element);
    let first = elements.next()?;
    let second = elements.next()?;
    Some((first, second))
}
/// Elements connected to `element` through relations of type `relation_type`, deduplicated
/// and in ascending `ElementId` order.
///
/// For every incidence of `element` whose relation has the requested type, each *other*
/// incidence of that relation is considered. `direction` filters those incidences by
/// incidence-id order (see `follow_direction`), and `element` itself is never returned.
#[must_use]
pub fn neighbors(
    &self,
    element: ElementId,
    relation_type: RelationTypeId,
    direction: Direction,
) -> Vec<ElementId> {
    let view = self.snapshot.view();
    let wanted = Some(relation_type);
    let mut found = BTreeSet::new();
    for own in view.element_incidences(element) {
        let Some(relation) = view.relation_ref(own.relation) else {
            continue;
        };
        if relation.relation_type != wanted {
            continue;
        }
        for other in view.relation_incidences(own.relation) {
            if other.element != element && follow_direction(direction, own.id, other.id) {
                found.insert(other.element);
            }
        }
    }
    found.into_iter().collect()
}
#[must_use]
pub fn property(&self, subject: PropertySubject, key: PropertyKeyId) -> Option<PropertyValue> {
self.snapshot
.view()
.property_ref(subject, key)
.map(Cow::into_owned)
}
/// Looks up an element through an equality index.
///
/// Returns the first matching element subject, or `None` when no element matches.
/// Relation and incidence matches are skipped.
///
/// # Errors
///
/// Propagates value-conversion and index-lookup failures.
pub fn element_by_key<T: ValueType>(
    &self,
    index: EqualityIndex<T>,
    value: impl Assignable<T>,
) -> Result<Option<Element>, DbError> {
    let value = value.into_value()?;
    let subjects = self.lookup(index.id(), Match::Equal(&value))?;
    let first_element = subjects.into_iter().find_map(|subject| match subject {
        PropertySubject::Element(id) => Some(id),
        PropertySubject::Relation(_) | PropertySubject::Incidence(_) => None,
    });
    let Some(id) = first_element else {
        return Ok(None);
    };
    Ok(self.element(id))
}
/// Number of subjects returned by an [`Match::All`] lookup on `index`.
///
/// # Errors
///
/// Fails for unknown indexes and for index kinds that do not accept `Match::All`.
pub fn count(&self, index: IndexId) -> Result<usize, DbError> {
    let subjects = self.lookup(index, Match::All)?;
    Ok(subjects.len())
}
/// Subjects whose property `key` equals `value`, regardless of any index definition.
///
/// # Errors
///
/// Propagates the typed lookup's failure.
pub fn lookup_property_equal(
    &self,
    key: PropertyKeyId,
    value: &PropertyValue,
) -> Result<Vec<PropertySubject>, DbError> {
    let view = self.snapshot.view();
    view.typed_property_equal(key, value)
}
/// Subjects whose property `key` lies between `min` and `max`, regardless of any index
/// definition.
///
/// # Errors
///
/// Propagates the typed lookup's failure.
pub fn lookup_property_range(
    &self,
    key: PropertyKeyId,
    min: &PropertyValue,
    max: &PropertyValue,
) -> Result<Vec<PropertySubject>, DbError> {
    let view = self.snapshot.view();
    view.typed_property_range(key, min, max)
}
/// Runs `lookup` against index `index` in this reader's snapshot.
///
/// Each index kind accepts exactly one [`Match`] shape:
/// - `Label` with `All` returns every element carrying the label;
/// - `RelationType` with `All` returns every relation of that type;
/// - `PropertyEquality` with `Equal` returns subjects whose property equals the value;
/// - `PropertyRange` with `Range` returns subjects whose property lies in the range;
/// - `CompositeEquality` with `Composite` returns subjects matching all key values;
/// - `Projection` with `All` returns the projection's subjects.
///
/// # Errors
///
/// - `DbError::UnknownIndex` if `index` is not in the catalog.
/// - An "unsupported" error when the match shape does not fit the index kind.
/// - Otherwise whatever the typed lookup or the projection build returns.
pub fn lookup(
    &self,
    index: IndexId,
    lookup: Match<'_>,
) -> Result<Vec<PropertySubject>, DbError> {
    let view = self.snapshot.view();
    let entry = view
        .catalog()
        .index(index)
        .ok_or(DbError::UnknownIndex { id: index })?;
    // Dispatch on (index kind, match shape). Each kind has one accepted shape followed by a
    // catch-all arm rejecting every other shape.
    match (&entry.definition, lookup) {
        (IndexDefinition::Label { label }, Match::All) => Ok(view
            .elements_with_label(*label)
            .into_iter()
            .map(PropertySubject::Element)
            .collect()),
        (IndexDefinition::Label { .. }, _lookup) => {
            Err(DbError::unsupported("label index expects all lookup"))
        }
        (IndexDefinition::RelationType { relation_type }, Match::All) => Ok(view
            .relations_with_type(*relation_type)
            .into_iter()
            .map(PropertySubject::Relation)
            .collect()),
        (IndexDefinition::RelationType { .. }, _lookup) => Err(DbError::unsupported(
            "relation type index expects all lookup",
        )),
        (IndexDefinition::PropertyEquality { key }, Match::Equal(value)) => {
            view.typed_property_equal(*key, value)
        }
        (IndexDefinition::PropertyEquality { .. }, _lookup) => Err(DbError::unsupported(
            "property equality index expects equality lookup",
        )),
        (IndexDefinition::PropertyRange { key }, Match::Range { min, max }) => {
            view.typed_property_range(*key, min, max)
        }
        (IndexDefinition::PropertyRange { .. }, _lookup) => Err(DbError::unsupported(
            "property range index expects range lookup",
        )),
        (IndexDefinition::CompositeEquality { keys }, Match::Composite(values)) => {
            view.typed_property_composite_equal(keys, values)
        }
        (IndexDefinition::CompositeEquality { .. }, _lookup) => Err(DbError::unsupported(
            "composite equality index expects composite equality lookup",
        )),
        // Projection indexes build the projection on demand to enumerate its subjects.
        (IndexDefinition::Projection { projection }, Match::All) => {
            self.projection_index_subjects(*projection)
        }
        (IndexDefinition::Projection { .. }, _lookup) => {
            Err(DbError::unsupported("projection index expects all lookup"))
        }
    }
}
/// Builds graph projection `id` from this snapshot.
///
/// # Errors
///
/// - `DbError::UnknownProjection` if `id` is not in the catalog.
/// - An invalid-projection error if `id` names a hypergraph projection.
/// - Otherwise any error from building the projection.
pub fn graph_projection(&self, id: ProjectionId) -> Result<GraphProjection, DbError> {
    let view = self.snapshot.view();
    let Some(entry) = view.catalog().projection(id) else {
        return Err(DbError::UnknownProjection { id });
    };
    match &entry.definition {
        ProjectionDefinition::Hypergraph(_definition) => {
            Err(DbError::invalid_projection("projection is not a graph"))
        }
        ProjectionDefinition::Graph(definition) => {
            projection::GraphProjection::from_state(&view, definition.clone())
        }
    }
}
/// Builds the graph projection registered under `name`.
///
/// NOTE(review): an unknown name is reported as `DbError::unsupported`, whereas
/// `Db::bind` reports unknown projection names as `DbError::UnknownName`.
///
/// # Errors
///
/// Fails for unknown names, and otherwise as [`Reader::graph_projection`] does.
pub fn graph_projection_by_name(&self, name: &str) -> Result<GraphProjection, DbError> {
    let found = self.snapshot.view().catalog().projection_id(name);
    match found {
        Some(id) => self.graph_projection(id),
        None => Err(DbError::unsupported(format!("unknown projection {name}"))),
    }
}
/// Walks graph projection `projection` from `seeds` according to `walk`.
///
/// With no seeds or a zero `walk.limit`, an empty subgraph is returned before the projection
/// is even looked up, so an unknown projection id is not reported in that case.
///
/// # Errors
///
/// Propagates projection-building and traversal failures.
pub fn walk(
    &self,
    projection: ProjectionId,
    seeds: &[ElementId],
    walk: Walk,
) -> Result<Subgraph, DbError> {
    let nothing_to_walk = seeds.is_empty() || walk.limit == 0;
    if nothing_to_walk {
        return Ok(Subgraph::default());
    }
    let graph = self.graph_projection(projection)?;
    traversal::walk_graph_projection(&graph, seeds, walk)
}
/// Personalized PageRank over graph projection `projection`, seeded at `seeds`.
///
/// Each seed that belongs to the projection gets personalization weight 1.0; all other
/// positions get 0.0, and seeds outside the projection are ignored. If no seed is in the
/// projection, no personalization vector is passed to the algorithm, which presumably
/// yields the unpersonalized ranking.
/// NOTE(review): the weights are not normalized here — confirm that
/// `pagerank_graph_with_workspace` normalizes the personalization vector.
///
/// Returns one `(element, rank)` pair per projection index in `0..element_bound`. Pairs are
/// sorted by descending rank using `f64::total_cmp`, with a stable sort for ties.
///
/// # Errors
///
/// - A traversal error when the element bound does not fit in `u32`, the configuration is
///   invalid, the iteration does not converge, or PageRank fails otherwise.
/// - Any error from building the projection.
pub fn personalized_pagerank(
    &self,
    projection: ProjectionId,
    seeds: &[ElementId],
    config: PageRankConfig<f64>,
) -> Result<Vec<(ElementId, f64)>, DbError> {
    let graph = self.graph_projection(projection)?;
    let bound = graph.element_bound();
    // Projection-local ids are built from `u32` indices below, so the bound must fit.
    let element_count = u32::try_from(bound).map_err(|_| {
        DbError::traversal("projection exceeds the personalized pagerank index bound")
    })?;
    let mut personalization = vec![0.0_f64; bound];
    let mut seeded = false;
    for &seed in seeds {
        if let Some(local) = graph.local_element_id(seed) {
            personalization[graph.element_index(local)] = 1.0;
            seeded = true;
        }
    }
    let mut ranks = vec![0.0_f64; bound];
    let mut workspace = PageRankWorkspace::for_graph(&graph);
    pagerank_graph_with_workspace(
        &graph,
        &Uniform,
        (0..element_count).map(ProjectionElementId::new),
        config,
        // No seed landed in the projection: pass `None` instead of an all-zero vector.
        seeded.then_some(personalization.as_slice()),
        &mut ranks,
        &mut workspace,
    )
    .map_err(|error| {
        DbError::traversal(match error {
            PageRankError::InvalidDamping { .. }
            | PageRankError::InvalidTolerance { .. }
            | PageRankError::InvalidMaxIterations => "invalid pagerank configuration",
            PageRankError::NonConverged { .. } => "personalized pagerank did not converge",
            // Catch-all for any other `PageRankError` variant.
            _ => "personalized pagerank failed",
        })
    })?;
    let mut ranked: Vec<(ElementId, f64)> = (0..element_count)
        .map(|index| {
            let local = ProjectionElementId::new(index);
            (
                graph.canonical_element_id(local),
                ranks[graph.element_index(local)],
            )
        })
        .collect();
    // Highest rank first; `total_cmp` gives a total order even for NaN.
    ranked.sort_by(|left, right| right.1.total_cmp(&left.1));
    Ok(ranked)
}
/// Longest path through the DAG of graph projection `projection`, restricted to `elements`.
///
/// Elements that are not part of the projection are dropped before the search. An empty
/// `elements` slice returns an empty path without looking the projection up.
///
/// # Errors
///
/// Propagates projection-building failures. Any `longest_path_dag` failure is reported as
/// the traversal error "longest path found a cycle".
pub fn longest_path(
    &self,
    projection: ProjectionId,
    elements: &[ElementId],
) -> Result<Vec<ElementId>, DbError> {
    if elements.is_empty() {
        return Ok(Vec::new());
    }
    let graph = self.graph_projection(projection)?;
    let mut locals: Vec<ProjectionElementId> = Vec::new();
    for &element in elements {
        if let Some(local) = graph.local_element_id(element) {
            locals.push(local);
        }
    }
    let path = longest_path_dag(&graph, &locals)
        .map_err(|_cycle| DbError::traversal("longest path found a cycle"))?;
    Ok(path
        .into_iter()
        .map(|local| graph.canonical_element_id(local))
        .collect())
}
/// Builds hypergraph projection `id` from this snapshot.
///
/// # Errors
///
/// - `DbError::UnknownProjection` if `id` is not in the catalog.
/// - An invalid-projection error if `id` names a graph projection.
/// - Otherwise any error from building the projection.
pub fn hypergraph_projection(&self, id: ProjectionId) -> Result<HypergraphProjection, DbError> {
    let view = self.snapshot.view();
    let Some(entry) = view.catalog().projection(id) else {
        return Err(DbError::UnknownProjection { id });
    };
    match &entry.definition {
        ProjectionDefinition::Graph(_definition) => Err(DbError::invalid_projection(
            "projection is not a hypergraph",
        )),
        ProjectionDefinition::Hypergraph(definition) => {
            projection::HypergraphProjection::from_state(&view, definition.clone())
        }
    }
}
/// Executes a prepared query against this reader's snapshot.
///
/// # Errors
///
/// Propagates the query's execution failure.
pub fn run(&self, query: &PreparedQuery) -> Result<QueryResult, DbError> {
    let view = self.snapshot.view();
    query.execute(&view)
}
/// Textual plan of `query`. The snapshot is not consulted.
#[must_use]
pub fn explain(&self, query: &PreparedQuery) -> String {
    query.explain()
}
/// Subjects covered by projection `projection`, built from this snapshot (graph or
/// hypergraph).
///
/// # Errors
///
/// `DbError::UnknownProjection` for an unknown id; otherwise propagates projection-building
/// failures.
fn projection_index_subjects(
    &self,
    projection: ProjectionId,
) -> Result<Vec<PropertySubject>, DbError> {
    let view = self.snapshot.view();
    let Some(entry) = view.catalog().projection(projection) else {
        return Err(DbError::UnknownProjection { id: projection });
    };
    let subjects = match &entry.definition {
        ProjectionDefinition::Graph(definition) => {
            projection::GraphProjection::from_state(&view, definition.clone())?.subjects()
        }
        ProjectionDefinition::Hypergraph(definition) => {
            projection::HypergraphProjection::from_state(&view, definition.clone())?.subjects()
        }
    };
    Ok(subjects)
}
}
/// An open write transaction, created by `Db::begin_write` (for example via [`Db::write`]).
///
/// Changes are staged in an overlay layered on the snapshot that was current when the
/// transaction began.
pub struct Writer<'db> {
    /// Database the transaction belongs to.
    database: &'db mut Db,
    /// Snapshot the transaction started from.
    parent: Arc<Snapshot>,
    /// Staged changes, seeded from the parent snapshot's overlay.
    delta: WriteOverlay,
    /// Transaction id allocated by `begin_write`.
    transaction_id: TransactionId,
    /// Writer lock acquired by `begin_write`; it lives as long as this `Writer`.
    lock: WriterLock,
}
impl Writer<'_> {
/// Registers a new role name in this transaction.
///
/// # Errors
///
/// Propagates the overlay's rejection, for example a duplicate name.
/// NOTE(review): the exact rules live in `WriteOverlay::register_role`.
pub fn register_role(&mut self, name: impl Into<String>) -> Result<RoleId, DbError> {
    let name: String = name.into();
    self.delta.register_role(name)
}
/// Registers a new label name in this transaction.
///
/// # Errors
///
/// Propagates the overlay's rejection.
pub fn register_label(&mut self, name: impl Into<String>) -> Result<LabelId, DbError> {
    let name: String = name.into();
    self.delta.register_label(name)
}
/// Registers a new relation type name in this transaction.
///
/// # Errors
///
/// Propagates the overlay's rejection.
pub fn register_relation_type(
    &mut self,
    name: impl Into<String>,
) -> Result<RelationTypeId, DbError> {
    let name: String = name.into();
    self.delta.register_relation_type(name)
}
/// Registers a property key that applies to subjects of `family` and holds `value_type`
/// values.
///
/// # Errors
///
/// Propagates the overlay's rejection.
pub fn register_property_key(
    &mut self,
    name: impl Into<String>,
    family: PropertyFamily,
    value_type: PropertyType,
) -> Result<PropertyKeyId, DbError> {
    let name: String = name.into();
    self.delta.register_property_key(name, family, value_type)
}
/// Validates and registers a projection definition.
///
/// # Errors
///
/// Propagates validation and registration failures.
pub fn define_projection(
    &mut self,
    definition: ProjectionDefinition,
) -> Result<ProjectionId, DbError> {
    self.validate_projection_definition(&definition)?;
    let registered = self.delta.register_projection(definition);
    registered
}
/// Validates and registers an index definition under `name`.
///
/// # Errors
///
/// Propagates validation and registration failures.
pub fn define_index(
    &mut self,
    name: impl Into<String>,
    definition: IndexDefinition,
) -> Result<IndexId, DbError> {
    self.validate_index_definition(&definition)?;
    let name: String = name.into();
    self.delta.register_index(name, definition)
}
pub fn apply_schema(&mut self, schema: &Schema) -> Result<Bound, DbError> {
let mut bound = Bound::default();
for name in &schema.roles {
let id = match self.merged().catalog().role_id(name) {
Some(id) => id,
None => self.register_role(name.clone())?,
};
bound.roles.insert(name.clone(), id);
}
for name in &schema.labels {
let id = match self.merged().catalog().label_id(name) {
Some(id) => id,
None => self.register_label(name.clone())?,
};
bound.labels.insert(name.clone(), id);
}
for name in &schema.relation_types {
let id = match self.merged().catalog().relation_type_id(name) {
Some(id) => id,
None => self.register_relation_type(name.clone())?,
};
bound.relation_types.insert(name.clone(), id);
}
for (name, family, value_type) in &schema.keys {
let id = self.register_key_or_get(name, *family, *value_type)?;
bound.keys.insert(name.clone(), (id, *value_type));
}
for (name, key_name) in &schema.equality_indexes {
let (key_id, value_type) =
*bound
.keys
.get(key_name)
.ok_or_else(|| DbError::UnknownName {
kind: "property key",
name: key_name.clone(),
})?;
let id = match self.merged().catalog().index_id(name) {
Some(id) => id,
None => self.define_index(
name.clone(),
IndexDefinition::PropertyEquality { key: key_id },
)?,
};
bound
.equality_indexes
.insert(name.clone(), (id, value_type));
}
for spec in &schema.graph_projections {
let id = match self.merged().catalog().projection_id(&spec.name) {
Some(id) => id,
None => self.define_graph_projection(spec, &bound)?,
};
bound.projections.insert(spec.name.clone(), id);
}
Ok(bound)
}
/// Returns the id of property key `name`, registering it first if the catalog lacks it.
///
/// # Errors
///
/// [`DbError::SchemaConflict`] when an existing key has a different family or value type;
/// otherwise registration failures are propagated.
fn register_key_or_get(
    &mut self,
    name: &str,
    family: PropertyFamily,
    value_type: PropertyType,
) -> Result<PropertyKeyId, DbError> {
    let existing = self.merged().catalog().property_key_id(name);
    let Some(id) = existing else {
        return self.register_property_key(name.to_owned(), family, value_type);
    };
    let definition_agrees = self
        .merged()
        .catalog()
        .property_key(id)
        .is_some_and(|def| def.family == family && def.value_type == value_type);
    if !definition_agrees {
        return Err(DbError::SchemaConflict {
            name: name.to_owned(),
            reason: "property key family/value type differs from the existing catalog entry",
        });
    }
    Ok(id)
}
/// Defines the graph projection described by `spec`, resolving its relation types and
/// source/target roles through `bound`.
///
/// # Errors
///
/// Propagates name-resolution failures (relation types first, then the source role, then
/// the target role) and projection-definition failures.
fn define_graph_projection(
    &mut self,
    spec: &GraphProjectionSpec,
    bound: &Bound,
) -> Result<ProjectionId, DbError> {
    let relation_types = spec
        .relation_types
        .iter()
        .map(|name| bound.relation_type(name))
        .collect::<Result<BTreeSet<_>, _>>()?;
    let source_role = bound.role(&spec.source_role)?;
    let target_role = bound.role(&spec.target_role)?;
    let definition = GraphProjectionDefinition {
        name: spec.name.clone(),
        relation_types,
        source_role,
        target_role,
    };
    self.define_projection(ProjectionDefinition::Graph(definition))
}
/// Allocates a new element in this transaction.
///
/// # Errors
///
/// Propagates the overlay's allocation failure.
pub fn create_element(&mut self) -> Result<ElementId, DbError> {
    let created = self.delta.create_element();
    created
}
/// Allocates a new relation in this transaction. Its type and incidences are set separately
/// (`set_relation_type`, `create_incidence`).
///
/// # Errors
///
/// Propagates the overlay's allocation failure.
pub fn create_relation(&mut self) -> Result<RelationId, DbError> {
    let created = self.delta.create_relation();
    created
}
/// Attaches `element` to `relation` in `role`.
///
/// # Errors
///
/// Fails if the relation, element or role is unknown (checked in that order), or if the
/// overlay rejects the incidence.
pub fn create_incidence(
    &mut self,
    relation: RelationId,
    element: ElementId,
    role: RoleId,
) -> Result<IncidenceId, DbError> {
    self.require_relation(relation)?;
    self.require_element(element)?;
    self.require_role(role)?;
    self.delta.create_incidence(relation, element, role)
}
/// Tombstones element `id` together with every incidence attaching it to a relation.
///
/// # Errors
///
/// Fails if the element does not exist.
pub(crate) fn tombstone_element(&mut self, id: ElementId) -> Result<(), DbError> {
    self.require_element(id)?;
    // Collect the ids first so no borrow of the merged view survives into the mutation.
    let attached: Vec<IncidenceId> = self
        .merged()
        .element_incidences(id)
        .into_iter()
        .map(|incidence| incidence.id)
        .collect();
    let base = self.parent.base_records();
    self.delta.tombstone_element(base, id);
    for incidence in attached {
        self.delta.tombstone_incidence(base, incidence);
    }
    Ok(())
}
/// Tombstones relation `id` together with all of its incidences.
///
/// # Errors
///
/// Fails if the relation does not exist.
pub(crate) fn tombstone_relation(&mut self, id: RelationId) -> Result<(), DbError> {
    self.require_relation(id)?;
    let attached: Vec<IncidenceId> = self
        .merged()
        .relation_incidences(id)
        .into_iter()
        .map(|incidence| incidence.id)
        .collect();
    let base = self.parent.base_records();
    self.delta.tombstone_relation(base, id);
    for incidence in attached {
        self.delta.tombstone_incidence(base, incidence);
    }
    Ok(())
}
/// Tombstones incidence `id`.
///
/// # Errors
///
/// Fails if the incidence does not exist.
pub(crate) fn tombstone_incidence(&mut self, id: IncidenceId) -> Result<(), DbError> {
    self.require_incidence(id)?;
    let base = self.parent.base_records();
    self.delta.tombstone_incidence(base, id);
    Ok(())
}
/// Adds `label` to `element`.
///
/// # Errors
///
/// Fails if the element or the label is unknown (checked in that order).
pub(crate) fn add_element_label(
    &mut self,
    element: ElementId,
    label: LabelId,
) -> Result<(), DbError> {
    self.require_element(element)?;
    self.require_label(label)?;
    let base = self.parent.base_records();
    self.delta.add_element_label(base, element, label);
    Ok(())
}
/// Adds `label` to `relation`.
///
/// # Errors
///
/// Fails if the relation or the label is unknown (checked in that order).
pub(crate) fn add_relation_label(
    &mut self,
    relation: RelationId,
    label: LabelId,
) -> Result<(), DbError> {
    self.require_relation(relation)?;
    self.require_label(label)?;
    let base = self.parent.base_records();
    self.delta.add_relation_label(base, relation, label);
    Ok(())
}
/// Sets the type of `relation` to `relation_type`.
///
/// # Errors
///
/// Fails if the relation or the relation type is unknown (checked in that order).
pub fn set_relation_type(
    &mut self,
    relation: RelationId,
    relation_type: RelationTypeId,
) -> Result<(), DbError> {
    self.require_relation(relation)?;
    self.require_relation_type(relation_type)?;
    let base = self.parent.base_records();
    self.delta.set_relation_type(base, relation, relation_type);
    Ok(())
}
/// Sets property `key` on `subject` to `value`.
///
/// # Errors
///
/// Checks are made in this order:
/// - the subject must exist;
/// - `key` must be registered (`DbError::UnknownPropertyKey`);
/// - the subject's family must match the key's (`DbError::WrongPropertyFamily`);
/// - the value's type must match the key's (`DbError::PropertyTypeMismatch`).
pub(crate) fn set_property(
    &mut self,
    subject: PropertySubject,
    key: PropertyKeyId,
    value: PropertyValue,
) -> Result<(), DbError> {
    self.require_subject(subject)?;
    let (expected_family, expected_type) = self
        .merged()
        .catalog()
        .property_key(key)
        .map(|definition| (definition.family, definition.value_type))
        .ok_or(DbError::UnknownPropertyKey { id: key })?;
    let actual_family = subject.family();
    if expected_family != actual_family {
        return Err(DbError::WrongPropertyFamily {
            expected: expected_family,
            actual: actual_family,
        });
    }
    let actual_type = value.value_type();
    if expected_type != actual_type {
        return Err(DbError::PropertyTypeMismatch {
            expected: expected_type,
            actual: actual_type,
        });
    }
    let base = self.parent.base_records();
    self.delta.set_property(base, subject, key, value);
    Ok(())
}
/// Removes property `key` from `subject`.
///
/// # Errors
///
/// Fails if the subject does not exist or `key` is not registered.
pub(crate) fn remove_property(
    &mut self,
    subject: PropertySubject,
    key: PropertyKeyId,
) -> Result<(), DbError> {
    self.require_subject(subject)?;
    let key_known = self.merged().catalog().property_key(key).is_some();
    if !key_known {
        return Err(DbError::UnknownPropertyKey { id: key });
    }
    let base = self.parent.base_records();
    self.delta.remove_property(base, subject, key);
    Ok(())
}
/// Property key indexed by `index`, which must be a property-equality index.
///
/// # Errors
///
/// `DbError::UnknownIndex` for an unknown index. An "unsupported" error for any other
/// index kind.
fn equality_index_key(&self, index: IndexId) -> Result<PropertyKeyId, DbError> {
    let view = self.merged();
    let Some(entry) = view.catalog().index(index) else {
        return Err(DbError::UnknownIndex { id: index });
    };
    if let IndexDefinition::PropertyEquality { key } = &entry.definition {
        Ok(*key)
    } else {
        Err(DbError::unsupported(
            "reconcile requires a property-equality index",
        ))
    }
}
pub fn upsert_element<T: ValueType>(
&mut self,
index: EqualityIndex<T>,
value: impl Assignable<T>,
) -> Result<ElementId, DbError> {
let value = value.into_value()?;
let key = self.equality_index_key(index.id())?;
let existing = self
.merged()
.property_equal(key, &value)
.into_iter()
.find_map(|subject| match subject {
PropertySubject::Element(id) => Some(id),
PropertySubject::Relation(_) | PropertySubject::Incidence(_) => None,
});
if let Some(id) = existing {
return Ok(id);
}
let element = self.create_element()?;
self.set_property(PropertySubject::Element(element), key, value)?;
Ok(element)
}
pub fn upsert_relation<T: ValueType>(
&mut self,
index: EqualityIndex<T>,
value: impl Assignable<T>,
relation_type: RelationTypeId,
endpoints: &[(ElementId, RoleId)],
) -> Result<RelationId, DbError> {
let value = value.into_value()?;
let key = self.equality_index_key(index.id())?;
let existing = self
.merged()
.property_equal(key, &value)
.into_iter()
.find_map(|subject| match subject {
PropertySubject::Relation(id) => Some(id),
PropertySubject::Element(_) | PropertySubject::Incidence(_) => None,
});
if let Some(id) = existing {
return Ok(id);
}
let relation = self.create_relation()?;
self.set_relation_type(relation, relation_type)?;
self.set_property(PropertySubject::Relation(relation), key, value)?;
for (element, role) in endpoints {
self.create_incidence(relation, *element, *role)?;
}
Ok(relation)
}
/// Tombstones every subject whose `index` key value is not one of `keep`.
///
/// All `keep` values are converted first; the first conversion failure aborts before
/// anything is tombstoned. Subjects are then tombstoned one by one, stopping at the first
/// error.
///
/// # Errors
///
/// Errors from resolving `index`, from converting a `keep` value, or from tombstoning a
/// stale subject.
pub fn retain<T: ValueType, V: Assignable<T> + Copy>(
    &mut self,
    index: EqualityIndex<T>,
    keep: &[V],
) -> Result<(), DbError> {
    let key = self.equality_index_key(index.id())?;
    let survivors = keep
        .iter()
        .map(|candidate| (*candidate).into_value())
        .collect::<Result<BTreeSet<PropertyValue>, _>>()?;
    // Collected up front so the merged view's borrow ends before any tombstoning.
    let doomed: Vec<PropertySubject> = self
        .merged()
        .property_key_subjects(key)
        .into_iter()
        .filter_map(|(subject, current)| (!survivors.contains(&current)).then_some(subject))
        .collect();
    doomed
        .into_iter()
        .try_for_each(|subject| self.tombstone(subject))
}
/// Typed setter: converts `value` for key type `T` and stages it on `subject`.
///
/// # Errors
///
/// Any conversion error from `value`, then every error `set_property` can return.
pub fn set<T: ValueType>(
    &mut self,
    subject: impl Into<PropertySubject>,
    key: Key<T>,
    value: impl Assignable<T>,
) -> Result<(), DbError> {
    let target: PropertySubject = subject.into();
    let converted = value.into_value()?;
    self.set_property(target, key.id(), converted)
}
/// Typed remover: stages removal of `key` from `subject`.
///
/// # Errors
///
/// Every error `remove_property` can return.
pub fn unset<T: ValueType>(
    &mut self,
    subject: impl Into<PropertySubject>,
    key: Key<T>,
) -> Result<(), DbError> {
    let target: PropertySubject = subject.into();
    let key_id = key.id();
    self.remove_property(target, key_id)
}
/// Stages `label` on an element or relation.
///
/// # Errors
///
/// An unsupported-operation error for incidences, which cannot carry labels; otherwise
/// the errors of `add_element_label` / `add_relation_label`.
pub fn add_label(
    &mut self,
    subject: impl Into<PropertySubject>,
    label: LabelId,
) -> Result<(), DbError> {
    let target: PropertySubject = subject.into();
    match target {
        PropertySubject::Incidence(_) => {
            Err(DbError::unsupported("incidences do not carry labels"))
        }
        PropertySubject::Element(element) => self.add_element_label(element, label),
        PropertySubject::Relation(relation) => self.add_relation_label(relation, label),
    }
}
/// Tombstones an element, relation or incidence by dispatching to the per-kind method.
///
/// # Errors
///
/// Whatever the matching `tombstone_element` / `tombstone_relation` /
/// `tombstone_incidence` returns.
pub fn tombstone(&mut self, subject: impl Into<PropertySubject>) -> Result<(), DbError> {
    let target: PropertySubject = subject.into();
    match target {
        PropertySubject::Incidence(incidence) => self.tombstone_incidence(incidence),
        PropertySubject::Element(element) => self.tombstone_element(element),
        PropertySubject::Relation(relation) => self.tombstone_relation(relation),
    }
}
/// Appends the staged delta to the log as one commit, publishes it as the database's
/// current snapshot, and returns the commit's sequence number.
///
/// An empty delta is a no-op: nothing is written, and the parent snapshot's LSN is
/// returned unchanged.
///
/// Otherwise the steps run in this order:
/// 1. mint the next LSN after the parent's;
/// 2. encode the delta into a WAL commit frame tagged with the LSN, the transaction id,
///    and the current base generation;
/// 3. open the log for that base generation and append the frame;
/// 4. freeze the delta into a new overlay, and install a snapshot that shares the
///    parent's generation, base and base records;
/// 5. record the transaction id, release the writer lock, and let the checkpoint policy
///    decide whether to fold the log.
///
/// # Errors
///
/// - `CommitSeqOverflow` if the LSN cannot be advanced;
/// - any error from frame encoding, opening the log, or appending to it.
///
/// In all of these cases the current snapshot is left untouched. An error from the
/// auto-checkpoint in step 5 is also returned, even though by then the commit has already
/// been appended and published. NOTE(review): callers can observe `Err` for a commit that
/// is in the log; confirm this is intended.
pub(crate) fn commit(self) -> Result<CommitSeq, DbError> {
    if self.delta.is_empty() {
        return Ok(self.parent.lsn());
    }
    let lsn = self
        .parent
        .lsn()
        .checked_next()
        .ok_or(DbError::CommitSeqOverflow)?;
    let (ops, blob) = self.delta.encode_frame();
    let frame = wal::encode_commit(
        lsn.get(),
        self.transaction_id.get(),
        self.database.base_generation,
        &ops,
        &blob,
    )?;
    // The frame is appended to the log before the in-memory snapshot is swapped, so a
    // failed append leaves `current` unchanged.
    let mut log = open_log_for_append(&self.database.root, self.database.base_generation)?;
    wal::append_commit(&mut log, &frame)?;
    let new_overlay = Arc::new(self.delta.freeze());
    // The new snapshot keeps the parent's generation, base and base records; only the
    // overlay and the LSN change.
    let snapshot = Snapshot::with_shared_base_records(
        self.parent.generation(),
        lsn,
        Arc::clone(self.parent.base()),
        new_overlay,
        Arc::clone(self.parent.base_records()),
    );
    self.database.current = Arc::new(snapshot);
    self.database.last_transaction_id = self.transaction_id;
    // The writer lock is released before the auto-checkpoint runs. Presumably the
    // checkpoint path takes its own lock; TODO confirm.
    drop(self.lock);
    self.database.maybe_auto_checkpoint()?;
    Ok(lsn)
}
/// Read view layering this transaction's staged delta over the parent snapshot's base
/// records.
fn merged(&self) -> crate::overlay::WriteMergedState<'_> {
    let base_records = self.parent.base_records();
    let staged = &self.delta;
    crate::overlay::WriteMergedState::new(base_records, staged)
}
/// Fails with `UnknownElement` unless `id` is present in the merged view.
fn require_element(&self, id: ElementId) -> Result<(), DbError> {
    if !self.merged().contains_element(id) {
        return Err(DbError::UnknownElement { id });
    }
    Ok(())
}
/// Fails with `UnknownRelation` unless `id` is present in the merged view.
fn require_relation(&self, id: RelationId) -> Result<(), DbError> {
    if !self.merged().contains_relation(id) {
        return Err(DbError::UnknownRelation { id });
    }
    Ok(())
}
/// Fails with `UnknownIncidence` unless `id` is present in the merged view.
fn require_incidence(&self, id: IncidenceId) -> Result<(), DbError> {
    if !self.merged().contains_incidence(id) {
        return Err(DbError::UnknownIncidence { id });
    }
    Ok(())
}
/// Fails with `UnknownRole` unless `id` is registered in the transaction's catalog.
fn require_role(&self, id: RoleId) -> Result<(), DbError> {
    match self.delta.catalog().role(id) {
        Some(_) => Ok(()),
        None => Err(DbError::UnknownRole { id }),
    }
}
/// Fails with `UnknownLabel` unless `id` is registered in the transaction's catalog.
fn require_label(&self, id: LabelId) -> Result<(), DbError> {
    match self.delta.catalog().label(id) {
        Some(_) => Ok(()),
        None => Err(DbError::UnknownLabel { id }),
    }
}
/// Fails with `UnknownRelationType` unless `id` is registered in the transaction's catalog.
fn require_relation_type(&self, id: RelationTypeId) -> Result<(), DbError> {
    match self.delta.catalog().relation_type(id) {
        Some(_) => Ok(()),
        None => Err(DbError::UnknownRelationType { id }),
    }
}
/// Fails with the kind-specific "unknown" error unless `subject` is present in the
/// merged view.
fn require_subject(&self, subject: PropertySubject) -> Result<(), DbError> {
    match subject {
        PropertySubject::Incidence(incidence) => self.require_incidence(incidence),
        PropertySubject::Relation(relation) => self.require_relation(relation),
        PropertySubject::Element(element) => self.require_element(element),
    }
}
/// Checks that every role and relation type referenced by `definition` is registered.
///
/// Graph projections: source role, then target role, then relation types.
/// Hypergraph projections: source roles, then target roles, then relation types. The
/// first missing id is reported.
fn validate_projection_definition(
    &self,
    definition: &ProjectionDefinition,
) -> Result<(), DbError> {
    match definition {
        ProjectionDefinition::Graph(graph) => {
            self.require_role(graph.source_role)?;
            self.require_role(graph.target_role)?;
            graph
                .relation_types
                .iter()
                .try_for_each(|&relation_type| self.require_relation_type(relation_type))
        }
        ProjectionDefinition::Hypergraph(hyper) => {
            hyper
                .source_roles
                .iter()
                .chain(hyper.target_roles.iter())
                .try_for_each(|&role| self.require_role(role))?;
            hyper
                .relation_types
                .iter()
                .try_for_each(|&relation_type| self.require_relation_type(relation_type))
        }
    }
}
/// Checks that every catalog entry referenced by an index definition exists.
///
/// A composite equality index must name at least one key; every named key must be
/// registered, and the first missing one is reported.
fn validate_index_definition(&self, definition: &IndexDefinition) -> Result<(), DbError> {
    match definition {
        IndexDefinition::Label { label } => self.require_label(*label),
        IndexDefinition::RelationType { relation_type } => {
            self.require_relation_type(*relation_type)
        }
        IndexDefinition::PropertyEquality { key } | IndexDefinition::PropertyRange { key } => {
            self.require_property_key(*key)
        }
        IndexDefinition::CompositeEquality { keys } if keys.is_empty() => {
            Err(DbError::unsupported(
                "composite equality index requires at least one key",
            ))
        }
        IndexDefinition::CompositeEquality { keys } => keys
            .iter()
            .try_for_each(|&key| self.require_property_key(key)),
        IndexDefinition::Projection { projection } => {
            if self.delta.catalog().projection(*projection).is_some() {
                Ok(())
            } else {
                Err(DbError::UnknownProjection { id: *projection })
            }
        }
    }
}
/// Fails with `UnknownPropertyKey` unless `id` is registered in the transaction's catalog.
fn require_property_key(&self, id: PropertyKeyId) -> Result<(), DbError> {
    match self.delta.catalog().property_key(id) {
        Some(_) => Ok(()),
        None => Err(DbError::UnknownPropertyKey { id }),
    }
}
}
// Tests for the write-transaction API (reconcile helpers, typed setters, schema binding)
// and for checkpoint / crash recovery. Not compiled under Miri.
#[cfg(test)]
#[cfg(not(miri))]
mod tests {
    use std::{
        path::PathBuf,
        sync::atomic::{AtomicU64, Ordering},
    };
    use super::*;
    // Per-process counter so tests in the same run never share a store directory.
    static NEXT_PATH: AtomicU64 = AtomicU64::new(0);
    /// Returns a unique store path under the system temp dir.
    ///
    /// The path is built from `name`, the process id, and a per-process counter. Any
    /// leftover directory at that path is removed first; removal errors are ignored.
    fn temp_store(name: &str) -> PathBuf {
        let id = NEXT_PATH.fetch_add(1, Ordering::Relaxed);
        let path =
            std::env::temp_dir().join(format!("oxgraph-db-cp-{name}-{}-{id}", std::process::id()));
        let _ = std::fs::remove_dir_all(&path);
        path
    }
    // Workload sizes for the manual open-latency measurement. These are compiled only
    // without debug assertions (i.e. intended for `--release`).
    #[cfg(not(debug_assertions))]
    const OPEN_LATENCY_ELEMENTS: usize = 100_000;
    #[cfg(not(debug_assertions))]
    const OPEN_LATENCY_RELATIONS: usize = 320_000;
    // Number of timed repetitions averaged per measurement.
    #[cfg(not(debug_assertions))]
    const OPEN_LATENCY_RUNS: u32 = 5;
    /// Bulk-loads the open-latency workload into `database` in a single write, then calls
    /// `compact`.
    ///
    /// The load consists of:
    /// - `OPEN_LATENCY_ELEMENTS` elements, each with an integer `rank` (`index % 997`);
    /// - `OPEN_LATENCY_RELATIONS` relations, each with an integer `weight` (`index % 503`).
    ///
    /// Relation `i` gets two incidences, to elements `i` and `i + 1` (both modulo the
    /// element count), both through the single `party` role. The policy is set to `Manual`
    /// first so the bulk commit does not trigger an automatic checkpoint.
    /// NOTE(review): `compact` is presumably what folds the load into a large base for
    /// the open timings; confirm.
    #[cfg(not(debug_assertions))]
    fn populate_large_store(database: &mut Db) {
        database.set_checkpoint_policy(CheckpointPolicy::Manual);
        database
            .write(|writer| {
                let rank = writer.register_property_key(
                    "rank",
                    PropertyFamily::Element,
                    PropertyType::Integer,
                )?;
                let weight = writer.register_property_key(
                    "weight",
                    PropertyFamily::Relation,
                    PropertyType::Integer,
                )?;
                let role = writer.register_role("party")?;
                let mut elements = Vec::with_capacity(OPEN_LATENCY_ELEMENTS);
                for index in 0..OPEN_LATENCY_ELEMENTS {
                    let element = writer.create_element()?;
                    writer.set_property(
                        PropertySubject::Element(element),
                        rank,
                        PropertyValue::Integer(i64::try_from(index % 997).unwrap_or(0)),
                    )?;
                    elements.push(element);
                }
                for index in 0..OPEN_LATENCY_RELATIONS {
                    let relation = writer.create_relation()?;
                    writer.set_property(
                        PropertySubject::Relation(relation),
                        weight,
                        PropertyValue::Integer(i64::try_from(index % 503).unwrap_or(0)),
                    )?;
                    // Consecutive elements, wrapping around at the end of `elements`.
                    let source = elements[index % OPEN_LATENCY_ELEMENTS];
                    let target = elements[(index + 1) % OPEN_LATENCY_ELEMENTS];
                    writer.create_incidence(relation, source, role)?;
                    writer.create_incidence(relation, target, role)?;
                }
                Ok(())
            })
            .expect("populate");
        database.compact().expect("compact");
    }
    /// Mean wall-clock time of a full `Db::open(path)`, in milliseconds, over
    /// `OPEN_LATENCY_RUNS` runs. Dropping each handle happens outside the timed region.
    #[cfg(not(debug_assertions))]
    fn mean_open_ms(path: &std::path::Path) -> f64 {
        let mut total = std::time::Duration::ZERO;
        for _run in 0..OPEN_LATENCY_RUNS {
            let start = std::time::Instant::now();
            let opened = Db::open(path).expect("timed open");
            total += start.elapsed();
            drop(opened);
        }
        total.as_secs_f64() * 1000.0 / f64::from(OPEN_LATENCY_RUNS)
    }
    /// Mean time, in milliseconds over `OPEN_LATENCY_RUNS` runs, to build
    /// `BaseRecords::from_view` over the live base file named by the superblock.
    ///
    /// Opening the base file happens outside the timed region. The result is reported as
    /// the "BEFORE" figure (presumably the older open path; confirm).
    #[cfg(not(debug_assertions))]
    fn mean_old_from_view_ms(path: &std::path::Path) -> f64 {
        let superblock = wal::read_superblock(path).expect("superblock");
        let base_path = path.join(base_file(superblock.base_generation.get()));
        let mut total = std::time::Duration::ZERO;
        for _run in 0..OPEN_LATENCY_RUNS {
            let base = Base::open(&base_path, false).expect("base open");
            let start = std::time::Instant::now();
            let records =
                crate::overlay::BaseRecords::from_view(base.get()).expect("old from_view");
            total += start.elapsed();
            drop(records);
            drop(base);
        }
        total.as_secs_f64() * 1000.0 / f64::from(OPEN_LATENCY_RUNS)
    }
    /// Manual performance report, ignored by default.
    ///
    /// Prints the mean full `Db::open` time next to the mean `BaseRecords::from_view`
    /// rebuild time on a large compacted store. It makes no assertions about timings.
    #[test]
    #[ignore = "manual perf measurement; run explicitly with --release --ignored --nocapture"]
    #[cfg(not(debug_assertions))]
    fn open_latency_large_base() {
        let path = temp_store("open-latency");
        let mut database = Db::create(&path).expect("create");
        populate_large_store(&mut database);
        drop(database);
        // Warm-up open. NOTE(review): this handle stays alive while the timed opens run.
        let _warm = Db::open(&path).expect("warm open");
        let after_ms = mean_open_ms(&path);
        let before_ms = mean_old_from_view_ms(&path);
        println!(
            "open_latency_large_base: {OPEN_LATENCY_ELEMENTS} elements, \
             {OPEN_LATENCY_RELATIONS} relations, {} incidences, {} properties",
            OPEN_LATENCY_RELATIONS * 2,
            OPEN_LATENCY_ELEMENTS + OPEN_LATENCY_RELATIONS,
        );
        println!(" BEFORE open work (decode + from_records rebuild): {before_ms:.1} ms / open");
        println!(" AFTER full Db::open (decode + BORROWED index): {after_ms:.1} ms / open");
        let _ = std::fs::remove_dir_all(&path);
    }
    /// `upsert_element` reuses the id of an existing key value and mints a fresh id for an
    /// unseen one. `retain` then tombstones every element whose key value is not in the
    /// keep set.
    #[test]
    fn reconcile_upserts_reuse_or_mint_and_retain_prunes_the_complement() {
        let path = temp_store("reconcile");
        let mut database = Db::create(&path).expect("create");
        // Transaction 1: register a text element key and an equality index over it.
        let index = {
            let mut writer = database.begin_write().expect("begin write");
            let key = writer
                .register_property_key("stable_key", PropertyFamily::Element, PropertyType::Text)
                .expect("key");
            let index = writer
                .define_index(
                    "element_stable_key_eq",
                    IndexDefinition::PropertyEquality { key },
                )
                .expect("index");
            writer.commit().expect("commit schema");
            index
        };
        let eq = EqualityIndex::<crate::Text>::from_id(index);
        // Transaction 2: mint `a` and `b`.
        let (a1, b1) = {
            let mut writer = database.begin_write().expect("begin write");
            let a = writer.upsert_element(eq, "a").expect("upsert a");
            let b = writer.upsert_element(eq, "b").expect("upsert b");
            writer.commit().expect("commit");
            (a, b)
        };
        // Transaction 3: re-upsert `a`, mint `c`, and prune everything except `a` and `c`.
        let (a2, c1) = {
            let mut writer = database.begin_write().expect("begin write");
            let a = writer.upsert_element(eq, "a").expect("re-upsert a");
            let c = writer.upsert_element(eq, "c").expect("upsert c");
            writer.retain(eq, &["a", "c"]).expect("retain");
            writer.commit().expect("commit");
            (a, c)
        };
        assert_eq!(a1, a2, "an unchanged identity reuses its element id");
        assert_ne!(c1, a1);
        assert_ne!(c1, b1);
        let read = database.reader();
        assert!(read.contains_element(a1), "kept a");
        assert!(read.contains_element(c1), "kept c");
        assert!(!read.contains_element(b1), "retain tombstoned b");
        assert_eq!(
            read.element_by_key(eq, "a")
                .expect("lookup a")
                .map(|element| element.id),
            Some(a1)
        );
        assert!(
            read.element_by_key(eq, "b").expect("lookup b").is_none(),
            "b is not resolvable after the prune"
        );
        let _ = std::fs::remove_dir_all(&path);
    }
    /// `Db::write` has three outcomes depending on the closure:
    /// - it commits when the closure returns `Ok` after staging work;
    /// - it reports `CommitOutcome::Empty` when nothing was staged;
    /// - it leaves the element count unchanged when the closure returns `Err`.
    #[test]
    fn write_closure_commits_on_ok_rolls_back_on_err_and_reports_outcome() {
        let path = temp_store("write-closure");
        let mut database = Db::create(&path).expect("create");
        let (id, outcome) = database
            .write(|writer| {
                let id = writer.create_element()?;
                Ok(id)
            })
            .expect("write");
        assert!(matches!(outcome, CommitOutcome::Committed(_)));
        database
            .read(|read| {
                assert!(read.contains_element(id));
                Ok(())
            })
            .expect("read");
        let ((), outcome) = database.write(|_writer| Ok(())).expect("empty write");
        assert_eq!(outcome, CommitOutcome::Empty);
        let before = database
            .read(|read| Ok(read.element_count()))
            .expect("count");
        // The closure stages an element, then fails; the element must not survive.
        let result = database.write(|writer| {
            writer.create_element()?;
            Err::<(), DbError>(DbError::EmptyQuery)
        });
        assert!(result.is_err());
        let after = database
            .read(|read| Ok(read.element_count()))
            .expect("count");
        assert_eq!(before, after, "the failed write staged nothing durable");
        let _ = std::fs::remove_dir_all(&path);
    }
    /// Setting a property to the value it already holds stages no mutation, so the write
    /// reports `CommitOutcome::Empty`. A changed value commits and reads back.
    #[test]
    fn re_setting_an_unchanged_property_value_is_a_no_op_commit() {
        let path = temp_store("set-noop");
        let mut database = Db::create(&path).expect("create");
        let schema = Schema::new().key::<crate::Text>("name", PropertyFamily::Element);
        // Each write re-applies the schema to obtain a typed `name` key handle.
        let id = database
            .write(|writer| {
                let bound = writer.apply_schema(&schema)?;
                let name = bound.key::<crate::Text>("name")?;
                let id = writer.create_element()?;
                writer.set(id, name, "alpha")?;
                Ok(id)
            })
            .expect("first write")
            .0;
        let ((), outcome) = database
            .write(|writer| {
                let bound = writer.apply_schema(&schema)?;
                let name = bound.key::<crate::Text>("name")?;
                writer.set(id, name, "alpha")?;
                Ok(())
            })
            .expect("idempotent set");
        assert_eq!(
            outcome,
            CommitOutcome::Empty,
            "re-setting the same property value must log no mutation"
        );
        let ((), outcome) = database
            .write(|writer| {
                let bound = writer.apply_schema(&schema)?;
                let name = bound.key::<crate::Text>("name")?;
                writer.set(id, name, "beta")?;
                Ok(())
            })
            .expect("changed set");
        assert!(matches!(outcome, CommitOutcome::Committed(_)));
        let name = database
            .bind(&schema)
            .expect("bind")
            .key::<crate::Text>("name")
            .expect("name key");
        let value = database
            .read(|read| {
                Ok(read
                    .element(id)
                    .and_then(|element| element.properties().get::<crate::Text, String>(name)))
            })
            .expect("read");
        assert_eq!(value.as_deref(), Some("beta"));
        let _ = std::fs::remove_dir_all(&path);
    }
    /// `apply_schema` is idempotent: re-applying the schema commits nothing. `bind` on a
    /// reopened store resolves typed handles and rejects a handle requested with the wrong
    /// value type.
    #[test]
    fn schema_apply_is_idempotent_and_bind_resolves_typed_handles() {
        let path = temp_store("schema");
        let mut database = Db::create(&path).expect("create");
        let schema = Schema::new()
            .label("function")
            .key::<crate::Text>("name", PropertyFamily::Element)
            .equality_index("name_eq", "name");
        let (alpha, beta) = database
            .write(|writer| {
                let bound = writer.apply_schema(&schema)?;
                let name_eq = bound.equality_index::<crate::Text>("name_eq")?;
                let function = bound.label("function")?;
                let alpha = writer.upsert_element(name_eq, "alpha")?;
                writer.add_label(alpha, function)?;
                let beta = writer.upsert_element(name_eq, "beta")?;
                Ok((alpha, beta))
            })
            .expect("apply + write")
            .0;
        assert_ne!(alpha, beta);
        let (_bound, outcome) = database
            .write(|writer| writer.apply_schema(&schema))
            .expect("re-apply");
        assert_eq!(
            outcome,
            CommitOutcome::Empty,
            "re-applying a schema registers nothing new"
        );
        // Binding happens on a freshly opened handle, so the handles resolve from disk.
        let reopened = Db::open(&path).expect("open");
        let bound = reopened.bind(&schema).expect("bind");
        let name_eq = bound
            .equality_index::<crate::Text>("name_eq")
            .expect("typed index");
        assert!(
            bound.equality_index::<crate::Int>("name_eq").is_err(),
            "a wrong-value-type handle request is a SchemaConflict"
        );
        let found = reopened
            .read(|read| read.element_by_key(name_eq, "alpha"))
            .expect("read")
            .expect("alpha present");
        assert_eq!(found.id, alpha);
        let _ = std::fs::remove_dir_all(&path);
    }
    /// Observable state compared before and after checkpoints, crashes and reopens.
    #[derive(Debug, Eq, PartialEq)]
    struct LogicalState {
        // Every element id reported by the reader.
        elements: Vec<ElementId>,
        // Result of an equality lookup for `rank == 500`.
        rank_eq_500: Vec<PropertySubject>,
        // Elements carrying the `Person` label.
        person_members: Vec<ElementId>,
    }
    /// Catalog ids registered by `build_fixture`, needed later by `read_logical`.
    struct Fixture {
        rank: PropertyKeyId,
        person: LabelId,
    }
    /// Commits a small fixture in one transaction.
    ///
    /// It registers an integer element key `rank` and a label `Person`, then creates 8
    /// elements with ranks 0, 100, ..., 700. Even-indexed elements are labelled `Person`.
    fn build_fixture(database: &mut Db) -> Fixture {
        let mut writer = database.begin_write().expect("begin write");
        let rank = writer
            .register_property_key("rank", PropertyFamily::Element, PropertyType::Integer)
            .expect("rank key");
        let person = writer.register_label("Person").expect("person label");
        for index in 0..8u64 {
            let element = writer.create_element().expect("element");
            writer
                .set(
                    element,
                    Key::<crate::Int>::from_id(rank),
                    i64::try_from(index).expect("index") * 100,
                )
                .expect("set rank");
            if index % 2 == 0 {
                writer.add_label(element, person).expect("add label");
            }
        }
        writer.commit().expect("commit fixture");
        Fixture { rank, person }
    }
    /// Reads the `LogicalState` of `database`'s current reader for the ids in `fixture`.
    fn read_logical(database: &Db, fixture: &Fixture) -> LogicalState {
        let read = database.reader();
        let elements = read.element_ids();
        let rank_eq_500 = read
            .lookup_property_equal(fixture.rank, &PropertyValue::Integer(500))
            .expect("rank lookup");
        let person_members = read.snapshot.view().elements_with_label(fixture.person);
        LogicalState {
            elements,
            rank_eq_500,
            person_members,
        }
    }
    /// Asserts that the next minted element id is exactly one past the largest element id
    /// the reader reports (0 when there are none).
    ///
    /// The probe writer is dropped without committing.
    fn assert_no_id_reuse_across_fold(database: &mut Db) {
        let max_existing = database
            .reader()
            .element_ids()
            .into_iter()
            .map(ElementId::get)
            .max()
            .unwrap_or(0);
        let expected = ElementId::new(max_existing + 1);
        let mut writer = database.begin_write().expect("watermark probe writer");
        let minted = writer.create_element().expect("watermark probe element");
        assert_eq!(
            minted, expected,
            "the next minted id must be one past the max existing id (watermark \
             survived the fold; ids are never reused)",
        );
        drop(writer);
    }
    /// For each `CheckpointStop`, runs a checkpoint that halts at that point, drops the
    /// database, and reopens it.
    ///
    /// After reopening, the logical state and the id watermark must be unchanged. The base
    /// generation must still be the old one when the stop precedes the new superblock, and
    /// the next one otherwise. A second reopen must see the same state.
    #[test]
    fn checkpoint_crash_matrix_recovers_exact_state() {
        for stop in [
            CheckpointStop::BeforeSuperblock,
            CheckpointStop::BeforeRotate,
            CheckpointStop::Complete,
        ] {
            let path = temp_store(&format!("crash-{stop:?}"));
            let mut database = Db::create(&path).expect("create");
            let fixture = build_fixture(&mut database);
            let before = read_logical(&database, &fixture);
            let before_generation = database.base_generation;
            database
                .checkpoint_inner(stop)
                .expect("checkpoint stop returns ok");
            // Dropping the handle here stands in for the crash at `stop`.
            drop(database);
            let mut recovered = Db::open(&path).expect("reopen after crash");
            let after = read_logical(&recovered, &fixture);
            assert_eq!(
                after, before,
                "crash at {stop:?} must recover the exact logical state",
            );
            assert_no_id_reuse_across_fold(&mut recovered);
            match stop {
                CheckpointStop::BeforeSuperblock => assert_eq!(
                    recovered.base_generation, before_generation,
                    "old superblock stays authoritative before the new one lands",
                ),
                CheckpointStop::BeforeRotate | CheckpointStop::Complete => assert_eq!(
                    recovered.base_generation,
                    before_generation + 1,
                    "the new superblock names the folded generation",
                ),
            }
            let reopened = Db::open(&path).expect("second reopen");
            assert_eq!(read_logical(&reopened, &fixture), before);
            drop(reopened);
            let _ = std::fs::remove_dir_all(&path);
        }
    }
    /// Compares the two checkpoint policies under many small commits.
    ///
    /// - `Manual` never folds on its own.
    /// - `SizeRatio { factor: 1 }` folds once the log outgrows the base. The fold must
    ///   preserve the indexed lookup, label membership, the id watermark, and the
    ///   configured policy.
    #[test]
    fn auto_checkpoint_policy_folds_when_log_outgrows_base() {
        // Manual policy: 200 single-element commits must leave generation 0 in place.
        let manual_path = temp_store("auto-manual");
        let mut manual = Db::create(&manual_path).expect("create manual");
        manual.set_checkpoint_policy(CheckpointPolicy::Manual);
        let _fixture = build_fixture(&mut manual);
        for _ in 0..200 {
            let mut writer = manual.begin_write().expect("writer");
            writer.create_element().expect("element");
            writer.commit().expect("commit");
        }
        assert_eq!(
            manual.live_generation(),
            CheckpointGeneration::new(0),
            "manual policy must never auto-fold",
        );
        drop(manual);
        let _ = std::fs::remove_dir_all(&manual_path);
        // Size-ratio policy: 400 single-element commits must trigger at least one fold.
        let auto_path = temp_store("auto-ratio");
        let mut auto = Db::create(&auto_path).expect("create auto");
        auto.set_checkpoint_policy(CheckpointPolicy::SizeRatio { factor: 1 });
        let fixture = build_fixture(&mut auto);
        let before = read_logical(&auto, &fixture);
        for _ in 0..400 {
            let mut writer = auto.begin_write().expect("writer");
            writer.create_element().expect("element");
            writer.commit().expect("commit");
        }
        assert!(
            auto.live_generation() > CheckpointGeneration::new(0),
            "size-ratio policy must auto-fold once the log outgrows the base",
        );
        // `elements` is not compared: the extra commits added elements on purpose.
        let after = read_logical(&auto, &fixture);
        assert_eq!(after.rank_eq_500, before.rank_eq_500);
        assert_eq!(after.person_members, before.person_members);
        assert_no_id_reuse_across_fold(&mut auto);
        assert_eq!(
            auto.checkpoint_policy(),
            CheckpointPolicy::SizeRatio { factor: 1 },
            "the auto-fold reopen must preserve the configured policy",
        );
        let status = auto.stats();
        assert_eq!(status.live_generation, auto.live_generation());
        assert!(status.base_byte_size > 0, "live base has bytes");
        drop(auto);
        let _ = std::fs::remove_dir_all(&auto_path);
    }
}