use std::collections::HashMap;
use std::fmt;
use grafeo_common::types::Value;
/// Row count used by `SKIP` / `LIMIT`: either a literal known at plan time or a named
/// query parameter that still has to be substituted.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CountExpr {
    /// A count known at plan time.
    Literal(usize),
    /// A count supplied by the named parameter; `Display` renders it as `$name`.
    Parameter(String),
}

impl CountExpr {
    /// Returns the literal count.
    ///
    /// # Panics
    ///
    /// Panics if the count is still an unresolved parameter.
    pub fn value(&self) -> usize {
        match self {
            Self::Parameter(name) => panic!("Unresolved parameter: ${name}"),
            Self::Literal(count) => *count,
        }
    }

    /// Returns the literal count.
    ///
    /// # Errors
    ///
    /// Returns a message naming the parameter if the count has not been resolved yet.
    pub fn try_value(&self) -> Result<usize, String> {
        match self {
            Self::Parameter(name) => Err(format!("Unresolved SKIP/LIMIT parameter: ${name}")),
            Self::Literal(count) => Ok(*count),
        }
    }

    /// Returns a row-count estimate for cost modelling.
    ///
    /// Literal counts are used as-is; an unresolved parameter falls back to a fixed guess.
    pub fn estimate(&self) -> f64 {
        // Fallback guess used when the real count is only known at execution time.
        const UNRESOLVED_PARAMETER_ESTIMATE: f64 = 10.0;
        match self {
            Self::Parameter(_) => UNRESOLVED_PARAMETER_ESTIMATE,
            Self::Literal(count) => *count as f64,
        }
    }
}

impl fmt::Display for CountExpr {
    /// Writes the literal number, or `$` followed by the parameter name.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Parameter(name) => {
                f.write_str("$")?;
                f.write_str(name)
            }
            Self::Literal(count) => write!(f, "{count}"),
        }
    }
}

impl From<usize> for CountExpr {
    /// Wraps a plain count as [`CountExpr::Literal`].
    fn from(count: usize) -> Self {
        Self::Literal(count)
    }
}

impl PartialEq<usize> for CountExpr {
    /// A count equals a `usize` only when it is a literal with that value; parameters
    /// never compare equal.
    fn eq(&self, other: &usize) -> bool {
        match self {
            Self::Literal(count) => count == other,
            Self::Parameter(_) => false,
        }
    }
}
/// A logical query plan: the operator tree plus flags describing how it is to be run.
#[derive(Debug, Clone)]
pub struct LogicalPlan {
    /// Root of the operator tree.
    pub root: LogicalOperator,
    /// `true` for plans built with [`LogicalPlan::explain`].
    pub explain: bool,
    /// `true` for plans built with [`LogicalPlan::profile`].
    pub profile: bool,
    /// Default parameter values (presumably keyed by parameter name — confirm with callers);
    /// every constructor starts with an empty map.
    pub default_params: HashMap<String, Value>,
}

impl LogicalPlan {
    /// Wraps `root` in a plan with neither the EXPLAIN nor the PROFILE flag set.
    pub fn new(root: LogicalOperator) -> Self {
        let default_params = HashMap::new();
        Self {
            root,
            explain: false,
            profile: false,
            default_params,
        }
    }

    /// Wraps `root` in a plan with the EXPLAIN flag set.
    pub fn explain(root: LogicalOperator) -> Self {
        Self {
            explain: true,
            ..Self::new(root)
        }
    }

    /// Wraps `root` in a plan with the PROFILE flag set.
    pub fn profile(root: LogicalOperator) -> Self {
        Self {
            profile: true,
            ..Self::new(root)
        }
    }
}
/// A node of the logical query plan tree.
///
/// Except for `Empty`, every variant wraps an operator struct holding that operator's
/// parameters and, where it has any, its input operator(s); see
/// [`LogicalOperator::children`] for how the inputs of each variant are reached.
#[derive(Debug, Clone)]
pub enum LogicalOperator {
    /// Scans nodes, optionally by label.
    NodeScan(NodeScanOp),
    /// Scans edges, optionally by edge type.
    EdgeScan(EdgeScanOp),
    /// Traverses edges from a bound node (single or variable-length hops).
    Expand(ExpandOp),
    /// Filters rows by a predicate.
    Filter(FilterOp),
    /// Evaluates projection expressions.
    Project(ProjectOp),
    /// Joins two inputs.
    Join(JoinOp),
    /// Grouped aggregation.
    Aggregate(AggregateOp),
    /// LIMIT with a literal or parameterised count.
    Limit(LimitOp),
    /// SKIP with a literal or parameterised count.
    Skip(SkipOp),
    /// Sorts by one or more keys.
    Sort(SortOp),
    /// Duplicate elimination.
    Distinct(DistinctOp),
    /// Creates a node (write operator).
    CreateNode(CreateNodeOp),
    /// Creates an edge (write operator).
    CreateEdge(CreateEdgeOp),
    /// Deletes a node (write operator).
    DeleteNode(DeleteNodeOp),
    /// Deletes an edge (write operator).
    DeleteEdge(DeleteEdgeOp),
    /// Sets properties (write operator).
    SetProperty(SetPropertyOp),
    /// Adds labels (write operator).
    AddLabel(AddLabelOp),
    /// Removes labels (write operator).
    RemoveLabel(RemoveLabelOp),
    /// Final result projection, optionally DISTINCT.
    Return(ReturnOp),
    /// Leaf operator with no payload and no inputs.
    Empty,
    /// Scans triples matching a subject/predicate/object pattern.
    TripleScan(TripleScanOp),
    /// Union of any number of branches.
    Union(UnionOp),
    /// Left join with an optional condition.
    LeftJoin(LeftJoinOp),
    /// Anti join of two inputs.
    AntiJoin(AntiJoinOp),
    /// Binds an expression result to a variable.
    Bind(BindOp),
    /// Unwinds an expression into a variable.
    Unwind(UnwindOp),
    /// Collects key/value variables into a map.
    MapCollect(MapCollectOp),
    /// Node MERGE (write operator).
    Merge(MergeOp),
    /// Relationship MERGE (write operator).
    MergeRelationship(MergeRelationshipOp),
    /// Shortest path(s) between two bound variables.
    ShortestPath(ShortestPathOp),
    /// Inserts a triple (write operator).
    InsertTriple(InsertTripleOp),
    /// Deletes a triple (write operator).
    DeleteTriple(DeleteTripleOp),
    /// Template-based delete/insert driven by a WHERE subplan (write operator).
    Modify(ModifyOp),
    /// Graph-management operator (write operator).
    ClearGraph(ClearGraphOp),
    /// Graph-management operator (write operator).
    CreateGraph(CreateGraphOp),
    /// Graph-management operator (write operator).
    DropGraph(DropGraphOp),
    /// Graph-management operator (write operator).
    LoadGraph(LoadGraphOp),
    /// Graph-management operator (write operator).
    CopyGraph(CopyGraphOp),
    /// Graph-management operator (write operator).
    MoveGraph(MoveGraphOp),
    /// Graph-management operator (write operator).
    AddGraph(AddGraphOp),
    /// Aggregates a property over the entities held in a list column.
    HorizontalAggregate(HorizontalAggregateOp),
    /// Vector similarity scan.
    VectorScan(VectorScanOp),
    /// Vector similarity join against the rows of an input.
    VectorJoin(VectorJoinOp),
    /// EXCEPT (optionally ALL) of two inputs.
    Except(ExceptOp),
    /// INTERSECT (optionally ALL) of two inputs.
    Intersect(IntersectOp),
    /// OTHERWISE combination of two inputs.
    Otherwise(OtherwiseOp),
    /// Runs a subplan against an input.
    Apply(ApplyOp),
    /// Leaf that produces the named columns.
    ParameterScan(ParameterScanOp),
    /// Creates a property graph from node/edge tables (write operator).
    CreatePropertyGraph(CreatePropertyGraphOp),
    /// Join of several inputs on shared variables.
    MultiWayJoin(MultiWayJoinOp),
    /// Procedure call.
    CallProcedure(CallProcedureOp),
    /// Reads rows from a CSV / JSONL / Parquet file.
    LoadData(LoadDataOp),
}
impl LogicalOperator {
    /// Returns `true` if this operator, or any operator beneath it, writes data.
    ///
    /// Write operators report `true` immediately: node/edge creation and deletion,
    /// property and label updates, merges, triple inserts/deletes, `Modify`, the
    /// graph-management operators and `CreatePropertyGraph`.
    ///
    /// Every other operator answers by recursing into the inputs reported by
    /// [`children`](Self::children). A write nested under a scan, expand, vector or
    /// shortest-path operator is therefore detected as well, for example a `NodeScan` whose
    /// `input` is a `CreateNode`.
    #[must_use]
    pub fn has_mutations(&self) -> bool {
        match self {
            Self::CreateNode(_)
            | Self::CreateEdge(_)
            | Self::DeleteNode(_)
            | Self::DeleteEdge(_)
            | Self::SetProperty(_)
            | Self::AddLabel(_)
            | Self::RemoveLabel(_)
            | Self::Merge(_)
            | Self::MergeRelationship(_)
            | Self::InsertTriple(_)
            | Self::DeleteTriple(_)
            | Self::Modify(_)
            | Self::ClearGraph(_)
            | Self::CreateGraph(_)
            | Self::DropGraph(_)
            | Self::LoadGraph(_)
            | Self::CopyGraph(_)
            | Self::MoveGraph(_)
            | Self::AddGraph(_)
            | Self::CreatePropertyGraph(_) => true,
            // Non-writing operators mutate only if one of their inputs does. They are
            // listed explicitly (no `_` arm) so that a newly added variant must be
            // classified here. Leaves such as `Empty`, `ParameterScan`, `CallProcedure` and
            // `LoadData` have no children and therefore report `false`.
            // NOTE(review): confirm that no procedure reachable via `CallProcedure` writes.
            Self::NodeScan(_)
            | Self::EdgeScan(_)
            | Self::Expand(_)
            | Self::Filter(_)
            | Self::Project(_)
            | Self::Join(_)
            | Self::Aggregate(_)
            | Self::Limit(_)
            | Self::Skip(_)
            | Self::Sort(_)
            | Self::Distinct(_)
            | Self::Return(_)
            | Self::Empty
            | Self::TripleScan(_)
            | Self::Union(_)
            | Self::LeftJoin(_)
            | Self::AntiJoin(_)
            | Self::Bind(_)
            | Self::Unwind(_)
            | Self::MapCollect(_)
            | Self::ShortestPath(_)
            | Self::HorizontalAggregate(_)
            | Self::VectorScan(_)
            | Self::VectorJoin(_)
            | Self::Except(_)
            | Self::Intersect(_)
            | Self::Otherwise(_)
            | Self::Apply(_)
            | Self::ParameterScan(_)
            | Self::MultiWayJoin(_)
            | Self::CallProcedure(_)
            | Self::LoadData(_) => self.children().into_iter().any(Self::has_mutations),
        }
    }

