mod aggregate;
mod call;
mod call_subquery;
mod catalog;
mod catalog_index;
mod chain;
mod distinct;
mod explain;
mod filter;
mod group_by;
mod let_op;
mod limit;
mod match_op;
mod mutation;
mod order_by;
mod project;
pub(crate) mod session;
mod top_k;
pub(crate) mod tx;
mod union;
mod unwind;
use crate::{
PipelineOp, SubqueryRegistry,
analyze::ExprIdLookup,
runtime::{
Binding, BindingTable, EvalCtx, ExecutorError, TxContext, plan_runner,
value_key::RuntimeEqKey,
},
};
pub(super) fn row_key(row: &Binding) -> RuntimeEqKey {
RuntimeEqKey::from_row(row.values().to_vec())
}
pub fn execute_pipeline(
pipeline: &[PipelineOp],
table: BindingTable,
ctx: &mut TxContext<'_, '_>,
) -> Result<BindingTable, ExecutorError> {
let expr_ids = ExprIdLookup::default();
let subqueries = SubqueryRegistry::default();
let (expr_ids, subqueries) = ctx.plan_metadata().unwrap_or((&expr_ids, &subqueries));
execute_pipeline_with_plan(pipeline, table, ctx, expr_ids, subqueries)
}
pub(crate) fn execute_pipeline_with_plan(
pipeline: &[PipelineOp],
table: BindingTable,
ctx: &mut TxContext<'_, '_>,
expr_ids: &ExprIdLookup,
subqueries: &SubqueryRegistry,
) -> Result<BindingTable, ExecutorError> {
dispatch_pipeline(
pipeline,
table,
TxCtxRef::ReadWrite(ctx),
expr_ids,
subqueries,
)
}
pub(crate) fn execute_pipeline_read_only_with_plan(
pipeline: &[PipelineOp],
table: BindingTable,
ctx: &TxContext<'_, '_>,
expr_ids: &ExprIdLookup,
subqueries: &SubqueryRegistry,
) -> Result<BindingTable, ExecutorError> {
dispatch_pipeline(
pipeline,
table,
TxCtxRef::ReadOnly(ctx),
expr_ids,
subqueries,
)
}
enum TxCtxRef<'r, 'a, 'g> {
ReadWrite(&'r mut TxContext<'a, 'g>),
ReadOnly(&'r TxContext<'a, 'g>),
}
impl<'a, 'g> TxCtxRef<'_, 'a, 'g> {
fn shared(&self) -> &TxContext<'a, 'g> {
match self {
Self::ReadWrite(ctx) => ctx,
Self::ReadOnly(ctx) => ctx,
}
}
fn is_read_only(&self) -> bool {
matches!(self, Self::ReadOnly(_))
}
fn eval_ctx<'s, 'plan>(
&'s self,
expr_ids: &'plan ExprIdLookup,
subqueries: &'plan SubqueryRegistry,
) -> EvalCtx<'s, 'a, 'g, 'plan> {
EvalCtx {
tx: self.shared(),
expr_ids,
subqueries,
}
}
}
fn dispatch_pipeline(
pipeline: &[PipelineOp],
mut table: BindingTable,
mut ctx: TxCtxRef<'_, '_, '_>,
expr_ids: &ExprIdLookup,
subqueries: &SubqueryRegistry,
) -> Result<BindingTable, ExecutorError> {
for op in pipeline {
table = match op {
PipelineOp::Filter(predicate) => {
let eval_ctx = ctx.eval_ctx(expr_ids, subqueries);
filter::execute(predicate, table, &eval_ctx)?
}
PipelineOp::Project(items) => {
let eval_ctx = ctx.eval_ctx(expr_ids, subqueries);
project::execute(items, table, &eval_ctx)?
}
PipelineOp::Let(items) => {
let eval_ctx = ctx.eval_ctx(expr_ids, subqueries);
let_op::execute(items, table, &eval_ctx)?
}
PipelineOp::Unwind {
source,
alias,
position,
span,
} => {
let eval_ctx = ctx.eval_ctx(expr_ids, subqueries);
unwind::execute(
source,
alias.clone(),
position.clone(),
*span,
table,
&eval_ctx,
)?
}
PipelineOp::OrderBy(keys) => {
let eval_ctx = ctx.eval_ctx(expr_ids, subqueries);
order_by::execute(keys, table, &eval_ctx)?
}
PipelineOp::Limit { offset, count } => {
limit::execute(offset, count, table, ctx.shared())?
}
PipelineOp::TopK {
keys,
offset,
count,
} => {
let eval_ctx = ctx.eval_ctx(expr_ids, subqueries);
top_k::execute(keys, offset, count, table, ctx.shared(), &eval_ctx)?
}
PipelineOp::GroupBy { keys, aggregates } => {
let eval_ctx = ctx.eval_ctx(expr_ids, subqueries);
group_by::execute(keys, aggregates, table, &eval_ctx)?
}
PipelineOp::Distinct => distinct::execute(table, ctx.shared())?,
PipelineOp::Match(pattern) => {
match_op::execute(pattern, table, ctx.shared(), expr_ids, subqueries)?
}
PipelineOp::OptionalMatch(pattern) => {
match_op::execute_optional(pattern, table, ctx.shared(), expr_ids, subqueries)?
}
PipelineOp::ExplainPlan { inner, .. } => explain::execute(inner)?,
PipelineOp::Union { op, rhs } => match &mut ctx {
TxCtxRef::ReadWrite(ctx) => union::execute(*op, rhs, table, ctx)?,
TxCtxRef::ReadOnly(ctx) => union::execute_read_only(*op, rhs, table, ctx)?,
},
PipelineOp::Chain(rhs) => match &mut ctx {
TxCtxRef::ReadWrite(ctx) => chain::execute(rhs, table, ctx)?,
TxCtxRef::ReadOnly(ctx) => plan_runner::execute_plan_read_only(rhs, ctx)?,
},
PipelineOp::CorrelatedChain(rhs) => match &mut ctx {
TxCtxRef::ReadWrite(ctx) => chain::execute_correlated(rhs, table, ctx)?,
TxCtxRef::ReadOnly(ctx) => chain::execute_correlated_read_only(rhs, table, ctx)?,
},
PipelineOp::Call(call) => match &mut ctx {
TxCtxRef::ReadWrite(ctx) => call::execute(call, table, ctx, expr_ids, subqueries)?,
TxCtxRef::ReadOnly(ctx) => {
call::execute_read_only(call, table, ctx, expr_ids, subqueries)?
}
},
PipelineOp::CallSubquery(call) => match &mut ctx {
TxCtxRef::ReadWrite(ctx) => call_subquery::execute(call, table, ctx)?,
TxCtxRef::ReadOnly(ctx) => call_subquery::execute_read_only(call, table, ctx)?,
},
PipelineOp::Mutation(mutation) => match &mut ctx {
TxCtxRef::ReadWrite(ctx) => {
mutation::execute(mutation, table, ctx, expr_ids, subqueries)?
}
TxCtxRef::ReadOnly(_) => return Err(read_only_write_op_error()),
},
PipelineOp::Catalog(catalog) => match &mut ctx {
TxCtxRef::ReadWrite(ctx) => catalog::execute(catalog, table, ctx)?,
TxCtxRef::ReadOnly(_) => return Err(read_only_write_op_error()),
},
PipelineOp::Tx(_) => {
if ctx.is_read_only() {
return Err(read_only_write_op_error());
}
return Err(ExecutorError::ImplementationDefined {
detail: "TX op surfaced inside execute_pipeline; should be dispatched at statement level",
});
}
PipelineOp::Session(_) => {
if ctx.is_read_only() {
return Err(read_only_write_op_error());
}
return Err(ExecutorError::ImplementationDefined {
detail: "session op surfaced inside execute_pipeline; should be dispatched at statement level",
});
}
};
ctx.shared().check_cancellation()?;
}
Ok(table)
}
fn read_only_write_op_error() -> ExecutorError {
ExecutorError::InvalidTransactionState {
detail: "write pipeline op invoked from read-only subquery",
span: crate::SourceSpan::default(),
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use selene_core::{GraphId, Value};
use crate::{
EmptyProcedureRegistry, ExecutionPlan, analyze, parse, plan,
runtime::{ExecutorError, TxContext, plan_runner::execute_plan_read_only},
};
fn planned(source: &str) -> ExecutionPlan {
let statement = parse(source).expect("test input parses");
let analyzed =
analyze(statement, &EmptyProcedureRegistry, None).expect("test input analyzes");
plan(&analyzed, &EmptyProcedureRegistry).expect("test input plans")
}
fn read_only_ctx(plan: &ExecutionPlan) -> TxContext<'_, '_> {
TxContext::read_only(
Arc::new(selene_graph::SeleneGraph::new(GraphId::new(7710))),
&plan.impl_defined_caps,
&EmptyProcedureRegistry,
&[],
)
}
#[test]
fn read_only_dispatch_rejects_mutation_op() {
let plan = planned("INSERT (:Person)");
let ctx = read_only_ctx(&plan);
let err = execute_plan_read_only(&plan, &ctx)
.expect_err("mutation op must be rejected on the read-only path");
assert!(matches!(
err,
ExecutorError::InvalidTransactionState {
detail: "write pipeline op invoked from read-only subquery",
..
}
));
}
#[test]
fn read_only_dispatch_rejects_catalog_op() {
let plan = planned("CREATE NODE TYPE :Person ()");
let ctx = read_only_ctx(&plan);
let err = execute_plan_read_only(&plan, &ctx)
.expect_err("catalog op must be rejected on the read-only path");
assert!(matches!(
err,
ExecutorError::InvalidTransactionState {
detail: "write pipeline op invoked from read-only subquery",
..
}
));
}
#[test]
fn read_only_dispatch_runs_read_op() {
let plan = planned("RETURN 1 AS n");
let ctx = read_only_ctx(&plan);
let table = execute_plan_read_only(&plan, &ctx).expect("read op runs read-only");
assert_eq!(table.row_count(), 1);
assert_eq!(table.rows()[0].values(), &[Value::Int(1)]);
}
}