use std::{collections::BTreeMap, sync::Arc};
use yoke::Yoke;
#[cfg(any(test, kani))]
use super::write::WriteOverlay;
use super::{Delta, SubjectDelta, merge::MergedState};
use crate::{
Catalog, CheckpointGeneration, DbError, ElementId, IncidenceId, LabelId, PropertyKeyId,
RelationId, RoleId,
backing::{Base, BaseView},
id::CommitSeq,
index::{BaseIndex, BorrowedBaseIndex, OverlayIndex, OwnedBaseIndex},
state::{ElementRecord, IncidenceRecord, LabelSet, NextIds, PropertySubject, RelationRecord},
value::{PropertyType, PropertyValue},
};
#[cfg(all(kani, feature = "kani-heavy"))]
use crate::{IndexId, ProjectionId, RelationTypeId};
/// Set of record and property changes held apart from the decoded base records.
///
/// A `Snapshot` pairs one `Overlay` with a `BaseRecords` and hands both to a
/// `MergedState` view.
#[derive(Clone, Debug)]
pub(crate) struct Overlay {
/// Element entries keyed by id. The Kani helpers in this file treat a `None`
/// entry as a tombstone for that id.
pub(super) elements: Delta<ElementRecord>,
/// Relation entries, in the same `Delta` form as `elements`.
pub(super) relations: Delta<RelationRecord>,
/// Incidence entries, in the same `Delta` form as `elements`.
pub(super) incidences: Delta<IncidenceRecord>,
/// Property changes grouped by the subject they belong to.
pub(super) properties: BTreeMap<PropertySubject, SubjectDelta>,
/// Catalog carried alongside the record changes.
pub(super) catalog: Catalog,
/// Id watermarks. NOTE(review): presumably the next id to hand out per kind — confirm.
pub(super) next: NextIds,
/// Index kept for this overlay. NOTE(review): assumed to cover the entries above — confirm.
pub(super) index: OverlayIndex,
}
impl Overlay {
    /// Builds an overlay that holds no record or property entries.
    ///
    /// `next` and `catalog` are stored as given; the index starts out as a
    /// fresh `OverlayIndex`.
    pub(crate) const fn empty(next: NextIds, catalog: Catalog) -> Self {
        Self {
            next,
            catalog,
            index: OverlayIndex::new(),
            elements: BTreeMap::new(),
            relations: BTreeMap::new(),
            incidences: BTreeMap::new(),
            properties: BTreeMap::new(),
        }
    }

    /// Returns a copy of the id watermarks stored in this overlay.
    #[cfg(any(test, kani))]
    pub(crate) const fn next_ids(&self) -> NextIds {
        self.next
    }

