Skip to main content

selene_gql/runtime/
pattern.rs

1//! Pattern join-tree executor.
2
3use std::collections::BTreeMap;
4
5use rustc_hash::FxHashSet;
6use selene_core::{DbString, NodeId, Value};
7
8use crate::{
9    AnalyzedType, BindingElement, BindingId, BindingTableColumn, BindingTableSchema,
10    FilterPredicate, GqlType, HiddenBindingId, JoinTree, PatternPlan, ScanKind, SubqueryRegistry,
11    analyze::ExprIdLookup,
12    runtime::{Binding, BindingTable, EvalCtx, ExecutorError, TxContext},
13};
14
15use super::{
16    evaluator, expand, hash_join, match_mode, outer, path_mode, questioned, scan, subplan,
17    value_compare, wco,
18};
19
20/// Execute a pattern plan and produce its initial binding table.
21pub fn execute_pattern(
22    pattern: &PatternPlan,
23    ctx: &TxContext<'_, '_>,
24) -> Result<BindingTable, ExecutorError> {
25    let expr_ids = ExprIdLookup::default();
26    let subqueries = SubqueryRegistry::default();
27    let (expr_ids, subqueries) = ctx.plan_metadata().unwrap_or((&expr_ids, &subqueries));
28    let eval_ctx = EvalCtx {
29        tx: ctx,
30        expr_ids,
31        subqueries,
32    };
33    execute_pattern_with_seed(pattern, None, &eval_ctx)
34}
35
36/// Execute a pattern plan with an optional row seed for correlated subqueries.
37pub(crate) fn execute_pattern_with_seed(
38    pattern: &PatternPlan,
39    seed: Option<&Binding>,
40    ctx: &EvalCtx<'_, '_, '_, '_>,
41) -> Result<BindingTable, ExecutorError> {
42    let schema = schema_for_pattern(pattern);
43    execute_pattern_with_seed_and_schema(pattern, seed, schema, ctx)
44}
45
46pub(crate) fn execute_pattern_with_seed_and_schema(
47    pattern: &PatternPlan,
48    seed: Option<&Binding>,
49    schema: BindingTableSchema,
50    ctx: &EvalCtx<'_, '_, '_, '_>,
51) -> Result<BindingTable, ExecutorError> {
52    execute_pattern_with_seed_schema_and_limit(pattern, seed, schema, ctx, None)
53}
54
55pub(crate) fn execute_pattern_with_seed_schema_and_limit(
56    pattern: &PatternPlan,
57    seed: Option<&Binding>,
58    schema: BindingTableSchema,
59    ctx: &EvalCtx<'_, '_, '_, '_>,
60    row_limit: Option<usize>,
61) -> Result<BindingTable, ExecutorError> {
62    let env = WalkContext {
63        pattern,
64        schema: &schema,
65        seed,
66        ctx,
67    };
68    let mut rows = Vec::new();
69    if row_limit == Some(0) {
70        return Ok(BindingTable::new(schema, rows));
71    }
72    let mut rows_since_check = 0;
73    for row in walk_join_tree(&pattern.join_tree, env)? {
74        ctx.tx.check_cancellation_stride(&mut rows_since_check, 1)?;
75        if pattern_filters_pass(pattern, &row, &schema, ctx)? {
76            rows.push(row);
77            if row_limit.is_some_and(|limit| rows.len() >= limit) {
78                break;
79            }
80        }
81    }
82    Ok(BindingTable::new(schema, rows))
83}
84
/// Read-only state handed to every join-tree operator during a walk.
///
/// The struct holds only shared references (one of them optional), so it is
/// `Copy` and is passed to operators by value.
#[derive(Clone, Copy)]
pub(crate) struct WalkContext<'a, 'seed, 'eval, 'ctx, 'g, 'plan> {
    /// Pattern plan whose join tree is being walked.
    pub(crate) pattern: &'a PatternPlan,
    /// Column layout of the rows the walk produces.
    pub(crate) schema: &'a BindingTableSchema,
    /// Optional seed row supplied by the caller of the pattern executor.
    pub(crate) seed: Option<&'seed Binding>,
    /// Evaluation context forwarded to scans, filters, and sub-plans.
    pub(crate) ctx: &'a EvalCtx<'eval, 'ctx, 'g, 'plan>,
}
92
93pub(crate) fn walk_join_tree(
94    tree: &JoinTree,
95    env: WalkContext<'_, '_, '_, '_, '_, '_>,
96) -> Result<Vec<Binding>, ExecutorError> {
97    match tree {
98        JoinTree::Unit => Ok(vec![Binding::new(vec![
99            Value::Null;
100            env.schema.columns.len()
101        ])]),
102        JoinTree::Scan(scan_node) => {
103            scan::scan_pattern(scan_node, env.pattern, env.schema, env.seed, env.ctx)
104        }
105        JoinTree::Expand {
106            child,
107            edge,
108            direction,
109        } => expand::execute(child, edge, *direction, env),
110        JoinTree::Questioned {
111            child,
112            edge,
113            direction,
114            ..
115        } => questioned::execute(child, edge, *direction, env),
116        JoinTree::Repeat {
117            child,
118            edge,
119            direction,
120            min,
121            max,
122            path_mode,
123        } => super::repeat::execute(child, edge, *direction, *min, *max, *path_mode, env),
124        JoinTree::PathSearch {
125            selector,
126            child,
127            source_binding,
128            final_binding,
129            hop_contributors,
130        } => super::path_search::execute(
131            child,
132            *selector,
133            *source_binding,
134            *final_binding,
135            hop_contributors,
136            env,
137        ),
138        JoinTree::PathModeFilter {
139            path_mode,
140            child,
141            path_contributors,
142        } => path_mode::execute(child, *path_mode, path_contributors, env),
143        JoinTree::MatchModeFilter {
144            match_mode,
145            child,
146            path_contributors,
147        } => match_mode::execute(child, *match_mode, path_contributors, env),
148        JoinTree::HashJoin {
149            left,
150            right,
151            key,
152            build_side,
153        } => hash_join::execute(left, right, key, *build_side, env),
154        JoinTree::Outer {
155            left,
156            right,
157            key,
158            right_filters,
159        } => outer::execute(left, right, key, right_filters, env),
160        JoinTree::WorstCaseOptimal { intersection, .. } => wco::execute_phase_a(intersection, env),
161        JoinTree::Subplan(plan) => subplan::execute(plan, env.schema, env.seed, env.ctx),
162        // Iterate each per-label branch and dedup the resulting `Binding`
163        // rows by the scan anchor's `NodeId`. A node carrying labels A AND B
164        // would otherwise appear in both branch-A and branch-B candidate
165        // rows (a per-branch `UNION ALL` shape), which would change query
166        // semantics observably (COUNT, LIMIT, aggregates) based on whether
167        // the disjunctive-label-expansion rule fired vs the unexpanded
168        // `LabelExpr::Disjunction(any(...))` filter that visits each node
169        // once. The dedup at JoinTree-level restores the
170        // catalog-present-vs-catalog-absent invariant: same query + same
171        // data => same rows out, regardless of which optimizer rule slot
172        // fired (BRIEF-155 PR #177 Codex C1).
173        //
174        // Dedup happens here, before downstream pipeline ops (LIMIT, ORDER
175        // BY, GROUP BY, Distinct), so the union-then-dedup'd binding table
176        // is what those ops see — preserving the option-b architecture
177        // advantage (single union point, no per-branch pipeline fan-out).
178        //
179        // `scan_anchor` carries the original disjunctive `label_predicate`
180        // for EXPLAIN diagnostics and IR round-trips; at execute time we
181        // also use its binding / hidden-binding IDs to locate the column
182        // that holds the per-row `Value::NodeRef(NodeId)` for dedup. All
183        // branches inherit those IDs from `scan_anchor` (constructed by
184        // `plan/optimize/rules/disjunctive_label_expansion.rs`), so all
185        // branches write into the same column.
186        //
187        // The rule is gated to `ScanKind::Node` (see
188        // `disjunctive_label_expansion::maybe_expand_scan`), so the anchor
189        // column always carries `Value::NodeRef` (per
190        // `runtime/scan::entity_value`). The non-`NodeRef` arm is
191        // defensive and currently unreachable.
192        JoinTree::DisjunctiveScan {
193            branches,
194            scan_anchor,
195        } => {
196            let anchor_index = scan_anchor
197                .binding
198                .and_then(|binding_id| binding_index(env.pattern, env.schema, binding_id))
199                .or_else(|| {
200                    scan_anchor
201                        .hidden_binding
202                        .and_then(|hidden_id| hidden_index(env.schema, hidden_id))
203                })
204                .ok_or(ExecutorError::ImplementationDefined {
205                    detail: "DisjunctiveScan anchor binding missing from pattern schema",
206                })?;
207            let mut seen: FxHashSet<NodeId> = FxHashSet::default();
208            let mut rows = Vec::new();
209            for branch in branches {
210                for binding in
211                    scan::scan_pattern(branch, env.pattern, env.schema, env.seed, env.ctx)?
212                {
213                    match binding.get(anchor_index) {
214                        Some(Value::NodeRef(id)) => {
215                            if seen.insert(*id) {
216                                rows.push(binding);
217                            }
218                        }
219                        _ => {
220                            // Defensive: rule is gated to ScanKind::Node, so
221                            // this arm is unreachable. Preserve the row
222                            // rather than silently drop it on the impossible
223                            // variant.
224                            rows.push(binding);
225                        }
226                    }
227                }
228            }
229            Ok(rows)
230        }
231    }
232}
233
234pub(crate) fn schema_for_pattern(pattern: &PatternPlan) -> BindingTableSchema {
235    let mut columns = pattern
236        .bindings
237        .iter()
238        .filter(|binding| {
239            matches!(
240                binding.element,
241                BindingElement::Node | BindingElement::Edge | BindingElement::Path
242            )
243        })
244        .map(|binding| BindingTableColumn {
245            name: Some(binding.name.clone()),
246            hidden: None,
247            ty: binding.ty.clone(),
248        })
249        .collect::<Vec<_>>();
250    let mut hidden = BTreeMap::new();
251    collect_hidden_slots(&pattern.join_tree, &mut hidden);
252    columns.extend(hidden.into_iter().map(|(hidden, ty)| BindingTableColumn {
253        name: None,
254        hidden: Some(hidden),
255        ty,
256    }));
257    BindingTableSchema { columns }
258}
259
260fn collect_hidden_slots(tree: &JoinTree, slots: &mut BTreeMap<HiddenBindingId, AnalyzedType>) {
261    match tree {
262        JoinTree::Unit => {}
263        JoinTree::Scan(scan) => {
264            insert_hidden(slots, scan.hidden_binding, scan.kind);
265        }
266        JoinTree::Expand { child, edge, .. } => {
267            collect_hidden_slots(child, slots);
268            insert_hidden(slots, edge.left_hidden_binding, ScanKind::Node);
269            insert_hidden(slots, edge.hidden_binding, ScanKind::Edge);
270            insert_hidden(slots, edge.right_hidden_binding, ScanKind::Node);
271        }
272        JoinTree::Questioned { child, edge, .. } => {
273            collect_hidden_slots(child, slots);
274            insert_hidden(slots, edge.left_hidden_binding, ScanKind::Node);
275            insert_hidden(slots, edge.hidden_binding, ScanKind::Edge);
276            insert_hidden(slots, edge.right_hidden_binding, ScanKind::Node);
277        }
278        JoinTree::Repeat { child, edge, .. } => {
279            collect_hidden_slots(child, slots);
280            insert_hidden(slots, edge.left_hidden_binding, ScanKind::Node);
281            insert_hidden_type(
282                slots,
283                edge.group_hidden_binding,
284                AnalyzedType::Resolved(GqlType::List(Box::new(GqlType::EdgeRef))),
285            );
286            insert_hidden(slots, edge.final_hidden_binding, ScanKind::Node);
287        }
288        JoinTree::PathSearch { child, .. }
289        | JoinTree::PathModeFilter { child, .. }
290        | JoinTree::MatchModeFilter { child, .. } => {
291            collect_hidden_slots(child, slots);
292        }
293        JoinTree::HashJoin { left, right, .. } | JoinTree::Outer { left, right, .. } => {
294            collect_hidden_slots(left, slots);
295            collect_hidden_slots(right, slots);
296        }
297        JoinTree::WorstCaseOptimal { intersection, .. } => {
298            for tree in intersection {
299                collect_hidden_slots(tree, slots);
300            }
301        }
302        JoinTree::Subplan(_) => {}
303        JoinTree::DisjunctiveScan { branches, .. } => {
304            for branch in branches {
305                insert_hidden(slots, branch.hidden_binding, branch.kind);
306            }
307        }
308    }
309}
310
311fn insert_hidden(
312    slots: &mut BTreeMap<HiddenBindingId, AnalyzedType>,
313    hidden: Option<HiddenBindingId>,
314    kind: ScanKind,
315) {
316    let Some(hidden) = hidden else {
317        return;
318    };
319    slots.entry(hidden).or_insert_with(|| {
320        AnalyzedType::Resolved(match kind {
321            ScanKind::Node => GqlType::NodeRef,
322            ScanKind::Edge => GqlType::EdgeRef,
323        })
324    });
325}
326
327fn insert_hidden_type(
328    slots: &mut BTreeMap<HiddenBindingId, AnalyzedType>,
329    hidden: Option<HiddenBindingId>,
330    ty: AnalyzedType,
331) {
332    let Some(hidden) = hidden else {
333        return;
334    };
335    slots.entry(hidden).or_insert(ty);
336}
337
338pub(crate) fn binding_index(
339    pattern: &PatternPlan,
340    schema: &BindingTableSchema,
341    binding_id: BindingId,
342) -> Option<usize> {
343    let binding = pattern
344        .bindings
345        .iter()
346        .find(|candidate| candidate.binding == binding_id)?;
347    column_index(schema, &binding.name)
348}
349
350pub(crate) fn column_index(schema: &BindingTableSchema, name: &DbString) -> Option<usize> {
351    schema
352        .columns
353        .iter()
354        .position(|column| column.name.as_ref() == Some(name))
355}
356
357pub(crate) fn hidden_index(schema: &BindingTableSchema, hidden: HiddenBindingId) -> Option<usize> {
358    schema
359        .columns
360        .iter()
361        .position(|column| column.hidden == Some(hidden))
362}
363
364#[derive(Clone, Copy)]
365pub(crate) struct ColumnSlot {
366    index: Option<usize>,
367}
368
369impl ColumnSlot {
370    pub(crate) fn binding(
371        pattern: &PatternPlan,
372        schema: &BindingTableSchema,
373        binding_id: Option<BindingId>,
374        detail: &'static str,
375    ) -> Result<Self, ExecutorError> {
376        let Some(binding_id) = binding_id else {
377            return Ok(Self { index: None });
378        };
379        let Some(index) = binding_index(pattern, schema, binding_id) else {
380            return Err(ExecutorError::ImplementationDefined { detail });
381        };
382        Ok(Self { index: Some(index) })
383    }
384
385    pub(crate) fn hidden(
386        schema: &BindingTableSchema,
387        hidden: Option<HiddenBindingId>,
388        detail: &'static str,
389    ) -> Result<Self, ExecutorError> {
390        let Some(hidden) = hidden else {
391            return Ok(Self { index: None });
392        };
393        let Some(index) = hidden_index(schema, hidden) else {
394            return Err(ExecutorError::ImplementationDefined { detail });
395        };
396        Ok(Self { index: Some(index) })
397    }
398
399    pub(crate) fn set(self, values: &mut [Value], value: Value) -> bool {
400        let Some(index) = self.index else {
401            return true;
402        };
403        if !matches!(values[index], Value::Null)
404            && !value_compare::equal_non_null(&values[index], &value)
405        {
406            return false;
407        }
408        values[index] = value;
409        true
410    }
411
412    pub(crate) fn index(self) -> Option<usize> {
413        self.index
414    }
415}
416
417pub(crate) fn source_index(
418    pattern: &PatternPlan,
419    schema: &BindingTableSchema,
420    binding: Option<BindingId>,
421    hidden: Option<HiddenBindingId>,
422    detail: &'static str,
423) -> Result<usize, ExecutorError> {
424    if let Some(binding) = binding {
425        return binding_index(pattern, schema, binding)
426            .ok_or(ExecutorError::ImplementationDefined { detail });
427    }
428    if let Some(hidden) = hidden {
429        return hidden_index(schema, hidden).ok_or(ExecutorError::ImplementationDefined { detail });
430    }
431    Err(ExecutorError::ImplementationDefined { detail })
432}
433
434pub(crate) fn node_at_index(
435    row: &Binding,
436    index: usize,
437    wrong_type_detail: &'static str,
438) -> Result<Option<NodeId>, ExecutorError> {
439    match row.get(index).cloned().unwrap_or(Value::Null) {
440        Value::NodeRef(id) => Ok(Some(id)),
441        Value::Null => Ok(None),
442        _ => Err(ExecutorError::ImplementationDefined {
443            detail: wrong_type_detail,
444        }),
445    }
446}
447
448pub(crate) fn merge_rows(left: &Binding, right: &Binding, schema: &BindingTableSchema) -> Binding {
449    let mut values = Vec::with_capacity(schema.columns.len());
450    for index in 0..schema.columns.len() {
451        let left_value = left.get(index).cloned().unwrap_or(Value::Null);
452        // Null is the row-local unbound sentinel; prefer the bound side.
453        if matches!(left_value, Value::Null) {
454            values.push(right.get(index).cloned().unwrap_or(Value::Null));
455        } else {
456            values.push(left_value);
457        }
458    }
459    Binding::new(values)
460}
461
462pub(crate) fn resolve_key(
463    schema: &BindingTableSchema,
464    key: &[DbString],
465) -> Result<Vec<usize>, ExecutorError> {
466    let mut indexes = Vec::with_capacity(key.len());
467    for name in key {
468        let Some(index) = column_index(schema, name) else {
469            return Err(ExecutorError::ImplementationDefined {
470                detail: "join key column missing from pattern schema",
471            });
472        };
473        indexes.push(index);
474    }
475    Ok(indexes)
476}
477
478pub(crate) fn key_values_at(row: &Binding, indexes: &[usize]) -> Option<Vec<Value>> {
479    let mut values = Vec::with_capacity(indexes.len());
480    for index in indexes {
481        let value = row.get(*index).cloned().unwrap_or(Value::Null);
482        if matches!(value, Value::Null) {
483            return None;
484        }
485        values.push(value);
486    }
487    Some(values)
488}
489
490pub(crate) fn key_values_equal(lhs: &[Value], rhs: &[Value]) -> bool {
491    lhs.len() == rhs.len()
492        && lhs
493            .iter()
494            .zip(rhs)
495            .all(|(lhs, rhs)| value_compare::equal_non_null(lhs, rhs))
496}
497
498pub(crate) fn rows_match_on_resolved_key(
499    left: &Binding,
500    right: &Binding,
501    indexes: &[usize],
502) -> bool {
503    let Some(left_key) = key_values_at(left, indexes) else {
504        return false;
505    };
506    let Some(right_key) = key_values_at(right, indexes) else {
507        return false;
508    };
509    key_values_equal(&left_key, &right_key)
510}
511
512pub(crate) fn resolve_projection(
513    source_schema: &BindingTableSchema,
514    target_schema: &BindingTableSchema,
515) -> Vec<Option<usize>> {
516    target_schema
517        .columns
518        .iter()
519        .map(|target_column| {
520            target_column
521                .name
522                .as_ref()
523                .and_then(|name| column_index(source_schema, name))
524        })
525        .collect()
526}
527
528pub(crate) fn project_row_with_projection(
529    row: &Binding,
530    target_schema: &BindingTableSchema,
531    projection: &[Option<usize>],
532    seed: Option<&Binding>,
533) -> Binding {
534    let mut values = seed
535        .map(|row| row.values().to_vec())
536        .unwrap_or_else(|| vec![Value::Null; target_schema.columns.len()]);
537    values.resize(target_schema.columns.len(), Value::Null);
538    for (target_index, source_index) in projection.iter().enumerate() {
539        let Some(source_index) = source_index else {
540            continue;
541        };
542        values[target_index] = row.get(*source_index).cloned().unwrap_or(Value::Null);
543    }
544    Binding::new(values)
545}
546
547fn pattern_filters_pass(
548    pattern: &PatternPlan,
549    row: &Binding,
550    schema: &BindingTableSchema,
551    ctx: &EvalCtx<'_, '_, '_, '_>,
552) -> Result<bool, ExecutorError> {
553    filter_predicates_pass(&pattern.filters, pattern, row, schema, ctx)
554}
555
556pub(crate) fn filter_predicates_pass(
557    predicates: &[FilterPredicate],
558    pattern: &PatternPlan,
559    row: &Binding,
560    schema: &BindingTableSchema,
561    ctx: &EvalCtx<'_, '_, '_, '_>,
562) -> Result<bool, ExecutorError> {
563    for predicate in predicates {
564        if !filter_predicate_passes(predicate, pattern, row, schema, ctx)? {
565            return Ok(false);
566        }
567    }
568    Ok(true)
569}
570
571fn filter_predicate_passes(
572    predicate: &FilterPredicate,
573    pattern: &PatternPlan,
574    row: &Binding,
575    schema: &BindingTableSchema,
576    ctx: &EvalCtx<'_, '_, '_, '_>,
577) -> Result<bool, ExecutorError> {
578    if predicate.index_consumed {
579        return Ok(true);
580    }
581    match predicate.kind {
582        crate::FilterPredicateKind::Expression => {
583            let value = evaluator::evaluate(&predicate.expr, row, schema, ctx)?;
584            Ok(matches!(value, Value::Bool(true)))
585        }
586        crate::FilterPredicateKind::PropertyEquals { .. } => {
587            scan::predicate_passes(predicate, pattern, row, schema, &Value::Null, ctx)
588        }
589    }
590}