use std::{
borrow::Cow,
collections::{BTreeMap, BTreeSet, btree_map},
sync::Arc,
};
use yoke::Yoke;
use zerocopy::byteorder::{LE, U64};
use crate::{
Catalog, CheckpointGeneration, DbError, ElementId, IncidenceId, IndexId, LabelId, ProjectionId,
PropertyKeyId, RelationId, RelationTypeId, RoleId,
backing::{Base, BaseView},
catalog::{IndexDefinition, ProjectionDefinition, PropertyFamily, PropertyKeyDefinition},
id::CommitSeq,
index::{BaseIndex, BorrowedBaseIndex, OverlayIndex, OwnedBaseIndex},
state::{ElementRecord, IncidenceRecord, NextIds, PropertySubject, RelationRecord},
value::{PropertyType, PropertyValue},
wire::{self, MUTATION_OP_PAYLOAD_WORDS, MutationOp},
};
/// Overlay delta for one record kind, keyed by the record's id.
///
/// `Some(record)` is a record stored in the overlay; `None` is a tombstone
/// (see the `tombstone_*_inner` methods, which insert `None`).
type Delta<R> = BTreeMap<<R as Keyed>::Id, Option<R>>;
/// A record that carries its own identifier.
pub(crate) trait Keyed {
/// Identifier type; `Copy + Ord` so it can be used as a `BTreeMap` key.
type Id: Copy + Ord;
/// Returns the identifier of this record.
fn record_id(&self) -> Self::Id;
}
/// Element records are keyed by their `ElementId`.
impl Keyed for ElementRecord {
    type Id = ElementId;

    /// Returns the id stored in the record.
    fn record_id(&self) -> ElementId {
        let Self { id, .. } = self;
        *id
    }
}
/// Relation records are keyed by their `RelationId`.
impl Keyed for RelationRecord {
    type Id = RelationId;

    /// Returns the id stored in the record.
    fn record_id(&self) -> RelationId {
        let Self { id, .. } = self;
        *id
    }
}
/// Incidence records are keyed by their `IncidenceId`.
impl Keyed for IncidenceRecord {
    type Id = IncidenceId;

    /// Returns the id stored in the record.
    fn record_id(&self) -> IncidenceId {
        let Self { id, .. } = self;
        *id
    }
}
/// Append-only record of the mutations performed through a `WriteOverlay`.
#[derive(Clone, Debug, Default)]
pub(crate) struct MutationLog {
// Fixed-width ops, in the order they were pushed.
ops: Vec<MutationOp>,
// Side buffer for variable-length data (names, text values, definition
// words); ops refer to it by `(offset, len)` pairs returned from `intern*`.
blob: Vec<u8>,
}
impl MutationLog {
    /// Returns `true` when no op has been pushed yet.
    pub(crate) const fn is_empty(&self) -> bool {
        self.ops.is_empty()
    }

    /// Appends `value` to the blob and returns its `(offset, len)` in bytes.
    fn intern(&mut self, value: &[u8]) -> (u64, u64) {
        let offset = self.blob.len() as u64;
        self.blob.extend_from_slice(value);
        (offset, value.len() as u64)
    }

    /// Appends `words` to the blob as little-endian `u64`s and returns the
    /// `(offset, len)` of the written region, both in bytes.
    fn intern_words(&mut self, words: &[u64]) -> (u64, u64) {
        let offset = self.blob.len() as u64;
        let byte_len = size_of_val(words);
        // Grow once up front instead of once per word.
        self.blob.reserve(byte_len);
        for word in words {
            self.blob.extend_from_slice(&word.to_le_bytes());
        }
        (offset, byte_len as u64)
    }

    /// Appends one op whose payload starts with `words`; unused payload slots
    /// are zero.
    ///
    /// `words` must fit in `MUTATION_OP_PAYLOAD_WORDS` slots. Surplus words
    /// cannot be stored and would be dropped from the log, so debug builds
    /// assert on it.
    fn push(&mut self, op_kind: u32, flags: u32, words: &[u64]) {
        debug_assert!(
            words.len() <= MUTATION_OP_PAYLOAD_WORDS,
            "mutation op payload does not fit in MUTATION_OP_PAYLOAD_WORDS",
        );
        let mut payload = [U64::<LE>::new(0); MUTATION_OP_PAYLOAD_WORDS];
        for (slot, value) in payload.iter_mut().zip(words) {
            *slot = U64::new(*value);
        }
        self.ops.push(MutationOp {
            op_kind: op_kind.into(),
            flags: flags.into(),
            payload,
        });
    }