    /// Returns the direct input operators of this operator.
    ///
    /// - Binary operators yield `[left, right]`.
    /// - `Apply` yields `[input, subplan]`.
    /// - `Modify` yields its `where_clause`.
    /// - `Union` and `MultiWayJoin` yield all their inputs in order.
    /// - Operators with an optional input yield zero or one child.
    /// - Leaf and graph-management operators yield an empty vector.
    #[must_use]
    pub fn children(&self) -> Vec<&LogicalOperator> {
        match self {
            Self::NodeScan(op) => op.input.as_deref().into_iter().collect(),
            Self::EdgeScan(op) => op.input.as_deref().into_iter().collect(),
            Self::TripleScan(op) => op.input.as_deref().into_iter().collect(),
            Self::VectorScan(op) => op.input.as_deref().into_iter().collect(),
            Self::CreateNode(op) => op.input.as_deref().into_iter().collect(),
            Self::InsertTriple(op) => op.input.as_deref().into_iter().collect(),
            Self::DeleteTriple(op) => op.input.as_deref().into_iter().collect(),
            Self::Expand(op) => vec![&*op.input],
            Self::Filter(op) => vec![&*op.input],
            Self::Project(op) => vec![&*op.input],
            Self::Aggregate(op) => vec![&*op.input],
            Self::Limit(op) => vec![&*op.input],
            Self::Skip(op) => vec![&*op.input],
            Self::Sort(op) => vec![&*op.input],
            Self::Distinct(op) => vec![&*op.input],
            Self::Return(op) => vec![&*op.input],
            Self::Unwind(op) => vec![&*op.input],
            Self::Bind(op) => vec![&*op.input],
            Self::MapCollect(op) => vec![&*op.input],
            Self::ShortestPath(op) => vec![&*op.input],
            Self::Merge(op) => vec![&*op.input],
            Self::MergeRelationship(op) => vec![&*op.input],
            Self::CreateEdge(op) => vec![&*op.input],
            Self::DeleteNode(op) => vec![&*op.input],
            Self::DeleteEdge(op) => vec![&*op.input],
            Self::SetProperty(op) => vec![&*op.input],
            Self::AddLabel(op) => vec![&*op.input],
            Self::RemoveLabel(op) => vec![&*op.input],
            Self::HorizontalAggregate(op) => vec![&*op.input],
            Self::VectorJoin(op) => vec![&*op.input],
            Self::Modify(op) => vec![&*op.where_clause],
            Self::Join(op) => vec![&*op.left, &*op.right],
            Self::LeftJoin(op) => vec![&*op.left, &*op.right],
            Self::AntiJoin(op) => vec![&*op.left, &*op.right],
            Self::Except(op) => vec![&*op.left, &*op.right],
            Self::Intersect(op) => vec![&*op.left, &*op.right],
            Self::Otherwise(op) => vec![&*op.left, &*op.right],
            Self::Apply(op) => vec![&*op.input, &*op.subplan],
            Self::Union(op) => op.inputs.iter().collect(),
            Self::MultiWayJoin(op) => op.inputs.iter().collect(),
            Self::Empty
            | Self::ParameterScan(_)
            | Self::CallProcedure(_)
            | Self::ClearGraph(_)
            | Self::CreateGraph(_)
            | Self::DropGraph(_)
            | Self::LoadGraph(_)
            | Self::CopyGraph(_)
            | Self::MoveGraph(_)
            | Self::AddGraph(_)
            | Self::CreatePropertyGraph(_)
            | Self::LoadData(_) => vec![],
        }
    }

