use crate::query::plan::LogicalExpression;
use grafeo_common::types::LogicalType;
use grafeo_common::utils::error::{Error, Result};
use grafeo_core::execution::operators::{
DistinctOperator, ExceptOperator, HashJoinOperator, IntersectOperator,
JoinType as PhysicalJoinType, LimitOperator, Operator, OtherwiseOperator, ProjectExpr,
ProjectOperator, SkipOperator, UnionOperator,
};
/// Wraps `input` in a `LimitOperator` configured with `count` and `schema`.
///
/// The column names are passed through unchanged so the caller can keep tracking them.
pub(crate) fn build_limit(
    input: Box<dyn Operator>,
    columns: Vec<String>,
    count: usize,
    schema: Vec<LogicalType>,
) -> (Box<dyn Operator>, Vec<String>) {
    let limited: Box<dyn Operator> = Box::new(LimitOperator::new(input, count, schema));
    (limited, columns)
}
/// Wraps `input` in a `SkipOperator` configured with `count` and `schema`.
///
/// The column names are passed through unchanged so the caller can keep tracking them.
pub(crate) fn build_skip(
    input: Box<dyn Operator>,
    columns: Vec<String>,
    count: usize,
    schema: Vec<LogicalType>,
) -> (Box<dyn Operator>, Vec<String>) {
    let skipping: Box<dyn Operator> = Box::new(SkipOperator::new(input, count, schema));
    (skipping, columns)
}
/// Wraps `input` in a `DistinctOperator`.
///
/// When `distinct_columns` is given, each name is resolved to its position in `columns`.
/// Names that are not present in `columns` are silently skipped. If at least one name
/// resolves, `DistinctOperator::on_columns` is used with those positions. Otherwise
/// (no names requested, or none resolved), the operator falls back to `DistinctOperator::new`.
///
/// The column names are passed through unchanged.
pub(crate) fn build_distinct(
    input: Box<dyn Operator>,
    columns: Vec<String>,
    distinct_columns: Option<&[String]>,
    schema: Vec<LogicalType>,
) -> (Box<dyn Operator>, Vec<String>) {
    // `None` and "no name matched" both end up as an empty key list.
    let key_positions: Vec<usize> = distinct_columns
        .map(|names| {
            names
                .iter()
                .filter_map(|wanted| columns.iter().position(|have| have == wanted))
                .collect::<Vec<usize>>()
        })
        .unwrap_or_default();

    let operator: Box<dyn Operator> = if key_positions.is_empty() {
        Box::new(DistinctOperator::new(input, schema))
    } else {
        Box::new(DistinctOperator::on_columns(input, key_positions, schema))
    };
    (operator, columns)
}
pub(crate) fn build_union(
inputs: Vec<Box<dyn Operator>>,
columns: Vec<String>,
schema: Vec<LogicalType>,
) -> Result<(Box<dyn Operator>, Vec<String>)> {
if inputs.is_empty() {
return Err(Error::Internal(
"Union requires at least one input".to_string(),
));
}
let operator = Box::new(UnionOperator::new(inputs, schema));
Ok((operator, columns))
}
/// Builds an `ExceptOperator` over `left` and `right`.
///
/// `all` is forwarded to the operator unchanged (presumably selecting EXCEPT ALL vs.
/// EXCEPT DISTINCT — confirm against `ExceptOperator`). The column names pass through unchanged.
pub(crate) fn build_except(
    left: Box<dyn Operator>,
    right: Box<dyn Operator>,
    columns: Vec<String>,
    all: bool,
    schema: Vec<LogicalType>,
) -> (Box<dyn Operator>, Vec<String>) {
    let difference: Box<dyn Operator> = Box::new(ExceptOperator::new(left, right, all, schema));
    (difference, columns)
}
/// Builds an `IntersectOperator` over `left` and `right`.
///
/// `all` is forwarded to the operator unchanged (presumably selecting INTERSECT ALL vs.
/// INTERSECT DISTINCT — confirm against `IntersectOperator`). The column names pass through
/// unchanged.
pub(crate) fn build_intersect(
    left: Box<dyn Operator>,
    right: Box<dyn Operator>,
    columns: Vec<String>,
    all: bool,
    schema: Vec<LogicalType>,
) -> (Box<dyn Operator>, Vec<String>) {
    let common: Box<dyn Operator> = Box::new(IntersectOperator::new(left, right, all, schema));
    (common, columns)
}
/// Builds an `OtherwiseOperator` from a primary (`left`) and an alternative (`right`) input.
///
/// Unlike the other set operators, no schema is passed to the operator. The column names
/// pass through unchanged.
pub(crate) fn build_otherwise(
    left: Box<dyn Operator>,
    right: Box<dyn Operator>,
    columns: Vec<String>,
) -> (Box<dyn Operator>, Vec<String>) {
    let fallback: Box<dyn Operator> = Box::new(OtherwiseOperator::new(left, right));
    (fallback, columns)
}
/// Builds an inner hash join between `left` and `right`, joining on every column name the
/// two sides share. When they share no names, a cross join is built instead.
///
/// The returned layout is always "all left columns, then the right columns whose names do
/// not appear on the left", whichever physical side the hash join ends up using.
///
/// * `left_types` / `right_types` — column types, positionally aligned with the column names.
/// * `cardinalities` — optional `(left, right)` row estimates. For an inner (keyed) join
///   where the right estimate is below 80% of the left one, the inputs are swapped in the
///   physical operator and a projection restores the left-first layout.
///
/// Returns the operator, its output column names and its output column types.
#[cfg(feature = "rdf")]
pub(crate) fn build_inner_join(
    left: Box<dyn Operator>,
    right: Box<dyn Operator>,
    left_columns: &[String],
    right_columns: &[String],
    left_types: &[LogicalType],
    right_types: &[LogicalType],
    cardinalities: Option<(f64, f64)>,
) -> (Box<dyn Operator>, Vec<String>, Vec<LogicalType>) {
    // Wraps `input` in a projection selecting `indices` out of `schema`; returns the
    // projection together with the types it produces.
    fn project_columns(
        input: Box<dyn Operator>,
        indices: &[usize],
        schema: &[LogicalType],
    ) -> (Box<dyn Operator>, Vec<LogicalType>) {
        let exprs: Vec<ProjectExpr> = indices.iter().map(|&i| ProjectExpr::Column(i)).collect();
        let types: Vec<LogicalType> = indices.iter().map(|&i| schema[i].clone()).collect();
        let operator: Box<dyn Operator> =
            Box::new(ProjectOperator::new(input, exprs, types.clone()));
        (operator, types)
    }

    let (probe_keys, build_keys) = find_shared_join_keys(left_columns, right_columns);
    let join_type = if probe_keys.is_empty() {
        PhysicalJoinType::Cross
    } else {
        PhysicalJoinType::Inner
    };

    // Right-side columns whose names are not already on the left. Shared names are the
    // join keys, so keeping them would duplicate the left-side value.
    let left_set: std::collections::HashSet<&str> =
        left_columns.iter().map(String::as_str).collect();
    let right_extra: Vec<usize> = right_columns
        .iter()
        .enumerate()
        .filter(|(_, name)| !left_set.contains(name.as_str()))
        .map(|(idx, _)| idx)
        .collect();
    let mut output_columns: Vec<String> = left_columns.to_vec();
    output_columns.extend(right_extra.iter().map(|&idx| right_columns[idx].clone()));

    // NOTE(review): judging by the key names, `HashJoinOperator::new` takes the probe input
    // first and the build input second. If so, swapping when the *right* input is smaller
    // makes the larger (left) input the build side — confirm this matches the operator's
    // build/probe convention and the intended cost heuristic.
    let swap_sides = matches!(join_type, PhysicalJoinType::Inner)
        && cardinalities.is_some_and(|(left_card, right_card)| right_card < left_card * 0.8);

    if swap_sides {
        // Physical layout is [right columns..., left columns...].
        let right_width = right_columns.len();
        let mut join_schema: Vec<LogicalType> = right_types.to_vec();
        join_schema.extend_from_slice(left_types);
        let join_op: Box<dyn Operator> = Box::new(HashJoinOperator::new(
            right,
            left,
            build_keys,
            probe_keys,
            join_type,
            join_schema.clone(),
        ));
        // Always project here: the physical column order has to be flipped back.
        let keep: Vec<usize> = (right_width..right_width + left_columns.len())
            .chain(right_extra.iter().copied())
            .collect();
        let (operator, output_types) = project_columns(join_op, &keep, &join_schema);
        (operator, output_columns, output_types)
    } else {
        // Physical layout is [left columns..., right columns...].
        let left_width = left_columns.len();
        let mut join_schema: Vec<LogicalType> = left_types.to_vec();
        join_schema.extend_from_slice(right_types);
        let join_op: Box<dyn Operator> = Box::new(HashJoinOperator::new(
            left,
            right,
            probe_keys,
            build_keys,
            join_type,
            join_schema.clone(),
        ));
        if right_extra.len() < right_columns.len() {
            // Some right columns duplicate left ones: project them away.
            let keep: Vec<usize> = (0..left_width)
                .chain(right_extra.iter().map(|&idx| left_width + idx))
                .collect();
            let (operator, output_types) = project_columns(join_op, &keep, &join_schema);
            (operator, output_columns, output_types)
        } else {
            // No overlap (e.g. a cross join): the join output already has the final layout.
            (join_op, output_columns, join_schema)
        }
    }
}
/// Builds an anti join that keeps the `left` rows without a match in `right`, matching on
/// the column names shared by both sides.
///
/// When the two sides share no column names, there is nothing to match on. In that case
/// `left` is returned unchanged and `right` is dropped without being used.
pub(crate) fn build_anti_join(
    left: Box<dyn Operator>,
    right: Box<dyn Operator>,
    left_columns: Vec<String>,
    right_columns: &[String],
    schema: Vec<LogicalType>,
) -> (Box<dyn Operator>, Vec<String>) {
    let (left_keys, right_keys) = find_shared_join_keys(&left_columns, right_columns);

    let operator: Box<dyn Operator> = if left_keys.is_empty() {
        left
    } else {
        Box::new(HashJoinOperator::new(
            left,
            right,
            left_keys,
            right_keys,
            PhysicalJoinType::Anti,
            schema,
        ))
    };
    (operator, left_columns)
}
/// Builds a semi join that keeps the `left` rows with a match in `right`, matching on the
/// column names shared by both sides. Only the left columns are returned.
///
/// NOTE(review): unlike `build_anti_join`, this does not special-case "no shared columns".
/// The join is then built with empty key lists; confirm how `HashJoinOperator` treats a
/// keyless semi join.
#[cfg(feature = "rdf")]
pub(crate) fn build_semi_join(
    left: Box<dyn Operator>,
    right: Box<dyn Operator>,
    left_columns: Vec<String>,
    right_columns: &[String],
    schema: Vec<LogicalType>,
) -> (Box<dyn Operator>, Vec<String>) {
    let (left_keys, right_keys) = find_shared_join_keys(&left_columns, right_columns);
    let semi = HashJoinOperator::new(
        left,
        right,
        left_keys,
        right_keys,
        PhysicalJoinType::Semi,
        schema,
    );
    let operator: Box<dyn Operator> = Box::new(semi);
    (operator, left_columns)
}
/// Builds a left outer hash join between `left` and `right`, joining on every column name
/// the two sides share.
///
/// The output keeps all left columns, followed by the right columns whose names do not
/// already appear on the left. Shared names are the join keys, so the left copy is kept.
/// When the sides share no names, no projection is added and the raw join output is returned.
///
/// Returns the operator, its output column names and its output column types.
pub(crate) fn build_left_join(
    left: Box<dyn Operator>,
    right: Box<dyn Operator>,
    left_columns: &[String],
    right_columns: &[String],
    left_types: &[LogicalType],
    right_types: &[LogicalType],
) -> (Box<dyn Operator>, Vec<String>, Vec<LogicalType>) {
    let (probe_keys, build_keys) = find_shared_join_keys(left_columns, right_columns);

    // Physical join output: all left columns followed by all right columns.
    let mut join_schema: Vec<LogicalType> = left_types.to_vec();
    join_schema.extend_from_slice(right_types);
    let join_op: Box<dyn Operator> = Box::new(HashJoinOperator::new(
        left,
        right,
        probe_keys,
        build_keys,
        PhysicalJoinType::Left,
        join_schema.clone(),
    ));

    // Keep every left column, and only those right columns not already present on the left.
    let left_width = left_columns.len();
    let left_set: std::collections::HashSet<&str> =
        left_columns.iter().map(String::as_str).collect();
    let mut keep_indices: Vec<usize> = (0..left_width).collect();
    let mut output_columns: Vec<String> = left_columns.to_vec();
    for (right_idx, right_col) in right_columns.iter().enumerate() {
        if !left_set.contains(right_col.as_str()) {
            keep_indices.push(left_width + right_idx);
            output_columns.push(right_col.clone());
        }
    }

    // Only add a projection if some right column was dropped; the total physical width is
    // computed directly instead of materialising the concatenated name list.
    if keep_indices.len() < left_width + right_columns.len() {
        let proj_exprs: Vec<ProjectExpr> = keep_indices
            .iter()
            .map(|&i| ProjectExpr::Column(i))
            .collect();
        let proj_types: Vec<LogicalType> = keep_indices
            .iter()
            .map(|&i| join_schema[i].clone())
            .collect();
        let output_types = proj_types.clone();
        let operator = Box::new(ProjectOperator::new(join_op, proj_exprs, proj_types));
        (operator, output_columns, output_types)
    } else {
        (join_op, output_columns, join_schema)
    }
}
/// Pairs up columns that appear by name on both sides of a join.
///
/// Walks `right` in order. For each right column whose name also occurs in `left`, it
/// records the position of the first matching left column (probe key) and the right
/// position (build key). The two returned vectors are index-aligned.
fn find_shared_join_keys(left: &[String], right: &[String]) -> (Vec<usize>, Vec<usize>) {
    right
        .iter()
        .enumerate()
        .filter_map(|(build_idx, name)| {
            left.iter()
                .position(|candidate| candidate == name)
                .map(|probe_idx| (probe_idx, build_idx))
        })
        .unzip()
}
pub(crate) fn resolve_expression_to_column(
expr: &LogicalExpression,
variable_columns: &std::collections::HashMap<String, usize>,
context: &str,
) -> Result<usize> {
match expr {
LogicalExpression::Variable(name) => variable_columns
.get(name)
.copied()
.ok_or_else(|| Error::Internal(format!("Variable '{name}' not found{context}"))),
LogicalExpression::Property { variable, property } => {
let col_name = format!("{variable}_{property}");
variable_columns.get(&col_name).copied().ok_or_else(|| {
Error::Internal(format!(
"Property column '{col_name}' not found{context} (from {variable}.{property})"
))
})
}
_ => {
let col_name = format!("__expr_{expr:?}");
variable_columns.get(&col_name).copied().ok_or_else(|| {
Error::Internal(format!(
"Cannot resolve expression to column{context}: {expr:?}"
))
})
}
}
}
/// Renders a short, human-readable label for `expr`.
///
/// Variables render as their name and properties as `variable.property`. Literals use their
/// `Debug` form, function calls render as `name(...)` without their arguments, and index
/// accesses render recursively as `base[index]`. Every other expression kind renders as the
/// placeholder `"expr"`.
pub(crate) fn expression_to_string(expr: &LogicalExpression) -> String {
    let rendered = match expr {
        LogicalExpression::Variable(name) => name.clone(),
        LogicalExpression::Property { variable, property } => format!("{variable}.{property}"),
        LogicalExpression::Literal(value) => format!("{value:?}"),
        LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
        LogicalExpression::IndexAccess { base, index } => {
            let base_text = expression_to_string(base);
            let index_text = expression_to_string(index);
            format!("{base_text}[{index_text}]")
        }
        _ => String::from("expr"),
    };
    rendered
}
// Unit tests for the join builders.
//
// The module itself is only gated on `test` so that the `build_left_join` test (which is
// not feature-gated) also runs in builds without the `rdf` feature. Tests for the
// rdf-only `build_inner_join` carry their own `#[cfg(feature = "rdf")]`.
#[cfg(test)]
mod tests {
    use super::*;
    use grafeo_common::types::LogicalType;
    use grafeo_core::execution::DataChunk;
    use grafeo_core::execution::operators::{Operator, OperatorResult};