    /// Appends an `OP_NEXT_ID_WATERMARK` op carrying every counter in `next`.
    ///
    /// The word order here must match the decoder in
    /// `WriteOverlay::apply_replay_op`.
    fn push_watermark(&mut self, next: NextIds) {
        self.push(
            wire::OP_NEXT_ID_WATERMARK,
            0,
            &[
                next.element.get(),
                next.relation.get(),
                next.incidence.get(),
                next.role.get(),
                next.label.get(),
                next.relation_type.get(),
                next.property_key.get(),
                next.projection.get(),
                next.index.get(),
            ],
        );
    }
}
/// Reads the UTF-8 string stored at `blob[offset..offset + len]`.
///
/// # Errors
///
/// Returns `DbError::LogCorrupt` (tagged with `lsn`) when the range does not
/// fit in `usize`, overflows, lies outside `blob`, or is not valid UTF-8.
fn blob_str(blob: &[u8], offset: u64, len: u64, lsn: u64) -> Result<String, DbError> {
    let corrupt = |reason: &'static str| DbError::LogCorrupt { lsn, reason };
    let Ok(start) = usize::try_from(offset) else {
        return Err(corrupt("blob offset overflow"));
    };
    let Ok(length) = usize::try_from(len) else {
        return Err(corrupt("blob length overflow"));
    };
    let Some(end) = start.checked_add(length) else {
        return Err(corrupt("blob slice overflow"));
    };
    let Some(bytes) = blob.get(start..end) else {
        return Err(corrupt("blob slice out of bounds"));
    };
    match core::str::from_utf8(bytes) {
        Ok(text) => Ok(text.to_owned()),
        Err(_error) => Err(corrupt("blob slice is not UTF-8")),
    }
}
/// Mutable overlay of pending changes on top of a `BaseRecords` snapshot.
///
/// Besides the changed state it keeps the `MutationLog` of the operations
/// performed through its public mutators.
#[derive(Clone, Debug)]
pub(crate) struct WriteOverlay {
// Per-kind record deltas; a `None` entry is a tombstone.
elements: Delta<ElementRecord>,
relations: Delta<RelationRecord>,
incidences: Delta<IncidenceRecord>,
// Property overrides per subject and key; `None` marks a removed property.
properties: BTreeMap<PropertySubject, BTreeMap<PropertyKeyId, Option<PropertyValue>>>,
catalog: Catalog,
// Next id to hand out for each id space.
next: NextIds,
// Ops recorded by the live mutators (the replay path does not push here).
log: MutationLog,
// Overlay index kept in step with the record/property changes.
index: OverlayIndex,
}
impl WriteOverlay {
/// Creates an overlay with no pending changes, an empty log and an empty
/// overlay index, starting from the given id counters and catalog.
pub(crate) const fn new(next: NextIds, catalog: Catalog) -> Self {
    Self {
        next,
        catalog,
        elements: BTreeMap::new(),
        relations: BTreeMap::new(),
        incidences: BTreeMap::new(),
        properties: BTreeMap::new(),
        index: OverlayIndex::new(),
        log: MutationLog {
            ops: Vec::new(),
            blob: Vec::new(),
        },
    }
}
pub(crate) fn from_overlay(parent: &Overlay) -> Self {
Self {
elements: parent.elements.clone(),
relations: parent.relations.clone(),
incidences: parent.incidences.clone(),
properties: parent.properties.clone(),
catalog: parent.catalog.clone(),
next: parent.next,
log: MutationLog {
ops: Vec::new(),
blob: Vec::new(),
},
index: parent.index.clone(),
}
}
pub(crate) const fn is_empty(&self) -> bool {
self.log.is_empty()
}
pub(crate) fn encode_frame(&self) -> (Vec<MutationOp>, Vec<u8>) {
let mut log = self.log.clone();
log.push_watermark(self.next);
(log.ops, log.blob)
}
pub(crate) const fn catalog(&self) -> &Catalog {
&self.catalog
}
/// Applies one logged op to this overlay during replay.
///
/// The op is dispatched on its kind to the topology, property, catalog or
/// watermark handling below. Replay goes through the non-logging helpers, so
/// nothing is appended to `self.log`.
///
/// # Errors
///
/// Returns `DbError::LogCorrupt` tagged with `lsn` for an unknown op kind or
/// when an op's flags or blob references cannot be decoded.
pub(crate) fn apply_replay_op(
&mut self,
base: &BaseRecords,
op: &MutationOp,
blob: &[u8],
lsn: u64,
) -> Result<(), DbError> {
let kind = op.op_kind.get();
let payload = &op.payload;
// Payload accessor; an index past the payload reads as 0.
let word = |index: usize| payload.get(index).map_or(0, |w| w.get());
match kind {
wire::OP_CREATE_ELEMENT
| wire::OP_CREATE_RELATION
| wire::OP_CREATE_INCIDENCE
| wire::OP_TOMBSTONE_ELEMENT
| wire::OP_TOMBSTONE_RELATION
| wire::OP_TOMBSTONE_INCIDENCE
| wire::OP_ADD_ELEMENT_LABEL
| wire::OP_ADD_RELATION_LABEL
| wire::OP_SET_RELATION_TYPE => {
self.apply_topology_op(kind, base, op);
}
wire::OP_SET_PROPERTY => {
self.apply_set_property(base, op, blob, lsn)?;
}
wire::OP_REMOVE_PROPERTY => {
// Only the subject kind half of the flags is used for removals.
let (subject_kind, _high) = wire::unpack_flags(op.flags.get());
let subject =
wire::decode_subject(subject_kind, word(0)).ok_or(DbError::LogCorrupt {
lsn,
reason: "remove-property subject kind",
})?;
self.remove_property_inner(base, subject, PropertyKeyId::new(word(1)));
}
wire::OP_CATALOG_REGISTER_ROLE
| wire::OP_CATALOG_REGISTER_LABEL
| wire::OP_CATALOG_REGISTER_RELATION_TYPE
| wire::OP_CATALOG_REGISTER_PROPERTY_KEY
| wire::OP_CATALOG_REGISTER_PROJECTION
| wire::OP_CATALOG_REGISTER_INDEX => {
self.apply_catalog_op(kind, op, blob, lsn)?;
}
wire::OP_NEXT_ID_WATERMARK => {
// Word order mirrors `MutationLog::push_watermark`.
self.next = NextIds {
element: ElementId::new(word(0)),
relation: RelationId::new(word(1)),
incidence: IncidenceId::new(word(2)),
role: RoleId::new(word(3)),
label: LabelId::new(word(4)),
relation_type: RelationTypeId::new(word(5)),
property_key: PropertyKeyId::new(word(6)),
projection: ProjectionId::new(word(7)),
index: IndexId::new(word(8)),
};
}
_other => {
return Err(DbError::LogCorrupt {
lsn,
reason: "unknown mutation op kind",
});
}
}
Ok(())
}
/// Replays one topology op (create / tombstone / label / relation type).
///
/// `kind` must be one of the topology kinds matched by `apply_replay_op`;
/// any kind without its own arm is handled by the final catch-all arm as a
/// set-relation-type op. Replayed creates use the ids from the payload and
/// leave `self.next` alone.
fn apply_topology_op(&mut self, kind: u32, base: &BaseRecords, op: &MutationOp) {
let payload = &op.payload;
// Payload accessor; an index past the payload reads as 0.
let word = |index: usize| payload.get(index).map_or(0, |w| w.get());
match kind {
wire::OP_CREATE_ELEMENT => {
let id = ElementId::new(word(0));
self.elements.insert(
id,
Some(ElementRecord {
id,
labels: BTreeSet::new(),
}),
);
}
wire::OP_CREATE_RELATION => {
let id = RelationId::new(word(0));
self.relations.insert(
id,
Some(RelationRecord {
id,
relation_type: None,
labels: BTreeSet::new(),
}),
);
}
wire::OP_CREATE_INCIDENCE => {
// Payload layout: [incidence, relation, element, role].
let id = IncidenceId::new(word(0));
let relation = RelationId::new(word(1));
let element = ElementId::new(word(2));
let role = RoleId::new(word(3));
self.incidences.insert(
id,
Some(IncidenceRecord {
id,
relation,
element,
role,
}),
);
self.index.on_create_incidence(id, relation, element);
}
wire::OP_TOMBSTONE_ELEMENT => {
self.tombstone_element_replay(base, ElementId::new(word(0)));
}
wire::OP_TOMBSTONE_RELATION => {
self.tombstone_relation_replay(base, RelationId::new(word(0)));
}
wire::OP_TOMBSTONE_INCIDENCE => {
self.tombstone_incidence_replay(base, IncidenceId::new(word(0)));
}
wire::OP_ADD_ELEMENT_LABEL => {
self.add_element_label_replay(base, ElementId::new(word(0)), LabelId::new(word(1)));
}
wire::OP_ADD_RELATION_LABEL => {
self.add_relation_label_replay(
base,
RelationId::new(word(0)),
LabelId::new(word(1)),
);
}
// Remaining kind routed here by the caller: OP_SET_RELATION_TYPE.
_set_type => {
self.set_relation_type_replay(
base,
RelationId::new(word(0)),
RelationTypeId::new(word(1)),
);
}
}
}
/// Replays one catalog registration op.
///
/// Every catalog op carries its id in word 0 and its name as a blob range in
/// words 1 and 2; projection and index ops additionally reference their
/// definition words through words 3 and 4. The name is decoded before the
/// kind is dispatched. Any kind without its own arm is handled by the final
/// catch-all arm as an index registration.
///
/// # Errors
///
/// Returns `DbError::LogCorrupt` when the name, flags or definition cannot
/// be decoded, or whatever error the catalog insert reports.
fn apply_catalog_op(
&mut self,
kind: u32,
op: &MutationOp,
blob: &[u8],
lsn: u64,
) -> Result<(), DbError> {
let payload = &op.payload;
// Payload accessor; an index past the payload reads as 0.
let word = |index: usize| payload.get(index).map_or(0, |w| w.get());
let name = blob_str(blob, word(1), word(2), lsn)?;
match kind {
wire::OP_CATALOG_REGISTER_ROLE => self.catalog.insert_role(RoleId::new(word(0)), name),
wire::OP_CATALOG_REGISTER_LABEL => {
self.catalog.insert_label(LabelId::new(word(0)), name)
}
wire::OP_CATALOG_REGISTER_RELATION_TYPE => self
.catalog
.insert_relation_type(RelationTypeId::new(word(0)), name),
wire::OP_CATALOG_REGISTER_PROPERTY_KEY => {
// Flags pack the property family tag and the value type tag.
let (family_tag, value_tag) = wire::unpack_flags(op.flags.get());
let family =
wire::property_family_from_tag(family_tag).ok_or(DbError::LogCorrupt {
lsn,
reason: "property-key family tag",
})?;
let value_type =
wire::property_type_from_tag(value_tag).ok_or(DbError::LogCorrupt {
lsn,
reason: "property-key value tag",
})?;
self.catalog.insert_property_key(PropertyKeyDefinition {
id: PropertyKeyId::new(word(0)),
name,
family,
value_type,
})
}
wire::OP_CATALOG_REGISTER_PROJECTION => {
// For projections the flags hold the definition discriminant.
let words = decode_def_words(blob, word(3), word(4), lsn)?;
let definition = decode_projection_def(op.flags.get(), name, &words, lsn)?;
self.catalog
.insert_projection(ProjectionId::new(word(0)), definition)
}
// Remaining kind routed here by the caller: OP_CATALOG_REGISTER_INDEX.
_index => {
let words = decode_def_words(blob, word(3), word(4), lsn)?;
let definition = decode_index_def(op.flags.get(), &words, lsn)?;
self.catalog
.insert_index(IndexId::new(word(0)), name, definition)
}
}
}
/// Replays a set-property op.
///
/// Flags carry the subject kind and the value type tag. The payload is
/// `[subject id, key, scalar, text offset, text length]`; text values are
/// read from `blob`.
///
/// # Errors
///
/// Returns `DbError::LogCorrupt` when the subject kind or value tag is
/// unknown, or when a text value cannot be read from the blob.
fn apply_set_property(
    &mut self,
    base: &BaseRecords,
    op: &MutationOp,
    blob: &[u8],
    lsn: u64,
) -> Result<(), DbError> {
    // Payload accessor; an index past the payload reads as 0.
    let word = |index: usize| op.payload.get(index).map_or(0, |w| w.get());
    let (subject_kind, value_tag) = wire::unpack_flags(op.flags.get());
    let Some(subject) = wire::decode_subject(subject_kind, word(0)) else {
        return Err(DbError::LogCorrupt {
            lsn,
            reason: "set-property subject kind",
        });
    };
    let Some(value_type) = wire::property_type_from_tag(value_tag) else {
        return Err(DbError::LogCorrupt {
            lsn,
            reason: "set-property value tag",
        });
    };
    let value = match value_type {
        PropertyType::Boolean => PropertyValue::Boolean(word(2) != 0),
        PropertyType::Integer => PropertyValue::Integer(word(2).cast_signed()),
        PropertyType::Text => {
            let text = blob_str(blob, word(3), word(4), lsn)?;
            PropertyValue::Text(text)
        }
    };
    let key = PropertyKeyId::new(word(1));
    self.set_property_inner(base, subject, key, value);
    Ok(())
}
fn tombstone_element_replay(&mut self, base: &BaseRecords, id: ElementId) {
self.tombstone_element_inner(base, id);
}
fn tombstone_relation_replay(&mut self, base: &BaseRecords, id: RelationId) {
self.tombstone_relation_inner(base, id);
}
fn tombstone_incidence_replay(&mut self, base: &BaseRecords, id: IncidenceId) {
self.tombstone_incidence_inner(base, id);
}
fn add_element_label_replay(&mut self, base: &BaseRecords, element: ElementId, label: LabelId) {
self.add_element_label_inner(base, element, label);
}
fn add_relation_label_replay(
&mut self,
base: &BaseRecords,
relation: RelationId,
label: LabelId,
) {
match self.relations.entry(relation) {
btree_map::Entry::Occupied(mut entry) => {
if let Some(record) = entry.get_mut() {
record.labels.insert(label);
}
}
btree_map::Entry::Vacant(entry) => {
if let Some(record) = base.relation(relation) {
let mut record = record.clone();
record.labels.insert(label);
entry.insert(Some(record));
}
}
}
}
fn set_relation_type_replay(
&mut self,
base: &BaseRecords,
relation: RelationId,
relation_type: RelationTypeId,
) {
self.set_relation_type_inner(base, relation, relation_type);
}
pub(crate) const fn next_ids(&self) -> NextIds {
self.next
}
/// Replaces the next-id counters wholesale.
pub(crate) const fn set_next_ids(&mut self, next: NextIds) {
    let slot = &mut self.next;
    *slot = next;
}
/// Returns the value of `key` on `subject` as seen through the overlay.
///
/// An overlay entry wins, including a `None` entry that hides a base value;
/// without one the base value is returned.
fn visible_property(
    &self,
    base: &BaseRecords,
    subject: PropertySubject,
    key: PropertyKeyId,
) -> Option<PropertyValue> {
    let overlay_slot = self
        .properties
        .get(&subject)
        .and_then(|keys| keys.get(&key));
    match overlay_slot {
        Some(slot) => slot.clone(),
        None => base.property(subject, key).cloned(),
    }
}
/// Tells whether the visible value of `key` on `subject` equals `value`,
/// without cloning the stored value.
fn visible_property_is(
    &self,
    base: &BaseRecords,
    subject: PropertySubject,
    key: PropertyKeyId,
    value: &PropertyValue,
) -> bool {
    let overlay_slot = self
        .properties
        .get(&subject)
        .and_then(|keys| keys.get(&key));
    let visible = match overlay_slot {
        Some(slot) => slot.as_ref(),
        None => base.property(subject, key),
    };
    visible == Some(value)
}
/// Returns the labels of `element` as seen through the overlay; the set is
/// empty when the element is tombstoned or unknown.
fn visible_element_labels(&self, base: &BaseRecords, element: ElementId) -> BTreeSet<LabelId> {
    let record = match self.elements.get(&element) {
        Some(slot) => slot.as_ref(),
        None => base.element(element),
    };
    record.map_or_else(BTreeSet::new, |found| found.labels.clone())
}
/// Returns the type of `relation` as seen through the overlay; `None` when
/// the relation is untyped, tombstoned or unknown.
fn visible_relation_type(
    &self,
    base: &BaseRecords,
    relation: RelationId,
) -> Option<RelationTypeId> {
    let record = match self.relations.get(&relation) {
        Some(slot) => slot.as_ref(),
        None => base.relation(relation),
    };
    record.and_then(|found| found.relation_type)
}
/// Returns the incidence record for `id` as seen through the overlay;
/// `None` when it is tombstoned or unknown.
fn visible_incidence(&self, base: &BaseRecords, id: IncidenceId) -> Option<IncidenceRecord> {
    let found = match self.incidences.get(&id) {
        Some(slot) => slot.as_ref(),
        None => base.incidence(id),
    };
    found.copied()
}
/// Collects every property of `subject` visible through the overlay:
/// the base properties with overlay values applied and overlay removals
/// taken out.
fn visible_subject_properties(
    &self,
    base: &BaseRecords,
    subject: PropertySubject,
) -> BTreeMap<PropertyKeyId, PropertyValue> {
    let mut visible = base
        .properties
        .get(&subject)
        .cloned()
        .unwrap_or_default();
    if let Some(overrides) = self.properties.get(&subject) {
        for (key, slot) in overrides {
            match slot {
                Some(value) => {
                    visible.insert(*key, value.clone());
                }
                None => {
                    visible.remove(key);
                }
            }
        }
    }
    visible
}
/// Allocates the next element id, stores an unlabelled element in the
/// overlay and logs the creation.
///
/// # Errors
///
/// Returns `DbError::IdOverflow` when the id counter cannot advance; nothing
/// is stored or logged in that case.
pub(crate) fn create_element(&mut self) -> Result<ElementId, DbError> {
    let id = self.next.element;
    let Some(following) = id.checked_next() else {
        return Err(DbError::IdOverflow);
    };
    self.next.element = following;
    let record = ElementRecord {
        id,
        labels: BTreeSet::new(),
    };
    self.elements.insert(id, Some(record));
    self.log.push(wire::OP_CREATE_ELEMENT, 0, &[id.get()]);
    Ok(id)
}
/// Allocates the next relation id, stores an untyped, unlabelled relation
/// in the overlay and logs the creation.
///
/// # Errors
///
/// Returns `DbError::IdOverflow` when the id counter cannot advance; nothing
/// is stored or logged in that case.
pub(crate) fn create_relation(&mut self) -> Result<RelationId, DbError> {
    let id = self.next.relation;
    let Some(following) = id.checked_next() else {
        return Err(DbError::IdOverflow);
    };
    self.next.relation = following;
    let record = RelationRecord {
        id,
        relation_type: None,
        labels: BTreeSet::new(),
    };
    self.relations.insert(id, Some(record));
    self.log.push(wire::OP_CREATE_RELATION, 0, &[id.get()]);
    Ok(id)
}
/// Allocates the next incidence id, stores the incidence linking `relation`
/// and `element` under `role`, updates the overlay index and logs the
/// creation.
///
/// # Errors
///
/// Returns `DbError::IdOverflow` when the id counter cannot advance; nothing
/// is stored or logged in that case.
pub(crate) fn create_incidence(
    &mut self,
    relation: RelationId,
    element: ElementId,
    role: RoleId,
) -> Result<IncidenceId, DbError> {
    let id = self.next.incidence;
    let Some(following) = id.checked_next() else {
        return Err(DbError::IdOverflow);
    };
    self.next.incidence = following;
    let record = IncidenceRecord {
        id,
        relation,
        element,
        role,
    };
    self.incidences.insert(id, Some(record));
    self.index.on_create_incidence(id, relation, element);
    let payload = [id.get(), relation.get(), element.get(), role.get()];
    self.log.push(wire::OP_CREATE_INCIDENCE, 0, &payload);
    Ok(id)
}
/// Tombstones element `id` in the overlay and logs the operation.
pub(crate) fn tombstone_element(&mut self, base: &BaseRecords, id: ElementId) {
    self.tombstone_element_inner(base, id);
    let payload = [id.get()];
    self.log.push(wire::OP_TOMBSTONE_ELEMENT, 0, &payload);
}
/// Tombstones element `id` without logging.
///
/// The index is notified with the labels and properties visible *before*
/// the tombstone is written; then the record and its properties are hidden.
fn tombstone_element_inner(&mut self, base: &BaseRecords, id: ElementId) {
    let subject = PropertySubject::Element(id);
    let visible_properties = self.visible_subject_properties(base, subject);
    let visible_labels = self.visible_element_labels(base, id);
    self.index
        .on_tombstone_element(id, &visible_labels, &visible_properties);
    self.elements.insert(id, None);
    self.tombstone_subject_properties(base, subject);
}
/// Tombstones relation `id` in the overlay and logs the operation.
pub(crate) fn tombstone_relation(&mut self, base: &BaseRecords, id: RelationId) {
    self.tombstone_relation_inner(base, id);
    let payload = [id.get()];
    self.log.push(wire::OP_TOMBSTONE_RELATION, 0, &payload);
}
/// Tombstones relation `id` without logging.
///
/// The index is notified with the relation type and properties visible
/// *before* the tombstone is written; then the record and its properties
/// are hidden.
fn tombstone_relation_inner(&mut self, base: &BaseRecords, id: RelationId) {
    let subject = PropertySubject::Relation(id);
    let visible_properties = self.visible_subject_properties(base, subject);
    let visible_type = self.visible_relation_type(base, id);
    self.index
        .on_tombstone_relation(id, visible_type, &visible_properties);
    self.relations.insert(id, None);
    self.tombstone_subject_properties(base, subject);
}
/// Tombstones incidence `id` in the overlay and logs the operation.
pub(crate) fn tombstone_incidence(&mut self, base: &BaseRecords, id: IncidenceId) {
    self.tombstone_incidence_inner(base, id);
    let payload = [id.get()];
    self.log.push(wire::OP_TOMBSTONE_INCIDENCE, 0, &payload);
}
fn tombstone_incidence_inner(&mut self, base: &BaseRecords, id: IncidenceId) {
let subject = PropertySubject::Incidence(id);
let properties = self.visible_subject_properties(base, subject);
if let Some(record) = self.visible_incidence(base, id) {
self.index
.on_tombstone_incidence(id, record.relation, record.element, &properties);
}
self.incidences.insert(id, None);
self.tombstone_subject_properties(base, subject);
}
/// Hides every property of `subject`.
///
/// Overlay entries for the subject are dropped. If the base has properties
/// for the subject, each base key gets a `None` entry so the base value is
/// no longer visible.
fn tombstone_subject_properties(&mut self, base: &BaseRecords, subject: PropertySubject) {
    self.properties.remove(&subject);
    let Some(base_keys) = base.properties.get(&subject) else {
        return;
    };
    let tombstones = base_keys.keys().map(|key| (*key, None)).collect();
    self.properties.insert(subject, tombstones);
}
/// Adds `label` to `element` and logs the operation.
///
/// The op is logged even when the element is not visible and the state is
/// therefore unchanged.
pub(crate) fn add_element_label(
    &mut self,
    base: &BaseRecords,
    element: ElementId,
    label: LabelId,
) {
    self.add_element_label_inner(base, element, label);
    let payload = [element.get(), label.get()];
    self.log.push(wire::OP_ADD_ELEMENT_LABEL, 0, &payload);
}
fn add_element_label_inner(&mut self, base: &BaseRecords, element: ElementId, label: LabelId) {
let mut labelled = false;
match self.elements.entry(element) {
btree_map::Entry::Occupied(mut entry) => {
if let Some(record) = entry.get_mut() {
record.labels.insert(label);
labelled = true;
}
}
btree_map::Entry::Vacant(entry) => {
if let Some(record) = base.element(element) {
let mut record = record.clone();
record.labels.insert(label);
entry.insert(Some(record));
labelled = true;
}
}
}
if labelled {
self.index.on_add_element_label(element, label);
}
}
/// Adds `label` to `relation` and logs the operation.
///
/// A record already in the overlay is updated in place; otherwise the base
/// record is copied into the overlay with the label added. The op is logged
/// even when the relation is tombstoned or unknown and the state is
/// therefore unchanged.
pub(crate) fn add_relation_label(
    &mut self,
    base: &BaseRecords,
    relation: RelationId,
    label: LabelId,
) {
    // Reuse the exact state transition the replay path performs, so that
    // replaying the op logged below reproduces this overlay's state and the
    // two code paths cannot drift apart.
    self.add_relation_label_replay(base, relation, label);
    self.log.push(
        wire::OP_ADD_RELATION_LABEL,
        0,
        &[relation.get(), label.get()],
    );
}
/// Sets the type of `relation` and logs the operation.
///
/// The op is logged even when the relation is not visible and the state is
/// therefore unchanged.
pub(crate) fn set_relation_type(
    &mut self,
    base: &BaseRecords,
    relation: RelationId,
    relation_type: RelationTypeId,
) {
    self.set_relation_type_inner(base, relation, relation_type);
    let payload = [relation.get(), relation_type.get()];
    self.log.push(wire::OP_SET_RELATION_TYPE, 0, &payload);
}
fn set_relation_type_inner(
&mut self,
base: &BaseRecords,
relation: RelationId,
relation_type: RelationTypeId,
) {
let previous = self.visible_relation_type(base, relation);
let mut typed = false;
match self.relations.entry(relation) {
btree_map::Entry::Occupied(mut entry) => {
if let Some(record) = entry.get_mut() {
record.relation_type = Some(relation_type);
typed = true;
}
}
btree_map::Entry::Vacant(entry) => {
if let Some(record) = base.relation(relation) {
let mut record = record.clone();
record.relation_type = Some(relation_type);
entry.insert(Some(record));
typed = true;
}
}
}
if typed {
self.index
.on_set_relation_type(relation, previous, relation_type);
}
}
/// Sets `key` on `subject` to `value` and logs the operation.
///
/// Does nothing, and logs nothing, when the visible value already equals
/// `value`. Flags carry the subject kind and value type tag; the payload is
/// `[subject id, key, scalar, text offset, text length]`, with text bytes
/// interned into the log's blob.
pub(crate) fn set_property(
    &mut self,
    base: &BaseRecords,
    subject: PropertySubject,
    key: PropertyKeyId,
    value: PropertyValue,
) {
    if self.visible_property_is(base, subject, key, &value) {
        return;
    }
    let (subject_kind, subject_id) = wire::encode_subject(subject);
    let value_tag = wire::property_type_tag(value.value_type());
    let payload = match &value {
        PropertyValue::Boolean(flag) => [subject_id, key.get(), u64::from(*flag), 0, 0],
        PropertyValue::Integer(number) => {
            [subject_id, key.get(), number.cast_unsigned(), 0, 0]
        }
        PropertyValue::Text(string) => {
            let (text_off, text_len) = self.log.intern(string.as_bytes());
            [subject_id, key.get(), 0, text_off, text_len]
        }
    };
    let flags = wire::pack_flags(subject_kind, value_tag);
    self.log.push(wire::OP_SET_PROPERTY, flags, &payload);
    self.set_property_inner(base, subject, key, value);
}
/// Stores `value` for `key` on `subject` without logging, after telling the
/// index about the previously visible value and the new one.
fn set_property_inner(
    &mut self,
    base: &BaseRecords,
    subject: PropertySubject,
    key: PropertyKeyId,
    value: PropertyValue,
) {
    let before = self.visible_property(base, subject, key);
    self.index
        .on_set_property(subject, key, before.as_ref(), &value);
    let overrides = self.properties.entry(subject).or_default();
    overrides.insert(key, Some(value));
}
/// Removes `key` from `subject` and logs the operation.
///
/// The op is logged unconditionally, whether or not a value is visible.
pub(crate) fn remove_property(
    &mut self,
    base: &BaseRecords,
    subject: PropertySubject,
    key: PropertyKeyId,
) {
    let (subject_kind, subject_id) = wire::encode_subject(subject);
    let flags = wire::pack_flags(subject_kind, 0);
    let payload = [subject_id, key.get()];
    self.log.push(wire::OP_REMOVE_PROPERTY, flags, &payload);
    self.remove_property_inner(base, subject, key);
}
/// Marks `key` on `subject` as removed without logging, after telling the
/// index about the previously visible value.
fn remove_property_inner(
    &mut self,
    base: &BaseRecords,
    subject: PropertySubject,
    key: PropertyKeyId,
) {
    let before = self.visible_property(base, subject, key);
    self.index
        .on_remove_property(subject, key, before.as_ref());
    let overrides = self.properties.entry(subject).or_default();
    overrides.insert(key, None);
}
/// Registers a role called `name` under the next role id and logs it.
///
/// # Errors
///
/// Returns `DbError::IdOverflow` when the id counter cannot advance, or the
/// error reported by the catalog insert. In the latter case the id has
/// already been consumed and the name bytes interned, but no op is logged.
pub(crate) fn register_role(&mut self, name: String) -> Result<RoleId, DbError> {
    let id = self.next.role;
    let Some(following) = id.checked_next() else {
        return Err(DbError::IdOverflow);
    };
    self.next.role = following;
    let (name_off, name_len) = self.log.intern(name.as_bytes());
    self.catalog.insert_role(id, name)?;
    let payload = [id.get(), name_off, name_len];
    self.log.push(wire::OP_CATALOG_REGISTER_ROLE, 0, &payload);
    Ok(id)
}
/// Registers a label called `name` under the next label id and logs it.
///
/// # Errors
///
/// Returns `DbError::IdOverflow` when the id counter cannot advance, or the
/// error reported by the catalog insert. In the latter case the id has
/// already been consumed and the name bytes interned, but no op is logged.
pub(crate) fn register_label(&mut self, name: String) -> Result<LabelId, DbError> {
    let id = self.next.label;
    let Some(following) = id.checked_next() else {
        return Err(DbError::IdOverflow);
    };
    self.next.label = following;
    let (name_off, name_len) = self.log.intern(name.as_bytes());
    self.catalog.insert_label(id, name)?;
    let payload = [id.get(), name_off, name_len];
    self.log.push(wire::OP_CATALOG_REGISTER_LABEL, 0, &payload);
    Ok(id)
}
/// Registers a relation type called `name` under the next relation-type id
/// and logs it.
///
/// # Errors
///
/// Returns `DbError::IdOverflow` when the id counter cannot advance, or the
/// error reported by the catalog insert. In the latter case the id has
/// already been consumed and the name bytes interned, but no op is logged.
pub(crate) fn register_relation_type(
    &mut self,
    name: String,
) -> Result<RelationTypeId, DbError> {
    let id = self.next.relation_type;
    let Some(following) = id.checked_next() else {
        return Err(DbError::IdOverflow);
    };
    self.next.relation_type = following;
    let (name_off, name_len) = self.log.intern(name.as_bytes());
    self.catalog.insert_relation_type(id, name)?;
    let payload = [id.get(), name_off, name_len];
    self.log
        .push(wire::OP_CATALOG_REGISTER_RELATION_TYPE, 0, &payload);
    Ok(id)
}
/// Registers a property key under the next property-key id and logs it.
///
/// The op's flags carry the family tag and the value type tag.
///
/// # Errors
///
/// Returns `DbError::IdOverflow` when the id counter cannot advance, or the
/// error reported by the catalog insert. In the latter case the id has
/// already been consumed and the name bytes interned, but no op is logged.
pub(crate) fn register_property_key(
    &mut self,
    name: String,
    family: PropertyFamily,
    value_type: PropertyType,
) -> Result<PropertyKeyId, DbError> {
    let id = self.next.property_key;
    let Some(following) = id.checked_next() else {
        return Err(DbError::IdOverflow);
    };
    self.next.property_key = following;
    let (name_off, name_len) = self.log.intern(name.as_bytes());
    let definition = PropertyKeyDefinition {
        id,
        name,
        family,
        value_type,
    };
    self.catalog.insert_property_key(definition)?;
    let flags = wire::pack_flags(
        wire::property_family_tag(family),
        wire::property_type_tag(value_type),
    );
    let payload = [id.get(), name_off, name_len];
    self.log
        .push(wire::OP_CATALOG_REGISTER_PROPERTY_KEY, flags, &payload);
    Ok(id)
}
/// Registers `definition` under the next projection id and logs it.
///
/// The projection name and the encoded definition words are interned into
/// the blob; the op's flags carry the definition discriminant.
///
/// # Errors
///
/// Returns `DbError::IdOverflow` when the id counter cannot advance, or the
/// error reported by the catalog insert. In the latter case the id has
/// already been consumed and the blob extended, but no op is logged.
pub(crate) fn register_projection(
    &mut self,
    definition: ProjectionDefinition,
) -> Result<ProjectionId, DbError> {
    let id = self.next.projection;
    let Some(following) = id.checked_next() else {
        return Err(DbError::IdOverflow);
    };
    self.next.projection = following;
    let (name_off, name_len) = self.log.intern(definition.name().as_bytes());
    let (kind, def_words) = encode_projection_words(&definition);
    let (def_off, def_len) = self.log.intern_words(&def_words);
    self.catalog.insert_projection(id, definition)?;
    let payload = [id.get(), name_off, name_len, def_off, def_len];
    self.log
        .push(wire::OP_CATALOG_REGISTER_PROJECTION, kind, &payload);
    Ok(id)
}
/// Registers an index called `name` under the next index id and logs it.
///
/// The name and the encoded definition words are interned into the blob;
/// the op's flags carry the definition discriminant.
///
/// # Errors
///
/// Returns `DbError::IdOverflow` when the id counter cannot advance, or the
/// error reported by the catalog insert. In the latter case the id has
/// already been consumed and the blob extended, but no op is logged.
pub(crate) fn register_index(
    &mut self,
    name: String,
    definition: IndexDefinition,
) -> Result<IndexId, DbError> {
    let id = self.next.index;
    let Some(following) = id.checked_next() else {
        return Err(DbError::IdOverflow);
    };
    self.next.index = following;
    let (name_off, name_len) = self.log.intern(name.as_bytes());
    let (kind, def_words) = encode_index_words(&definition);
    let (def_off, def_len) = self.log.intern_words(&def_words);
    self.catalog.insert_index(id, name, definition)?;
    let payload = [id.get(), name_off, name_len, def_off, def_len];
    self.log
        .push(wire::OP_CATALOG_REGISTER_INDEX, kind, &payload);
    Ok(id)
}
pub(crate) fn freeze(self) -> Overlay {
Overlay {
elements: self.elements,
relations: self.relations,
incidences: self.incidences,
properties: self.properties,
catalog: self.catalog,
next: self.next,
index: self.index,
}
}
}
// Discriminants stored in the flags of a register-projection op; they select
// the layout that `decode_projection_def` expects.
const DEF_PROJECTION_GRAPH: u32 = 0;
const DEF_PROJECTION_HYPER: u32 = 1;
// Discriminants stored in the flags of a register-index op; they select the
// `IndexDefinition` variant that `decode_index_def` builds.
const DEF_INDEX_LABEL: u32 = 0;
const DEF_INDEX_RELATION_TYPE: u32 = 1;
const DEF_INDEX_PROPERTY_EQUALITY: u32 = 2;
const DEF_INDEX_PROPERTY_RANGE: u32 = 3;
const DEF_INDEX_COMPOSITE_EQUALITY: u32 = 4;
const DEF_INDEX_PROJECTION: u32 = 5;
/// Appends a length-prefixed id set to `words`: first the number of ids,
/// then the ids in iteration order.
fn push_id_set(words: &mut Vec<u64>, ids: impl ExactSizeIterator<Item = u64>) {
    let count = ids.len() as u64;
    words.extend(std::iter::once(count).chain(ids));
}
/// Encodes a projection definition as `(discriminant, words)`.
///
/// Graph: `[source role, target role, relation-type set]`.
/// Hypergraph: `[source-role set, target-role set, relation-type set]`.
/// Every set is length-prefixed (see `push_id_set`). The name is not part
/// of the encoding.
fn encode_projection_words(definition: &ProjectionDefinition) -> (u32, Vec<u64>) {
    match definition {
        ProjectionDefinition::Graph(graph) => {
            let mut words = vec![graph.source_role.get(), graph.target_role.get()];
            push_id_set(&mut words, graph.relation_types.iter().map(|id| id.get()));
            (DEF_PROJECTION_GRAPH, words)
        }
        ProjectionDefinition::Hypergraph(hyper) => {
            let mut words = Vec::new();
            push_id_set(&mut words, hyper.source_roles.iter().map(|id| id.get()));
            push_id_set(&mut words, hyper.target_roles.iter().map(|id| id.get()));
            push_id_set(&mut words, hyper.relation_types.iter().map(|id| id.get()));
            (DEF_PROJECTION_HYPER, words)
        }
    }
}
/// Encodes an index definition as `(discriminant, words)`.
///
/// Single-id variants encode to exactly one word; a composite-equality
/// index encodes to its key ids in iteration order, with no length prefix.
fn encode_index_words(definition: &IndexDefinition) -> (u32, Vec<u64>) {
    let single = |kind: u32, id: u64| (kind, vec![id]);
    match definition {
        IndexDefinition::Label { label } => single(DEF_INDEX_LABEL, label.get()),
        IndexDefinition::RelationType { relation_type } => {
            single(DEF_INDEX_RELATION_TYPE, relation_type.get())
        }
        IndexDefinition::PropertyEquality { key } => {
            single(DEF_INDEX_PROPERTY_EQUALITY, key.get())
        }
        IndexDefinition::PropertyRange { key } => single(DEF_INDEX_PROPERTY_RANGE, key.get()),
        IndexDefinition::CompositeEquality { keys } => {
            let words: Vec<u64> = keys.iter().map(|key| key.get()).collect();
            (DEF_INDEX_COMPOSITE_EQUALITY, words)
        }
        IndexDefinition::Projection { projection } => {
            single(DEF_INDEX_PROJECTION, projection.get())
        }
    }
}
/// Reads `blob[offset..offset + len]` as a sequence of little-endian `u64`
/// words.
///
/// # Errors
///
/// Returns `DbError::LogCorrupt` (tagged with `lsn`) when the range does not
/// fit in `usize`, overflows, lies outside `blob`, or is not a whole number
/// of 8-byte words.
fn decode_def_words(blob: &[u8], offset: u64, len: u64, lsn: u64) -> Result<Vec<u64>, DbError> {
    let corrupt = |reason: &'static str| DbError::LogCorrupt { lsn, reason };
    let Ok(start) = usize::try_from(offset) else {
        return Err(corrupt("def offset overflow"));
    };
    let Ok(length) = usize::try_from(len) else {
        return Err(corrupt("def length overflow"));
    };
    let Some(end) = start.checked_add(length) else {
        return Err(corrupt("def slice overflow"));
    };
    let Some(bytes) = blob.get(start..end) else {
        return Err(corrupt("def slice out of bounds"));
    };
    let chunks = bytes.chunks_exact(size_of::<u64>());
    // Leftover bytes mean the slice length is not a multiple of the word size.
    if !chunks.remainder().is_empty() {
        return Err(corrupt("def slice is not whole u64 words"));
    }
    let mut words = Vec::with_capacity(chunks.len());
    for chunk in chunks {
        let mut raw = [0u8; size_of::<u64>()];
        raw.copy_from_slice(chunk);
        words.push(u64::from_le_bytes(raw));
    }
    Ok(words)
}
/// Reads one length-prefixed id set from `words` starting at `*cursor`.
///
/// On success `*cursor` is left just past the set. When the length word was
/// read but the ids could not be, `*cursor` has moved past the length word
/// only.
///
/// # Errors
///
/// Returns `DbError::LogCorrupt` (tagged with `lsn`) when the length word is
/// missing or too large, or when the ids run past the end of `words`.
fn read_id_set(words: &[u64], cursor: &mut usize, lsn: u64) -> Result<Vec<u64>, DbError> {
    let corrupt = |reason: &'static str| DbError::LogCorrupt { lsn, reason };
    let Some(&raw_count) = words.get(*cursor) else {
        return Err(corrupt("def missing id-set length"));
    };
    let Ok(count) = usize::try_from(raw_count) else {
        return Err(corrupt("def id-set length overflow"));
    };
    *cursor += 1;
    let first = *cursor;
    let Some(end) = first.checked_add(count) else {
        return Err(corrupt("def id-set overflow"));
    };
    let Some(ids) = words.get(first..end) else {
        return Err(corrupt("def id-set out of bounds"));
    };
    *cursor = end;
    Ok(ids.to_vec())
}
/// Rebuilds a projection definition from its discriminant, name and
/// definition words (the inverse of `encode_projection_words`).
///
/// # Errors
///
/// Returns `DbError::LogCorrupt` (tagged with `lsn`) for an unknown
/// discriminant or when the words are too short for the expected layout.
fn decode_projection_def(
    discriminant: u32,
    name: String,
    words: &[u64],
    lsn: u64,
) -> Result<ProjectionDefinition, DbError> {
    match discriminant {
        DEF_PROJECTION_GRAPH => {
            // Layout: [source role, target role, relation-type set].
            let (source_role, target_role) = match words {
                [] => {
                    return Err(DbError::LogCorrupt {
                        lsn,
                        reason: "graph def missing source role",
                    });
                }
                [_source_only] => {
                    return Err(DbError::LogCorrupt {
                        lsn,
                        reason: "graph def missing target role",
                    });
                }
                [source, target, ..] => (*source, *target),
            };
            let mut cursor = 2;
            let type_ids = read_id_set(words, &mut cursor, lsn)?;
            let graph = crate::catalog::GraphProjectionDefinition {
                name,
                relation_types: type_ids.into_iter().map(RelationTypeId::new).collect(),
                source_role: RoleId::new(source_role),
                target_role: RoleId::new(target_role),
            };
            Ok(ProjectionDefinition::Graph(graph))
        }
        DEF_PROJECTION_HYPER => {
            // Layout: [source-role set, target-role set, relation-type set].
            let mut cursor = 0;
            let source_ids = read_id_set(words, &mut cursor, lsn)?;
            let target_ids = read_id_set(words, &mut cursor, lsn)?;
            let type_ids = read_id_set(words, &mut cursor, lsn)?;
            let hyper = crate::catalog::HypergraphProjectionDefinition {
                name,
                relation_types: type_ids.into_iter().map(RelationTypeId::new).collect(),
                source_roles: source_ids.into_iter().map(RoleId::new).collect(),
                target_roles: target_ids.into_iter().map(RoleId::new).collect(),
            };
            Ok(ProjectionDefinition::Hypergraph(hyper))
        }
        _other => Err(DbError::LogCorrupt {
            lsn,
            reason: "unknown projection definition kind",
        }),
    }
}
/// Rebuilds an index definition from its discriminant and definition words
/// (the inverse of `encode_index_words`).
///
/// Single-id variants read the first word only; a composite-equality index
/// takes every word as a key id.
///
/// # Errors
///
/// Returns `DbError::LogCorrupt` (tagged with `lsn`) for an unknown
/// discriminant or when a single-id variant has no words.
fn decode_index_def(
    discriminant: u32,
    words: &[u64],
    lsn: u64,
) -> Result<IndexDefinition, DbError> {
    let lone_id = || match words.first() {
        Some(id) => Ok(*id),
        None => Err(DbError::LogCorrupt {
            lsn,
            reason: "index def missing id",
        }),
    };
    let definition = match discriminant {
        DEF_INDEX_LABEL => IndexDefinition::Label {
            label: LabelId::new(lone_id()?),
        },
        DEF_INDEX_RELATION_TYPE => IndexDefinition::RelationType {
            relation_type: RelationTypeId::new(lone_id()?),
        },
        DEF_INDEX_PROPERTY_EQUALITY => IndexDefinition::PropertyEquality {
            key: PropertyKeyId::new(lone_id()?),
        },
        DEF_INDEX_PROPERTY_RANGE => IndexDefinition::PropertyRange {
            key: PropertyKeyId::new(lone_id()?),
        },
        DEF_INDEX_COMPOSITE_EQUALITY => IndexDefinition::CompositeEquality {
            keys: words.iter().map(|&word| PropertyKeyId::new(word)).collect(),
        },
        DEF_INDEX_PROJECTION => IndexDefinition::Projection {
            projection: ProjectionId::new(lone_id()?),
        },
        _other => {
            return Err(DbError::LogCorrupt {
                lsn,
                reason: "unknown index definition kind",
            });
        }
    };
    Ok(definition)
}
/// Frozen form of a `WriteOverlay`: the same state without the mutation log
/// (see `WriteOverlay::freeze`).
#[derive(Clone, Debug)]
pub(crate) struct Overlay {
// Per-kind record deltas; a `None` entry is a tombstone.
elements: Delta<ElementRecord>,
relations: Delta<RelationRecord>,
incidences: Delta<IncidenceRecord>,
// Property overrides per subject and key; `None` marks a removed property.
properties: BTreeMap<PropertySubject, BTreeMap<PropertyKeyId, Option<PropertyValue>>>,
catalog: Catalog,
// Next id to hand out for each id space.
next: NextIds,
index: OverlayIndex,
}
impl Overlay {
    /// Creates an overlay with no changes and an empty overlay index,
    /// carrying the given id counters and catalog.
    pub(crate) const fn empty(next: NextIds, catalog: Catalog) -> Self {
        Self {
            next,
            catalog,
            elements: BTreeMap::new(),
            relations: BTreeMap::new(),
            incidences: BTreeMap::new(),
            properties: BTreeMap::new(),
            index: OverlayIndex::new(),
        }
    }

