// selene-db-gql 1.3.0
//
// ISO/IEC 39075:2024 GQL parser, planner, optimizer, and executor for selene-db.
// Documentation
//! Analyzed-statement to plan lowering.

mod aggregate;
mod binding_refs;
mod bindings;
mod call;
mod catalog;
mod expr;
mod match_clause;
mod match_mode;
mod mutation;
mod optional_filters;
mod path_mode;
mod path_search;
mod query;
mod repeat;
mod sequential_match;
mod set_op;

use query::{lower_query_pipeline, lower_return, nullable_call_yield_type, visible_after_pattern};
use set_op::assert_arms_column_name_equal;

use crate::{
    GqlType, ProcedureRegistry, QueryPipeline, SourceSpan,
    analyze::{AnalyzedStatement, AnalyzedStatementKind, AnalyzedType, ExprId, StatementCategory},
    plan::{
        BindingTableColumn, BindingTableSchema, ExecutionPlan, ImplDefinedCaps, PipelineOp,
        PlannerError, SessionOp, TxOp,
    },
};

/// Plan `analyzed` under the stock implementation-defined caps
/// ([`ImplDefinedCaps::DEFAULT`]), yielding a literal, unoptimized execution
/// plan.
///
/// Convenience entry point that forwards to [`plan_with_caps`]; use that
/// function directly when the caller has its own caps to inject (see
/// [`crate::runtime::Session::with_impl_defined_caps`]).
///
/// # Errors
///
/// Returns whatever [`PlannerError`] [`plan_with_caps`] reports.
pub fn plan(
    analyzed: &AnalyzedStatement,
    registry: &dyn ProcedureRegistry,
) -> Result<ExecutionPlan, PlannerError> {
    let default_caps = ImplDefinedCaps::DEFAULT;
    plan_with_caps(analyzed, registry, &default_caps)
}

/// Lower `analyzed` into a literal, unoptimized execution plan and stamp it
/// with the caller-provided implementation-defined caps.
///
/// Lowering dispatches on [`AnalyzedStatementKind`]:
///
/// - queries, set-composed queries, and NEXT-chained pipelines walk the read
///   pipeline;
/// - mutations lower from the analyzer's [`MutationWriteSet`];
/// - DDL lowers to a single [`PipelineOp::Catalog`];
/// - transaction control lowers to a single [`PipelineOp::Tx`];
/// - top-level CALL resolves procedure metadata through `registry` and lowers
///   to [`PipelineOp::Call`].
///
/// `caps` influence the result in two places. While lowering, the plan-time
/// variable-length quantifier gate reads `caps.max_quantifier` (passed down as
/// `max_quantifier`). Once lowering is done, the top-level plan receives a
/// copy of `*caps` in `impl_defined_caps` for the runtime context and the
/// optimizer. Nested plans (set-op / NEXT / CALL-subquery bodies) run under
/// the parent context, so only that top-level copy is ever read.
///
/// # Errors
///
/// Propagates any [`PlannerError`] raised while lowering the statement or
/// while populating the plan's subqueries.
///
/// [`MutationWriteSet`]: crate::analyze::MutationWriteSet
#[tracing::instrument(
    name = "selene.gql.plan",
    skip(analyzed, registry, caps),
    fields(category = ?analyzed.category)
)]
pub fn plan_with_caps(
    analyzed: &AnalyzedStatement,
    registry: &dyn ProcedureRegistry,
    caps: &ImplDefinedCaps,
) -> Result<ExecutionPlan, PlannerError> {
    let mut lowered = lower_statement_kind(&analyzed.statement, registry, analyzed, caps)?;

    // Statement-level metadata always comes from the analyzer, regardless of
    // what the per-kind lowering filled in.
    lowered.category = analyzed.category;
    lowered.expr_ids = analyzed.expr_ids.clone();

    let max_quantifier = caps.max_quantifier;
    expr::populate_plan_subqueries(&mut lowered, analyzed, registry, max_quantifier)?;

    // Why: the caps are stamped once, after lowering, because only the
    // top-level plan's copy is consumed — statement.rs derives the runtime
    // TxContext and the optimizer's OptimizeContext from
    // `plan.impl_defined_caps`, and nested plans execute under that same
    // context. The single cap required mid-lowering (the quantifier gate) was
    // already threaded through as `max_quantifier`, so this one assignment
    // covers every statement kind.
    lowered.impl_defined_caps = *caps;
    lowered.refresh_pipeline_op_high_water();
    Ok(lowered)
}