    /// Returns a new overlay equal to this one with `delta` written over it.
    ///
    /// Record entries in `delta` replace entries with the same id. Property
    /// entries are merged key by key into the subject's existing delta. The
    /// catalog, id watermarks and index are taken from `delta` wholesale.
    #[cfg(any(test, kani))]
    pub(crate) fn with_applied(&self, delta: &WriteOverlay) -> Self {
        let mut elements = self.elements.clone();
        for (id, entry) in &delta.elements {
            elements.insert(*id, entry.clone());
        }

        let mut relations = self.relations.clone();
        for (id, entry) in &delta.relations {
            relations.insert(*id, entry.clone());
        }

        let mut incidences = self.incidences.clone();
        for (id, entry) in &delta.incidences {
            incidences.insert(*id, *entry);
        }

        let mut properties = self.properties.clone();
        for (subject, keys) in &delta.properties {
            // `make_mut` gives a uniquely owned map for this subject before
            // the per-key writes below.
            let subject_delta = Arc::make_mut(properties.entry(*subject).or_default());
            for (key, value) in keys.iter() {
                subject_delta.insert(*key, value.clone());
            }
        }

        Self {
            elements,
            relations,
            incidences,
            properties,
            catalog: delta.catalog.clone(),
            next: delta.next,
            index: delta.index.clone(),
        }
    }
}
/// Index held by `BaseRecords`, in one of two storage forms.
enum HeldIndex {
/// Index built in memory and owned outright.
Owned(OwnedBaseIndex),
/// Index borrowed out of a `Base`; the `Yoke` carries the `Arc<Base>` cart
/// it was attached to.
Borrowed(Yoke<BorrowedBaseIndex<'static>, Arc<Base>>),
}
impl std::fmt::Debug for HeldIndex {
    /// Writes only the variant name; the index contents are elided as `..`.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Borrowed(_held) => "HeldIndex::Borrowed(..)",
            Self::Owned(_index) => "HeldIndex::Owned(..)",
        };
        formatter.write_str(label)
    }
}
impl HeldIndex {
    /// Returns a `BaseIndex` view over whichever storage form is held.
    fn view(&self) -> BaseIndex<'_> {
        match self {
            Self::Borrowed(held) => {
                // The yoked index is copied out of the reference `get` returns.
                let borrowed = *held.get();
                BaseIndex::Borrowed(borrowed)
            }
            Self::Owned(index) => BaseIndex::Owned(index),
        }
    }
}
impl Clone for HeldIndex {
    /// Clones the held index, keeping the same storage form.
    fn clone(&self) -> Self {
        match self {
            Self::Borrowed(held) => {
                let duplicate = held.clone();
                Self::Borrowed(duplicate)
            }
            Self::Owned(index) => {
                let duplicate = index.clone();
                Self::Owned(duplicate)
            }
        }
    }
}
/// Records decoded from a base, held as in-memory maps, together with an
/// index over them.
#[derive(Clone, Debug)]
pub(crate) struct BaseRecords {
/// Element records keyed by element id.
pub(super) elements: BTreeMap<ElementId, ElementRecord>,
/// Relation records keyed by relation id.
pub(super) relations: BTreeMap<RelationId, RelationRecord>,
/// Incidence records keyed by incidence id.
pub(super) incidences: BTreeMap<IncidenceId, IncidenceRecord>,
/// Property values, grouped first by subject and then by property key.
pub(super) properties: BTreeMap<PropertySubject, BTreeMap<PropertyKeyId, PropertyValue>>,
/// Index for these records: borrowed from the `Base` when built by `open`,
/// owned otherwise.
index: HeldIndex,
}
/// Tuple produced by `decode_records`, in order: elements, relations,
/// incidences, properties.
type DecodedRecords = (
BTreeMap<ElementId, ElementRecord>,
BTreeMap<RelationId, RelationRecord>,
BTreeMap<IncidenceId, IncidenceRecord>,
BTreeMap<PropertySubject, BTreeMap<PropertyKeyId, PropertyValue>>,
);
fn decode_records(view: &BaseView<'_>) -> Result<DecodedRecords, DbError> {
let mut elements = BTreeMap::new();
for record in view.elements() {
let labels = decode_labels(view.element_label_run(record))?;
let id = ElementId::new(record.id.get());
elements.insert(id, ElementRecord { id, labels });
}
let mut relations = BTreeMap::new();
for record in view.relations() {
let labels = decode_labels(view.relation_label_run(record))?;
let id = RelationId::new(record.id.get());
relations.insert(
id,
RelationRecord {
id,
relation_type: crate::wire::decode_relation_type(record.relation_type.get()),
labels,
},
);
}
let mut incidences = BTreeMap::new();
for record in view.incidences() {
let id = IncidenceId::new(record.id.get());
incidences.insert(
id,
IncidenceRecord {
id,
relation: RelationId::new(record.relation.get()),
element: ElementId::new(record.element.get()),
role: RoleId::new(record.role.get()),
},
);
}
let properties = decode_base_properties(view)?;
Ok((elements, relations, incidences, properties))
}
impl BaseRecords {
    /// Creates a record set with no records and an empty owned index.
    pub(crate) const fn empty() -> Self {
        Self {
            index: HeldIndex::Owned(OwnedBaseIndex::empty()),
            elements: BTreeMap::new(),
            relations: BTreeMap::new(),
            incidences: BTreeMap::new(),
            properties: BTreeMap::new(),
        }
    }

    /// Decodes the records stored in `base` and attaches the index borrowed
    /// from that same base.
    ///
    /// In builds with debug assertions the borrowed index is additionally
    /// checked against an index rebuilt from the decoded records.
    ///
    /// # Errors
    ///
    /// Returns the error from `decode_records` when the base cannot be decoded.
    pub(crate) fn open(base: &Arc<Base>) -> Result<Self, DbError> {
        let (elements, relations, incidences, properties) = decode_records(base.get())?;
        // The yoke takes its own `Arc` handle to the base it borrows from.
        let cart = Arc::clone(base);
        let index = HeldIndex::Borrowed(Yoke::attach_to_cart(cart, |held: &Base| {
            held.get().index()
        }));
        let opened = Self {
            elements,
            relations,
            incidences,
            properties,
            index,
        };
        #[cfg(debug_assertions)]
        opened.debug_assert_index_matches();
        Ok(opened)
    }

    /// Debug-only check that the held index agrees with one rebuilt from the
    /// decoded records via `OwnedBaseIndex::from_records`.
    #[cfg(debug_assertions)]
    fn debug_assert_index_matches(&self) {
        let oracle = OwnedBaseIndex::from_records(
            &self.elements,
            &self.relations,
            &self.incidences,
            &self.properties,
        );
        let held = self.index.view();
        let agrees = crate::index::indexes_agree(held, BaseIndex::Owned(&oracle));
        debug_assert!(
            agrees,
            "borrowed base index disagrees with the from_records oracle",
        );
    }