    /// Returns a copy of the next-id counters.
    #[cfg(any(test, kani))]
    pub(crate) const fn next_ids(&self) -> NextIds {
        let Self { next, .. } = self;
        *next
    }

    /// Returns a copy of this overlay with `delta` layered on top.
    ///
    /// Record and property entries from `delta` overwrite entries with the
    /// same key; the catalog, id counters and index are taken from `delta`
    /// wholesale.
    #[cfg(any(test, kani))]
    pub(crate) fn with_applied(&self, delta: &WriteOverlay) -> Self {
        let mut merged = self.clone();
        merged.elements.extend(
            delta
                .elements
                .iter()
                .map(|(id, entry)| (*id, entry.clone())),
        );
        merged.relations.extend(
            delta
                .relations
                .iter()
                .map(|(id, entry)| (*id, entry.clone())),
        );
        merged
            .incidences
            .extend(delta.incidences.iter().map(|(id, entry)| (*id, *entry)));
        for (subject, keys) in &delta.properties {
            merged
                .properties
                .entry(*subject)
                .or_default()
                .extend(keys.iter().map(|(key, value)| (*key, value.clone())));
        }
        merged.catalog = delta.catalog.clone();
        merged.next = delta.next;
        merged.index = delta.index.clone();
        merged
    }
}
/// The base index held by `BaseRecords`, in one of two storage forms.
enum HeldIndex {
// An index owned outright.
Owned(OwnedBaseIndex),
// A borrowed index yoked to the `Arc<Base>` it was obtained from (built in
// `BaseRecords::open`).
Borrowed(Yoke<BorrowedBaseIndex<'static>, Arc<Base>>),
}
/// Prints only which variant is held; the index contents are elided.
impl std::fmt::Debug for HeldIndex {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Owned(_owned) => "HeldIndex::Owned(..)",
            Self::Borrowed(_yoke) => "HeldIndex::Borrowed(..)",
        };
        formatter.write_str(label)
    }
}
impl HeldIndex {
    /// Returns a `BaseIndex` view over whichever form is held.
    fn view(&self) -> BaseIndex<'_> {
        match *self {
            Self::Owned(ref owned) => BaseIndex::Owned(owned),
            Self::Borrowed(ref yoke) => {
                let borrowed = *yoke.get();
                BaseIndex::Borrowed(borrowed)
            }
        }
    }
}
/// Clones the held value, keeping the same variant.
impl Clone for HeldIndex {
    fn clone(&self) -> Self {
        match self {
            Self::Owned(owned) => {
                let copy = owned.clone();
                Self::Owned(copy)
            }
            Self::Borrowed(yoke) => {
                let copy = yoke.clone();
                Self::Borrowed(copy)
            }
        }
    }
}
/// Decoded records and properties of a base snapshot, together with the base
/// index; `WriteOverlay` reads through to these where it has no entry of its
/// own.
#[derive(Clone, Debug)]
pub(crate) struct BaseRecords {
elements: BTreeMap<ElementId, ElementRecord>,
relations: BTreeMap<RelationId, RelationRecord>,
incidences: BTreeMap<IncidenceId, IncidenceRecord>,
// Property values per subject and key.
properties: BTreeMap<PropertySubject, BTreeMap<PropertyKeyId, PropertyValue>>,
// Owned or borrowed base index; exposed through `index()`.
index: HeldIndex,
}
/// Output of `decode_records`: `(elements, relations, incidences,
/// properties)`, each keyed the same way as the matching `BaseRecords` field.
type DecodedRecords = (
BTreeMap<ElementId, ElementRecord>,
BTreeMap<RelationId, RelationRecord>,
BTreeMap<IncidenceId, IncidenceRecord>,
BTreeMap<PropertySubject, BTreeMap<PropertyKeyId, PropertyValue>>,
);
/// Decodes every element, relation, incidence and property of `view` into
/// owned maps keyed by id.
///
/// # Errors
///
/// Propagates the first error from decoding a label run or the base
/// properties.
fn decode_records(view: &BaseView<'_>) -> Result<DecodedRecords, DbError> {
    let mut elements = BTreeMap::new();
    for raw in view.elements() {
        let id = ElementId::new(raw.id.get());
        let labels = decode_labels(view.element_label_run(raw))?;
        elements.insert(id, ElementRecord { id, labels });
    }

    let mut relations = BTreeMap::new();
    for raw in view.relations() {
        let id = RelationId::new(raw.id.get());
        let labels = decode_labels(view.relation_label_run(raw))?;
        let relation_type = crate::wire::decode_relation_type(raw.relation_type.get());
        let decoded = RelationRecord {
            id,
            relation_type,
            labels,
        };
        relations.insert(id, decoded);
    }

    let mut incidences = BTreeMap::new();
    for raw in view.incidences() {
        let id = IncidenceId::new(raw.id.get());
        let decoded = IncidenceRecord {
            id,
            relation: RelationId::new(raw.relation.get()),
            element: ElementId::new(raw.element.get()),
            role: RoleId::new(raw.role.get()),
        };
        incidences.insert(id, decoded);
    }

    let properties = decode_base_properties(view)?;
    Ok((elements, relations, incidences, properties))
}
impl BaseRecords {
    /// Creates records with no entries and an empty owned index.
    pub(crate) const fn empty() -> Self {
        Self {
            elements: BTreeMap::new(),
            relations: BTreeMap::new(),
            incidences: BTreeMap::new(),
            properties: BTreeMap::new(),
            index: HeldIndex::Owned(OwnedBaseIndex::empty()),
        }
    }