/// Lower one [`AnalyzedStatementKind`] into an [`ExecutionPlan`].
///
/// This is the per-kind dispatcher behind [`plan_with_caps`]; it is also
/// re-entered by `lower_explain` for the statement wrapped by EXPLAIN.
/// Fallible lowerings propagate their [`PlannerError`]; transaction-control
/// and session statements are infallible and map straight to a one-op plan.
fn lower_statement_kind(
    statement: &AnalyzedStatementKind,
    registry: &dyn ProcedureRegistry,
    analyzed: &AnalyzedStatement,
    caps: &ImplDefinedCaps,
) -> Result<ExecutionPlan, PlannerError> {
    // Only the quantifier gate is threaded recursively through lowering. The
    // DDL key-label-set IL003 gate needs the complete caps, which is why
    // `lower_ddl` is handed `caps` itself rather than a single field.
    let max_quantifier = caps.max_quantifier;

    let lowered = match statement {
        AnalyzedStatementKind::Query(pipeline) => {
            lower_query_pipeline(pipeline, registry, analyzed, max_quantifier)?
        }
        AnalyzedStatementKind::Composite { first, rest, .. } => {
            let mut combined = lower_query_pipeline(first, registry, analyzed, max_quantifier)?;
            for (set_op, arm) in rest {
                let arm_plan = lower_query_pipeline(arm, registry, analyzed, max_quantifier)?;
                // ISO §14.2 SR v requires set-composition arms to be column
                // name-equal. Each arm is bound independently by the binder,
                // so the lowered output schemas are the first place the names
                // can be compared.
                assert_arms_column_name_equal(
                    *set_op,
                    &combined.output_schema,
                    &arm_plan.output_schema,
                    arm.span,
                )?;
                combined.pipeline.push(PipelineOp::Union {
                    op: *set_op,
                    rhs: Box::new(arm_plan),
                });
            }
            combined
        }
        AnalyzedStatementKind::Chained { blocks, .. } => {
            lower_chained(blocks, registry, analyzed, max_quantifier)?
        }
        AnalyzedStatementKind::Mutate(pipeline) => {
            mutation::lower_mutation(pipeline, analyzed, max_quantifier)?
        }
        AnalyzedStatementKind::Ddl(ddl) => catalog::lower_ddl(ddl, analyzed, caps)?,
        AnalyzedStatementKind::Call(call) => call::lower_top_level_call(call, registry, analyzed)?,
        AnalyzedStatementKind::Explain { inner, span } => {
            lower_explain(inner, *span, registry, analyzed, caps)?
        }

        // Transaction control: one `PipelineOp::Tx` each.
        AnalyzedStatementKind::StartTransaction(span) => tx_plan(TxOp::Start { span: *span }),
        AnalyzedStatementKind::Commit(span) => tx_plan(TxOp::Commit { span: *span }),
        AnalyzedStatementKind::Rollback(span) => tx_plan(TxOp::Rollback { span: *span }),

        // Session statements: one `PipelineOp::Session` each.
        AnalyzedStatementKind::SessionSetValue {
            param,
            declared_type,
            value,
            if_not_exists,
            span,
        } => session_plan(SessionOp::SetValue {
            param: param.clone(),
            declared_type: declared_type.clone(),
            value: value.clone(),
            if_not_exists: *if_not_exists,
            span: *span,
        }),
        AnalyzedStatementKind::SessionSetTimeZone { zone, span } => {
            session_plan(SessionOp::SetTimeZone {
                zone: zone.clone(),
                span: *span,
            })
        }
        AnalyzedStatementKind::SessionSetGraph { target, span } => {
            session_plan(SessionOp::SetGraph {
                target: *target,
                span: *span,
            })
        }
        AnalyzedStatementKind::SessionReset { target, span } => {
            session_plan(session_reset_op(target, *span))
        }
        AnalyzedStatementKind::SessionClose(span) => session_plan(SessionOp::Close { span: *span }),
    };

    Ok(lowered)
}

