use std::collections::HashMap;
use std::fmt;
use grafeo_common::types::Value;
/// A SKIP/LIMIT row count that is either already known or still refers to a
/// named query parameter.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum CountExpr {
    /// A concrete count.
    Literal(usize),
    /// Name of a parameter that has not been substituted yet. The name is
    /// stored without the `$` prefix; `Display` adds it back.
    Parameter(String),
}

impl CountExpr {
    /// Returns the concrete count.
    ///
    /// # Panics
    ///
    /// Panics if this is still a [`CountExpr::Parameter`]. Use
    /// [`CountExpr::try_value`] when the parameter may be unresolved.
    pub fn value(&self) -> usize {
        match *self {
            Self::Literal(count) => count,
            Self::Parameter(ref unresolved) => panic!("Unresolved parameter: ${unresolved}"),
        }
    }

    /// Returns the concrete count, or a descriptive error message when the
    /// expression is still an unresolved parameter.
    pub fn try_value(&self) -> Result<usize, String> {
        match self {
            Self::Parameter(unresolved) => {
                Err(format!("Unresolved SKIP/LIMIT parameter: ${unresolved}"))
            }
            Self::Literal(count) => Ok(*count),
        }
    }

    /// Returns the count as a float for estimation purposes.
    ///
    /// Unresolved parameters yield a fixed placeholder value.
    pub fn estimate(&self) -> f64 {
        // Placeholder used while the real parameter value is unknown.
        const UNRESOLVED_ESTIMATE: f64 = 10.0;

        if let Self::Literal(count) = self {
            *count as f64
        } else {
            UNRESOLVED_ESTIMATE
        }
    }
}

/// Literals print as the bare number, parameters as `$name`.
impl fmt::Display for CountExpr {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Literal(count) => write!(f, "{count}"),
            Self::Parameter(name) => {
                f.write_str("$")?;
                f.write_str(name)
            }
        }
    }
}

/// Wraps a plain count as [`CountExpr::Literal`].
impl From<usize> for CountExpr {
    fn from(n: usize) -> Self {
        Self::Literal(n)
    }
}

/// A `CountExpr` equals a `usize` only when it is a literal with that value;
/// a parameter never compares equal to a number.
impl PartialEq<usize> for CountExpr {
    fn eq(&self, other: &usize) -> bool {
        match self {
            Self::Literal(count) => count == other,
            Self::Parameter(_) => false,
        }
    }
}
/// A logical query plan: the root operator plus plan-level flags.
#[derive(Debug, Clone)]
pub struct LogicalPlan {
    /// Root of the operator tree.
    pub root: LogicalOperator,
    /// Set by [`LogicalPlan::explain`]; `false` otherwise.
    pub explain: bool,
    /// Set by [`LogicalPlan::profile`]; `false` otherwise.
    pub profile: bool,
    /// Parameter values attached to the plan; every constructor starts with an empty map.
    pub default_params: HashMap<String, Value>,
}

impl LogicalPlan {
    /// Builds a plan with both `explain` and `profile` switched off.
    pub fn new(root: LogicalOperator) -> Self {
        let default_params = HashMap::new();
        Self {
            root,
            explain: false,
            profile: false,
            default_params,
        }
    }

    /// Builds a plan with the `explain` flag set.
    pub fn explain(root: LogicalOperator) -> Self {
        let mut plan = Self::new(root);
        plan.explain = true;
        plan
    }