    /// Returns a view of the index held for these records.
    pub(crate) fn index(&self) -> BaseIndex<'_> {
        self.index.view()
    }

    /// Test-only constructor: decodes `view` and builds an owned index from
    /// the decoded records instead of borrowing one.
    ///
    /// # Errors
    ///
    /// Returns the error from `decode_records` when the view cannot be decoded.
    #[cfg(test)]
    pub(crate) fn from_view(view: &BaseView<'_>) -> Result<Self, DbError> {
        let (elements, relations, incidences, properties) = decode_records(view)?;
        let built = OwnedBaseIndex::from_records(&elements, &relations, &incidences, &properties);
        Ok(Self {
            elements,
            relations,
            incidences,
            properties,
            index: HeldIndex::Owned(built),
        })
    }

    /// Looks up the element record stored under `id`, if any.
    pub(super) fn element(&self, id: ElementId) -> Option<&ElementRecord> {
        self.elements.get(&id)
    }

    /// Looks up the relation record stored under `id`, if any.
    pub(super) fn relation(&self, id: RelationId) -> Option<&RelationRecord> {
        self.relations.get(&id)
    }

    /// Looks up the incidence record stored under `id`, if any.
    pub(super) fn incidence(&self, id: IncidenceId) -> Option<&IncidenceRecord> {
        self.incidences.get(&id)
    }

    /// Looks up the value stored for `key` on `subject`.
    ///
    /// Returns `None` when the subject has no properties or lacks that key.
    pub(super) fn property(
        &self,
        subject: PropertySubject,
        key: PropertyKeyId,
    ) -> Option<&PropertyValue> {
        let keys = self.properties.get(&subject)?;
        keys.get(&key)
    }
}
/// Converts a run of little-endian label words into a `LabelSet`.
///
/// # Errors
///
/// Returns an invalid-store error when `run` is `None`, which the message
/// reports as the label run being out of bounds.
fn decode_labels(
    run: Option<&[zerocopy::byteorder::U64<zerocopy::byteorder::LE>]>,
) -> Result<LabelSet, DbError> {
    let words = run.ok_or_else(|| DbError::invalid_store("base label run out of bounds"))?;
    let labels: LabelSet = words
        .iter()
        .map(|word| word.get())
        .map(LabelId::new)
        .collect();
    Ok(labels)
}
/// Decodes every property record in `view` into a map of subject to
/// per-key values.
///
/// When several records share a subject and key, the last one decoded wins.
///
/// # Errors
///
/// Returns an invalid-store error when a record's subject kind or value tag
/// is not recognised, when its text range cannot be resolved, or when its
/// text bytes are not valid UTF-8.
fn decode_base_properties(
    view: &BaseView<'_>,
) -> Result<BTreeMap<PropertySubject, BTreeMap<PropertyKeyId, PropertyValue>>, DbError> {
    let mut by_subject =
        BTreeMap::<PropertySubject, BTreeMap<PropertyKeyId, PropertyValue>>::new();
    for record in view.properties() {
        let subject =
            crate::wire::decode_subject(record.subject_kind.get(), record.subject_id.get())
                .ok_or_else(|| DbError::invalid_store("base property subject kind out of range"))?;
        let tag = record.value_tag.get();
        let value_type = crate::wire::property_type_from_tag(tag)
            .ok_or_else(|| DbError::invalid_store("base property value tag out of range"))?;
        let value = match value_type {
            PropertyType::Text => {
                let text = view
                    .property_text(record)
                    .ok_or_else(|| DbError::invalid_store("base property text out of bounds"))
                    .and_then(|bytes| {
                        core::str::from_utf8(bytes).map_err(|_error| {
                            DbError::invalid_store("base property text is not UTF-8")
                        })
                    })?;
                PropertyValue::Text(Arc::from(text))
            }
            // The scalar word is reinterpreted as a signed integer.
            PropertyType::Integer => PropertyValue::Integer(record.scalar.get().cast_signed()),
            // Any non-zero scalar decodes as `true`.
            PropertyType::Boolean => PropertyValue::Boolean(record.scalar.get() != 0),
        };
        let keys = by_subject.entry(subject).or_default();
        keys.insert(PropertyKeyId::new(record.key.get()), value);
    }
    Ok(by_subject)
}
/// Pairs an overlay with a base and its decoded records, tagged with a
/// checkpoint generation and a commit sequence number.
///
/// The base, overlay and decoded records are each behind an `Arc`, so the
/// derived `Clone` shares them instead of copying the underlying data.
#[derive(Clone)]
pub(crate) struct Snapshot {
/// Checkpoint generation supplied at construction.
generation: CheckpointGeneration,
/// Commit sequence supplied at construction.
/// NOTE(review): presumably the last commit visible in this snapshot — confirm.
lsn: CommitSeq,
/// Shared handle to the base.
base: Arc<Base>,
/// Shared handle to the overlay.
overlay: Arc<Overlay>,
/// Shared handle to the decoded base records used by `view`.
base_records: Arc<BaseRecords>,
}
impl Snapshot {
/// Assembles a snapshot from already-shared parts; every argument is stored
/// as given, with no decoding or validation performed here.
pub(crate) const fn with_shared_base_records(
generation: CheckpointGeneration,
lsn: CommitSeq,
base: Arc<Base>,
overlay: Arc<Overlay>,
base_records: Arc<BaseRecords>,
) -> Self {
Self {
generation,
lsn,
base,
overlay,
base_records,
}
}
/// Returns the checkpoint generation this snapshot was built with.
pub(crate) const fn generation(&self) -> CheckpointGeneration {
self.generation
}
/// Returns the commit sequence this snapshot was built with.
pub(crate) const fn lsn(&self) -> CommitSeq {
self.lsn
}
/// Returns the shared overlay handle.
pub(crate) const fn overlay(&self) -> &Arc<Overlay> {
&self.overlay
}
/// Returns the shared base handle.
pub(crate) const fn base(&self) -> &Arc<Base> {
&self.base
}
/// Returns the shared handle to the decoded base records.
pub(crate) const fn base_records(&self) -> &Arc<BaseRecords> {
&self.base_records
}
/// Returns a `MergedState` borrowing this snapshot's decoded base records
/// and overlay.
pub(crate) fn view(&self) -> MergedState<'_> {
MergedState {
base: &self.base_records,
overlay: &self.overlay,
}
}
}
/// Compile-time probe: instantiating this with `T` only type-checks when `T`
/// is both `Send` and `Sync`.
const fn assert_send_sync<T>()
where
    T: Send + Sync,
{
}

