selene-db-gql 1.3.0

ISO/IEC 39075:2024 GQL parser, planner, optimizer, and executor for selene-db.
Documentation
//! Expand join-tree operator.

use std::collections::{BTreeMap, BTreeSet};

use roaring::RoaringBitmap;
use selene_core::{EdgeId, NodeId, Value};
use selene_graph::RowIndex;

use crate::{
    EdgeDirection, EdgeMatch, JoinTree, PatternPlan,
    runtime::{Binding, BindingTableSchema, EvalCtx, ExecutorError},
};

use super::{edge_access, pattern, scan};

/// Executes an expand step of a join tree.
///
/// The child join tree is walked first. Each child row is then extended with
/// every edge incident to the row's source node in `direction` that passes the
/// checks in `maybe_emit`. The edge, its hidden column, the far node and that
/// node's hidden column are written into the row.
///
/// Two strategies are used:
/// - When an edge row filter exists and holds no more entries than there are
///   child rows, candidate edges are enumerated from the filter
///   (`expand_from_indexed_edges`).
/// - Otherwise each row's adjacency list is walked (`expand_from_source`).
///
/// Child rows whose source column does not hold a node are skipped.
///
/// # Errors
/// Propagates failures from resolving the source column or any output column
/// slot, from walking the child tree, from building the edge row filter, and
/// from predicate evaluation.
pub(crate) fn execute(
    child: &JoinTree,
    edge: &EdgeMatch,
    direction: EdgeDirection,
    env: pattern::WalkContext<'_, '_, '_, '_, '_, '_>,
) -> Result<Vec<Binding>, ExecutorError> {
    let source_index = pattern::source_index(
        env.pattern,
        env.schema,
        edge.left_binding,
        edge.left_hidden_binding,
        "expand source binding column missing",
    )?;
    let child_rows = pattern::walk_join_tree(child, env)?;
    let edge_row_filter = edge_access::candidate_row_filter(edge, env.ctx)?;

    // Output column slots, resolved in a fixed order so the first missing
    // column determines the reported error.
    let edge_slot = pattern::ColumnSlot::binding(
        env.pattern,
        env.schema,
        edge.binding,
        "expand edge binding column missing",
    )?;
    let edge_hidden_slot = pattern::ColumnSlot::hidden(
        env.schema,
        edge.hidden_binding,
        "expand edge hidden binding column missing",
    )?;
    let right_slot = pattern::ColumnSlot::binding(
        env.pattern,
        env.schema,
        edge.right_binding,
        "expand right binding column missing",
    )?;
    let right_hidden_slot = pattern::ColumnSlot::hidden(
        env.schema,
        edge.right_hidden_binding,
        "expand right hidden binding column missing",
    )?;

    let mut expanded = Vec::new();
    let mut state = ExpandState {
        edge,
        pattern_plan: env.pattern,
        schema: env.schema,
        source_index,
        edge_slot,
        edge_hidden_slot,
        right_slot,
        right_hidden_slot,
        edge_row_filter,
        ctx: env.ctx,
        output: &mut expanded,
    };

    // Enumerate the filter's edges only when it is no larger than the child
    // input; otherwise walking each row's adjacency is preferred.
    let use_edge_index = match &state.edge_row_filter {
        Some(filter) => filter.len() <= child_rows.len() as u64,
        None => false,
    };

    if use_edge_index {
        expand_from_indexed_edges(&child_rows, direction, &mut state)?;
    } else {
        for row in &child_rows {
            if let Some(source) = pattern::node_at_index(
                row,
                state.source_index,
                "expand source binding is not a node",
            )? {
                expand_from_source(source, row, direction, &mut state)?;
            }
        }
    }
    Ok(expanded)
}

/// Working state shared by the expand helpers for a single `execute` call.
///
/// It holds the edge pattern being matched, where its columns go in the output
/// row, and the buffer that receives the rows produced.
struct ExpandState<'a, 'eval, 'ctx, 'g, 'plan, 'out> {
    /// Edge pattern being expanded (labels, property predicates, bindings).
    edge: &'a EdgeMatch,
    /// Enclosing pattern plan, forwarded to property-predicate evaluation.
    pattern_plan: &'a PatternPlan,
    /// Output schema; candidate rows are resized to its column count.
    schema: &'a BindingTableSchema,
    /// Evaluation context; graph lookups go through `ctx.tx.snapshot()`.
    ctx: &'a EvalCtx<'eval, 'ctx, 'g, 'plan>,

    /// Column index in child rows that holds the expansion's source node.
    source_index: usize,
    /// Output column for the user-visible edge binding.
    edge_slot: pattern::ColumnSlot,
    /// Output column for the hidden edge binding.
    edge_hidden_slot: pattern::ColumnSlot,
    /// Output column for the user-visible far-end node binding.
    right_slot: pattern::ColumnSlot,
    /// Output column for the hidden far-end node binding.
    right_hidden_slot: pattern::ColumnSlot,