/// Translate a SESSION RESET target into the matching [`SessionOp`] variant,
/// attaching `span` to the resulting op.
///
/// The `Parameter` target carries a parameter value that is cloned into the
/// op; the other targets map to span-only variants.
fn session_reset_op(target: &crate::SessionResetTarget, span: SourceSpan) -> SessionOp {
    use crate::SessionResetTarget as Target;

    match target {
        Target::Parameter(param) => SessionOp::ResetParameter {
            param: param.clone(),
            span,
        },
        Target::Parameters => SessionOp::ResetParameters { span },
        Target::TimeZone => SessionOp::ResetTimeZone { span },
        Target::AllCharacteristics => SessionOp::ResetAllCharacteristics { span },
    }
}

/// Lower an EXPLAIN statement: the wrapped statement is lowered first and then
/// boxed inside a single [`PipelineOp::ExplainPlan`].
///
/// The resulting plan is read-only and exposes the one-column schema built by
/// `explain_output_schema`. Its `impl_defined_caps` are left at the default
/// here.
///
/// # Errors
///
/// Propagates a failure from lowering the inner statement, or from building
/// the EXPLAIN output schema (checked in that order).
fn lower_explain(
    inner: &AnalyzedStatementKind,
    span: SourceSpan,
    registry: &dyn ProcedureRegistry,
    analyzed: &AnalyzedStatement,
    caps: &ImplDefinedCaps,
) -> Result<ExecutionPlan, PlannerError> {
    // Inner lowering runs before schema construction so that its error wins
    // if both would fail.
    let explained = lower_statement_kind(inner, registry, analyzed, caps)?;
    let output_schema = explain_output_schema(span)?;

    let explain_op = PipelineOp::ExplainPlan {
        inner: Box::new(explained),
        span,
    };

    Ok(ExecutionPlan {
        category: StatementCategory::ReadOnly,
        pattern_plan: None,
        pipeline: vec![explain_op],
        output_schema,
        impl_defined_caps: ImplDefinedCaps::default(),
        expr_ids: analyzed.expr_ids.clone(),
        subqueries: Default::default(),
        next_expr_id: next_expr_id(analyzed),
        next_pipeline_op_id: crate::PipelineOpId::new(1),
    })
}

/// Lower a NEXT-chained sequence of query blocks into one plan.
///
/// The first block becomes the base plan. Every later block is lowered on its
/// own and appended as [`PipelineOp::CorrelatedChain`] when it references a
/// binding declared outside its own span, or as [`PipelineOp::Chain`]
/// otherwise. An empty `blocks` slice yields `empty_plan()`.
///
/// # Errors
///
/// Propagates the first [`PlannerError`] raised while lowering any block.
fn lower_chained(
    blocks: &[QueryPipeline],
    registry: &dyn ProcedureRegistry,
    analyzed: &AnalyzedStatement,
    max_quantifier: u32,
) -> Result<ExecutionPlan, PlannerError> {
    let (head, tail) = match blocks.split_first() {
        Some(parts) => parts,
        None => return Ok(empty_plan()),
    };

    let mut chained = lower_query_pipeline(head, registry, analyzed, max_quantifier)?;
    for block in tail {
        let reads_prior = block_references_prior_bindings(block.span, analyzed);
        let stage = lower_query_pipeline(block, registry, analyzed, max_quantifier)?;

        // Each NEXT throws away the previous block's columns, so the overall
        // output schema always tracks the most recently lowered block.
        chained.output_schema = stage.output_schema.clone();

        let stage = Box::new(stage);
        let link = if reads_prior {
            PipelineOp::CorrelatedChain(stage)
        } else {
            PipelineOp::Chain(stage)
        };
        chained.pipeline.push(link);
    }
    Ok(chained)
}

/// Report whether any binding reference located inside `block_span` resolves
/// to a declaration whose span lies outside `block_span`.
///
/// References outside the block, and references whose binding has no
/// declaration in `analyzed.scopes`, are ignored.
fn block_references_prior_bindings(block_span: SourceSpan, analyzed: &AnalyzedStatement) -> bool {
    analyzed
        .references
        .iter()
        // Only uses that occur within this block are relevant.
        .filter(|reference| span_contains(block_span, reference.span))
        // Uses without a known declaration are skipped rather than counted.
        .filter_map(|reference| analyzed.scopes.declaration(reference.binding))
        // A declaration outside the block means the block reads prior state.
        .any(|decl| !span_contains(block_span, decl.span()))
}