    /// Decodes the records of `base` and borrows its index, keeping the base
    /// alive through a cloned `Arc`.
    ///
    /// In builds with debug assertions the borrowed index is cross-checked
    /// against one rebuilt from the decoded records.
    ///
    /// # Errors
    ///
    /// Returns whatever `decode_records` reports for the base.
    pub(crate) fn open(base: &Arc<Base>) -> Result<Self, DbError> {
        let (elements, relations, incidences, properties) = decode_records(base.get())?;
        let cart = Arc::clone(base);
        let yoked = Yoke::attach_to_cart(cart, |attached: &Base| attached.get().index());
        let opened = Self {
            elements,
            relations,
            incidences,
            properties,
            index: HeldIndex::Borrowed(yoked),
        };
        #[cfg(debug_assertions)]
        opened.debug_assert_index_matches();
        Ok(opened)
    }

    /// Asserts that the held index agrees with an index rebuilt from the
    /// decoded records.
    #[cfg(debug_assertions)]
    fn debug_assert_index_matches(&self) {
        let Self {
            elements,
            relations,
            incidences,
            properties,
            index,
        } = self;
        let oracle = OwnedBaseIndex::from_records(elements, relations, incidences, properties);
        let agrees = crate::index::indexes_agree(index.view(), BaseIndex::Owned(&oracle));
        debug_assert!(
            agrees,
            "borrowed base index disagrees with the from_records oracle",
        );
    }