    /// Builds a plan with the `profile` flag set.
    pub fn profile(root: LogicalOperator) -> Self {
        let mut plan = Self::new(root);
        plan.profile = true;
        plan
    }
}
/// A node of the logical plan tree.
///
/// Every variant except `Empty` wraps a payload struct named after the variant
/// with an `Op` suffix. Child operators live inside those payloads and are
/// enumerated by `children()`.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum LogicalOperator {
NodeScan(NodeScanOp),
EdgeScan(EdgeScanOp),
Expand(ExpandOp),
Filter(FilterOp),
Project(ProjectOp),
Join(JoinOp),
Aggregate(AggregateOp),
Limit(LimitOp),
Skip(SkipOp),
Sort(SortOp),
Distinct(DistinctOp),
CreateNode(CreateNodeOp),
CreateEdge(CreateEdgeOp),
DeleteNode(DeleteNodeOp),
DeleteEdge(DeleteEdgeOp),
SetProperty(SetPropertyOp),
AddLabel(AddLabelOp),
RemoveLabel(RemoveLabelOp),
Return(ReturnOp),
/// Operator without payload or children; printed as `Empty`.
Empty,
TripleScan(TripleScanOp),
Union(UnionOp),
LeftJoin(LeftJoinOp),
AntiJoin(AntiJoinOp),
Construct(ConstructOp),
Bind(BindOp),
Unwind(UnwindOp),
MapCollect(MapCollectOp),
Merge(MergeOp),
MergeRelationship(MergeRelationshipOp),
ShortestPath(ShortestPathOp),
InsertTriple(InsertTripleOp),
DeleteTriple(DeleteTripleOp),
Modify(ModifyOp),
ClearGraph(ClearGraphOp),
CreateGraph(CreateGraphOp),
DropGraph(DropGraphOp),
LoadGraph(LoadGraphOp),
CopyGraph(CopyGraphOp),
MoveGraph(MoveGraphOp),
AddGraph(AddGraphOp),
HorizontalAggregate(HorizontalAggregateOp),
VectorScan(VectorScanOp),
VectorJoin(VectorJoinOp),
TextScan(TextScanOp),
Except(ExceptOp),
Intersect(IntersectOp),
Otherwise(OtherwiseOp),
Apply(ApplyOp),
ParameterScan(ParameterScanOp),
CreatePropertyGraph(CreatePropertyGraphOp),
MultiWayJoin(MultiWayJoinOp),
CallProcedure(CallProcedureOp),
LoadData(LoadDataOp),
}
impl LogicalOperator {
#[must_use]
pub fn has_mutations(&self) -> bool {
match self {
Self::CreateNode(_)
| Self::CreateEdge(_)
| Self::DeleteNode(_)
| Self::DeleteEdge(_)
| Self::SetProperty(_)
| Self::AddLabel(_)
| Self::RemoveLabel(_)
| Self::Merge(_)
| Self::MergeRelationship(_)
| Self::InsertTriple(_)
| Self::DeleteTriple(_)
| Self::Modify(_)
| Self::ClearGraph(_)
| Self::CreateGraph(_)
| Self::DropGraph(_)
| Self::LoadGraph(_)
| Self::CopyGraph(_)
| Self::MoveGraph(_)
| Self::AddGraph(_)
| Self::CreatePropertyGraph(_) => true,
Self::Filter(op) => op.input.has_mutations(),
Self::Project(op) => op.input.has_mutations(),
Self::Aggregate(op) => op.input.has_mutations(),
Self::Limit(op) => op.input.has_mutations(),
Self::Skip(op) => op.input.has_mutations(),
Self::Sort(op) => op.input.has_mutations(),
Self::Distinct(op) => op.input.has_mutations(),
Self::Unwind(op) => op.input.has_mutations(),
Self::Bind(op) => op.input.has_mutations(),
Self::MapCollect(op) => op.input.has_mutations(),
Self::Return(op) => op.input.has_mutations(),
Self::HorizontalAggregate(op) => op.input.has_mutations(),
Self::VectorScan(op) => op.input.as_deref().is_some_and(Self::has_mutations),
Self::VectorJoin(op) => op.input.has_mutations(),
Self::TextScan(_) => false,
Self::Join(op) => op.left.has_mutations() || op.right.has_mutations(),
Self::LeftJoin(op) => op.left.has_mutations() || op.right.has_mutations(),
Self::AntiJoin(op) => op.left.has_mutations() || op.right.has_mutations(),
Self::Except(op) => op.left.has_mutations() || op.right.has_mutations(),
Self::Intersect(op) => op.left.has_mutations() || op.right.has_mutations(),
Self::Otherwise(op) => op.left.has_mutations() || op.right.has_mutations(),
Self::Union(op) => op.inputs.iter().any(|i| i.has_mutations()),
Self::MultiWayJoin(op) => op.inputs.iter().any(|i| i.has_mutations()),
Self::Apply(op) => op.input.has_mutations() || op.subplan.has_mutations(),
Self::NodeScan(_)
| Self::EdgeScan(_)
| Self::Expand(_)
| Self::TripleScan(_)
| Self::ShortestPath(_)
| Self::Empty
| Self::ParameterScan(_)
| Self::CallProcedure(_)
| Self::LoadData(_) => false,
Self::Construct(op) => op.input.has_mutations(),
}
}
/// Returns the direct child operators of this node, in plan order.
///
/// Operators with an optional input return zero or one child, binary
/// operators return `[left, right]`, `Apply` returns `[input, subplan]`,
/// `Modify` returns its `where_clause`, and operators without inputs return
/// an empty vector.
#[must_use]
pub fn children(&self) -> Vec<&LogicalOperator> {
    /// Zero or one child, depending on whether the optional input is set.
    fn maybe(input: Option<&LogicalOperator>) -> Vec<&LogicalOperator> {
        input.into_iter().collect()
    }
    /// Exactly one child.
    fn single(input: &LogicalOperator) -> Vec<&LogicalOperator> {
        vec![input]
    }
    /// Two children, first argument first.
    fn pair<'a>(
        first: &'a LogicalOperator,
        second: &'a LogicalOperator,
    ) -> Vec<&'a LogicalOperator> {
        vec![first, second]
    }
    /// All elements of an n-ary input list.
    fn all(inputs: &[LogicalOperator]) -> Vec<&LogicalOperator> {
        inputs.iter().collect()
    }

    match self {
        // Optional input.
        Self::NodeScan(op) => maybe(op.input.as_deref()),
        Self::EdgeScan(op) => maybe(op.input.as_deref()),
        Self::TripleScan(op) => maybe(op.input.as_deref()),
        Self::VectorScan(op) => maybe(op.input.as_deref()),
        Self::CreateNode(op) => maybe(op.input.as_deref()),
        Self::InsertTriple(op) => maybe(op.input.as_deref()),
        Self::DeleteTriple(op) => maybe(op.input.as_deref()),

        // Single mandatory input.
        Self::Expand(op) => single(&op.input),
        Self::Filter(op) => single(&op.input),
        Self::Project(op) => single(&op.input),
        Self::Aggregate(op) => single(&op.input),
        Self::Limit(op) => single(&op.input),
        Self::Skip(op) => single(&op.input),
        Self::Sort(op) => single(&op.input),
        Self::Distinct(op) => single(&op.input),
        Self::Return(op) => single(&op.input),
        Self::Unwind(op) => single(&op.input),
        Self::Bind(op) => single(&op.input),
        Self::Construct(op) => single(&op.input),
        Self::MapCollect(op) => single(&op.input),
        Self::ShortestPath(op) => single(&op.input),
        Self::Merge(op) => single(&op.input),
        Self::MergeRelationship(op) => single(&op.input),
        Self::CreateEdge(op) => single(&op.input),
        Self::DeleteNode(op) => single(&op.input),
        Self::DeleteEdge(op) => single(&op.input),
        Self::SetProperty(op) => single(&op.input),
        Self::AddLabel(op) => single(&op.input),
        Self::RemoveLabel(op) => single(&op.input),
        Self::HorizontalAggregate(op) => single(&op.input),
        Self::VectorJoin(op) => single(&op.input),
        Self::Modify(op) => single(&op.where_clause),

        // Two inputs.
        Self::Join(op) => pair(&op.left, &op.right),
        Self::LeftJoin(op) => pair(&op.left, &op.right),
        Self::AntiJoin(op) => pair(&op.left, &op.right),
        Self::Except(op) => pair(&op.left, &op.right),
        Self::Intersect(op) => pair(&op.left, &op.right),
        Self::Otherwise(op) => pair(&op.left, &op.right),
        Self::Apply(op) => pair(&op.input, &op.subplan),

        // Any number of inputs.
        Self::Union(op) => all(&op.inputs),
        Self::MultiWayJoin(op) => all(&op.inputs),

        // No inputs.
        Self::Empty
        | Self::ParameterScan(_)
        | Self::CallProcedure(_)
        | Self::ClearGraph(_)
        | Self::CreateGraph(_)
        | Self::DropGraph(_)
        | Self::LoadGraph(_)
        | Self::CopyGraph(_)
        | Self::MoveGraph(_)
        | Self::AddGraph(_)
        | Self::CreatePropertyGraph(_)
        | Self::LoadData(_)
        | Self::TextScan(_) => Vec::new(),
    }
}
/// Returns a short, single-line description of this operator alone.
///
/// Children are not included and the operator name is not part of the
/// label. Operators without a dedicated rendering yield an empty string.
#[must_use]
pub fn display_label(&self) -> String {
    /// `*` when no edge type is given, otherwise the types joined with `|`.
    fn any_or_joined(edge_types: &[String]) -> String {
        if edge_types.is_empty() {
            String::from("*")
        } else {
            edge_types.join("|")
        }
    }
    /// `variable:Label1:Label2`.
    fn with_labels(variable: &str, labels: &[String]) -> String {
        format!("{variable}:{}", labels.join(":"))
    }

    match self {
        Self::NodeScan(scan) => {
            format!("{}:{}", scan.variable, scan.label.as_deref().unwrap_or("*"))
        }
        Self::EdgeScan(scan) => {
            format!("{}:{}", scan.variable, any_or_joined(&scan.edge_types))
        }
        Self::Expand(expand) => {
            let arrow = match expand.direction {
                ExpandDirection::Outgoing => "->",
                ExpandDirection::Incoming => "<-",
                ExpandDirection::Both => "--",
            };
            let types = any_or_joined(&expand.edge_types);
            format!(
                "({}){arrow}[:{types}]{arrow}({})",
                expand.from_variable, expand.to_variable
            )
        }
        Self::Filter(filter) => {
            let suffix = match &filter.pushdown_hint {
                None => String::new(),
                Some(PushdownHint::LabelFirst) => " [label-first]".to_string(),
                Some(PushdownHint::IndexLookup { property }) => format!(" [index: {property}]"),
                Some(PushdownHint::RangeScan { property }) => format!(" [range: {property}]"),
            };
            let mut text = fmt_expr(&filter.predicate);
            text.push_str(&suffix);
            text
        }
        Self::Project(project) => project
            .projections
            .iter()
            .map(|column| {
                // An alias, when present, replaces the expression text.
                column
                    .alias
                    .clone()
                    .unwrap_or_else(|| fmt_expr(&column.expression))
            })
            .collect::<Vec<String>>()
            .join(", "),
        Self::Join(join) => format!("{:?}", join.join_type),
        Self::Aggregate(aggregate) => {
            let keys = aggregate
                .group_by
                .iter()
                .map(fmt_expr)
                .collect::<Vec<String>>()
                .join(", ");
            format!("group: [{keys}]")
        }
        Self::Limit(limit) => limit.count.to_string(),
        Self::Skip(skip) => skip.count.to_string(),
        Self::Sort(sort) => sort
            .keys
            .iter()
            .map(|key| {
                let direction = match key.order {
                    SortOrder::Ascending => "ASC",
                    SortOrder::Descending => "DESC",
                };
                format!("{} {direction}", fmt_expr(&key.expression))
            })
            .collect::<Vec<String>>()
            .join(", "),
        Self::Return(ret) => ret
            .items
            .iter()
            .map(|item| {
                item.alias
                    .clone()
                    .unwrap_or_else(|| fmt_expr(&item.expression))
            })
            .collect::<Vec<String>>()
            .join(", "),
        Self::Union(union) => format!("{} branches", union.inputs.len()),
        Self::MultiWayJoin(join) => format!("{} inputs", join.inputs.len()),
        Self::Unwind(unwind) => unwind.variable.clone(),
        Self::Bind(bind) => bind.variable.clone(),
        Self::MapCollect(collect) => collect.alias.clone(),
        Self::ShortestPath(path) => format!("{} -> {}", path.source_var, path.target_var),
        Self::Merge(merge) => merge.variable.clone(),
        Self::MergeRelationship(merge) => merge.variable.clone(),
        Self::CreateNode(create) => with_labels(&create.variable, &create.labels),
        Self::CreateEdge(create) => {
            // Anonymous edges are shown with a `?` placeholder.
            let name = create.variable.as_deref().unwrap_or("?");
            format!("[{name}:{}]", create.edge_type)
        }
        Self::DeleteNode(delete) => delete.variable.clone(),
        Self::DeleteEdge(delete) => delete.variable.clone(),
        Self::SetProperty(set) => set.variable.clone(),
        Self::AddLabel(add) => with_labels(&add.variable, &add.labels),
        Self::RemoveLabel(remove) => with_labels(&remove.variable, &remove.labels),
        Self::CallProcedure(call) => call.name.join("."),
        Self::LoadData(load) => format!("{} AS {}", load.path, load.variable),
        Self::VectorScan(scan) => scan.variable.clone(),
        Self::VectorJoin(join) => join.right_variable.clone(),
        Self::TextScan(scan) => format!("{}:{}", scan.variable, scan.label),
        // Operators that deliberately have no label.
        Self::Distinct(_) | Self::LeftJoin(_) | Self::AntiJoin(_) | Self::Apply(_) => {
            String::new()
        }
        // Everything else has no dedicated label either.
        _ => String::new(),
    }
}
}
impl LogicalOperator {
/// Renders this operator and everything beneath it as indented text, one
/// operator per line, starting at depth 0.
pub fn explain_tree(&self) -> String {
    let mut rendered = String::new();
    // The root is printed without indentation; children are indented by `fmt_tree`.
    self.fmt_tree(&mut rendered, 0);
    rendered
}
/// Appends a textual rendering of this operator and its subtree to `out`.
///
/// Each operator occupies one line, prefixed with one indent unit per
/// `depth` level; children are rendered at `depth + 1`. Operators without a
/// dedicated rendering fall back to their enum discriminant, but their
/// children are still printed so no part of the plan is hidden.
fn fmt_tree(&self, out: &mut String, depth: usize) {
    use std::fmt::Write;
    // Formatting into a `String` cannot fail, so the `writeln!` results are
    // intentionally discarded throughout this function.
    let indent = " ".repeat(depth);
    match self {
        Self::NodeScan(op) => {
            let label = op.label.as_deref().unwrap_or("*");
            let _ = writeln!(out, "{indent}NodeScan ({var}:{label})", var = op.variable);
            if let Some(input) = &op.input {
                input.fmt_tree(out, depth + 1);
            }
        }
        Self::EdgeScan(op) => {
            let types = if op.edge_types.is_empty() {
                "*".to_string()
            } else {
                op.edge_types.join("|")
            };
            let _ = writeln!(out, "{indent}EdgeScan ({var}:{types})", var = op.variable);
            // Like the other scans, an edge scan may sit on top of another
            // operator; print it so the tree matches `children()`.
            if let Some(input) = &op.input {
                input.fmt_tree(out, depth + 1);
            }
        }
        Self::Expand(op) => {
            let types = if op.edge_types.is_empty() {
                "*".to_string()
            } else {
                op.edge_types.join("|")
            };
            let dir = match op.direction {
                ExpandDirection::Outgoing => "->",
                ExpandDirection::Incoming => "<-",
                ExpandDirection::Both => "--",
            };
            // A plain single hop gets no suffix; everything else is shown
            // as `*n`, `*min..max` or the open-ended `*min..`.
            let hops = match (op.min_hops, op.max_hops) {
                (1, Some(1)) => String::new(),
                (min, Some(max)) if min == max => format!("*{min}"),
                (min, Some(max)) => format!("*{min}..{max}"),
                (min, None) => format!("*{min}.."),
            };
            let _ = writeln!(
                out,
                "{indent}Expand ({from}){dir}[:{types}{hops}]{dir}({to})",
                from = op.from_variable,
                to = op.to_variable,
            );
            op.input.fmt_tree(out, depth + 1);
        }
        Self::Filter(op) => {
            let hint = match &op.pushdown_hint {
                Some(PushdownHint::IndexLookup { property }) => {
                    format!(" [index: {property}]")
                }
                Some(PushdownHint::RangeScan { property }) => {
                    format!(" [range: {property}]")
                }
                Some(PushdownHint::LabelFirst) => " [label-first]".to_string(),
                None => String::new(),
            };
            let _ = writeln!(
                out,
                "{indent}Filter ({expr}){hint}",
                expr = fmt_expr(&op.predicate)
            );
            op.input.fmt_tree(out, depth + 1);
        }
        Self::Project(op) => {
            let cols: Vec<String> = op
                .projections
                .iter()
                .map(|p| {
                    let expr = fmt_expr(&p.expression);
                    match &p.alias {
                        Some(alias) => format!("{expr} AS {alias}"),
                        None => expr,
                    }
                })
                .collect();
            let _ = writeln!(out, "{indent}Project ({cols})", cols = cols.join(", "));
            op.input.fmt_tree(out, depth + 1);
        }
        Self::Join(op) => {
            let _ = writeln!(out, "{indent}Join ({ty:?})", ty = op.join_type);
            op.left.fmt_tree(out, depth + 1);
            op.right.fmt_tree(out, depth + 1);
        }
        Self::Aggregate(op) => {
            let groups: Vec<String> = op.group_by.iter().map(fmt_expr).collect();
            // Only the function name and alias are shown; arguments are elided as `...`.
            let aggs: Vec<String> = op
                .aggregates
                .iter()
                .map(|a| {
                    let func = format!("{:?}", a.function).to_lowercase();
                    match &a.alias {
                        Some(alias) => format!("{func}(...) AS {alias}"),
                        None => format!("{func}(...)"),
                    }
                })
                .collect();
            let _ = writeln!(
                out,
                "{indent}Aggregate (group: [{groups}], aggs: [{aggs}])",
                groups = groups.join(", "),
                aggs = aggs.join(", "),
            );
            op.input.fmt_tree(out, depth + 1);
        }
        Self::Limit(op) => {
            let _ = writeln!(out, "{indent}Limit ({})", op.count);
            op.input.fmt_tree(out, depth + 1);
        }
        Self::Skip(op) => {
            let _ = writeln!(out, "{indent}Skip ({})", op.count);
            op.input.fmt_tree(out, depth + 1);
        }
        Self::Sort(op) => {
            let keys: Vec<String> = op
                .keys
                .iter()
                .map(|k| {
                    let dir = match k.order {
                        SortOrder::Ascending => "ASC",
                        SortOrder::Descending => "DESC",
                    };
                    format!("{} {dir}", fmt_expr(&k.expression))
                })
                .collect();
            let _ = writeln!(out, "{indent}Sort ({keys})", keys = keys.join(", "));
            op.input.fmt_tree(out, depth + 1);
        }
        Self::Distinct(op) => {
            let _ = writeln!(out, "{indent}Distinct");
            op.input.fmt_tree(out, depth + 1);
        }
        Self::Return(op) => {
            let items: Vec<String> = op
                .items
                .iter()
                .map(|item| {
                    let expr = fmt_expr(&item.expression);
                    match &item.alias {
                        Some(alias) => format!("{expr} AS {alias}"),
                        None => expr,
                    }
                })
                .collect();
            let distinct = if op.distinct { " DISTINCT" } else { "" };
            let _ = writeln!(
                out,
                "{indent}Return{distinct} ({items})",
                items = items.join(", ")
            );
            op.input.fmt_tree(out, depth + 1);
        }
        Self::Union(op) => {
            let _ = writeln!(out, "{indent}Union ({n} branches)", n = op.inputs.len());
            for input in &op.inputs {
                input.fmt_tree(out, depth + 1);
            }
        }
        Self::MultiWayJoin(op) => {
            let vars = op.shared_variables.join(", ");
            let _ = writeln!(
                out,
                "{indent}MultiWayJoin ({n} inputs, shared: [{vars}])",
                n = op.inputs.len()
            );
            for input in &op.inputs {
                input.fmt_tree(out, depth + 1);
            }
        }
        Self::LeftJoin(op) => {
            if let Some(cond) = &op.condition {
                let _ = writeln!(out, "{indent}LeftJoin (condition: {cond:?})");
            } else {
                let _ = writeln!(out, "{indent}LeftJoin");
            }
            op.left.fmt_tree(out, depth + 1);
            op.right.fmt_tree(out, depth + 1);
        }
        Self::AntiJoin(op) => {
            let _ = writeln!(out, "{indent}AntiJoin");
            op.left.fmt_tree(out, depth + 1);
            op.right.fmt_tree(out, depth + 1);
        }
        Self::Unwind(op) => {
            let _ = writeln!(out, "{indent}Unwind ({var})", var = op.variable);
            op.input.fmt_tree(out, depth + 1);
        }
        Self::Bind(op) => {
            let _ = writeln!(out, "{indent}Bind ({var})", var = op.variable);
            op.input.fmt_tree(out, depth + 1);
        }
        Self::MapCollect(op) => {
            let _ = writeln!(
                out,
                "{indent}MapCollect ({key} -> {val} AS {alias})",
                key = op.key_var,
                val = op.value_var,
                alias = op.alias
            );
            op.input.fmt_tree(out, depth + 1);
        }
        Self::Apply(op) => {
            let _ = writeln!(out, "{indent}Apply");
            op.input.fmt_tree(out, depth + 1);
            op.subplan.fmt_tree(out, depth + 1);
        }
        Self::Except(op) => {
            let all = if op.all { " ALL" } else { "" };
            let _ = writeln!(out, "{indent}Except{all}");
            op.left.fmt_tree(out, depth + 1);
            op.right.fmt_tree(out, depth + 1);
        }
        Self::Intersect(op) => {
            let all = if op.all { " ALL" } else { "" };
            let _ = writeln!(out, "{indent}Intersect{all}");
            op.left.fmt_tree(out, depth + 1);
            op.right.fmt_tree(out, depth + 1);
        }
        Self::Otherwise(op) => {
            let _ = writeln!(out, "{indent}Otherwise");
            op.left.fmt_tree(out, depth + 1);
            op.right.fmt_tree(out, depth + 1);
        }
        Self::ShortestPath(op) => {
            let _ = writeln!(
                out,
                "{indent}ShortestPath ({from} -> {to})",
                from = op.source_var,
                to = op.target_var
            );
            op.input.fmt_tree(out, depth + 1);
        }
        Self::Merge(op) => {
            let _ = writeln!(out, "{indent}Merge ({var})", var = op.variable);
            op.input.fmt_tree(out, depth + 1);
        }
        Self::MergeRelationship(op) => {
            let _ = writeln!(out, "{indent}MergeRelationship ({var})", var = op.variable);
            op.input.fmt_tree(out, depth + 1);
        }
        Self::CreateNode(op) => {
            let labels = op.labels.join(":");
            let _ = writeln!(
                out,
                "{indent}CreateNode ({var}:{labels})",
                var = op.variable
            );
            if let Some(input) = &op.input {
                input.fmt_tree(out, depth + 1);
            }
        }
        Self::CreateEdge(op) => {
            // Anonymous edges are shown with a `?` placeholder.
            let var = op.variable.as_deref().unwrap_or("?");
            let _ = writeln!(
                out,
                "{indent}CreateEdge ({from})-[{var}:{ty}]->({to})",
                from = op.from_variable,
                ty = op.edge_type,
                to = op.to_variable
            );
            op.input.fmt_tree(out, depth + 1);
        }
        Self::DeleteNode(op) => {
            let _ = writeln!(out, "{indent}DeleteNode ({var})", var = op.variable);
            op.input.fmt_tree(out, depth + 1);
        }
        Self::DeleteEdge(op) => {
            let _ = writeln!(out, "{indent}DeleteEdge ({var})", var = op.variable);
            op.input.fmt_tree(out, depth + 1);
        }
        Self::SetProperty(op) => {
            // Only the assigned property names are listed, not their values.
            let props: Vec<String> = op
                .properties
                .iter()
                .map(|(k, _)| format!("{}.{k}", op.variable))
                .collect();
            let _ = writeln!(
                out,
                "{indent}SetProperty ({props})",
                props = props.join(", ")
            );
            op.input.fmt_tree(out, depth + 1);
        }
        Self::AddLabel(op) => {
            let labels = op.labels.join(":");
            let _ = writeln!(out, "{indent}AddLabel ({var}:{labels})", var = op.variable);
            op.input.fmt_tree(out, depth + 1);
        }
        Self::RemoveLabel(op) => {
            let labels = op.labels.join(":");
            let _ = writeln!(
                out,
                "{indent}RemoveLabel ({var}:{labels})",
                var = op.variable
            );
            op.input.fmt_tree(out, depth + 1);
        }
        Self::CallProcedure(op) => {
            let _ = writeln!(
                out,
                "{indent}CallProcedure ({name})",
                name = op.name.join(".")
            );
        }
        Self::LoadData(op) => {
            let format_name = match op.format {
                LoadDataFormat::Csv => "LoadCsv",
                LoadDataFormat::Jsonl => "LoadJsonl",
                LoadDataFormat::Parquet => "LoadParquet",
                _ => "LoadData",
            };
            // The header marker is only shown for CSV input.
            let headers = if op.with_headers && op.format == LoadDataFormat::Csv {
                " WITH HEADERS"
            } else {
                ""
            };
            let _ = writeln!(
                out,
                "{indent}{format_name}{headers} ('{path}' AS {var})",
                path = op.path,
                var = op.variable,
            );
        }
        Self::TripleScan(op) => {
            let _ = writeln!(
                out,
                "{indent}TripleScan ({s} {p} {o})",
                s = fmt_triple_component(&op.subject),
                p = fmt_triple_component(&op.predicate),
                o = fmt_triple_component(&op.object)
            );
            if let Some(input) = &op.input {
                input.fmt_tree(out, depth + 1);
            }
        }
        Self::VectorScan(op) => {
            let metric = op.metric.map_or("default", |m| match m {
                VectorMetric::Cosine => "cosine",
                VectorMetric::Euclidean => "euclidean",
                VectorMetric::DotProduct => "dot_product",
                VectorMetric::Manhattan => "manhattan",
            });
            let mode = match op.k {
                Some(k) => format!("top-{k}"),
                None => "threshold".to_string(),
            };
            let _ = writeln!(
                out,
                "{indent}VectorScan ({var}:{label}.{prop}, {metric}, {mode})",
                var = op.variable,
                label = op.label.as_deref().unwrap_or("*"),
                prop = op.property,
            );
            if let Some(input) = &op.input {
                input.fmt_tree(out, depth + 1);
            }
        }
        Self::TextScan(op) => {
            // `k` takes precedence over `threshold` when both are set.
            let mode = match (op.k, op.threshold) {
                (Some(k), _) => format!("top-{k}"),
                (None, Some(t)) => format!("threshold>={t}"),
                (None, None) => "default-top-100".to_string(),
            };
            let query = fmt_expr(&op.query);
            let _ = writeln!(
                out,
                "{indent}TextScan ({var}:{label}.{prop}, query={query}, {mode})",
                var = op.variable,
                label = op.label,
                prop = op.property,
            );
        }
        Self::Empty => {
            let _ = writeln!(out, "{indent}Empty");
        }
        _ => {
            // No dedicated rendering: print the discriminant, then still
            // descend so the subtree of operators such as Construct,
            // Modify or VectorJoin stays visible.
            let _ = writeln!(out, "{indent}{:?}", std::mem::discriminant(self));
            for child in self.children() {
                child.fmt_tree(out, depth + 1);
            }
        }
    }
}
}
/// Renders an expression as compact text for plan output.
///
/// Variables, property accesses, literals, binary/unary operations and
/// function calls get a readable form (operators via their `Debug` name);
/// every other expression kind falls back to its `Debug` representation.
fn fmt_expr(expr: &LogicalExpression) -> String {
    match expr {
        LogicalExpression::Literal(value) => value.to_string(),
        LogicalExpression::Variable(name) => name.to_owned(),
        LogicalExpression::Property { variable, property } => {
            let mut text = String::with_capacity(variable.len() + 1 + property.len());
            text.push_str(variable);
            text.push('.');
            text.push_str(property);
            text
        }
        LogicalExpression::Unary { op, operand } => {
            let inner = fmt_expr(operand);
            format!("{op:?} {inner}")
        }
        LogicalExpression::Binary { left, op, right } => {
            let lhs = fmt_expr(left);
            let rhs = fmt_expr(right);
            format!("{lhs} {op:?} {rhs}")
        }
        LogicalExpression::FunctionCall { name, args, .. } => {
            let rendered_args = args.iter().map(fmt_expr).collect::<Vec<_>>().join(", ");
            format!("{name}({rendered_args})")
        }
        other => format!("{other:?}"),
    }
}
/// Renders one position of a triple pattern: `?name` for variables, `<iri>`
/// for IRIs, `"value"@lang` for language-tagged literals, `_:label` for
/// blank nodes, and the value's `Display` form for plain literals.
fn fmt_triple_component(comp: &TripleComponent) -> String {
    match comp {
        TripleComponent::Literal(value) => value.to_string(),
        TripleComponent::LangLiteral { value, lang } => format!("\"{value}\"@{lang}"),
        TripleComponent::Iri(iri) => format!("<{iri}>"),
        TripleComponent::BlankNode(label) => format!("_:{label}"),
        TripleComponent::Variable(name) => format!("?{name}"),
    }
}
/// Payload of [`LogicalOperator::NodeScan`].
///
/// Plan output shows it as `NodeScan (variable:label)`.
#[derive(Debug, Clone)]
pub struct NodeScanOp {
/// Variable name shown for the scanned nodes.
pub variable: String,
/// Optional label; `None` is displayed as `*`.
pub label: Option<String>,
/// Optional child operator, reported by `children()`.
pub input: Option<Box<LogicalOperator>>,
}
/// Payload of [`LogicalOperator::EdgeScan`].
///
/// Plan output shows it as `EdgeScan (variable:types)`.
#[derive(Debug, Clone)]
pub struct EdgeScanOp {
/// Variable name shown for the scanned edges.
pub variable: String,
/// Edge types, displayed joined with `|`; an empty list is displayed as `*`.
pub edge_types: Vec<String>,
/// Optional child operator, reported by `children()`.
pub input: Option<Box<LogicalOperator>>,
}
/// Path mode carried by [`ExpandOp::path_mode`]; the default is `Walk`.
///
/// NOTE(review): the variant names mirror the GQL path modes
/// (WALK / TRAIL / SIMPLE / ACYCLIC); how each mode is enforced is not
/// visible in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum PathMode {
#[default]
Walk,
Trail,
Simple,
Acyclic,
}
/// Payload of [`LogicalOperator::Expand`].
///
/// Plan output shows it as `Expand (from)dir[:types hops]dir(to)`.
#[derive(Debug, Clone)]
pub struct ExpandOp {
/// Variable shown on the left-hand side of the rendered pattern.
pub from_variable: String,
/// Variable shown on the right-hand side of the rendered pattern.
pub to_variable: String,
/// Optional edge variable; not included in plan output.
pub edge_variable: Option<String>,
/// Direction, displayed as `->`, `<-` or `--`.
pub direction: ExpandDirection,
/// Edge types, displayed joined with `|`; an empty list is displayed as `*`.
pub edge_types: Vec<String>,
/// Lower hop bound.
pub min_hops: u32,
/// Upper hop bound; `None` is displayed as the open-ended range `*min..`.
pub max_hops: Option<u32>,
/// The single child operator.
pub input: Box<LogicalOperator>,
/// Optional path alias; not included in plan output.
pub path_alias: Option<String>,
/// Path mode for the expansion.
pub path_mode: PathMode,
}
/// Direction of an expansion or shortest-path search.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum ExpandDirection {
/// Displayed as `->`.
Outgoing,
/// Displayed as `<-`.
Incoming,
/// Displayed as `--`.
Both,
}
/// Payload of [`LogicalOperator::Join`]; plan output shows `Join (join_type)`.
#[derive(Debug, Clone)]
pub struct JoinOp {
/// First child.
pub left: Box<LogicalOperator>,
/// Second child.
pub right: Box<LogicalOperator>,
/// Kind of join, displayed via its `Debug` name.
pub join_type: JoinType,
/// Pairs of expressions relating the two sides; not included in plan output.
pub conditions: Vec<JoinCondition>,
}
/// Kind of join carried by [`JoinOp::join_type`]; displayed via its `Debug` name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum JoinType {
Inner,
Left,
Right,
Full,
Cross,
Semi,
Anti,
}
/// A pair of expressions used as a join condition.
///
/// NOTE(review): presumably an equality between `left` and `right`; the
/// comparison semantics are not visible in this file.
#[derive(Debug, Clone)]
pub struct JoinCondition {
pub left: LogicalExpression,
pub right: LogicalExpression,
}
/// Payload of [`LogicalOperator::MultiWayJoin`].
///
/// Plan output shows the number of inputs and the shared variables.
#[derive(Debug, Clone)]
pub struct MultiWayJoinOp {
/// All child operators, in order.
pub inputs: Vec<LogicalOperator>,
/// Join conditions; not included in plan output.
pub conditions: Vec<JoinCondition>,
/// Variables displayed as `shared: [a, b]`.
pub shared_variables: Vec<String>,
}
/// Payload of [`LogicalOperator::Aggregate`].
#[derive(Debug, Clone)]
pub struct AggregateOp {
/// Grouping expressions, displayed as `group: [...]`.
pub group_by: Vec<LogicalExpression>,
/// Aggregate computations; plan output shows function name and alias only.
pub aggregates: Vec<AggregateExpr>,
/// The single child operator.
pub input: Box<LogicalOperator>,
/// Optional expression; not included in plan output.
/// NOTE(review): presumably a post-aggregation (HAVING) filter — confirm.
pub having: Option<LogicalExpression>,
}
/// Distinguishes edges from nodes; carried by [`HorizontalAggregateOp::entity_kind`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum EntityKind {
Edge,
Node,
}
/// Payload of [`LogicalOperator::HorizontalAggregate`].
///
/// NOTE(review): judging by the field names this aggregates `property` over
/// the entities held in `list_column` and exposes the result as `alias`;
/// the evaluation itself is not visible in this file.
#[derive(Debug, Clone)]
pub struct HorizontalAggregateOp {
pub list_column: String,
pub entity_kind: EntityKind,
pub function: AggregateFunction,
pub property: String,
pub alias: String,
/// The single child operator.
pub input: Box<LogicalOperator>,
}
/// One aggregate computation inside an [`AggregateOp`].
#[derive(Debug, Clone)]
pub struct AggregateExpr {
/// Aggregate function; plan output shows its lower-cased `Debug` name.
pub function: AggregateFunction,
/// Optional argument expression.
pub expression: Option<LogicalExpression>,
/// Optional second argument expression.
/// NOTE(review): presumably used by the two-argument functions
/// (covariance, correlation, regression) — confirm.
pub expression2: Option<LogicalExpression>,
pub distinct: bool,
/// Optional output name, displayed as `AS alias`.
pub alias: Option<String>,
/// NOTE(review): presumably the percentile for `PercentileDisc`/`PercentileCont` — confirm.
pub percentile: Option<f64>,
/// NOTE(review): presumably the separator for `GroupConcat` — confirm.
pub separator: Option<String>,
}
/// Aggregate functions available to [`AggregateExpr`] and [`HorizontalAggregateOp`].
///
/// Plan output uses the lower-cased `Debug` name of the variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum AggregateFunction {
Count,
CountNonNull,
Sum,
Avg,
Min,
Max,
Collect,
StdDev,
StdDevPop,
Variance,
VariancePop,
PercentileDisc,
PercentileCont,
GroupConcat,
Sample,
CovarSamp,
CovarPop,
Corr,
RegrSlope,
RegrIntercept,
RegrR2,
RegrCount,
RegrSxx,
RegrSyy,
RegrSxy,
RegrAvgx,
RegrAvgy,
}
/// Optional annotation on a [`FilterOp`], appended to the filter's plan output.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum PushdownHint {
/// Displayed as ` [index: property]`.
IndexLookup {
property: String,
},
/// Displayed as ` [range: property]`.
RangeScan {
property: String,
},
/// Displayed as ` [label-first]`.
LabelFirst,
}
/// Payload of [`LogicalOperator::Filter`].
#[derive(Debug, Clone)]
pub struct FilterOp {
/// Filter expression, displayed via `fmt_expr`.
pub predicate: LogicalExpression,
/// The single child operator.
pub input: Box<LogicalOperator>,
/// Optional hint appended to the plan output.
pub pushdown_hint: Option<PushdownHint>,
}
/// Payload of [`LogicalOperator::Project`].
#[derive(Debug, Clone)]
pub struct ProjectOp {
/// Projected columns, in order.
pub projections: Vec<Projection>,
/// The single child operator.
pub input: Box<LogicalOperator>,
/// NOTE(review): presumably keeps the input columns alongside the
/// projections — confirm against the executor; not used in this file.
pub pass_through_input: bool,
}
/// One projected column of a [`ProjectOp`].
#[derive(Debug, Clone)]
pub struct Projection {
/// Expression that produces the column.
pub expression: LogicalExpression,
/// Optional output name; the tree printer shows it as `expr AS alias`.
pub alias: Option<String>,
}
/// Payload of [`LogicalOperator::Limit`]; plan output shows `Limit (count)`.
#[derive(Debug, Clone)]
pub struct LimitOp {
/// Row count, either literal or an unresolved parameter.
pub count: CountExpr,
/// The single child operator.
pub input: Box<LogicalOperator>,
}
/// Payload of [`LogicalOperator::Skip`]; plan output shows `Skip (count)`.
#[derive(Debug, Clone)]
pub struct SkipOp {
/// Row count, either literal or an unresolved parameter.
pub count: CountExpr,
/// The single child operator.
pub input: Box<LogicalOperator>,
}
/// Payload of [`LogicalOperator::Sort`].
#[derive(Debug, Clone)]
pub struct SortOp {
/// Sort keys, in order; displayed as `expr ASC` / `expr DESC`.
pub keys: Vec<SortKey>,
/// The single child operator.
pub input: Box<LogicalOperator>,
}
/// One key of a [`SortOp`].
#[derive(Debug, Clone)]
pub struct SortKey {
/// Expression to sort by.
pub expression: LogicalExpression,
/// Sort direction.
pub order: SortOrder,
/// Optional null placement; not included in plan output.
pub nulls: Option<NullsOrdering>,
}
/// Sort direction of a [`SortKey`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum SortOrder {
/// Displayed as `ASC`.
Ascending,
/// Displayed as `DESC`.
Descending,
}
/// Null placement option carried by [`SortKey::nulls`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum NullsOrdering {
First,
Last,
}
/// Payload of [`LogicalOperator::Distinct`]; plan output shows just `Distinct`.
#[derive(Debug, Clone)]
pub struct DistinctOp {
/// The single child operator.
pub input: Box<LogicalOperator>,
/// NOTE(review): presumably restricts de-duplication to these columns,
/// with `None` meaning all columns — confirm; not used in this file.
pub columns: Option<Vec<String>>,
}
/// Payload of [`LogicalOperator::CreateNode`], which counts as a mutation.
///
/// Plan output shows it as `CreateNode (variable:Label1:Label2)`.
#[derive(Debug, Clone)]
pub struct CreateNodeOp {
pub variable: String,
/// Labels, displayed joined with `:`.
pub labels: Vec<String>,
/// Property name / value-expression pairs; not included in plan output.
pub properties: Vec<(String, LogicalExpression)>,
/// Optional child operator.
pub input: Option<Box<LogicalOperator>>,
}
/// Payload of [`LogicalOperator::CreateEdge`], which counts as a mutation.
///
/// Plan output shows it as `CreateEdge (from)-[variable:type]->(to)`.
#[derive(Debug, Clone)]
pub struct CreateEdgeOp {
/// Optional edge variable; `None` is displayed as `?`.
pub variable: Option<String>,
pub from_variable: String,
pub to_variable: String,
pub edge_type: String,
/// Property name / value-expression pairs; not included in plan output.
pub properties: Vec<(String, LogicalExpression)>,
/// The single child operator.
pub input: Box<LogicalOperator>,
}
/// Payload of [`LogicalOperator::DeleteNode`], which counts as a mutation.
#[derive(Debug, Clone)]
pub struct DeleteNodeOp {
/// Variable displayed in plan output.
pub variable: String,
/// NOTE(review): presumably the DETACH DELETE flag (also remove attached
/// edges) — confirm; not used in this file.
pub detach: bool,
/// The single child operator.
pub input: Box<LogicalOperator>,
}
/// Payload of [`LogicalOperator::DeleteEdge`], which counts as a mutation.
#[derive(Debug, Clone)]
pub struct DeleteEdgeOp {
/// Variable displayed in plan output.
pub variable: String,
/// The single child operator.
pub input: Box<LogicalOperator>,
}
/// Payload of [`LogicalOperator::SetProperty`], which counts as a mutation.
///
/// The tree printer lists the assigned properties as `variable.property`.
#[derive(Debug, Clone)]
pub struct SetPropertyOp {
pub variable: String,
/// Property name / value-expression pairs; only the names are displayed.
pub properties: Vec<(String, LogicalExpression)>,
/// NOTE(review): presumably replaces all properties instead of merging — confirm.
pub replace: bool,
/// NOTE(review): presumably marks `variable` as an edge rather than a node — confirm.
pub is_edge: bool,
/// The single child operator.
pub input: Box<LogicalOperator>,
}
/// Payload of [`LogicalOperator::AddLabel`], which counts as a mutation.
///
/// Plan output shows it as `AddLabel (variable:Label1:Label2)`.
#[derive(Debug, Clone)]
pub struct AddLabelOp {
pub variable: String,
/// Labels, displayed joined with `:`.
pub labels: Vec<String>,
/// The single child operator.
pub input: Box<LogicalOperator>,
}
/// Payload of [`LogicalOperator::RemoveLabel`], which counts as a mutation.
///
/// Plan output shows it as `RemoveLabel (variable:Label1:Label2)`.
#[derive(Debug, Clone)]
pub struct RemoveLabelOp {
pub variable: String,
/// Labels, displayed joined with `:`.
pub labels: Vec<String>,
/// The single child operator.
pub input: Box<LogicalOperator>,
}
/// Optional graph restriction attached to a [`TripleScanOp`]; both lists
/// default to empty.
///
/// NOTE(review): presumably mirrors SPARQL `FROM` / `FROM NAMED` — confirm.
#[derive(Debug, Clone, Default)]
pub struct DatasetRestriction {
pub default_graphs: Vec<String>,
pub named_graphs: Vec<String>,
}
/// Payload of [`LogicalOperator::TripleScan`].
///
/// Plan output shows it as `TripleScan (subject predicate object)`.
#[derive(Debug, Clone)]
pub struct TripleScanOp {
pub subject: TripleComponent,
pub predicate: TripleComponent,
pub object: TripleComponent,
/// Optional graph component; not included in plan output.
pub graph: Option<TripleComponent>,
/// Optional child operator.
pub input: Option<Box<LogicalOperator>>,
/// Optional dataset restriction; not included in plan output.
pub dataset: Option<DatasetRestriction>,
}
/// One position (subject, predicate, object or graph) of a triple pattern.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum TripleComponent {
    /// A variable; displayed as `?name`.
    Variable(String),
    /// An IRI; displayed as `<iri>`.
    Iri(String),
    /// A literal value; displayed via its `Display` form.
    Literal(Value),
    /// A language-tagged string literal; displayed as `"value"@lang`.
    LangLiteral {
        value: String,
        lang: String,
    },
    /// A blank node; displayed as `_:label`.
    BlankNode(String),
}