/// Return `true` when `inner` lies entirely within `outer`: it starts no
/// earlier than `outer` starts and ends no later than `outer` ends.
fn span_contains(outer: SourceSpan, inner: SourceSpan) -> bool {
    // Starting before `outer` rules containment out without looking at ends.
    if outer.byte_offset > inner.byte_offset {
        return false;
    }
    inner.end() <= outer.end()
}

/// Build the output schema of an EXPLAIN plan: a single visible column named
/// `plan` with resolved type [`GqlType::String`].
///
/// # Errors
///
/// Returns [`PlannerError::StaticStringConstructionFailed`] (tagged with
/// `span`) if the column name cannot be turned into a database string.
fn explain_output_schema(span: SourceSpan) -> Result<BindingTableSchema, PlannerError> {
    let Ok(column_name) = selene_core::db_string("plan") else {
        return Err(PlannerError::StaticStringConstructionFailed {
            detail: "static EXPLAIN column 'plan'",
            span,
        });
    };

    let plan_column = BindingTableColumn {
        name: Some(column_name),
        hidden: None,
        ty: AnalyzedType::Resolved(GqlType::String),
    };
    Ok(BindingTableSchema {
        columns: vec![plan_column],
    })
}

/// Build a read-only plan with no pipeline ops and no output columns.
///
/// `lower_chained` returns this when it is handed an empty block list. Both
/// id counters start at zero.
fn empty_plan() -> ExecutionPlan {
    let output_schema = BindingTableSchema {
        columns: Vec::new(),
    };
    ExecutionPlan {
        category: StatementCategory::ReadOnly,
        pattern_plan: None,
        pipeline: Vec::new(),
        output_schema,
        impl_defined_caps: ImplDefinedCaps::default(),
        expr_ids: Default::default(),
        subqueries: Default::default(),
        next_expr_id: ExprId::new(0),
        next_pipeline_op_id: crate::PipelineOpId::new(0),
    }
}

/// Wrap a transaction-control op in a plan whose pipeline is exactly one
/// [`PipelineOp::Tx`].
///
/// The plan is categorised as transaction control and produces no output
/// columns.
fn tx_plan(op: TxOp) -> ExecutionPlan {
    let pipeline = vec![PipelineOp::Tx(op)];
    let output_schema = BindingTableSchema {
        columns: Vec::new(),
    };
    ExecutionPlan {
        category: StatementCategory::TransactionControl,
        pattern_plan: None,
        pipeline,
        output_schema,
        impl_defined_caps: ImplDefinedCaps::default(),
        expr_ids: Default::default(),
        subqueries: Default::default(),
        next_expr_id: ExprId::new(0),
        next_pipeline_op_id: crate::PipelineOpId::new(1),
    }
}

/// Wrap a session op in a plan whose pipeline is exactly one
/// [`PipelineOp::Session`].
///
/// The plan is categorised as session control and produces no output columns.
fn session_plan(op: SessionOp) -> ExecutionPlan {
    let pipeline = vec![PipelineOp::Session(op)];
    let output_schema = BindingTableSchema {
        columns: Vec::new(),
    };
    ExecutionPlan {
        category: StatementCategory::SessionControl,
        pattern_plan: None,
        pipeline,
        output_schema,
        impl_defined_caps: ImplDefinedCaps::default(),
        expr_ids: Default::default(),
        subqueries: Default::default(),
        next_expr_id: ExprId::new(0),
        next_pipeline_op_id: crate::PipelineOpId::new(1),
    }
}

/// Build an [`ExprId`] from the number of entries in `analyzed.expr_types`.
///
/// NOTE(review): presumably this is the first id not yet used by the
/// analyzer — confirm against `ExprTypeTable::push`. The length is narrowed
/// with `as u32`, so a table longer than `u32::MAX` would wrap silently.
pub(super) fn next_expr_id(analyzed: &AnalyzedStatement) -> ExprId {
    let typed_expr_count = analyzed.expr_types.len();
    ExprId::new(typed_expr_count as u32)
}

