use super::{
AddLabelOp, AddLabelOperator, AntiJoinOp, Arc, CreateEdgeOp, CreateEdgeOperator, CreateNodeOp,
CreateNodeOperator, DeleteEdgeOp, DeleteEdgeOperator, DeleteNodeOp, DeleteNodeOperator,
Direction, Error, ExpandDirection, ExpressionPredicate, FilterOperator, GraphStore, HashMap,
LeftJoinOp, LogicalExpression, LogicalOperator, LogicalType, MergeConfig, MergeOp,
MergeOperator, MergeRelationshipConfig, MergeRelationshipOp, MergeRelationshipOperator,
Operator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOp, RemoveLabelOperator,
Result, SetPropertyOp, SetPropertyOperator, ShortestPathOp, ShortestPathOperator, UnaryOp,
UnwindOp, UnwindOperator, Value,
};
#[cfg(feature = "algos")]
use super::{CallProcedureOp, GraphStoreMut, StaticResultOperator};
impl super::Planner {
pub(super) fn plan_create_node(
&self,
create: &CreateNodeOp,
) -> Result<(Box<dyn Operator>, Vec<String>)> {
let (input_op, mut columns) = if let Some(ref input) = create.input {
let (op, cols) = self.plan_operator(input)?;
(Some(op), cols)
} else {
(None, vec![])
};
if columns.contains(&create.variable)
&& create.labels.is_empty()
&& create.properties.is_empty()
&& let Some(op) = input_op
{
return Ok((op, columns));
}
let output_column = columns.len();
columns.push(create.variable.clone());
let properties: Vec<(String, PropertySource)> = create
.properties
.iter()
.map(|(name, expr)| {
let source = self.expression_to_property_source(expr, &columns)?;
Ok((name.clone(), source))
})
.collect::<Result<Vec<_>>>()?;
let mut output_schema = self.derive_schema_from_columns(&columns[..output_column]);
output_schema.push(LogicalType::Node);
let mut op = CreateNodeOperator::new(
Arc::clone(&self.store),
input_op,
create.labels.clone(),
properties,
output_schema,
output_column,
)
.with_transaction_context(self.viewing_epoch, self.transaction_id);
if let Some(ref tracker) = self.write_tracker {
op = op.with_write_tracker(Arc::clone(tracker));
}
if let Some(ref validator) = self.validator {
op = op.with_validator(Arc::clone(validator));
}
let operator = Box::new(op);
Ok((operator, columns))
}
pub(super) fn plan_create_edge(
&self,
create: &CreateEdgeOp,
) -> Result<(Box<dyn Operator>, Vec<String>)> {
let (input_op, mut columns) = self.plan_operator(&create.input)?;
let from_column = columns
.iter()
.position(|c| c == &create.from_variable)
.ok_or_else(|| {
Error::Internal(format!(
"Source variable '{}' not found",
create.from_variable
))
})?;
let to_column = columns
.iter()
.position(|c| c == &create.to_variable)
.ok_or_else(|| {
Error::Internal(format!(
"Target variable '{}' not found",
create.to_variable
))
})?;
let output_column = create.variable.as_ref().map(|v| {
let idx = columns.len();
columns.push(v.clone());
self.edge_columns.borrow_mut().insert(v.clone());
idx
});
let properties: Vec<(String, PropertySource)> = create
.properties
.iter()
.map(|(name, expr)| {
let source = self.expression_to_property_source(expr, &columns)?;
Ok((name.clone(), source))
})
.collect::<Result<Vec<_>>>()?;
let output_schema = self.derive_schema_from_columns(&columns);
let mut operator = CreateEdgeOperator::new(
Arc::clone(&self.store),
input_op,
from_column,
to_column,
create.edge_type.clone(),
output_schema,
)
.with_properties(properties)
.with_transaction_context(self.viewing_epoch, self.transaction_id);
if let Some(ref tracker) = self.write_tracker {
operator = operator.with_write_tracker(Arc::clone(tracker));
}
if let Some(col) = output_column {
operator = operator.with_output_column(col);
}
if let Some(ref validator) = self.validator {
operator = operator.with_validator(Arc::clone(validator));
}
let operator = Box::new(operator);
Ok((operator, columns))
}
pub(super) fn plan_delete_node(
&self,
delete: &DeleteNodeOp,
) -> Result<(Box<dyn Operator>, Vec<String>)> {
let (input_op, columns) = self.plan_operator(&delete.input)?;
let col_idx = columns
.iter()
.position(|c| c == &delete.variable)
.ok_or_else(|| {
Error::Internal(format!(
"Variable '{}' not found for delete",
delete.variable
))
})?;
let output_schema = self.derive_schema_from_columns(&columns);
let output_columns = columns.clone();
let is_edge = self.edge_columns.borrow().contains(&delete.variable);
if is_edge {
let mut op =
DeleteEdgeOperator::new(Arc::clone(&self.store), input_op, col_idx, output_schema)
.with_transaction_context(self.viewing_epoch, self.transaction_id);
if let Some(ref tracker) = self.write_tracker {
op = op.with_write_tracker(Arc::clone(tracker));
}
Ok((Box::new(op), output_columns))
} else {
let mut op = DeleteNodeOperator::new(
Arc::clone(&self.store),
input_op,
col_idx,
output_schema,
delete.detach,
)
.with_transaction_context(self.viewing_epoch, self.transaction_id);
if let Some(ref tracker) = self.write_tracker {
op = op.with_write_tracker(Arc::clone(tracker));
}
Ok((Box::new(op), output_columns))
}
}
pub(super) fn plan_delete_edge(
&self,
delete: &DeleteEdgeOp,
) -> Result<(Box<dyn Operator>, Vec<String>)> {
let (input_op, columns) = self.plan_operator(&delete.input)?;
let edge_column = columns
.iter()
.position(|c| c == &delete.variable)
.ok_or_else(|| {
Error::Internal(format!(
"Variable '{}' not found for delete",
delete.variable
))
})?;
let output_schema = self.derive_schema_from_columns(&columns);
let output_columns = columns.clone();
let mut op = DeleteEdgeOperator::new(
Arc::clone(&self.store),
input_op,
edge_column,
output_schema,
)
.with_transaction_context(self.viewing_epoch, self.transaction_id);
if let Some(ref tracker) = self.write_tracker {
op = op.with_write_tracker(Arc::clone(tracker));
}
Ok((Box::new(op), output_columns))
}
pub(super) fn plan_left_join(
&self,
left_join: &LeftJoinOp,
) -> Result<(Box<dyn Operator>, Vec<String>)> {
let (left_op, left_columns): (Box<dyn Operator>, Vec<String>) =
if matches!(left_join.left.as_ref(), LogicalOperator::Empty) {
let single_row: Box<dyn Operator> = Box::new(
grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
);
(single_row, Vec::new())
} else {
self.plan_operator(&left_join.left)?
};
let (right_op, right_columns) = self.plan_operator(&left_join.right)?;
let schema_fn = |cols: &[String]| self.derive_schema_from_columns(cols);
let (join_op, join_columns) = super::common::build_left_join(
left_op,
right_op,
&left_columns,
&right_columns,
schema_fn,
);
if let Some(condition) = &left_join.condition {
let filter_expr = self.convert_expression(condition)?;
let variable_columns: HashMap<String, usize> = join_columns
.iter()
.enumerate()
.map(|(i, name)| (name.clone(), i))
.collect();
let predicate = ExpressionPredicate::new(
filter_expr,
variable_columns,
Arc::clone(&self.store) as Arc<dyn GraphStore>,
)
.with_transaction_context(self.viewing_epoch, self.transaction_id)
.with_session_context(self.session_context.clone());
let filter_op: Box<dyn Operator> =
Box::new(FilterOperator::new(join_op, Box::new(predicate)));
return Ok((filter_op, join_columns));
}
Ok((join_op, join_columns))
}
pub(super) fn plan_anti_join(
&self,
anti_join: &AntiJoinOp,
) -> Result<(Box<dyn Operator>, Vec<String>)> {
let (left_op, left_columns) = self.plan_operator(&anti_join.left)?;
let (right_op, right_columns) = self.plan_operator(&anti_join.right)?;
let schema = self.derive_schema_from_columns(&left_columns);
Ok(super::common::build_anti_join(
left_op,
right_op,
left_columns,
&right_columns,
schema,
))
}
pub(super) fn plan_unwind(
&self,
unwind: &UnwindOp,
) -> Result<(Box<dyn Operator>, Vec<String>)> {
let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
if matches!(&*unwind.input, LogicalOperator::Empty) {
let literal_list = self.convert_expression(&unwind.expression)?;
let single_row_op: Box<dyn Operator> = Box::new(
grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
);
let project_op: Box<dyn Operator> = Box::new(
ProjectOperator::with_store(
single_row_op,
vec![ProjectExpr::Expression {
expr: literal_list,
variable_columns: HashMap::new(),
}],
vec![LogicalType::Any],
Arc::clone(&self.store) as Arc<dyn GraphStore>,
)
.with_transaction_context(self.viewing_epoch, self.transaction_id)
.with_session_context(self.session_context.clone()),
);
(project_op, vec!["__list__".to_string()])
} else {
self.plan_operator(&unwind.input)?
};
let list_col_idx = match &unwind.expression {
LogicalExpression::Variable(var) => input_columns.iter().position(|c| c == var),
LogicalExpression::List(_) | LogicalExpression::Literal(_) => {
None
}
_ => None,
};
let (final_input_op, final_input_columns, col_idx) = if let Some(idx) = list_col_idx {
(input_op, input_columns, idx)
} else if matches!(
&unwind.expression,
LogicalExpression::List(_)
| LogicalExpression::Literal(Value::List(_))
| LogicalExpression::Literal(Value::Vector(_))
| LogicalExpression::Property { .. }
) {
let literal_list = self.convert_expression(&unwind.expression)?;
let mut proj_exprs: Vec<ProjectExpr> =
(0..input_columns.len()).map(ProjectExpr::Column).collect();
let var_cols: HashMap<String, usize> = input_columns
.iter()
.enumerate()
.map(|(i, c)| (c.clone(), i))
.collect();
proj_exprs.push(ProjectExpr::Expression {
expr: literal_list,
variable_columns: var_cols,
});
let mut proj_schema = self.derive_schema_from_columns(&input_columns);
proj_schema.push(LogicalType::Any);
let project_op: Box<dyn Operator> = Box::new(
ProjectOperator::with_store(
input_op,
proj_exprs,
proj_schema,
Arc::clone(&self.store) as Arc<dyn GraphStore>,
)
.with_transaction_context(self.viewing_epoch, self.transaction_id)
.with_session_context(self.session_context.clone()),
);
let list_col = input_columns.len();
let mut cols = input_columns;
cols.push("__unwind_list__".to_string());
(project_op, cols, list_col)
} else {
(input_op, input_columns, 0)
};
let mut columns = final_input_columns.clone();
columns.push(unwind.variable.clone());
self.scalar_columns
.borrow_mut()
.insert(unwind.variable.clone());
let mut output_schema = self.derive_schema_from_columns(&final_input_columns);
output_schema.push(LogicalType::Any);
let emit_ordinality = unwind.ordinality_var.is_some();
if let Some(ref ord_var) = unwind.ordinality_var {
columns.push(ord_var.clone());
output_schema.push(LogicalType::Int64);
self.scalar_columns.borrow_mut().insert(ord_var.clone());
}
let emit_offset = unwind.offset_var.is_some();
if let Some(ref off_var) = unwind.offset_var {
columns.push(off_var.clone());
output_schema.push(LogicalType::Int64);
self.scalar_columns.borrow_mut().insert(off_var.clone());
}
let operator: Box<dyn Operator> = Box::new(UnwindOperator::new(
final_input_op,
col_idx,
unwind.variable.clone(),
output_schema,
emit_ordinality,
emit_offset,
));
Ok((operator, columns))
}
pub(super) fn plan_merge(&self, merge: &MergeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
let (input_op, mut columns) = if matches!(merge.input.as_ref(), LogicalOperator::Empty) {
(None, Vec::new())
} else {
let (op, cols) = self.plan_operator(&merge.input)?;
(Some(op), cols)
};
let match_properties: Vec<(String, PropertySource)> = merge
.match_properties
.iter()
.map(|(name, expr)| {
let source = self
.expression_to_property_source(expr, &columns)
.unwrap_or_else(|_| {
Self::try_fold_expression(expr).map_or(
PropertySource::Constant(Value::Null),
PropertySource::Constant,
)
});
(name.clone(), source)
})
.collect();
let on_create_properties: Vec<(String, PropertySource)> = merge
.on_create
.iter()
.map(|(name, expr)| {
let source = self
.expression_to_property_source(expr, &columns)
.unwrap_or_else(|_| {
Self::try_fold_expression(expr).map_or(
PropertySource::Constant(Value::Null),
PropertySource::Constant,
)
});
(name.clone(), source)
})
.collect();
let on_match_properties: Vec<(String, PropertySource)> = merge
.on_match
.iter()
.map(|(name, expr)| {
let source = self
.expression_to_property_source(expr, &columns)
.unwrap_or_else(|_| {
Self::try_fold_expression(expr).map_or(
PropertySource::Constant(Value::Null),
PropertySource::Constant,
)
});
(name.clone(), source)
})
.collect();
let bound_variable_column = columns.iter().position(|c| c == &merge.variable);
let output_column = columns.len();
columns.push(merge.variable.clone());
let input_cols = &columns[..output_column];
let mut output_schema = self.derive_schema_from_columns(input_cols);
output_schema.push(LogicalType::Node);
let mut merge_op = MergeOperator::new(
Arc::clone(&self.store),
input_op,
MergeConfig {
variable: merge.variable.clone(),
labels: merge.labels.clone(),
match_properties,
on_create_properties,
on_match_properties,
output_schema,
output_column,
bound_variable_column,
},
)
.with_transaction_context(self.viewing_epoch, self.transaction_id);
if let Some(ref validator) = self.validator {
merge_op = merge_op.with_validator(Arc::clone(validator));
}
let operator: Box<dyn Operator> = Box::new(merge_op);
Ok((operator, columns))
}
pub(super) fn plan_merge_relationship(
&self,
merge_rel: &MergeRelationshipOp,
) -> Result<(Box<dyn Operator>, Vec<String>)> {
let (input_op, mut columns) = self.plan_operator(&merge_rel.input)?;
let source_column = columns
.iter()
.position(|c| c == &merge_rel.source_variable)
.ok_or_else(|| {
Error::Internal(format!(
"Source variable '{}' not found for MERGE relationship",
merge_rel.source_variable
))
})?;
let target_column = columns
.iter()
.position(|c| c == &merge_rel.target_variable)
.ok_or_else(|| {
Error::Internal(format!(
"Target variable '{}' not found for MERGE relationship",
merge_rel.target_variable
))
})?;
let match_properties: Vec<(String, PropertySource)> = merge_rel
.match_properties
.iter()
.map(|(name, expr)| {
let source = self
.expression_to_property_source(expr, &columns)
.unwrap_or_else(|_| {
Self::try_fold_expression(expr).map_or(
PropertySource::Constant(Value::Null),
PropertySource::Constant,
)
});
(name.clone(), source)
})
.collect();
let on_create_properties: Vec<(String, PropertySource)> = merge_rel
.on_create
.iter()
.map(|(name, expr)| {
let source = self
.expression_to_property_source(expr, &columns)
.unwrap_or_else(|_| {
Self::try_fold_expression(expr).map_or(
PropertySource::Constant(Value::Null),
PropertySource::Constant,
)
});
(name.clone(), source)
})
.collect();
let on_match_properties: Vec<(String, PropertySource)> = merge_rel
.on_match
.iter()
.map(|(name, expr)| {
let source = self
.expression_to_property_source(expr, &columns)
.unwrap_or_else(|_| {
Self::try_fold_expression(expr).map_or(
PropertySource::Constant(Value::Null),
PropertySource::Constant,
)
});
(name.clone(), source)
})
.collect();
let edge_output_column = columns.len();
columns.push(merge_rel.variable.clone());
self.edge_columns
.borrow_mut()
.insert(merge_rel.variable.clone());
let input_cols = &columns[..edge_output_column];
let mut output_schema = self.derive_schema_from_columns(input_cols);
output_schema.push(LogicalType::Edge);
let config = MergeRelationshipConfig {
source_column,
target_column,
source_variable: merge_rel.source_variable.clone(),
target_variable: merge_rel.target_variable.clone(),
edge_type: merge_rel.edge_type.clone(),
match_properties,
on_create_properties,
on_match_properties,
output_schema,
edge_output_column,
};
let mut merge_rel_op =
MergeRelationshipOperator::new(Arc::clone(&self.store), input_op, config)
.with_transaction_context(self.viewing_epoch, self.transaction_id);
if let Some(ref validator) = self.validator {
merge_rel_op = merge_rel_op.with_validator(Arc::clone(validator));
}
let operator: Box<dyn Operator> = Box::new(merge_rel_op);
Ok((operator, columns))
}
pub(super) fn plan_shortest_path(
&self,
sp: &ShortestPathOp,
) -> Result<(Box<dyn Operator>, Vec<String>)> {
let (input_op, mut columns) = self.plan_operator(&sp.input)?;
let source_column = columns
.iter()
.position(|c| c == &sp.source_var)
.ok_or_else(|| {
Error::Internal(format!(
"Source variable '{}' not found for shortestPath",
sp.source_var
))
})?;
let target_column = columns
.iter()
.position(|c| c == &sp.target_var)
.ok_or_else(|| {
Error::Internal(format!(
"Target variable '{}' not found for shortestPath",
sp.target_var
))
})?;
let direction = match sp.direction {
ExpandDirection::Outgoing => Direction::Outgoing,
ExpandDirection::Incoming => Direction::Incoming,
ExpandDirection::Both => Direction::Both,
};
let operator: Box<dyn Operator> = Box::new(
ShortestPathOperator::new(
Arc::clone(&self.store) as Arc<dyn GraphStore>,
input_op,
source_column,
target_column,
sp.edge_types.clone(),
direction,
)
.with_all_paths(sp.all_paths),
);
let path_col_name = format!("_path_length_{}", sp.path_alias);
columns.push(path_col_name.clone());
self.scalar_columns.borrow_mut().insert(path_col_name);
Ok((operator, columns))
}
#[cfg(feature = "algos")]
pub(super) fn plan_call_procedure(
&self,
call: &CallProcedureOp,
) -> Result<(Box<dyn Operator>, Vec<String>)> {
use crate::procedures::{self, BuiltinProcedures};
static PROCEDURES: std::sync::OnceLock<BuiltinProcedures> = std::sync::OnceLock::new();
let registry = PROCEDURES.get_or_init(BuiltinProcedures::new);
let resolved_name = call.name.join(".");
if resolved_name == "grafeo.procedures" || resolved_name == "procedures" {
let result = procedures::procedures_result(registry);
return self.plan_static_result(result, &call.yield_items);
}
if let Some(result) = self.plan_catalog_procedure(&resolved_name) {
return self.plan_static_result(result, &call.yield_items);
}
if let Some(catalog) = &self.catalog {
let proc_name = if call.name.len() == 1 {
&call.name[0]
} else {
call.name.last().expect("name has at least one segment")
};
if let Some(proc_def) = catalog.get_procedure(proc_name) {
return self.plan_user_procedure(call, &proc_def);
}
}
let algorithm = registry.get(&call.name).ok_or_else(|| {
Error::Internal(format!(
"Unknown procedure: '{}'. Use CALL grafeo.procedures() to list available procedures.",
call.name.join(".")
))
})?;
let params = procedures::evaluate_arguments(&call.arguments, algorithm.parameters());
let canonical_columns = procedures::output_columns_for_name(algorithm.as_ref());
let yield_columns = call.yield_items.as_ref().map(|items| {
items
.iter()
.map(|item| (item.field_name.clone(), item.alias.clone()))
.collect::<Vec<_>>()
});
let output_columns = if let Some(yield_cols) = &yield_columns {
yield_cols
.iter()
.map(|(name, alias)| alias.clone().unwrap_or_else(|| name.clone()))
.collect()
} else {
canonical_columns.clone()
};
let operator = Box::new(
crate::query::executor::procedure_call::ProcedureCallOperator::new(
Arc::clone(&self.store) as Arc<dyn GraphStore>,
algorithm,
params,
yield_columns,
canonical_columns,
),
);
for col in &output_columns {
self.scalar_columns.borrow_mut().insert(col.clone());
}
Ok((operator, output_columns))
}
#[cfg(feature = "algos")]
pub(super) fn plan_static_result(
&self,
result: grafeo_adapters::plugins::AlgorithmResult,
yield_items: &Option<Vec<crate::query::plan::ProcedureYield>>,
) -> Result<(Box<dyn Operator>, Vec<String>)> {
let (output_columns, column_indices) = if let Some(items) = yield_items {
let mut cols = Vec::new();
let mut indices = Vec::new();
for item in items {
let idx = result
.columns
.iter()
.position(|c| c == &item.field_name)
.ok_or_else(|| {
Error::Internal(format!(
"YIELD column '{}' not found (available: {})",
item.field_name,
result.columns.join(", ")
))
})?;
indices.push(idx);
cols.push(
item.alias
.clone()
.unwrap_or_else(|| item.field_name.clone()),
);
}
(cols, indices)
} else {
let indices: Vec<usize> = (0..result.columns.len()).collect();
(result.columns.clone(), indices)
};
let operator = Box::new(StaticResultOperator {
rows: result.rows,
column_indices,
row_index: 0,
});
for col in &output_columns {
self.scalar_columns.borrow_mut().insert(col.clone());
}
Ok((operator, output_columns))
}
#[cfg(feature = "algos")]
fn plan_user_procedure(
&self,
call: &CallProcedureOp,
proc_def: &crate::catalog::ProcedureDefinition,
) -> Result<(Box<dyn Operator>, Vec<String>)> {
use crate::query::executor::user_procedure::{ProcedureContext, UserProcedureOperator};
if call.arguments.len() != proc_def.params.len() {
return Err(Error::Internal(format!(
"Procedure '{}' expects {} arguments, got {}",
proc_def.name,
proc_def.params.len(),
call.arguments.len()
)));
}
let mut arg_values = Vec::new();
for arg in &call.arguments {
let val = crate::query::planner::eval_constant_expression(arg)?;
arg_values.push(val);
}
let mut param_map = std::collections::HashMap::new();
for (param, value) in proc_def.params.iter().zip(arg_values) {
param_map.insert(param.0.clone(), value);
}
let return_columns: Vec<String> = proc_def.returns.iter().map(|r| r.0.clone()).collect();
let output_columns = if let Some(yield_items) = &call.yield_items {
yield_items
.iter()
.map(|item| {
item.alias
.clone()
.unwrap_or_else(|| item.field_name.clone())
})
.collect()
} else {
return_columns.clone()
};
let yield_columns = call.yield_items.as_ref().map(|items| {
items
.iter()
.map(|item| item.field_name.clone())
.collect::<Vec<_>>()
});
let operator = Box::new(UserProcedureOperator::new(
proc_def.body.clone(),
param_map,
return_columns,
yield_columns,
ProcedureContext {
store: Arc::clone(&self.store) as Arc<dyn GraphStoreMut>,
transaction_manager: self.transaction_manager.clone(),
transaction_id: self.transaction_id,
viewing_epoch: self.viewing_epoch,
catalog: self.catalog.clone(),
},
));
for col in &output_columns {
self.scalar_columns.borrow_mut().insert(col.clone());
}
Ok((operator, output_columns))
}
#[cfg(feature = "algos")]
fn plan_catalog_procedure(
&self,
name: &str,
) -> Option<grafeo_adapters::plugins::AlgorithmResult> {
use grafeo_adapters::plugins::AlgorithmResult;
use grafeo_common::types::Value;
match name {
"db.labels" | "grafeo.labels" => {
let labels = self.store.all_labels();
let mut result = AlgorithmResult::new(vec!["label".to_string()]);
for label in labels {
result.rows.push(vec![Value::String(label.into())]);
}
Some(result)
}
"db.relationshipTypes" | "grafeo.relationshipTypes" => {
let types = self.store.all_edge_types();
let mut result = AlgorithmResult::new(vec!["relationshipType".to_string()]);
for t in types {
result.rows.push(vec![Value::String(t.into())]);
}
Some(result)
}
"db.propertyKeys" | "grafeo.propertyKeys" => {
let keys = self.store.all_property_keys();
let mut result = AlgorithmResult::new(vec!["propertyKey".to_string()]);
for key in keys {
result.rows.push(vec![Value::String(key.into())]);
}
Some(result)
}
_ => None,
}
}
pub(super) fn plan_add_label(
&self,
add_label: &AddLabelOp,
) -> Result<(Box<dyn Operator>, Vec<String>)> {
let (input_op, columns) = self.plan_operator(&add_label.input)?;
let node_column = columns
.iter()
.position(|c| c == &add_label.variable)
.ok_or_else(|| {
Error::Internal(format!(
"Variable '{}' not found for ADD LABEL",
add_label.variable
))
})?;
let mut output_schema = self.derive_schema_from_columns(&columns);
output_schema.push(LogicalType::Int64);
let mut output_columns = columns.clone();
output_columns.push("labels_added".to_string());
let mut op = AddLabelOperator::new(
Arc::clone(&self.store),
input_op,
node_column,
add_label.labels.clone(),
output_schema,
)
.with_transaction_context(self.viewing_epoch, self.transaction_id);
if let Some(ref tracker) = self.write_tracker {
op = op.with_write_tracker(Arc::clone(tracker));
}
Ok((Box::new(op), output_columns))
}
pub(super) fn plan_remove_label(
&self,
remove_label: &RemoveLabelOp,
) -> Result<(Box<dyn Operator>, Vec<String>)> {
let (input_op, columns) = self.plan_operator(&remove_label.input)?;
let node_column = columns
.iter()
.position(|c| c == &remove_label.variable)
.ok_or_else(|| {
Error::Internal(format!(
"Variable '{}' not found for REMOVE LABEL",
remove_label.variable
))
})?;
let mut output_schema = self.derive_schema_from_columns(&columns);
output_schema.push(LogicalType::Int64);
let mut output_columns = columns.clone();
output_columns.push("labels_removed".to_string());
let mut op = RemoveLabelOperator::new(
Arc::clone(&self.store),
input_op,
node_column,
remove_label.labels.clone(),
output_schema,
)
.with_transaction_context(self.viewing_epoch, self.transaction_id);
if let Some(ref tracker) = self.write_tracker {
op = op.with_write_tracker(Arc::clone(tracker));
}
Ok((Box::new(op), output_columns))
}
pub(super) fn plan_set_property(
&self,
set_prop: &SetPropertyOp,
) -> Result<(Box<dyn Operator>, Vec<String>)> {
let (input_op, columns) = self.plan_operator(&set_prop.input)?;
let entity_column = columns
.iter()
.position(|c| c == &set_prop.variable)
.ok_or_else(|| {
Error::Internal(format!(
"Variable '{}' not found for SET",
set_prop.variable
))
})?;
let properties: Vec<(String, PropertySource)> = set_prop
.properties
.iter()
.map(|(name, expr)| {
let source = match self.expression_to_property_source(expr, &columns) {
Ok(s) => s,
Err(_) => {
Self::try_fold_expression(expr).map_or_else(
|| {
Err(Error::Internal(format!(
"Cannot resolve SET expression for property '{name}': \
variable not in scope or unsupported expression"
)))
},
|v| Ok(PropertySource::Constant(v)),
)?
}
};
Ok((name.clone(), source))
})
.collect::<Result<Vec<_>>>()?;
let output_schema = self.derive_schema_from_columns(&columns);
let output_columns = columns.clone();
let is_edge = set_prop.is_edge || self.edge_columns.borrow().contains(&set_prop.variable);
let operator: Box<dyn Operator> = if is_edge {
let mut op = SetPropertyOperator::new_for_edge(
Arc::clone(&self.store),
input_op,
entity_column,
properties,
output_schema,
)
.with_replace(set_prop.replace)
.with_transaction_context(self.viewing_epoch, self.transaction_id);
if let Some(ref tracker) = self.write_tracker {
op = op.with_write_tracker(Arc::clone(tracker));
}
if let Some(ref validator) = self.validator {
op = op.with_validator(Arc::clone(validator));
}
Box::new(op)
} else {
let mut op = SetPropertyOperator::new_for_node(
Arc::clone(&self.store),
input_op,
entity_column,
properties,
output_schema,
)
.with_replace(set_prop.replace)
.with_transaction_context(self.viewing_epoch, self.transaction_id);
if let Some(ref tracker) = self.write_tracker {
op = op.with_write_tracker(Arc::clone(tracker));
}
if let Some(ref validator) = self.validator {
op = op.with_validator(Arc::clone(validator));
}
Box::new(op)
};
Ok((operator, output_columns))
}
pub(super) fn expression_to_property_source(
&self,
expr: &LogicalExpression,
columns: &[String],
) -> Result<PropertySource> {
match expr {
LogicalExpression::Literal(value) => Ok(PropertySource::Constant(value.clone())),
LogicalExpression::Variable(name) => {
let col_idx = columns.iter().position(|c| c == name).ok_or_else(|| {
Error::Internal(format!("Variable '{}' not found for property source", name))
})?;
Ok(PropertySource::Column(col_idx))
}
LogicalExpression::Property { variable, property } => {
let col_idx = columns.iter().position(|c| c == variable).ok_or_else(|| {
Error::Internal(format!(
"Variable '{}' not found for property access '{}.{}'",
variable, variable, property
))
})?;
Ok(PropertySource::PropertyAccess {
column: col_idx,
property: property.clone(),
})
}
LogicalExpression::Parameter(name) => {
Ok(PropertySource::Constant(
grafeo_common::types::Value::String(format!("${}", name).into()),
))
}
_ => {
if let Some(value) = Self::try_fold_expression(expr) {
Ok(PropertySource::Constant(value))
} else {
Err(Error::Internal(format!(
"Unsupported expression type for property source: {:?}",
expr
)))
}
}
}
}
pub(super) fn try_fold_expression(expr: &LogicalExpression) -> Option<Value> {
match expr {
LogicalExpression::Literal(v) => Some(v.clone()),
LogicalExpression::List(items) => {
let values: Option<Vec<Value>> =
items.iter().map(Self::try_fold_expression).collect();
Some(Value::List(values?.into()))
}
LogicalExpression::FunctionCall { name, args, .. } => {
match name.to_lowercase().as_str() {
"vector" => {
if args.len() != 1 {
return None;
}
let val = Self::try_fold_expression(&args[0])?;
match val {
Value::List(items) => {
let floats: Vec<f32> = items
.iter()
.filter_map(|v| match v {
Value::Float64(f) => Some(*f as f32),
Value::Int64(i) => Some(*i as f32),
_ => None,
})
.collect();
if floats.len() == items.len() {
Some(Value::Vector(floats.into()))
} else {
None
}
}
Value::Vector(v) => Some(Value::Vector(v)),
_ => None,
}
}
"timestamp" => {
if !args.is_empty() {
return None;
}
Some(Value::Int64(
grafeo_common::types::Timestamp::now().as_millis(),
))
}
"now" | "current_timestamp" | "currenttimestamp" => {
if !args.is_empty() {
return None;
}
Some(Value::Timestamp(grafeo_common::types::Timestamp::now()))
}
"date" | "todate" | "current_date" | "currentdate" => {
if args.is_empty() {
return Some(Value::Date(grafeo_common::types::Date::today()));
}
if args.len() != 1 {
return None;
}
let val = Self::try_fold_expression(&args[0])?;
match val {
Value::String(s) => {
grafeo_common::types::Date::parse(&s).map(Value::Date)
}
_ => None,
}
}
"time" | "totime" | "local_time" | "current_time" | "currenttime" => {
if args.is_empty() {
return Some(Value::Time(grafeo_common::types::Time::now()));
}
if args.len() != 1 {
return None;
}
let val = Self::try_fold_expression(&args[0])?;
match val {
Value::String(s) => {
grafeo_common::types::Time::parse(&s).map(Value::Time)
}
_ => None,
}
}
"datetime" | "localdatetime" | "local_datetime" | "todatetime" => {
if args.is_empty() {
return Some(Value::Timestamp(grafeo_common::types::Timestamp::now()));
}
if args.len() != 1 {
return None;
}
let val = Self::try_fold_expression(&args[0])?;
match val {
Value::String(s) => {
if let Some(d) = grafeo_common::types::Date::parse(&s) {
return Some(Value::Timestamp(d.to_timestamp()));
}
if let Some(pos) = s.find('T') {
let (date_part, time_part) = (&s[..pos], &s[pos + 1..]);
if let (Some(d), Some(t)) = (
grafeo_common::types::Date::parse(date_part),
grafeo_common::types::Time::parse(time_part),
) {
return Some(Value::Timestamp(
grafeo_common::types::Timestamp::from_date_time(d, t),
));
}
}
None
}
_ => None,
}
}
_ => None,
}
}
LogicalExpression::Map(entries) => {
let folded: Option<Vec<(String, Value)>> = entries
.iter()
.map(|(k, v)| Self::try_fold_expression(v).map(|val| (k.clone(), val)))
.collect();
let folded = folded?;
let map: std::collections::BTreeMap<grafeo_common::types::PropertyKey, Value> =
folded
.into_iter()
.map(|(k, v)| (grafeo_common::types::PropertyKey::from(k), v))
.collect();
Some(Value::Map(std::sync::Arc::new(map)))
}
LogicalExpression::Unary { op, operand } => {
let value = Self::try_fold_expression(operand)?;
match op {
UnaryOp::Neg => match value {
Value::Int64(n) => Some(Value::Int64(-n)),
Value::Float64(f) => Some(Value::Float64(-f)),
_ => None,
},
UnaryOp::Not => match value {
Value::Bool(b) => Some(Value::Bool(!b)),
_ => None,
},
UnaryOp::IsNull | UnaryOp::IsNotNull => None,
}
}
_ => None,
}
}
}