// Evaluated during compilation; the build fails if either type stops being
// `Send + Sync`.
const _: () = assert_send_sync::<Overlay>();
const _: () = assert_send_sync::<Snapshot>();
#[cfg(kani)]
impl BaseRecords {
    /// Proof helper: builds base records containing one label-less element
    /// per id in `ids`, with no other records and an empty owned index.
    pub(crate) fn proof_elements(ids: &[ElementId]) -> Self {
        let mut elements = BTreeMap::new();
        for &id in ids {
            let record = ElementRecord {
                id,
                labels: LabelSet::default(),
            };
            elements.insert(id, record);
        }
        Self {
            index: HeldIndex::Owned(OwnedBaseIndex::empty()),
            elements,
            relations: BTreeMap::new(),
            incidences: BTreeMap::new(),
            properties: BTreeMap::new(),
        }
    }
}
#[cfg(all(kani, feature = "kani-heavy"))]
impl Overlay {
    /// Proof helper: builds an overlay whose only contents are element
    /// entries.
    ///
    /// For each `(id, present)` pair, `present == true` stores a label-less
    /// element record and `present == false` stores a `None` entry.
    pub(crate) fn proof_element_entries(entries: &[(ElementId, bool)]) -> Self {
        let mut elements = BTreeMap::new();
        for &(id, present) in entries {
            let entry = present.then(|| ElementRecord {
                id,
                labels: LabelSet::default(),
            });
            elements.insert(id, entry);
        }
        // Everything except `elements` matches an empty overlay.
        Self {
            elements,
            ..Self::empty(proof_zero_next_ids(), Catalog::empty())
        }
    }

    /// Proof helper: reports whether `id` has an entry whose value is `None`.
    pub(crate) fn proof_is_element_tombstoned(&self, id: ElementId) -> bool {
        self.elements.get(&id).is_some_and(|entry| entry.is_none())
    }

    /// Proof helper: builds an otherwise empty overlay whose element
    /// watermark is `next`.
    pub(crate) fn proof_with_next_element(next: u64) -> Self {
        let watermark = NextIds {
            element: ElementId::new(next),
            ..proof_zero_next_ids()
        };
        Self::empty(watermark, Catalog::empty())
    }
}
/// Proof helper: returns the baseline `NextIds` used by the Kani overlay
/// constructors.
///
/// Despite the name, every counter is initialised to id `1`, not `0`.
#[cfg(all(kani, feature = "kani-heavy"))]
fn proof_zero_next_ids() -> NextIds {
NextIds {
element: ElementId::new(1),
relation: RelationId::new(1),
incidence: IncidenceId::new(1),
role: RoleId::new(1),
label: LabelId::new(1),
relation_type: RelationTypeId::new(1),
property_key: PropertyKeyId::new(1),
projection: ProjectionId::new(1),
index: IndexId::new(1),
}
}