    /// Optional candidate edge-row set.
    ///
    /// Every emitted edge is checked against it via
    /// `edge_access::row_filter_matches`. When it is small enough, `execute`
    /// also drives expansion from it.
    edge_row_filter: Option<RoaringBitmap>,

    /// Destination for every row that survives all checks.
    output: &'out mut Vec<Binding>,
}

/// Expands one child `row` by walking the adjacency lists of `source`.
///
/// Each `(edge_id, neighbor)` pair found is offered to `maybe_emit`:
/// - `Right` walks the outgoing list.
/// - `Left` walks the incoming list.
/// - `Undirected` walks outgoing then incoming. It remembers the edge ids
///   already offered, so each edge id is offered at most once per call.
fn expand_from_source(
    source: NodeId,
    row: &Binding,
    direction: EdgeDirection,
    state: &mut ExpandState<'_, '_, '_, '_, '_, '_>,
) -> Result<(), ExecutorError> {
    let (visit_outgoing, visit_incoming) = match direction {
        EdgeDirection::Right => (true, false),
        EdgeDirection::Left => (false, true),
        EdgeDirection::Undirected => (true, true),
    };
    // Deduplication is only needed when both lists are walked.
    let dedupe = visit_outgoing && visit_incoming;
    let mut offered = BTreeSet::new();

    if visit_outgoing {
        if let Some(entry) = state.ctx.tx.snapshot().outgoing_edges(source) {
            for adjacent in entry.iter() {
                if !dedupe || offered.insert(adjacent.edge_id) {
                    maybe_emit(adjacent.edge_id, adjacent.neighbor, row, state)?;
                }
            }
        }
    }
    if visit_incoming {
        if let Some(entry) = state.ctx.tx.snapshot().incoming_edges(source) {
            for adjacent in entry.iter() {
                if !dedupe || offered.insert(adjacent.edge_id) {
                    maybe_emit(adjacent.edge_id, adjacent.neighbor, row, state)?;
                }
            }
        }
    }
    Ok(())
}

/// Expands all `child_rows` by enumerating the edges in `state.edge_row_filter`.
///
/// Child rows are first grouped by their source node; rows whose source column
/// is not a node are dropped. For each candidate edge row that resolves to an
/// edge with endpoints `(source, target)`:
/// - `Right` pairs the edge with rows anchored at `source`, binding `target`
///   on the right.
/// - `Left` pairs it with rows anchored at `target`, binding `source`.
/// - `Undirected` does both, forward first. The backward pairing is skipped
///   for self-loops so they are emitted only once.
///
/// Candidate rows with no edge id, or edges with no endpoints, are skipped.
/// Does nothing when there is no edge row filter.
fn expand_from_indexed_edges(
    child_rows: &[Binding],
    direction: EdgeDirection,
    state: &mut ExpandState<'_, '_, '_, '_, '_, '_>,
) -> Result<(), ExecutorError> {
    // Copy the row numbers out of the bitmap so `state` can be borrowed
    // mutably while emitting below.
    let candidate_rows: Vec<u32> = match state.edge_row_filter.as_ref() {
        Some(filter) => filter.iter().collect(),
        None => return Ok(()),
    };

    let mut rows_by_source: BTreeMap<NodeId, Vec<&Binding>> = BTreeMap::new();
    for row in child_rows {
        if let Some(source) = pattern::node_at_index(
            row,
            state.source_index,
            "expand source binding is not a node",
        )? {
            rows_by_source.entry(source).or_default().push(row);
        }
    }

    for row_number in candidate_rows {
        let Some(edge_id) = state
            .ctx
            .tx
            .snapshot()
            .edge_id_for_row(RowIndex::new(row_number))
        else {
            continue;
        };
        let Some((source, target)) = state.ctx.tx.snapshot().edge_endpoints(edge_id) else {
            continue;
        };

        // forward: anchor at `source`, bind `target`; backward: the reverse.
        let (forward, backward) = match direction {
            EdgeDirection::Right => (true, false),
            EdgeDirection::Left => (false, true),
            EdgeDirection::Undirected => (true, source != target),
        };
        if forward {
            emit_indexed_edge_for_source(edge_id, target, source, &rows_by_source, state)?;
        }
        if backward {
            emit_indexed_edge_for_source(edge_id, source, target, &rows_by_source, state)?;
        }
    }
    Ok(())
}

/// Offers `(edge_id, right_node)` to `maybe_emit` for every child row grouped
/// under `source`.
///
/// Does nothing when no child row is anchored at `source`. Stops at the first
/// error.
fn emit_indexed_edge_for_source(
    edge_id: EdgeId,
    right_node: NodeId,
    source: NodeId,
    rows_by_source: &BTreeMap<NodeId, Vec<&Binding>>,
    state: &mut ExpandState<'_, '_, '_, '_, '_, '_>,
) -> Result<(), ExecutorError> {
    match rows_by_source.get(&source) {
        Some(anchored) => anchored
            .iter()
            .try_for_each(|row| maybe_emit(edge_id, right_node, row, state)),
        None => Ok(()),
    }
}