    /// Returns a short, single-line description of this operator's parameters.
    ///
    /// Operators with nothing compact to show return an empty string.
    #[must_use]
    pub fn display_label(&self) -> String {
        match self {
            Self::NodeScan(op) => {
                let label = op.label.as_deref().unwrap_or("*");
                format!("{}:{}", op.variable, label)
            }
            Self::EdgeScan(op) => {
                let types = if op.edge_types.is_empty() {
                    "*".to_string()
                } else {
                    op.edge_types.join("|")
                };
                format!("{}:{}", op.variable, types)
            }
            Self::Expand(op) => {
                let types = if op.edge_types.is_empty() {
                    "*".to_string()
                } else {
                    op.edge_types.join("|")
                };
                let dir = match op.direction {
                    ExpandDirection::Outgoing => "->",
                    ExpandDirection::Incoming => "<-",
                    ExpandDirection::Both => "--",
                };
                // NOTE(review): the arrow is printed on both sides of the edge,
                // e.g. `(a)->[:T]->(b)`.
                format!(
                    "({from}){dir}[:{types}]{dir}({to})",
                    from = op.from_variable,
                    to = op.to_variable,
                )
            }
            Self::Filter(op) => {
                let hint = match &op.pushdown_hint {
                    Some(PushdownHint::IndexLookup { property }) => {
                        format!(" [index: {property}]")
                    }
                    Some(PushdownHint::RangeScan { property }) => {
                        format!(" [range: {property}]")
                    }
                    Some(PushdownHint::LabelFirst) => " [label-first]".to_string(),
                    None => String::new(),
                };
                format!("{}{hint}", fmt_expr(&op.predicate))
            }
            Self::Project(op) => {
                let cols: Vec<String> = op
                    .projections
                    .iter()
                    .map(|p| match &p.alias {
                        Some(alias) => alias.clone(),
                        None => fmt_expr(&p.expression),
                    })
                    .collect();
                cols.join(", ")
            }
            Self::Join(op) => format!("{:?}", op.join_type),
            Self::Aggregate(op) => {
                let groups: Vec<String> = op.group_by.iter().map(fmt_expr).collect();
                format!("group: [{}]", groups.join(", "))
            }
            Self::Limit(op) => op.count.to_string(),
            Self::Skip(op) => op.count.to_string(),
            Self::Sort(op) => {
                let keys: Vec<String> = op
                    .keys
                    .iter()
                    .map(|k| {
                        let dir = match k.order {
                            SortOrder::Ascending => "ASC",
                            SortOrder::Descending => "DESC",
                        };
                        format!("{} {dir}", fmt_expr(&k.expression))
                    })
                    .collect();
                keys.join(", ")
            }
            Self::Return(op) => {
                let items: Vec<String> = op
                    .items
                    .iter()
                    .map(|item| match &item.alias {
                        Some(alias) => alias.clone(),
                        None => fmt_expr(&item.expression),
                    })
                    .collect();
                items.join(", ")
            }
            Self::Union(op) => format!("{} branches", op.inputs.len()),
            Self::MultiWayJoin(op) => {
                format!("{} inputs", op.inputs.len())
            }
            Self::Unwind(op) => op.variable.clone(),
            Self::Bind(op) => op.variable.clone(),
            Self::MapCollect(op) => op.alias.clone(),
            Self::ShortestPath(op) => {
                format!("{} -> {}", op.source_var, op.target_var)
            }
            Self::Merge(op) => op.variable.clone(),
            Self::MergeRelationship(op) => op.variable.clone(),
            Self::CreateNode(op) => {
                let labels = op.labels.join(":");
                format!("{}:{labels}", op.variable)
            }
            Self::CreateEdge(op) => {
                format!(
                    "[{}:{}]",
                    op.variable.as_deref().unwrap_or("?"),
                    op.edge_type
                )
            }
            Self::DeleteNode(op) => op.variable.clone(),
            Self::DeleteEdge(op) => op.variable.clone(),
            Self::SetProperty(op) => op.variable.clone(),
            Self::AddLabel(op) => {
                let labels = op.labels.join(":");
                format!("{}:{labels}", op.variable)
            }
            Self::RemoveLabel(op) => {
                let labels = op.labels.join(":");
                format!("{}:{labels}", op.variable)
            }
            Self::CallProcedure(op) => op.name.join("."),
            Self::LoadData(op) => format!("{} AS {}", op.path, op.variable),
            Self::VectorScan(op) => op.variable.clone(),
            Self::VectorJoin(op) => op.right_variable.clone(),
            // Operators without a compact label. They are listed explicitly rather than
            // with `_` so that a newly added variant must be considered here.
            Self::Distinct(_)
            | Self::LeftJoin(_)
            | Self::AntiJoin(_)
            | Self::Apply(_)
            | Self::Empty
            | Self::TripleScan(_)
            | Self::InsertTriple(_)
            | Self::DeleteTriple(_)
            | Self::Modify(_)
            | Self::ClearGraph(_)
            | Self::CreateGraph(_)
            | Self::DropGraph(_)
            | Self::LoadGraph(_)
            | Self::CopyGraph(_)
            | Self::MoveGraph(_)
            | Self::AddGraph(_)
            | Self::HorizontalAggregate(_)
            | Self::Except(_)
            | Self::Intersect(_)
            | Self::Otherwise(_)
            | Self::ParameterScan(_)
            | Self::CreatePropertyGraph(_) => String::new(),
        }
    }
}
impl LogicalOperator {
    /// Renders the operator tree as indented text, one operator per line.
    ///
    /// Each operator's inputs follow it on subsequent lines, indented one level deeper.
    pub fn explain_tree(&self) -> String {
        let mut output = String::new();
        self.fmt_tree(&mut output, 0);
        output
    }

