use std::{
borrow::Cow,
collections::{BTreeMap, btree_map},
};
use super::{
Delta, Keyed, SubjectDelta,
frozen::{BaseRecords, Overlay},
write::WriteOverlay,
};
use crate::{
Catalog, DbError, ElementId, IncidenceId, LabelId, PropertyKeyId, RelationId, RelationTypeId,
catalog::PropertyFamily,
index::OverlayIndex,
state::{ElementRecord, IncidenceRecord, NextIds, PropertySubject, RelationRecord},
value::PropertyValue,
};
pub(crate) trait StateView {
fn element(&self, id: ElementId) -> Option<Cow<'_, ElementRecord>>;
fn relation(&self, id: RelationId) -> Option<Cow<'_, RelationRecord>>;
fn incidence(&self, id: IncidenceId) -> Option<Cow<'_, IncidenceRecord>>;
fn property(
&self,
subject: PropertySubject,
key: PropertyKeyId,
) -> Option<Cow<'_, PropertyValue>>;
fn contains_element(&self, id: ElementId) -> bool {
self.element(id).is_some()
}
fn contains_relation(&self, id: RelationId) -> bool {
self.relation(id).is_some()
}
fn contains_incidence(&self, id: IncidenceId) -> bool {
self.incidence(id).is_some()
}
fn elements(&self) -> impl Iterator<Item = Cow<'_, ElementRecord>>;
fn relations(&self) -> impl Iterator<Item = Cow<'_, RelationRecord>>;
fn incidences(&self) -> impl Iterator<Item = Cow<'_, IncidenceRecord>>;
fn properties(
&self,
) -> impl Iterator<Item = (PropertySubject, PropertyKeyId, Cow<'_, PropertyValue>)>;
fn element_count(&self) -> usize {
self.elements().count()
}
fn relation_count(&self) -> usize {
self.relations().count()
}
fn incidence_count(&self) -> usize {
self.incidences().count()
}
fn catalog(&self) -> &Catalog;
fn next_ids(&self) -> NextIds;
fn element_incidences(&self, element: ElementId) -> Vec<IncidenceRecord> {
self.incidences()
.filter(|record| record.element == element)
.map(Cow::into_owned)
.collect()
}
fn relation_incidences(&self, relation: RelationId) -> Vec<IncidenceRecord> {
self.incidences()
.filter(|record| record.relation == relation)
.map(Cow::into_owned)
.collect()
}
fn elements_with_label(&self, label: LabelId) -> Vec<ElementId> {
self.elements()
.filter(|record| record.labels.contains(&label))
.map(|record| record.id)
.collect()
}
fn relations_with_type(&self, relation_type: RelationTypeId) -> Vec<RelationId> {
self.relations()
.filter(|record| record.relation_type == Some(relation_type))
.map(|record| record.id)
.collect()
}
fn property_equal(&self, key: PropertyKeyId, value: &PropertyValue) -> Vec<PropertySubject> {
self.properties()
.filter(|(_subject, candidate_key, candidate_value)| {
*candidate_key == key && candidate_value.as_ref() == value
})
.map(|(subject, _key, _value)| subject)
.collect()
}
fn typed_property_equal(
&self,
key: PropertyKeyId,
value: &PropertyValue,
) -> Result<Vec<PropertySubject>, DbError> {
self.validate_lookup_value(key, value)?;
Ok(self.property_equal(key, value))
}
fn typed_property_equal_for_family(
&self,
key: PropertyKeyId,
family: PropertyFamily,
value: &PropertyValue,
) -> Result<Vec<PropertySubject>, DbError> {
self.validate_lookup_value_for_family(key, family, value)?;
Ok(self.property_equal(key, value))
}
fn property_range(
&self,
key: PropertyKeyId,
min: &PropertyValue,
max: &PropertyValue,
) -> Vec<PropertySubject> {
self.properties()
.filter(|(_subject, candidate_key, value)| {
*candidate_key == key && value.as_ref() >= min && value.as_ref() <= max
})
.map(|(subject, _key, _value)| subject)
.collect()
}
fn typed_property_range(
&self,
key: PropertyKeyId,
min: &PropertyValue,
max: &PropertyValue,
) -> Result<Vec<PropertySubject>, DbError> {
self.validate_lookup_value(key, min)?;
self.validate_lookup_value(key, max)?;
if min > max {
return Ok(Vec::new());
}
Ok(self.property_range(key, min, max))
}
fn typed_property_composite_equal(
&self,
keys: &[PropertyKeyId],
values: &[PropertyValue],
) -> Result<Vec<PropertySubject>, DbError> {
if keys.len() != values.len() {
return Err(DbError::unsupported(
"composite equality tuple arity mismatch",
));
}
for (key, value) in keys.iter().copied().zip(values) {
self.validate_lookup_value(key, value)?;
}
Ok(self
.properties_by_subject()
.into_iter()
.filter_map(|(subject, subject_values)| {
keys.iter()
.copied()
.zip(values)
.all(|(key, value)| subject_values.get(&key) == Some(value))
.then_some(subject)
})
.collect())
}
fn validate_lookup_value(
&self,
key: PropertyKeyId,
value: &PropertyValue,
) -> Result<(), DbError> {
let definition = self
.catalog()
.property_key(key)
.ok_or_else(|| DbError::unknown(key))?;
let actual = value.value_type();
if definition.value_type != actual {
return Err(DbError::Query(
crate::error::QueryError::PropertyTypeMismatch {
expected: definition.value_type,
actual,
},
));
}
Ok(())
}
fn validate_lookup_value_for_family(
&self,
key: PropertyKeyId,
family: PropertyFamily,
value: &PropertyValue,
) -> Result<(), DbError> {
let definition = self
.catalog()
.property_key(key)
.ok_or_else(|| DbError::unknown(key))?;
if definition.family != family {
return Err(DbError::Query(
crate::error::QueryError::WrongPropertyFamily {
expected: definition.family,
actual: family,
},
));
}
if definition.value_type != value.value_type() {
return Err(DbError::Query(
crate::error::QueryError::PropertyTypeMismatch {
expected: definition.value_type,
actual: value.value_type(),
},
));
}
Ok(())
}
fn properties_by_subject(
&self,
) -> BTreeMap<PropertySubject, BTreeMap<PropertyKeyId, PropertyValue>> {
let mut grouped: BTreeMap<PropertySubject, BTreeMap<PropertyKeyId, PropertyValue>> =
BTreeMap::new();
for (subject, key, value) in self.properties() {
grouped
.entry(subject)
.or_default()
.insert(key, value.into_owned());
}
grouped
}
}
pub(crate) trait OverlayLayer {
fn elements(&self) -> &Delta<ElementRecord>;
fn relations(&self) -> &Delta<RelationRecord>;
fn incidences(&self) -> &Delta<IncidenceRecord>;
fn properties(&self) -> &BTreeMap<PropertySubject, SubjectDelta>;
fn catalog(&self) -> &Catalog;
fn next_ids(&self) -> NextIds;
fn index(&self) -> &OverlayIndex;
}
impl OverlayLayer for Overlay {
fn elements(&self) -> &Delta<ElementRecord> {
&self.elements
}
fn relations(&self) -> &Delta<RelationRecord> {
&self.relations
}
fn incidences(&self) -> &Delta<IncidenceRecord> {
&self.incidences
}
fn properties(&self) -> &BTreeMap<PropertySubject, SubjectDelta> {
&self.properties
}
fn catalog(&self) -> &Catalog {
&self.catalog
}
fn next_ids(&self) -> NextIds {
self.next
}
fn index(&self) -> &OverlayIndex {
&self.index
}
}
impl OverlayLayer for WriteOverlay {
fn elements(&self) -> &Delta<ElementRecord> {
&self.elements
}
fn relations(&self) -> &Delta<RelationRecord> {
&self.relations
}
fn incidences(&self) -> &Delta<IncidenceRecord> {
&self.incidences
}
fn properties(&self) -> &BTreeMap<PropertySubject, SubjectDelta> {
&self.properties
}
fn catalog(&self) -> &Catalog {
&self.catalog
}
fn next_ids(&self) -> NextIds {
self.next
}
fn index(&self) -> &OverlayIndex {
&self.index
}
}
#[derive(Clone, Copy)]
pub(crate) struct LayeredState<'a, L: OverlayLayer> {
pub(super) base: &'a BaseRecords,
pub(super) overlay: &'a L,
}
impl<'a, L: OverlayLayer> LayeredState<'a, L> {
pub(crate) const fn new(base: &'a BaseRecords, overlay: &'a L) -> Self {
Self { base, overlay }
}
pub(crate) fn element_ref(&self, id: ElementId) -> Option<Cow<'a, ElementRecord>> {
match self.overlay.elements().get(&id) {
Some(Some(record)) => Some(Cow::Owned(record.clone())),
Some(None) => None,
None => self.base.element(id).map(Cow::Borrowed),
}
}
pub(crate) fn relation_ref(&self, id: RelationId) -> Option<Cow<'a, RelationRecord>> {
match self.overlay.relations().get(&id) {
Some(Some(record)) => Some(Cow::Owned(record.clone())),
Some(None) => None,
None => self.base.relation(id).map(Cow::Borrowed),
}
}
pub(crate) fn incidence_ref(&self, id: IncidenceId) -> Option<Cow<'a, IncidenceRecord>> {
match self.overlay.incidences().get(&id) {
Some(Some(record)) => Some(Cow::Owned(*record)),
Some(None) => None,
None => self.base.incidence(id).map(Cow::Borrowed),
}
}
pub(crate) fn property_ref(
&self,
subject: PropertySubject,
key: PropertyKeyId,
) -> Option<Cow<'a, PropertyValue>> {
match self
.overlay
.properties()
.get(&subject)
.and_then(|keys| keys.get(&key))
{
Some(Some(value)) => Some(Cow::Owned(value.clone())),
Some(None) => None,
None => self.base.property(subject, key).map(Cow::Borrowed),
}
}
pub(crate) fn subject_properties(
&self,
subject: PropertySubject,
) -> Vec<(PropertyKeyId, PropertyValue)> {
let mut merged: BTreeMap<PropertyKeyId, PropertyValue> = self
.base
.properties
.get(&subject)
.cloned()
.unwrap_or_default();
let overlay_keys = self.overlay.properties().get(&subject);
for (key, value) in overlay_keys.into_iter().flat_map(|keys| keys.iter()) {
if let Some(value) = value {
merged.insert(*key, value.clone());
} else {
merged.remove(key);
}
}
merged.into_iter().collect()
}
pub(crate) fn property_key_subjects(
&self,
key: PropertyKeyId,
) -> Vec<(PropertySubject, PropertyValue)> {
self.overlay
.index()
.property_key_subjects(self.base.index(), key)
}
pub(crate) fn catalog_ref(&self) -> &'a Catalog {
self.overlay.catalog()
}
}
pub(crate) type MergedState<'a> = LayeredState<'a, Overlay>;
pub(crate) type WriteMergedState<'a> = LayeredState<'a, WriteOverlay>;
impl<L: OverlayLayer> StateView for LayeredState<'_, L> {
fn element(&self, id: ElementId) -> Option<Cow<'_, ElementRecord>> {
self.element_ref(id)
}
fn relation(&self, id: RelationId) -> Option<Cow<'_, RelationRecord>> {
self.relation_ref(id)
}
fn incidence(&self, id: IncidenceId) -> Option<Cow<'_, IncidenceRecord>> {
self.incidence_ref(id)
}
fn property(
&self,
subject: PropertySubject,
key: PropertyKeyId,
) -> Option<Cow<'_, PropertyValue>> {
self.property_ref(subject, key)
}
fn elements(&self) -> impl Iterator<Item = Cow<'_, ElementRecord>> {
MergeIter::new(self.base.elements.values(), self.overlay.elements().iter())
}
fn relations(&self) -> impl Iterator<Item = Cow<'_, RelationRecord>> {
MergeIter::new(
self.base.relations.values(),
self.overlay.relations().iter(),
)
}
fn incidences(&self) -> impl Iterator<Item = Cow<'_, IncidenceRecord>> {
MergeIter::new(
self.base.incidences.values(),
self.overlay.incidences().iter(),
)
}
fn properties(
&self,
) -> impl Iterator<Item = (PropertySubject, PropertyKeyId, Cow<'_, PropertyValue>)> {
PropertyMergeIter::new(
base_property_triples(self.base),
overlay_property_triples(self.overlay.properties()),
)
}
fn catalog(&self) -> &Catalog {
self.overlay.catalog()
}
fn next_ids(&self) -> NextIds {
self.overlay.next_ids()
}
fn elements_with_label(&self, label: LabelId) -> Vec<ElementId> {
self.overlay
.index()
.elements_with_label(self.base.index(), label)
}
fn relations_with_type(&self, relation_type: RelationTypeId) -> Vec<RelationId> {
self.overlay
.index()
.relations_with_type(self.base.index(), relation_type)
}
fn element_incidences(&self, element: ElementId) -> Vec<IncidenceRecord> {
self.overlay
.index()
.element_incidences(self.base.index(), element)
.into_iter()
.filter_map(|id| self.incidence_ref(id).map(Cow::into_owned))
.collect()
}
fn relation_incidences(&self, relation: RelationId) -> Vec<IncidenceRecord> {
self.overlay
.index()
.relation_incidences(self.base.index(), relation)
.into_iter()
.filter_map(|id| self.incidence_ref(id).map(Cow::into_owned))
.collect()
}
fn property_equal(&self, key: PropertyKeyId, value: &PropertyValue) -> Vec<PropertySubject> {
self.overlay
.index()
.property_equal(self.base.index(), key, value)
}
fn property_range(
&self,
key: PropertyKeyId,
min: &PropertyValue,
max: &PropertyValue,
) -> Vec<PropertySubject> {
self.overlay
.index()
.property_range(self.base.index(), key, min, max)
}
fn typed_property_composite_equal(
&self,
keys: &[PropertyKeyId],
values: &[PropertyValue],
) -> Result<Vec<PropertySubject>, DbError> {
if keys.len() != values.len() {
return Err(DbError::unsupported(
"composite equality tuple arity mismatch",
));
}
for (key, value) in keys.iter().copied().zip(values) {
self.validate_lookup_value(key, value)?;
}
let pairs: Vec<(PropertyKeyId, PropertyValue)> =
keys.iter().copied().zip(values.iter().cloned()).collect();
Ok(self
.overlay
.index()
.property_composite_equal(self.base.index(), &pairs))
}
}
#[cfg(test)]
impl<L: OverlayLayer> LayeredState<'_, L> {
pub(crate) fn elements_with_label_scan(&self, label: LabelId) -> Vec<ElementId> {
self.elements()
.filter(|record| record.labels.contains(&label))
.map(|record| record.id)
.collect()
}
pub(crate) fn relations_with_type_scan(
&self,
relation_type: RelationTypeId,
) -> Vec<RelationId> {
self.relations()
.filter(|record| record.relation_type == Some(relation_type))
.map(|record| record.id)
.collect()
}
pub(crate) fn element_incidences_scan(&self, element: ElementId) -> Vec<IncidenceRecord> {
self.incidences()
.filter(|record| record.element == element)
.map(Cow::into_owned)
.collect()
}
pub(crate) fn relation_incidences_scan(&self, relation: RelationId) -> Vec<IncidenceRecord> {
self.incidences()
.filter(|record| record.relation == relation)
.map(Cow::into_owned)
.collect()
}
pub(crate) fn property_equal_scan(
&self,
key: PropertyKeyId,
value: &PropertyValue,
) -> Vec<PropertySubject> {
self.properties()
.filter(|(_subject, candidate_key, candidate_value)| {
*candidate_key == key && candidate_value.as_ref() == value
})
.map(|(subject, _key, _value)| subject)
.collect()
}
pub(crate) fn property_range_scan(
&self,
key: PropertyKeyId,
min: &PropertyValue,
max: &PropertyValue,
) -> Vec<PropertySubject> {
self.properties()
.filter(|(_subject, candidate_key, value)| {
*candidate_key == key && value.as_ref() >= min && value.as_ref() <= max
})
.map(|(subject, _key, _value)| subject)
.collect()
}
pub(crate) fn property_composite_equal_scan(
&self,
keys: &[PropertyKeyId],
values: &[PropertyValue],
) -> Vec<PropertySubject> {
self.properties_by_subject()
.into_iter()
.filter_map(|(subject, subject_values)| {
keys.iter()
.copied()
.zip(values)
.all(|(key, value)| subject_values.get(&key) == Some(value))
.then_some(subject)
})
.collect()
}
}
struct MergeIter<'a, R: Keyed + Clone> {
base: std::iter::Peekable<btree_map::Values<'a, R::Id, R>>,
overlay: std::iter::Peekable<btree_map::Iter<'a, R::Id, Option<R>>>,
}
impl<'a, R: Keyed + Clone> MergeIter<'a, R> {
fn new(
base: btree_map::Values<'a, R::Id, R>,
overlay: btree_map::Iter<'a, R::Id, Option<R>>,
) -> Self {
Self {
base: base.peekable(),
overlay: overlay.peekable(),
}
}
fn take_base(&mut self) -> Option<Cow<'a, R>> {
self.base.next().map(Cow::Borrowed)
}
fn take_overlay(&mut self, mask_base: bool) -> Step<Cow<'a, R>> {
if mask_base {
let _masked = self.base.next();
}
let Some((_id, entry)) = self.overlay.next() else {
return Step::Done;
};
entry.as_ref().map_or(Step::Again, |record| {
Step::Yield(Cow::Owned(record.clone()))
})
}
fn step(&mut self) -> Step<Cow<'a, R>> {
let base_id = self.base.peek().map(|record| record.record_id());
let overlay_id = self.overlay.peek().map(|(id, _entry)| **id);
match (base_id, overlay_id) {
(None, None) => Step::Done,
(Some(_base), None) => self.take_base().map_or(Step::Done, Step::Yield),
(Some(base), Some(overlay)) if base < overlay => {
self.take_base().map_or(Step::Done, Step::Yield)
}
(_other, Some(overlay)) => self.take_overlay(base_id == Some(overlay)),
}
}
}
impl<'a, R: Keyed + Clone> Iterator for MergeIter<'a, R> {
type Item = Cow<'a, R>;
fn next(&mut self) -> Option<Self::Item> {
loop {
match self.step() {
Step::Done => return None,
Step::Yield(item) => return Some(item),
Step::Again => {}
}
}
}
}
enum Step<T> {
Done,
Yield(T),
Again,
}
struct PropertyMergeIter<'a> {
base: Vec<(PropertySubject, PropertyKeyId, &'a PropertyValue)>,
overlay: Vec<(PropertySubject, PropertyKeyId, &'a Option<PropertyValue>)>,
base_index: usize,
overlay_index: usize,
}
impl<'a> PropertyMergeIter<'a> {
const fn new(
base: Vec<(PropertySubject, PropertyKeyId, &'a PropertyValue)>,
overlay: Vec<(PropertySubject, PropertyKeyId, &'a Option<PropertyValue>)>,
) -> Self {
Self {
base,
overlay,
base_index: 0,
overlay_index: 0,
}
}
}
impl<'a> PropertyMergeIter<'a> {
fn take_base(&mut self) -> Option<(PropertySubject, PropertyKeyId, Cow<'a, PropertyValue>)> {
let (subject, key, value) = *self.base.get(self.base_index)?;
self.base_index += 1;
Some((subject, key, Cow::Borrowed(value)))
}
fn take_overlay(
&mut self,
mask_base: bool,
) -> Step<(PropertySubject, PropertyKeyId, Cow<'a, PropertyValue>)> {
if mask_base {
self.base_index += 1;
}
let Some(&(subject, key, entry)) = self.overlay.get(self.overlay_index) else {
return Step::Done;
};
self.overlay_index += 1;
entry.as_ref().map_or(Step::Again, |value| {
Step::Yield((subject, key, Cow::Owned(value.clone())))
})
}
fn step(&mut self) -> Step<(PropertySubject, PropertyKeyId, Cow<'a, PropertyValue>)> {
let base_pair = self
.base
.get(self.base_index)
.map(|&(subject, key, _value)| (subject, key));
let overlay_pair = self
.overlay
.get(self.overlay_index)
.map(|&(subject, key, _entry)| (subject, key));
match (base_pair, overlay_pair) {
(None, None) => Step::Done,
(Some(_base), None) => self.take_base().map_or(Step::Done, Step::Yield),
(Some(base), Some(overlay)) if base < overlay => {
self.take_base().map_or(Step::Done, Step::Yield)
}
(_other, Some(overlay)) => self.take_overlay(base_pair == Some(overlay)),
}
}
}
impl<'a> Iterator for PropertyMergeIter<'a> {
type Item = (PropertySubject, PropertyKeyId, Cow<'a, PropertyValue>);
fn next(&mut self) -> Option<Self::Item> {
loop {
match self.step() {
Step::Done => return None,
Step::Yield(item) => return Some(item),
Step::Again => {}
}
}
}
}
fn base_property_triples(
base: &BaseRecords,
) -> Vec<(PropertySubject, PropertyKeyId, &PropertyValue)> {
base.properties
.iter()
.flat_map(|(subject, keys)| keys.iter().map(move |(key, value)| (*subject, *key, value)))
.collect()
}
fn overlay_property_triples(
properties: &BTreeMap<PropertySubject, SubjectDelta>,
) -> Vec<(PropertySubject, PropertyKeyId, &Option<PropertyValue>)> {
properties
.iter()
.flat_map(|(subject, keys)| keys.iter().map(move |(key, value)| (*subject, *key, value)))
.collect()
}