selene-db-gql 1.3.0

ISO/IEC 39075:2024 GQL parser, planner, optimizer, and executor for selene-db.
Documentation
//! Bounded variable-length edge repeat operator.

use std::collections::BTreeSet;

use selene_core::{EdgeId, NodeId, Value};
use selene_graph::adjacency::AdjacencyEdge;

use crate::{
    EdgeDirection, JoinTree, PathMode, PatternPlan,
    plan::RepeatEdgeMatch,
    runtime::{Binding, BindingTableSchema, EvalCtx, ExecutorError},
};

use super::{pattern, scan, visited_set};

pub(crate) fn execute(
    child: &JoinTree,
    edge: &RepeatEdgeMatch,
    direction: EdgeDirection,
    min: u32,
    max: Option<u32>,
    path_mode: PathMode,
    env: pattern::WalkContext<'_, '_, '_, '_, '_, '_>,
) -> Result<Vec<Binding>, ExecutorError> {
    if path_mode != PathMode::Walk && max.is_some() {
        return Err(ExecutorError::FeatureNotSupportedYet {
            feature: "non-WALK variable-length edge execution",
            span: edge.span,
        });
    }

    let source_index = pattern::source_index(
        env.pattern,
        env.schema,
        edge.left_binding,
        edge.left_hidden_binding,
        "repeat source binding column missing",
    )?;
    let child_rows = pattern::walk_join_tree(child, env)?;
    let mut state = RepeatState {
        edge,
        direction,
        min,
        max,
        path_mode,
        cap: env.ctx.impl_defined_caps().max_quantifier,
        pattern_plan: env.pattern,
        schema: env.schema,
        source_index,
        final_slot: pattern::ColumnSlot::binding(
            env.pattern,
            env.schema,
            edge.final_binding,
            "repeat final binding column missing",
        )?,
        final_hidden_slot: pattern::ColumnSlot::hidden(
            env.schema,
            edge.final_hidden_binding,
            "repeat final hidden binding column missing",
        )?,
        group_slot: pattern::ColumnSlot::binding(
            env.pattern,
            env.schema,
            edge.group_binding,
            "repeat group binding column missing",
        )?,
        group_hidden_slot: pattern::ColumnSlot::hidden(
            env.schema,
            edge.group_hidden_binding,
            "repeat group hidden binding column missing",
        )?,
        ctx: env.ctx,
        output: Vec::new(),
        steps_since_check: 0,
    };
    for row in child_rows {
        state
            .ctx
            .tx
            .check_cancellation_stride(&mut state.steps_since_check, 1)?;
        let Some(source) = pattern::node_at_index(
            &row,
            state.source_index,
            "repeat source binding is not a node",
        )?
        else {
            continue;
        };
        let mut path_edges = Vec::new();
        traverse(source, source, &row, &mut path_edges, 0, &mut state)?;
    }
    Ok(state.output)
}

struct RepeatState<'a, 'eval, 'ctx, 'g, 'plan> {
    edge: &'a RepeatEdgeMatch,
    direction: EdgeDirection,
    min: u32,
    max: Option<u32>,
    path_mode: PathMode,
    cap: u32,
    pattern_plan: &'a PatternPlan,
    schema: &'a BindingTableSchema,
    source_index: usize,
    final_slot: pattern::ColumnSlot,
    final_hidden_slot: pattern::ColumnSlot,
    group_slot: pattern::ColumnSlot,
    group_hidden_slot: pattern::ColumnSlot,
    ctx: &'a EvalCtx<'eval, 'ctx, 'g, 'plan>,
    output: Vec<Binding>,
    steps_since_check: usize,
}

fn traverse(
    source: NodeId,
    current: NodeId,
    row: &Binding,
    path_edges: &mut Vec<EdgeId>,
    depth: u32,
    state: &mut RepeatState<'_, '_, '_, '_, '_>,
) -> Result<(), ExecutorError> {
    if depth >= state.min {
        maybe_emit_path(current, row, path_edges, state)?;
    }
    if state.max == Some(depth) {
        return Ok(());
    }

    for adjacent in adjacent_edges(current, state.direction, state.ctx) {
        state
            .ctx
            .tx
            .check_cancellation_stride(&mut state.steps_since_check, 1)?;
        if !edge_step_matches(adjacent.edge_id, row, path_edges, state)? {
            continue;
        }
        let next_depth = depth.saturating_add(1);
        let step = visited_set::repeat_step(
            visited_set::RepeatStepInput {
                path_mode: state.path_mode,
                source,
                target: adjacent.neighbor,
                edge: adjacent.edge_id,
                direction: state.direction,
                path_edges,
                next_depth,
                min: state.min,
            },
            pattern::WalkContext {
                pattern: state.pattern_plan,
                schema: state.schema,
                seed: None,
                ctx: state.ctx,
            },
        )?;
        let Some(step) = step else {
            continue;
        };
        if state.max.is_none() && next_depth > state.cap {
            return Err(ExecutorError::ProgramLimitExceeded {
                detail: "max_quantifier",
                span: state.edge.span,
            });
        }
        path_edges.push(adjacent.edge_id);
        if step.terminal {
            maybe_emit_path(adjacent.neighbor, row, path_edges, state)?;
        } else {
            traverse(
                source,
                adjacent.neighbor,
                row,
                path_edges,
                next_depth,
                state,
            )?;
        }
        path_edges.pop();
    }
    Ok(())
}