    /// Appends this operator, at indentation level `depth`, and its inputs to `out`.
    ///
    /// The results of `writeln!` are discarded because writing into a `String` cannot fail.
    fn fmt_tree(&self, out: &mut String, depth: usize) {
        use std::fmt::Write;
        let indent = " ".repeat(depth);
        match self {
            Self::NodeScan(op) => {
                let label = op.label.as_deref().unwrap_or("*");
                let _ = writeln!(out, "{indent}NodeScan ({var}:{label})", var = op.variable);
                if let Some(input) = &op.input {
                    input.fmt_tree(out, depth + 1);
                }
            }
            Self::EdgeScan(op) => {
                let types = if op.edge_types.is_empty() {
                    "*".to_string()
                } else {
                    op.edge_types.join("|")
                };
                let _ = writeln!(out, "{indent}EdgeScan ({var}:{types})", var = op.variable);
                // Like `NodeScan`, an edge scan may have an upstream operator; render that
                // subtree instead of silently dropping it.
                if let Some(input) = &op.input {
                    input.fmt_tree(out, depth + 1);
                }
            }
            Self::Expand(op) => {
                let types = if op.edge_types.is_empty() {
                    "*".to_string()
                } else {
                    op.edge_types.join("|")
                };
                let dir = match op.direction {
                    ExpandDirection::Outgoing => "->",
                    ExpandDirection::Incoming => "<-",
                    ExpandDirection::Both => "--",
                };
                // Single-hop expansions print no hop range; otherwise `*n`, `*min..max`
                // or the open-ended `*min..`.
                let hops = match (op.min_hops, op.max_hops) {
                    (1, Some(1)) => String::new(),
                    (min, Some(max)) if min == max => format!("*{min}"),
                    (min, Some(max)) => format!("*{min}..{max}"),
                    (min, None) => format!("*{min}.."),
                };
                let _ = writeln!(
                    out,
                    "{indent}Expand ({from}){dir}[:{types}{hops}]{dir}({to})",
                    from = op.from_variable,
                    to = op.to_variable,
                );
                op.input.fmt_tree(out, depth + 1);
            }
            Self::Filter(op) => {
                let hint = match &op.pushdown_hint {
                    Some(PushdownHint::IndexLookup { property }) => {
                        format!(" [index: {property}]")
                    }
                    Some(PushdownHint::RangeScan { property }) => {
                        format!(" [range: {property}]")
                    }
                    Some(PushdownHint::LabelFirst) => " [label-first]".to_string(),
                    None => String::new(),
                };
                let _ = writeln!(
                    out,
                    "{indent}Filter ({expr}){hint}",
                    expr = fmt_expr(&op.predicate)
                );
                op.input.fmt_tree(out, depth + 1);
            }
            Self::Project(op) => {
                let cols: Vec<String> = op
                    .projections
                    .iter()
                    .map(|p| {
                        let expr = fmt_expr(&p.expression);
                        match &p.alias {
                            Some(alias) => format!("{expr} AS {alias}"),
                            None => expr,
                        }
                    })
                    .collect();
                let _ = writeln!(out, "{indent}Project ({cols})", cols = cols.join(", "));
                op.input.fmt_tree(out, depth + 1);
            }
            Self::Join(op) => {
                let _ = writeln!(out, "{indent}Join ({ty:?})", ty = op.join_type);
                op.left.fmt_tree(out, depth + 1);
                op.right.fmt_tree(out, depth + 1);
            }
            Self::Aggregate(op) => {
                let groups: Vec<String> = op.group_by.iter().map(fmt_expr).collect();
                let aggs: Vec<String> = op
                    .aggregates
                    .iter()
                    .map(|a| {
                        let func = format!("{:?}", a.function).to_lowercase();
                        match &a.alias {
                            Some(alias) => format!("{func}(...) AS {alias}"),
                            None => format!("{func}(...)"),
                        }
                    })
                    .collect();
                let _ = writeln!(
                    out,
                    "{indent}Aggregate (group: [{groups}], aggs: [{aggs}])",
                    groups = groups.join(", "),
                    aggs = aggs.join(", "),
                );
                op.input.fmt_tree(out, depth + 1);
            }
            Self::Limit(op) => {
                let _ = writeln!(out, "{indent}Limit ({})", op.count);
                op.input.fmt_tree(out, depth + 1);
            }
            Self::Skip(op) => {
                let _ = writeln!(out, "{indent}Skip ({})", op.count);
                op.input.fmt_tree(out, depth + 1);
            }
            Self::Sort(op) => {
                let keys: Vec<String> = op
                    .keys
                    .iter()
                    .map(|k| {
                        let dir = match k.order {
                            SortOrder::Ascending => "ASC",
                            SortOrder::Descending => "DESC",
                        };
                        format!("{} {dir}", fmt_expr(&k.expression))
                    })
                    .collect();
                let _ = writeln!(out, "{indent}Sort ({keys})", keys = keys.join(", "));
                op.input.fmt_tree(out, depth + 1);
            }
            Self::Distinct(op) => {
                let _ = writeln!(out, "{indent}Distinct");
                op.input.fmt_tree(out, depth + 1);
            }
            Self::Return(op) => {
                let items: Vec<String> = op
                    .items
                    .iter()
                    .map(|item| {
                        let expr = fmt_expr(&item.expression);
                        match &item.alias {
                            Some(alias) => format!("{expr} AS {alias}"),
                            None => expr,
                        }
                    })
                    .collect();
                let distinct = if op.distinct { " DISTINCT" } else { "" };
                let _ = writeln!(
                    out,
                    "{indent}Return{distinct} ({items})",
                    items = items.join(", ")
                );
                op.input.fmt_tree(out, depth + 1);
            }
            Self::Union(op) => {
                let _ = writeln!(out, "{indent}Union ({n} branches)", n = op.inputs.len());
                for input in &op.inputs {
                    input.fmt_tree(out, depth + 1);
                }
            }
            Self::MultiWayJoin(op) => {
                let vars = op.shared_variables.join(", ");
                let _ = writeln!(
                    out,
                    "{indent}MultiWayJoin ({n} inputs, shared: [{vars}])",
                    n = op.inputs.len()
                );
                for input in &op.inputs {
                    input.fmt_tree(out, depth + 1);
                }
            }
            Self::LeftJoin(op) => {
                if let Some(cond) = &op.condition {
                    let _ = writeln!(out, "{indent}LeftJoin (condition: {cond:?})");
                } else {
                    let _ = writeln!(out, "{indent}LeftJoin");
                }
                op.left.fmt_tree(out, depth + 1);
                op.right.fmt_tree(out, depth + 1);
            }
            Self::AntiJoin(op) => {
                let _ = writeln!(out, "{indent}AntiJoin");
                op.left.fmt_tree(out, depth + 1);
                op.right.fmt_tree(out, depth + 1);
            }
            Self::Unwind(op) => {
                let _ = writeln!(out, "{indent}Unwind ({var})", var = op.variable);
                op.input.fmt_tree(out, depth + 1);
            }
            Self::Bind(op) => {
                let _ = writeln!(out, "{indent}Bind ({var})", var = op.variable);
                op.input.fmt_tree(out, depth + 1);
            }
            Self::MapCollect(op) => {
                let _ = writeln!(
                    out,
                    "{indent}MapCollect ({key} -> {val} AS {alias})",
                    key = op.key_var,
                    val = op.value_var,
                    alias = op.alias
                );
                op.input.fmt_tree(out, depth + 1);
            }
            Self::Apply(op) => {
                let _ = writeln!(out, "{indent}Apply");
                op.input.fmt_tree(out, depth + 1);
                op.subplan.fmt_tree(out, depth + 1);
            }
            Self::Except(op) => {
                let all = if op.all { " ALL" } else { "" };
                let _ = writeln!(out, "{indent}Except{all}");
                op.left.fmt_tree(out, depth + 1);
                op.right.fmt_tree(out, depth + 1);
            }
            Self::Intersect(op) => {
                let all = if op.all { " ALL" } else { "" };
                let _ = writeln!(out, "{indent}Intersect{all}");
                op.left.fmt_tree(out, depth + 1);
                op.right.fmt_tree(out, depth + 1);
            }
            Self::Otherwise(op) => {
                let _ = writeln!(out, "{indent}Otherwise");
                op.left.fmt_tree(out, depth + 1);
                op.right.fmt_tree(out, depth + 1);
            }
            Self::ShortestPath(op) => {
                let _ = writeln!(
                    out,
                    "{indent}ShortestPath ({from} -> {to})",
                    from = op.source_var,
                    to = op.target_var
                );
                op.input.fmt_tree(out, depth + 1);
            }
            Self::Merge(op) => {
                let _ = writeln!(out, "{indent}Merge ({var})", var = op.variable);
                op.input.fmt_tree(out, depth + 1);
            }
            Self::MergeRelationship(op) => {
                let _ = writeln!(out, "{indent}MergeRelationship ({var})", var = op.variable);
                op.input.fmt_tree(out, depth + 1);
            }
            Self::CreateNode(op) => {
                let labels = op.labels.join(":");
                let _ = writeln!(
                    out,
                    "{indent}CreateNode ({var}:{labels})",
                    var = op.variable
                );
                if let Some(input) = &op.input {
                    input.fmt_tree(out, depth + 1);
                }
            }
            Self::CreateEdge(op) => {
                let var = op.variable.as_deref().unwrap_or("?");
                let _ = writeln!(
                    out,
                    "{indent}CreateEdge ({from})-[{var}:{ty}]->({to})",
                    from = op.from_variable,
                    ty = op.edge_type,
                    to = op.to_variable
                );
                op.input.fmt_tree(out, depth + 1);
            }
            Self::DeleteNode(op) => {
                let _ = writeln!(out, "{indent}DeleteNode ({var})", var = op.variable);
                op.input.fmt_tree(out, depth + 1);
            }
            Self::DeleteEdge(op) => {
                let _ = writeln!(out, "{indent}DeleteEdge ({var})", var = op.variable);
                op.input.fmt_tree(out, depth + 1);
            }
            Self::SetProperty(op) => {
                let props: Vec<String> = op
                    .properties
                    .iter()
                    .map(|(k, _)| format!("{}.{k}", op.variable))
                    .collect();
                let _ = writeln!(
                    out,
                    "{indent}SetProperty ({props})",
                    props = props.join(", ")
                );
                op.input.fmt_tree(out, depth + 1);
            }
            Self::AddLabel(op) => {
                let labels = op.labels.join(":");
                let _ = writeln!(out, "{indent}AddLabel ({var}:{labels})", var = op.variable);
                op.input.fmt_tree(out, depth + 1);
            }
            Self::RemoveLabel(op) => {
                let labels = op.labels.join(":");
                let _ = writeln!(
                    out,
                    "{indent}RemoveLabel ({var}:{labels})",
                    var = op.variable
                );
                op.input.fmt_tree(out, depth + 1);
            }
            Self::CallProcedure(op) => {
                let _ = writeln!(
                    out,
                    "{indent}CallProcedure ({name})",
                    name = op.name.join(".")
                );
            }
            Self::LoadData(op) => {
                let format_name = match op.format {
                    LoadDataFormat::Csv => "LoadCsv",
                    LoadDataFormat::Jsonl => "LoadJsonl",
                    LoadDataFormat::Parquet => "LoadParquet",
                };
                // Header handling is only meaningful (and only shown) for CSV input.
                let headers = if op.with_headers && op.format == LoadDataFormat::Csv {
                    " WITH HEADERS"
                } else {
                    ""
                };
                let _ = writeln!(
                    out,
                    "{indent}{format_name}{headers} ('{path}' AS {var})",
                    path = op.path,
                    var = op.variable,
                );
            }
            Self::TripleScan(op) => {
                let _ = writeln!(
                    out,
                    "{indent}TripleScan ({s} {p} {o})",
                    s = fmt_triple_component(&op.subject),
                    p = fmt_triple_component(&op.predicate),
                    o = fmt_triple_component(&op.object)
                );
                if let Some(input) = &op.input {
                    input.fmt_tree(out, depth + 1);
                }
            }
            Self::Empty => {
                let _ = writeln!(out, "{indent}Empty");
            }
            // The remaining operators previously fell into a `_` arm that printed an
            // opaque `Discriminant(n)` and hid their inputs. They now get a readable line
            // and their children (per `children()`) are rendered beneath them.
            Self::InsertTriple(op) => {
                let _ = writeln!(
                    out,
                    "{indent}InsertTriple ({s} {p} {o})",
                    s = fmt_triple_component(&op.subject),
                    p = fmt_triple_component(&op.predicate),
                    o = fmt_triple_component(&op.object)
                );
                self.fmt_children(out, depth);
            }
            Self::DeleteTriple(op) => {
                let _ = writeln!(
                    out,
                    "{indent}DeleteTriple ({s} {p} {o})",
                    s = fmt_triple_component(&op.subject),
                    p = fmt_triple_component(&op.predicate),
                    o = fmt_triple_component(&op.object)
                );
                self.fmt_children(out, depth);
            }
            Self::Modify(op) => {
                let _ = writeln!(
                    out,
                    "{indent}Modify (delete: {d}, insert: {i})",
                    d = op.delete_templates.len(),
                    i = op.insert_templates.len()
                );
                self.fmt_children(out, depth);
            }
            Self::HorizontalAggregate(op) => {
                let func = format!("{:?}", op.function).to_lowercase();
                let _ = writeln!(
                    out,
                    "{indent}HorizontalAggregate ({func}({list}.{prop}) AS {alias})",
                    list = op.list_column,
                    prop = op.property,
                    alias = op.alias
                );
                self.fmt_children(out, depth);
            }
            Self::VectorScan(op) => {
                let _ = writeln!(
                    out,
                    "{indent}VectorScan ({var}, k: {k})",
                    var = op.variable,
                    k = op.k
                );
                self.fmt_children(out, depth);
            }
            Self::VectorJoin(op) => {
                let _ = writeln!(
                    out,
                    "{indent}VectorJoin ({var}, k: {k})",
                    var = op.right_variable,
                    k = op.k
                );
                self.fmt_children(out, depth);
            }
            Self::ParameterScan(op) => {
                let _ = writeln!(
                    out,
                    "{indent}ParameterScan ({cols})",
                    cols = op.columns.join(", ")
                );
            }
            Self::CreatePropertyGraph(op) => {
                let _ = writeln!(out, "{indent}CreatePropertyGraph ({name})", name = op.name);
            }
            Self::ClearGraph(_) => {
                let _ = writeln!(out, "{indent}ClearGraph");
            }
            Self::CreateGraph(_) => {
                let _ = writeln!(out, "{indent}CreateGraph");
            }
            Self::DropGraph(_) => {
                let _ = writeln!(out, "{indent}DropGraph");
            }
            Self::LoadGraph(_) => {
                let _ = writeln!(out, "{indent}LoadGraph");
            }
            Self::CopyGraph(_) => {
                let _ = writeln!(out, "{indent}CopyGraph");
            }
            Self::MoveGraph(_) => {
                let _ = writeln!(out, "{indent}MoveGraph");
            }
            Self::AddGraph(_) => {
                let _ = writeln!(out, "{indent}AddGraph");
            }
        }
    }