impl TripleComponent {
    /// Returns the variable name if this component is a variable, `None` otherwise.
    #[must_use]
    pub fn as_variable(&self) -> Option<&str> {
        if let Self::Variable(name) = self {
            Some(name.as_str())
        } else {
            None
        }
    }
}
/// Payload of [`LogicalOperator::Union`]; plan output shows `Union (n branches)`.
#[derive(Debug, Clone)]
pub struct UnionOp {
/// All branches, in order.
pub inputs: Vec<LogicalOperator>,
}
/// Payload of [`LogicalOperator::Except`].
#[derive(Debug, Clone)]
pub struct ExceptOp {
pub left: Box<LogicalOperator>,
pub right: Box<LogicalOperator>,
/// When set, plan output shows `Except ALL`.
pub all: bool,
}
/// Payload of [`LogicalOperator::Intersect`].
#[derive(Debug, Clone)]
pub struct IntersectOp {
pub left: Box<LogicalOperator>,
pub right: Box<LogicalOperator>,
/// When set, plan output shows `Intersect ALL`.
pub all: bool,
}
/// Payload of [`LogicalOperator::Otherwise`].
///
/// NOTE(review): presumably yields `right` only when `left` produces no
/// rows — confirm against the executor.
#[derive(Debug, Clone)]
pub struct OtherwiseOp {
pub left: Box<LogicalOperator>,
pub right: Box<LogicalOperator>,
}
/// Payload of [`LogicalOperator::Apply`].
///
/// `children()` reports `[input, subplan]`; plan output shows just `Apply`.
#[derive(Debug, Clone)]
pub struct ApplyOp {
/// First child.
pub input: Box<LogicalOperator>,
/// Second child.
pub subplan: Box<LogicalOperator>,
/// NOTE(review): presumably the variables passed from `input` into `subplan` — confirm.
pub shared_variables: Vec<String>,
/// NOTE(review): presumably keeps input rows for which `subplan` yields nothing — confirm.
pub optional: bool,
}
/// Payload of [`LogicalOperator::ParameterScan`]; it has no child operators.
#[derive(Debug, Clone)]
pub struct ParameterScanOp {
/// Column names exposed by the scan.
pub columns: Vec<String>,
}
/// Payload of [`LogicalOperator::LeftJoin`].
#[derive(Debug, Clone)]
pub struct LeftJoinOp {
pub left: Box<LogicalOperator>,
pub right: Box<LogicalOperator>,
/// Optional condition; the tree printer shows it via its `Debug` form.
pub condition: Option<LogicalExpression>,
}
/// Payload of [`LogicalOperator::AntiJoin`]; plan output shows just `AntiJoin`.
#[derive(Debug, Clone)]
pub struct AntiJoinOp {
pub left: Box<LogicalOperator>,
pub right: Box<LogicalOperator>,
}
/// Payload of [`LogicalOperator::Bind`]; plan output shows `Bind (variable)`.
#[derive(Debug, Clone)]
pub struct BindOp {
/// Expression associated with `variable`; not included in plan output.
pub expression: LogicalExpression,
pub variable: String,
/// The single child operator.
pub input: Box<LogicalOperator>,
}
/// Payload of [`LogicalOperator::Unwind`]; plan output shows `Unwind (variable)`.
#[derive(Debug, Clone)]
pub struct UnwindOp {
/// Expression associated with `variable`; not included in plan output.
pub expression: LogicalExpression,
pub variable: String,
/// NOTE(review): presumably receives the 1-based element position — confirm.
pub ordinality_var: Option<String>,
/// NOTE(review): presumably receives the 0-based element position — confirm.
pub offset_var: Option<String>,
/// The single child operator.
pub input: Box<LogicalOperator>,
}
/// Payload of [`LogicalOperator::MapCollect`].
///
/// The tree printer shows it as `MapCollect (key_var -> value_var AS alias)`.
#[derive(Debug, Clone)]
pub struct MapCollectOp {
pub key_var: String,
pub value_var: String,
pub alias: String,
/// The single child operator.
pub input: Box<LogicalOperator>,
}
/// Payload of [`LogicalOperator::Merge`], which counts as a mutation.
///
/// Plan output shows only `variable`.
#[derive(Debug, Clone)]
pub struct MergeOp {
pub variable: String,
pub labels: Vec<String>,
/// Property name / expression pairs.
/// NOTE(review): presumably used to find an existing match — confirm.
pub match_properties: Vec<(String, LogicalExpression)>,
/// NOTE(review): presumably applied only when a new node is created — confirm.
pub on_create: Vec<(String, LogicalExpression)>,
/// NOTE(review): presumably applied only when an existing node matched — confirm.
pub on_match: Vec<(String, LogicalExpression)>,
/// The single child operator.
pub input: Box<LogicalOperator>,
}
/// Payload of [`LogicalOperator::MergeRelationship`], which counts as a mutation.
///
/// Plan output shows only `variable`.
#[derive(Debug, Clone)]
pub struct MergeRelationshipOp {
pub variable: String,
pub source_variable: String,
pub target_variable: String,
pub edge_type: String,
/// Property name / expression pairs.
/// NOTE(review): presumably used to find an existing match — confirm.
pub match_properties: Vec<(String, LogicalExpression)>,
/// NOTE(review): presumably applied only when a new edge is created — confirm.
pub on_create: Vec<(String, LogicalExpression)>,
/// NOTE(review): presumably applied only when an existing edge matched — confirm.
pub on_match: Vec<(String, LogicalExpression)>,
/// The single child operator.
pub input: Box<LogicalOperator>,
}
/// Payload of [`LogicalOperator::ShortestPath`].
///
/// Plan output shows it as `source_var -> target_var`.
#[derive(Debug, Clone)]
pub struct ShortestPathOp {
/// The single child operator.
pub input: Box<LogicalOperator>,
pub source_var: String,
pub target_var: String,
/// Edge types; not included in plan output.
pub edge_types: Vec<String>,
pub direction: ExpandDirection,
pub path_alias: String,
/// NOTE(review): presumably requests all shortest paths instead of one — confirm.
pub all_paths: bool,
}
/// Payload of [`LogicalOperator::InsertTriple`], which counts as a mutation.
#[derive(Debug, Clone)]
pub struct InsertTripleOp {
pub subject: TripleComponent,
pub predicate: TripleComponent,
pub object: TripleComponent,
/// Optional graph name.
pub graph: Option<String>,
/// Optional child operator.
pub input: Option<Box<LogicalOperator>>,
}
/// Payload of [`LogicalOperator::DeleteTriple`], which counts as a mutation.
#[derive(Debug, Clone)]
pub struct DeleteTripleOp {
pub subject: TripleComponent,
pub predicate: TripleComponent,
pub object: TripleComponent,
/// Optional graph name.
pub graph: Option<String>,
/// Optional child operator.
pub input: Option<Box<LogicalOperator>>,
}
/// Payload of [`LogicalOperator::Modify`], which counts as a mutation.
#[derive(Debug, Clone)]
pub struct ModifyOp {
/// Triple templates to delete.
pub delete_templates: Vec<TripleTemplate>,
/// Triple templates to insert.
pub insert_templates: Vec<TripleTemplate>,
/// The single child operator, as reported by `children()`.
pub where_clause: Box<LogicalOperator>,
/// Optional graph name.
pub graph: Option<String>,
}
/// A triple pattern used as a template by [`ModifyOp`] and [`ConstructOp`].
#[derive(Debug, Clone)]
pub struct TripleTemplate {
pub subject: TripleComponent,
pub predicate: TripleComponent,
pub object: TripleComponent,
/// Optional graph name.
pub graph: Option<String>,
}
/// Payload of [`LogicalOperator::Construct`]; not itself a mutation.
#[derive(Debug, Clone)]
pub struct ConstructOp {
/// Triple templates to produce.
pub templates: Vec<TripleTemplate>,
/// The single child operator.
pub input: Box<LogicalOperator>,
}
/// Payload of [`LogicalOperator::ClearGraph`], a childless mutation.
#[derive(Debug, Clone)]
pub struct ClearGraphOp {
/// Optional graph name.
/// NOTE(review): `None` presumably targets the default graph — confirm.
pub graph: Option<String>,
/// NOTE(review): presumably suppresses errors (SPARQL `SILENT`) — confirm.
pub silent: bool,
}
/// Payload of [`LogicalOperator::CreateGraph`], a childless mutation.
#[derive(Debug, Clone)]
pub struct CreateGraphOp {
/// Name of the graph.
pub graph: String,
/// NOTE(review): presumably suppresses errors (SPARQL `SILENT`) — confirm.
pub silent: bool,
}
/// Payload of [`LogicalOperator::DropGraph`], a childless mutation.
#[derive(Debug, Clone)]
pub struct DropGraphOp {
/// Optional graph name.
/// NOTE(review): `None` presumably targets the default graph — confirm.
pub graph: Option<String>,
/// NOTE(review): presumably suppresses errors (SPARQL `SILENT`) — confirm.
pub silent: bool,
}
/// Payload of [`LogicalOperator::LoadGraph`], a childless mutation.
#[derive(Debug, Clone)]
pub struct LoadGraphOp {
/// Where to load from.
pub source: String,
/// Optional destination graph name.
pub destination: Option<String>,
/// NOTE(review): presumably suppresses errors (SPARQL `SILENT`) — confirm.
pub silent: bool,
}
/// Payload of [`LogicalOperator::CopyGraph`], a childless mutation.
#[derive(Debug, Clone)]
pub struct CopyGraphOp {
/// Optional source graph name.
pub source: Option<String>,
/// Optional destination graph name.
pub destination: Option<String>,
/// NOTE(review): presumably suppresses errors (SPARQL `SILENT`) — confirm.
pub silent: bool,
}
/// Payload of [`LogicalOperator::MoveGraph`], a childless mutation.
#[derive(Debug, Clone)]
pub struct MoveGraphOp {
/// Optional source graph name.
pub source: Option<String>,
/// Optional destination graph name.
pub destination: Option<String>,
/// NOTE(review): presumably suppresses errors (SPARQL `SILENT`) — confirm.
pub silent: bool,
}
/// Payload of [`LogicalOperator::AddGraph`], a childless mutation.
#[derive(Debug, Clone)]
pub struct AddGraphOp {
/// Optional source graph name.
pub source: Option<String>,
/// Optional destination graph name.
pub destination: Option<String>,
/// NOTE(review): presumably suppresses errors (SPARQL `SILENT`) — confirm.
pub silent: bool,
}
/// Payload of [`LogicalOperator::VectorScan`].
///
/// The tree printer shows it as `VectorScan (variable:label.property, metric, mode)`.
#[derive(Debug, Clone)]
pub struct VectorScanOp {
pub variable: String,
/// Optional index name; not included in plan output.
pub index_name: Option<String>,
pub property: String,
/// Optional label; `None` is displayed as `*`.
pub label: Option<String>,
/// Expression producing the query vector; not included in plan output.
pub query_vector: LogicalExpression,
/// Result count; `Some(k)` is displayed as `top-k`, `None` as `threshold`.
pub k: Option<usize>,
/// Distance metric; `None` is displayed as `default`.
pub metric: Option<VectorMetric>,
/// NOTE(review): presumably a lower bound on similarity — confirm.
pub min_similarity: Option<f32>,
/// NOTE(review): presumably an upper bound on distance — confirm.
pub max_distance: Option<f32>,
/// Optional child operator.
pub input: Option<Box<LogicalOperator>>,
}
/// Distance metric for vector operators.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum VectorMetric {
/// Displayed as `cosine`.
Cosine,
/// Displayed as `euclidean`.
Euclidean,
/// Displayed as `dot_product`.
DotProduct,
/// Displayed as `manhattan`.
Manhattan,
}
/// Payload of [`LogicalOperator::VectorJoin`].
///
/// `display_label` shows only `right_variable`.
#[derive(Debug, Clone)]
pub struct VectorJoinOp {
/// The single child operator.
pub input: Box<LogicalOperator>,
pub left_vector_variable: Option<String>,
pub left_property: Option<String>,
/// Expression producing the query vector.
pub query_vector: LogicalExpression,
pub right_variable: String,
pub right_property: String,
pub right_label: Option<String>,
pub index_name: Option<String>,
/// NOTE(review): presumably the number of neighbours per input row — confirm.
pub k: usize,
pub metric: Option<VectorMetric>,
/// NOTE(review): presumably a lower bound on similarity — confirm.
pub min_similarity: Option<f32>,
/// NOTE(review): presumably an upper bound on distance — confirm.
pub max_distance: Option<f32>,
/// NOTE(review): presumably the variable that receives the score — confirm.
pub score_variable: Option<String>,
}
/// Payload of [`LogicalOperator::TextScan`]; it has no child operators.
///
/// The tree printer shows it as `TextScan (variable:label.property, query=..., mode)`.
#[derive(Debug, Clone)]
pub struct TextScanOp {
pub variable: String,
pub label: String,
pub property: String,
/// Query expression, displayed via `fmt_expr`.
pub query: LogicalExpression,
/// Result count; `Some(k)` is displayed as `top-k` and takes precedence over `threshold`.
pub k: Option<usize>,
/// Displayed as `threshold>=t` when `k` is `None`; if both are `None`
/// the printer shows `default-top-100`.
pub threshold: Option<f64>,
/// Optional score column name; not included in plan output.
pub score_column: Option<String>,
}
/// Payload of [`LogicalOperator::Return`].
#[derive(Debug, Clone)]
pub struct ReturnOp {
/// Returned items, in order.
pub items: Vec<ReturnItem>,
/// When set, the tree printer shows `Return DISTINCT`.
pub distinct: bool,
/// The single child operator.
pub input: Box<LogicalOperator>,
}
/// One item of a [`ReturnOp`].
#[derive(Debug, Clone)]
pub struct ReturnItem {
/// Expression that produces the item.
pub expression: LogicalExpression,
/// Optional output name; the tree printer shows it as `expr AS alias`.
pub alias: Option<String>,
}
/// Payload of [`LogicalOperator::CreatePropertyGraph`], a childless mutation.
#[derive(Debug, Clone)]
pub struct CreatePropertyGraphOp {
/// Name of the property graph.
pub name: String,
/// Node table definitions.
pub node_tables: Vec<PropertyGraphNodeTable>,
/// Edge table definitions.
pub edge_tables: Vec<PropertyGraphEdgeTable>,
}
/// Node table definition inside a [`CreatePropertyGraphOp`].
#[derive(Debug, Clone)]
pub struct PropertyGraphNodeTable {
pub name: String,
/// String pairs, one per column.
/// NOTE(review): presumably (column name, type name) — confirm.
pub columns: Vec<(String, String)>,
}
/// Edge table definition inside a [`CreatePropertyGraphOp`].
#[derive(Debug, Clone)]
pub struct PropertyGraphEdgeTable {
pub name: String,
/// String pairs, one per column.
/// NOTE(review): presumably (column name, type name) — confirm.
pub columns: Vec<(String, String)>,
/// Name of the table holding the source endpoints.
pub source_table: String,
/// Name of the table holding the target endpoints.
pub target_table: String,
}
/// Payload of [`LogicalOperator::CallProcedure`]; it has no child operators.
#[derive(Debug, Clone)]
pub struct CallProcedureOp {
/// Name segments, displayed joined with `.`.
pub name: Vec<String>,
/// Call arguments; not included in plan output.
pub arguments: Vec<LogicalExpression>,
/// Optional list of yielded fields; not included in plan output.
pub yield_items: Option<Vec<ProcedureYield>>,
}
/// One yielded field of a [`CallProcedureOp`].
#[derive(Debug, Clone)]
pub struct ProcedureYield {
/// Name of the field.
pub field_name: String,
/// Optional alias for the field.
pub alias: Option<String>,
}
pub use grafeo_core::execution::operators::LoadDataFormat;
/// Payload of [`LogicalOperator::LoadData`]; it has no child operators.
///
/// The tree printer shows it as `LoadCsv`/`LoadJsonl`/`LoadParquet` (or
/// `LoadData` for any other format) followed by `('path' AS variable)`.
#[derive(Debug, Clone)]
pub struct LoadDataOp {
/// Input format.
pub format: LoadDataFormat,
/// Displayed as ` WITH HEADERS`, but only when the format is CSV.
pub with_headers: bool,
pub path: String,
pub variable: String,
/// Optional field terminator; not included in plan output.
/// NOTE(review): presumably the CSV column delimiter — confirm.
pub field_terminator: Option<char>,
}
/// An expression in the logical plan.
///
/// `fmt_expr` has a compact rendering for `Literal`, `Variable`, `Property`,
/// `Binary`, `Unary` and `FunctionCall`; all other variants are displayed via
/// their `Debug` form.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum LogicalExpression {
/// A constant value.
Literal(Value),
/// A variable reference, displayed as its name.
Variable(String),
/// A property access, displayed as `variable.property`.
Property {
variable: String,
property: String,
},
/// A binary operation, displayed as `left Op right`.
Binary {
left: Box<LogicalExpression>,
op: BinaryOp,
right: Box<LogicalExpression>,
},
/// A unary operation, displayed as `Op operand`.
Unary {
op: UnaryOp,
operand: Box<LogicalExpression>,
},
/// A function call, displayed as `name(arg, ...)`; `distinct` is not displayed.
FunctionCall {
name: String,
args: Vec<LogicalExpression>,
distinct: bool,
},
/// A list of expressions.
List(Vec<LogicalExpression>),
/// A list of key / value-expression pairs.
Map(Vec<(String, LogicalExpression)>),
/// Indexing into `base` with `index`.
IndexAccess {
base: Box<LogicalExpression>,
index: Box<LogicalExpression>,
},
/// Slicing `base`; either bound may be absent.
SliceAccess {
base: Box<LogicalExpression>,
start: Option<Box<LogicalExpression>>,
end: Option<Box<LogicalExpression>>,
},
/// A CASE expression with an optional operand and optional else branch.
Case {
operand: Option<Box<LogicalExpression>>,
when_clauses: Vec<(LogicalExpression, LogicalExpression)>,
else_clause: Option<Box<LogicalExpression>>,
},
/// A named parameter reference.
Parameter(String),
/// NOTE(review): presumably the labels of the named variable — confirm.
Labels(String),
/// NOTE(review): presumably the edge type of the named variable — confirm.
Type(String),
/// NOTE(review): presumably the identifier of the named variable — confirm.
Id(String),
/// A list comprehension over `list_expr` with an optional filter.
ListComprehension {
variable: String,
list_expr: Box<LogicalExpression>,
filter_expr: Option<Box<LogicalExpression>>,
map_expr: Box<LogicalExpression>,
},
/// A quantified predicate over `list_expr`; see [`ListPredicateKind`].
ListPredicate {
kind: ListPredicateKind,
variable: String,
list_expr: Box<LogicalExpression>,
predicate: Box<LogicalExpression>,
},
/// A subquery wrapped as an existence test.
ExistsSubquery(Box<LogicalOperator>),
/// A subquery wrapped as a count.
CountSubquery(Box<LogicalOperator>),
/// A subquery wrapped as a value.
ValueSubquery(Box<LogicalOperator>),
/// A map projection over `base`; see [`MapProjectionEntry`].
MapProjection {
base: String,
entries: Vec<MapProjectionEntry>,
},
/// A reduce/fold over `list` with an accumulator.
Reduce {
accumulator: String,
initial: Box<LogicalExpression>,
variable: String,
list: Box<LogicalExpression>,
expression: Box<LogicalExpression>,
},
/// A pattern comprehension: a subplan plus a projection expression.
PatternComprehension {
subplan: Box<LogicalOperator>,
projection: Box<LogicalExpression>,
},
}
/// One entry of a [`LogicalExpression::MapProjection`].
///
/// NOTE(review): the variants presumably correspond to `.prop`,
/// `key: expr` and `.*` in map-projection syntax — confirm against the parser.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum MapProjectionEntry {
PropertySelector(String),
LiteralEntry(String, LogicalExpression),
AllProperties,
}
/// Quantifier of a [`LogicalExpression::ListPredicate`].
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum ListPredicateKind {
All,
Any,
None,
Single,
}
/// Operator of a [`LogicalExpression::Binary`].
///
/// Plan output uses the variant's `Debug` name (for example `n.age Gt 30`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum BinaryOp {
Eq,
Ne,
Lt,
Le,
Gt,
Ge,
And,
Or,
Xor,
Add,
Sub,
Mul,
Div,
Mod,
Concat,
StartsWith,
EndsWith,
Contains,
In,
Like,
Regex,
Pow,
}
/// Operator of a [`LogicalExpression::Unary`].
///
/// Plan output uses the variant's `Debug` name, placed before the operand.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum UnaryOp {
Not,
Neg,
IsNull,
IsNotNull,
}
#[cfg(test)]
mod tests {
use super::*;
/// A `Return` over a labelled `NodeScan` keeps its shape when wrapped in a `LogicalPlan`.
#[test]
fn test_simple_node_scan_plan() {
    // Build the plan bottom-up: the scan first, then the RETURN that consumes it.
    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".into(),
        label: Some("Person".into()),
        input: None,
    });
    let return_op = LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".into()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(person_scan),
    });
    let plan = LogicalPlan::new(return_op);

    let ret = match &plan.root {
        LogicalOperator::Return(ret) => ret,
        _ => panic!("Expected Return"),
    };
    assert_eq!(ret.items.len(), 1);
    assert!(!ret.distinct);

    match ret.input.as_ref() {
        LogicalOperator::NodeScan(scan) => {
            assert_eq!(scan.variable, "n");
            assert_eq!(scan.label, Some("Person".into()));
        }
        _ => panic!("Expected NodeScan"),
    }
}
/// A `Return -> Filter -> NodeScan` plan exposes the filter's binary predicate operator.
#[test]
fn test_filter_plan() {
    // Predicate `n.age > 30`.
    let age_over_thirty = LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: "n".into(),
            property: "age".into(),
        }),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
    };
    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".into(),
        label: Some("Person".into()),
        input: None,
    });
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: age_over_thirty,
        input: Box::new(person_scan),
        pushdown_hint: None,
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Property {
                variable: "n".into(),
                property: "name".into(),
            },
            alias: Some("name".into()),
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    // Walk back down the tree, failing loudly at the first unexpected node.
    let ret = match &plan.root {
        LogicalOperator::Return(ret) => ret,
        _ => panic!("Expected Return"),
    };
    let filter = match ret.input.as_ref() {
        LogicalOperator::Filter(filter) => filter,
        _ => panic!("Expected Filter"),
    };
    match &filter.predicate {
        LogicalExpression::Binary { op, .. } => assert_eq!(*op, BinaryOp::Gt),
        _ => panic!("Expected Binary expression"),
    }
}
/// Test fixture: an input-less `NodeScan` of `n:Article`.
fn read_only_scan() -> LogicalOperator {
    let scan = NodeScanOp {
        variable: "n".into(),
        label: Some("Article".into()),
        input: None,
    };
    LogicalOperator::NodeScan(scan)
}
/// Test fixture: an input-less `CreateNode` of `n:Article` with no properties.
fn mutating_create_node() -> LogicalOperator {
    let create = CreateNodeOp {
        variable: "n".into(),
        labels: vec!["Article".into()],
        properties: vec![],
        input: None,
    };
    LogicalOperator::CreateNode(create)
}
/// `TextScan` must report that it performs no mutations.
#[test]
fn test_text_scan_is_leaf_no_mutations() {
    let scan = TextScanOp {
        variable: "doc".into(),
        label: "Article".into(),
        property: "body".into(),
        query: LogicalExpression::Literal(Value::String("rust".into())),
        k: Some(10),
        threshold: None,
        score_column: None,
    };
    let op = LogicalOperator::TextScan(scan);
    let mutates = op.has_mutations();
    assert!(!mutates, "TextScan is a leaf and never mutates");
}
/// A `VectorScan` without an input subtree must report no mutations.
#[test]
fn test_vector_scan_no_input_no_mutations() {
    let query = LogicalExpression::Literal(Value::Vector(vec![0.5_f32].into()));
    let scan = VectorScanOp {
        variable: "doc".into(),
        index_name: None,
        property: "embedding".into(),
        label: Some("Article".into()),
        query_vector: query,
        k: Some(10),
        metric: None,
        min_similarity: None,
        max_distance: None,
        input: None,
    };
    let op = LogicalOperator::VectorScan(scan);
    let mutates = op.has_mutations();
    assert!(!mutates, "VectorScan with no input is read-only");
}
/// A `VectorScan` whose input is a `CreateNode` must report mutations.
#[test]
fn test_vector_scan_recurses_into_mutating_input() {
    let query = LogicalExpression::Literal(Value::Vector(vec![0.5_f32].into()));
    let upstream = Box::new(mutating_create_node());
    let scan = VectorScanOp {
        variable: "doc".into(),
        index_name: None,
        property: "embedding".into(),
        label: Some("Article".into()),
        query_vector: query,
        k: Some(10),
        metric: None,
        min_similarity: None,
        max_distance: None,
        input: Some(upstream),
    };
    let op = LogicalOperator::VectorScan(scan);
    let mutates = op.has_mutations();
    assert!(
        mutates,
        "VectorScan must propagate mutations from its input subtree"
    );
}
/// A `VectorScan` whose input is a plain `NodeScan` must report no mutations.
#[test]
fn test_vector_scan_recurses_into_read_only_input() {
    let query = LogicalExpression::Literal(Value::Vector(vec![0.5_f32].into()));
    let upstream = Box::new(read_only_scan());
    let scan = VectorScanOp {
        variable: "doc".into(),
        index_name: None,
        property: "embedding".into(),
        label: Some("Article".into()),
        query_vector: query,
        k: Some(10),
        metric: None,
        min_similarity: None,
        max_distance: None,
        input: Some(upstream),
    };
    let op = LogicalOperator::VectorScan(scan);
    let mutates = op.has_mutations();
    assert!(!mutates, "VectorScan with read-only input is read-only");
}
/// A `VectorJoin` whose input is a `CreateNode` must report mutations.
#[test]
fn test_vector_join_recurses_into_mutating_input() {
    let query = LogicalExpression::Literal(Value::Vector(vec![0.5_f32].into()));
    let join = VectorJoinOp {
        input: Box::new(mutating_create_node()),
        left_vector_variable: None,
        left_property: None,
        query_vector: query,
        right_variable: "m".into(),
        right_property: "embedding".into(),
        right_label: Some("Movie".into()),
        index_name: None,
        k: 10,
        metric: Some(VectorMetric::Cosine),
        min_similarity: None,
        max_distance: None,
        score_variable: None,
    };
    let op = LogicalOperator::VectorJoin(join);
    let mutates = op.has_mutations();
    assert!(
        mutates,
        "VectorJoin must recurse into input, was previously hard-coded false"
    );
}
/// A `VectorJoin` whose input is a plain `NodeScan` must report no mutations.
#[test]
fn test_vector_join_with_read_only_input_is_read_only() {
    let query = LogicalExpression::Literal(Value::Vector(vec![0.5_f32].into()));
    let join = VectorJoinOp {
        input: Box::new(read_only_scan()),
        left_vector_variable: None,
        left_property: None,
        query_vector: query,
        right_variable: "m".into(),
        right_property: "embedding".into(),
        right_label: Some("Movie".into()),
        index_name: None,
        k: 10,
        metric: Some(VectorMetric::Cosine),
        min_similarity: None,
        max_distance: None,
        score_variable: None,
    };
    let op = LogicalOperator::VectorJoin(join);
    assert!(!op.has_mutations());
}
/// Test helper: renders a `TextScan` plan built with the given `k` / `threshold`
/// through `fmt_tree` (starting at indent argument 0) and returns the text.
fn text_scan_with_modes(k: Option<usize>, threshold: Option<f64>) -> String {
    let scan = TextScanOp {
        variable: "doc".into(),
        label: "Article".into(),
        property: "body".into(),
        query: LogicalExpression::Literal(Value::String("rust".into())),
        k,
        threshold,
        score_column: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::TextScan(scan));

    let mut rendered = String::new();
    plan.root.fmt_tree(&mut rendered, 0);
    rendered
}
/// With only `k` set, the rendering mentions `top-10` and not `threshold`.
#[test]
fn test_text_scan_display_top_k_mode() {
    let out = text_scan_with_modes(Some(10), None);
    let mentions_top_k = out.contains("top-10");
    let mentions_threshold = out.contains("threshold");
    assert!(mentions_top_k, "expected top-10 in:\n{out}");
    assert!(
        !mentions_threshold,
        "top-k mode should not say threshold:\n{out}"
    );
}
/// With only `threshold` set, the rendering mentions `threshold>=0.5` and no `top-`.
#[test]
fn test_text_scan_display_threshold_mode() {
    let out = text_scan_with_modes(None, Some(0.5));
    let mentions_threshold = out.contains("threshold>=0.5");
    let mentions_top = out.contains("top-");
    assert!(mentions_threshold, "expected threshold>=0.5 in:\n{out}");
    assert!(
        !mentions_top,
        "threshold mode should not say top-:\n{out}"
    );
}
/// With neither `k` nor `threshold` set, the rendering mentions `default-top-100`.
#[test]
fn test_text_scan_display_default_mode_when_both_none() {
    let out = text_scan_with_modes(None, None);
    let mentions_default = out.contains("default-top-100");
    assert!(
        mentions_default,
        "expected default-top-100 (both k and threshold None) in:\n{out}"
    );
}
/// With both `k` and `threshold` set, only the top-k form is rendered.
#[test]
fn test_text_scan_display_k_takes_precedence_over_threshold() {
    let out = text_scan_with_modes(Some(5), Some(0.3));
    let mentions_top_k = out.contains("top-5");
    let mentions_threshold = out.contains("threshold");
    assert!(mentions_top_k, "expected top-5 in:\n{out}");
    assert!(
        !mentions_threshold,
        "k should take precedence over threshold:\n{out}"
    );
}
/// `explain_tree` on `Project -> Filter -> Expand -> NodeScan` names every
/// operator, nests each level two spaces deeper than its parent, and shows the
/// label, edge type, pushdown hint and projection alias.
#[test]
fn test_explain_tree_basic_operators() {
    let plan = LogicalOperator::Project(ProjectOp {
        projections: vec![Projection {
            expression: LogicalExpression::Property {
                variable: "b".into(),
                property: "name".into(),
            },
            alias: Some("name".into()),
        }],
        input: Box::new(LogicalOperator::Filter(FilterOp {
            predicate: LogicalExpression::Binary {
                left: Box::new(LogicalExpression::Property {
                    variable: "b".into(),
                    property: "age".into(),
                }),
                op: BinaryOp::Gt,
                right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
            },
            input: Box::new(LogicalOperator::Expand(ExpandOp {
                from_variable: "a".into(),
                to_variable: "b".into(),
                edge_variable: None,
                direction: ExpandDirection::Outgoing,
                edge_types: vec!["KNOWS".into()],
                min_hops: 1,
                max_hops: Some(1),
                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "a".into(),
                    label: Some("Person".into()),
                    input: None,
                })),
                path_alias: None,
                path_mode: PathMode::Walk,
            })),
            pushdown_hint: Some(PushdownHint::LabelFirst),
        })),
        pass_through_input: false,
    });
    let tree = plan.explain_tree();

    // Every operator in the chain must be named in the output.
    assert!(tree.contains("Project"), "missing Project in:\n{tree}");
    assert!(tree.contains("Filter"), "missing Filter in:\n{tree}");
    assert!(tree.contains("Expand"), "missing Expand in:\n{tree}");
    assert!(tree.contains("NodeScan"), "missing NodeScan in:\n{tree}");
    assert!(tree.starts_with("Project"));

    // The patterns carry exactly the indentation the messages promise
    // (2 / 4 / 6 spaces). A single-space pattern for all three depths could
    // never match a depth-proportional tree.
    assert!(
        tree.contains("\n  Filter"),
        "Filter should be indented by 2 spaces"
    );
    assert!(
        tree.contains("\n    Expand"),
        "Expand should be indented by 4 spaces"
    );
    assert!(
        tree.contains("\n      NodeScan"),
        "NodeScan should be indented by 6 spaces"
    );

    // Operator details: scan label, edge type, pushdown hint, projection alias.
    assert!(tree.contains("Person"));
    assert!(tree.contains("KNOWS"));
    assert!(tree.contains("[label-first]"));
    assert!(tree.contains("AS name"));
}
/// `has_mutations` looks through `Project -> Filter` down to the leaf operator.
#[test]
fn test_has_mutations_recursive() {
    // Wraps `leaf` in the same Project -> Filter(true) shell for both cases.
    let project_over_filter = |leaf: LogicalOperator| {
        LogicalOperator::Project(ProjectOp {
            projections: vec![],
            input: Box::new(LogicalOperator::Filter(FilterOp {
                predicate: LogicalExpression::Literal(Value::Bool(true)),
                input: Box::new(leaf),
                pushdown_hint: None,
            })),
            pass_through_input: false,
        })
    };

    let with_mutation = project_over_filter(LogicalOperator::CreateNode(CreateNodeOp {
        variable: "n".into(),
        labels: vec!["Person".into()],
        properties: vec![],
        input: None,
    }));
    assert!(with_mutation.has_mutations());

    let read_only = project_over_filter(LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".into(),
        label: None,
        input: None,
    }));
    assert!(!read_only.has_mutations());
}
/// `children()` yields a `Union`'s inputs in order and an `Apply`'s input
/// followed by its subplan.
#[test]
fn test_children_collection_for_union_and_apply() {
    let scan_labelled = |label: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: "n".into(),
            label: Some(label.into()),
            input: None,
        })
    };

    let union = LogicalOperator::Union(UnionOp {
        inputs: vec![
            scan_labelled("Amsterdam"),
            scan_labelled("Berlin"),
            scan_labelled("Prague"),
        ],
    });
    let children = union.children();
    assert_eq!(children.len(), 3);
    // First and last branch pin the ordering.
    if let LogicalOperator::NodeScan(s) = children[0] {
        assert_eq!(s.label.as_deref(), Some("Amsterdam"));
    } else {
        panic!("Expected NodeScan");
    }
    if let LogicalOperator::NodeScan(s) = children[2] {
        assert_eq!(s.label.as_deref(), Some("Prague"));
    } else {
        panic!("Expected NodeScan");
    }

    let apply = LogicalOperator::Apply(ApplyOp {
        input: Box::new(scan_labelled("Person")),
        subplan: Box::new(scan_labelled("Company")),
        shared_variables: vec![],
        optional: false,
    });
    let apply_children = apply.children();
    assert_eq!(apply_children.len(), 2);
    if let LogicalOperator::NodeScan(s) = apply_children[0] {
        assert_eq!(s.label.as_deref(), Some("Person"));
    } else {
        panic!("Expected input NodeScan");
    }
    if let LogicalOperator::NodeScan(s) = apply_children[1] {
        assert_eq!(s.label.as_deref(), Some("Company"));
    } else {
        panic!("Expected subplan NodeScan");
    }
}
/// Covers `CountExpr` estimates, value accessors, `Display` and `PartialEq<usize>`.
#[test]
fn test_count_expr_parameter_default() {
    let param = CountExpr::Parameter("limit".to_string());
    let literal = CountExpr::Literal(42);

    // An unresolved parameter estimates as 10.0; a literal estimates as itself.
    let param_gap = (param.estimate() - 10.0).abs();
    assert!(param_gap < f64::EPSILON);
    let literal_gap = (literal.estimate() - 42.0).abs();
    assert!(literal_gap < f64::EPSILON);

    assert_eq!(literal.value(), 42);
    assert_eq!(literal.try_value(), Ok(42));

    let err = param.try_value().unwrap_err();
    assert!(err.contains("$limit"), "error should mention $limit: {err}");

    assert_eq!(format!("{literal}"), "42");
    assert_eq!(format!("{param}"), "$limit");
    assert_eq!(literal, 42usize);
}
/// A literal count reports its number through every accessor.
#[test]
fn count_expr_literal_value() {
    let literal = CountExpr::Literal(42);
    let estimate_gap = (literal.estimate() - 42.0).abs();

    assert_eq!(literal.value(), 42);
    assert_eq!(literal.try_value(), Ok(42));
    assert!(estimate_gap < f64::EPSILON);
}
/// An unresolved parameter fails `try_value` with a message naming it, and
/// still yields the fixed 10.0 estimate.
#[test]
fn count_expr_parameter_try_value_errors() {
    let unresolved = CountExpr::Parameter("limit".into());

    let message = unresolved.try_value().unwrap_err();
    assert!(message.contains("$limit"));

    let estimate_gap = (unresolved.estimate() - 10.0).abs();
    assert!(estimate_gap < f64::EPSILON);
}
/// `value()` on an unresolved parameter panics with the parameter name.
#[test]
#[should_panic(expected = "Unresolved parameter: $rows")]
fn count_expr_parameter_value_panics() {
    let unresolved = CountExpr::Parameter("rows".into());
    // The call itself is the assertion: it must panic before returning.
    let _ = unresolved.value();
}
/// `Display`, `From<usize>` and `PartialEq<usize>` behave as declared for `CountExpr`.
#[test]
fn count_expr_display_and_conversions() {
    // Display: literals print the number, parameters print `$name`.
    let shown_literal = format!("{}", CountExpr::Literal(7));
    let shown_param = format!("{}", CountExpr::Parameter("n".into()));
    assert_eq!(shown_literal, "7");
    assert_eq!(shown_param, "$n");

    // From<usize> produces a literal.
    let from_usize = CountExpr::from(3usize);
    assert_eq!(from_usize, CountExpr::Literal(3));

    // PartialEq<usize>: only a matching literal compares equal.
    assert_eq!(CountExpr::Literal(5), 5usize);
    assert_ne!(CountExpr::Parameter("x".into()), 5usize);
}
/// Each `LogicalPlan` constructor sets exactly its own flag.
#[test]
fn logical_plan_constructors() {
    let normal = LogicalPlan::new(LogicalOperator::Empty);
    assert!(!normal.explain);
    assert!(!normal.profile);
    assert!(normal.default_params.is_empty());

    let explained = LogicalPlan::explain(LogicalOperator::Empty);
    assert!(explained.explain);
    assert!(!explained.profile);

    let profiled = LogicalPlan::profile(LogicalOperator::Empty);
    assert!(!profiled.explain);
    assert!(profiled.profile);
}
/// Test helper: a `LogicalExpression::Variable` referring to `name`.
fn var(name: &str) -> LogicalExpression {
    let ident = name.into();
    LogicalExpression::Variable(ident)
}
/// Test helper: a boxed `LogicalOperator::Empty`.
fn leaf_empty() -> Box<LogicalOperator> {
    let empty = LogicalOperator::Empty;
    Box::new(empty)
}
/// Test helper: a boxed, unlabelled, input-less `NodeScan` binding `v`.
fn leaf_node_scan(v: &str) -> Box<LogicalOperator> {
    let scan = NodeScanOp {
        variable: v.into(),
        label: None,
        input: None,
    };
    Box::new(LogicalOperator::NodeScan(scan))
}
/// Test helper: a boxed, input-less `CreateNode` binding `v` with label `Person`.
fn leaf_create_node(v: &str) -> Box<LogicalOperator> {
    let create = CreateNodeOp {
        variable: v.into(),
        labels: vec!["Person".into()],
        properties: vec![],
        input: None,
    };
    Box::new(LogicalOperator::CreateNode(create))
}
// Each operator built below is expected to report `has_mutations() == true`
// by itself; wherever an input is supplied it is a read-only `NodeScan`.
#[test]
fn has_mutations_direct_operators_are_mutating() {
// Node creation with no input.
let op = LogicalOperator::CreateNode(CreateNodeOp {
variable: "vincent".into(),
labels: vec!["Person".into()],
properties: vec![],
input: None,
});
assert!(op.has_mutations());
// Node deletion over a read-only scan.
let delete = LogicalOperator::DeleteNode(DeleteNodeOp {
variable: "vincent".into(),
detach: true,
input: leaf_node_scan("vincent"),
});
assert!(delete.has_mutations());
// Property assignment over a read-only scan.
let set_prop = LogicalOperator::SetProperty(SetPropertyOp {
variable: "mia".into(),
properties: vec![("city".into(), LogicalExpression::Literal(Value::Null))],
replace: false,
is_edge: false,
input: leaf_node_scan("mia"),
});
assert!(set_prop.has_mutations());
// Triple insertion with no input.
let insert_triple = LogicalOperator::InsertTriple(InsertTripleOp {
subject: TripleComponent::Iri("s".into()),
predicate: TripleComponent::Iri("p".into()),
object: TripleComponent::Iri("o".into()),
graph: None,
input: None,
});
assert!(insert_triple.has_mutations());
// Graph clearing.
let clear = LogicalOperator::ClearGraph(ClearGraphOp {
graph: None,
silent: false,
});
assert!(clear.has_mutations());
// Property-graph creation with empty table lists.
let ddl = LogicalOperator::CreatePropertyGraph(CreatePropertyGraphOp {
name: "g".into(),
node_tables: vec![],
edge_tables: vec![],
});
assert!(ddl.has_mutations());
}
// Every single-input operator below wraps the same `SetProperty` subtree and
// is expected to report `has_mutations() == true` because of it.
#[test]
fn has_mutations_propagates_through_single_input_operators() {
// Fresh `SetProperty` over a `NodeScan` for each case; a closure is used so
// every wrapper gets its own instance.
let base = || {
LogicalOperator::SetProperty(SetPropertyOp {
variable: "butch".into(),
properties: vec![],
replace: false,
is_edge: false,
input: leaf_node_scan("butch"),
})
};
let filter = LogicalOperator::Filter(FilterOp {
predicate: var("x"),
input: Box::new(base()),
pushdown_hint: None,
});
assert!(filter.has_mutations());
let project = LogicalOperator::Project(ProjectOp {
projections: vec![],
input: Box::new(base()),
pass_through_input: false,
});
assert!(project.has_mutations());
let agg = LogicalOperator::Aggregate(AggregateOp {
group_by: vec![],
aggregates: vec![],
input: Box::new(base()),
having: None,
});
assert!(agg.has_mutations());
let limit = LogicalOperator::Limit(LimitOp {
count: CountExpr::Literal(10),
input: Box::new(base()),
});
assert!(limit.has_mutations());
let skip = LogicalOperator::Skip(SkipOp {
count: CountExpr::Literal(5),
input: Box::new(base()),
});
assert!(skip.has_mutations());
let sort = LogicalOperator::Sort(SortOp {
keys: vec![],
input: Box::new(base()),
});
assert!(sort.has_mutations());
let distinct = LogicalOperator::Distinct(DistinctOp {
input: Box::new(base()),
columns: None,
});
assert!(distinct.has_mutations());
let unwind = LogicalOperator::Unwind(UnwindOp {
expression: var("xs"),
variable: "x".into(),
ordinality_var: None,
offset_var: None,
input: Box::new(base()),
});
assert!(unwind.has_mutations());
let bind = LogicalOperator::Bind(BindOp {
expression: var("x"),
variable: "y".into(),
input: Box::new(base()),
});
assert!(bind.has_mutations());
let map_collect = LogicalOperator::MapCollect(MapCollectOp {
key_var: "k".into(),
value_var: "v".into(),
alias: "m".into(),
input: Box::new(base()),
});
assert!(map_collect.has_mutations());
let ret = LogicalOperator::Return(ReturnOp {
items: vec![],
distinct: false,
input: Box::new(base()),
});
assert!(ret.has_mutations());
let hagg = LogicalOperator::HorizontalAggregate(HorizontalAggregateOp {
list_column: "_path".into(),
entity_kind: EntityKind::Edge,
function: AggregateFunction::Sum,
property: "weight".into(),
alias: "total".into(),
input: Box::new(base()),
});
assert!(hagg.has_mutations());
let construct = LogicalOperator::Construct(ConstructOp {
templates: vec![],
input: Box::new(base()),
});
assert!(construct.has_mutations());
}
/// `VectorScan` without input and `VectorJoin` over a plain scan report no mutations.
#[test]
fn has_mutations_vector_operators_are_readonly() {
    let scan = VectorScanOp {
        variable: "m".into(),
        index_name: None,
        property: "embedding".into(),
        label: None,
        query_vector: LogicalExpression::Literal(Value::Null),
        k: Some(5),
        metric: Some(VectorMetric::Cosine),
        min_similarity: None,
        max_distance: None,
        input: None,
    };
    let vscan = LogicalOperator::VectorScan(scan);
    assert!(!vscan.has_mutations());

    let join = VectorJoinOp {
        input: leaf_node_scan("m"),
        left_vector_variable: None,
        left_property: None,
        query_vector: LogicalExpression::Literal(Value::Null),
        right_variable: "n".into(),
        right_property: "embedding".into(),
        right_label: None,
        index_name: None,
        k: 3,
        metric: None,
        min_similarity: None,
        max_distance: None,
        score_variable: None,
    };
    let vjoin = LogicalOperator::VectorJoin(join);
    assert!(!vjoin.has_mutations());
}
// Multi-input operators must report mutations when ANY child mutates, and
// none when every child is a read-only scan.
#[test]
fn has_mutations_two_children_and_union_apply() {
// Unboxed fixtures: a mutating `CreateNode` leaf and a read-only `NodeScan` leaf.
let mutating = || *leaf_create_node("jules");
let read = || *leaf_node_scan("jules");
// Both sides read-only.
let join_readonly = LogicalOperator::Join(JoinOp {
left: Box::new(read()),
right: Box::new(read()),
join_type: JoinType::Inner,
conditions: vec![],
});
assert!(!join_readonly.has_mutations());
// Only the right side mutates.
let join_right_mutates = LogicalOperator::Join(JoinOp {
left: Box::new(read()),
right: Box::new(mutating()),
join_type: JoinType::Left,
conditions: vec![],
});
assert!(join_right_mutates.has_mutations());
// Only the left side mutates.
let left_join = LogicalOperator::LeftJoin(LeftJoinOp {
left: Box::new(mutating()),
right: Box::new(read()),
condition: None,
});
assert!(left_join.has_mutations());
let anti_join = LogicalOperator::AntiJoin(AntiJoinOp {
left: Box::new(read()),
right: Box::new(mutating()),
});
assert!(anti_join.has_mutations());
let except = LogicalOperator::Except(ExceptOp {
left: Box::new(read()),
right: Box::new(read()),
all: true,
});
assert!(!except.has_mutations());
let intersect = LogicalOperator::Intersect(IntersectOp {
left: Box::new(mutating()),
right: Box::new(read()),
all: false,
});
assert!(intersect.has_mutations());
let otherwise = LogicalOperator::Otherwise(OtherwiseOp {
left: Box::new(read()),
right: Box::new(mutating()),
});
assert!(otherwise.has_mutations());
// The mutating branch sits in the middle of the input list.
let union = LogicalOperator::Union(UnionOp {
inputs: vec![read(), mutating(), read()],
});
assert!(union.has_mutations());
let mwj = LogicalOperator::MultiWayJoin(MultiWayJoinOp {
inputs: vec![read(), read()],
conditions: vec![],
shared_variables: vec!["a".into()],
});
assert!(!mwj.has_mutations());
let apply_readonly = LogicalOperator::Apply(ApplyOp {
input: Box::new(read()),
subplan: Box::new(read()),
shared_variables: vec![],
optional: false,
});
assert!(!apply_readonly.has_mutations());
// A mutating subplan counts even though the outer input is read-only.
let apply_inner_mutates = LogicalOperator::Apply(ApplyOp {
input: Box::new(read()),
subplan: Box::new(mutating()),
shared_variables: vec![],
optional: true,
});
assert!(apply_inner_mutates.has_mutations());
}
/// Input-less `Empty`, `ParameterScan`, `CallProcedure`, `LoadData` and
/// `TripleScan` operators all report no mutations.
#[test]
fn has_mutations_leaf_operators_are_readonly() {
    let empty = LogicalOperator::Empty;
    assert!(!empty.has_mutations());

    let parameter_scan = LogicalOperator::ParameterScan(ParameterScanOp {
        columns: vec!["a".into()],
    });
    assert!(!parameter_scan.has_mutations());

    let call = LogicalOperator::CallProcedure(CallProcedureOp {
        name: vec!["grafeo".into(), "pagerank".into()],
        arguments: vec![],
        yield_items: None,
    });
    assert!(!call.has_mutations());

    let load = LogicalOperator::LoadData(LoadDataOp {
        format: LoadDataFormat::Csv,
        with_headers: true,
        path: "/tmp/x.csv".into(),
        variable: "row".into(),
        field_terminator: None,
    });
    assert!(!load.has_mutations());

    let triple_scan = LogicalOperator::TripleScan(TripleScanOp {
        subject: TripleComponent::Variable("s".into()),
        predicate: TripleComponent::Variable("p".into()),
        object: TripleComponent::Variable("o".into()),
        graph: None,
        input: None,
        dataset: None,
    });
    assert!(!triple_scan.has_mutations());
}
/// `Empty`, `CallProcedure`, `CreateGraph` and `LoadData` have no children.
#[test]
fn children_of_leaf_operators() {
    let empty = LogicalOperator::Empty;
    assert!(empty.children().is_empty());

    let call = LogicalOperator::CallProcedure(CallProcedureOp {
        name: vec!["p".into()],
        arguments: vec![],
        yield_items: None,
    });
    assert!(call.children().is_empty());

    let create_graph = LogicalOperator::CreateGraph(CreateGraphOp {
        graph: "g".into(),
        silent: false,
    });
    assert!(create_graph.children().is_empty());

    let load = LogicalOperator::LoadData(LoadDataOp {
        format: LoadDataFormat::Jsonl,
        with_headers: false,
        path: "x.jsonl".into(),
        variable: "r".into(),
        field_terminator: None,
    });
    assert!(load.children().is_empty());
}
/// Scans with an optional input report zero children without it and one with it.
#[test]
fn children_of_optional_input_operators() {
    let bare_scan = NodeScanOp {
        variable: "n".into(),
        label: None,
        input: None,
    };
    let ns_no_input = LogicalOperator::NodeScan(bare_scan);
    assert_eq!(ns_no_input.children().len(), 0);

    let fed_scan = NodeScanOp {
        variable: "n".into(),
        label: None,
        input: Some(leaf_empty()),
    };
    let ns_with_input = LogicalOperator::NodeScan(fed_scan);
    assert_eq!(ns_with_input.children().len(), 1);

    let fed_edge_scan = EdgeScanOp {
        variable: "e".into(),
        edge_types: vec![],
        input: Some(leaf_empty()),
    };
    let edge_scan_in = LogicalOperator::EdgeScan(fed_edge_scan);
    assert_eq!(edge_scan_in.children().len(), 1);
}
/// `Join` and `Apply` expose two children; a three-branch `Union` exposes three.
#[test]
fn children_of_two_child_operators() {
    let join = LogicalOperator::Join(JoinOp {
        left: Box::new(LogicalOperator::Empty),
        right: Box::new(LogicalOperator::Empty),
        join_type: JoinType::Cross,
        conditions: vec![],
    });
    let join_child_count = join.children().len();
    assert_eq!(join_child_count, 2);

    let apply = LogicalOperator::Apply(ApplyOp {
        input: Box::new(LogicalOperator::Empty),
        subplan: Box::new(LogicalOperator::Empty),
        shared_variables: vec![],
        optional: false,
    });
    let apply_child_count = apply.children().len();
    assert_eq!(apply_child_count, 2);

    let union = LogicalOperator::Union(UnionOp {
        inputs: vec![
            LogicalOperator::Empty,
            LogicalOperator::Empty,
            LogicalOperator::Empty,
        ],
    });
    let union_child_count = union.children().len();
    assert_eq!(union_child_count, 3);
}
/// A `Modify` with empty templates exposes exactly one child.
#[test]
fn children_of_modify_returns_where_clause() {
    let op = ModifyOp {
        delete_templates: vec![],
        insert_templates: vec![],
        where_clause: Box::new(LogicalOperator::Empty),
        graph: None,
    };
    let modify = LogicalOperator::Modify(op);
    let child_count = modify.children().len();
    assert_eq!(child_count, 1);
}
// Pins the exact `display_label()` text for node scans, edge scans and the
// three expand directions.
#[test]
fn display_label_spot_checks() {
// Node scans: `variable:Label`, with `*` standing in for a missing label.
let ns = LogicalOperator::NodeScan(NodeScanOp {
variable: "vincent".into(),
label: Some("Person".into()),
input: None,
});
assert_eq!(ns.display_label(), "vincent:Person");
let ns_no_label = LogicalOperator::NodeScan(NodeScanOp {
variable: "mia".into(),
label: None,
input: None,
});
assert_eq!(ns_no_label.display_label(), "mia:*");
// Edge scans: types joined with `|`, `*` when the type list is empty.
let edge_scan = LogicalOperator::EdgeScan(EdgeScanOp {
variable: "e".into(),
edge_types: vec!["KNOWS".into(), "LIKES".into()],
input: None,
});
assert_eq!(edge_scan.display_label(), "e:KNOWS|LIKES");
let edge_scan_any = LogicalOperator::EdgeScan(EdgeScanOp {
variable: "e".into(),
edge_types: vec![],
input: None,
});
assert_eq!(edge_scan_any.display_label(), "e:*");
// Expands: the arrow style follows the direction; empty type list renders as `*`.
let expand = LogicalOperator::Expand(ExpandOp {
from_variable: "a".into(),
to_variable: "b".into(),
edge_variable: None,
direction: ExpandDirection::Outgoing,
edge_types: vec!["KNOWS".into()],
min_hops: 1,
max_hops: Some(1),
input: leaf_node_scan("a"),
path_alias: None,
path_mode: PathMode::Walk,
});
assert_eq!(expand.display_label(), "(a)->[:KNOWS]->(b)");
let expand_in = LogicalOperator::Expand(ExpandOp {
from_variable: "a".into(),
to_variable: "b".into(),
edge_variable: None,
direction: ExpandDirection::Incoming,
edge_types: vec![],
min_hops: 1,
max_hops: Some(1),
input: leaf_node_scan("a"),
path_alias: None,
path_mode: PathMode::Walk,
});
assert_eq!(expand_in.display_label(), "(a)<-[:*]<-(b)");
let expand_both = LogicalOperator::Expand(ExpandOp {
from_variable: "a".into(),
to_variable: "b".into(),
edge_variable: None,
direction: ExpandDirection::Both,
edge_types: vec![],
min_hops: 1,
max_hops: Some(1),
input: leaf_node_scan("a"),
path_alias: None,
path_mode: PathMode::Walk,
});
assert_eq!(expand_both.display_label(), "(a)--[:*]--(b)");
}
/// A filter's `display_label` shows a bracketed tag per pushdown hint, and no
/// bracket at all when there is no hint.
#[test]
fn display_label_filter_pushdown_hints() {
    let filter_with = |pushdown_hint: Option<PushdownHint>| {
        LogicalOperator::Filter(FilterOp {
            predicate: var("x"),
            input: leaf_empty(),
            pushdown_hint,
        })
    };

    let unhinted = filter_with(None);
    let unhinted_label = unhinted.display_label();
    assert!(!unhinted_label.contains('['));

    let by_index = filter_with(Some(PushdownHint::IndexLookup {
        property: "name".into(),
    }));
    let index_label = by_index.display_label();
    assert!(index_label.contains("[index: name]"));

    let by_range = filter_with(Some(PushdownHint::RangeScan {
        property: "age".into(),
    }));
    let range_label = by_range.display_label();
    assert!(range_label.contains("[range: age]"));

    let by_label = filter_with(Some(PushdownHint::LabelFirst));
    let label_first_label = by_label.display_label();
    assert!(label_first_label.contains("[label-first]"));
}
// Pins `display_label()` output for Project, Join, Aggregate, Limit, Skip,
// Sort, Distinct and Return, all built over an `Empty` input.
#[test]
fn display_label_projection_join_sort_return() {
// Projection: one aliased item and one un-aliased property access.
let proj = LogicalOperator::Project(ProjectOp {
projections: vec![
Projection {
expression: var("n"),
alias: Some("person".into()),
},
Projection {
expression: LogicalExpression::Property {
variable: "n".into(),
property: "city".into(),
},
alias: None,
},
],
input: leaf_empty(),
pass_through_input: false,
});
let s = proj.display_label();
assert!(s.contains("person"));
assert!(s.contains("n.city"));
let join = LogicalOperator::Join(JoinOp {
left: leaf_empty(),
right: leaf_empty(),
join_type: JoinType::Cross,
conditions: vec![],
});
assert_eq!(join.display_label(), "Cross");
let agg = LogicalOperator::Aggregate(AggregateOp {
group_by: vec![var("city")],
aggregates: vec![],
input: leaf_empty(),
having: None,
});
assert_eq!(agg.display_label(), "group: [city]");
// Limit / Skip are expected to show the `CountExpr` text: a number, or `$name`.
let limit = LogicalOperator::Limit(LimitOp {
count: CountExpr::Literal(10),
input: leaf_empty(),
});
assert_eq!(limit.display_label(), "10");
let skip = LogicalOperator::Skip(SkipOp {
count: CountExpr::Parameter("off".into()),
input: leaf_empty(),
});
assert_eq!(skip.display_label(), "$off");
let sort = LogicalOperator::Sort(SortOp {
keys: vec![
SortKey {
expression: var("a"),
order: SortOrder::Ascending,
nulls: None,
},
SortKey {
expression: var("b"),
order: SortOrder::Descending,
nulls: None,
},
],
input: leaf_empty(),
});
let s = sort.display_label();
assert!(s.contains("a ASC"));
assert!(s.contains("b DESC"));
// Distinct without explicit columns has an empty label.
let distinct = LogicalOperator::Distinct(DistinctOp {
input: leaf_empty(),
columns: None,
});
assert_eq!(distinct.display_label(), "");
let ret = LogicalOperator::Return(ReturnOp {
items: vec![
ReturnItem {
expression: var("n"),
alias: Some("node".into()),
},
ReturnItem {
expression: var("m"),
alias: None,
},
],
distinct: true,
input: leaf_empty(),
});
let s = ret.display_label();
assert!(s.contains("node"));
assert!(s.contains('m'));
}
// Pins `display_label()` output for the remaining operators. Every operator
// that takes an input is built over `Empty`, so only the operator's own
// fields can contribute to the label.
#[test]
fn display_label_remaining_operators() {
// Multi-input operators report their input count.
let union = LogicalOperator::Union(UnionOp {
inputs: vec![*leaf_empty(), *leaf_empty()],
});
assert_eq!(union.display_label(), "2 branches");
let mwj = LogicalOperator::MultiWayJoin(MultiWayJoinOp {
inputs: vec![*leaf_empty(), *leaf_empty(), *leaf_empty()],
conditions: vec![],
shared_variables: vec![],
});
assert_eq!(mwj.display_label(), "3 inputs");
// LeftJoin / AntiJoin have an empty label.
let lj = LogicalOperator::LeftJoin(LeftJoinOp {
left: leaf_empty(),
right: leaf_empty(),
condition: None,
});
assert_eq!(lj.display_label(), "");
let aj = LogicalOperator::AntiJoin(AntiJoinOp {
left: leaf_empty(),
right: leaf_empty(),
});
assert_eq!(aj.display_label(), "");
// Unwind / Bind / MapCollect are labelled with the name they introduce.
let unwind = LogicalOperator::Unwind(UnwindOp {
expression: var("xs"),
variable: "item".into(),
ordinality_var: None,
offset_var: None,
input: leaf_empty(),
});
assert_eq!(unwind.display_label(), "item");
let bind = LogicalOperator::Bind(BindOp {
expression: var("x"),
variable: "y".into(),
input: leaf_empty(),
});
assert_eq!(bind.display_label(), "y");
let mapc = LogicalOperator::MapCollect(MapCollectOp {
key_var: "k".into(),
value_var: "v".into(),
alias: "counts".into(),
input: leaf_empty(),
});
assert_eq!(mapc.display_label(), "counts");
let sp = LogicalOperator::ShortestPath(ShortestPathOp {
input: leaf_empty(),
source_var: "a".into(),
target_var: "b".into(),
edge_types: vec![],
direction: ExpandDirection::Outgoing,
path_alias: "p".into(),
all_paths: false,
});
assert_eq!(sp.display_label(), "a -> b");
// Merge operators are labelled with their variable only.
let merge = LogicalOperator::Merge(MergeOp {
variable: "django".into(),
labels: vec![],
match_properties: vec![],
on_create: vec![],
on_match: vec![],
input: leaf_empty(),
});
assert_eq!(merge.display_label(), "django");
let merge_rel = LogicalOperator::MergeRelationship(MergeRelationshipOp {
variable: "r".into(),
source_variable: "a".into(),
target_variable: "b".into(),
edge_type: "KNOWS".into(),
match_properties: vec![],
on_create: vec![],
on_match: vec![],
input: leaf_empty(),
});
assert_eq!(merge_rel.display_label(), "r");
// CreateNode joins the variable and all labels with `:`.
let cnode = LogicalOperator::CreateNode(CreateNodeOp {
variable: "shosanna".into(),
labels: vec!["Person".into(), "Hero".into()],
properties: vec![],
input: None,
});
assert_eq!(cnode.display_label(), "shosanna:Person:Hero");
// CreateEdge shows `?` when the edge has no variable.
let cedge_with = LogicalOperator::CreateEdge(CreateEdgeOp {
variable: Some("r".into()),
from_variable: "a".into(),
to_variable: "b".into(),
edge_type: "KNOWS".into(),
properties: vec![],
input: leaf_empty(),
});
assert_eq!(cedge_with.display_label(), "[r:KNOWS]");
let cedge_without = LogicalOperator::CreateEdge(CreateEdgeOp {
variable: None,
from_variable: "a".into(),
to_variable: "b".into(),
edge_type: "KNOWS".into(),
properties: vec![],
input: leaf_empty(),
});
assert_eq!(cedge_without.display_label(), "[?:KNOWS]");
// Delete / SetProperty are labelled with the target variable.
let dnode = LogicalOperator::DeleteNode(DeleteNodeOp {
variable: "hans".into(),
detach: false,
input: leaf_empty(),
});
assert_eq!(dnode.display_label(), "hans");
let dedge = LogicalOperator::DeleteEdge(DeleteEdgeOp {
variable: "r".into(),
input: leaf_empty(),
});
assert_eq!(dedge.display_label(), "r");
let set_prop = LogicalOperator::SetProperty(SetPropertyOp {
variable: "beatrix".into(),
properties: vec![],
replace: false,
is_edge: false,
input: leaf_empty(),
});
assert_eq!(set_prop.display_label(), "beatrix");
// Label add/remove use the same `variable:Label[:Label]` form.
let add_lbl = LogicalOperator::AddLabel(AddLabelOp {
variable: "n".into(),
labels: vec!["A".into(), "B".into()],
input: leaf_empty(),
});
assert_eq!(add_lbl.display_label(), "n:A:B");
let rm_lbl = LogicalOperator::RemoveLabel(RemoveLabelOp {
variable: "n".into(),
labels: vec!["A".into()],
input: leaf_empty(),
});
assert_eq!(rm_lbl.display_label(), "n:A");
// Procedure names are joined with `.`.
let call = LogicalOperator::CallProcedure(CallProcedureOp {
name: vec!["grafeo".into(), "pagerank".into()],
arguments: vec![],
yield_items: None,
});
assert_eq!(call.display_label(), "grafeo.pagerank");
let load = LogicalOperator::LoadData(LoadDataOp {
format: LoadDataFormat::Csv,
with_headers: true,
path: "data.csv".into(),
variable: "r".into(),
field_terminator: None,
});
assert_eq!(load.display_label(), "data.csv AS r");
let apply = LogicalOperator::Apply(ApplyOp {
input: leaf_empty(),
subplan: leaf_empty(),
shared_variables: vec![],
optional: false,
});
assert_eq!(apply.display_label(), "");
// Vector operators are labelled with the variable they bind.
let vscan = LogicalOperator::VectorScan(VectorScanOp {
variable: "m".into(),
index_name: None,
property: "embedding".into(),
label: None,
query_vector: LogicalExpression::Literal(Value::Null),
k: Some(5),
metric: None,
min_similarity: None,
max_distance: None,
input: None,
});
assert_eq!(vscan.display_label(), "m");
let vjoin = LogicalOperator::VectorJoin(VectorJoinOp {
input: leaf_empty(),
left_vector_variable: None,
left_property: None,
query_vector: LogicalExpression::Literal(Value::Null),
right_variable: "t".into(),
right_property: "emb".into(),
right_label: None,
index_name: None,
k: 3,
metric: None,
min_similarity: None,
max_distance: None,
score_variable: None,
});
assert_eq!(vjoin.display_label(), "t");
assert_eq!(LogicalOperator::Empty.display_label(), "");
}
/// `explain_tree` renders node and edge scans as `Name (variable:label)`, with
/// `*` for a missing label or type, and includes a scan's input.
#[test]
fn explain_tree_covers_all_common_arms() {
    // Labelled scan fed by an `Empty` input: both must appear in the output.
    let labelled_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".into(),
        label: Some("Person".into()),
        input: Some(Box::new(LogicalOperator::Empty)),
    });
    let labelled_tree = labelled_scan.explain_tree();
    assert!(labelled_tree.contains("NodeScan (n:Person)"));
    assert!(labelled_tree.contains("Empty"));

    let unlabelled_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".into(),
        label: None,
        input: None,
    });
    let unlabelled_tree = unlabelled_scan.explain_tree();
    assert!(unlabelled_tree.contains("NodeScan (n:*)"));

    let untyped_edge_scan = LogicalOperator::EdgeScan(EdgeScanOp {
        variable: "e".into(),
        edge_types: vec![],
        input: None,
    });
    let edge_tree = untyped_edge_scan.explain_tree();
    assert!(edge_tree.contains("EdgeScan (e:*)"));
}
/// `explain_tree` on `Expand` reflects the hop range and the direction arrows.
#[test]
fn explain_tree_expand_variants() {
    // Renders an `a -[:KNOWS]- b` expand with the given hop bounds and direction.
    let render = |min_hops, max_hops, direction| {
        let expand = LogicalOperator::Expand(ExpandOp {
            from_variable: "a".into(),
            to_variable: "b".into(),
            edge_variable: None,
            direction,
            edge_types: vec!["KNOWS".into()],
            min_hops,
            max_hops,
            input: leaf_node_scan("a"),
            path_alias: None,
            path_mode: PathMode::Walk,
        });
        expand.explain_tree()
    };

    // Single hop, outgoing.
    let single_hop = render(1, Some(1), ExpandDirection::Outgoing);
    assert!(single_hop.contains("(a)->[:KNOWS]->(b)"));

    // Fixed hop count, incoming.
    let fixed_hops = render(2, Some(2), ExpandDirection::Incoming);
    assert!(fixed_hops.contains("*2"));
    assert!(fixed_hops.contains("<-"));

    // Bounded range, undirected.
    let bounded_range = render(1, Some(3), ExpandDirection::Both);
    assert!(bounded_range.contains("*1..3"));
    assert!(bounded_range.contains("--"));

    // Open-ended range.
    let open_range = render(2, None, ExpandDirection::Outgoing);
    assert!(open_range.contains("*2.."));
}
/// Checks that each pushdown hint set on a `Filter` shows up as the expected
/// bracketed annotation in the `explain_tree` output.
#[test]
fn explain_tree_filter_with_all_hints() {
    // Predicate `n.age = 30` over a node scan of `n`; the hint starts unset and
    // is swapped in place for every case below.
    let age_is_30 = LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: "n".into(),
            property: "age".into(),
        }),
        op: BinaryOp::Eq,
        right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
    };
    let mut filter = LogicalOperator::Filter(FilterOp {
        predicate: age_is_30,
        input: leaf_node_scan("n"),
        pushdown_hint: None,
    });

    // (hint to install, annotation expected in the rendered tree)
    let cases = [
        (
            PushdownHint::IndexLookup {
                property: "age".into(),
            },
            "[index: age]",
        ),
        (
            PushdownHint::RangeScan {
                property: "age".into(),
            },
            "[range: age]",
        ),
        (PushdownHint::LabelFirst, "[label-first]"),
    ];

    for (hint, expected_annotation) in cases {
        if let LogicalOperator::Filter(op) = &mut filter {
            op.pushdown_hint = Some(hint);
        }
        assert!(filter.explain_tree().contains(expected_annotation));
    }
}
/// Checks the `explain_tree` rendering of the row-shaping operators:
/// `Project`, `Aggregate`, `Sort`, `Return`, `Limit`, `Skip` and `Distinct`.
#[test]
fn explain_tree_projection_aggregate_sort_return() {
    // Project: one aliased and one unaliased column.
    let aliased_column = Projection {
        expression: var("n"),
        alias: Some("who".into()),
    };
    let plain_column = Projection {
        expression: var("m"),
        alias: None,
    };
    let project_tree = LogicalOperator::Project(ProjectOp {
        projections: vec![aliased_column, plain_column],
        input: leaf_empty(),
        pass_through_input: true,
    })
    .explain_tree();
    assert!(project_tree.contains("Project"));
    assert!(project_tree.contains("n AS who"));

    // Aggregate: an aliased argument-less count and an unaliased sum over `x`.
    let count_all = AggregateExpr {
        function: AggregateFunction::Count,
        expression: None,
        expression2: None,
        distinct: false,
        alias: Some("c".into()),
        percentile: None,
        separator: None,
    };
    let sum_of_x = AggregateExpr {
        function: AggregateFunction::Sum,
        expression: Some(var("x")),
        expression2: None,
        distinct: false,
        alias: None,
        percentile: None,
        separator: None,
    };
    let aggregate_tree = LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![var("city")],
        aggregates: vec![count_all, sum_of_x],
        input: leaf_empty(),
        having: None,
    })
    .explain_tree();
    assert!(aggregate_tree.contains("Aggregate"));
    assert!(aggregate_tree.contains("count(...) AS c"));
    assert!(aggregate_tree.contains("sum(...)"));

    // Sort: a single descending key.
    let by_age_desc = SortKey {
        expression: var("age"),
        order: SortOrder::Descending,
        nulls: None,
    };
    let sort_tree = LogicalOperator::Sort(SortOp {
        keys: vec![by_age_desc],
        input: leaf_empty(),
    })
    .explain_tree();
    assert!(sort_tree.contains("age DESC"));

    // Return with the DISTINCT flag and an aliased item.
    let return_tree = LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: var("n"),
            alias: Some("who".into()),
        }],
        distinct: true,
        input: leaf_empty(),
    })
    .explain_tree();
    assert!(return_tree.contains("Return DISTINCT"));
    assert!(return_tree.contains("n AS who"));

    // Limit and Skip show their literal counts.
    let limit_tree = LogicalOperator::Limit(LimitOp {
        count: CountExpr::Literal(5),
        input: leaf_empty(),
    })
    .explain_tree();
    assert!(limit_tree.contains("Limit (5)"));

    let skip_tree = LogicalOperator::Skip(SkipOp {
        count: CountExpr::Literal(2),
        input: leaf_empty(),
    })
    .explain_tree();
    assert!(skip_tree.contains("Skip (2)"));

    // Distinct over all columns.
    let distinct_tree = LogicalOperator::Distinct(DistinctOp {
        input: leaf_empty(),
        columns: None,
    })
    .explain_tree();
    assert!(distinct_tree.contains("Distinct"));
}
/// Checks the `explain_tree` rendering of the binary and n-ary combinators:
/// joins, `Union`, `MultiWayJoin`, `Except`, `Intersect` and `Otherwise`.
#[test]
fn explain_tree_joins_and_set_ops() {
    // Plain inner join without conditions.
    let inner_join_tree = LogicalOperator::Join(JoinOp {
        left: leaf_empty(),
        right: leaf_empty(),
        join_type: JoinType::Inner,
        conditions: vec![],
    })
    .explain_tree();
    assert!(inner_join_tree.contains("Join (Inner)"));

    // Left join: the condition text is expected only when a condition is set.
    let left_join_tree = |condition| {
        LogicalOperator::LeftJoin(LeftJoinOp {
            left: leaf_empty(),
            right: leaf_empty(),
            condition,
        })
        .explain_tree()
    };
    let with_condition = left_join_tree(Some(var("x")));
    assert!(with_condition.contains("LeftJoin (condition:"));
    let without_condition = left_join_tree(None);
    assert!(without_condition.contains("LeftJoin"));
    assert!(!without_condition.contains("condition:"));

    let anti_join_tree = LogicalOperator::AntiJoin(AntiJoinOp {
        left: leaf_empty(),
        right: leaf_empty(),
    })
    .explain_tree();
    assert!(anti_join_tree.contains("AntiJoin"));

    // Union reports how many branches it has.
    let union_tree = LogicalOperator::Union(UnionOp {
        inputs: vec![*leaf_empty(), *leaf_empty()],
    })
    .explain_tree();
    assert!(union_tree.contains("Union (2 branches)"));

    // Multi-way join lists its shared variables.
    let multi_way_tree = LogicalOperator::MultiWayJoin(MultiWayJoinOp {
        inputs: vec![*leaf_empty(), *leaf_empty()],
        conditions: vec![],
        shared_variables: vec!["a".into(), "b".into()],
    })
    .explain_tree();
    assert!(multi_way_tree.contains("MultiWayJoin"));
    assert!(multi_way_tree.contains("shared: [a, b]"));

    // Except: the ALL suffix is expected only when `all` is set; otherwise the
    // operator name is directly followed by a newline.
    let except_tree = |all: bool| {
        LogicalOperator::Except(ExceptOp {
            left: leaf_empty(),
            right: leaf_empty(),
            all,
        })
        .explain_tree()
    };
    assert!(except_tree(true).contains("Except ALL"));
    assert!(except_tree(false).contains("Except\n"));

    // Intersect follows the same ALL / bare-name convention.
    let intersect_tree = |all: bool| {
        LogicalOperator::Intersect(IntersectOp {
            left: leaf_empty(),
            right: leaf_empty(),
            all,
        })
        .explain_tree()
    };
    assert!(intersect_tree(true).contains("Intersect ALL"));
    assert!(intersect_tree(false).contains("Intersect\n"));

    let otherwise_tree = LogicalOperator::Otherwise(OtherwiseOp {
        left: leaf_empty(),
        right: leaf_empty(),
    })
    .explain_tree();
    assert!(otherwise_tree.contains("Otherwise"));
}
/// Checks the `explain_tree` rendering of `Unwind`, `Bind`, `MapCollect`,
/// `Apply` and `ShortestPath`.
#[test]
fn explain_tree_unwind_bind_mapcollect_apply_sp() {
    // Unwind shows the variable each element is bound to.
    let unwind_tree = LogicalOperator::Unwind(UnwindOp {
        expression: var("xs"),
        variable: "item".into(),
        ordinality_var: None,
        offset_var: None,
        input: leaf_empty(),
    })
    .explain_tree();
    assert!(unwind_tree.contains("Unwind (item)"));

    // Bind shows the target variable.
    let bind_tree = LogicalOperator::Bind(BindOp {
        expression: var("x"),
        variable: "y".into(),
        input: leaf_empty(),
    })
    .explain_tree();
    assert!(bind_tree.contains("Bind (y)"));

    // MapCollect shows `key -> value AS alias`.
    let map_collect_tree = LogicalOperator::MapCollect(MapCollectOp {
        key_var: "k".into(),
        value_var: "v".into(),
        alias: "m".into(),
        input: leaf_empty(),
    })
    .explain_tree();
    assert!(map_collect_tree.contains("MapCollect"));
    assert!(map_collect_tree.contains("k -> v AS m"));

    // Optional apply with one shared variable; only the operator name is checked.
    let apply_tree = LogicalOperator::Apply(ApplyOp {
        input: leaf_empty(),
        subplan: leaf_empty(),
        shared_variables: vec!["a".into()],
        optional: true,
    })
    .explain_tree();
    assert!(apply_tree.contains("Apply"));

    // Shortest path shows `source -> target`.
    let shortest_path_tree = LogicalOperator::ShortestPath(ShortestPathOp {
        input: leaf_empty(),
        source_var: "a".into(),
        target_var: "b".into(),
        edge_types: vec![],
        direction: ExpandDirection::Outgoing,
        path_alias: "p".into(),
        all_paths: false,
    })
    .explain_tree();
    assert!(shortest_path_tree.contains("ShortestPath (a -> b)"));
}
/// Checks the `explain_tree` rendering of the mutating operators: merges,
/// node/edge creation and deletion, property updates and label changes.
#[test]
fn explain_tree_mutations() {
    // Merge operators show the merged variable.
    let merge_tree = LogicalOperator::Merge(MergeOp {
        variable: "vincent".into(),
        labels: vec!["Person".into()],
        match_properties: vec![],
        on_create: vec![],
        on_match: vec![],
        input: leaf_empty(),
    })
    .explain_tree();
    assert!(merge_tree.contains("Merge (vincent)"));

    let merge_rel_tree = LogicalOperator::MergeRelationship(MergeRelationshipOp {
        variable: "r".into(),
        source_variable: "a".into(),
        target_variable: "b".into(),
        edge_type: "KNOWS".into(),
        match_properties: vec![],
        on_create: vec![],
        on_match: vec![],
        input: leaf_empty(),
    })
    .explain_tree();
    assert!(merge_rel_tree.contains("MergeRelationship (r)"));

    // CreateNode with a label and an input: both the node and the child appear.
    let create_node_tree = LogicalOperator::CreateNode(CreateNodeOp {
        variable: "mia".into(),
        labels: vec!["Person".into()],
        properties: vec![],
        input: Some(leaf_empty()),
    })
    .explain_tree();
    assert!(create_node_tree.contains("CreateNode (mia:Person)"));
    assert!(create_node_tree.contains("Empty"));

    // CreateNode without labels or input: the label part is expected to be empty.
    let bare_create_node_tree = LogicalOperator::CreateNode(CreateNodeOp {
        variable: "mia".into(),
        labels: vec![],
        properties: vec![],
        input: None,
    })
    .explain_tree();
    assert!(bare_create_node_tree.contains("CreateNode (mia:)"));

    // CreateEdge: a named edge shows its variable, an anonymous one shows `?`.
    let create_edge_tree = |edge_variable| {
        LogicalOperator::CreateEdge(CreateEdgeOp {
            variable: edge_variable,
            from_variable: "a".into(),
            to_variable: "b".into(),
            edge_type: "KNOWS".into(),
            properties: vec![],
            input: leaf_empty(),
        })
        .explain_tree()
    };
    let named_edge_tree = create_edge_tree(Some("r".into()));
    assert!(named_edge_tree.contains("CreateEdge (a)-[r:KNOWS]->(b)"));
    let anonymous_edge_tree = create_edge_tree(None);
    assert!(anonymous_edge_tree.contains("[?:KNOWS]"));

    // Deletes show the deleted variable.
    let delete_node_tree = LogicalOperator::DeleteNode(DeleteNodeOp {
        variable: "butch".into(),
        detach: true,
        input: leaf_empty(),
    })
    .explain_tree();
    assert!(delete_node_tree.contains("DeleteNode (butch)"));

    let delete_edge_tree = LogicalOperator::DeleteEdge(DeleteEdgeOp {
        variable: "r".into(),
        input: leaf_empty(),
    })
    .explain_tree();
    assert!(delete_edge_tree.contains("DeleteEdge (r)"));

    // SetProperty lists every `variable.property` it assigns.
    let assignments = vec![("name".into(), var("x")), ("age".into(), var("y"))];
    let set_property_tree = LogicalOperator::SetProperty(SetPropertyOp {
        variable: "n".into(),
        properties: assignments,
        replace: false,
        is_edge: false,
        input: leaf_empty(),
    })
    .explain_tree();
    assert!(set_property_tree.contains("SetProperty"));
    assert!(set_property_tree.contains("n.name"));
    assert!(set_property_tree.contains("n.age"));

    // Label changes show the variable followed by colon-separated labels.
    let add_label_tree = LogicalOperator::AddLabel(AddLabelOp {
        variable: "n".into(),
        labels: vec!["A".into()],
        input: leaf_empty(),
    })
    .explain_tree();
    assert!(add_label_tree.contains("AddLabel (n:A)"));

    let remove_label_tree = LogicalOperator::RemoveLabel(RemoveLabelOp {
        variable: "n".into(),
        labels: vec!["A".into(), "B".into()],
        input: leaf_empty(),
    })
    .explain_tree();
    assert!(remove_label_tree.contains("RemoveLabel (n:A:B)"));
}
/// Checks the `explain_tree` rendering of `CallProcedure` and of `LoadData`
/// for each data format.
#[test]
fn explain_tree_call_and_load_data() {
    // Procedure calls show the dot-joined procedure name.
    let call_tree = LogicalOperator::CallProcedure(CallProcedureOp {
        name: vec!["grafeo".into(), "pagerank".into()],
        arguments: vec![],
        yield_items: None,
    })
    .explain_tree();
    assert!(call_tree.contains("CallProcedure (grafeo.pagerank)"));

    // Renders a `LoadData` operator with no custom field terminator.
    let load_tree = |format, with_headers: bool, path: &'static str, variable: &'static str| {
        LogicalOperator::LoadData(LoadDataOp {
            format,
            with_headers,
            path: path.into(),
            variable: variable.into(),
            field_terminator: None,
        })
        .explain_tree()
    };

    // CSV with headers: format name, header flag, path and alias all appear.
    let csv_with_headers = load_tree(LoadDataFormat::Csv, true, "data.csv", "row");
    assert!(csv_with_headers.contains("LoadCsv"));
    assert!(csv_with_headers.contains("WITH HEADERS"));
    assert!(csv_with_headers.contains("data.csv"));
    assert!(csv_with_headers.contains("AS row"));

    // CSV without headers: the header flag must be absent.
    let csv_without_headers = load_tree(LoadDataFormat::Csv, false, "data.csv", "row");
    assert!(!csv_without_headers.contains("WITH HEADERS"));

    // The other formats get their own operator names.
    let jsonl_tree = load_tree(LoadDataFormat::Jsonl, false, "data.jsonl", "r");
    assert!(jsonl_tree.contains("LoadJsonl"));

    let parquet_tree = load_tree(LoadDataFormat::Parquet, false, "data.parquet", "r");
    assert!(parquet_tree.contains("LoadParquet"));
}
/// Checks the `explain_tree` rendering of `TripleScan` (with and without an
/// input) and that `CreateGraph` produces some non-empty output.
#[test]
fn explain_tree_triple_scan_and_fallback() {
    // Variable subject, IRI predicate, literal object, with an `Empty` child.
    let scan_with_input = LogicalOperator::TripleScan(TripleScanOp {
        subject: TripleComponent::Variable("s".into()),
        predicate: TripleComponent::Iri("http://ex/p".into()),
        object: TripleComponent::Literal(Value::Int64(5)),
        graph: None,
        input: Some(leaf_empty()),
        dataset: None,
    });
    let rendered = scan_with_input.explain_tree();
    assert!(rendered.contains("TripleScan"));
    assert!(rendered.contains("?s"));
    assert!(rendered.contains("<http://ex/p>"));
    assert!(rendered.contains("Empty"));

    // All-variable pattern without an input.
    let scan_without_input = LogicalOperator::TripleScan(TripleScanOp {
        subject: TripleComponent::Variable("s".into()),
        predicate: TripleComponent::Variable("p".into()),
        object: TripleComponent::Variable("o".into()),
        graph: None,
        input: None,
        dataset: None,
    });
    let rendered = scan_without_input.explain_tree();
    assert!(rendered.contains("TripleScan"));

    // `CreateGraph` is only required to render something.
    let create_graph = LogicalOperator::CreateGraph(CreateGraphOp {
        graph: "g".into(),
        silent: false,
    });
    let rendered = create_graph.explain_tree();
    assert!(!rendered.is_empty());
}
/// Checks `fmt_expr` output for variables, property accesses, literals,
/// binary and unary expressions, function calls and lists.
#[test]
fn fmt_expr_covers_common_variants() {
    // Leaf expressions render exactly.
    let variable = var("n");
    assert_eq!(fmt_expr(&variable), "n");

    let property_access = LogicalExpression::Property {
        variable: "n".into(),
        property: "age".into(),
    };
    assert_eq!(fmt_expr(&property_access), "n.age");

    let literal = LogicalExpression::Literal(Value::Int64(42));
    assert_eq!(fmt_expr(&literal), "42");

    // Binary: only the operator name and the left operand are checked.
    let equality = LogicalExpression::Binary {
        left: Box::new(var("a")),
        op: BinaryOp::Eq,
        right: Box::new(LogicalExpression::Literal(Value::Int64(1))),
    };
    let rendered = fmt_expr(&equality);
    assert!(rendered.contains("Eq"));
    assert!(rendered.contains('a'));

    // Unary: only the operator name is checked.
    let negation = LogicalExpression::Unary {
        op: UnaryOp::Not,
        operand: Box::new(var("a")),
    };
    let rendered = fmt_expr(&negation);
    assert!(rendered.contains("Not"));

    // Function calls render as `name(args)`.
    let call = LogicalExpression::FunctionCall {
        name: "toLower".into(),
        args: vec![var("name")],
        distinct: false,
    };
    assert_eq!(fmt_expr(&call), "toLower(name)");

    // Lists: either a `List` tag or a bracketed form is accepted.
    let single_item_list = LogicalExpression::List(vec![var("a")]);
    let rendered = fmt_expr(&single_item_list);
    assert!(rendered.contains("List") || rendered.contains('['));
}
/// Checks `fmt_triple_component` output for every `TripleComponent` variant
/// constructed here: variable, IRI, literal, language-tagged literal and
/// blank node.
#[test]
fn fmt_triple_component_variants() {
    // Variables are prefixed with `?`.
    let variable = TripleComponent::Variable("s".into());
    assert_eq!(fmt_triple_component(&variable), "?s");

    // IRIs are wrapped in angle brackets.
    let iri = TripleComponent::Iri("http://ex/p".into());
    assert_eq!(fmt_triple_component(&iri), "<http://ex/p>");

    // Plain literals only need to contain the value's text.
    let literal = TripleComponent::Literal(Value::Int64(10));
    assert!(fmt_triple_component(&literal).contains("10"));

    // Language-tagged literals render as a quoted value followed by `@lang`.
    let lang_literal = TripleComponent::LangLiteral {
        value: "hello".into(),
        lang: "en".into(),
    };
    assert_eq!(fmt_triple_component(&lang_literal), "\"hello\"@en");

    // Blank nodes are prefixed with `_:`.
    let blank_node = TripleComponent::BlankNode("b0".into());
    assert_eq!(fmt_triple_component(&blank_node), "_:b0");
}
/// Checks that `TripleComponent::as_variable` yields the name for the
/// `Variable` variant and `None` for the other variants constructed here.
#[test]
fn triple_component_as_variable() {
    let variable = TripleComponent::Variable("s".into());
    assert_eq!(variable.as_variable(), Some("s"));

    // None of the remaining variants is a variable.
    let non_variables = [
        TripleComponent::Iri("http://ex/p".into()),
        TripleComponent::Literal(Value::Int64(1)),
        TripleComponent::BlankNode("b".into()),
        TripleComponent::LangLiteral {
            value: "v".into(),
            lang: "en".into(),
        },
    ];
    for component in &non_variables {
        assert_eq!(component.as_variable(), None);
    }
}
}