/// Builds the extension of `row` by `edge_id` and its far endpoint
/// `right_node`, and pushes it to `state.output` if every check passes.
///
/// Checks run in this order and stop at the first rejection:
/// 1. The edge row filter.
/// 2. The edge label predicate.
/// 3. The right-node label predicate.
/// 4. The four column-slot assignments.
/// 5. The edge property predicates.
/// 6. The right-node property predicates.
///
/// The candidate row is `row`'s values resized to the schema's column count.
/// A rejected candidate is dropped and `Ok(())` is returned.
///
/// # Errors
/// Only errors raised while evaluating property predicates are propagated.
fn maybe_emit(
    edge_id: EdgeId,
    right_node: NodeId,
    row: &Binding,
    state: &mut ExpandState<'_, '_, '_, '_, '_, '_>,
) -> Result<(), ExecutorError> {
    let admissible =
        edge_access::row_filter_matches(state.edge_row_filter.as_ref(), edge_id, state.ctx)
            && edge_label_matches(state.edge, edge_id, state.ctx)
            && right_node_matches(state.edge, right_node, state.ctx);
    if !admissible {
        return Ok(());
    }

    let mut values = row.values().to_vec();
    values.resize(state.schema.columns.len(), Value::Null);
    // `set` returns false when the assignment is refused.
    // NOTE(review): presumably a conflicting existing value; confirm in ColumnSlot.
    let bound = state.edge_slot.set(&mut values, Value::EdgeRef(edge_id))
        && state
            .edge_hidden_slot
            .set(&mut values, Value::EdgeRef(edge_id))
        && state.right_slot.set(&mut values, Value::NodeRef(right_node))
        && state
            .right_hidden_slot
            .set(&mut values, Value::NodeRef(right_node));
    if !bound {
        return Ok(());
    }

    let candidate = Binding::new(values);
    let accepted = predicates_pass(
        &state.edge.property_predicates,
        state.pattern_plan,
        &candidate,
        state.schema,
        &Value::EdgeRef(edge_id),
        state.ctx,
    )? && predicates_pass(
        &state.edge.right_property_predicates,
        state.pattern_plan,
        &candidate,
        state.schema,
        &Value::NodeRef(right_node),
        state.ctx,
    )?;
    if accepted {
        state.output.push(candidate);
    }
    Ok(())
}

/// Reports whether `edge_id` satisfies `edge`'s label predicate.
///
/// With no predicate every edge matches. With a predicate, the edge matches
/// only if the snapshot returns a label for it and
/// `scan::label_matches_edge` accepts that label.
fn edge_label_matches(edge: &EdgeMatch, edge_id: EdgeId, ctx: &EvalCtx<'_, '_, '_, '_>) -> bool {
    match &edge.label_predicate {
        None => true,
        Some(label_expr) => match ctx.tx.snapshot().edge_label(edge_id) {
            Some(label) => scan::label_matches_edge(label_expr, label.clone()),
            None => false,
        },
    }
}

/// Reports whether the far-end `node` satisfies `edge`'s right-node label
/// predicate.
///
/// With no predicate every node matches. With a predicate, the node matches
/// only if the snapshot returns labels for it and `scan::label_matches_node`
/// accepts them.
fn right_node_matches(edge: &EdgeMatch, node: NodeId, ctx: &EvalCtx<'_, '_, '_, '_>) -> bool {
    match &edge.right_label_predicate {
        None => true,
        Some(label_expr) => match ctx.tx.snapshot().node_labels(node) {
            Some(labels) => scan::label_matches_node(label_expr, labels),
            None => false,
        },
    }
}

/// Evaluates `predicates` in order against the candidate `row`, with `entity`
/// as the subject being tested.
///
/// Returns `Ok(false)` as soon as one predicate fails and `Ok(true)` if all
/// pass, including when the list is empty. Predicates after the first failure
/// or error are not evaluated.
///
/// # Errors
/// Propagates the first error returned by `scan::predicate_passes`.
fn predicates_pass(
    predicates: &[crate::FilterPredicate],
    pattern_plan: &PatternPlan,
    row: &Binding,
    schema: &BindingTableSchema,
    entity: &Value,
    ctx: &EvalCtx<'_, '_, '_, '_>,
) -> Result<bool, ExecutorError> {
    // Lazy: each predicate is evaluated only when the loop pulls it.
    let outcomes = predicates.iter().map(|predicate| {
        scan::predicate_passes(predicate, pattern_plan, row, schema, entity, ctx)
    });
    for outcome in outcomes {
        if !outcome? {
            return Ok(false);
        }
    }
    Ok(true)
}