    /// Renders every child returned by [`children`](Self::children) one level deeper
    /// than `depth`.
    fn fmt_children(&self, out: &mut String, depth: usize) {
        for child in self.children() {
            child.fmt_tree(out, depth + 1);
        }
    }
}
/// Renders a logical expression as compact text for plan output.
///
/// The following kinds get a dedicated textual form: variables, `var.prop` property
/// accesses, literals (via `Display`), binary and unary operators, and function calls.
/// Operators are printed by their `Debug` name (e.g. `Gt`), and nested binary operands are
/// not parenthesised. Every other expression kind falls back to its `Debug` representation.
fn fmt_expr(expr: &LogicalExpression) -> String {
    match expr {
        LogicalExpression::Variable(name) => name.to_string(),
        LogicalExpression::Property { variable, property } => format!("{variable}.{property}"),
        LogicalExpression::Literal(val) => val.to_string(),
        LogicalExpression::Binary { left, op, right } => {
            let lhs = fmt_expr(left);
            let rhs = fmt_expr(right);
            format!("{lhs} {op:?} {rhs}")
        }
        LogicalExpression::Unary { op, operand } => {
            let inner = fmt_expr(operand);
            format!("{op:?} {inner}")
        }
        LogicalExpression::FunctionCall { name, args, .. } => {
            let rendered_args = args.iter().map(fmt_expr).collect::<Vec<_>>().join(", ");
            format!("{name}({rendered_args})")
        }
        other => format!("{other:?}"),
    }
}
/// Renders one triple-pattern component in SPARQL-like surface syntax.
///
/// The forms are `?name` for variables, `<iri>` for IRIs, the `Display` form of plain
/// literals, `"text"@lang` for language-tagged literals, and `_:label` for blank nodes.
fn fmt_triple_component(comp: &TripleComponent) -> String {
    match comp {
        TripleComponent::Literal(val) => val.to_string(),
        TripleComponent::Variable(name) => ["?", name.as_str()].concat(),
        TripleComponent::Iri(iri) => ["<", iri.as_str(), ">"].concat(),
        TripleComponent::BlankNode(label) => ["_:", label.as_str()].concat(),
        TripleComponent::LangLiteral { value, lang } => format!("\"{value}\"@{lang}"),
    }
}
/// Scans nodes, binding each to `variable`.
#[derive(Debug, Clone)]
pub struct NodeScanOp {
    /// Variable the scanned node is bound to.
    pub variable: String,
    /// Label to scan; `None` is shown as `*` in plan output.
    pub label: Option<String>,
    /// Optional upstream operator; plan output renders it beneath the scan.
    pub input: Option<Box<LogicalOperator>>,
}
/// Scans edges, binding each to `variable`.
#[derive(Debug, Clone)]
pub struct EdgeScanOp {
    /// Variable the scanned edge is bound to.
    pub variable: String,
    /// Edge types to scan; an empty list is shown as `*` in plan output.
    pub edge_types: Vec<String>,
    /// Optional upstream operator.
    pub input: Option<Box<LogicalOperator>>,
}
/// Path mode of an expansion; `Walk` is the default.
///
/// NOTE(review): presumably the GQL path modes (WALK = unrestricted, TRAIL = no repeated
/// edges, SIMPLE / ACYCLIC = restrictions on repeated nodes) — confirm with the executor.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum PathMode {
    #[default]
    Walk,
    Trail,
    Simple,
    Acyclic,
}
/// Traverses edges from `from_variable` to `to_variable`, possibly over several hops.
#[derive(Debug, Clone)]
pub struct ExpandOp {
    /// Variable of the node the traversal starts from.
    pub from_variable: String,
    /// Variable bound to the node at the other end of the traversal.
    pub to_variable: String,
    /// Optional variable for the traversed edge (presumably bound per match — confirm).
    pub edge_variable: Option<String>,
    /// Direction in which edges are followed.
    pub direction: ExpandDirection,
    /// Edge types to follow; an empty list is shown as `*` in plan output.
    pub edge_types: Vec<String>,
    /// Lower hop bound; `min_hops == 1` with `max_hops == Some(1)` prints no hop range.
    pub min_hops: u32,
    /// Upper hop bound; `None` is printed as an open range (`*min..`).
    pub max_hops: Option<u32>,
    /// Input operator, rendered beneath the expand in plan output.
    pub input: Box<LogicalOperator>,
    /// Optional variable for the whole path (presumably bound to the traversed path).
    pub path_alias: Option<String>,
    /// Path mode for this expansion (see [`PathMode`]).
    pub path_mode: PathMode,
}
/// Edge direction of an expansion; printed as `->`, `<-` and `--` respectively.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExpandDirection {
    Outgoing,
    Incoming,
    Both,
}
/// Joins the rows of two inputs.
#[derive(Debug, Clone)]
pub struct JoinOp {
    pub left: Box<LogicalOperator>,
    pub right: Box<LogicalOperator>,
    /// Kind of join; printed with `Debug` in plan output.
    pub join_type: JoinType,
    /// Join conditions (an empty list presumably means a cross product — confirm).
    pub conditions: Vec<JoinCondition>,
}
/// Kind of join performed by [`JoinOp`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinType {
    Inner,
    Left,
    Right,
    Full,
    Cross,
    Semi,
    Anti,
}
/// A pair of expressions related across the two join inputs
/// (presumably compared for equality — confirm).
#[derive(Debug, Clone)]
pub struct JoinCondition {
    pub left: LogicalExpression,
    pub right: LogicalExpression,
}
/// Joins any number of inputs; plan output reports the input count and shared variables.
#[derive(Debug, Clone)]
pub struct MultiWayJoinOp {
    pub inputs: Vec<LogicalOperator>,
    pub conditions: Vec<JoinCondition>,
    /// Variables the inputs have in common (presumably the join keys — confirm).
    pub shared_variables: Vec<String>,
}
/// Grouped aggregation over its input.
#[derive(Debug, Clone)]
pub struct AggregateOp {
    /// Grouping expressions.
    pub group_by: Vec<LogicalExpression>,
    /// Aggregates computed per group.
    pub aggregates: Vec<AggregateExpr>,
    pub input: Box<LogicalOperator>,
    /// Optional post-aggregation predicate (presumably HAVING — confirm).
    pub having: Option<LogicalExpression>,
}
/// Kind of graph entity used by [`HorizontalAggregateOp`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EntityKind {
    Edge,
    Node,
}
/// Aggregates `property` over the entities held in `list_column`, producing `alias`.
///
/// NOTE(review): presumably aggregates within a single row (e.g. over a path's edges)
/// rather than across rows — confirm.
#[derive(Debug, Clone)]
pub struct HorizontalAggregateOp {
    pub list_column: String,
    pub entity_kind: EntityKind,
    pub function: AggregateFunction,
    pub property: String,
    pub alias: String,
    pub input: Box<LogicalOperator>,
}
/// One aggregate computation inside an [`AggregateOp`].
#[derive(Debug, Clone)]
pub struct AggregateExpr {
    pub function: AggregateFunction,
    /// Argument expression (`None` presumably for argument-less forms such as `count(*)`).
    pub expression: Option<LogicalExpression>,
    /// Second argument, presumably for two-argument functions (covariance, correlation,
    /// regression) — confirm.
    pub expression2: Option<LogicalExpression>,
    /// Presumably the DISTINCT modifier on the argument.
    pub distinct: bool,
    /// Output name; shown as `... AS alias` in plan output.
    pub alias: Option<String>,
    /// Presumably the percentile for `PercentileDisc` / `PercentileCont`.
    pub percentile: Option<f64>,
    /// Presumably the separator for `GroupConcat`.
    pub separator: Option<String>,
}
/// Supported aggregate functions; plan output prints the lowercased `Debug` name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AggregateFunction {
    Count,
    CountNonNull,
    Sum,
    Avg,
    Min,
    Max,
    Collect,
    StdDev,
    StdDevPop,
    Variance,
    VariancePop,
    PercentileDisc,
    PercentileCont,
    GroupConcat,
    Sample,
    CovarSamp,
    CovarPop,
    Corr,
    RegrSlope,
    RegrIntercept,
    RegrR2,
    RegrCount,
    RegrSxx,
    RegrSyy,
    RegrSxy,
    RegrAvgx,
    RegrAvgy,
}
/// Hint attached to a filter; plan output shows it as `[index: p]`, `[range: p]` or
/// `[label-first]`.
#[derive(Debug, Clone)]
pub enum PushdownHint {
    IndexLookup {
        property: String,
    },
    RangeScan {
        property: String,
    },
    LabelFirst,
}
/// Keeps the input rows for which `predicate` holds.
#[derive(Debug, Clone)]
pub struct FilterOp {
    pub predicate: LogicalExpression,
    pub input: Box<LogicalOperator>,
    /// Optional hint for how the predicate could be pushed down.
    pub pushdown_hint: Option<PushdownHint>,
}
/// Evaluates a list of projections over its input.
#[derive(Debug, Clone)]
pub struct ProjectOp {
    pub projections: Vec<Projection>,
    pub input: Box<LogicalOperator>,
    /// Presumably keeps the input's columns alongside the projections — confirm.
    pub pass_through_input: bool,
}
/// A single projected expression; plan output prints `expr AS alias` when aliased.
#[derive(Debug, Clone)]
pub struct Projection {
    pub expression: LogicalExpression,
    pub alias: Option<String>,
}
/// LIMIT operator; `count` is a literal or a not-yet-resolved parameter.
#[derive(Debug, Clone)]
pub struct LimitOp {
    pub count: CountExpr,
    pub input: Box<LogicalOperator>,
}
/// SKIP operator; `count` is a literal or a not-yet-resolved parameter.
#[derive(Debug, Clone)]
pub struct SkipOp {
    pub count: CountExpr,
    pub input: Box<LogicalOperator>,
}
/// Sorts its input by `keys`, in order.
#[derive(Debug, Clone)]
pub struct SortOp {
    pub keys: Vec<SortKey>,
    pub input: Box<LogicalOperator>,
}
/// One sort key; plan output prints `expr ASC` / `expr DESC`.
#[derive(Debug, Clone)]
pub struct SortKey {
    pub expression: LogicalExpression,
    pub order: SortOrder,
    /// Explicit null placement (`None` presumably uses the default ordering — confirm).
    pub nulls: Option<NullsOrdering>,
}
/// Sort direction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SortOrder {
    Ascending,
    Descending,
}
/// Placement of nulls in a sort.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NullsOrdering {
    First,
    Last,
}
/// Removes duplicate rows from its input.
#[derive(Debug, Clone)]
pub struct DistinctOp {
    pub input: Box<LogicalOperator>,
    /// Columns considered for de-duplication (`None` presumably means all — confirm).
    pub columns: Option<Vec<String>>,
}
/// Creates a node bound to `variable`; plan output shows `variable:Label1:Label2`.
#[derive(Debug, Clone)]
pub struct CreateNodeOp {
    pub variable: String,
    pub labels: Vec<String>,
    /// Initial properties as `(name, value expression)` pairs.
    pub properties: Vec<(String, LogicalExpression)>,
    /// Optional upstream operator; rendered beneath the create when present.
    pub input: Option<Box<LogicalOperator>>,
}
/// Creates an edge of `edge_type` from `from_variable` to `to_variable`.
#[derive(Debug, Clone)]
pub struct CreateEdgeOp {
    /// Optional edge variable; shown as `?` in plan output when absent.
    pub variable: Option<String>,
    pub from_variable: String,
    pub to_variable: String,
    pub edge_type: String,
    /// Initial properties as `(name, value expression)` pairs.
    pub properties: Vec<(String, LogicalExpression)>,
    pub input: Box<LogicalOperator>,
}
/// Deletes the node bound to `variable`.
#[derive(Debug, Clone)]
pub struct DeleteNodeOp {
    pub variable: String,
    /// Presumably DETACH DELETE (incident edges removed too) — confirm.
    pub detach: bool,
    pub input: Box<LogicalOperator>,
}
/// Deletes the edge bound to `variable`.
#[derive(Debug, Clone)]
pub struct DeleteEdgeOp {
    pub variable: String,
    pub input: Box<LogicalOperator>,
}
/// Sets properties on the entity bound to `variable`.
#[derive(Debug, Clone)]
pub struct SetPropertyOp {
    pub variable: String,
    /// Assignments as `(property name, value expression)` pairs.
    pub properties: Vec<(String, LogicalExpression)>,
    /// Presumably replaces the whole property map instead of merging — confirm.
    pub replace: bool,
    /// Presumably `true` when `variable` is bound to an edge rather than a node.
    pub is_edge: bool,
    pub input: Box<LogicalOperator>,
}
/// Adds `labels` to the node bound to `variable`.
#[derive(Debug, Clone)]
pub struct AddLabelOp {
    pub variable: String,
    pub labels: Vec<String>,
    pub input: Box<LogicalOperator>,
}
/// Removes `labels` from the node bound to `variable`.
#[derive(Debug, Clone)]
pub struct RemoveLabelOp {
    pub variable: String,
    pub labels: Vec<String>,
    pub input: Box<LogicalOperator>,
}
/// Graphs a triple scan is restricted to.
///
/// NOTE(review): presumably SPARQL `FROM` (default graphs) and `FROM NAMED` — confirm.
#[derive(Debug, Clone, Default)]
pub struct DatasetRestriction {
    pub default_graphs: Vec<String>,
    pub named_graphs: Vec<String>,
}
/// Scans triples matching a subject / predicate / object pattern.
#[derive(Debug, Clone)]
pub struct TripleScanOp {
    pub subject: TripleComponent,
    pub predicate: TripleComponent,
    pub object: TripleComponent,
    /// Optional graph component of the pattern.
    pub graph: Option<TripleComponent>,
    /// Optional upstream operator; rendered beneath the scan when present.
    pub input: Option<Box<LogicalOperator>>,
    pub dataset: Option<DatasetRestriction>,
}
/// One position of a triple pattern; plan output renders the variants as `?name`,
/// `<iri>`, the literal's `Display` form, `"value"@lang` and `_:label`.
#[derive(Debug, Clone)]
pub enum TripleComponent {
    Variable(String),
    Iri(String),
    Literal(Value),
    LangLiteral {
        value: String,
        lang: String,
    },
    BlankNode(String),
}
/// Union of any number of branches.
#[derive(Debug, Clone)]
pub struct UnionOp {
    pub inputs: Vec<LogicalOperator>,
}
/// EXCEPT of `left` and `right`; `all` is printed as `Except ALL`.
#[derive(Debug, Clone)]
pub struct ExceptOp {
    pub left: Box<LogicalOperator>,
    pub right: Box<LogicalOperator>,
    pub all: bool,
}
/// INTERSECT of `left` and `right`; `all` is printed as `Intersect ALL`.
#[derive(Debug, Clone)]
pub struct IntersectOp {
    pub left: Box<LogicalOperator>,
    pub right: Box<LogicalOperator>,
    pub all: bool,
}
/// OTHERWISE combination (presumably `right` is used when `left` yields no rows — confirm).
#[derive(Debug, Clone)]
pub struct OtherwiseOp {
    pub left: Box<LogicalOperator>,
    pub right: Box<LogicalOperator>,
}
/// Runs `subplan` against `input` (presumably once per input row, as a correlated
/// subquery — confirm).
#[derive(Debug, Clone)]
pub struct ApplyOp {
    pub input: Box<LogicalOperator>,
    pub subplan: Box<LogicalOperator>,
    /// Variables shared between `input` and `subplan`.
    pub shared_variables: Vec<String>,
    /// Presumably keeps input rows for which the subplan yields nothing — confirm.
    pub optional: bool,
}
/// Leaf producing the given columns (presumably the variables an `Apply` passes into its
/// subplan — confirm).
#[derive(Debug, Clone)]
pub struct ParameterScanOp {
    pub columns: Vec<String>,
}
/// Left join with an optional condition (printed with `Debug` in plan output).
#[derive(Debug, Clone)]
pub struct LeftJoinOp {
    pub left: Box<LogicalOperator>,
    pub right: Box<LogicalOperator>,
    pub condition: Option<LogicalExpression>,
}
/// Anti join of `left` against `right`.
#[derive(Debug, Clone)]
pub struct AntiJoinOp {
    pub left: Box<LogicalOperator>,
    pub right: Box<LogicalOperator>,
}
/// Binds the value of `expression` to `variable`.
#[derive(Debug, Clone)]
pub struct BindOp {
    pub expression: LogicalExpression,
    pub variable: String,
    pub input: Box<LogicalOperator>,
}
/// Unwinds `expression` into `variable`.
#[derive(Debug, Clone)]
pub struct UnwindOp {
    pub expression: LogicalExpression,
    pub variable: String,
    /// Optional variable for the element's position (presumably 1-based — confirm).
    pub ordinality_var: Option<String>,
    /// Optional variable for the element's position (presumably 0-based — confirm).
    pub offset_var: Option<String>,
    pub input: Box<LogicalOperator>,
}
/// Collects `key_var` / `value_var` pairs into a map named `alias`; plan output shows
/// `key -> value AS alias`.
#[derive(Debug, Clone)]
pub struct MapCollectOp {
    pub key_var: String,
    pub value_var: String,
    pub alias: String,
    pub input: Box<LogicalOperator>,
}
/// Node MERGE bound to `variable`.
#[derive(Debug, Clone)]
pub struct MergeOp {
    pub variable: String,
    pub labels: Vec<String>,
    /// Properties used to find an existing node.
    pub match_properties: Vec<(String, LogicalExpression)>,
    /// Assignments presumably applied when the node is created (ON CREATE).
    pub on_create: Vec<(String, LogicalExpression)>,
    /// Assignments presumably applied when the node already exists (ON MATCH).
    pub on_match: Vec<(String, LogicalExpression)>,
    pub input: Box<LogicalOperator>,
}
/// Relationship MERGE between `source_variable` and `target_variable`.
#[derive(Debug, Clone)]
pub struct MergeRelationshipOp {
    pub variable: String,
    pub source_variable: String,
    pub target_variable: String,
    pub edge_type: String,
    pub match_properties: Vec<(String, LogicalExpression)>,
    pub on_create: Vec<(String, LogicalExpression)>,
    pub on_match: Vec<(String, LogicalExpression)>,
    pub input: Box<LogicalOperator>,
}
/// Shortest path(s) from `source_var` to `target_var`; plan output shows `source -> target`.
#[derive(Debug, Clone)]
pub struct ShortestPathOp {
    pub input: Box<LogicalOperator>,
    pub source_var: String,
    pub target_var: String,
    pub edge_types: Vec<String>,
    pub direction: ExpandDirection,
    pub path_alias: String,
    /// Presumably returns every shortest path instead of a single one — confirm.
    pub all_paths: bool,
}
/// Inserts the triple `(subject, predicate, object)`, optionally into `graph`.
#[derive(Debug, Clone)]
pub struct InsertTripleOp {
    pub subject: TripleComponent,
    pub predicate: TripleComponent,
    pub object: TripleComponent,
    pub graph: Option<String>,
    pub input: Option<Box<LogicalOperator>>,
}
/// Deletes the triple `(subject, predicate, object)`, optionally from `graph`.
#[derive(Debug, Clone)]
pub struct DeleteTripleOp {
    pub subject: TripleComponent,
    pub predicate: TripleComponent,
    pub object: TripleComponent,
    pub graph: Option<String>,
    pub input: Option<Box<LogicalOperator>>,
}
/// Template-based delete/insert whose bindings come from `where_clause`
/// (presumably SPARQL `DELETE … INSERT … WHERE` — confirm).
#[derive(Debug, Clone)]
pub struct ModifyOp {
    pub delete_templates: Vec<TripleTemplate>,
    pub insert_templates: Vec<TripleTemplate>,
    /// The only child operator of a `Modify`.
    pub where_clause: Box<LogicalOperator>,
    pub graph: Option<String>,
}
/// A triple pattern used as a delete/insert template.
#[derive(Debug, Clone)]
pub struct TripleTemplate {
    pub subject: TripleComponent,
    pub predicate: TripleComponent,
    pub object: TripleComponent,
    pub graph: Option<String>,
}
// Graph-management operators. In each, `silent` presumably mirrors SPARQL `SILENT`
// (suppress errors) and a `None` graph presumably refers to the default graph — confirm.
/// Clears a graph.
#[derive(Debug, Clone)]
pub struct ClearGraphOp {
    pub graph: Option<String>,
    pub silent: bool,
}
/// Creates a named graph.
#[derive(Debug, Clone)]
pub struct CreateGraphOp {
    pub graph: String,
    pub silent: bool,
}
/// Drops a graph.
#[derive(Debug, Clone)]
pub struct DropGraphOp {
    pub graph: Option<String>,
    pub silent: bool,
}
/// Loads data from `source` into `destination`.
#[derive(Debug, Clone)]
pub struct LoadGraphOp {
    pub source: String,
    pub destination: Option<String>,
    pub silent: bool,
}
/// Copies graph `source` to `destination`.
#[derive(Debug, Clone)]
pub struct CopyGraphOp {
    pub source: Option<String>,
    pub destination: Option<String>,
    pub silent: bool,
}
/// Moves graph `source` to `destination`.
#[derive(Debug, Clone)]
pub struct MoveGraphOp {
    pub source: Option<String>,
    pub destination: Option<String>,
    pub silent: bool,
}
/// Adds graph `source` to `destination`.
#[derive(Debug, Clone)]
pub struct AddGraphOp {
    pub source: Option<String>,
    pub destination: Option<String>,
    pub silent: bool,
}
/// Vector similarity scan binding matches to `variable`
/// (presumably the `k` nearest neighbours of `query_vector` — confirm).
#[derive(Debug, Clone)]
pub struct VectorScanOp {
    pub variable: String,
    /// Explicit index to use, if any.
    pub index_name: Option<String>,
    /// Property holding the stored vectors.
    pub property: String,
    pub label: Option<String>,
    pub query_vector: LogicalExpression,
    pub k: usize,
    /// Distance metric; `None` presumably uses the index/default metric — confirm.
    pub metric: Option<VectorMetric>,
    /// Optional similarity threshold.
    pub min_similarity: Option<f32>,
    /// Optional distance threshold.
    pub max_distance: Option<f32>,
    pub input: Option<Box<LogicalOperator>>,
}
/// Distance / similarity metric for vector search.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VectorMetric {
    Cosine,
    Euclidean,
    DotProduct,
    Manhattan,
}
/// Vector similarity join: for rows of `input`, searches `right_variable` candidates by
/// `query_vector` (presumably per input row — confirm).
#[derive(Debug, Clone)]
pub struct VectorJoinOp {
    pub input: Box<LogicalOperator>,
    pub left_vector_variable: Option<String>,
    pub left_property: Option<String>,
    pub query_vector: LogicalExpression,
    pub right_variable: String,
    pub right_property: String,
    pub right_label: Option<String>,
    pub index_name: Option<String>,
    pub k: usize,
    pub metric: Option<VectorMetric>,
    pub min_similarity: Option<f32>,
    pub max_distance: Option<f32>,
    /// Optional variable the similarity score is bound to.
    pub score_variable: Option<String>,
}
/// Final result projection; `distinct` is printed as `Return DISTINCT`.
#[derive(Debug, Clone)]
pub struct ReturnOp {
    pub items: Vec<ReturnItem>,
    pub distinct: bool,
    pub input: Box<LogicalOperator>,
}
/// One returned expression; plan output prints `expr AS alias` when aliased.
#[derive(Debug, Clone)]
pub struct ReturnItem {
    pub expression: LogicalExpression,
    pub alias: Option<String>,
}
/// Creates property graph `name` from node and edge table definitions.
#[derive(Debug, Clone)]
pub struct CreatePropertyGraphOp {
    pub name: String,
    pub node_tables: Vec<PropertyGraphNodeTable>,
    pub edge_tables: Vec<PropertyGraphEdgeTable>,
}
/// Node table of a property graph definition.
#[derive(Debug, Clone)]
pub struct PropertyGraphNodeTable {
    pub name: String,
    /// Column pairs (presumably `(name, type)` — confirm).
    pub columns: Vec<(String, String)>,
}
/// Edge table of a property graph definition, linking `source_table` to `target_table`.
#[derive(Debug, Clone)]
pub struct PropertyGraphEdgeTable {
    pub name: String,
    /// Column pairs (presumably `(name, type)` — confirm).
    pub columns: Vec<(String, String)>,
    pub source_table: String,
    pub target_table: String,
}
/// Procedure call; the name segments are joined with `.` in plan output.
#[derive(Debug, Clone)]
pub struct CallProcedureOp {
    pub name: Vec<String>,
    pub arguments: Vec<LogicalExpression>,
    /// Requested output fields (`None` presumably yields every field — confirm).
    pub yield_items: Option<Vec<ProcedureYield>>,
}
/// One YIELD item of a procedure call, optionally renamed.
#[derive(Debug, Clone)]
pub struct ProcedureYield {
    pub field_name: String,
    pub alias: Option<String>,
}
pub use grafeo_core::execution::operators::LoadDataFormat;
/// Reads rows from the file at `path`, binding each to `variable`.
#[derive(Debug, Clone)]
pub struct LoadDataOp {
    /// File format (CSV, JSONL or Parquet).
    pub format: LoadDataFormat,
    /// Header-row flag; plan output only shows it for CSV input.
    pub with_headers: bool,
    pub path: String,
    pub variable: String,
    /// Optional field delimiter (presumably only relevant for CSV — confirm).
    pub field_terminator: Option<char>,
}
/// Scalar expression used inside logical operators.
#[derive(Debug, Clone)]
pub enum LogicalExpression {
    /// Constant value.
    Literal(Value),
    /// Reference to a bound variable.
    Variable(String),
    /// `variable.property` access.
    Property {
        variable: String,
        property: String,
    },
    /// Binary operator application.
    Binary {
        left: Box<LogicalExpression>,
        op: BinaryOp,
        right: Box<LogicalExpression>,
    },
    /// Unary operator application.
    Unary {
        op: UnaryOp,
        operand: Box<LogicalExpression>,
    },
    /// Function call; `distinct` is presumably the DISTINCT argument modifier.
    FunctionCall {
        name: String,
        args: Vec<LogicalExpression>,
        distinct: bool,
    },
    /// List literal.
    List(Vec<LogicalExpression>),
    /// Map literal as `(key, value)` pairs.
    Map(Vec<(String, LogicalExpression)>),
    /// `base[index]`.
    IndexAccess {
        base: Box<LogicalExpression>,
        index: Box<LogicalExpression>,
    },
    /// `base[start..end]` with optional bounds.
    SliceAccess {
        base: Box<LogicalExpression>,
        start: Option<Box<LogicalExpression>>,
        end: Option<Box<LogicalExpression>>,
    },
    /// CASE expression; `operand` is presumably present for the "simple" form only.
    Case {
        operand: Option<Box<LogicalExpression>>,
        when_clauses: Vec<(LogicalExpression, LogicalExpression)>,
        else_clause: Option<Box<LogicalExpression>>,
    },
    /// Named query parameter.
    Parameter(String),
    /// Presumably the labels of the named variable.
    Labels(String),
    /// Presumably the edge type of the named variable.
    Type(String),
    /// Presumably the internal id of the named variable.
    Id(String),
    /// List comprehension over `list_expr` with an optional filter and a map expression.
    ListComprehension {
        variable: String,
        list_expr: Box<LogicalExpression>,
        filter_expr: Option<Box<LogicalExpression>>,
        map_expr: Box<LogicalExpression>,
    },
    /// `all` / `any` / `none` / `single` predicate over the elements of `list_expr`.
    ListPredicate {
        kind: ListPredicateKind,
        variable: String,
        list_expr: Box<LogicalExpression>,
        predicate: Box<LogicalExpression>,
    },
    /// EXISTS subquery.
    ExistsSubquery(Box<LogicalOperator>),
    /// COUNT subquery.
    CountSubquery(Box<LogicalOperator>),
    /// Value-producing subquery.
    ValueSubquery(Box<LogicalOperator>),
    /// Map projection on `base`.
    MapProjection {
        base: String,
        entries: Vec<MapProjectionEntry>,
    },
    /// Fold over `list`, accumulating into `accumulator` starting from `initial`.
    Reduce {
        accumulator: String,
        initial: Box<LogicalExpression>,
        variable: String,
        list: Box<LogicalExpression>,
        expression: Box<LogicalExpression>,
    },
    /// Pattern comprehension: evaluates `projection` for rows of `subplan`.
    PatternComprehension {
        subplan: Box<LogicalOperator>,
        projection: Box<LogicalExpression>,
    },
}
/// Entry of a map projection.
#[derive(Debug, Clone)]
pub enum MapProjectionEntry {
    /// Copies a single property.
    PropertySelector(String),
    /// Adds a computed `key: expression` entry.
    LiteralEntry(String, LogicalExpression),
    /// Copies all properties.
    AllProperties,
}
/// Quantifier of a [`LogicalExpression::ListPredicate`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ListPredicateKind {
    All,
    Any,
    None,
    Single,
}
/// Binary operators; plan output prints their `Debug` name (e.g. `Gt`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BinaryOp {
    // Comparison.
    Eq,
    Ne,
    Lt,
    Le,
    Gt,
    Ge,
    // Boolean logic.
    And,
    Or,
    Xor,
    // Arithmetic.
    Add,
    Sub,
    Mul,
    Div,
    Mod,
    // String / list operators.
    Concat,
    StartsWith,
    EndsWith,
    Contains,
    In,
    Like,
    Regex,
    // Exponentiation (listed last).
    Pow,
}
/// Unary operators; plan output prints their `Debug` name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UnaryOp {
    Not,
    Neg,
    IsNull,
    IsNotNull,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `NodeScan` of `n:Person` with no upstream input.
    fn person_scan() -> Box<LogicalOperator> {
        Box::new(LogicalOperator::NodeScan(NodeScanOp {
            variable: "n".into(),
            label: Some("Person".into()),
            input: None,
        }))
    }

    #[test]
    fn test_simple_node_scan_plan() {
        let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![ReturnItem {
                expression: LogicalExpression::Variable("n".into()),
                alias: None,
            }],
            distinct: false,
            input: person_scan(),
        }));

        let ret = match &plan.root {
            LogicalOperator::Return(ret) => ret,
            _ => panic!("Expected Return"),
        };
        assert_eq!(ret.items.len(), 1);
        assert!(!ret.distinct);

        let scan = match &*ret.input {
            LogicalOperator::NodeScan(scan) => scan,
            _ => panic!("Expected NodeScan"),
        };
        assert_eq!(scan.variable, "n");
        assert_eq!(scan.label.as_deref(), Some("Person"));
    }

    #[test]
    fn test_filter_plan() {
        let age_over_30 = LogicalExpression::Binary {
            left: Box::new(LogicalExpression::Property {
                variable: "n".into(),
                property: "age".into(),
            }),
            op: BinaryOp::Gt,
            right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
        };
        let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![ReturnItem {
                expression: LogicalExpression::Property {
                    variable: "n".into(),
                    property: "name".into(),
                },
                alias: Some("name".into()),
            }],
            distinct: false,
            input: Box::new(LogicalOperator::Filter(FilterOp {
                predicate: age_over_30,
                input: person_scan(),
                pushdown_hint: None,
            })),
        }));

        let ret = match &plan.root {
            LogicalOperator::Return(ret) => ret,
            _ => panic!("Expected Return"),
        };
        let filter = match &*ret.input {
            LogicalOperator::Filter(filter) => filter,
            _ => panic!("Expected Filter"),
        };
        match &filter.predicate {
            LogicalExpression::Binary { op, .. } => assert_eq!(*op, BinaryOp::Gt),
            _ => panic!("Expected Binary expression"),
        }
    }
}