fn maybe_emit_path(
    final_node: NodeId,
    row: &Binding,
    path_edges: &[EdgeId],
    state: &mut RepeatState<'_, '_, '_, '_, '_>,
) -> Result<(), ExecutorError> {
    if !final_node_label_matches(state.edge, final_node, state.ctx) {
        return Ok(());
    }

    let mut values = row.values().to_vec();
    values.resize(state.schema.columns.len(), Value::Null);
    if !state
        .final_slot
        .set(&mut values, Value::NodeRef(final_node))
    {
        return Ok(());
    }
    if !state
        .final_hidden_slot
        .set(&mut values, Value::NodeRef(final_node))
    {
        return Ok(());
    }
    let group_value = edge_list_value(path_edges);
    if !state.group_slot.set(&mut values, group_value.clone()) {
        return Ok(());
    }
    if !state.group_hidden_slot.set(&mut values, group_value) {
        return Ok(());
    }

    let candidate = Binding::new(values);
    if !predicates_pass(
        &state.edge.final_property_predicates,
        &candidate,
        &Value::NodeRef(final_node),
        state,
    )? {
        return Ok(());
    }
    state.output.push(candidate);
    Ok(())
}

fn edge_step_matches(
    edge_id: EdgeId,
    row: &Binding,
    path_edges: &[EdgeId],
    state: &RepeatState<'_, '_, '_, '_, '_>,
) -> Result<bool, ExecutorError> {
    if !edge_label_matches(state.edge, edge_id, state.ctx) {
        return Ok(false);
    }
    predicates_pass(
        &state.edge.property_predicates,
        row,
        &Value::EdgeRef(edge_id),
        state,
    )
    .and_then(|passes| {
        if !passes || state.edge.inline_predicates.is_empty() {
            return Ok(passes);
        }
        let candidate;
        let predicate_row = if state.edge.group_binding.is_some() {
            candidate = current_step_row(row, path_edges, edge_id, state)?;
            let Some(candidate) = candidate.as_ref() else {
                return Ok(false);
            };
            candidate
        } else {
            row
        };
        pattern::filter_predicates_pass(
            &state.edge.inline_predicates,
            state.pattern_plan,
            predicate_row,
            state.schema,
            state.ctx,
        )
    })
}

fn current_step_row(
    row: &Binding,
    path_edges: &[EdgeId],
    edge_id: EdgeId,
    state: &RepeatState<'_, '_, '_, '_, '_>,
) -> Result<Option<Binding>, ExecutorError> {
    let mut values = row.values().to_vec();
    values.resize(state.schema.columns.len(), Value::Null);
    let edge_list = path_edges
        .iter()
        .copied()
        .chain(std::iter::once(edge_id))
        .map(Value::EdgeRef)
        .collect();
    if !state.group_slot.set(&mut values, Value::List(edge_list)) {
        return Ok(None);
    }
    Ok(Some(Binding::new(values)))
}

fn edge_list_value(path_edges: &[EdgeId]) -> Value {
    Value::List(path_edges.iter().copied().map(Value::EdgeRef).collect())
}

fn predicates_pass(
    predicates: &[crate::FilterPredicate],
    row: &Binding,
    entity: &Value,
    state: &RepeatState<'_, '_, '_, '_, '_>,
) -> Result<bool, ExecutorError> {
    for predicate in predicates {
        if !scan::predicate_passes(
            predicate,
            state.pattern_plan,
            row,
            state.schema,
            entity,
            state.ctx,
        )? {
            return Ok(false);
        }
    }
    Ok(true)
}

fn adjacent_edges(
    node: NodeId,
    direction: EdgeDirection,
    ctx: &EvalCtx<'_, '_, '_, '_>,
) -> Vec<AdjacencyEdge> {
    match direction {
        EdgeDirection::Right => ctx
            .tx
            .snapshot()
            .outgoing_edges(node)
            .map(|entry| entry.iter().cloned().collect())
            .unwrap_or_default(),
        EdgeDirection::Left => ctx
            .tx
            .snapshot()
            .incoming_edges(node)
            .map(|entry| entry.iter().cloned().collect())
            .unwrap_or_default(),
        EdgeDirection::Undirected => {
            let mut seen = BTreeSet::new();
            let mut edges = Vec::new();
            if let Some(entry) = ctx.tx.snapshot().outgoing_edges(node) {
                for adjacent in entry.iter().cloned() {
                    if seen.insert(adjacent.edge_id) {
                        edges.push(adjacent);
                    }
                }
            }
            if let Some(entry) = ctx.tx.snapshot().incoming_edges(node) {
                for adjacent in entry.iter().cloned() {
                    if seen.insert(adjacent.edge_id) {
                        edges.push(adjacent);
                    }
                }
            }
            edges
        }
    }
}

fn edge_label_matches(
    edge: &RepeatEdgeMatch,
    edge_id: EdgeId,
    ctx: &EvalCtx<'_, '_, '_, '_>,
) -> bool {
    let Some(label_expr) = &edge.label_predicate else {
        return true;
    };
    ctx.tx
        .snapshot()
        .edge_label(edge_id)
        .is_some_and(|label| scan::label_matches_edge(label_expr, label.clone()))
}

fn final_node_label_matches(
    edge: &RepeatEdgeMatch,
    node: NodeId,
    ctx: &EvalCtx<'_, '_, '_, '_>,
) -> bool {
    let Some(label_expr) = &edge.final_label_predicate else {
        return true;
    };
    ctx.tx
        .snapshot()
        .node_labels(node)
        .is_some_and(|labels| scan::label_matches_node(label_expr, labels))
}