1use std::collections::BTreeMap;
4
5use rustc_hash::FxHashSet;
6use selene_core::{DbString, NodeId, Value};
7
8use crate::{
9 AnalyzedType, BindingElement, BindingId, BindingTableColumn, BindingTableSchema,
10 FilterPredicate, GqlType, HiddenBindingId, JoinTree, PatternPlan, ScanKind, SubqueryRegistry,
11 analyze::ExprIdLookup,
12 runtime::{Binding, BindingTable, EvalCtx, ExecutorError, TxContext},
13};
14
15use super::{
16 evaluator, expand, hash_join, match_mode, outer, path_mode, questioned, scan, subplan,
17 value_compare, wco,
18};
19
20pub fn execute_pattern(
22 pattern: &PatternPlan,
23 ctx: &TxContext<'_, '_>,
24) -> Result<BindingTable, ExecutorError> {
25 let expr_ids = ExprIdLookup::default();
26 let subqueries = SubqueryRegistry::default();
27 let (expr_ids, subqueries) = ctx.plan_metadata().unwrap_or((&expr_ids, &subqueries));
28 let eval_ctx = EvalCtx {
29 tx: ctx,
30 expr_ids,
31 subqueries,
32 };
33 execute_pattern_with_seed(pattern, None, &eval_ctx)
34}
35
36pub(crate) fn execute_pattern_with_seed(
38 pattern: &PatternPlan,
39 seed: Option<&Binding>,
40 ctx: &EvalCtx<'_, '_, '_, '_>,
41) -> Result<BindingTable, ExecutorError> {
42 let schema = schema_for_pattern(pattern);
43 execute_pattern_with_seed_and_schema(pattern, seed, schema, ctx)
44}
45
46pub(crate) fn execute_pattern_with_seed_and_schema(
47 pattern: &PatternPlan,
48 seed: Option<&Binding>,
49 schema: BindingTableSchema,
50 ctx: &EvalCtx<'_, '_, '_, '_>,
51) -> Result<BindingTable, ExecutorError> {
52 execute_pattern_with_seed_schema_and_limit(pattern, seed, schema, ctx, None)
53}
54
55pub(crate) fn execute_pattern_with_seed_schema_and_limit(
56 pattern: &PatternPlan,
57 seed: Option<&Binding>,
58 schema: BindingTableSchema,
59 ctx: &EvalCtx<'_, '_, '_, '_>,
60 row_limit: Option<usize>,
61) -> Result<BindingTable, ExecutorError> {
62 let env = WalkContext {
63 pattern,
64 schema: &schema,
65 seed,
66 ctx,
67 };
68 let mut rows = Vec::new();
69 if row_limit == Some(0) {
70 return Ok(BindingTable::new(schema, rows));
71 }
72 let mut rows_since_check = 0;
73 for row in walk_join_tree(&pattern.join_tree, env)? {
74 ctx.tx.check_cancellation_stride(&mut rows_since_check, 1)?;
75 if pattern_filters_pass(pattern, &row, &schema, ctx)? {
76 rows.push(row);
77 if row_limit.is_some_and(|limit| rows.len() >= limit) {
78 break;
79 }
80 }
81 }
82 Ok(BindingTable::new(schema, rows))
83}
84
85#[derive(Clone, Copy)]
86pub(crate) struct WalkContext<'a, 'seed, 'eval, 'ctx, 'g, 'plan> {
87 pub(crate) pattern: &'a PatternPlan,
88 pub(crate) schema: &'a BindingTableSchema,
89 pub(crate) seed: Option<&'seed Binding>,
90 pub(crate) ctx: &'a EvalCtx<'eval, 'ctx, 'g, 'plan>,
91}
92
93pub(crate) fn walk_join_tree(
94 tree: &JoinTree,
95 env: WalkContext<'_, '_, '_, '_, '_, '_>,
96) -> Result<Vec<Binding>, ExecutorError> {
97 match tree {
98 JoinTree::Unit => Ok(vec![Binding::new(vec![
99 Value::Null;
100 env.schema.columns.len()
101 ])]),
102 JoinTree::Scan(scan_node) => {
103 scan::scan_pattern(scan_node, env.pattern, env.schema, env.seed, env.ctx)
104 }
105 JoinTree::Expand {
106 child,
107 edge,
108 direction,
109 } => expand::execute(child, edge, *direction, env),
110 JoinTree::Questioned {
111 child,
112 edge,
113 direction,
114 ..
115 } => questioned::execute(child, edge, *direction, env),
116 JoinTree::Repeat {
117 child,
118 edge,
119 direction,
120 min,
121 max,
122 path_mode,
123 } => super::repeat::execute(child, edge, *direction, *min, *max, *path_mode, env),
124 JoinTree::PathSearch {
125 selector,
126 child,
127 source_binding,
128 final_binding,
129 hop_contributors,
130 } => super::path_search::execute(
131 child,
132 *selector,
133 *source_binding,
134 *final_binding,
135 hop_contributors,
136 env,
137 ),
138 JoinTree::PathModeFilter {
139 path_mode,
140 child,
141 path_contributors,
142 } => path_mode::execute(child, *path_mode, path_contributors, env),
143 JoinTree::MatchModeFilter {
144 match_mode,
145 child,
146 path_contributors,
147 } => match_mode::execute(child, *match_mode, path_contributors, env),
148 JoinTree::HashJoin {
149 left,
150 right,
151 key,
152 build_side,
153 } => hash_join::execute(left, right, key, *build_side, env),
154 JoinTree::Outer {
155 left,
156 right,
157 key,
158 right_filters,
159 } => outer::execute(left, right, key, right_filters, env),
160 JoinTree::WorstCaseOptimal { intersection, .. } => wco::execute_phase_a(intersection, env),
161 JoinTree::Subplan(plan) => subplan::execute(plan, env.schema, env.seed, env.ctx),
162 JoinTree::DisjunctiveScan {
193 branches,
194 scan_anchor,
195 } => {
196 let anchor_index = scan_anchor
197 .binding
198 .and_then(|binding_id| binding_index(env.pattern, env.schema, binding_id))
199 .or_else(|| {
200 scan_anchor
201 .hidden_binding
202 .and_then(|hidden_id| hidden_index(env.schema, hidden_id))
203 })
204 .ok_or(ExecutorError::ImplementationDefined {
205 detail: "DisjunctiveScan anchor binding missing from pattern schema",
206 })?;
207 let mut seen: FxHashSet<NodeId> = FxHashSet::default();
208 let mut rows = Vec::new();
209 for branch in branches {
210 for binding in
211 scan::scan_pattern(branch, env.pattern, env.schema, env.seed, env.ctx)?
212 {
213 match binding.get(anchor_index) {
214 Some(Value::NodeRef(id)) => {
215 if seen.insert(*id) {
216 rows.push(binding);
217 }
218 }
219 _ => {
220 rows.push(binding);
225 }
226 }
227 }
228 }
229 Ok(rows)
230 }
231 }
232}
233
234pub(crate) fn schema_for_pattern(pattern: &PatternPlan) -> BindingTableSchema {
235 let mut columns = pattern
236 .bindings
237 .iter()
238 .filter(|binding| {
239 matches!(
240 binding.element,
241 BindingElement::Node | BindingElement::Edge | BindingElement::Path
242 )
243 })
244 .map(|binding| BindingTableColumn {
245 name: Some(binding.name.clone()),
246 hidden: None,
247 ty: binding.ty.clone(),
248 })
249 .collect::<Vec<_>>();
250 let mut hidden = BTreeMap::new();
251 collect_hidden_slots(&pattern.join_tree, &mut hidden);
252 columns.extend(hidden.into_iter().map(|(hidden, ty)| BindingTableColumn {
253 name: None,
254 hidden: Some(hidden),
255 ty,
256 }));
257 BindingTableSchema { columns }
258}
259
260fn collect_hidden_slots(tree: &JoinTree, slots: &mut BTreeMap<HiddenBindingId, AnalyzedType>) {
261 match tree {
262 JoinTree::Unit => {}
263 JoinTree::Scan(scan) => {
264 insert_hidden(slots, scan.hidden_binding, scan.kind);
265 }
266 JoinTree::Expand { child, edge, .. } => {
267 collect_hidden_slots(child, slots);
268 insert_hidden(slots, edge.left_hidden_binding, ScanKind::Node);
269 insert_hidden(slots, edge.hidden_binding, ScanKind::Edge);
270 insert_hidden(slots, edge.right_hidden_binding, ScanKind::Node);
271 }
272 JoinTree::Questioned { child, edge, .. } => {
273 collect_hidden_slots(child, slots);
274 insert_hidden(slots, edge.left_hidden_binding, ScanKind::Node);
275 insert_hidden(slots, edge.hidden_binding, ScanKind::Edge);
276 insert_hidden(slots, edge.right_hidden_binding, ScanKind::Node);
277 }
278 JoinTree::Repeat { child, edge, .. } => {
279 collect_hidden_slots(child, slots);
280 insert_hidden(slots, edge.left_hidden_binding, ScanKind::Node);
281 insert_hidden_type(
282 slots,
283 edge.group_hidden_binding,
284 AnalyzedType::Resolved(GqlType::List(Box::new(GqlType::EdgeRef))),
285 );
286 insert_hidden(slots, edge.final_hidden_binding, ScanKind::Node);
287 }
288 JoinTree::PathSearch { child, .. }
289 | JoinTree::PathModeFilter { child, .. }
290 | JoinTree::MatchModeFilter { child, .. } => {
291 collect_hidden_slots(child, slots);
292 }
293 JoinTree::HashJoin { left, right, .. } | JoinTree::Outer { left, right, .. } => {
294 collect_hidden_slots(left, slots);
295 collect_hidden_slots(right, slots);
296 }
297 JoinTree::WorstCaseOptimal { intersection, .. } => {
298 for tree in intersection {
299 collect_hidden_slots(tree, slots);
300 }
301 }
302 JoinTree::Subplan(_) => {}
303 JoinTree::DisjunctiveScan { branches, .. } => {
304 for branch in branches {
305 insert_hidden(slots, branch.hidden_binding, branch.kind);
306 }
307 }
308 }
309}
310
311fn insert_hidden(
312 slots: &mut BTreeMap<HiddenBindingId, AnalyzedType>,
313 hidden: Option<HiddenBindingId>,
314 kind: ScanKind,
315) {
316 let Some(hidden) = hidden else {
317 return;
318 };
319 slots.entry(hidden).or_insert_with(|| {
320 AnalyzedType::Resolved(match kind {
321 ScanKind::Node => GqlType::NodeRef,
322 ScanKind::Edge => GqlType::EdgeRef,
323 })
324 });
325}
326
327fn insert_hidden_type(
328 slots: &mut BTreeMap<HiddenBindingId, AnalyzedType>,
329 hidden: Option<HiddenBindingId>,
330 ty: AnalyzedType,
331) {
332 let Some(hidden) = hidden else {
333 return;
334 };
335 slots.entry(hidden).or_insert(ty);
336}
337
338pub(crate) fn binding_index(
339 pattern: &PatternPlan,
340 schema: &BindingTableSchema,
341 binding_id: BindingId,
342) -> Option<usize> {
343 let binding = pattern
344 .bindings
345 .iter()
346 .find(|candidate| candidate.binding == binding_id)?;
347 column_index(schema, &binding.name)
348}
349
350pub(crate) fn column_index(schema: &BindingTableSchema, name: &DbString) -> Option<usize> {
351 schema
352 .columns
353 .iter()
354 .position(|column| column.name.as_ref() == Some(name))
355}
356
357pub(crate) fn hidden_index(schema: &BindingTableSchema, hidden: HiddenBindingId) -> Option<usize> {
358 schema
359 .columns
360 .iter()
361 .position(|column| column.hidden == Some(hidden))
362}
363
364#[derive(Clone, Copy)]
365pub(crate) struct ColumnSlot {
366 index: Option<usize>,
367}
368
369impl ColumnSlot {
370 pub(crate) fn binding(
371 pattern: &PatternPlan,
372 schema: &BindingTableSchema,
373 binding_id: Option<BindingId>,
374 detail: &'static str,
375 ) -> Result<Self, ExecutorError> {
376 let Some(binding_id) = binding_id else {
377 return Ok(Self { index: None });
378 };
379 let Some(index) = binding_index(pattern, schema, binding_id) else {
380 return Err(ExecutorError::ImplementationDefined { detail });
381 };
382 Ok(Self { index: Some(index) })
383 }
384
385 pub(crate) fn hidden(
386 schema: &BindingTableSchema,
387 hidden: Option<HiddenBindingId>,
388 detail: &'static str,
389 ) -> Result<Self, ExecutorError> {
390 let Some(hidden) = hidden else {
391 return Ok(Self { index: None });
392 };
393 let Some(index) = hidden_index(schema, hidden) else {
394 return Err(ExecutorError::ImplementationDefined { detail });
395 };
396 Ok(Self { index: Some(index) })
397 }
398
399 pub(crate) fn set(self, values: &mut [Value], value: Value) -> bool {
400 let Some(index) = self.index else {
401 return true;
402 };
403 if !matches!(values[index], Value::Null)
404 && !value_compare::equal_non_null(&values[index], &value)
405 {
406 return false;
407 }
408 values[index] = value;
409 true
410 }
411
412 pub(crate) fn index(self) -> Option<usize> {
413 self.index
414 }
415}
416
417pub(crate) fn source_index(
418 pattern: &PatternPlan,
419 schema: &BindingTableSchema,
420 binding: Option<BindingId>,
421 hidden: Option<HiddenBindingId>,
422 detail: &'static str,
423) -> Result<usize, ExecutorError> {
424 if let Some(binding) = binding {
425 return binding_index(pattern, schema, binding)
426 .ok_or(ExecutorError::ImplementationDefined { detail });
427 }
428 if let Some(hidden) = hidden {
429 return hidden_index(schema, hidden).ok_or(ExecutorError::ImplementationDefined { detail });
430 }
431 Err(ExecutorError::ImplementationDefined { detail })
432}
433
434pub(crate) fn node_at_index(
435 row: &Binding,
436 index: usize,
437 wrong_type_detail: &'static str,
438) -> Result<Option<NodeId>, ExecutorError> {
439 match row.get(index).cloned().unwrap_or(Value::Null) {
440 Value::NodeRef(id) => Ok(Some(id)),
441 Value::Null => Ok(None),
442 _ => Err(ExecutorError::ImplementationDefined {
443 detail: wrong_type_detail,
444 }),
445 }
446}
447
448pub(crate) fn merge_rows(left: &Binding, right: &Binding, schema: &BindingTableSchema) -> Binding {
449 let mut values = Vec::with_capacity(schema.columns.len());
450 for index in 0..schema.columns.len() {
451 let left_value = left.get(index).cloned().unwrap_or(Value::Null);
452 if matches!(left_value, Value::Null) {
454 values.push(right.get(index).cloned().unwrap_or(Value::Null));
455 } else {
456 values.push(left_value);
457 }
458 }
459 Binding::new(values)
460}
461
462pub(crate) fn resolve_key(
463 schema: &BindingTableSchema,
464 key: &[DbString],
465) -> Result<Vec<usize>, ExecutorError> {
466 let mut indexes = Vec::with_capacity(key.len());
467 for name in key {
468 let Some(index) = column_index(schema, name) else {
469 return Err(ExecutorError::ImplementationDefined {
470 detail: "join key column missing from pattern schema",
471 });
472 };
473 indexes.push(index);
474 }
475 Ok(indexes)
476}
477
478pub(crate) fn key_values_at(row: &Binding, indexes: &[usize]) -> Option<Vec<Value>> {
479 let mut values = Vec::with_capacity(indexes.len());
480 for index in indexes {
481 let value = row.get(*index).cloned().unwrap_or(Value::Null);
482 if matches!(value, Value::Null) {
483 return None;
484 }
485 values.push(value);
486 }
487 Some(values)
488}
489
490pub(crate) fn key_values_equal(lhs: &[Value], rhs: &[Value]) -> bool {
491 lhs.len() == rhs.len()
492 && lhs
493 .iter()
494 .zip(rhs)
495 .all(|(lhs, rhs)| value_compare::equal_non_null(lhs, rhs))
496}
497
498pub(crate) fn rows_match_on_resolved_key(
499 left: &Binding,
500 right: &Binding,
501 indexes: &[usize],
502) -> bool {
503 let Some(left_key) = key_values_at(left, indexes) else {
504 return false;
505 };
506 let Some(right_key) = key_values_at(right, indexes) else {
507 return false;
508 };
509 key_values_equal(&left_key, &right_key)
510}
511
512pub(crate) fn resolve_projection(
513 source_schema: &BindingTableSchema,
514 target_schema: &BindingTableSchema,
515) -> Vec<Option<usize>> {
516 target_schema
517 .columns
518 .iter()
519 .map(|target_column| {
520 target_column
521 .name
522 .as_ref()
523 .and_then(|name| column_index(source_schema, name))
524 })
525 .collect()
526}
527
528pub(crate) fn project_row_with_projection(
529 row: &Binding,
530 target_schema: &BindingTableSchema,
531 projection: &[Option<usize>],
532 seed: Option<&Binding>,
533) -> Binding {
534 let mut values = seed
535 .map(|row| row.values().to_vec())
536 .unwrap_or_else(|| vec![Value::Null; target_schema.columns.len()]);
537 values.resize(target_schema.columns.len(), Value::Null);
538 for (target_index, source_index) in projection.iter().enumerate() {
539 let Some(source_index) = source_index else {
540 continue;
541 };
542 values[target_index] = row.get(*source_index).cloned().unwrap_or(Value::Null);
543 }
544 Binding::new(values)
545}
546
547fn pattern_filters_pass(
548 pattern: &PatternPlan,
549 row: &Binding,
550 schema: &BindingTableSchema,
551 ctx: &EvalCtx<'_, '_, '_, '_>,
552) -> Result<bool, ExecutorError> {
553 filter_predicates_pass(&pattern.filters, pattern, row, schema, ctx)
554}
555
556pub(crate) fn filter_predicates_pass(
557 predicates: &[FilterPredicate],
558 pattern: &PatternPlan,
559 row: &Binding,
560 schema: &BindingTableSchema,
561 ctx: &EvalCtx<'_, '_, '_, '_>,
562) -> Result<bool, ExecutorError> {
563 for predicate in predicates {
564 if !filter_predicate_passes(predicate, pattern, row, schema, ctx)? {
565 return Ok(false);
566 }
567 }
568 Ok(true)
569}
570
571fn filter_predicate_passes(
572 predicate: &FilterPredicate,
573 pattern: &PatternPlan,
574 row: &Binding,
575 schema: &BindingTableSchema,
576 ctx: &EvalCtx<'_, '_, '_, '_>,
577) -> Result<bool, ExecutorError> {
578 if predicate.index_consumed {
579 return Ok(true);
580 }
581 match predicate.kind {
582 crate::FilterPredicateKind::Expression => {
583 let value = evaluator::evaluate(&predicate.expr, row, schema, ctx)?;
584 Ok(matches!(value, Value::Bool(true)))
585 }
586 crate::FilterPredicateKind::PropertyEquals { .. } => {
587 scan::predicate_passes(predicate, pattern, row, schema, &Value::Null, ctx)
588 }
589 }
590}