use rustc_hash::FxHashMap;
use selene_core::{DbString, LabelSet, PropertyMap, Value};
use std::collections::BTreeSet;
use crate::error::{GraphError, GraphResult};
use crate::graph::PropertyIndexEntry;
use crate::typed_index::{TypedIndex, TypedIndexKind, TypedIndexValueError};
/// Registered property indexes, keyed by `(label, property)`.
type PropertyIndexMap = FxHashMap<(DbString, DbString), PropertyIndexEntry>;
/// Records a newly created node in the property indexes.
///
/// Every `(label, property)` pair of the node is offered to [`insert_commit`] together with
/// the property's value and the node's `row`; `insert_commit` only touches an index when one
/// is registered under that pair. Properties whose value is `Value::Null` are skipped.
///
/// # Errors
///
/// Propagates any error returned by [`insert_commit`].
pub(crate) fn apply_node_create(
    indexes: &mut PropertyIndexMap,
    labels: &LabelSet,
    props: &PropertyMap,
    row: u32,
) -> GraphResult<()> {
    for label in labels.iter() {
        // Null values are never indexed, so drop them before touching any index.
        let indexable = props.iter().filter(|(_, value)| !is_null(value));
        for (property, value) in indexable {
            insert_commit(indexes, label.clone(), property.clone(), value, row)?;
        }
    }
    Ok(())
}
/// Removes a deleted node from the property indexes.
///
/// Every `(label, property)` pair of the node is offered to [`remove_commit`] together with
/// the property's value and the node's `row`; `remove_commit` only touches an index when one
/// is registered under that pair. Properties whose value is `Value::Null` are skipped.
///
/// # Errors
///
/// Propagates any error returned by [`remove_commit`].
pub(crate) fn apply_node_delete(
    indexes: &mut PropertyIndexMap,
    labels: &LabelSet,
    props: &PropertyMap,
    row: u32,
) -> GraphResult<()> {
    for label in labels.iter() {
        // Null values are skipped on insert as well, so there is nothing to remove for them.
        let indexed = props.iter().filter(|(_, value)| !is_null(value));
        for (property, value) in indexed {
            remove_commit(indexes, label.clone(), property.clone(), value, row)?;
        }
    }
    Ok(())
}
/// Reconciles the property indexes after a node's labels and/or properties changed.
///
/// Only `(label, property)` pairs that have a registered index and occur in the old or new
/// node state are examined (see [`candidate_keys`]). For each such pair the old and new
/// indexable values are compared; when [`values_share_key`] reports they are equivalent the
/// index is left alone, otherwise the old value (if any) is removed and the new value (if any)
/// is inserted for `row`.
///
/// # Errors
///
/// Propagates any error returned by [`remove_commit`] or [`insert_commit`].
pub(crate) fn apply_node_update(
    indexes: &mut PropertyIndexMap,
    old_labels: &LabelSet,
    old_props: &PropertyMap,
    new_labels: &LabelSet,
    new_props: &PropertyMap,
    row: u32,
) -> GraphResult<()> {
    for (label, property) in candidate_keys(indexes, old_labels, old_props, new_labels, new_props)
    {
        let before = indexable_value(old_labels, old_props, &label, &property);
        let after = indexable_value(new_labels, new_props, &label, &property);

        let unchanged = values_share_key(indexes, label.clone(), property.clone(), before, after);
        if !unchanged {
            // Remove first, then insert, so the row moves from its old entry to the new one.
            if let Some(stale) = before {
                remove_commit(indexes, label.clone(), property.clone(), stale, row)?;
            }
            if let Some(fresh) = after {
                insert_commit(indexes, label, property, fresh, row)?;
            }
        }
    }
    Ok(())
}
/// Collects the registered index keys that a node update could affect.
///
/// The result is every `(label, property)` pair that is present in `indexes` and whose label
/// appears in the old or new label set and whose property appears in the old or new property
/// map. Returns an empty set immediately when no index is registered.
fn candidate_keys(
    indexes: &PropertyIndexMap,
    old_labels: &LabelSet,
    old_props: &PropertyMap,
    new_labels: &LabelSet,
    new_props: &PropertyMap,
) -> BTreeSet<(DbString, DbString)> {
    if indexes.is_empty() {
        return BTreeSet::new();
    }

    // Union of labels / property names across both versions of the node.
    let labels: BTreeSet<DbString> = old_labels
        .iter()
        .chain(new_labels.iter())
        .cloned()
        .collect();
    let properties: BTreeSet<DbString> = old_props
        .keys()
        .chain(new_props.keys())
        .cloned()
        .collect();

    labels
        .iter()
        .flat_map(|label| {
            properties
                .iter()
                .map(move |property| (label.clone(), property.clone()))
        })
        .filter(|key| indexes.contains_key(key))
        .collect()
}
/// Builds a typed index of `kind` over `property` for nodes carrying `label`, strictly.
///
/// Delegates to [`build_property_index_inner`] with [`BuildPolicy::Strict`], so the first value
/// the index rejects aborts the build.
///
/// # Errors
///
/// Returns whatever error [`build_property_index_inner`] produces under the strict policy.
pub(crate) fn build_property_index(
    graph: &crate::SeleneGraph,
    label: DbString,
    property: DbString,
    kind: TypedIndexKind,
) -> GraphResult<TypedIndex> {
    let policy = BuildPolicy::Strict;
    build_property_index_inner(graph, label, property, kind, policy)
}
/// Builds a typed index of `kind` over `property` for nodes carrying `label`, leniently.
///
/// Delegates to [`build_property_index_inner`] with [`BuildPolicy::Lenient`], so values the
/// index rejects are logged and skipped instead of aborting the build.
///
/// # Errors
///
/// Returns whatever error [`build_property_index_inner`] produces under the lenient policy.
pub(crate) fn build_property_index_lenient(
    graph: &crate::SeleneGraph,
    label: DbString,
    property: DbString,
    kind: TypedIndexKind,
) -> GraphResult<TypedIndex> {
    let policy = BuildPolicy::Lenient;
    build_property_index_inner(graph, label, property, kind, policy)
}
/// How `build_property_index_inner` reacts when the index rejects a value.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum BuildPolicy {
/// Abort the build and return the rejection as an error.
Strict,
/// Log a warning for the rejected value and keep building.
Lenient,
}
/// Scans the node store and builds a typed index over `property` for nodes carrying `label`.
///
/// Rows are visited in order from `0` to `graph.node_store.labels.len()`. A row is skipped when
/// it is not alive, has no label entry, does not carry `label`, has no property map, lacks
/// `property`, or holds `Value::Null` for it. Every remaining value is inserted under its row.
///
/// When the index rejects a value, `policy` decides the outcome: `Strict` returns the rejection
/// as an error, `Lenient` logs a warning and continues with the next row.
///
/// # Errors
///
/// * `GraphError::Inconsistent` when a row index does not fit in `u32`.
/// * The error produced by [`index_rejection`] when a value is rejected under
///   `BuildPolicy::Strict`.
fn build_property_index_inner(
    graph: &crate::SeleneGraph,
    label: DbString,
    property: DbString,
    kind: TypedIndexKind,
    policy: BuildPolicy,
) -> GraphResult<TypedIndex> {
    let store = &graph.node_store;
    let mut index = TypedIndex::new(kind);

    for row_index in 0..store.labels.len() {
        let Ok(row) = u32::try_from(row_index) else {
            return Err(GraphError::Inconsistent {
                reason: format!(
                    "node store row index {row_index} exceeds u32::MAX; selene-graph \
                     caps rows at u32::MAX",
                ),
            });
        };
        if !store.is_alive(row) {
            continue;
        }

        // The row must carry the indexed label ...
        let Some(labels) = store.labels.get(row_index) else {
            continue;
        };
        if !labels.contains(&label) {
            continue;
        }

        // ... and a non-null value for the indexed property.
        let Some(props) = store.properties.get(row_index) else {
            continue;
        };
        let Some(value) = props.get(&property).filter(|value| !is_null(value)) else {
            continue;
        };

        if let Err(err) = index.insert(value, row) {
            match policy {
                BuildPolicy::Strict => return Err(index_rejection(label, property, err)),
                BuildPolicy::Lenient => {
                    warn_rejected("rebuild", label.clone(), property.clone(), row, &err);
                }
            }
        }
    }

    Ok(index)
}
/// Rebuilds every registered property index from the current contents of the node store.
///
/// Each registration (key, index kind, optional name) is rebuilt with
/// [`build_property_index_lenient`], so values an index rejects are logged and skipped.
///
/// All replacement indexes are built *before* the existing map is modified. If any build
/// fails, the function returns early and `graph.property_index` is left exactly as it was,
/// rather than cleared or only partially repopulated. The trade-off is that old and new
/// indexes are alive at the same time while the rebuild runs.
///
/// # Errors
///
/// Propagates the first error returned by [`build_property_index_lenient`].
pub(crate) fn rebuild_property_indexes(graph: &mut crate::SeleneGraph) -> GraphResult<()> {
    // Phase 1: build replacements while the registrations are still intact.
    let mut rebuilt = Vec::new();
    for (key, entry) in graph.property_index.iter() {
        let (label, property) = key.clone();
        let index = build_property_index_lenient(graph, label, property, entry.kind())?;
        rebuilt.push((
            key.clone(),
            PropertyIndexEntry::new(index, entry.name.clone()),
        ));
    }

    // Phase 2: nothing below can fail, so the swap is all-or-nothing.
    graph.property_index.clear();
    for (key, entry) in rebuilt {
        graph.property_index.insert(key, entry);
    }
    Ok(())
}
/// Reports whether replacing `old_value` with `new_value` leaves the index entry unchanged.
///
/// * Both absent: nothing is indexed before or after, so the result is `true`.
/// * Both present: the decision is delegated to the `values_share_key` method of the index
///   registered under `(label, property)`; the result is `false` when no such index exists.
/// * Exactly one present: the value is being added or removed, so the result is `false`.
fn values_share_key(
    indexes: &PropertyIndexMap,
    label: DbString,
    property: DbString,
    old_value: Option<&Value>,
    new_value: Option<&Value>,
) -> bool {
    match (old_value, new_value) {
        (None, None) => true,
        // `label` and `property` are owned and not needed afterwards, so move them into the
        // lookup key instead of cloning them a second time.
        (Some(old_value), Some(new_value)) => indexes
            .get(&(label, property))
            .is_some_and(|entry| entry.index.values_share_key(old_value, new_value)),
        (Some(_), None) | (None, Some(_)) => false,
    }
}
/// Returns the value that would be indexed under `(label, property)` for a node state.
///
/// Yields `None` when the node does not carry `label`, does not have `property`, or holds
/// `Value::Null` for it; otherwise yields a reference to the stored value.
fn indexable_value<'a>(
    labels: &LabelSet,
    props: &'a PropertyMap,
    label: &DbString,
    property: &DbString,
) -> Option<&'a Value> {
    let candidate = if labels.contains(label) {
        props.get(property)
    } else {
        None
    };
    candidate.filter(|value| !is_null(value))
}
/// Inserts `value -> row` into the index registered under `(label, property)`, if any.
///
/// Does nothing when no index is registered for the pair. The index is reached through
/// `Arc::make_mut` on the entry's `index` field. A value the index rejects is handed to
/// [`demote_or_promote`] with the operation name `"insert"`.
///
/// # Errors
///
/// Propagates the result of [`demote_or_promote`] when the index rejects `value`.
fn insert_commit(
    indexes: &mut PropertyIndexMap,
    label: DbString,
    property: DbString,
    value: &Value,
    row: u32,
) -> GraphResult<()> {
    let key = (label, property);
    let Some(entry) = indexes.get_mut(&key) else {
        return Ok(());
    };

    if let Err(err) = std::sync::Arc::make_mut(&mut entry.index).insert(value, row) {
        let (label, property) = key;
        return demote_or_promote(label, property, row, "insert", err);
    }
    Ok(())
}
/// Removes `value -> row` from the index registered under `(label, property)`, if any.
///
/// Does nothing when no index is registered for the pair. The index is reached through
/// `Arc::make_mut` on the entry's `index` field. A value the index rejects is handed to
/// [`demote_or_promote`] with the operation name `"remove"`.
///
/// # Errors
///
/// Propagates the result of [`demote_or_promote`] when the index rejects `value`.
fn remove_commit(
    indexes: &mut PropertyIndexMap,
    label: DbString,
    property: DbString,
    value: &Value,
    row: u32,
) -> GraphResult<()> {
    let key = (label, property);
    let Some(entry) = indexes.get_mut(&key) else {
        return Ok(());
    };

    if let Err(err) = std::sync::Arc::make_mut(&mut entry.index).remove(value, row) {
        let (label, property) = key;
        return demote_or_promote(label, property, row, "remove", err);
    }
    Ok(())
}
/// Decides whether a rejected index update is fatal (an `Err`) or merely logged.
///
/// Both variants matched here, `KindMismatch` and `NaN`, are downgraded to a warning via
/// [`warn_rejected`], so the function currently always returns `Ok(())`. The match is kept
/// exhaustive on purpose: adding a variant to `TypedIndexValueError` forces a decision here.
fn demote_or_promote(
    label: DbString,
    property: DbString,
    row: u32,
    op: &'static str,
    err: TypedIndexValueError,
) -> GraphResult<()> {
    match &err {
        TypedIndexValueError::KindMismatch { .. } | TypedIndexValueError::NaN { .. } => {
            warn_rejected(op, label, property, row, &err);
        }
    }
    Ok(())
}
/// Converts a value rejection from a typed index into `GraphError::IndexValueRejected`.
///
/// The resulting error carries the index key (`label`, `property`) together with the expected
/// kind and observed value description reported by `err`. The match is exhaustive so that a
/// new `TypedIndexValueError` variant must be mapped explicitly.
fn index_rejection(label: DbString, property: DbString, err: TypedIndexValueError) -> GraphError {
    match err {
        TypedIndexValueError::KindMismatch { .. } | TypedIndexValueError::NaN { .. } => {
            let expected_kind = err.expected_kind();
            let observed = err.observed();
            GraphError::IndexValueRejected {
                label,
                property,
                expected_kind,
                observed,
            }
        }
    }
}
/// Emits a `tracing` warning for an index update that was skipped because the index rejected
/// the value.
///
/// The event records the operation name `op`, the index key (`label`, `property`), the
/// affected `row`, and the expected kind / observed value reported by `err`.
fn warn_rejected(
op: &'static str,
label: DbString,
property: DbString,
row: u32,
err: &TypedIndexValueError,
) {
tracing::warn!(
op,
%label,
%property,
row,
expected_kind = ?err.expected_kind(),
observed = err.observed(),
"skipped property-index update for value that does not match the registered index kind",
);
}
/// Returns `true` when `value` is `Value::Null`; callers in this file use it to skip nulls
/// before touching an index.
const fn is_null(value: &Value) -> bool {
matches!(value, Value::Null)
}
// Unit tests are compiled only for test builds and are loaded from
// `property_index_tests.rs` through the `#[path]` attribute.
#[cfg(test)]
#[path = "property_index_tests.rs"]
mod tests;