use crate::query::plan::LogicalExpression;
use grafeo_common::types::LogicalType;
use grafeo_common::utils::error::{Error, Result};
use grafeo_core::execution::operators::{
DistinctOperator, ExceptOperator, HashJoinOperator, IntersectOperator,
JoinType as PhysicalJoinType, LimitOperator, Operator, OtherwiseOperator, ProjectExpr,
ProjectOperator, SkipOperator, UnionOperator,
};
/// Wraps `input` in a [`LimitOperator`] built from `count` and `schema`.
///
/// Returns the new operator together with `columns`, which are handed back
/// unchanged.
pub(crate) fn build_limit(
    input: Box<dyn Operator>,
    columns: Vec<String>,
    count: usize,
    schema: Vec<LogicalType>,
) -> (Box<dyn Operator>, Vec<String>) {
    let limited: Box<dyn Operator> = Box::new(LimitOperator::new(input, count, schema));
    (limited, columns)
}
/// Wraps `input` in a [`SkipOperator`] built from `count` and `schema`.
///
/// Returns the new operator together with `columns`, which are handed back
/// unchanged.
pub(crate) fn build_skip(
    input: Box<dyn Operator>,
    columns: Vec<String>,
    count: usize,
    schema: Vec<LogicalType>,
) -> (Box<dyn Operator>, Vec<String>) {
    let skipping: Box<dyn Operator> = Box::new(SkipOperator::new(input, count, schema));
    (skipping, columns)
}
/// Wraps `input` in a [`DistinctOperator`].
///
/// When `distinct_columns` is given, each name is looked up in `columns` and
/// the resulting positions are passed to `DistinctOperator::on_columns`.
/// Names that do not appear in `columns` are skipped. If no name resolves
/// (or `distinct_columns` is `None`), `DistinctOperator::new` is used instead.
///
/// Returns the operator together with `columns`, which are handed back
/// unchanged.
pub(crate) fn build_distinct(
    input: Box<dyn Operator>,
    columns: Vec<String>,
    distinct_columns: Option<&[String]>,
    schema: Vec<LogicalType>,
) -> (Box<dyn Operator>, Vec<String>) {
    // `None` flattens to an empty list, which lands in the same branch as
    // "no requested name matched".
    let key_positions: Vec<usize> = distinct_columns
        .into_iter()
        .flatten()
        .filter_map(|wanted| columns.iter().position(|have| have == wanted))
        .collect();

    let deduped: Box<dyn Operator> = if key_positions.is_empty() {
        Box::new(DistinctOperator::new(input, schema))
    } else {
        Box::new(DistinctOperator::on_columns(input, key_positions, schema))
    };
    (deduped, columns)
}
pub(crate) fn build_union(
inputs: Vec<Box<dyn Operator>>,
columns: Vec<String>,
schema: Vec<LogicalType>,
) -> Result<(Box<dyn Operator>, Vec<String>)> {
if inputs.is_empty() {
return Err(Error::Internal(
"Union requires at least one input".to_string(),
));
}
let operator = Box::new(UnionOperator::new(inputs, schema));
Ok((operator, columns))
}
/// Builds an [`ExceptOperator`] over `left` and `right`.
///
/// The `all` flag and `schema` are forwarded to the operator as given.
/// Returns the operator together with `columns`, which are handed back
/// unchanged.
pub(crate) fn build_except(
    left: Box<dyn Operator>,
    right: Box<dyn Operator>,
    columns: Vec<String>,
    all: bool,
    schema: Vec<LogicalType>,
) -> (Box<dyn Operator>, Vec<String>) {
    let difference: Box<dyn Operator> = Box::new(ExceptOperator::new(left, right, all, schema));
    (difference, columns)
}
/// Builds an [`IntersectOperator`] over `left` and `right`.
///
/// The `all` flag and `schema` are forwarded to the operator as given.
/// Returns the operator together with `columns`, which are handed back
/// unchanged.
pub(crate) fn build_intersect(
    left: Box<dyn Operator>,
    right: Box<dyn Operator>,
    columns: Vec<String>,
    all: bool,
    schema: Vec<LogicalType>,
) -> (Box<dyn Operator>, Vec<String>) {
    let common: Box<dyn Operator> = Box::new(IntersectOperator::new(left, right, all, schema));
    (common, columns)
}
/// Builds an [`OtherwiseOperator`] from `left` and `right`.
///
/// Returns the operator together with `columns`, which are handed back
/// unchanged.
pub(crate) fn build_otherwise(
    left: Box<dyn Operator>,
    right: Box<dyn Operator>,
    columns: Vec<String>,
) -> (Box<dyn Operator>, Vec<String>) {
    let fallback: Box<dyn Operator> = Box::new(OtherwiseOperator::new(left, right));
    (fallback, columns)
}
/// Builds a hash join of `left` and `right` keyed on the column names they share.
///
/// * With no shared column name the join type is `Cross`; otherwise it is `Inner`.
/// * For an `Inner` join with `cardinalities = Some((left, right))` where
///   `right < left * 0.8`, the operands are handed to the join in swapped
///   order (right first, with the key lists swapped to match).
///   NOTE(review): presumably this places the smaller input on the side the
///   hash join builds from; confirm against `HashJoinOperator`.
/// * `schema_fn` is called once with the concatenated column names in the
///   order the join receives its operands.
///
/// The returned column list is always `left_columns` followed by those
/// `right_columns` whose names are not already in `left_columns`. A
/// [`ProjectOperator`] is added on top of the join whenever the operands were
/// swapped or a duplicate right-hand column has to be dropped; projected
/// columns are typed `LogicalType::Any`.
#[cfg(feature = "rdf")]
pub(crate) fn build_inner_join(
    left: Box<dyn Operator>,
    right: Box<dyn Operator>,
    left_columns: &[String],
    right_columns: &[String],
    schema_fn: impl Fn(&[String]) -> Vec<LogicalType>,
    cardinalities: Option<(f64, f64)>,
) -> (Box<dyn Operator>, Vec<String>) {
    let (probe_keys, build_keys) = find_shared_join_keys(left_columns, right_columns);
    let has_shared_keys = !probe_keys.is_empty();
    let join_type = if has_shared_keys {
        PhysicalJoinType::Inner
    } else {
        PhysicalJoinType::Cross
    };

    // Swapping is only considered for keyed (inner) joins.
    let right_is_smaller = match cardinalities {
        Some((left_card, right_card)) => right_card < left_card * 0.8,
        None => false,
    };
    let swap_sides = has_shared_keys && right_is_smaller;

    let left_len = left_columns.len();
    let right_len = right_columns.len();

    // Where each side's columns start inside the raw join output.
    let (left_offset, right_offset) = if swap_sides {
        (right_len, 0)
    } else {
        (0, left_len)
    };
    let (first_cols, second_cols) = if swap_sides {
        (right_columns, left_columns)
    } else {
        (left_columns, right_columns)
    };

    let join_columns: Vec<String> = first_cols.iter().chain(second_cols).cloned().collect();
    let join_schema = schema_fn(&join_columns);
    let join_op: Box<dyn Operator> = if swap_sides {
        Box::new(HashJoinOperator::new(
            right,
            left,
            build_keys,
            probe_keys,
            join_type,
            join_schema,
        ))
    } else {
        Box::new(HashJoinOperator::new(
            left,
            right,
            probe_keys,
            build_keys,
            join_type,
            join_schema,
        ))
    };

    // Output layout: every left column, then right columns with a new name.
    let left_names: std::collections::HashSet<&str> =
        left_columns.iter().map(String::as_str).collect();
    let mut keep_indices: Vec<usize> = (left_offset..left_offset + left_len).collect();
    let mut output_columns: Vec<String> = left_columns.to_vec();
    let extras = right_columns
        .iter()
        .enumerate()
        .filter(|(_, name)| !left_names.contains(name.as_str()));
    for (right_idx, name) in extras {
        keep_indices.push(right_offset + right_idx);
        output_columns.push(name.clone());
    }

    // A swapped join always needs the projection to restore left-first order.
    let needs_projection = swap_sides || keep_indices.len() < join_columns.len();
    if !needs_projection {
        return (join_op, output_columns);
    }

    let proj_types: Vec<LogicalType> = std::iter::repeat_with(|| LogicalType::Any)
        .take(keep_indices.len())
        .collect();
    let proj_exprs: Vec<ProjectExpr> = keep_indices
        .into_iter()
        .map(ProjectExpr::Column)
        .collect();
    let projected: Box<dyn Operator> =
        Box::new(ProjectOperator::new(join_op, proj_exprs, proj_types));
    (projected, output_columns)
}
/// Builds a [`HashJoinOperator`] of type `Anti` over `left` and `right`.
///
/// Join keys are the positions of column names present in both
/// `left_columns` and `right_columns`. The operator is returned together
/// with `left_columns`, handed back unchanged.
pub(crate) fn build_anti_join(
    left: Box<dyn Operator>,
    right: Box<dyn Operator>,
    left_columns: Vec<String>,
    right_columns: &[String],
    schema: Vec<LogicalType>,
) -> (Box<dyn Operator>, Vec<String>) {
    let (left_keys, right_keys) = find_shared_join_keys(&left_columns, right_columns);
    let join = HashJoinOperator::new(
        left,
        right,
        left_keys,
        right_keys,
        PhysicalJoinType::Anti,
        schema,
    );
    let anti: Box<dyn Operator> = Box::new(join);
    (anti, left_columns)
}
/// Builds a [`HashJoinOperator`] of type `Semi` over `left` and `right`.
///
/// Join keys are the positions of column names present in both
/// `left_columns` and `right_columns`. The operator is returned together
/// with `left_columns`, handed back unchanged.
#[cfg(feature = "rdf")]
pub(crate) fn build_semi_join(
    left: Box<dyn Operator>,
    right: Box<dyn Operator>,
    left_columns: Vec<String>,
    right_columns: &[String],
    schema: Vec<LogicalType>,
) -> (Box<dyn Operator>, Vec<String>) {
    let (left_keys, right_keys) = find_shared_join_keys(&left_columns, right_columns);
    let join = HashJoinOperator::new(
        left,
        right,
        left_keys,
        right_keys,
        PhysicalJoinType::Semi,
        schema,
    );
    let semi: Box<dyn Operator> = Box::new(join);
    (semi, left_columns)
}
/// Builds a [`HashJoinOperator`] of type `Left` keyed on shared column names.
///
/// `schema_fn` is called once with `left_columns` followed by `right_columns`
/// to obtain the join's schema. The returned column list is `left_columns`
/// followed by those `right_columns` whose names are not already in
/// `left_columns`. When that drops at least one column, a [`ProjectOperator`]
/// (all columns typed `LogicalType::Any`) is added on top of the join;
/// otherwise the join operator is returned directly.
pub(crate) fn build_left_join(
    left: Box<dyn Operator>,
    right: Box<dyn Operator>,
    left_columns: &[String],
    right_columns: &[String],
    schema_fn: impl Fn(&[String]) -> Vec<LogicalType>,
) -> (Box<dyn Operator>, Vec<String>) {
    let (probe_keys, build_keys) = find_shared_join_keys(left_columns, right_columns);
    let left_len = left_columns.len();

    let join_columns: Vec<String> = left_columns
        .iter()
        .chain(right_columns)
        .cloned()
        .collect();
    let join_schema = schema_fn(&join_columns);
    let join_op: Box<dyn Operator> = Box::new(HashJoinOperator::new(
        left,
        right,
        probe_keys,
        build_keys,
        PhysicalJoinType::Left,
        join_schema,
    ));

    // Right-hand columns survive only if their name is new; their index in
    // the raw join output sits after all left columns.
    let left_names: std::collections::HashSet<&str> =
        left_columns.iter().map(String::as_str).collect();
    let (extra_indices, extra_names): (Vec<usize>, Vec<String>) = right_columns
        .iter()
        .enumerate()
        .filter(|(_, name)| !left_names.contains(name.as_str()))
        .map(|(right_idx, name)| (left_len + right_idx, name.clone()))
        .unzip();

    let keep_indices: Vec<usize> = (0..left_len).chain(extra_indices).collect();
    let mut output_columns: Vec<String> = left_columns.to_vec();
    output_columns.extend(extra_names);

    // Nothing was dropped: the raw join output already has the right shape.
    if keep_indices.len() >= join_columns.len() {
        return (join_op, output_columns);
    }

    let proj_types: Vec<LogicalType> = std::iter::repeat_with(|| LogicalType::Any)
        .take(keep_indices.len())
        .collect();
    let proj_exprs: Vec<ProjectExpr> = keep_indices
        .into_iter()
        .map(ProjectExpr::Column)
        .collect();
    let projected: Box<dyn Operator> =
        Box::new(ProjectOperator::new(join_op, proj_exprs, proj_types));
    (projected, output_columns)
}
/// Pairs up the positions of column names that occur in both `left` and `right`.
///
/// Walks `right` in order; for every name that also appears in `left`, the
/// index of its first occurrence in `left` goes into the first vector and its
/// index in `right` goes into the second. The two vectors therefore have the
/// same length and are aligned pair by pair.
fn find_shared_join_keys(left: &[String], right: &[String]) -> (Vec<usize>, Vec<usize>) {
    right
        .iter()
        .enumerate()
        .filter_map(|(build_idx, name)| {
            left.iter()
                .position(|candidate| candidate == name)
                .map(|probe_idx| (probe_idx, build_idx))
        })
        .unzip()
}
pub(crate) fn resolve_expression_to_column(
expr: &LogicalExpression,
variable_columns: &std::collections::HashMap<String, usize>,
context: &str,
) -> Result<usize> {
match expr {
LogicalExpression::Variable(name) => variable_columns
.get(name)
.copied()
.ok_or_else(|| Error::Internal(format!("Variable '{name}' not found{context}"))),
LogicalExpression::Property { variable, property } => {
let col_name = format!("{variable}_{property}");
variable_columns.get(&col_name).copied().ok_or_else(|| {
Error::Internal(format!(
"Property column '{col_name}' not found{context} (from {variable}.{property})"
))
})
}
_ => {
let col_name = format!("__expr_{expr:?}");
variable_columns.get(&col_name).copied().ok_or_else(|| {
Error::Internal(format!(
"Cannot resolve expression to column{context}: {expr:?}"
))
})
}
}
}
/// Renders a short textual form of `expr`.
///
/// * `Variable` gives the variable name.
/// * `Property` gives `variable.property`.
/// * `Literal` gives the value's `Debug` output.
/// * `FunctionCall` gives `name(...)`, with the arguments elided.
/// * `IndexAccess` gives `base[index]`, rendering both parts recursively.
/// * Every other expression gives the fixed placeholder `expr`.
pub(crate) fn expression_to_string(expr: &LogicalExpression) -> String {
    match expr {
        LogicalExpression::Variable(name) => name.clone(),
        LogicalExpression::Property { variable, property } => format!("{variable}.{property}"),
        LogicalExpression::Literal(value) => format!("{value:?}"),
        LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
        LogicalExpression::IndexAccess { base, index } => {
            let base_text = expression_to_string(base);
            let index_text = expression_to_string(index);
            format!("{base_text}[{index_text}]")
        }
        _ => String::from("expr"),
    }
}