    /// Returns a view of the base index.
    pub(crate) fn index(&self) -> BaseIndex<'_> {
        self.index.view()
    }

    /// Test-only constructor: decodes `view` and builds an owned index from
    /// the decoded records instead of borrowing one.
    #[cfg(test)]
    pub(crate) fn from_view(view: &BaseView<'_>) -> Result<Self, DbError> {
        let (elements, relations, incidences, properties) = decode_records(view)?;
        let rebuilt = OwnedBaseIndex::from_records(&elements, &relations, &incidences, &properties);
        Ok(Self {
            elements,
            relations,
            incidences,
            properties,
            index: HeldIndex::Owned(rebuilt),
        })
    }

    /// Looks up a base element by id.
    fn element(&self, id: ElementId) -> Option<&ElementRecord> {
        self.elements.get(&id)
    }

    /// Looks up a base relation by id.
    fn relation(&self, id: RelationId) -> Option<&RelationRecord> {
        self.relations.get(&id)
    }

    /// Looks up a base incidence by id.
    fn incidence(&self, id: IncidenceId) -> Option<&IncidenceRecord> {
        self.incidences.get(&id)
    }

    /// Looks up the base value of `key` on `subject`.
    fn property(&self, subject: PropertySubject, key: PropertyKeyId) -> Option<&PropertyValue> {
        let keys = self.properties.get(&subject)?;
        keys.get(&key)
    }
}
/// Converts a raw label run into a set of label ids.
///
/// # Errors
///
/// Returns an invalid-store error when `run` is `None`.
fn decode_labels(
    run: Option<&[zerocopy::byteorder::U64<zerocopy::byteorder::LE>]>,
) -> Result<BTreeSet<LabelId>, DbError> {
    let Some(words) = run else {
        return Err(DbError::invalid_store("base label run out of bounds"));
    };
    let mut labels = BTreeSet::new();
    for word in words {
        labels.insert(LabelId::new(word.get()));
    }
    Ok(labels)
}
/// Decodes every property record of `view`, grouped by subject and then key.
///
/// # Errors
///
/// Returns an invalid-store error when a record has an unknown subject kind or
/// value tag, when its text payload is out of bounds, or when the text is not
/// valid UTF-8.
fn decode_base_properties(
    view: &BaseView<'_>,
) -> Result<BTreeMap<PropertySubject, BTreeMap<PropertyKeyId, PropertyValue>>, DbError> {
    let mut by_subject =
        BTreeMap::<PropertySubject, BTreeMap<PropertyKeyId, PropertyValue>>::new();
    for raw in view.properties() {
        let Some(subject) =
            crate::wire::decode_subject(raw.subject_kind.get(), raw.subject_id.get())
        else {
            return Err(DbError::invalid_store(
                "base property subject kind out of range",
            ));
        };
        let Some(value_type) = crate::wire::property_type_from_tag(raw.value_tag.get()) else {
            return Err(DbError::invalid_store(
                "base property value tag out of range",
            ));
        };
        let value = match value_type {
            // Any non-zero scalar is read as `true`.
            PropertyType::Boolean => PropertyValue::Boolean(raw.scalar.get() != 0),
            PropertyType::Integer => PropertyValue::Integer(raw.scalar.get().cast_signed()),
            PropertyType::Text => {
                let Some(bytes) = view.property_text(raw) else {
                    return Err(DbError::invalid_store("base property text out of bounds"));
                };
                match core::str::from_utf8(bytes) {
                    Ok(text) => PropertyValue::Text(text.to_owned()),
                    Err(_error) => {
                        return Err(DbError::invalid_store("base property text is not UTF-8"));
                    }
                }
            }
        };
        let key = PropertyKeyId::new(raw.key.get());
        by_subject.entry(subject).or_default().insert(key, value);
    }
    Ok(by_subject)
}
/// A cloneable handle bundling a checkpoint generation, a commit sequence, and
/// shared (`Arc`) references to a base, an overlay and the decoded base
/// records.
#[derive(Clone)]
pub(crate) struct Snapshot {
// Checkpoint generation this snapshot was built with.
generation: CheckpointGeneration,
// Commit sequence this snapshot was built with.
lsn: CommitSeq,
// Shared handle to the base.
base: Arc<Base>,
// Shared overlay read on top of the base records by `view`.
overlay: Arc<Overlay>,
// Shared base records read underneath the overlay by `view`.
base_records: Arc<BaseRecords>,
}
impl Snapshot {
pub(crate) const fn with_shared_base_records(
generation: CheckpointGeneration,
lsn: CommitSeq,
base: Arc<Base>,
overlay: Arc<Overlay>,
base_records: Arc<BaseRecords>,
) -> Self {
Self {
generation,
lsn,
base,
overlay,
base_records,
}
}
pub(crate) const fn generation(&self) -> CheckpointGeneration {
self.generation
}
pub(crate) const fn lsn(&self) -> CommitSeq {
self.lsn
}
pub(crate) const fn overlay(&self) -> &Arc<Overlay> {
&self.overlay
}
pub(crate) const fn base(&self) -> &Arc<Base> {
&self.base
}
pub(crate) const fn base_records(&self) -> &Arc<BaseRecords> {
&self.base_records
}
pub(crate) fn view(&self) -> MergedState<'_> {
MergedState {
base: &self.base_records,
overlay: &self.overlay,
}
}
}
/// Compile-time helper: instantiating it requires `T: Send + Sync`.
const fn assert_send_sync<T: Send + Sync>() {}
// Compilation fails here if `Snapshot` or `Overlay` stops being `Send + Sync`.
const _: () = assert_send_sync::<Snapshot>();
const _: () = assert_send_sync::<Overlay>();
/// Read-only query surface over a graph state.
///
/// Implementors supply point lookups, full iteration, the catalog and the id
/// watermark. Every other method has a default that scans the full iterators,
/// which implementors may override.
pub(crate) trait StateView {
    /// Looks up one element by id.
    fn element(&self, id: ElementId) -> Option<Cow<'_, ElementRecord>>;

    /// Looks up one relation by id.
    fn relation(&self, id: RelationId) -> Option<Cow<'_, RelationRecord>>;

    /// Looks up one incidence by id.
    fn incidence(&self, id: IncidenceId) -> Option<Cow<'_, IncidenceRecord>>;