/// Tests that feed the planner hand-built, internally inconsistent
/// [`AnalyzedStatement`] values and check that it reports a [`PlannerError`]
/// instead of succeeding.
#[cfg(test)]
mod defensive_tests {
    use super::*;
    use crate::{
        EmptyProcedureRegistry, Literal, PipelineStatement, ReturnClause, ReturnItem, SourceSpan,
        Statement, ValueExpr,
        analyze::{BindingId, BindingScopeTree, ExprIdLookup, ExprTypeTable, StatementCategory},
        parse,
    };

    /// A RETURN of an integer literal is planned with empty `expr_types` and
    /// `expr_ids` tables, so the projected expression has no recorded type.
    /// Planning must fail with `PlannerError::ExpressionTypeMissing`.
    #[test]
    fn missing_expression_type_reports_planner_error() {
        let expr = ValueExpr::Literal(Literal::Integer(1, SourceSpan::new(7, 1)));
        let statement = AnalyzedStatement {
            statement: AnalyzedStatementKind::Query(QueryPipeline {
                statements: vec![PipelineStatement::Return(ReturnClause {
                    distinct: false,
                    star: false,
                    items: vec![ReturnItem {
                        expr,
                        alias: None,
                        span: SourceSpan::new(7, 1),
                    }],
                    group_by: None,
                    having: None,
                    span: SourceSpan::new(0, 8),
                })],
                span: SourceSpan::new(0, 8),
            }),
            scopes: BindingScopeTree::new(SourceSpan::new(0, 8)),
            references: Vec::new(),
            // Deliberately left empty: nothing was "analyzed" for the literal.
            expr_types: ExprTypeTable::default(),
            expr_ids: ExprIdLookup::default(),
            span: SourceSpan::new(0, 8),
            category: StatementCategory::ReadOnly,
            write_set: None,
        };
        let err = plan(&statement, &EmptyProcedureRegistry).expect_err("missing expr cell");
        assert!(matches!(err, PlannerError::ExpressionTypeMissing { .. }));
    }

    /// `RETURN n` is parsed for real, but the analyzed statement is assembled
    /// by hand: the variable use points at `BindingId::new(999)` while the
    /// scope tree is freshly constructed with no declarations added by this
    /// test. Planning must fail with `PlannerError::BindingResolutionLost`.
    #[test]
    fn lost_binding_reference_reports_planner_error() {
        let parsed = parse("RETURN n").expect("test input parses");
        let Statement::Query(parsed_query) = parsed else {
            unreachable!("parser returns query");
        };
        let PipelineStatement::Return(parsed_return) = parsed_query.statements[0].clone() else {
            unreachable!("parser returns return");
        };
        let ValueExpr::Variable { name, .. } = &parsed_return.items[0].expr else {
            unreachable!("test projection is variable");
        };
        let name = name.clone();
        // Give the projected expression a type so the failure under test is
        // the binding lookup rather than a missing expression type.
        let mut expr_types = ExprTypeTable::default();
        let expr_id = expr_types.push(crate::AnalyzedType::DYNAMIC);
        let mut statement = AnalyzedStatement {
            statement: AnalyzedStatementKind::Query(QueryPipeline {
                statements: vec![PipelineStatement::Return(parsed_return)],
                span: parsed_query.span,
            }),
            scopes: BindingScopeTree::new(SourceSpan::new(0, 8)),
            references: vec![crate::BindingUse {
                name,
                // NOTE(review): presumably no declaration exists for id 999
                // in a fresh scope tree — confirm against BindingScopeTree.
                binding: BindingId::new(999),
                span: SourceSpan::new(7, 1),
                kind: crate::BindingUseKind::Variable,
            }],
            expr_types,
            expr_ids: ExprIdLookup::default(),
            span: SourceSpan::new(0, 8),
            category: StatementCategory::ReadOnly,
            write_set: None,
        };
        // `parsed_return` was moved into `statement`, so the expression is
        // borrowed back out of the statement to register its id in the lookup.
        let AnalyzedStatementKind::Query(query) = &statement.statement else {
            unreachable!("test builds query");
        };
        let PipelineStatement::Return(return_clause) = &query.statements[0] else {
            unreachable!("test builds return");
        };
        let mut expr_ids = ExprIdLookup::default();
        expr_ids.insert(&return_clause.items[0].expr, expr_id);
        statement.expr_ids = expr_ids;
        let err = plan(&statement, &EmptyProcedureRegistry).expect_err("lost binding");
        assert!(matches!(err, PlannerError::BindingResolutionLost { .. }));
    }
}