use super::helpers::resolve_node_property;
use super::ResultRow;
use crate::datatypes::values::Value;
use crate::graph::core::pattern_matching::{NodePattern, Pattern, PatternElement, PropertyMatcher};
use crate::graph::schema::DirGraph;
use crate::graph::storage::GraphRead;
use petgraph::graph::NodeIndex;
use std::collections::HashMap;
/// Minimum number of already-existing result rows required before
/// `TransientEqIndex::try_build` will construct an index; with fewer rows it
/// returns `None` without inspecting the pattern.
pub(super) const TRANSIENT_INDEX_THRESHOLD: usize = 64;
/// A temporary value -> nodes hash index over one property of one node type,
/// built by `TransientEqIndex::try_build` for a single-node pattern that
/// carries exactly one equality matcher.
pub(super) struct TransientEqIndex {
/// Variable name the node pattern binds (copied from the pattern's `variable`).
pub(super) bind_var: String,
// NOTE(review): not read anywhere in the visible code — presumably kept for
// diagnostics/debugging; confirm before removing.
#[allow(dead_code)]
/// Node type (label) whose nodes were indexed.
pub(super) node_type: String,
// NOTE(review): not read anywhere in the visible code — presumably kept for
// diagnostics/debugging; confirm before removing.
#[allow(dead_code)]
/// Name of the indexed property (the pattern's single property key).
pub(super) property: String,
/// How to obtain, from a result row, the value to look up in `by_value`.
pub(super) resolution: ProbeResolution,
/// Non-null property value -> indices of the nodes holding that value.
/// Nodes whose property resolves to `Value::Null` are not present.
pub(super) by_value: HashMap<Value, Vec<NodeIndex>>,
}
/// Describes where the probe value for a `TransientEqIndex` lookup comes from
/// in a result row.
pub(super) enum ProbeResolution {
/// Read the value stored under this name in the row's `projected` map
/// (created from `PropertyMatcher::EqualsVar`).
Projected(String),
/// Look up the node bound to `var` in the row's `node_bindings`, then resolve
/// property `prop` on that node (created from `PropertyMatcher::EqualsNodeProp`).
NodeProp { var: String, prop: String },
}
impl TransientEqIndex {
    /// Tries to build a transient equality index for `pattern`.
    ///
    /// Returns `None` (no index) when any of the following holds:
    /// - `existing_row_count` is below [`TRANSIENT_INDEX_THRESHOLD`];
    /// - the pattern is not a single node element;
    /// - the node pattern has extra labels, no node type, or no variable;
    /// - the node pattern does not carry exactly one property matcher, or that
    ///   matcher is neither `EqualsVar` nor `EqualsNodeProp`;
    /// - `graph.has_any_index` reports an index for the (type, property) pair;
    /// - no node carries the node type as its label.
    ///
    /// Otherwise every node with that label is scanned once and grouped by the
    /// resolved value of the property; nodes resolving to `Value::Null` are
    /// left out of the index.
    pub(super) fn try_build(
        graph: &DirGraph,
        pattern: &Pattern,
        existing_row_count: usize,
    ) -> Option<TransientEqIndex> {
        // Below the threshold no index is built.
        if existing_row_count < TRANSIENT_INDEX_THRESHOLD {
            return None;
        }

        let node_pattern = extract_single_node_pattern(pattern)?;
        if !node_pattern.extra_labels.is_empty() {
            return None;
        }
        let node_type = node_pattern.node_type.as_deref()?.to_string();
        let bind_var = node_pattern.variable.as_deref()?.to_string();

        // Exactly one property matcher is supported.
        let matchers = node_pattern.properties.as_ref()?;
        if matchers.len() != 1 {
            return None;
        }
        let (property, matcher) = matchers.iter().next()?;

        // Only the two "equals something taken from the row" matchers qualify.
        let resolution = match matcher {
            PropertyMatcher::EqualsNodeProp { var, prop } => ProbeResolution::NodeProp {
                var: var.clone(),
                prop: prop.clone(),
            },
            PropertyMatcher::EqualsVar(name) => ProbeResolution::Projected(name.clone()),
            _ => return None,
        };

        if graph.has_any_index(&node_type, property) {
            return None;
        }

        let candidates = graph.nodes_with_label(&node_type);
        if candidates.is_empty() {
            return None;
        }

        let mut by_value: HashMap<Value, Vec<NodeIndex>> =
            HashMap::with_capacity(candidates.len());
        for node_idx in candidates {
            let key = match graph.graph.node_weight(node_idx) {
                Some(node) => resolve_node_property(node, property, graph),
                None => continue,
            };
            match key {
                // Null values are never indexed.
                Value::Null => {}
                key => by_value.entry(key).or_default().push(node_idx),
            }
        }

        Some(Self {
            bind_var,
            node_type,
            property: property.clone(),
            resolution,
            by_value,
        })
    }

    /// Extracts from `row` the value to look up in the index.
    ///
    /// For `Projected`, the value is cloned out of `row.projected`; for
    /// `NodeProp`, the node bound to `var` is fetched from the graph and its
    /// `prop` is resolved. Returns `None` when the projection or binding is
    /// missing, the bound node is not in the graph, or the resulting value is
    /// `Value::Null`.
    pub(super) fn probe_value(&self, row: &ResultRow, graph: &DirGraph) -> Option<Value> {
        let candidate = match &self.resolution {
            ProbeResolution::NodeProp { var, prop } => {
                let bound_idx = row.node_bindings.get(var.as_str())?;
                let bound_node = graph.graph.node_weight(*bound_idx)?;
                resolve_node_property(bound_node, prop, graph)
            }
            ProbeResolution::Projected(var) => row.projected.get(var.as_str()).cloned()?,
        };

        match candidate {
            Value::Null => None,
            concrete => Some(concrete),
        }
    }

    /// Returns the indices of all indexed nodes whose property equals `value`,
    /// or an empty slice when the value is not present in the index.
    pub(super) fn lookup(&self, value: &Value) -> &[NodeIndex] {
        match self.by_value.get(value) {
            Some(hits) => hits.as_slice(),
            None => &[],
        }
    }
}
/// Returns the node pattern when `pattern` consists of exactly one element and
/// that element is a node; returns `None` for any other shape.
fn extract_single_node_pattern(pattern: &Pattern) -> Option<&NodePattern> {
    match pattern.elements.as_slice() {
        [PatternElement::Node(node_pattern)] => Some(node_pattern),
        _ => None,
    }
}