    /// Looks up the value of `key` on `subject`.
    fn property(
        &self,
        subject: PropertySubject,
        key: PropertyKeyId,
    ) -> Option<Cow<'_, PropertyValue>>;

    /// Whether an element with this id is visible.
    fn contains_element(&self, id: ElementId) -> bool {
        self.element(id).is_some()
    }

    /// Whether a relation with this id is visible.
    fn contains_relation(&self, id: RelationId) -> bool {
        self.relation(id).is_some()
    }

    /// Whether an incidence with this id is visible.
    fn contains_incidence(&self, id: IncidenceId) -> bool {
        self.incidence(id).is_some()
    }

    /// Iterates every visible element.
    fn elements(&self) -> impl Iterator<Item = Cow<'_, ElementRecord>>;

    /// Iterates every visible relation.
    fn relations(&self) -> impl Iterator<Item = Cow<'_, RelationRecord>>;

    /// Iterates every visible incidence.
    fn incidences(&self) -> impl Iterator<Item = Cow<'_, IncidenceRecord>>;

    /// Iterates every visible `(subject, key, value)` property triple.
    fn properties(
        &self,
    ) -> impl Iterator<Item = (PropertySubject, PropertyKeyId, Cow<'_, PropertyValue>)>;

    /// Number of visible elements, counted by iterating them.
    fn element_count(&self) -> usize {
        self.elements().count()
    }

    /// Number of visible relations, counted by iterating them.
    fn relation_count(&self) -> usize {
        self.relations().count()
    }

    /// Number of visible incidences, counted by iterating them.
    fn incidence_count(&self) -> usize {
        self.incidences().count()
    }

    /// The catalog in effect for this state.
    fn catalog(&self) -> &Catalog;

    /// The id watermark for this state.
    fn next_ids(&self) -> NextIds;

    /// Incidences attached to `element`, found by scanning all incidences.
    fn element_incidences(&self, element: ElementId) -> Vec<IncidenceRecord> {
        let mut attached = Vec::new();
        for record in self.incidences() {
            if record.element == element {
                attached.push(record.into_owned());
            }
        }
        attached
    }

    /// Incidences belonging to `relation`, found by scanning all incidences.
    fn relation_incidences(&self, relation: RelationId) -> Vec<IncidenceRecord> {
        let mut attached = Vec::new();
        for record in self.incidences() {
            if record.relation == relation {
                attached.push(record.into_owned());
            }
        }
        attached
    }

    /// Ids of elements carrying `label`, found by scanning all elements.
    fn elements_with_label(&self, label: LabelId) -> Vec<ElementId> {
        self.elements()
            .filter_map(|record| record.labels.contains(&label).then_some(record.id))
            .collect()
    }

    /// Ids of relations whose type is `relation_type`, found by scanning all
    /// relations.
    fn relations_with_type(&self, relation_type: RelationTypeId) -> Vec<RelationId> {
        self.relations()
            .filter_map(|record| {
                (record.relation_type == Some(relation_type)).then_some(record.id)
            })
            .collect()
    }

    /// Subjects whose `key` property equals `value`, found by scanning all
    /// properties. Performs no catalog validation.
    fn property_equal(&self, key: PropertyKeyId, value: &PropertyValue) -> Vec<PropertySubject> {
        let mut subjects = Vec::new();
        for (subject, candidate_key, candidate_value) in self.properties() {
            if candidate_key == key && candidate_value.as_ref() == value {
                subjects.push(subject);
            }
        }
        subjects
    }

    /// Like [`StateView::property_equal`], after checking `value` against the
    /// catalog definition of `key`.
    ///
    /// # Errors
    ///
    /// Returns the error from [`StateView::validate_lookup_value`].
    fn typed_property_equal(
        &self,
        key: PropertyKeyId,
        value: &PropertyValue,
    ) -> Result<Vec<PropertySubject>, DbError> {
        self.validate_lookup_value(key, value)?;
        let subjects = self.property_equal(key, value);
        Ok(subjects)
    }

    /// Like [`StateView::typed_property_equal`], additionally requiring that
    /// `key` is declared for `family`.
    ///
    /// # Errors
    ///
    /// Returns the error from [`StateView::validate_lookup_value_for_family`].
    fn typed_property_equal_for_family(
        &self,
        key: PropertyKeyId,
        family: PropertyFamily,
        value: &PropertyValue,
    ) -> Result<Vec<PropertySubject>, DbError> {
        self.validate_lookup_value_for_family(key, family, value)?;
        let subjects = self.property_equal(key, value);
        Ok(subjects)
    }

    /// Subjects whose `key` property lies in the inclusive range
    /// `min..=max`, found by scanning all properties. Performs no catalog
    /// validation.
    fn property_range(
        &self,
        key: PropertyKeyId,
        min: &PropertyValue,
        max: &PropertyValue,
    ) -> Vec<PropertySubject> {
        let mut subjects = Vec::new();
        for (subject, candidate_key, candidate_value) in self.properties() {
            let candidate: &PropertyValue = candidate_value.as_ref();
            if candidate_key == key && candidate >= min && candidate <= max {
                subjects.push(subject);
            }
        }
        subjects
    }

    /// Like [`StateView::property_range`], after validating both bounds
    /// against the catalog. An inverted range (`min > max`) yields no
    /// subjects.
    ///
    /// # Errors
    ///
    /// Returns the error from [`StateView::validate_lookup_value`] for `min`
    /// first, then for `max`.
    fn typed_property_range(
        &self,
        key: PropertyKeyId,
        min: &PropertyValue,
        max: &PropertyValue,
    ) -> Result<Vec<PropertySubject>, DbError> {
        self.validate_lookup_value(key, min)?;
        self.validate_lookup_value(key, max)?;
        let subjects = if min > max {
            Vec::new()
        } else {
            self.property_range(key, min, max)
        };
        Ok(subjects)
    }

    /// Subjects for which every `keys[i]` property equals `values[i]`.
    ///
    /// # Errors
    ///
    /// Returns an unsupported error when the slices differ in length, or the
    /// first validation error for a `(key, value)` pair.
    fn typed_property_composite_equal(
        &self,
        keys: &[PropertyKeyId],
        values: &[PropertyValue],
    ) -> Result<Vec<PropertySubject>, DbError> {
        if keys.len() != values.len() {
            return Err(DbError::unsupported(
                "composite equality tuple arity mismatch",
            ));
        }
        keys.iter()
            .zip(values)
            .try_for_each(|(key, value)| self.validate_lookup_value(*key, value))?;

        let mut subjects = Vec::new();
        for (subject, held) in self.properties_by_subject() {
            let satisfied = keys
                .iter()
                .zip(values)
                .all(|(key, wanted)| held.get(key) == Some(wanted));
            if satisfied {
                subjects.push(subject);
            }
        }
        Ok(subjects)
    }

    /// Checks that `key` is registered and that `value` has its declared type.
    ///
    /// # Errors
    ///
    /// `UnknownPropertyKey` when the catalog has no such key;
    /// `PropertyTypeMismatch` when the value type differs from the definition.
    fn validate_lookup_value(
        &self,
        key: PropertyKeyId,
        value: &PropertyValue,
    ) -> Result<(), DbError> {
        let Some(definition) = self.catalog().property_key(key) else {
            return Err(DbError::UnknownPropertyKey { id: key });
        };
        let expected = definition.value_type;
        let actual = value.value_type();
        if expected != actual {
            return Err(DbError::PropertyTypeMismatch { expected, actual });
        }
        Ok(())
    }

    /// Checks that `key` is registered for `family` and that `value` has its
    /// declared type. The family is checked before the type.
    ///
    /// # Errors
    ///
    /// `UnknownPropertyKey`, `WrongPropertyFamily` or `PropertyTypeMismatch`.
    fn validate_lookup_value_for_family(
        &self,
        key: PropertyKeyId,
        family: PropertyFamily,
        value: &PropertyValue,
    ) -> Result<(), DbError> {
        let Some(definition) = self.catalog().property_key(key) else {
            return Err(DbError::UnknownPropertyKey { id: key });
        };
        if definition.family != family {
            return Err(DbError::WrongPropertyFamily {
                expected: definition.family,
                actual: family,
            });
        }
        if definition.value_type != value.value_type() {
            return Err(DbError::PropertyTypeMismatch {
                expected: definition.value_type,
                actual: value.value_type(),
            });
        }
        Ok(())
    }

    /// Collects all visible properties into owned per-subject maps.
    fn properties_by_subject(
        &self,
    ) -> BTreeMap<PropertySubject, BTreeMap<PropertyKeyId, PropertyValue>> {
        self.properties().fold(
            BTreeMap::<PropertySubject, BTreeMap<PropertyKeyId, PropertyValue>>::new(),
            |mut grouped, (subject, key, value)| {
                grouped
                    .entry(subject)
                    .or_default()
                    .insert(key, value.into_owned());
                grouped
            },
        )
    }
}
/// Accessors an overlay type must expose to be layered over [`BaseRecords`]
/// by [`LayeredState`].
///
/// In the delta maps an entry of `None` is treated by [`LayeredState`] as
/// hiding the base record or property with that key.
pub(crate) trait OverlayLayer {
/// Element delta keyed by element id.
fn elements(&self) -> &Delta<ElementRecord>;
/// Relation delta keyed by relation id.
fn relations(&self) -> &Delta<RelationRecord>;
/// Incidence delta keyed by incidence id.
fn incidences(&self) -> &Delta<IncidenceRecord>;
/// Property delta grouped by subject, then by key.
fn properties(
&self,
) -> &BTreeMap<PropertySubject, BTreeMap<PropertyKeyId, Option<PropertyValue>>>;
/// Catalog carried by the overlay.
fn catalog(&self) -> &Catalog;
/// Id watermark carried by the overlay.
fn next_ids(&self) -> NextIds;
/// Overlay-side index.
fn index(&self) -> &OverlayIndex;
}
/// Exposes the fields of `Overlay` through [`OverlayLayer`]; every method is a
/// direct field accessor.
impl OverlayLayer for Overlay {
fn elements(&self) -> &Delta<ElementRecord> {
&self.elements
}
fn relations(&self) -> &Delta<RelationRecord> {
&self.relations
}
fn incidences(&self) -> &Delta<IncidenceRecord> {
&self.incidences
}
fn properties(
&self,
) -> &BTreeMap<PropertySubject, BTreeMap<PropertyKeyId, Option<PropertyValue>>> {
&self.properties
}
fn catalog(&self) -> &Catalog {
&self.catalog
}
// Returned by value: the watermark is copied out of the overlay.
fn next_ids(&self) -> NextIds {
self.next
}
fn index(&self) -> &OverlayIndex {
&self.index
}
}
/// Exposes the fields of `WriteOverlay` through [`OverlayLayer`]; every method
/// is a direct field accessor.
impl OverlayLayer for WriteOverlay {
fn elements(&self) -> &Delta<ElementRecord> {
&self.elements
}
fn relations(&self) -> &Delta<RelationRecord> {
&self.relations
}
fn incidences(&self) -> &Delta<IncidenceRecord> {
&self.incidences
}
fn properties(
&self,
) -> &BTreeMap<PropertySubject, BTreeMap<PropertyKeyId, Option<PropertyValue>>> {
&self.properties
}
fn catalog(&self) -> &Catalog {
&self.catalog
}
// Returned by value: the watermark is copied out of the overlay.
fn next_ids(&self) -> NextIds {
self.next
}
fn index(&self) -> &OverlayIndex {
&self.index
}
}
/// A borrowed view that reads an overlay `L` on top of [`BaseRecords`].
///
/// Lookups consult the overlay first and fall back to the base when the
/// overlay has no entry for the key.
#[derive(Clone, Copy)]
pub(crate) struct LayeredState<'a, L: OverlayLayer> {
// Records underneath the overlay.
base: &'a BaseRecords,
// Overlay whose entries take precedence over the base.
overlay: &'a L,
}
impl<'a, L: OverlayLayer> LayeredState<'a, L> {
    /// Layers `overlay` on top of `base`.
    pub(crate) const fn new(base: &'a BaseRecords, overlay: &'a L) -> Self {
        Self { base, overlay }
    }

