use std::ops::Bound::{Excluded, Included, Unbounded};
use selene_core::{DbString, LabelSet, Value};
use selene_graph::RowIndex;
use crate::{
FilterPredicate, FilterPredicateKind, IndexKey, IndexKind, LabelExpr, NodeOrEdgeScan,
PatternPlan, ScanAccess, ScanKind, TypedIndexBounds,
runtime::{Binding, BindingTableSchema, ExecutorError},
};
use super::scan_resolve::{
IndexKeyOutcome, ResolvedBounds, range_satisfiable_runtime, resolve_bitmap_union_key_values,
resolve_bounds, resolve_index_key,
};
use super::{EvalCtx, evaluator, scan_bind, scan_seed, value_compare};
pub(crate) fn scan_pattern(
scan: &NodeOrEdgeScan,
pattern: &PatternPlan,
schema: &BindingTableSchema,
seed: Option<&Binding>,
ctx: &EvalCtx<'_, '_, '_, '_>,
) -> Result<Vec<Binding>, ExecutorError> {
Ok(scan_entities(scan, pattern, schema, seed, ctx)?
.into_iter()
.map(|(_, binding)| binding)
.collect())
}
pub(crate) fn scan_entities(
scan: &NodeOrEdgeScan,
pattern: &PatternPlan,
schema: &BindingTableSchema,
seed: Option<&Binding>,
ctx: &EvalCtx<'_, '_, '_, '_>,
) -> Result<Vec<(Value, Binding)>, ExecutorError> {
let slots = scan_bind::ScanSlots::resolve(scan, pattern, schema)?;
match seed {
Some(seed) => scan_entities_with_seed(scan, pattern, schema, seed, slots, ctx),
None => collect_scan_entities(scan, pattern, schema, None, slots, ctx),
}
}
fn scan_entities_with_seed(
scan: &NodeOrEdgeScan,
pattern: &PatternPlan,
schema: &BindingTableSchema,
seed: &Binding,
slots: scan_bind::ScanSlots,
ctx: &EvalCtx<'_, '_, '_, '_>,
) -> Result<Vec<(Value, Binding)>, ExecutorError> {
if let Some(rows) = scan_seed::try_seeded_scan(scan, pattern, schema, seed, slots, ctx)? {
return Ok(rows);
}
collect_scan_entities(scan, pattern, schema, Some(seed), slots, ctx)
}
fn collect_scan_entities(
scan: &NodeOrEdgeScan,
pattern: &PatternPlan,
schema: &BindingTableSchema,
seed: Option<&Binding>,
slots: scan_bind::ScanSlots,
ctx: &EvalCtx<'_, '_, '_, '_>,
) -> Result<Vec<(Value, Binding)>, ExecutorError> {
let mut rows = Vec::new();
for row in candidate_rows(scan, ctx)? {
if !label_matches_scan(scan, row, ctx) {
continue;
}
let Some(entity) = entity_value(scan.kind, row, ctx) else {
continue;
};
let Some(binding) = scan_bind::binding_for_scan(schema, seed, entity.clone(), slots) else {
continue;
};
if predicates_pass(scan, pattern, &binding, schema, &entity, ctx)? {
rows.push((entity, binding));
}
}
Ok(rows)
}
pub(super) fn candidate_rows(
scan: &NodeOrEdgeScan,
ctx: &EvalCtx<'_, '_, '_, '_>,
) -> Result<Vec<u32>, ExecutorError> {
match &scan.access {
ScanAccess::Linear => Ok(linear_rows(scan.kind, ctx)),
ScanAccess::LabelIndex { .. } => Ok(label_index_rows(scan, ctx)),
ScanAccess::TypedIndexRange {
property,
kind,
bounds,
..
} => typed_index_rows(scan, property.clone(), *kind, bounds, ctx),
ScanAccess::BitmapUnion {
property,
kind,
keys,
..
} => bitmap_union_rows(scan, property.clone(), *kind, keys, ctx),
ScanAccess::CompositeLookup {
properties, keys, ..
} => composite_lookup_rows(scan, properties, keys, ctx),
}
}
fn linear_rows(kind: ScanKind, ctx: &EvalCtx<'_, '_, '_, '_>) -> Vec<u32> {
match kind {
ScanKind::Node => ctx.tx.snapshot().node_store.alive.iter().collect(),
ScanKind::Edge => ctx.tx.snapshot().edge_store.alive.iter().collect(),
}
}
fn label_index_rows(scan: &NodeOrEdgeScan, ctx: &EvalCtx<'_, '_, '_, '_>) -> Vec<u32> {
let Some(label) = single_label(&scan.label_predicate) else {
return linear_rows(scan.kind, ctx);
};
match scan.kind {
ScanKind::Node => ctx
.tx
.snapshot()
.nodes_with_label(&label)
.map(|rows| rows.iter().collect())
.unwrap_or_default(),
ScanKind::Edge => ctx
.tx
.snapshot()
.edges_with_label(&label)
.map(|rows| rows.iter().collect())
.unwrap_or_default(),
}
}
fn typed_index_rows(
scan: &NodeOrEdgeScan,
property: DbString,
kind: IndexKind,
bounds: &TypedIndexBounds,
ctx: &EvalCtx<'_, '_, '_, '_>,
) -> Result<Vec<u32>, ExecutorError> {
let Some(resolved) = resolve_bounds(bounds, kind, ctx)? else {
return Ok(Vec::new());
};
if !range_satisfiable_runtime(&resolved) {
return Ok(Vec::new());
}
let Some(label) = single_label(&scan.label_predicate) else {
return Ok(linear_rows_filtered_by_resolved_bounds(
scan, property, &resolved, ctx,
));
};
let indexed_rows = match &resolved {
ResolvedBounds::Equality(value) => {
property_eq_row_vec(ctx.tx.snapshot(), scan.kind, &label, &property, value)
}
ResolvedBounds::GreaterThan(value) => property_range_row_vec(
ctx.tx.snapshot(),
scan.kind,
&label,
&property,
(Excluded(value.clone()), Unbounded),
),
ResolvedBounds::GreaterEqual(value) => property_range_row_vec(
ctx.tx.snapshot(),
scan.kind,
&label,
&property,
(Included(value.clone()), Unbounded),
),
ResolvedBounds::LessThan(value) => property_range_row_vec(
ctx.tx.snapshot(),
scan.kind,
&label,
&property,
(Unbounded, Excluded(value.clone())),
),
ResolvedBounds::LessEqual(value) => property_range_row_vec(
ctx.tx.snapshot(),
scan.kind,
&label,
&property,
(Unbounded, Included(value.clone())),
),
ResolvedBounds::Range {
lo,
lo_inclusive,
hi,
hi_inclusive,
} => {
let lo_bound = if *lo_inclusive {
Included(lo.clone())
} else {
Excluded(lo.clone())
};
let hi_bound = if *hi_inclusive {
Included(hi.clone())
} else {
Excluded(hi.clone())
};
property_range_row_vec(
ctx.tx.snapshot(),
scan.kind,
&label,
&property,
(lo_bound, hi_bound),
)
}
};
Ok(indexed_rows
.unwrap_or_else(|| linear_rows_filtered_by_resolved_bounds(scan, property, &resolved, ctx)))
}
fn bitmap_union_rows(
scan: &NodeOrEdgeScan,
property: DbString,
kind: IndexKind,
keys: &[IndexKey],
ctx: &EvalCtx<'_, '_, '_, '_>,
) -> Result<Vec<u32>, ExecutorError> {
let mut resolved_keys: Vec<Value> = Vec::with_capacity(keys.len());
for key in keys {
resolved_keys.extend(resolve_bitmap_union_key_values(key, kind, ctx)?);
}
if resolved_keys.is_empty() && !keys.is_empty() {
return Ok(Vec::new());
}
let Some(label) = single_label(&scan.label_predicate) else {
return Ok(linear_rows(scan.kind, ctx)
.into_iter()
.filter(|row| {
property_matches_any_resolved(scan.kind, *row, &property, &resolved_keys, ctx)
})
.collect());
};
if let Some(rows) = property_any_row_bitmap(
ctx.tx.snapshot(),
scan.kind,
&label,
&property,
&resolved_keys,
) {
return Ok(rows.iter().collect());
}
Ok(linear_rows(scan.kind, ctx)
.into_iter()
.filter(|row| {
property_matches_any_resolved(scan.kind, *row, &property, &resolved_keys, ctx)
})
.collect())
}
fn property_any_row_bitmap(
snapshot: &selene_graph::SeleneGraph,
kind: ScanKind,
label: &DbString,
property: &DbString,
values: &[Value],
) -> Option<roaring::RoaringBitmap> {
match kind {
ScanKind::Node => snapshot.nodes_with_property_any(label, property, values),
ScanKind::Edge => snapshot.edges_with_property_any(label, property, values),
}
}
fn property_eq_row_vec(
snapshot: &selene_graph::SeleneGraph,
kind: ScanKind,
label: &DbString,
property: &DbString,
value: &Value,
) -> Option<Vec<u32>> {
match kind {
ScanKind::Node => snapshot.nodes_with_property_eq(label, property, value),
ScanKind::Edge => snapshot.edges_with_property_eq(label, property, value),
}
.map(|rows| rows.iter().collect())
}
fn property_range_row_vec<R>(
snapshot: &selene_graph::SeleneGraph,
kind: ScanKind,
label: &DbString,
property: &DbString,
range: R,
) -> Option<Vec<u32>>
where
R: std::ops::RangeBounds<Value>,
{
match kind {
ScanKind::Node => snapshot.nodes_with_property_range(label, property, range),
ScanKind::Edge => snapshot.edges_with_property_range(label, property, range),
}
.map(|rows| rows.iter().collect())
}
fn composite_lookup_rows(
scan: &NodeOrEdgeScan,
properties: &[(DbString, IndexKind)],
keys: &[(DbString, IndexKey)],
ctx: &EvalCtx<'_, '_, '_, '_>,
) -> Result<Vec<u32>, ExecutorError> {
let Some(resolved_values) = resolve_composite_values(properties, keys, ctx)? else {
return Ok(Vec::new());
};
if scan.kind != ScanKind::Node {
return Ok(linear_rows_filtered_by_resolved_composite(
scan,
properties,
&resolved_values,
ctx,
));
}
let Some(label) = single_label(&scan.label_predicate) else {
return Ok(linear_rows_filtered_by_resolved_composite(
scan,
properties,
&resolved_values,
ctx,
));
};
let property_keys: Vec<DbString> = properties
.iter()
.map(|(property, _)| property.clone())
.collect();
if let Some(index) = ctx
.tx
.snapshot()
.composite_property_index_for(&label, &property_keys)
{
let refs = resolved_values.iter().collect::<Vec<_>>();
if let Ok(key) = index.key_from_values(&refs) {
return Ok(index
.lookup_key(&key)
.map(|bitmap| bitmap.iter().collect())
.unwrap_or_default());
}
}
Ok(linear_rows_filtered_by_resolved_composite(
scan,
properties,
&resolved_values,
ctx,
))
}
pub(super) fn resolve_composite_values(
properties: &[(DbString, IndexKind)],
keys: &[(DbString, IndexKey)],
ctx: &EvalCtx<'_, '_, '_, '_>,
) -> Result<Option<Vec<Value>>, ExecutorError> {
let mut out = Vec::with_capacity(properties.len());
for (property, kind) in properties {
let Some((_, key)) = keys.iter().find(|(name, _)| name == property) else {
return Ok(None);
};
match resolve_index_key(key, *kind, ctx)? {
IndexKeyOutcome::Value(value) => out.push(value),
IndexKeyOutcome::EmptyResult => return Ok(None),
}
}
Ok(Some(out))
}
fn linear_rows_filtered_by_resolved_composite(
scan: &NodeOrEdgeScan,
properties: &[(DbString, IndexKind)],
values: &[Value],
ctx: &EvalCtx<'_, '_, '_, '_>,
) -> Vec<u32> {
linear_rows(scan.kind, ctx)
.into_iter()
.filter(|row| row_matches_resolved_composite(scan.kind, *row, properties, values, ctx))
.collect()
}
pub(super) fn row_matches_resolved_composite(
kind: ScanKind,
row: u32,
properties: &[(DbString, IndexKind)],
values: &[Value],
ctx: &EvalCtx<'_, '_, '_, '_>,
) -> bool {
properties
.iter()
.zip(values.iter())
.all(|((property, _), expected)| {
property_value(kind, row, property, ctx)
.is_some_and(|value| value_eq_non_null(value, expected))
})
}
fn linear_rows_filtered_by_resolved_bounds(
scan: &NodeOrEdgeScan,
property: DbString,
resolved: &ResolvedBounds,
ctx: &EvalCtx<'_, '_, '_, '_>,
) -> Vec<u32> {
linear_rows(scan.kind, ctx)
.into_iter()
.filter(|row| row_matches_resolved_bounds(scan.kind, *row, &property, resolved, ctx))
.collect()
}
pub(super) fn row_matches_resolved_bounds(
kind: ScanKind,
row: u32,
property: &DbString,
resolved: &ResolvedBounds,
ctx: &EvalCtx<'_, '_, '_, '_>,
) -> bool {
property_value(kind, row, property, ctx)
.is_some_and(|value| value_matches_resolved_bounds(value, resolved))
}
#[inline]
pub(crate) fn predicates_pass(
scan: &NodeOrEdgeScan,
pattern: &PatternPlan,
binding: &Binding,
schema: &BindingTableSchema,
entity: &Value,
ctx: &EvalCtx<'_, '_, '_, '_>,
) -> Result<bool, ExecutorError> {
for predicate in &scan.property_predicates {
if !predicate_passes(predicate, pattern, binding, schema, entity, ctx)? {
return Ok(false);
}
}
Ok(true)
}
pub(crate) fn predicate_passes(
predicate: &FilterPredicate,
pattern: &PatternPlan,
binding: &Binding,
schema: &BindingTableSchema,
entity: &Value,
ctx: &EvalCtx<'_, '_, '_, '_>,
) -> Result<bool, ExecutorError> {
if predicate.index_consumed {
return Ok(true);
}
match &predicate.kind {
FilterPredicateKind::Expression => {
let value = evaluator::evaluate(&predicate.expr, binding, schema, ctx)?;
Ok(matches!(value, Value::Bool(true)))
}
FilterPredicateKind::PropertyEquals {
binding: property_binding,
key,
} => {
let target = property_binding
.and_then(|binding_id| value_for_binding(pattern, binding_id, binding, schema))
.unwrap_or_else(|| entity.clone());
let property = match &target {
Value::NodeRef(id) => ctx
.tx
.snapshot()
.node_properties(*id)
.and_then(|properties| properties.get(key))
.cloned(),
Value::EdgeRef(id) => ctx
.tx
.snapshot()
.edge_properties(*id)
.and_then(|properties| properties.get(key))
.cloned(),
Value::Null => None,
_ => None,
}
.unwrap_or(Value::Null);
let expected = evaluator::evaluate(&predicate.expr, binding, schema, ctx)?;
Ok(value_eq_non_null(&property, &expected))
}
}
}
pub(crate) fn value_for_binding(
pattern: &PatternPlan,
binding_id: crate::BindingId,
binding: &Binding,
schema: &BindingTableSchema,
) -> Option<Value> {
let binding_def = pattern
.bindings
.iter()
.find(|candidate| candidate.binding == binding_id)?;
let index = schema
.columns
.iter()
.position(|column| column.name == Some(binding_def.name.clone()))?;
binding.get(index).cloned()
}
#[inline]
pub(super) fn label_matches_scan(
scan: &NodeOrEdgeScan,
row: u32,
ctx: &EvalCtx<'_, '_, '_, '_>,
) -> bool {
let Some(label_expr) = &scan.label_predicate else {
return true;
};
let snapshot = ctx.tx.snapshot();
match scan.kind {
ScanKind::Node => {
let Some(id) = snapshot.node_id_for_row(RowIndex::new(row)) else {
return false;
};
snapshot
.node_labels(id)
.is_some_and(|labels| label_matches_node(label_expr, labels))
}
ScanKind::Edge => {
let Some(id) = snapshot.edge_id_for_row(RowIndex::new(row)) else {
return false;
};
snapshot
.edge_label(id)
.is_some_and(|label| label_matches_edge(label_expr, label.clone()))
}
}
}
pub(crate) fn label_matches_node(expr: &LabelExpr, labels: &LabelSet) -> bool {
match expr {
LabelExpr::Single(label) => labels.contains(label),
LabelExpr::Conjunction(parts) => parts.iter().all(|part| label_matches_node(part, labels)),
LabelExpr::Disjunction(parts) => parts.iter().any(|part| label_matches_node(part, labels)),
LabelExpr::Negation(part) => !label_matches_node(part, labels),
LabelExpr::Wildcard => true,
}
}
pub(crate) fn label_matches_edge(expr: &LabelExpr, label: DbString) -> bool {
match expr {
LabelExpr::Single(expected) => *expected == label,
LabelExpr::Conjunction(parts) => parts
.iter()
.all(|part| label_matches_edge(part, label.clone())),
LabelExpr::Disjunction(parts) => parts
.iter()
.any(|part| label_matches_edge(part, label.clone())),
LabelExpr::Negation(part) => !label_matches_edge(part, label),
LabelExpr::Wildcard => true,
}
}
fn single_label(label: &Option<LabelExpr>) -> Option<DbString> {
match label {
Some(LabelExpr::Single(label)) => Some(label.clone()),
_ => None,
}
}
#[inline]
fn entity_value(kind: ScanKind, row: u32, ctx: &EvalCtx<'_, '_, '_, '_>) -> Option<Value> {
let snapshot = ctx.tx.snapshot();
match kind {
ScanKind::Node => snapshot
.node_id_for_row(RowIndex::new(row))
.map(Value::NodeRef),
ScanKind::Edge => snapshot
.edge_id_for_row(RowIndex::new(row))
.map(Value::EdgeRef),
}
}
pub(super) fn property_matches_any_resolved(
kind: ScanKind,
row: u32,
property: &DbString,
values: &[Value],
ctx: &EvalCtx<'_, '_, '_, '_>,
) -> bool {
property_value(kind, row, property, ctx).is_some_and(|value| {
values
.iter()
.any(|expected| value_eq_non_null(value, expected))
})
}
fn property_value<'a>(
kind: ScanKind,
row: u32,
property: &DbString,
ctx: &'a EvalCtx<'_, '_, '_, '_>,
) -> Option<&'a Value> {
let snapshot = ctx.tx.snapshot();
match kind {
ScanKind::Node => snapshot
.node_id_for_row(RowIndex::new(row))
.and_then(|id| snapshot.node_properties(id))
.and_then(|properties| properties.get(property)),
ScanKind::Edge => snapshot
.edge_id_for_row(RowIndex::new(row))
.and_then(|id| snapshot.edge_properties(id))
.and_then(|properties| properties.get(property)),
}
}
fn value_matches_resolved_bounds(value: &Value, resolved: &ResolvedBounds) -> bool {
match resolved {
ResolvedBounds::Equality(expected) => value_eq_non_null(value, expected),
ResolvedBounds::GreaterThan(expected) => {
value_compare::compare_non_null(value, expected) == Some(std::cmp::Ordering::Greater)
}
ResolvedBounds::GreaterEqual(expected) => matches!(
value_compare::compare_non_null(value, expected),
Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
),
ResolvedBounds::LessThan(expected) => {
value_compare::compare_non_null(value, expected) == Some(std::cmp::Ordering::Less)
}
ResolvedBounds::LessEqual(expected) => matches!(
value_compare::compare_non_null(value, expected),
Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
),
ResolvedBounds::Range {
lo,
lo_inclusive,
hi,
hi_inclusive,
} => {
let Some(lo_order) = value_compare::compare_non_null(value, lo) else {
return false;
};
let Some(hi_order) = value_compare::compare_non_null(value, hi) else {
return false;
};
let lo_ok = if *lo_inclusive {
matches!(
lo_order,
std::cmp::Ordering::Greater | std::cmp::Ordering::Equal
)
} else {
lo_order == std::cmp::Ordering::Greater
};
let hi_ok = if *hi_inclusive {
matches!(
hi_order,
std::cmp::Ordering::Less | std::cmp::Ordering::Equal
)
} else {
hi_order == std::cmp::Ordering::Less
};
lo_ok && hi_ok
}
}
}
fn value_eq_non_null(lhs: &Value, rhs: &Value) -> bool {
if matches!(lhs, Value::Null) || matches!(rhs, Value::Null) {
return false;
}
value_compare::equal_non_null(lhs, rhs)
}