    // Operator that yields a single pre-built chunk on the first `next()` call and
    // `Ok(None)` afterwards.
    struct MockOperator {
        chunk: Option<DataChunk>,
    }

    impl MockOperator {
        fn new(chunk: DataChunk) -> Self {
            Self { chunk: Some(chunk) }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            Ok(self.chunk.take())
        }
        // No-op: the chunk is not restored, so a reset mock stays exhausted.
        fn reset(&mut self) {}
        fn name(&self) -> &'static str {
            "Mock"
        }
    }

    // Builds a two-row chunk with one column per entry of `types`. String and Int64
    // columns are filled with sample values; other types are left unpopulated.
    fn make_chunk(types: &[LogicalType]) -> DataChunk {
        let mut chunk = DataChunk::with_capacity(types, 2);
        for (i, t) in types.iter().enumerate() {
            let col = chunk.column_mut(i).unwrap();
            match t {
                LogicalType::String => {
                    col.push_string("v1");
                    col.push_string("v2");
                }
                LogicalType::Int64 => {
                    col.push_int64(1);
                    col.push_int64(2);
                }
                _ => {}
            }
        }
        chunk.set_count(2);
        chunk
    }

    // Right estimate (100) < 0.8 * left estimate (1000): takes the swapped branch, whose
    // projection must still report the left-first layout and matching types.
    #[cfg(feature = "rdf")]
    #[test]
    fn test_inner_join_swap_sides_preserves_types() {
        let left_cols = vec!["s".to_string(), "name".to_string()];
        let right_cols = vec!["s".to_string(), "age".to_string()];
        let left_types = vec![LogicalType::String, LogicalType::String];
        let right_types = vec![LogicalType::String, LogicalType::Int64];
        let left_op: Box<dyn Operator> = Box::new(MockOperator::new(make_chunk(&left_types)));
        let right_op: Box<dyn Operator> = Box::new(MockOperator::new(make_chunk(&right_types)));
        let (_, output_columns, output_types) = build_inner_join(
            left_op,
            right_op,
            &left_cols,
            &right_cols,
            &left_types,
            &right_types,
            Some((1000.0, 100.0)),
        );
        assert_eq!(output_columns, vec!["s", "name", "age"]);
        assert_eq!(
            output_types,
            vec![LogicalType::String, LogicalType::String, LogicalType::Int64]
        );
    }

    // Right estimate larger than left: no swap; the shared "s" column is projected away.
    #[cfg(feature = "rdf")]
    #[test]
    fn test_inner_join_no_swap_preserves_types() {
        let left_cols = vec!["s".to_string(), "name".to_string()];
        let right_cols = vec!["s".to_string(), "age".to_string()];
        let left_types = vec![LogicalType::String, LogicalType::String];
        let right_types = vec![LogicalType::String, LogicalType::Int64];
        let left_op: Box<dyn Operator> = Box::new(MockOperator::new(make_chunk(&left_types)));
        let right_op: Box<dyn Operator> = Box::new(MockOperator::new(make_chunk(&right_types)));
        let (_, output_columns, output_types) = build_inner_join(
            left_op,
            right_op,
            &left_cols,
            &right_cols,
            &left_types,
            &right_types,
            Some((100.0, 1000.0)),
        );
        assert_eq!(output_columns, vec!["s", "name", "age"]);
        assert_eq!(
            output_types,
            vec![LogicalType::String, LogicalType::String, LogicalType::Int64]
        );
    }

    // No shared column names: a cross join, which never swaps even when the estimates
    // would otherwise trigger it.
    #[cfg(feature = "rdf")]
    #[test]
    fn test_inner_join_cross_join_types() {
        let left_cols = vec!["a".to_string()];
        let right_cols = vec!["b".to_string()];
        let left_types = vec![LogicalType::String];
        let right_types = vec![LogicalType::Int64];
        let left_op: Box<dyn Operator> = Box::new(MockOperator::new(make_chunk(&left_types)));
        let right_op: Box<dyn Operator> = Box::new(MockOperator::new(make_chunk(&right_types)));
        let (_, output_columns, output_types) = build_inner_join(
            left_op,
            right_op,
            &left_cols,
            &right_cols,
            &left_types,
            &right_types,
            Some((1000.0, 100.0)),
        );
        assert_eq!(output_columns, vec!["a", "b"]);
        assert_eq!(output_types, vec![LogicalType::String, LogicalType::Int64]);
    }

    // Left join keeps left columns plus the non-shared right column, with matching types.
    #[test]
    fn test_left_join_preserves_types() {
        let left_cols = vec!["s".to_string(), "name".to_string()];
        let right_cols = vec!["s".to_string(), "age".to_string()];
        let left_types = vec![LogicalType::String, LogicalType::String];
        let right_types = vec![LogicalType::String, LogicalType::Int64];
        let left_op: Box<dyn Operator> = Box::new(MockOperator::new(make_chunk(&left_types)));
        let right_op: Box<dyn Operator> = Box::new(MockOperator::new(make_chunk(&right_types)));
        let (_, output_columns, output_types) = build_left_join(
            left_op,
            right_op,
            &left_cols,
            &right_cols,
            &left_types,
            &right_types,
        );
        assert_eq!(output_columns, vec!["s", "name", "age"]);
        assert_eq!(
            output_types,
            vec![LogicalType::String, LogicalType::String, LogicalType::Int64]
        );
    }
}