    /// Resolves an element: a live overlay entry wins, an overlay `None` hides
    /// the base record, and a missing overlay entry falls back to the base.
    ///
    /// The overlay outlives `'a`, so its record is borrowed rather than
    /// cloned (an `ElementRecord` owns a label set).
    pub(crate) fn element_ref(&self, id: ElementId) -> Option<Cow<'a, ElementRecord>> {
        match self.overlay.elements().get(&id) {
            Some(Some(record)) => Some(Cow::Borrowed(record)),
            Some(None) => None,
            None => self.base.element(id).map(Cow::Borrowed),
        }
    }

    /// Resolves a relation with the same precedence as
    /// [`LayeredState::element_ref`]; the overlay record is borrowed.
    pub(crate) fn relation_ref(&self, id: RelationId) -> Option<Cow<'a, RelationRecord>> {
        match self.overlay.relations().get(&id) {
            Some(Some(record)) => Some(Cow::Borrowed(record)),
            Some(None) => None,
            None => self.base.relation(id).map(Cow::Borrowed),
        }
    }

    /// Resolves an incidence with the same precedence as
    /// [`LayeredState::element_ref`]. The overlay record is copied out.
    pub(crate) fn incidence_ref(&self, id: IncidenceId) -> Option<Cow<'a, IncidenceRecord>> {
        match self.overlay.incidences().get(&id) {
            Some(Some(record)) => Some(Cow::Owned(*record)),
            Some(None) => None,
            None => self.base.incidence(id).map(Cow::Borrowed),
        }
    }

    /// Resolves one property value: a live overlay entry wins, an overlay
    /// `None` hides the base value, otherwise the base value is used.
    ///
    /// The overlay value is borrowed for `'a` instead of cloned.
    pub(crate) fn property_ref(
        &self,
        subject: PropertySubject,
        key: PropertyKeyId,
    ) -> Option<Cow<'a, PropertyValue>> {
        let overlay_entry = self
            .overlay
            .properties()
            .get(&subject)
            .and_then(|keys| keys.get(&key));
        match overlay_entry {
            Some(Some(value)) => Some(Cow::Borrowed(value)),
            Some(None) => None,
            None => self.base.property(subject, key).map(Cow::Borrowed),
        }
    }

    /// Returns every visible `(key, value)` of `subject` in key order.
    ///
    /// Starts from a copy of the base values, then applies the overlay: live
    /// entries replace or add values, `None` entries remove them.
    pub(crate) fn subject_properties(
        &self,
        subject: PropertySubject,
    ) -> Vec<(PropertyKeyId, PropertyValue)> {
        let mut merged: BTreeMap<PropertyKeyId, PropertyValue> = self
            .base
            .properties
            .get(&subject)
            .cloned()
            .unwrap_or_default();
        if let Some(overrides) = self.overlay.properties().get(&subject) {
            for (key, entry) in overrides {
                match entry {
                    Some(value) => {
                        merged.insert(*key, value.clone());
                    }
                    None => {
                        merged.remove(key);
                    }
                }
            }
        }
        merged.into_iter().collect()
    }

    /// Returns the subjects holding `key` with their values, as answered by
    /// the overlay index layered over the base index.
    pub(crate) fn property_key_subjects(
        &self,
        key: PropertyKeyId,
    ) -> Vec<(PropertySubject, PropertyValue)> {
        self.overlay
            .index()
            .property_key_subjects(self.base.index(), key)
    }

    /// Returns the overlay's catalog for the full `'a` lifetime.
    pub(crate) fn catalog_ref(&self) -> &'a Catalog {
        self.overlay.catalog()
    }
}
/// [`LayeredState`] over an `Overlay`.
pub(crate) type MergedState<'a> = LayeredState<'a, Overlay>;
/// [`LayeredState`] over a `WriteOverlay`.
pub(crate) type WriteMergedState<'a> = LayeredState<'a, WriteOverlay>;
/// [`StateView`] for a layered state: point lookups and iteration merge the
/// overlay over the base, while the filtered queries are answered by the
/// overlay index layered over the base index instead of the default scans.
impl<L: OverlayLayer> StateView for LayeredState<'_, L> {
    fn element(&self, id: ElementId) -> Option<Cow<'_, ElementRecord>> {
        self.element_ref(id)
    }

    fn relation(&self, id: RelationId) -> Option<Cow<'_, RelationRecord>> {
        self.relation_ref(id)
    }

    fn incidence(&self, id: IncidenceId) -> Option<Cow<'_, IncidenceRecord>> {
        self.incidence_ref(id)
    }

    fn property(
        &self,
        subject: PropertySubject,
        key: PropertyKeyId,
    ) -> Option<Cow<'_, PropertyValue>> {
        self.property_ref(subject, key)
    }

    fn elements(&self) -> impl Iterator<Item = Cow<'_, ElementRecord>> {
        let base_records = self.base.elements.values();
        let overlay_entries = self.overlay.elements().iter();
        MergeIter::new(base_records, overlay_entries)
    }

    fn relations(&self) -> impl Iterator<Item = Cow<'_, RelationRecord>> {
        let base_records = self.base.relations.values();
        let overlay_entries = self.overlay.relations().iter();
        MergeIter::new(base_records, overlay_entries)
    }

    fn incidences(&self) -> impl Iterator<Item = Cow<'_, IncidenceRecord>> {
        let base_records = self.base.incidences.values();
        let overlay_entries = self.overlay.incidences().iter();
        MergeIter::new(base_records, overlay_entries)
    }

    fn properties(
        &self,
    ) -> impl Iterator<Item = (PropertySubject, PropertyKeyId, Cow<'_, PropertyValue>)> {
        let base_triples = base_property_triples(self.base);
        let overlay_triples = overlay_property_triples(self.overlay.properties());
        PropertyMergeIter::new(base_triples, overlay_triples)
    }

    fn catalog(&self) -> &Catalog {
        self.overlay.catalog()
    }

    fn next_ids(&self) -> NextIds {
        self.overlay.next_ids()
    }

    fn elements_with_label(&self, label: LabelId) -> Vec<ElementId> {
        let overlay_index = self.overlay.index();
        overlay_index.elements_with_label(self.base.index(), label)
    }

    fn relations_with_type(&self, relation_type: RelationTypeId) -> Vec<RelationId> {
        let overlay_index = self.overlay.index();
        overlay_index.relations_with_type(self.base.index(), relation_type)
    }

    fn element_incidences(&self, element: ElementId) -> Vec<IncidenceRecord> {
        let candidate_ids = self
            .overlay
            .index()
            .element_incidences(self.base.index(), element);
        let mut attached = Vec::new();
        for id in candidate_ids {
            // Ids that no longer resolve through the merged view are skipped.
            if let Some(record) = self.incidence_ref(id) {
                attached.push(record.into_owned());
            }
        }
        attached
    }

    fn relation_incidences(&self, relation: RelationId) -> Vec<IncidenceRecord> {
        let candidate_ids = self
            .overlay
            .index()
            .relation_incidences(self.base.index(), relation);
        let mut attached = Vec::new();
        for id in candidate_ids {
            // Ids that no longer resolve through the merged view are skipped.
            if let Some(record) = self.incidence_ref(id) {
                attached.push(record.into_owned());
            }
        }
        attached
    }

    fn property_equal(&self, key: PropertyKeyId, value: &PropertyValue) -> Vec<PropertySubject> {
        let overlay_index = self.overlay.index();
        overlay_index.property_equal(self.base.index(), key, value)
    }

    fn property_range(
        &self,
        key: PropertyKeyId,
        min: &PropertyValue,
        max: &PropertyValue,
    ) -> Vec<PropertySubject> {
        let overlay_index = self.overlay.index();
        overlay_index.property_range(self.base.index(), key, min, max)
    }

    fn typed_property_composite_equal(
        &self,
        keys: &[PropertyKeyId],
        values: &[PropertyValue],
    ) -> Result<Vec<PropertySubject>, DbError> {
        if keys.len() != values.len() {
            return Err(DbError::unsupported(
                "composite equality tuple arity mismatch",
            ));
        }
        keys.iter()
            .zip(values)
            .try_for_each(|(key, value)| self.validate_lookup_value(*key, value))?;

        // The index query takes owned `(key, value)` pairs.
        let mut pairs: Vec<(PropertyKeyId, PropertyValue)> = Vec::with_capacity(keys.len());
        for (key, value) in keys.iter().zip(values) {
            pairs.push((*key, value.clone()));
        }
        let subjects = self
            .overlay
            .index()
            .property_composite_equal(self.base.index(), &pairs);
        Ok(subjects)
    }
}
/// Test-only full-scan versions of the queries that the `StateView` impl for
/// `LayeredState` answers through the index.
#[cfg(test)]
impl<L: OverlayLayer> LayeredState<'_, L> {
    /// Scans all merged elements for `label`.
    pub(crate) fn elements_with_label_scan(&self, label: LabelId) -> Vec<ElementId> {
        self.elements()
            .filter_map(|record| record.labels.contains(&label).then_some(record.id))
            .collect()
    }

    /// Scans all merged relations for `relation_type`.
    pub(crate) fn relations_with_type_scan(
        &self,
        relation_type: RelationTypeId,
    ) -> Vec<RelationId> {
        self.relations()
            .filter_map(|record| {
                (record.relation_type == Some(relation_type)).then_some(record.id)
            })
            .collect()
    }

    /// Scans all merged incidences for those attached to `element`.
    pub(crate) fn element_incidences_scan(&self, element: ElementId) -> Vec<IncidenceRecord> {
        let mut attached = Vec::new();
        for record in self.incidences() {
            if record.element == element {
                attached.push(record.into_owned());
            }
        }
        attached
    }

    /// Scans all merged incidences for those belonging to `relation`.
    pub(crate) fn relation_incidences_scan(&self, relation: RelationId) -> Vec<IncidenceRecord> {
        let mut attached = Vec::new();
        for record in self.incidences() {
            if record.relation == relation {
                attached.push(record.into_owned());
            }
        }
        attached
    }

    /// Scans all merged properties for `key == value`.
    pub(crate) fn property_equal_scan(
        &self,
        key: PropertyKeyId,
        value: &PropertyValue,
    ) -> Vec<PropertySubject> {
        let mut subjects = Vec::new();
        for (subject, candidate_key, candidate_value) in self.properties() {
            if candidate_key == key && candidate_value.as_ref() == value {
                subjects.push(subject);
            }
        }
        subjects
    }

    /// Scans all merged properties for `key` values within `min..=max`.
    pub(crate) fn property_range_scan(
        &self,
        key: PropertyKeyId,
        min: &PropertyValue,
        max: &PropertyValue,
    ) -> Vec<PropertySubject> {
        let mut subjects = Vec::new();
        for (subject, candidate_key, candidate_value) in self.properties() {
            let candidate: &PropertyValue = candidate_value.as_ref();
            if candidate_key == key && candidate >= min && candidate <= max {
                subjects.push(subject);
            }
        }
        subjects
    }

    /// Scans per-subject property maps for subjects matching every
    /// `(keys[i], values[i])` pair. Performs no arity or type validation.
    pub(crate) fn property_composite_equal_scan(
        &self,
        keys: &[PropertyKeyId],
        values: &[PropertyValue],
    ) -> Vec<PropertySubject> {
        let mut subjects = Vec::new();
        for (subject, held) in self.properties_by_subject() {
            let satisfied = keys
                .iter()
                .zip(values)
                .all(|(key, wanted)| held.get(key) == Some(wanted));
            if satisfied {
                subjects.push(subject);
            }
        }
        subjects
    }
}
/// Ordered merge of base records with an overlay delta.
///
/// Both inputs iterate in ascending id order. For each id the overlay entry
/// takes precedence over the base record, and an overlay `None` suppresses the
/// id entirely.
struct MergeIter<'a, R: Keyed + Clone> {
    base: std::iter::Peekable<btree_map::Values<'a, R::Id, R>>,
    overlay: std::iter::Peekable<btree_map::Iter<'a, R::Id, Option<R>>>,
}

impl<'a, R: Keyed + Clone> MergeIter<'a, R> {
    /// Wraps both iterators so their next ids can be compared before
    /// advancing.
    fn new(
        base: btree_map::Values<'a, R::Id, R>,
        overlay: btree_map::Iter<'a, R::Id, Option<R>>,
    ) -> Self {
        Self {
            base: base.peekable(),
            overlay: overlay.peekable(),
        }
    }

    /// Advances the base side and borrows its record.
    fn take_base(&mut self) -> Option<Cow<'a, R>> {
        self.base.next().map(Cow::Borrowed)
    }

    /// Advances the overlay side; when `mask_base` is set the base record with
    /// the same id is discarded first.
    ///
    /// Live overlay records are borrowed for `'a` rather than cloned, since
    /// the overlay map outlives the iterator's items.
    fn take_overlay(&mut self, mask_base: bool) -> Step<Cow<'a, R>> {
        if mask_base {
            let _masked = self.base.next();
        }
        let Some((_id, entry)) = self.overlay.next() else {
            return Step::Done;
        };
        entry
            .as_ref()
            .map_or(Step::Again, |record| Step::Yield(Cow::Borrowed(record)))
    }

    /// Performs one merge step, choosing the side with the smaller next id;
    /// ties go to the overlay, which masks the base record.
    fn step(&mut self) -> Step<Cow<'a, R>> {
        let base_id = self.base.peek().map(|record| record.record_id());
        let overlay_id = self.overlay.peek().map(|(id, _entry)| **id);
        match (base_id, overlay_id) {
            (None, None) => Step::Done,
            (Some(_base), None) => self.take_base().map_or(Step::Done, Step::Yield),
            (Some(base), Some(overlay)) if base < overlay => {
                self.take_base().map_or(Step::Done, Step::Yield)
            }
            (_other, Some(overlay)) => self.take_overlay(base_id == Some(overlay)),
        }
    }
}

