selene-db-gql 1.3.0

ISO/IEC 39075:2024 GQL parser, planner, optimizer, and executor for selene-db.
Documentation
//! Inline `CALL { ... }` table-subquery pipeline operator.

use selene_core::Value;

use crate::{
    BindingTableColumn, BindingTableSchema, ExecutionPlan, PatternPlan, PipelineOp,
    PlannedTableSubquery,
    runtime::{Binding, BindingTable, ExecutorError, TxContext, pattern, plan_runner},
};

pub(super) fn execute(
    call: &PlannedTableSubquery,
    table: BindingTable,
    ctx: &mut TxContext<'_, '_>,
) -> Result<BindingTable, ExecutorError> {
    execute_read_only(call, table, ctx)
}

pub(super) fn execute_read_only(
    call: &PlannedTableSubquery,
    table: BindingTable,
    ctx: &TxContext<'_, '_>,
) -> Result<BindingTable, ExecutorError> {
    let (input_schema, input_rows) = table.into_parts();
    let output_schema = output_schema(&input_schema, call);
    let mut output = Vec::new();
    let mut rows_since_check = 0;

    for row in input_rows {
        ctx.check_cancellation_stride(&mut rows_since_check, 1)?;
        let target_schema = target_schema(call, &input_schema)?;
        if null_outer_binding_is_plan_pattern_binding(call, &row, &input_schema)? {
            if call.optional {
                output.push(optional_output_row(call, &row));
            }
            continue;
        }
        let seed = seed_binding(call, &row, &input_schema, &target_schema)?;
        let inner = plan_runner::execute_plan_read_only_with_seed(
            &call.body,
            Some(BindingTable::new(target_schema, vec![seed])),
            ctx,
        )?;
        let yield_indices = yield_indices(call, inner.schema())?;
        if call.optional && inner.rows().is_empty() {
            output.push(optional_output_row(call, &row));
            continue;
        }
        for inner_row in inner.rows() {
            ctx.check_cancellation_stride(&mut rows_since_check, 1)?;
            output.push(
                row.with_appended_values(
                    yield_indices
                        .iter()
                        .map(|index| inner_row.get(*index).cloned().unwrap_or(Value::Null)),
                ),
            );
        }
    }

    Ok(BindingTable::new(output_schema, output))
}

fn output_schema(
    input_schema: &BindingTableSchema,
    call: &PlannedTableSubquery,
) -> BindingTableSchema {
    let mut schema = input_schema.clone();
    schema.columns.extend(call.yield_schema.clone());
    schema
}

fn optional_output_row(call: &PlannedTableSubquery, input: &Binding) -> Binding {
    input.with_appended_values(std::iter::repeat_n(Value::Null, call.yield_schema.len()))
}

fn target_schema(
    call: &PlannedTableSubquery,
    source_schema: &BindingTableSchema,
) -> Result<BindingTableSchema, ExecutorError> {
    let mut schema = call
        .body
        .pattern_plan
        .as_ref()
        .map(pattern::schema_for_pattern)
        .unwrap_or_else(|| BindingTableSchema {
            columns: Vec::new(),
        });
    for outer in &call.outer_binding_refs {
        if pattern::column_index(&schema, &outer.name).is_some() {
            continue;
        }
        let source_index = source_index(source_schema, outer.name.clone())?;
        let source_column = &source_schema.columns[source_index];
        schema.columns.push(BindingTableColumn {
            name: Some(outer.name.clone()),
            hidden: None,
            ty: source_column.ty.clone(),
        });
    }
    Ok(schema)
}

fn seed_binding(
    call: &PlannedTableSubquery,
    row: &Binding,
    source_schema: &BindingTableSchema,
    target_schema: &BindingTableSchema,
) -> Result<Binding, ExecutorError> {
    let mut values = vec![Value::Null; target_schema.columns.len()];
    for outer in &call.outer_binding_refs {
        let source_index = source_index(source_schema, outer.name.clone())?;
        let value = row.get(source_index).cloned().unwrap_or(Value::Null);
        let target_index = pattern::column_index(target_schema, &outer.name).ok_or(
            ExecutorError::ImplementationDefined {
                detail: "CALL subquery outer binding missing from target row",
            },
        )?;
        values[target_index] = value;
    }
    Ok(Binding::new(values))
}

fn null_outer_binding_is_plan_pattern_binding(
    call: &PlannedTableSubquery,
    row: &Binding,
    source_schema: &BindingTableSchema,
) -> Result<bool, ExecutorError> {
    for outer in &call.outer_binding_refs {
        if !plan_binds_name(&call.body, outer.name.clone()) {
            continue;
        }
        let source_index = source_index(source_schema, outer.name.clone())?;
        if matches!(row.get(source_index), Some(Value::Null) | None) {
            return Ok(true);
        }
    }
    Ok(false)
}

fn plan_binds_name(plan: &ExecutionPlan, name: selene_core::DbString) -> bool {
    plan.pattern_plan
        .as_ref()
        .is_some_and(|pattern| pattern_binds_name(pattern, name.clone()))
        || plan
            .pipeline
            .iter()
            .any(|op| op_binds_name(op, name.clone()))
}

fn op_binds_name(op: &PipelineOp, name: selene_core::DbString) -> bool {
    match op {
        PipelineOp::Match(pattern) | PipelineOp::OptionalMatch(pattern) => {
            pattern_binds_name(pattern, name)
        }
        PipelineOp::Union { rhs, .. }
        | PipelineOp::Chain(rhs)
        | PipelineOp::CorrelatedChain(rhs)
        | PipelineOp::ExplainPlan { inner: rhs, .. } => plan_binds_name(rhs, name),
        PipelineOp::CallSubquery(subquery) => plan_binds_name(&subquery.body, name),
        _ => false,
    }
}

fn pattern_binds_name(pattern: &PatternPlan, name: selene_core::DbString) -> bool {
    pattern.bindings.iter().any(|binding| binding.name == name)
}

fn yield_indices(
    call: &PlannedTableSubquery,
    inner_schema: &BindingTableSchema,
) -> Result<Vec<usize>, ExecutorError> {
    call.yield_items
        .iter()
        .map(|item| source_index(inner_schema, item.source.clone()))
        .collect()
}

fn source_index(
    schema: &BindingTableSchema,
    name: selene_core::DbString,
) -> Result<usize, ExecutorError> {
    pattern::column_index(schema, &name).ok_or(ExecutorError::ImplementationDefined {
        detail: "CALL subquery binding missing from source row",
    })
}