impl<'a, R: Keyed + Clone> Iterator for MergeIter<'a, R> {
    type Item = Cow<'a, R>;

    /// Steps until a record is produced or both sides are exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            match self.step() {
                Step::Done => return None,
                Step::Yield(item) => return Some(item),
                Step::Again => {}
            }
        }
    }
}
/// Outcome of a single merge step in `MergeIter` / `PropertyMergeIter`.
enum Step<T> {
/// Both inputs are exhausted; iteration ends.
Done,
/// An item is ready to be returned from `next`.
Yield(T),
/// Nothing was produced this step (the overlay entry was `None`); step again.
Again,
}
/// Ordered merge of flattened base property triples with flattened overlay
/// property triples.
///
/// Entries are compared by `(subject, key)`. For an equal pair the overlay
/// entry takes precedence, and an overlay `None` suppresses the pair.
struct PropertyMergeIter<'a> {
    base: Vec<(PropertySubject, PropertyKeyId, &'a PropertyValue)>,
    overlay: Vec<(PropertySubject, PropertyKeyId, &'a Option<PropertyValue>)>,
    // Cursor into `base`.
    base_index: usize,
    // Cursor into `overlay`.
    overlay_index: usize,
}

impl<'a> PropertyMergeIter<'a> {
    /// Starts a merge over the two triple lists, with both cursors at the
    /// front.
    const fn new(
        base: Vec<(PropertySubject, PropertyKeyId, &'a PropertyValue)>,
        overlay: Vec<(PropertySubject, PropertyKeyId, &'a Option<PropertyValue>)>,
    ) -> Self {
        Self {
            base,
            overlay,
            base_index: 0,
            overlay_index: 0,
        }
    }

    /// Advances the base cursor and borrows the value it pointed at.
    fn take_base(&mut self) -> Option<(PropertySubject, PropertyKeyId, Cow<'a, PropertyValue>)> {
        let (subject, key, value) = *self.base.get(self.base_index)?;
        self.base_index += 1;
        Some((subject, key, Cow::Borrowed(value)))
    }

    /// Advances the overlay cursor; when `mask_base` is set the base entry
    /// with the same `(subject, key)` is skipped first.
    ///
    /// Live overlay values are borrowed for `'a` rather than cloned, since the
    /// stored references already carry that lifetime.
    fn take_overlay(
        &mut self,
        mask_base: bool,
    ) -> Step<(PropertySubject, PropertyKeyId, Cow<'a, PropertyValue>)> {
        if mask_base {
            self.base_index += 1;
        }
        let Some(&(subject, key, entry)) = self.overlay.get(self.overlay_index) else {
            return Step::Done;
        };
        self.overlay_index += 1;
        entry.as_ref().map_or(Step::Again, |value| {
            Step::Yield((subject, key, Cow::Borrowed(value)))
        })
    }

    /// Performs one merge step, choosing the side with the smaller next
    /// `(subject, key)`; ties go to the overlay, which masks the base entry.
    fn step(&mut self) -> Step<(PropertySubject, PropertyKeyId, Cow<'a, PropertyValue>)> {
        let base_pair = self
            .base
            .get(self.base_index)
            .map(|&(subject, key, _value)| (subject, key));
        let overlay_pair = self
            .overlay
            .get(self.overlay_index)
            .map(|&(subject, key, _entry)| (subject, key));
        match (base_pair, overlay_pair) {
            (None, None) => Step::Done,
            (Some(_base), None) => self.take_base().map_or(Step::Done, Step::Yield),
            (Some(base), Some(overlay)) if base < overlay => {
                self.take_base().map_or(Step::Done, Step::Yield)
            }
            (_other, Some(overlay)) => self.take_overlay(base_pair == Some(overlay)),
        }
    }
}

impl<'a> Iterator for PropertyMergeIter<'a> {
    type Item = (PropertySubject, PropertyKeyId, Cow<'a, PropertyValue>);

    /// Steps until a triple is produced or both sides are exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            match self.step() {
                Step::Done => return None,
                Step::Yield(item) => return Some(item),
                Step::Again => {}
            }
        }
    }
}
/// Flattens the base property maps into `(subject, key, value)` triples, in
/// ascending `(subject, key)` order.
fn base_property_triples(
    base: &BaseRecords,
) -> Vec<(PropertySubject, PropertyKeyId, &PropertyValue)> {
    let mut triples = Vec::new();
    for (subject, keys) in &base.properties {
        for (key, value) in keys {
            triples.push((*subject, *key, value));
        }
    }
    triples
}
/// Flattens an overlay property delta into `(subject, key, entry)` triples, in
/// ascending `(subject, key)` order. `None` entries are kept.
fn overlay_property_triples(
    properties: &BTreeMap<PropertySubject, BTreeMap<PropertyKeyId, Option<PropertyValue>>>,
) -> Vec<(PropertySubject, PropertyKeyId, &Option<PropertyValue>)> {
    let mut triples = Vec::new();
    for (subject, keys) in properties {
        for (key, entry) in keys {
            triples.push((*subject, *key, entry));
        }
    }
    triples
}
#[cfg(kani)]
impl BaseRecords {
    /// Proof helper: base records containing one label-free element per id in
    /// `ids`, no other records, and an empty owned index.
    pub(crate) fn proof_elements(ids: &[ElementId]) -> Self {
        let elements = ids
            .iter()
            .map(|&id| {
                let record = ElementRecord {
                    id,
                    labels: BTreeSet::new(),
                };
                (id, record)
            })
            .collect();
        Self {
            elements,
            relations: BTreeMap::new(),
            incidences: BTreeMap::new(),
            properties: BTreeMap::new(),
            index: HeldIndex::Owned(OwnedBaseIndex::empty()),
        }
    }
}
#[cfg(all(kani, feature = "kani-heavy"))]
impl Overlay {
    /// Proof helper: an overlay whose element delta holds one entry per
    /// `(id, present)` pair, a label-free record when `present` and a `None`
    /// entry otherwise. Everything else is empty.
    pub(crate) fn proof_element_entries(entries: &[(ElementId, bool)]) -> Self {
        let elements = entries
            .iter()
            .map(|&(id, present)| {
                let entry = if present {
                    Some(ElementRecord {
                        id,
                        labels: BTreeSet::new(),
                    })
                } else {
                    None
                };
                (id, entry)
            })
            .collect();
        Self {
            elements,
            relations: BTreeMap::new(),
            incidences: BTreeMap::new(),
            properties: BTreeMap::new(),
            catalog: Catalog::empty(),
            next: proof_zero_next_ids(),
            index: OverlayIndex::new(),
        }
    }

    /// Proof helper: whether the element delta holds a `None` entry for `id`.
    pub(crate) fn proof_is_element_tombstoned(&self, id: ElementId) -> bool {
        matches!(self.elements.get(&id), Some(None))
    }

    /// Proof helper: an otherwise empty overlay whose element watermark is
    /// `next`.
    pub(crate) fn proof_with_next_element(next: u64) -> Self {
        let watermark = NextIds {
            element: ElementId::new(next),
            ..proof_zero_next_ids()
        };
        Self {
            elements: BTreeMap::new(),
            relations: BTreeMap::new(),
            incidences: BTreeMap::new(),
            properties: BTreeMap::new(),
            catalog: Catalog::empty(),
            next: watermark,
            index: OverlayIndex::new(),
        }
    }
}
/// Proof helper: a `NextIds` watermark with every counter set to id 1.
///
/// NOTE(review): despite the "zero" in the name, every id here is 1 —
/// presumably the first allocatable id; confirm against `NextIds::INITIAL`.
#[cfg(all(kani, feature = "kani-heavy"))]
fn proof_zero_next_ids() -> NextIds {
NextIds {
element: ElementId::new(1),
relation: RelationId::new(1),
incidence: IncidenceId::new(1),
role: RoleId::new(1),
label: LabelId::new(1),
relation_type: RelationTypeId::new(1),
property_key: PropertyKeyId::new(1),
projection: ProjectionId::new(1),
index: IndexId::new(1),
}
}
/// Shared fixtures for tests: helpers that build small bases and overlays
/// through the `WriteOverlay` API.
#[cfg(test)]
pub(crate) mod test_support {
use super::{BaseRecords, MergedState, Overlay, WriteOverlay};
use crate::{
PropertySubject,
backing::Base,
catalog::{Catalog, PropertyFamily},
freeze::{FreezeStamps, freeze_view},
state::NextIds,
value::{PropertyType, PropertyValue},
};
/// Freezes `write` over an empty base and opens the resulting bytes as a
/// `Base`. All freeze stamps are set to 1. Panics if freezing or opening
/// fails.
pub(crate) fn freeze_writer(write: WriteOverlay) -> Base {
let base = BaseRecords::empty();
let overlay = write.freeze();
let view = MergedState::new(&base, &overlay);
let bytes = freeze_view(
&view,
FreezeStamps {
commit_seq: 1,
transaction_id: 1,
generation: 1,
},
)
.expect("freeze writer view");
Base::open_owned_bytes(bytes).expect("attach base")
}
/// Builds a small frozen base: two labels, one relation type, one role, four
/// property keys, three elements, two relations, two incidences and five
/// property values.
pub(crate) fn small_base() -> Base {
let mut write = WriteOverlay::new(NextIds::INITIAL, Catalog::empty());
let base = BaseRecords::empty();
// Catalog entries. `Robot` is registered but never attached to an element.
let person = write.register_label("Person".to_owned()).expect("label");
let _robot = write.register_label("Robot".to_owned()).expect("label");
let calls = write
.register_relation_type("calls".to_owned())
.expect("rtype");
let caller = write.register_role("caller".to_owned()).expect("role");
let name = write
.register_property_key(
"name".to_owned(),
PropertyFamily::Element,
PropertyType::Text,
)
.expect("name key");
let rank = write
.register_property_key(
"rank".to_owned(),
PropertyFamily::Element,
PropertyType::Integer,
)
.expect("rank key");
let weight = write
.register_property_key(
"weight".to_owned(),
PropertyFamily::Relation,
PropertyType::Integer,
)
.expect("weight key");
let slot = write
.register_property_key(
"slot".to_owned(),
PropertyFamily::Incidence,
PropertyType::Integer,
)
.expect("slot key");
// Three elements; only e1 and e2 get the `Person` label.
let e1 = write.create_element().expect("e1");
let e2 = write.create_element().expect("e2");
let _e3 = write.create_element().expect("e3");
write.add_element_label(&base, e1, person);
write.add_element_label(&base, e2, person);
// Two relations; only r1 is typed and has incidences.
let r1 = write.create_relation().expect("r1");
let _r2 = write.create_relation().expect("r2");
write.set_relation_type(&base, r1, calls);
let inc1 = write.create_incidence(r1, e1, caller).expect("inc1");
write.create_incidence(r1, e2, caller).expect("inc2");
// One property per family, plus a negative integer and a second name.
write.set_property(
&base,
PropertySubject::Element(e1),
name,
PropertyValue::Text("Alice".to_owned()),
);
write.set_property(
&base,
PropertySubject::Relation(r1),
weight,
PropertyValue::Integer(3),
);
write.set_property(
&base,
PropertySubject::Incidence(inc1),
slot,
PropertyValue::Integer(1),
);
write.set_property(
&base,
PropertySubject::Element(e1),
rank,
PropertyValue::Integer(-5),
);
write.set_property(
&base,
PropertySubject::Element(e2),
name,
PropertyValue::Text("Bob".to_owned()),
);
freeze_writer(write)
}
/// Returns an empty base together with a frozen overlay holding one text
/// property key, two elements and one `name` value on the first element.
pub(crate) fn base_view_from_ops() -> (BaseRecords, Overlay) {
let mut write = WriteOverlay::new(NextIds::INITIAL, Catalog::empty());
let base = BaseRecords::empty();
let name = write
.register_property_key(
"name".to_owned(),
PropertyFamily::Element,
PropertyType::Text,
)
.expect("name key");
let e1 = write.create_element().expect("e1");
let _e2 = write.create_element().expect("e2");
write.set_property(
&base,
PropertySubject::Element(e1),
name,
PropertyValue::Text("Alice".to_owned()),
);
(base, write.freeze())
}
}
#[cfg(test)]
mod tests;