1use crate::query::pushdown::{PredicateAnalyzer, try_label_or_to_union, try_type_or_to_union};
5use anyhow::{Result, anyhow};
6use arrow_array::RecordBatch;
7use arrow_schema::{DataType, SchemaRef};
8use parking_lot::RwLock;
9use std::collections::{HashMap, HashSet};
10use std::sync::Arc;
11use uni_common::Value;
12use uni_common::core::schema::{
13 DistanceMetric, EmbeddingConfig, FullTextIndexConfig, IndexDefinition, JsonFtsIndexConfig,
14 ScalarIndexConfig, ScalarIndexType, Schema, TokenizerConfig, VectorIndexConfig,
15 VectorIndexType,
16};
17use uni_cypher::ast::{
18 AlterEdgeType, AlterLabel, BinaryOp, CallKind, Clause, CreateConstraint, CreateEdgeType,
19 CreateLabel, CypherLiteral, Direction, DropConstraint, DropEdgeType, DropLabel, Expr,
20 MatchClause, MergeClause, NodePattern, PathPattern, Pattern, PatternElement, Query,
21 RelationshipPattern, RemoveItem, ReturnClause, ReturnItem, SchemaCommand, SetClause, SetItem,
22 ShortestPathMode, ShowConstraints, SortItem, Statement, WindowSpec, WithClause,
23 WithRecursiveClause,
24};
25
26pub(crate) const STRUCT_ONLY_SENTINEL: &str = "__set_struct__";
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq)]
46pub enum VariableType {
47 Node,
49 Edge,
51 Path,
53 Scalar,
56 ScalarLiteral,
59 Imported,
62}
63
64impl VariableType {
65 fn is_compatible_with(self, expected: VariableType) -> bool {
69 self == expected
70 || self == VariableType::Imported
71 || (self == VariableType::ScalarLiteral && expected == VariableType::Scalar)
73 }
74}
75
76#[derive(Debug, Clone)]
78pub struct VariableInfo {
79 pub name: String,
81 pub var_type: VariableType,
83 pub is_vlp: bool,
87}
88
89impl VariableInfo {
90 pub fn new(name: String, var_type: VariableType) -> Self {
91 Self {
92 name,
93 var_type,
94 is_vlp: false,
95 }
96 }
97}
98
99fn find_var_in_scope<'a>(vars: &'a [VariableInfo], name: &str) -> Option<&'a VariableInfo> {
101 vars.iter().find(|v| v.name == name)
102}
103
104fn is_var_in_scope(vars: &[VariableInfo], name: &str) -> bool {
106 find_var_in_scope(vars, name).is_some()
107}
108
109fn contains_pattern_predicate(expr: &Expr) -> bool {
111 if matches!(
112 expr,
113 Expr::Exists {
114 from_pattern_predicate: true,
115 ..
116 }
117 ) {
118 return true;
119 }
120 let mut found = false;
121 expr.for_each_child(&mut |child| {
122 if !found {
123 found = contains_pattern_predicate(child);
124 }
125 });
126 found
127}
128
129fn add_var_to_scope(
132 vars: &mut Vec<VariableInfo>,
133 name: &str,
134 var_type: VariableType,
135) -> Result<()> {
136 if name.is_empty() {
137 return Ok(());
138 }
139
140 if let Some(existing) = vars.iter_mut().find(|v| v.name == name) {
141 if existing.var_type == VariableType::Imported {
142 existing.var_type = var_type;
144 } else if var_type == VariableType::Imported || existing.var_type == var_type {
145 } else if matches!(
147 existing.var_type,
148 VariableType::Scalar | VariableType::ScalarLiteral
149 ) && matches!(var_type, VariableType::Node | VariableType::Edge)
150 {
151 existing.var_type = var_type;
154 } else {
155 return Err(anyhow!(
156 "SyntaxError: VariableTypeConflict - Variable '{}' already defined as {:?}, cannot use as {:?}",
157 name,
158 existing.var_type,
159 var_type
160 ));
161 }
162 } else {
163 vars.push(VariableInfo::new(name.to_string(), var_type));
164 }
165 Ok(())
166}
167
168fn vars_to_strings(vars: &[VariableInfo]) -> Vec<String> {
170 vars.iter().map(|v| v.name.clone()).collect()
171}
172
173fn infer_with_output_type(expr: &Expr, vars_in_scope: &[VariableInfo]) -> VariableType {
174 match expr {
175 Expr::Variable(v) => find_var_in_scope(vars_in_scope, v)
176 .map(|info| info.var_type)
177 .unwrap_or(VariableType::Scalar),
178 Expr::Literal(CypherLiteral::Null) => VariableType::Imported,
179 Expr::Literal(CypherLiteral::Integer(_))
181 | Expr::Literal(CypherLiteral::Float(_))
182 | Expr::Literal(CypherLiteral::String(_))
183 | Expr::Literal(CypherLiteral::Bool(_))
184 | Expr::Literal(CypherLiteral::Bytes(_)) => VariableType::ScalarLiteral,
185 Expr::FunctionCall { name, args, .. } => {
186 let lower = name.to_lowercase();
187 if lower == "coalesce" {
188 infer_coalesce_type(args, vars_in_scope)
189 } else if lower == "collect" && !args.is_empty() {
190 let collected = infer_with_output_type(&args[0], vars_in_scope);
191 if matches!(
192 collected,
193 VariableType::Node
194 | VariableType::Edge
195 | VariableType::Path
196 | VariableType::Imported
197 ) {
198 collected
199 } else {
200 VariableType::Scalar
201 }
202 } else {
203 VariableType::Scalar
204 }
205 }
206 Expr::List(_) => VariableType::ScalarLiteral,
211 _ => VariableType::Scalar,
212 }
213}
214
215fn infer_coalesce_type(args: &[Expr], vars_in_scope: &[VariableInfo]) -> VariableType {
216 let mut resolved: Option<VariableType> = None;
217 let mut saw_imported = false;
218 for arg in args {
219 let t = infer_with_output_type(arg, vars_in_scope);
220 match t {
221 VariableType::Node | VariableType::Edge | VariableType::Path => {
222 if let Some(existing) = resolved {
223 if existing != t {
224 return VariableType::Scalar;
225 }
226 } else {
227 resolved = Some(t);
228 }
229 }
230 VariableType::Imported => saw_imported = true,
231 VariableType::Scalar | VariableType::ScalarLiteral => {}
232 }
233 }
234 if let Some(t) = resolved {
235 t
236 } else if saw_imported {
237 VariableType::Imported
238 } else {
239 VariableType::Scalar
240 }
241}
242
243fn infer_unwind_output_type(expr: &Expr, vars_in_scope: &[VariableInfo]) -> VariableType {
244 match expr {
245 Expr::Variable(v) => find_var_in_scope(vars_in_scope, v)
246 .map(|info| info.var_type)
247 .unwrap_or(VariableType::Scalar),
248 Expr::FunctionCall { name, args, .. }
249 if name.eq_ignore_ascii_case("collect") && !args.is_empty() =>
250 {
251 infer_with_output_type(&args[0], vars_in_scope)
252 }
253 Expr::List(items) => {
254 let mut inferred: Option<VariableType> = None;
255 for item in items {
256 let t = infer_with_output_type(item, vars_in_scope);
257 if !matches!(
258 t,
259 VariableType::Node
260 | VariableType::Edge
261 | VariableType::Path
262 | VariableType::Imported
263 ) {
264 return VariableType::Scalar;
265 }
266 if let Some(existing) = inferred {
267 if existing != t
268 && t != VariableType::Imported
269 && existing != VariableType::Imported
270 {
271 return VariableType::Scalar;
272 }
273 if existing == VariableType::Imported && t != VariableType::Imported {
274 inferred = Some(t);
275 }
276 } else {
277 inferred = Some(t);
278 }
279 }
280 inferred.unwrap_or(VariableType::Scalar)
281 }
282 _ => VariableType::Scalar,
283 }
284}
285
286fn collect_expr_variables(expr: &Expr) -> Vec<String> {
288 let mut vars = Vec::new();
289 collect_expr_variables_inner(expr, &mut vars);
290 vars
291}
292
293fn collect_expr_variables_inner(expr: &Expr, vars: &mut Vec<String>) {
294 let mut add_var = |name: &String| {
295 if !vars.contains(name) {
296 vars.push(name.clone());
297 }
298 };
299
300 match expr {
301 Expr::Variable(name) => add_var(name),
302 Expr::Property(base, _) => collect_expr_variables_inner(base, vars),
303 Expr::BinaryOp { left, right, .. } => {
304 collect_expr_variables_inner(left, vars);
305 collect_expr_variables_inner(right, vars);
306 }
307 Expr::UnaryOp { expr: e, .. }
308 | Expr::IsNull(e)
309 | Expr::IsNotNull(e)
310 | Expr::IsUnique(e) => collect_expr_variables_inner(e, vars),
311 Expr::FunctionCall { args, .. } => {
312 for a in args {
313 collect_expr_variables_inner(a, vars);
314 }
315 }
316 Expr::List(items) => {
317 for item in items {
318 collect_expr_variables_inner(item, vars);
319 }
320 }
321 Expr::In { expr: e, list } => {
322 collect_expr_variables_inner(e, vars);
323 collect_expr_variables_inner(list, vars);
324 }
325 Expr::Case {
326 expr: case_expr,
327 when_then,
328 else_expr,
329 } => {
330 if let Some(e) = case_expr {
331 collect_expr_variables_inner(e, vars);
332 }
333 for (w, t) in when_then {
334 collect_expr_variables_inner(w, vars);
335 collect_expr_variables_inner(t, vars);
336 }
337 if let Some(e) = else_expr {
338 collect_expr_variables_inner(e, vars);
339 }
340 }
341 Expr::Map(entries) => {
342 for (_, v) in entries {
343 collect_expr_variables_inner(v, vars);
344 }
345 }
346 Expr::LabelCheck { expr, .. } => collect_expr_variables_inner(expr, vars),
347 Expr::ArrayIndex { array, index } => {
348 collect_expr_variables_inner(array, vars);
349 collect_expr_variables_inner(index, vars);
350 }
351 Expr::ArraySlice { array, start, end } => {
352 collect_expr_variables_inner(array, vars);
353 if let Some(s) = start {
354 collect_expr_variables_inner(s, vars);
355 }
356 if let Some(e) = end {
357 collect_expr_variables_inner(e, vars);
358 }
359 }
360 _ => {}
363 }
364}
365
366fn rewrite_order_by_expr_with_aliases(expr: &Expr, aliases: &HashMap<String, Expr>) -> Expr {
371 let repr = expr.to_string_repr();
372 if let Some(rewritten) = aliases.get(&repr) {
373 return rewritten.clone();
374 }
375
376 match expr {
377 Expr::Variable(name) => aliases.get(name).cloned().unwrap_or_else(|| expr.clone()),
378 Expr::Property(base, prop) => Expr::Property(
379 Box::new(rewrite_order_by_expr_with_aliases(base, aliases)),
380 prop.clone(),
381 ),
382 Expr::BinaryOp { left, op, right } => Expr::BinaryOp {
383 left: Box::new(rewrite_order_by_expr_with_aliases(left, aliases)),
384 op: *op,
385 right: Box::new(rewrite_order_by_expr_with_aliases(right, aliases)),
386 },
387 Expr::UnaryOp { op, expr: inner } => Expr::UnaryOp {
388 op: *op,
389 expr: Box::new(rewrite_order_by_expr_with_aliases(inner, aliases)),
390 },
391 Expr::FunctionCall {
392 name,
393 args,
394 distinct,
395 window_spec,
396 } => Expr::FunctionCall {
397 name: name.clone(),
398 args: args
399 .iter()
400 .map(|a| rewrite_order_by_expr_with_aliases(a, aliases))
401 .collect(),
402 distinct: *distinct,
403 window_spec: window_spec.clone(),
404 },
405 Expr::List(items) => Expr::List(
406 items
407 .iter()
408 .map(|item| rewrite_order_by_expr_with_aliases(item, aliases))
409 .collect(),
410 ),
411 Expr::Map(entries) => Expr::Map(
412 entries
413 .iter()
414 .map(|(k, v)| (k.clone(), rewrite_order_by_expr_with_aliases(v, aliases)))
415 .collect(),
416 ),
417 Expr::Case {
418 expr: case_expr,
419 when_then,
420 else_expr,
421 } => Expr::Case {
422 expr: case_expr
423 .as_ref()
424 .map(|e| Box::new(rewrite_order_by_expr_with_aliases(e, aliases))),
425 when_then: when_then
426 .iter()
427 .map(|(w, t)| {
428 (
429 rewrite_order_by_expr_with_aliases(w, aliases),
430 rewrite_order_by_expr_with_aliases(t, aliases),
431 )
432 })
433 .collect(),
434 else_expr: else_expr
435 .as_ref()
436 .map(|e| Box::new(rewrite_order_by_expr_with_aliases(e, aliases))),
437 },
438 _ => expr.clone(),
441 }
442}
443
444fn validate_function_call(name: &str, args: &[Expr], vars_in_scope: &[VariableInfo]) -> Result<()> {
447 let name_lower = name.to_lowercase();
448
449 if name_lower == "labels"
451 && let Some(Expr::Variable(var_name)) = args.first()
452 && let Some(info) = find_var_in_scope(vars_in_scope, var_name)
453 && !info.var_type.is_compatible_with(VariableType::Node)
454 {
455 return Err(anyhow!(
456 "SyntaxError: InvalidArgumentType - labels() requires a node argument"
457 ));
458 }
459
460 if name_lower == "type"
462 && let Some(Expr::Variable(var_name)) = args.first()
463 && let Some(info) = find_var_in_scope(vars_in_scope, var_name)
464 && !info.var_type.is_compatible_with(VariableType::Edge)
465 {
466 return Err(anyhow!(
467 "SyntaxError: InvalidArgumentType - type() requires a relationship argument"
468 ));
469 }
470
471 if name_lower == "properties"
473 && let Some(arg) = args.first()
474 {
475 match arg {
476 Expr::Literal(CypherLiteral::Integer(_))
477 | Expr::Literal(CypherLiteral::Float(_))
478 | Expr::Literal(CypherLiteral::String(_))
479 | Expr::Literal(CypherLiteral::Bool(_))
480 | Expr::List(_) => {
481 return Err(anyhow!(
482 "SyntaxError: InvalidArgumentType - properties() requires a node, relationship, or map"
483 ));
484 }
485 Expr::Variable(var_name) => {
486 if let Some(info) = find_var_in_scope(vars_in_scope, var_name)
487 && matches!(
488 info.var_type,
489 VariableType::Scalar | VariableType::ScalarLiteral
490 )
491 {
492 return Err(anyhow!(
493 "SyntaxError: InvalidArgumentType - properties() requires a node, relationship, or map"
494 ));
495 }
496 }
497 _ => {}
498 }
499 }
500
501 if (name_lower == "nodes" || name_lower == "relationships")
503 && let Some(Expr::Variable(var_name)) = args.first()
504 && let Some(info) = find_var_in_scope(vars_in_scope, var_name)
505 && !info.var_type.is_compatible_with(VariableType::Path)
506 {
507 return Err(anyhow!(
508 "SyntaxError: InvalidArgumentType - {}() requires a path argument",
509 name_lower
510 ));
511 }
512
513 if name_lower == "size"
515 && let Some(Expr::Variable(var_name)) = args.first()
516 && let Some(info) = find_var_in_scope(vars_in_scope, var_name)
517 && info.var_type == VariableType::Path
518 {
519 return Err(anyhow!(
520 "SyntaxError: InvalidArgumentType - size() requires a string, list, or map argument"
521 ));
522 }
523
524 if (name_lower == "length" || name_lower == "size")
528 && let Some(Expr::Variable(var_name)) = args.first()
529 && let Some(info) = find_var_in_scope(vars_in_scope, var_name)
530 && (info.var_type == VariableType::Node
531 || (info.var_type == VariableType::Edge && !info.is_vlp))
532 {
533 return Err(anyhow!(
534 "SyntaxError: InvalidArgumentType - {}() requires a string, list, or path argument",
535 name_lower
536 ));
537 }
538
539 Ok(())
540}
541
542fn is_non_boolean_literal(expr: &Expr) -> bool {
544 matches!(
545 expr,
546 Expr::Literal(CypherLiteral::Integer(_))
547 | Expr::Literal(CypherLiteral::Float(_))
548 | Expr::Literal(CypherLiteral::String(_))
549 | Expr::List(_)
550 | Expr::Map(_)
551 )
552}
553
554fn validate_boolean_expression(expr: &Expr) -> Result<()> {
556 if let Expr::BinaryOp { left, op, right } = expr
558 && matches!(op, BinaryOp::And | BinaryOp::Or | BinaryOp::Xor)
559 {
560 let op_name = format!("{op:?}").to_uppercase();
561 for operand in [left.as_ref(), right.as_ref()] {
562 if is_non_boolean_literal(operand) {
563 return Err(anyhow!(
564 "SyntaxError: InvalidArgumentType - {} requires boolean arguments",
565 op_name
566 ));
567 }
568 }
569 }
570 if let Expr::UnaryOp {
571 op: uni_cypher::ast::UnaryOp::Not,
572 expr: inner,
573 } = expr
574 && is_non_boolean_literal(inner)
575 {
576 return Err(anyhow!(
577 "SyntaxError: InvalidArgumentType - NOT requires a boolean argument"
578 ));
579 }
580 let mut result = Ok(());
581 expr.for_each_child(&mut |child| {
582 if result.is_ok() {
583 result = validate_boolean_expression(child);
584 }
585 });
586 result
587}
588
589fn validate_expression_variables(expr: &Expr, vars_in_scope: &[VariableInfo]) -> Result<()> {
591 let used_vars = collect_expr_variables(expr);
592 for var in used_vars {
593 if !is_var_in_scope(vars_in_scope, &var) {
594 return Err(anyhow!(
595 "SyntaxError: UndefinedVariable - Variable '{}' not defined",
596 var
597 ));
598 }
599 }
600 Ok(())
601}
602
603fn is_aggregate_function_name(name: &str) -> bool {
605 matches!(
606 name.to_lowercase().as_str(),
607 "count"
608 | "sum"
609 | "avg"
610 | "min"
611 | "max"
612 | "collect"
613 | "stdev"
614 | "stdevp"
615 | "percentiledisc"
616 | "percentilecont"
617 | "btic_min"
618 | "btic_max"
619 | "btic_span_agg"
620 | "btic_count_at"
621 ) || uni_cypher::is_known_plugin_aggregate(name)
622}
623
624fn is_window_function(expr: &Expr) -> bool {
626 matches!(
627 expr,
628 Expr::FunctionCall {
629 window_spec: Some(_),
630 ..
631 }
632 )
633}
634
635fn is_compound_aggregate(expr: &Expr) -> bool {
640 if !expr.is_aggregate() {
641 return false;
642 }
643 match expr {
644 Expr::FunctionCall {
645 name, window_spec, ..
646 } => {
647 if window_spec.is_some() {
649 return true; }
651 !is_aggregate_function_name(name)
652 }
653 Expr::CountSubquery(_) | Expr::CollectSubquery(_) => false,
655 _ => true,
657 }
658}
659
660fn extract_inner_aggregates(expr: &Expr) -> Vec<Expr> {
668 let mut out = Vec::new();
669 extract_inner_aggregates_rec(expr, &mut out);
670 out
671}
672
673fn extract_inner_aggregates_rec(expr: &Expr, out: &mut Vec<Expr>) {
674 match expr {
675 Expr::FunctionCall {
676 name, window_spec, ..
677 } if window_spec.is_none() && is_aggregate_function_name(name) => {
678 out.push(expr.clone());
680 }
681 Expr::CountSubquery(_) | Expr::CollectSubquery(_) => {
682 out.push(expr.clone());
683 }
684 Expr::ListComprehension { list, .. } => {
686 extract_inner_aggregates_rec(list, out);
687 }
688 Expr::Quantifier { list, .. } => {
690 extract_inner_aggregates_rec(list, out);
691 }
692 Expr::Reduce { init, list, .. } => {
694 extract_inner_aggregates_rec(init, out);
695 extract_inner_aggregates_rec(list, out);
696 }
697 Expr::FunctionCall { args, .. } => {
699 for arg in args {
700 extract_inner_aggregates_rec(arg, out);
701 }
702 }
703 Expr::BinaryOp { left, right, .. } => {
704 extract_inner_aggregates_rec(left, out);
705 extract_inner_aggregates_rec(right, out);
706 }
707 Expr::UnaryOp { expr: e, .. }
708 | Expr::IsNull(e)
709 | Expr::IsNotNull(e)
710 | Expr::IsUnique(e) => extract_inner_aggregates_rec(e, out),
711 Expr::Property(base, _) => extract_inner_aggregates_rec(base, out),
712 Expr::List(items) => {
713 for item in items {
714 extract_inner_aggregates_rec(item, out);
715 }
716 }
717 Expr::Case {
718 expr: case_expr,
719 when_then,
720 else_expr,
721 } => {
722 if let Some(e) = case_expr {
723 extract_inner_aggregates_rec(e, out);
724 }
725 for (w, t) in when_then {
726 extract_inner_aggregates_rec(w, out);
727 extract_inner_aggregates_rec(t, out);
728 }
729 if let Some(e) = else_expr {
730 extract_inner_aggregates_rec(e, out);
731 }
732 }
733 Expr::In {
734 expr: in_expr,
735 list,
736 } => {
737 extract_inner_aggregates_rec(in_expr, out);
738 extract_inner_aggregates_rec(list, out);
739 }
740 Expr::ArrayIndex { array, index } => {
741 extract_inner_aggregates_rec(array, out);
742 extract_inner_aggregates_rec(index, out);
743 }
744 Expr::ArraySlice { array, start, end } => {
745 extract_inner_aggregates_rec(array, out);
746 if let Some(s) = start {
747 extract_inner_aggregates_rec(s, out);
748 }
749 if let Some(e) = end {
750 extract_inner_aggregates_rec(e, out);
751 }
752 }
753 Expr::Map(entries) => {
754 for (_, v) in entries {
755 extract_inner_aggregates_rec(v, out);
756 }
757 }
758 _ => {}
759 }
760}
761
762fn replace_aggregates_with_columns(expr: &Expr) -> Expr {
768 match expr {
769 Expr::FunctionCall {
770 name, window_spec, ..
771 } if window_spec.is_none() && is_aggregate_function_name(name) => {
772 Expr::Variable(aggregate_column_name(expr))
774 }
775 Expr::CountSubquery(_) | Expr::CollectSubquery(_) => {
776 Expr::Variable(aggregate_column_name(expr))
777 }
778 Expr::ListComprehension {
779 variable,
780 list,
781 where_clause,
782 map_expr,
783 } => Expr::ListComprehension {
784 variable: variable.clone(),
785 list: Box::new(replace_aggregates_with_columns(list)),
786 where_clause: where_clause.clone(), map_expr: map_expr.clone(), },
789 Expr::Quantifier {
790 quantifier,
791 variable,
792 list,
793 predicate,
794 } => Expr::Quantifier {
795 quantifier: *quantifier,
796 variable: variable.clone(),
797 list: Box::new(replace_aggregates_with_columns(list)),
798 predicate: predicate.clone(), },
800 Expr::Reduce {
801 accumulator,
802 init,
803 variable,
804 list,
805 expr: body,
806 } => Expr::Reduce {
807 accumulator: accumulator.clone(),
808 init: Box::new(replace_aggregates_with_columns(init)),
809 variable: variable.clone(),
810 list: Box::new(replace_aggregates_with_columns(list)),
811 expr: body.clone(), },
813 Expr::FunctionCall {
814 name,
815 args,
816 distinct,
817 window_spec,
818 } => Expr::FunctionCall {
819 name: name.clone(),
820 args: args.iter().map(replace_aggregates_with_columns).collect(),
821 distinct: *distinct,
822 window_spec: window_spec.clone(),
823 },
824 Expr::BinaryOp { left, op, right } => Expr::BinaryOp {
825 left: Box::new(replace_aggregates_with_columns(left)),
826 op: *op,
827 right: Box::new(replace_aggregates_with_columns(right)),
828 },
829 Expr::UnaryOp { op, expr: e } => Expr::UnaryOp {
830 op: *op,
831 expr: Box::new(replace_aggregates_with_columns(e)),
832 },
833 Expr::IsNull(e) => Expr::IsNull(Box::new(replace_aggregates_with_columns(e))),
834 Expr::IsNotNull(e) => Expr::IsNotNull(Box::new(replace_aggregates_with_columns(e))),
835 Expr::IsUnique(e) => Expr::IsUnique(Box::new(replace_aggregates_with_columns(e))),
836 Expr::Property(base, prop) => Expr::Property(
837 Box::new(replace_aggregates_with_columns(base)),
838 prop.clone(),
839 ),
840 Expr::List(items) => {
841 Expr::List(items.iter().map(replace_aggregates_with_columns).collect())
842 }
843 Expr::Case {
844 expr: case_expr,
845 when_then,
846 else_expr,
847 } => Expr::Case {
848 expr: case_expr
849 .as_ref()
850 .map(|e| Box::new(replace_aggregates_with_columns(e))),
851 when_then: when_then
852 .iter()
853 .map(|(w, t)| {
854 (
855 replace_aggregates_with_columns(w),
856 replace_aggregates_with_columns(t),
857 )
858 })
859 .collect(),
860 else_expr: else_expr
861 .as_ref()
862 .map(|e| Box::new(replace_aggregates_with_columns(e))),
863 },
864 Expr::In {
865 expr: in_expr,
866 list,
867 } => Expr::In {
868 expr: Box::new(replace_aggregates_with_columns(in_expr)),
869 list: Box::new(replace_aggregates_with_columns(list)),
870 },
871 Expr::ArrayIndex { array, index } => Expr::ArrayIndex {
872 array: Box::new(replace_aggregates_with_columns(array)),
873 index: Box::new(replace_aggregates_with_columns(index)),
874 },
875 Expr::ArraySlice { array, start, end } => Expr::ArraySlice {
876 array: Box::new(replace_aggregates_with_columns(array)),
877 start: start
878 .as_ref()
879 .map(|e| Box::new(replace_aggregates_with_columns(e))),
880 end: end
881 .as_ref()
882 .map(|e| Box::new(replace_aggregates_with_columns(e))),
883 },
884 Expr::Map(entries) => Expr::Map(
885 entries
886 .iter()
887 .map(|(k, v)| (k.clone(), replace_aggregates_with_columns(v)))
888 .collect(),
889 ),
890 other => other.clone(),
892 }
893}
894
895fn contains_aggregate_recursive(expr: &Expr) -> bool {
897 match expr {
898 Expr::FunctionCall { name, args, .. } => {
899 is_aggregate_function_name(name) || args.iter().any(contains_aggregate_recursive)
900 }
901 Expr::BinaryOp { left, right, .. } => {
902 contains_aggregate_recursive(left) || contains_aggregate_recursive(right)
903 }
904 Expr::UnaryOp { expr: e, .. }
905 | Expr::IsNull(e)
906 | Expr::IsNotNull(e)
907 | Expr::IsUnique(e) => contains_aggregate_recursive(e),
908 Expr::List(items) => items.iter().any(contains_aggregate_recursive),
909 Expr::Case {
910 expr,
911 when_then,
912 else_expr,
913 } => {
914 expr.as_deref().is_some_and(contains_aggregate_recursive)
915 || when_then.iter().any(|(w, t)| {
916 contains_aggregate_recursive(w) || contains_aggregate_recursive(t)
917 })
918 || else_expr
919 .as_deref()
920 .is_some_and(contains_aggregate_recursive)
921 }
922 Expr::In { expr, list } => {
923 contains_aggregate_recursive(expr) || contains_aggregate_recursive(list)
924 }
925 Expr::Property(base, _) => contains_aggregate_recursive(base),
926 Expr::ListComprehension { list, .. } => {
927 contains_aggregate_recursive(list)
929 }
930 Expr::Quantifier { list, .. } => contains_aggregate_recursive(list),
931 Expr::Reduce { init, list, .. } => {
932 contains_aggregate_recursive(init) || contains_aggregate_recursive(list)
933 }
934 Expr::ArrayIndex { array, index } => {
935 contains_aggregate_recursive(array) || contains_aggregate_recursive(index)
936 }
937 Expr::ArraySlice { array, start, end } => {
938 contains_aggregate_recursive(array)
939 || start.as_deref().is_some_and(contains_aggregate_recursive)
940 || end.as_deref().is_some_and(contains_aggregate_recursive)
941 }
942 Expr::Map(entries) => entries.iter().any(|(_, v)| contains_aggregate_recursive(v)),
943 _ => false,
944 }
945}
946
947fn contains_non_deterministic(expr: &Expr) -> bool {
949 if matches!(expr, Expr::FunctionCall { name, .. } if name.eq_ignore_ascii_case("rand")) {
950 return true;
951 }
952 let mut found = false;
953 expr.for_each_child(&mut |child| {
954 if !found {
955 found = contains_non_deterministic(child);
956 }
957 });
958 found
959}
960
961fn collect_aggregate_reprs(expr: &Expr, out: &mut HashSet<String>) {
962 match expr {
963 Expr::FunctionCall { name, args, .. } => {
964 if is_aggregate_function_name(name) {
965 out.insert(expr.to_string_repr());
966 return;
967 }
968 for arg in args {
969 collect_aggregate_reprs(arg, out);
970 }
971 }
972 Expr::BinaryOp { left, right, .. } => {
973 collect_aggregate_reprs(left, out);
974 collect_aggregate_reprs(right, out);
975 }
976 Expr::UnaryOp { expr, .. }
977 | Expr::IsNull(expr)
978 | Expr::IsNotNull(expr)
979 | Expr::IsUnique(expr) => collect_aggregate_reprs(expr, out),
980 Expr::List(items) => {
981 for item in items {
982 collect_aggregate_reprs(item, out);
983 }
984 }
985 Expr::Case {
986 expr,
987 when_then,
988 else_expr,
989 } => {
990 if let Some(e) = expr {
991 collect_aggregate_reprs(e, out);
992 }
993 for (w, t) in when_then {
994 collect_aggregate_reprs(w, out);
995 collect_aggregate_reprs(t, out);
996 }
997 if let Some(e) = else_expr {
998 collect_aggregate_reprs(e, out);
999 }
1000 }
1001 Expr::In { expr, list } => {
1002 collect_aggregate_reprs(expr, out);
1003 collect_aggregate_reprs(list, out);
1004 }
1005 Expr::Property(base, _) => collect_aggregate_reprs(base, out),
1006 Expr::ListComprehension { list, .. } => {
1007 collect_aggregate_reprs(list, out);
1008 }
1009 Expr::Quantifier { list, .. } => {
1010 collect_aggregate_reprs(list, out);
1011 }
1012 Expr::Reduce { init, list, .. } => {
1013 collect_aggregate_reprs(init, out);
1014 collect_aggregate_reprs(list, out);
1015 }
1016 Expr::ArrayIndex { array, index } => {
1017 collect_aggregate_reprs(array, out);
1018 collect_aggregate_reprs(index, out);
1019 }
1020 Expr::ArraySlice { array, start, end } => {
1021 collect_aggregate_reprs(array, out);
1022 if let Some(s) = start {
1023 collect_aggregate_reprs(s, out);
1024 }
1025 if let Some(e) = end {
1026 collect_aggregate_reprs(e, out);
1027 }
1028 }
1029 _ => {}
1030 }
1031}
1032
1033#[derive(Debug, Clone)]
1034enum NonAggregateRef {
1035 Var(String),
1036 Property {
1037 repr: String,
1038 base_var: Option<String>,
1039 },
1040}
1041
1042fn collect_non_aggregate_refs(expr: &Expr, inside_agg: bool, out: &mut Vec<NonAggregateRef>) {
1043 match expr {
1044 Expr::FunctionCall { name, args, .. } => {
1045 if is_aggregate_function_name(name) {
1046 return;
1047 }
1048 for arg in args {
1049 collect_non_aggregate_refs(arg, inside_agg, out);
1050 }
1051 }
1052 Expr::Variable(v) if !inside_agg => out.push(NonAggregateRef::Var(v.clone())),
1053 Expr::Property(base, _) if !inside_agg => {
1054 let base_var = if let Expr::Variable(v) = base.as_ref() {
1055 Some(v.clone())
1056 } else {
1057 None
1058 };
1059 out.push(NonAggregateRef::Property {
1060 repr: expr.to_string_repr(),
1061 base_var,
1062 });
1063 }
1064 Expr::BinaryOp { left, right, .. } => {
1065 collect_non_aggregate_refs(left, inside_agg, out);
1066 collect_non_aggregate_refs(right, inside_agg, out);
1067 }
1068 Expr::UnaryOp { expr, .. }
1069 | Expr::IsNull(expr)
1070 | Expr::IsNotNull(expr)
1071 | Expr::IsUnique(expr) => collect_non_aggregate_refs(expr, inside_agg, out),
1072 Expr::List(items) => {
1073 for item in items {
1074 collect_non_aggregate_refs(item, inside_agg, out);
1075 }
1076 }
1077 Expr::Case {
1078 expr,
1079 when_then,
1080 else_expr,
1081 } => {
1082 if let Some(e) = expr {
1083 collect_non_aggregate_refs(e, inside_agg, out);
1084 }
1085 for (w, t) in when_then {
1086 collect_non_aggregate_refs(w, inside_agg, out);
1087 collect_non_aggregate_refs(t, inside_agg, out);
1088 }
1089 if let Some(e) = else_expr {
1090 collect_non_aggregate_refs(e, inside_agg, out);
1091 }
1092 }
1093 Expr::In { expr, list } => {
1094 collect_non_aggregate_refs(expr, inside_agg, out);
1095 collect_non_aggregate_refs(list, inside_agg, out);
1096 }
1097 Expr::ListComprehension { list, .. } => {
1100 collect_non_aggregate_refs(list, inside_agg, out);
1101 }
1102 Expr::Quantifier { list, .. } => {
1103 collect_non_aggregate_refs(list, inside_agg, out);
1104 }
1105 Expr::Reduce { init, list, .. } => {
1106 collect_non_aggregate_refs(init, inside_agg, out);
1107 collect_non_aggregate_refs(list, inside_agg, out);
1108 }
1109 _ => {}
1110 }
1111}
1112
1113fn validate_with_order_by_aggregate_item(
1114 expr: &Expr,
1115 projected_aggregate_reprs: &HashSet<String>,
1116 projected_simple_reprs: &HashSet<String>,
1117 projected_aliases: &HashSet<String>,
1118) -> Result<()> {
1119 let mut aggregate_reprs = HashSet::new();
1120 collect_aggregate_reprs(expr, &mut aggregate_reprs);
1121 for agg in aggregate_reprs {
1122 if !projected_aggregate_reprs.contains(&agg) {
1123 return Err(anyhow!(
1124 "SyntaxError: UndefinedVariable - Aggregation expression '{}' is not projected in WITH",
1125 agg
1126 ));
1127 }
1128 }
1129
1130 let mut refs = Vec::new();
1131 collect_non_aggregate_refs(expr, false, &mut refs);
1132 refs.retain(|r| match r {
1133 NonAggregateRef::Var(v) => !projected_aliases.contains(v),
1134 NonAggregateRef::Property { repr, .. } => !projected_simple_reprs.contains(repr),
1135 });
1136
1137 let mut dedup = HashSet::new();
1138 refs.retain(|r| {
1139 let key = match r {
1140 NonAggregateRef::Var(v) => format!("v:{v}"),
1141 NonAggregateRef::Property { repr, .. } => format!("p:{repr}"),
1142 };
1143 dedup.insert(key)
1144 });
1145
1146 if refs.len() > 1 {
1147 return Err(anyhow!(
1148 "SyntaxError: AmbiguousAggregationExpression - ORDER BY item mixes aggregation with multiple non-grouping references"
1149 ));
1150 }
1151
1152 if let Some(r) = refs.first() {
1153 return match r {
1154 NonAggregateRef::Var(v) => Err(anyhow!(
1155 "SyntaxError: UndefinedVariable - Variable '{}' not defined",
1156 v
1157 )),
1158 NonAggregateRef::Property { base_var, .. } => Err(anyhow!(
1159 "SyntaxError: UndefinedVariable - Variable '{}' not defined",
1160 base_var
1161 .clone()
1162 .unwrap_or_else(|| "<property-base>".to_string())
1163 )),
1164 };
1165 }
1166
1167 Ok(())
1168}
1169
1170fn validate_no_aggregation_in_where(predicate: &Expr) -> Result<()> {
1172 if contains_aggregate_recursive(predicate) {
1173 return Err(anyhow!(
1174 "SyntaxError: InvalidAggregation - Aggregation functions not allowed in WHERE"
1175 ));
1176 }
1177 Ok(())
1178}
1179
1180#[derive(Debug, Clone, Copy)]
1181enum ConstNumber {
1182 Int(i64),
1183 Float(f64),
1184}
1185
1186impl ConstNumber {
1187 fn to_f64(self) -> f64 {
1188 match self {
1189 Self::Int(v) => v as f64,
1190 Self::Float(v) => v,
1191 }
1192 }
1193}
1194
1195fn eval_const_numeric_expr(
1196 expr: &Expr,
1197 params: &HashMap<String, uni_common::Value>,
1198) -> Result<ConstNumber> {
1199 match expr {
1200 Expr::Literal(CypherLiteral::Integer(n)) => Ok(ConstNumber::Int(*n)),
1201 Expr::Literal(CypherLiteral::Float(f)) => Ok(ConstNumber::Float(*f)),
1202 Expr::Parameter(name) => match params.get(name) {
1203 Some(uni_common::Value::Int(n)) => Ok(ConstNumber::Int(*n)),
1204 Some(uni_common::Value::Float(f)) => Ok(ConstNumber::Float(*f)),
1205 Some(uni_common::Value::Null) => Err(anyhow!(
1206 "TypeError: InvalidArgumentType - expected numeric value for parameter ${}, got null",
1207 name
1208 )),
1209 Some(other) => Err(anyhow!(
1210 "TypeError: InvalidArgumentType - expected numeric value for parameter ${}, got {:?}",
1211 name,
1212 other
1213 )),
1214 None => Err(anyhow!(
1215 "SyntaxError: InvalidArgumentType - expression is not a constant integer expression"
1216 )),
1217 },
1218 Expr::UnaryOp {
1219 op: uni_cypher::ast::UnaryOp::Neg,
1220 expr,
1221 } => match eval_const_numeric_expr(expr, params)? {
1222 ConstNumber::Int(v) => Ok(ConstNumber::Int(-v)),
1223 ConstNumber::Float(v) => Ok(ConstNumber::Float(-v)),
1224 },
1225 Expr::BinaryOp { left, op, right } => {
1226 let l = eval_const_numeric_expr(left, params)?;
1227 let r = eval_const_numeric_expr(right, params)?;
1228 match op {
1229 BinaryOp::Add => match (l, r) {
1230 (ConstNumber::Int(a), ConstNumber::Int(b)) => Ok(ConstNumber::Int(a + b)),
1231 _ => Ok(ConstNumber::Float(l.to_f64() + r.to_f64())),
1232 },
1233 BinaryOp::Sub => match (l, r) {
1234 (ConstNumber::Int(a), ConstNumber::Int(b)) => Ok(ConstNumber::Int(a - b)),
1235 _ => Ok(ConstNumber::Float(l.to_f64() - r.to_f64())),
1236 },
1237 BinaryOp::Mul => match (l, r) {
1238 (ConstNumber::Int(a), ConstNumber::Int(b)) => Ok(ConstNumber::Int(a * b)),
1239 _ => Ok(ConstNumber::Float(l.to_f64() * r.to_f64())),
1240 },
1241 BinaryOp::Div => Ok(ConstNumber::Float(l.to_f64() / r.to_f64())),
1242 BinaryOp::Mod => match (l, r) {
1243 (ConstNumber::Int(a), ConstNumber::Int(b)) => Ok(ConstNumber::Int(a % b)),
1244 _ => Ok(ConstNumber::Float(l.to_f64() % r.to_f64())),
1245 },
1246 BinaryOp::Pow => Ok(ConstNumber::Float(l.to_f64().powf(r.to_f64()))),
1247 _ => Err(anyhow!(
1248 "SyntaxError: InvalidArgumentType - unsupported operator in constant expression"
1249 )),
1250 }
1251 }
1252 Expr::FunctionCall { name, args, .. } => {
1253 let lower = name.to_lowercase();
1254 match lower.as_str() {
1255 "rand" if args.is_empty() => {
1256 use rand::RngExt;
1257 let mut rng = rand::rng();
1258 Ok(ConstNumber::Float(rng.random::<f64>()))
1259 }
1260 "tointeger" | "toint" if args.len() == 1 => {
1261 match eval_const_numeric_expr(&args[0], params)? {
1262 ConstNumber::Int(v) => Ok(ConstNumber::Int(v)),
1263 ConstNumber::Float(v) => Ok(ConstNumber::Int(v.trunc() as i64)),
1264 }
1265 }
1266 "ceil" if args.len() == 1 => Ok(ConstNumber::Float(
1267 eval_const_numeric_expr(&args[0], params)?.to_f64().ceil(),
1268 )),
1269 "floor" if args.len() == 1 => Ok(ConstNumber::Float(
1270 eval_const_numeric_expr(&args[0], params)?.to_f64().floor(),
1271 )),
1272 "abs" if args.len() == 1 => match eval_const_numeric_expr(&args[0], params)? {
1273 ConstNumber::Int(v) => Ok(ConstNumber::Int(v.abs())),
1274 ConstNumber::Float(v) => Ok(ConstNumber::Float(v.abs())),
1275 },
1276 _ => Err(anyhow!(
1277 "SyntaxError: InvalidArgumentType - expression is not a constant integer expression"
1278 )),
1279 }
1280 }
1281 _ => Err(anyhow!(
1282 "SyntaxError: InvalidArgumentType - expression is not a constant integer expression"
1283 )),
1284 }
1285}
1286
1287fn parse_non_negative_integer(
1290 expr: &Expr,
1291 clause_name: &str,
1292 params: &HashMap<String, uni_common::Value>,
1293) -> Result<Option<usize>> {
1294 let referenced_vars = collect_expr_variables(expr);
1295 if !referenced_vars.is_empty() {
1296 return Err(anyhow!(
1297 "SyntaxError: NonConstantExpression - {} requires expression independent of row variables",
1298 clause_name
1299 ));
1300 }
1301
1302 let value = eval_const_numeric_expr(expr, params)?;
1303 let as_int = match value {
1304 ConstNumber::Int(v) => v,
1305 ConstNumber::Float(v) => {
1306 if !v.is_finite() || (v.fract().abs() > f64::EPSILON) {
1307 return Err(anyhow!(
1308 "SyntaxError: InvalidArgumentType - {} requires integer, got float",
1309 clause_name
1310 ));
1311 }
1312 v as i64
1313 }
1314 };
1315 if as_int < 0 {
1316 return Err(anyhow!(
1317 "SyntaxError: NegativeIntegerArgument - {} requires non-negative integer",
1318 clause_name
1319 ));
1320 }
1321 Ok(Some(as_int as usize))
1322}
1323
1324fn validate_no_nested_aggregation(expr: &Expr) -> Result<()> {
1326 if let Expr::FunctionCall { name, args, .. } = expr
1327 && is_aggregate_function_name(name)
1328 {
1329 for arg in args {
1330 if contains_aggregate_recursive(arg) {
1331 return Err(anyhow!(
1332 "SyntaxError: NestedAggregation - Cannot nest aggregation functions"
1333 ));
1334 }
1335 if contains_non_deterministic(arg) {
1336 return Err(anyhow!(
1337 "SyntaxError: NonConstantExpression - Non-deterministic function inside aggregation"
1338 ));
1339 }
1340 }
1341 }
1342 let mut result = Ok(());
1343 expr.for_each_child(&mut |child| {
1344 if result.is_ok() {
1345 result = validate_no_nested_aggregation(child);
1346 }
1347 });
1348 result
1349}
1350
1351fn validate_no_deleted_entity_access(expr: &Expr, deleted_vars: &HashSet<String>) -> Result<()> {
1355 if let Expr::Property(inner, _) = expr
1357 && let Expr::Variable(name) = inner.as_ref()
1358 && deleted_vars.contains(name)
1359 {
1360 return Err(anyhow!(
1361 "EntityNotFound: DeletedEntityAccess - Cannot access properties of deleted entity '{}'",
1362 name
1363 ));
1364 }
1365 if let Expr::FunctionCall { name, args, .. } = expr
1367 && matches!(name.to_lowercase().as_str(), "labels" | "keys")
1368 && args.len() == 1
1369 && let Expr::Variable(var) = &args[0]
1370 && deleted_vars.contains(var)
1371 {
1372 return Err(anyhow!(
1373 "EntityNotFound: DeletedEntityAccess - Cannot access {} of deleted entity '{}'",
1374 name.to_lowercase(),
1375 var
1376 ));
1377 }
1378 let mut result = Ok(());
1379 expr.for_each_child(&mut |child| {
1380 if result.is_ok() {
1381 result = validate_no_deleted_entity_access(child, deleted_vars);
1382 }
1383 });
1384 result
1385}
1386
1387fn validate_property_variables(
1390 properties: &Option<Expr>,
1391 vars_in_scope: &[VariableInfo],
1392 create_vars: &[&str],
1393) -> Result<()> {
1394 if let Some(props) = properties {
1395 for var in collect_expr_variables(props) {
1396 if !is_var_in_scope(vars_in_scope, &var) && !create_vars.contains(&var.as_str()) {
1397 return Err(anyhow!(
1398 "SyntaxError: UndefinedVariable - Variable '{}' not defined",
1399 var
1400 ));
1401 }
1402 }
1403 }
1404 Ok(())
1405}
1406
1407fn check_not_already_bound(
1410 name: &str,
1411 vars_in_scope: &[VariableInfo],
1412 create_vars: &[&str],
1413) -> Result<()> {
1414 if is_var_in_scope(vars_in_scope, name) {
1415 return Err(anyhow!(
1416 "SyntaxError: VariableAlreadyBound - Variable '{}' already defined",
1417 name
1418 ));
1419 }
1420 if create_vars.contains(&name) {
1421 return Err(anyhow!(
1422 "SyntaxError: VariableAlreadyBound - Variable '{}' already defined in CREATE",
1423 name
1424 ));
1425 }
1426 Ok(())
1427}
1428
1429fn build_merge_scope(pattern: &Pattern, vars_in_scope: &[VariableInfo]) -> Vec<VariableInfo> {
1430 let mut scope = vars_in_scope.to_vec();
1431
1432 for path in &pattern.paths {
1433 if let Some(path_var) = &path.variable
1434 && !path_var.is_empty()
1435 && !is_var_in_scope(&scope, path_var)
1436 {
1437 scope.push(VariableInfo::new(path_var.clone(), VariableType::Path));
1438 }
1439 for element in &path.elements {
1440 match element {
1441 PatternElement::Node(n) => {
1442 if let Some(v) = &n.variable
1443 && !v.is_empty()
1444 && !is_var_in_scope(&scope, v)
1445 {
1446 scope.push(VariableInfo::new(v.clone(), VariableType::Node));
1447 }
1448 }
1449 PatternElement::Relationship(r) => {
1450 if let Some(v) = &r.variable
1451 && !v.is_empty()
1452 && !is_var_in_scope(&scope, v)
1453 {
1454 scope.push(VariableInfo::new(v.clone(), VariableType::Edge));
1455 }
1456 }
1457 PatternElement::Parenthesized { .. } => {}
1458 }
1459 }
1460 }
1461
1462 scope
1463}
1464
1465fn validate_merge_set_item(item: &SetItem, vars_in_scope: &[VariableInfo]) -> Result<()> {
1466 match item {
1467 SetItem::Property { expr, value } => {
1468 validate_expression_variables(expr, vars_in_scope)?;
1469 validate_expression(expr, vars_in_scope)?;
1470 validate_expression_variables(value, vars_in_scope)?;
1471 validate_expression(value, vars_in_scope)?;
1472 if contains_pattern_predicate(expr) || contains_pattern_predicate(value) {
1473 return Err(anyhow!(
1474 "SyntaxError: UnexpectedSyntax - Pattern predicates are not allowed in SET"
1475 ));
1476 }
1477 }
1478 SetItem::Variable { variable, value } | SetItem::VariablePlus { variable, value } => {
1479 if !is_var_in_scope(vars_in_scope, variable) {
1480 return Err(anyhow!(
1481 "SyntaxError: UndefinedVariable - Variable '{}' not defined",
1482 variable
1483 ));
1484 }
1485 validate_expression_variables(value, vars_in_scope)?;
1486 validate_expression(value, vars_in_scope)?;
1487 if contains_pattern_predicate(value) {
1488 return Err(anyhow!(
1489 "SyntaxError: UnexpectedSyntax - Pattern predicates are not allowed in SET"
1490 ));
1491 }
1492 }
1493 SetItem::Labels { variable, .. } => {
1494 if !is_var_in_scope(vars_in_scope, variable) {
1495 return Err(anyhow!(
1496 "SyntaxError: UndefinedVariable - Variable '{}' not defined",
1497 variable
1498 ));
1499 }
1500 }
1501 }
1502
1503 Ok(())
1504}
1505
1506fn reject_null_merge_properties(properties: &Option<Expr>) -> Result<()> {
1509 if let Some(Expr::Map(entries)) = properties {
1510 for (key, value) in entries {
1511 if matches!(value, Expr::Literal(CypherLiteral::Null)) {
1512 return Err(anyhow!(
1513 "SemanticError: MergeReadOwnWrites - MERGE cannot use null property value for '{}'",
1514 key
1515 ));
1516 }
1517 }
1518 }
1519 Ok(())
1520}
1521
1522fn collect_pattern_labels(pattern: &uni_cypher::ast::Pattern) -> Vec<String> {
1527 let mut out = Vec::new();
1528 for path in &pattern.paths {
1529 for element in &path.elements {
1530 if let PatternElement::Node(n) = element {
1531 for l in n.labels.names() {
1532 out.push(l.clone());
1533 }
1534 }
1535 }
1536 }
1537 out
1538}
1539
1540fn validate_merge_clause(merge_clause: &MergeClause, vars_in_scope: &[VariableInfo]) -> Result<()> {
1541 for path in &merge_clause.pattern.paths {
1542 for element in &path.elements {
1543 match element {
1544 PatternElement::Node(n) => {
1545 if let Some(Expr::Parameter(_)) = &n.properties {
1546 return Err(anyhow!(
1547 "SyntaxError: InvalidParameterUse - Parameters cannot be used as node predicates"
1548 ));
1549 }
1550 reject_null_merge_properties(&n.properties)?;
1551 if let Some(variable) = &n.variable
1555 && !variable.is_empty()
1556 && is_var_in_scope(vars_in_scope, variable)
1557 {
1558 let is_standalone = path.elements.len() == 1;
1559 let has_new_labels = !n.labels.is_empty();
1560 let has_new_properties = n.properties.is_some();
1561 if is_standalone || has_new_labels || has_new_properties {
1562 return Err(anyhow!(
1563 "SyntaxError: VariableAlreadyBound - Variable '{}' already defined",
1564 variable
1565 ));
1566 }
1567 }
1568 }
1569 PatternElement::Relationship(r) => {
1570 if let Some(variable) = &r.variable
1571 && !variable.is_empty()
1572 && is_var_in_scope(vars_in_scope, variable)
1573 {
1574 return Err(anyhow!(
1575 "SyntaxError: VariableAlreadyBound - Variable '{}' already defined",
1576 variable
1577 ));
1578 }
1579 if r.types.len() != 1 {
1580 return Err(anyhow!(
1581 "SyntaxError: NoSingleRelationshipType - Exactly one relationship type required for MERGE"
1582 ));
1583 }
1584 if r.range.is_some() {
1585 return Err(anyhow!(
1586 "SyntaxError: CreatingVarLength - Variable length relationships cannot be created"
1587 ));
1588 }
1589 if let Some(Expr::Parameter(_)) = &r.properties {
1590 return Err(anyhow!(
1591 "SyntaxError: InvalidParameterUse - Parameters cannot be used as relationship predicates"
1592 ));
1593 }
1594 reject_null_merge_properties(&r.properties)?;
1595 }
1596 PatternElement::Parenthesized { .. } => {}
1597 }
1598 }
1599 }
1600
1601 let merge_scope = build_merge_scope(&merge_clause.pattern, vars_in_scope);
1602 for item in &merge_clause.on_create {
1603 validate_merge_set_item(item, &merge_scope)?;
1604 }
1605 for item in &merge_clause.on_match {
1606 validate_merge_set_item(item, &merge_scope)?;
1607 }
1608
1609 Ok(())
1610}
1611
1612fn validate_expression(expr: &Expr, vars_in_scope: &[VariableInfo]) -> Result<()> {
1614 validate_boolean_expression(expr)?;
1616 validate_no_nested_aggregation(expr)?;
1617
1618 fn validate_all(exprs: &[Expr], vars: &[VariableInfo]) -> Result<()> {
1620 for e in exprs {
1621 validate_expression(e, vars)?;
1622 }
1623 Ok(())
1624 }
1625
1626 match expr {
1627 Expr::FunctionCall { name, args, .. } => {
1628 validate_function_call(name, args, vars_in_scope)?;
1629 validate_all(args, vars_in_scope)
1630 }
1631 Expr::BinaryOp { left, right, .. } => {
1632 validate_expression(left, vars_in_scope)?;
1633 validate_expression(right, vars_in_scope)
1634 }
1635 Expr::UnaryOp { expr: e, .. }
1636 | Expr::IsNull(e)
1637 | Expr::IsNotNull(e)
1638 | Expr::IsUnique(e) => validate_expression(e, vars_in_scope),
1639 Expr::Property(base, prop) => {
1640 if let Expr::Variable(var_name) = base.as_ref()
1641 && let Some(var_info) = find_var_in_scope(vars_in_scope, var_name)
1642 {
1643 if var_info.var_type == VariableType::Path {
1645 return Err(anyhow!(
1646 "SyntaxError: InvalidArgumentType - Type mismatch: expected Node or Relationship but was Path for property access '{}.{}'",
1647 var_name,
1648 prop
1649 ));
1650 }
1651 if var_info.var_type == VariableType::ScalarLiteral {
1653 return Err(anyhow!(
1654 "TypeError: InvalidArgumentType - Property access on a non-graph element is not allowed"
1655 ));
1656 }
1657 }
1658 validate_expression(base, vars_in_scope)
1659 }
1660 Expr::List(items) => validate_all(items, vars_in_scope),
1661 Expr::Case {
1662 expr: case_expr,
1663 when_then,
1664 else_expr,
1665 } => {
1666 if let Some(e) = case_expr {
1667 validate_expression(e, vars_in_scope)?;
1668 }
1669 for (w, t) in when_then {
1670 validate_expression(w, vars_in_scope)?;
1671 validate_expression(t, vars_in_scope)?;
1672 }
1673 if let Some(e) = else_expr {
1674 validate_expression(e, vars_in_scope)?;
1675 }
1676 Ok(())
1677 }
1678 Expr::In { expr: e, list } => {
1679 validate_expression(e, vars_in_scope)?;
1680 validate_expression(list, vars_in_scope)
1681 }
1682 Expr::Exists {
1683 query,
1684 from_pattern_predicate: true,
1685 } => {
1686 if let Query::Single(stmt) = query.as_ref() {
1689 for clause in &stmt.clauses {
1690 if let Clause::Match(m) = clause {
1691 for path in &m.pattern.paths {
1692 for elem in &path.elements {
1693 match elem {
1694 PatternElement::Node(n) => {
1695 if let Some(var) = &n.variable
1696 && !is_var_in_scope(vars_in_scope, var)
1697 {
1698 return Err(anyhow!(
1699 "SyntaxError: UndefinedVariable - Variable '{}' not defined",
1700 var
1701 ));
1702 }
1703 }
1704 PatternElement::Relationship(r) => {
1705 if let Some(var) = &r.variable
1706 && !is_var_in_scope(vars_in_scope, var)
1707 {
1708 return Err(anyhow!(
1709 "SyntaxError: UndefinedVariable - Variable '{}' not defined",
1710 var
1711 ));
1712 }
1713 }
1714 _ => {}
1715 }
1716 }
1717 }
1718 }
1719 }
1720 }
1721 Ok(())
1722 }
1723 _ => Ok(()),
1724 }
1725}
1726
1727#[derive(Debug, Clone)]
1731pub struct QppStepInfo {
1732 pub edge_type_ids: Vec<u32>,
1734 pub direction: Direction,
1736 pub target_label: Option<String>,
1738}
1739
1740#[derive(Debug, Clone)]
1745#[non_exhaustive]
1746pub enum FusionKind {
1747 BtreeUnion,
1749 SortedKWayMerge,
1751 VidUidForkFirst,
1755 AnnRerank,
1759 Bm25Rrf,
1763}
1764
1765#[derive(Debug, Clone)]
1771pub enum LogicalPlan {
1772 Union {
1774 left: Box<LogicalPlan>,
1775 right: Box<LogicalPlan>,
1776 all: bool,
1778 },
1779 Scan {
1781 label_id: u16,
1782 labels: Vec<String>,
1783 variable: String,
1784 filter: Option<Expr>,
1785 optional: bool,
1786 },
1787 FusedIndexScan {
1800 label_id: u16,
1801 labels: Vec<String>,
1802 variable: String,
1803 filter: Option<Expr>,
1804 optional: bool,
1805 kind: FusionKind,
1806 },
1807 FusedIndexScanWrapped {
1819 inner: Box<LogicalPlan>,
1820 kind: FusionKind,
1821 },
1822 ExtIdLookup {
1825 variable: String,
1826 ext_id: String,
1827 filter: Option<Expr>,
1828 optional: bool,
1829 },
1830 ScanAll {
1833 variable: String,
1834 filter: Option<Expr>,
1835 optional: bool,
1836 },
1837 ScanMainByLabels {
1842 labels: Vec<String>,
1843 variable: String,
1844 filter: Option<Expr>,
1845 optional: bool,
1846 },
1847 Empty,
1849 Unwind {
1851 input: Box<LogicalPlan>,
1852 expr: Expr,
1853 variable: String,
1854 },
1855 Traverse {
1856 input: Box<LogicalPlan>,
1857 edge_type_ids: Vec<u32>,
1858 direction: Direction,
1859 source_variable: String,
1860 target_variable: String,
1861 target_label_id: u16,
1862 step_variable: Option<String>,
1863 min_hops: usize,
1864 max_hops: usize,
1865 optional: bool,
1866 target_filter: Option<Expr>,
1867 path_variable: Option<String>,
1868 edge_properties: HashSet<String>,
1869 is_variable_length: bool,
1872 optional_pattern_vars: HashSet<String>,
1876 scope_match_variables: HashSet<String>,
1881 edge_filter_expr: Option<Expr>,
1883 path_mode: crate::query::df_graph::nfa::PathMode,
1885 qpp_steps: Option<Vec<QppStepInfo>>,
1889 },
1890 TraverseMainByType {
1894 type_names: Vec<String>,
1895 input: Box<LogicalPlan>,
1896 direction: Direction,
1897 source_variable: String,
1898 target_variable: String,
1899 step_variable: Option<String>,
1900 min_hops: usize,
1901 max_hops: usize,
1902 optional: bool,
1903 target_filter: Option<Expr>,
1904 path_variable: Option<String>,
1905 is_variable_length: bool,
1908 optional_pattern_vars: HashSet<String>,
1911 scope_match_variables: HashSet<String>,
1915 edge_filter_expr: Option<Expr>,
1917 path_mode: crate::query::df_graph::nfa::PathMode,
1919 },
1920 Filter {
1921 input: Box<LogicalPlan>,
1922 predicate: Expr,
1923 optional_variables: HashSet<String>,
1927 },
1928 Create {
1929 input: Box<LogicalPlan>,
1930 pattern: Pattern,
1931 },
1932 CreateBatch {
1937 input: Box<LogicalPlan>,
1938 patterns: Vec<Pattern>,
1939 },
1940 Merge {
1941 input: Box<LogicalPlan>,
1942 pattern: Pattern,
1943 on_match: Option<SetClause>,
1944 on_create: Option<SetClause>,
1945 },
1946 Set {
1947 input: Box<LogicalPlan>,
1948 items: Vec<SetItem>,
1949 },
1950 Remove {
1951 input: Box<LogicalPlan>,
1952 items: Vec<RemoveItem>,
1953 },
1954 Delete {
1955 input: Box<LogicalPlan>,
1956 items: Vec<Expr>,
1957 detach: bool,
1958 },
1959 Foreach {
1961 input: Box<LogicalPlan>,
1962 variable: String,
1963 list: Expr,
1964 body: Vec<LogicalPlan>,
1965 },
1966 Sort {
1967 input: Box<LogicalPlan>,
1968 order_by: Vec<SortItem>,
1969 },
1970 Limit {
1971 input: Box<LogicalPlan>,
1972 skip: Option<usize>,
1973 fetch: Option<usize>,
1974 },
1975 Aggregate {
1976 input: Box<LogicalPlan>,
1977 group_by: Vec<Expr>,
1978 aggregates: Vec<Expr>,
1979 },
1980 Distinct {
1981 input: Box<LogicalPlan>,
1982 },
1983 Window {
1984 input: Box<LogicalPlan>,
1985 window_exprs: Vec<Expr>,
1986 },
1987 Project {
1988 input: Box<LogicalPlan>,
1989 projections: Vec<(Expr, Option<String>)>,
1990 },
1991 CrossJoin {
1992 left: Box<LogicalPlan>,
1993 right: Box<LogicalPlan>,
1994 },
1995 Apply {
1996 input: Box<LogicalPlan>,
1997 subquery: Box<LogicalPlan>,
1998 input_filter: Option<Expr>,
1999 },
2000 RecursiveCTE {
2001 cte_name: String,
2002 initial: Box<LogicalPlan>,
2003 recursive: Box<LogicalPlan>,
2004 },
2005 ProcedureCall {
2006 procedure_name: String,
2007 arguments: Vec<Expr>,
2008 yield_items: Vec<(String, Option<String>)>,
2009 },
2010 SubqueryCall {
2011 input: Box<LogicalPlan>,
2012 subquery: Box<LogicalPlan>,
2013 },
2014 VectorKnn {
2015 label_id: u16,
2016 variable: String,
2017 property: String,
2018 query: Expr,
2019 k: usize,
2020 threshold: Option<f32>,
2021 },
2022 InvertedIndexLookup {
2023 label_id: u16,
2024 variable: String,
2025 property: String,
2026 terms: Expr,
2027 },
2028 ShortestPath {
2029 input: Box<LogicalPlan>,
2030 edge_type_ids: Vec<u32>,
2031 direction: Direction,
2032 source_variable: String,
2033 target_variable: String,
2034 target_label_id: u16,
2035 path_variable: String,
2036 min_hops: u32,
2038 max_hops: u32,
2040 },
2041 AllShortestPaths {
2043 input: Box<LogicalPlan>,
2044 edge_type_ids: Vec<u32>,
2045 direction: Direction,
2046 source_variable: String,
2047 target_variable: String,
2048 target_label_id: u16,
2049 path_variable: String,
2050 min_hops: u32,
2052 max_hops: u32,
2054 },
2055 QuantifiedPattern {
2056 input: Box<LogicalPlan>,
2057 pattern_plan: Box<LogicalPlan>, min_iterations: u32,
2059 max_iterations: u32,
2060 path_variable: Option<String>,
2061 start_variable: String, binding_variable: String, },
2064 CreateVectorIndex {
2066 config: VectorIndexConfig,
2067 if_not_exists: bool,
2068 },
2069 CreateFullTextIndex {
2070 config: FullTextIndexConfig,
2071 if_not_exists: bool,
2072 },
2073 CreateScalarIndex {
2074 config: ScalarIndexConfig,
2075 if_not_exists: bool,
2076 },
2077 CreateJsonFtsIndex {
2078 config: JsonFtsIndexConfig,
2079 if_not_exists: bool,
2080 },
2081 DropIndex {
2082 name: String,
2083 if_exists: bool,
2084 },
2085 ShowIndexes {
2086 filter: Option<String>,
2087 },
2088 Copy {
2089 target: String,
2090 source: String,
2091 is_export: bool,
2092 options: HashMap<String, Value>,
2093 },
2094 Backup {
2095 destination: String,
2096 options: HashMap<String, Value>,
2097 },
2098 Explain {
2099 plan: Box<LogicalPlan>,
2100 },
2101 ShowDatabase,
2103 ShowConfig,
2104 ShowStatistics,
2105 Vacuum,
2106 Checkpoint,
2107 CopyTo {
2108 label: String,
2109 path: String,
2110 format: String,
2111 options: HashMap<String, Value>,
2112 },
2113 CopyFrom {
2114 label: String,
2115 path: String,
2116 format: String,
2117 options: HashMap<String, Value>,
2118 },
2119 CreateLabel(CreateLabel),
2121 CreateEdgeType(CreateEdgeType),
2122 AlterLabel(AlterLabel),
2123 AlterEdgeType(AlterEdgeType),
2124 DropLabel(DropLabel),
2125 DropEdgeType(DropEdgeType),
2126 CreateConstraint(CreateConstraint),
2128 DropConstraint(DropConstraint),
2129 ShowConstraints(ShowConstraints),
2130 BindZeroLengthPath {
2133 input: Box<LogicalPlan>,
2134 node_variable: String,
2135 path_variable: String,
2136 },
2137 BindPath {
2140 input: Box<LogicalPlan>,
2141 node_variables: Vec<String>,
2142 edge_variables: Vec<String>,
2143 path_variable: String,
2144 },
2145
2146 LocyProgram {
2149 strata: Vec<super::planner_locy_types::LocyStratum>,
2150 commands: Vec<super::planner_locy_types::LocyCommand>,
2151 derived_scan_registry: Arc<super::df_graph::locy_fixpoint::DerivedScanRegistry>,
2152 max_iterations: usize,
2153 timeout: std::time::Duration,
2154 max_derived_bytes: usize,
2155 deterministic_best_by: bool,
2156 strict_probability_domain: bool,
2157 probability_epsilon: f64,
2158 exact_probability: bool,
2159 max_bdd_variables: usize,
2160 top_k_proofs: usize,
2161 semiring_kind: uni_locy::SemiringKind,
2166 classifier_registry: Arc<uni_locy::ClassifierRegistry>,
2169 classifier_cache: Option<Arc<uni_locy::ModelInvocationCache>>,
2173 classifier_provenance_store: Option<Arc<uni_locy::NeuralProvenanceStore>>,
2178 },
2179 LocyFold {
2181 input: Box<LogicalPlan>,
2182 key_columns: Vec<String>,
2183 fold_bindings: Vec<(String, Expr)>,
2184 strict_probability_domain: bool,
2185 probability_epsilon: f64,
2186 },
2187 LocyBestBy {
2189 input: Box<LogicalPlan>,
2190 key_columns: Vec<String>,
2191 criteria: Vec<(Expr, bool)>,
2193 },
2194 LocyPriority {
2196 input: Box<LogicalPlan>,
2197 key_columns: Vec<String>,
2198 },
2199 LocyDerivedScan {
2201 scan_index: usize,
2202 data: Arc<RwLock<Vec<RecordBatch>>>,
2203 schema: SchemaRef,
2204 },
2205 LocyProject {
2208 input: Box<LogicalPlan>,
2209 projections: Vec<(Expr, Option<String>)>,
2210 target_types: Vec<DataType>,
2212 },
2213 LocyModelInvoke {
2226 input: Box<LogicalPlan>,
2227 invocations: Vec<uni_locy::ModelInvocation>,
2228 classifier_registry: Arc<uni_locy::ClassifierRegistry>,
2229 classifier_cache: Option<Arc<uni_locy::ModelInvocationCache>>,
2230 classifier_provenance_store: Option<Arc<uni_locy::NeuralProvenanceStore>>,
2237 path_context_handles: std::collections::HashMap<
2245 String,
2246 super::df_graph::locy_model_invoke::PathContextHandle,
2247 >,
2248 },
2249}
2250
2251struct VectorSimilarityPredicate {
2253 variable: String,
2254 property: String,
2255 query: Expr,
2256 threshold: Option<f32>,
2257}
2258
2259struct VectorSimilarityExtraction {
2261 predicate: VectorSimilarityPredicate,
2263 residual: Option<Expr>,
2265}
2266
2267fn extract_vector_similarity(expr: &Expr) -> Option<VectorSimilarityExtraction> {
2274 match expr {
2275 Expr::BinaryOp { left, op, right } => {
2276 if matches!(op, BinaryOp::And) {
2278 if let Some(vs) = extract_simple_vector_similarity(left) {
2280 return Some(VectorSimilarityExtraction {
2281 predicate: vs,
2282 residual: Some(right.as_ref().clone()),
2283 });
2284 }
2285 if let Some(vs) = extract_simple_vector_similarity(right) {
2287 return Some(VectorSimilarityExtraction {
2288 predicate: vs,
2289 residual: Some(left.as_ref().clone()),
2290 });
2291 }
2292 if let Some(mut extraction) = extract_vector_similarity(left) {
2294 extraction.residual = Some(combine_with_and(
2295 extraction.residual,
2296 right.as_ref().clone(),
2297 ));
2298 return Some(extraction);
2299 }
2300 if let Some(mut extraction) = extract_vector_similarity(right) {
2301 extraction.residual =
2302 Some(combine_with_and(extraction.residual, left.as_ref().clone()));
2303 return Some(extraction);
2304 }
2305 return None;
2306 }
2307
2308 if let Some(vs) = extract_simple_vector_similarity(expr) {
2310 return Some(VectorSimilarityExtraction {
2311 predicate: vs,
2312 residual: None,
2313 });
2314 }
2315 None
2316 }
2317 _ => None,
2318 }
2319}
2320
2321fn combine_with_and(opt_expr: Option<Expr>, other: Expr) -> Expr {
2323 match opt_expr {
2324 Some(e) => Expr::BinaryOp {
2325 left: Box::new(e),
2326 op: BinaryOp::And,
2327 right: Box::new(other),
2328 },
2329 None => other,
2330 }
2331}
2332
2333fn extract_simple_vector_similarity(expr: &Expr) -> Option<VectorSimilarityPredicate> {
2335 match expr {
2336 Expr::BinaryOp { left, op, right } => {
2337 if matches!(op, BinaryOp::Gt | BinaryOp::GtEq)
2339 && let (Some(vs), Some(thresh)) = (
2340 extract_vector_similarity_call(left),
2341 extract_float_literal(right),
2342 )
2343 {
2344 return Some(VectorSimilarityPredicate {
2345 variable: vs.0,
2346 property: vs.1,
2347 query: vs.2,
2348 threshold: Some(thresh),
2349 });
2350 }
2351 if matches!(op, BinaryOp::Lt | BinaryOp::LtEq)
2353 && let (Some(thresh), Some(vs)) = (
2354 extract_float_literal(left),
2355 extract_vector_similarity_call(right),
2356 )
2357 {
2358 return Some(VectorSimilarityPredicate {
2359 variable: vs.0,
2360 property: vs.1,
2361 query: vs.2,
2362 threshold: Some(thresh),
2363 });
2364 }
2365 if matches!(op, BinaryOp::ApproxEq)
2367 && let Expr::Property(var_expr, prop) = left.as_ref()
2368 && let Expr::Variable(var) = var_expr.as_ref()
2369 {
2370 return Some(VectorSimilarityPredicate {
2371 variable: var.clone(),
2372 property: prop.clone(),
2373 query: right.as_ref().clone(),
2374 threshold: None,
2375 });
2376 }
2377 None
2378 }
2379 _ => None,
2380 }
2381}
2382
2383fn extract_vector_similarity_call(expr: &Expr) -> Option<(String, String, Expr)> {
2385 if let Expr::FunctionCall { name, args, .. } = expr
2386 && name.eq_ignore_ascii_case("vector_similarity")
2387 && args.len() == 2
2388 {
2389 if let Expr::Property(var_expr, prop) = &args[0]
2391 && let Expr::Variable(var) = var_expr.as_ref()
2392 {
2393 return Some((var.clone(), prop.clone(), args[1].clone()));
2395 }
2396 }
2397 None
2398}
2399
2400fn extract_float_literal(expr: &Expr) -> Option<f32> {
2402 match expr {
2403 Expr::Literal(CypherLiteral::Integer(i)) => Some(*i as f32),
2404 Expr::Literal(CypherLiteral::Float(f)) => Some(*f as f32),
2405 _ => None,
2406 }
2407}
2408
2409#[derive(Debug)]
2415pub struct QueryPlanner {
2416 schema: Arc<Schema>,
2417 gen_expr_cache: HashMap<(String, String), Expr>,
2419 anon_counter: std::sync::atomic::AtomicUsize,
2421 params: HashMap<String, uni_common::Value>,
2423 plugin_registry: Option<Arc<uni_plugin::PluginRegistry>>,
2426 replacement_scans_enabled: bool,
2428}
2429
2430struct TraverseParams<'a> {
2431 rel: &'a RelationshipPattern,
2432 target_node: &'a NodePattern,
2433 optional: bool,
2434 path_variable: Option<String>,
2435 optional_pattern_vars: HashSet<String>,
2438}
2439
2440impl QueryPlanner {
2441 pub fn new(schema: Arc<Schema>) -> Self {
2446 let mut gen_expr_cache = HashMap::new();
2448 for (label, props) in &schema.properties {
2449 for (gen_col, meta) in props {
2450 if let Some(expr_str) = &meta.generation_expression
2451 && let Ok(parsed_expr) = uni_cypher::parse_expression(expr_str)
2452 {
2453 gen_expr_cache.insert((label.clone(), gen_col.clone()), parsed_expr);
2454 }
2455 }
2456 }
2457 Self {
2458 schema,
2459 gen_expr_cache,
2460 anon_counter: std::sync::atomic::AtomicUsize::new(0),
2461 params: HashMap::new(),
2462 plugin_registry: None,
2463 replacement_scans_enabled: false,
2464 }
2465 }
2466
2467 pub fn with_params(mut self, params: HashMap<String, uni_common::Value>) -> Self {
2469 self.params = params;
2470 self
2471 }
2472
2473 #[must_use]
2478 pub fn with_plugin_registry(mut self, registry: Arc<uni_plugin::PluginRegistry>) -> Self {
2479 self.plugin_registry = Some(registry);
2480 self
2481 }
2482
2483 #[must_use]
2486 pub fn with_replacement_scans(mut self, enabled: bool) -> Self {
2487 self.replacement_scans_enabled = enabled;
2488 self
2489 }
2490
2491 fn allocate_virtual_label(
2504 &self,
2505 name: &str,
2506 ) -> Result<Option<(u16, Arc<dyn uni_plugin::traits::catalog::CatalogTable>)>> {
2507 let Some(registry) = self.plugin_registry.as_ref() else {
2508 return Ok(None);
2509 };
2510 let mut claimed: Option<Arc<dyn uni_plugin::traits::catalog::CatalogTable>> = None;
2512 for cat in registry.catalogs() {
2513 if let Some(t) = cat.resolve_label(name) {
2514 claimed = Some(t);
2515 break;
2516 }
2517 }
2518 if claimed.is_none() {
2521 use uni_plugin::traits::catalog::{Replacement, ReplacementRequest};
2522 if let Some(Replacement::CatalogTable(t)) =
2523 self.consult_replacement_scan(ReplacementRequest::Label(name))
2524 {
2525 claimed = Some(t);
2526 }
2527 }
2528 let Some(table) = claimed else {
2529 return Ok(None);
2530 };
2531 let id = registry
2532 .register_virtual_label(name, Arc::clone(&table))
2533 .map_err(|e| anyhow!("virtual label registration failed for `{name}`: {e}"))?;
2534 Ok(Some((id, table)))
2535 }
2536
2537 fn reject_virtual_label_writes(&self, labels: &[String], op: &str) -> Result<()> {
2545 let Some(registry) = self.plugin_registry.as_ref() else {
2546 return Ok(());
2547 };
2548 for label in labels {
2549 if registry.virtual_label_by_name(label).is_some() {
2550 return Err(anyhow!(
2551 "Cannot {op} on virtual (catalog-resolved) label `{label}` — virtual \
2552 labels are read-only; write back via the originating catalog \
2553 instead"
2554 ));
2555 }
2556 }
2557 Ok(())
2558 }
2559
2560 fn allocate_virtual_edge_type(
2562 &self,
2563 name: &str,
2564 ) -> Result<Option<(u32, Arc<dyn uni_plugin::traits::catalog::CatalogTable>)>> {
2565 let Some(registry) = self.plugin_registry.as_ref() else {
2566 return Ok(None);
2567 };
2568 let mut claimed: Option<Arc<dyn uni_plugin::traits::catalog::CatalogTable>> = None;
2569 for cat in registry.catalogs() {
2570 if let Some(t) = cat.resolve_edge_type(name) {
2571 claimed = Some(t);
2572 break;
2573 }
2574 }
2575 let Some(table) = claimed else {
2576 return Ok(None);
2577 };
2578 let id = registry
2579 .register_virtual_edge_type(name, Arc::clone(&table))
2580 .map_err(|e| anyhow!("virtual edge-type registration failed for `{name}`: {e}"))?;
2581 Ok(Some((id, table)))
2582 }
2583
2584 pub(crate) fn consult_replacement_scan(
2590 &self,
2591 request: uni_plugin::traits::catalog::ReplacementRequest<'_>,
2592 ) -> Option<uni_plugin::traits::catalog::Replacement> {
2593 if !self.replacement_scans_enabled {
2594 return None;
2595 }
2596 let registry = self.plugin_registry.as_ref()?;
2597 for r in registry.replacement_scans().iter() {
2598 if let Some(replacement) = r.replace(&request) {
2599 tracing::debug!(
2600 target: "uni.plugin.registry",
2601 ?request,
2602 ?replacement,
2603 "identifier resolved via ReplacementScanProvider"
2604 );
2605 return Some(replacement);
2606 }
2607 }
2608 None
2609 }
2610
2611 fn procedure_resolves(&self, user_name: &str) -> bool {
2618 let Some(registry) = self.plugin_registry.as_ref() else {
2619 return false;
2620 };
2621 if let Some((ns, local)) = user_name.split_once('.')
2622 && registry
2623 .procedure(&uni_plugin::QName::new(ns, local))
2624 .is_some()
2625 {
2626 return true;
2627 }
2628 let stripped = user_name.strip_prefix("uni.").unwrap_or(user_name);
2629 for plugin_id in ["uni", "builtin", "apoc-core", "custom"] {
2630 if registry
2631 .procedure(&uni_plugin::QName::new(plugin_id, stripped))
2632 .is_some()
2633 {
2634 return true;
2635 }
2636 }
2637 false
2638 }
2639
2640 fn qname_from_user(name: &str) -> uni_plugin::QName {
2648 uni_plugin::QName::parse(name).unwrap_or_else(|_| uni_plugin::QName::new("user", name))
2649 }
2650
2651 fn rewrite_function_calls_in_query(
2662 &self,
2663 query: uni_cypher::ast::Query,
2664 ) -> Result<uni_cypher::ast::Query> {
2665 if !self.replacement_scans_enabled || self.plugin_registry.is_none() {
2666 return Ok(query);
2667 }
2668 let mut rename = |name: &str| -> Result<Option<String>> {
2669 let qname = Self::qname_from_user(name);
2670 use uni_plugin::traits::catalog::{Replacement, ReplacementRequest};
2671 match self.consult_replacement_scan(ReplacementRequest::Function(&qname)) {
2672 Some(Replacement::Function(new_qname)) => {
2673 let rewritten = match new_qname.namespace() {
2683 "builtin" | "user" => new_qname.local().to_string(),
2684 _ => new_qname.to_string(),
2685 };
2686 tracing::debug!(
2687 target: "uni.plugin.registry",
2688 from = %name,
2689 to = %rewritten,
2690 "function call rerouted via ReplacementScanProvider"
2691 );
2692 Ok(Some(rewritten))
2693 }
2694 Some(other) => Err(anyhow!(
2695 "ReplacementScanProvider returned wrong variant for Function \
2696 request `{}`: expected `Function`, got {:?}",
2697 name,
2698 other
2699 )),
2700 None => Ok(None),
2701 }
2702 };
2703 crate::query::rewrite::function_rename::rewrite_function_calls_in_query(query, &mut rename)
2704 }
2705
2706 pub fn plan(&self, query: Query) -> Result<LogicalPlan> {
2708 self.plan_with_scope(query, Vec::new())
2709 }
2710
2711 pub fn plan_with_scope(&self, query: Query, vars: Vec<String>) -> Result<LogicalPlan> {
2716 let rewritten_query = crate::query::rewrite::rewrite_query(query)?;
2718 let rewritten_query = self.rewrite_function_calls_in_query(rewritten_query)?;
2726 if Self::has_mixed_union_modes(&rewritten_query) {
2727 return Err(anyhow!(
2728 "SyntaxError: InvalidClauseComposition - Cannot mix UNION and UNION ALL in the same query"
2729 ));
2730 }
2731
2732 match rewritten_query {
2733 Query::Single(stmt) => self.plan_single(stmt, vars),
2734 Query::Union { left, right, all } => {
2735 let l = self.plan_with_scope(*left, vars.clone())?;
2736 let r = self.plan_with_scope(*right, vars)?;
2737
2738 let left_cols = Self::extract_projection_columns(&l);
2740 let right_cols = Self::extract_projection_columns(&r);
2741
2742 if left_cols != right_cols {
2743 return Err(anyhow!(
2744 "SyntaxError: DifferentColumnsInUnion - UNION queries must have same column names"
2745 ));
2746 }
2747
2748 Ok(LogicalPlan::Union {
2749 left: Box::new(l),
2750 right: Box::new(r),
2751 all,
2752 })
2753 }
2754 Query::Schema(cmd) => self.plan_schema_command(*cmd),
2755 Query::Explain(inner) => {
2756 let inner_plan = self.plan_with_scope(*inner, vars)?;
2757 Ok(LogicalPlan::Explain {
2758 plan: Box::new(inner_plan),
2759 })
2760 }
2761 Query::TimeTravel { .. } => {
2762 unreachable!("TimeTravel should be resolved at API layer before planning")
2763 }
2764 }
2765 }
2766
2767 fn collect_union_modes(query: &Query, out: &mut HashSet<bool>) {
2768 match query {
2769 Query::Union { left, right, all } => {
2770 out.insert(*all);
2771 Self::collect_union_modes(left, out);
2772 Self::collect_union_modes(right, out);
2773 }
2774 Query::Explain(inner) => Self::collect_union_modes(inner, out),
2775 Query::TimeTravel { query, .. } => Self::collect_union_modes(query, out),
2776 Query::Single(_) | Query::Schema(_) => {}
2777 }
2778 }
2779
2780 fn has_mixed_union_modes(query: &Query) -> bool {
2781 let mut modes = HashSet::new();
2782 Self::collect_union_modes(query, &mut modes);
2783 modes.len() > 1
2784 }
2785
2786 fn next_anon_var(&self) -> String {
2787 let id = self
2788 .anon_counter
2789 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2790 format!("_anon_{}", id)
2791 }
2792
2793 fn extract_projection_columns(plan: &LogicalPlan) -> Vec<String> {
2796 match plan {
2797 LogicalPlan::Project { projections, .. } => projections
2798 .iter()
2799 .map(|(expr, alias)| alias.clone().unwrap_or_else(|| expr.to_string_repr()))
2800 .collect(),
2801 LogicalPlan::Limit { input, .. }
2802 | LogicalPlan::Sort { input, .. }
2803 | LogicalPlan::Distinct { input, .. }
2804 | LogicalPlan::Filter { input, .. } => Self::extract_projection_columns(input),
2805 LogicalPlan::Union { left, right, .. } => {
2806 let left_cols = Self::extract_projection_columns(left);
2807 if left_cols.is_empty() {
2808 Self::extract_projection_columns(right)
2809 } else {
2810 left_cols
2811 }
2812 }
2813 LogicalPlan::Aggregate {
2814 group_by,
2815 aggregates,
2816 ..
2817 } => {
2818 let mut cols: Vec<String> = group_by.iter().map(|e| e.to_string_repr()).collect();
2819 cols.extend(aggregates.iter().map(|e| e.to_string_repr()));
2820 cols
2821 }
2822 _ => Vec::new(),
2823 }
2824 }
2825
2826 fn plan_return_clause(
2827 &self,
2828 return_clause: &ReturnClause,
2829 plan: LogicalPlan,
2830 vars_in_scope: &[VariableInfo],
2831 ) -> Result<LogicalPlan> {
2832 let mut plan = plan;
2833 let mut group_by = Vec::new();
2834 let mut aggregates = Vec::new();
2835 let mut compound_agg_exprs: Vec<Expr> = Vec::new();
2836 let mut has_agg = false;
2837 let mut projections = Vec::new();
2838 let mut projected_aggregate_reprs: HashSet<String> = HashSet::new();
2839 let mut projected_simple_reprs: HashSet<String> = HashSet::new();
2840 let mut projected_aliases: HashSet<String> = HashSet::new();
2841
2842 for item in &return_clause.items {
2843 match item {
2844 ReturnItem::All => {
2845 let user_vars: Vec<_> = vars_in_scope
2848 .iter()
2849 .filter(|v| !v.name.starts_with("_anon_"))
2850 .collect();
2851 if user_vars.is_empty() {
2852 return Err(anyhow!(
2853 "SyntaxError: NoVariablesInScope - RETURN * is not allowed when there are no variables in scope"
2854 ));
2855 }
2856 for v in user_vars {
2857 projections.push((Expr::Variable(v.name.clone()), Some(v.name.clone())));
2858 if !group_by.contains(&Expr::Variable(v.name.clone())) {
2859 group_by.push(Expr::Variable(v.name.clone()));
2860 }
2861 projected_aliases.insert(v.name.clone());
2862 projected_simple_reprs.insert(v.name.clone());
2863 }
2864 }
2865 ReturnItem::Expr {
2866 expr,
2867 alias,
2868 source_text,
2869 } => {
2870 if matches!(expr, Expr::Wildcard) {
2871 for v in vars_in_scope {
2872 projections
2873 .push((Expr::Variable(v.name.clone()), Some(v.name.clone())));
2874 if !group_by.contains(&Expr::Variable(v.name.clone())) {
2875 group_by.push(Expr::Variable(v.name.clone()));
2876 }
2877 projected_aliases.insert(v.name.clone());
2878 projected_simple_reprs.insert(v.name.clone());
2879 }
2880 } else {
2881 validate_expression_variables(expr, vars_in_scope)?;
2883 validate_expression(expr, vars_in_scope)?;
2885 if contains_pattern_predicate(expr) {
2887 return Err(anyhow!(
2888 "SyntaxError: UnexpectedSyntax - Pattern predicates are not allowed in RETURN"
2889 ));
2890 }
2891
2892 let effective_alias = alias.clone().or_else(|| source_text.clone());
2894 projections.push((expr.clone(), effective_alias));
2895 if expr.is_aggregate() && !is_compound_aggregate(expr) {
2896 has_agg = true;
2898 aggregates.push(expr.clone());
2899 projected_aggregate_reprs.insert(expr.to_string_repr());
2900 } else if !is_window_function(expr)
2901 && (expr.is_aggregate() || contains_aggregate_recursive(expr))
2902 {
2903 has_agg = true;
2906 compound_agg_exprs.push(expr.clone());
2907 for inner in extract_inner_aggregates(expr) {
2908 let repr = inner.to_string_repr();
2909 if !projected_aggregate_reprs.contains(&repr) {
2910 aggregates.push(inner);
2911 projected_aggregate_reprs.insert(repr);
2912 }
2913 }
2914 } else if !group_by.contains(expr) {
2915 group_by.push(expr.clone());
2916 if matches!(expr, Expr::Variable(_) | Expr::Property(_, _)) {
2917 projected_simple_reprs.insert(expr.to_string_repr());
2918 }
2919 }
2920
2921 if let Some(a) = alias {
2922 if projected_aliases.contains(a) {
2923 return Err(anyhow!(
2924 "SyntaxError: ColumnNameConflict - Duplicate column name '{}' in RETURN",
2925 a
2926 ));
2927 }
2928 projected_aliases.insert(a.clone());
2929 } else if let Expr::Variable(v) = expr {
2930 if projected_aliases.contains(v) {
2931 return Err(anyhow!(
2932 "SyntaxError: ColumnNameConflict - Duplicate column name '{}' in RETURN",
2933 v
2934 ));
2935 }
2936 projected_aliases.insert(v.clone());
2937 }
2938 }
2939 }
2940 }
2941 }
2942
2943 if has_agg {
2946 let group_by_reprs: HashSet<String> =
2947 group_by.iter().map(|e| e.to_string_repr()).collect();
2948 for expr in &compound_agg_exprs {
2949 let mut refs = Vec::new();
2950 collect_non_aggregate_refs(expr, false, &mut refs);
2951 for r in &refs {
2952 let is_covered = match r {
2953 NonAggregateRef::Var(v) => group_by_reprs.contains(v),
2954 NonAggregateRef::Property { repr, .. } => group_by_reprs.contains(repr),
2955 };
2956 if !is_covered {
2957 return Err(anyhow!(
2958 "SyntaxError: AmbiguousAggregationExpression - Expression mixes aggregation with non-grouped reference"
2959 ));
2960 }
2961 }
2962 }
2963 }
2964
2965 if has_agg {
2966 plan = LogicalPlan::Aggregate {
2967 input: Box::new(plan),
2968 group_by,
2969 aggregates,
2970 };
2971 }
2972
2973 let mut window_exprs = Vec::new();
2974 for (expr, _) in &projections {
2975 Self::collect_window_functions(expr, &mut window_exprs);
2976 }
2977
2978 if let Some(order_by) = &return_clause.order_by {
2979 for item in order_by {
2980 Self::collect_window_functions(&item.expr, &mut window_exprs);
2981 }
2982 }
2983
2984 let has_window_exprs = !window_exprs.is_empty();
2985
2986 if has_window_exprs {
2987 let mut props_needed_for_window: Vec<Expr> = Vec::new();
2991 for window_expr in &window_exprs {
2992 Self::collect_properties_from_expr(window_expr, &mut props_needed_for_window);
2993 }
2994
2995 let non_window_projections: Vec<_> = projections
2998 .iter()
2999 .filter_map(|(expr, alias)| {
3000 let keep = if let Expr::FunctionCall { window_spec, .. } = expr {
3002 window_spec.is_none()
3003 } else {
3004 true
3005 };
3006
3007 if keep {
3008 let new_alias = if matches!(expr, Expr::Property(..)) {
3010 Some(expr.to_string_repr())
3011 } else {
3012 alias.clone()
3013 };
3014 Some((expr.clone(), new_alias))
3015 } else {
3016 None
3017 }
3018 })
3019 .collect();
3020
3021 if !non_window_projections.is_empty() || !props_needed_for_window.is_empty() {
3022 let mut intermediate_projections = non_window_projections;
3023 for prop in &props_needed_for_window {
3026 if !intermediate_projections
3027 .iter()
3028 .any(|(e, _)| e.to_string_repr() == prop.to_string_repr())
3029 {
3030 let qualified_name = prop.to_string_repr();
3031 intermediate_projections.push((prop.clone(), Some(qualified_name)));
3032 }
3033 }
3034
3035 if !intermediate_projections.is_empty() {
3036 plan = LogicalPlan::Project {
3037 input: Box::new(plan),
3038 projections: intermediate_projections,
3039 };
3040 }
3041 }
3042
3043 let transformed_window_exprs: Vec<Expr> = window_exprs
3046 .into_iter()
3047 .map(Self::transform_window_expr_properties)
3048 .collect();
3049
3050 plan = LogicalPlan::Window {
3051 input: Box::new(plan),
3052 window_exprs: transformed_window_exprs,
3053 };
3054 }
3055
3056 if let Some(order_by) = &return_clause.order_by {
3057 let alias_exprs: HashMap<String, Expr> = projections
3058 .iter()
3059 .filter_map(|(expr, alias)| {
3060 alias.as_ref().map(|a| {
3061 let rewritten = if has_agg && !has_window_exprs {
3065 if expr.is_aggregate() && !is_compound_aggregate(expr) {
3066 Expr::Variable(aggregate_column_name(expr))
3067 } else if is_compound_aggregate(expr)
3068 || (!expr.is_aggregate() && contains_aggregate_recursive(expr))
3069 {
3070 replace_aggregates_with_columns(expr)
3071 } else {
3072 Expr::Variable(expr.to_string_repr())
3073 }
3074 } else {
3075 expr.clone()
3076 };
3077 (a.clone(), rewritten)
3078 })
3079 })
3080 .collect();
3081
3082 let order_by_scope: Vec<VariableInfo> = if return_clause.distinct {
3085 let mut scope = Vec::new();
3088 for (expr, alias) in &projections {
3089 if let Some(a) = alias
3090 && !is_var_in_scope(&scope, a)
3091 {
3092 scope.push(VariableInfo::new(a.clone(), VariableType::Scalar));
3093 }
3094 if let Expr::Variable(v) = expr
3095 && !is_var_in_scope(&scope, v)
3096 {
3097 scope.push(VariableInfo::new(v.clone(), VariableType::Scalar));
3098 }
3099 }
3100 scope
3101 } else {
3102 let mut scope = vars_in_scope.to_vec();
3103 for (expr, alias) in &projections {
3104 if let Some(a) = alias
3105 && !is_var_in_scope(&scope, a)
3106 {
3107 scope.push(VariableInfo::new(a.clone(), VariableType::Scalar));
3108 } else if let Expr::Variable(v) = expr
3109 && !is_var_in_scope(&scope, v)
3110 {
3111 scope.push(VariableInfo::new(v.clone(), VariableType::Scalar));
3112 }
3113 }
3114 scope
3115 };
3116 for item in order_by {
3118 let matches_projected_expr = return_clause.distinct
3121 && projections
3122 .iter()
3123 .any(|(expr, _)| expr.to_string_repr() == item.expr.to_string_repr());
3124 if !matches_projected_expr {
3125 validate_expression_variables(&item.expr, &order_by_scope)?;
3126 validate_expression(&item.expr, &order_by_scope)?;
3127 }
3128 let has_aggregate_in_item = contains_aggregate_recursive(&item.expr);
3129 if has_aggregate_in_item && !has_agg {
3130 return Err(anyhow!(
3131 "SyntaxError: InvalidAggregation - Aggregation functions not allowed in ORDER BY after RETURN"
3132 ));
3133 }
3134 if has_agg && has_aggregate_in_item {
3135 validate_with_order_by_aggregate_item(
3136 &item.expr,
3137 &projected_aggregate_reprs,
3138 &projected_simple_reprs,
3139 &projected_aliases,
3140 )?;
3141 }
3142 }
3143 let rewritten_order_by: Vec<SortItem> = order_by
3144 .iter()
3145 .map(|item| SortItem {
3146 expr: {
3147 let mut rewritten =
3148 rewrite_order_by_expr_with_aliases(&item.expr, &alias_exprs);
3149 if has_agg && !has_window_exprs {
3150 rewritten = replace_aggregates_with_columns(&rewritten);
3151 }
3152 rewritten
3153 },
3154 ascending: item.ascending,
3155 })
3156 .collect();
3157 plan = LogicalPlan::Sort {
3158 input: Box::new(plan),
3159 order_by: rewritten_order_by,
3160 };
3161 }
3162
3163 if return_clause.skip.is_some() || return_clause.limit.is_some() {
3164 let skip = return_clause
3165 .skip
3166 .as_ref()
3167 .map(|e| parse_non_negative_integer(e, "SKIP", &self.params))
3168 .transpose()?
3169 .flatten();
3170 let fetch = return_clause
3171 .limit
3172 .as_ref()
3173 .map(|e| parse_non_negative_integer(e, "LIMIT", &self.params))
3174 .transpose()?
3175 .flatten();
3176
3177 plan = LogicalPlan::Limit {
3178 input: Box::new(plan),
3179 skip,
3180 fetch,
3181 };
3182 }
3183
3184 if !projections.is_empty() {
3185 let final_projections = if has_agg || has_window_exprs {
3188 projections
3189 .into_iter()
3190 .map(|(expr, alias)| {
3191 if expr.is_aggregate() && !is_compound_aggregate(&expr) && !has_window_exprs
3193 {
3194 let col_name = aggregate_column_name(&expr);
3196 (Expr::Variable(col_name), alias)
3197 } else if !has_window_exprs
3198 && (is_compound_aggregate(&expr)
3199 || (!expr.is_aggregate() && contains_aggregate_recursive(&expr)))
3200 {
3201 (replace_aggregates_with_columns(&expr), alias)
3204 }
3205 else if has_agg
3209 && !has_window_exprs
3210 && !matches!(expr, Expr::Variable(_) | Expr::Property(_, _))
3211 {
3212 (Expr::Variable(expr.to_string_repr()), alias)
3213 }
3214 else if let Expr::FunctionCall {
3216 window_spec: Some(_),
3217 ..
3218 } = &expr
3219 {
3220 let window_col_name = expr.to_string_repr();
3223 (Expr::Variable(window_col_name), alias)
3225 } else {
3226 (expr, alias)
3227 }
3228 })
3229 .collect()
3230 } else {
3231 projections
3232 };
3233
3234 plan = LogicalPlan::Project {
3235 input: Box::new(plan),
3236 projections: final_projections,
3237 };
3238 }
3239
3240 if return_clause.distinct {
3241 plan = LogicalPlan::Distinct {
3242 input: Box::new(plan),
3243 };
3244 }
3245
3246 Ok(plan)
3247 }
3248
3249 fn plan_single(&self, query: Statement, initial_vars: Vec<String>) -> Result<LogicalPlan> {
3250 let typed_vars: Vec<VariableInfo> = initial_vars
3251 .into_iter()
3252 .map(|name| VariableInfo::new(name, VariableType::Imported))
3253 .collect();
3254 self.plan_single_typed(query, typed_vars)
3255 }
3256
3257 fn rewrite_and_plan_typed(
3263 &self,
3264 query: Query,
3265 typed_vars: &[VariableInfo],
3266 ) -> Result<LogicalPlan> {
3267 let rewritten = crate::query::rewrite::rewrite_query(query)?;
3268 match rewritten {
3269 Query::Single(stmt) => self.plan_single_typed(stmt, typed_vars.to_vec()),
3270 other => self.plan_with_scope(other, vars_to_strings(typed_vars)),
3271 }
3272 }
3273
3274 fn plan_single_typed(
3275 &self,
3276 query: Statement,
3277 initial_vars: Vec<VariableInfo>,
3278 ) -> Result<LogicalPlan> {
3279 let mut plan = LogicalPlan::Empty;
3280
3281 if !initial_vars.is_empty() {
3282 let projections = initial_vars
3286 .iter()
3287 .map(|v| (Expr::Parameter(v.name.clone()), Some(v.name.clone())))
3288 .collect();
3289 plan = LogicalPlan::Project {
3290 input: Box::new(plan),
3291 projections,
3292 };
3293 }
3294
3295 let mut vars_in_scope: Vec<VariableInfo> = initial_vars;
3296 let mut create_introduced_vars: HashSet<String> = HashSet::new();
3300 let mut deleted_vars: HashSet<String> = HashSet::new();
3303
3304 let clause_count = query.clauses.len();
3305 for (clause_idx, clause) in query.clauses.into_iter().enumerate() {
3306 match clause {
3307 Clause::Match(match_clause) => {
3308 plan = self.plan_match_clause(&match_clause, plan, &mut vars_in_scope)?;
3309 }
3310 Clause::Unwind(unwind) => {
3311 plan = LogicalPlan::Unwind {
3312 input: Box::new(plan),
3313 expr: unwind.expr.clone(),
3314 variable: unwind.variable.clone(),
3315 };
3316 let unwind_out_type = infer_unwind_output_type(&unwind.expr, &vars_in_scope);
3317 add_var_to_scope(&mut vars_in_scope, &unwind.variable, unwind_out_type)?;
3318 }
3319 Clause::Call(call_clause) => {
3320 match &call_clause.kind {
3321 CallKind::Procedure {
3322 procedure,
3323 arguments,
3324 } => {
3325 for arg in arguments {
3327 if contains_aggregate_recursive(arg) {
3328 return Err(anyhow!(
3329 "SyntaxError: InvalidAggregation - Aggregation expressions are not allowed as arguments to procedure calls"
3330 ));
3331 }
3332 }
3333
3334 let has_yield_star = call_clause.yield_items.len() == 1
3335 && call_clause.yield_items[0].name == "*"
3336 && call_clause.yield_items[0].alias.is_none();
3337 if has_yield_star && clause_idx + 1 < clause_count {
3338 return Err(anyhow!(
3339 "SyntaxError: UnexpectedSyntax - YIELD * is only allowed in standalone procedure calls"
3340 ));
3341 }
3342
3343 let mut yield_names = Vec::new();
3345 for item in &call_clause.yield_items {
3346 if item.name == "*" {
3347 continue;
3348 }
3349 let output_name = item.alias.as_ref().unwrap_or(&item.name);
3350 if yield_names.contains(output_name) {
3351 return Err(anyhow!(
3352 "SyntaxError: VariableAlreadyBound - Variable '{}' already appears in YIELD clause",
3353 output_name
3354 ));
3355 }
3356 if clause_idx > 0
3358 && vars_in_scope.iter().any(|v| v.name == *output_name)
3359 {
3360 return Err(anyhow!(
3361 "SyntaxError: VariableAlreadyBound - Variable '{}' already declared in outer scope",
3362 output_name
3363 ));
3364 }
3365 yield_names.push(output_name.clone());
3366 }
3367
3368 let mut yields = Vec::new();
3369 for item in &call_clause.yield_items {
3370 if item.name == "*" {
3371 continue;
3372 }
3373 yields.push((item.name.clone(), item.alias.clone()));
3374 let var_name = item.alias.as_ref().unwrap_or(&item.name);
3375 add_var_to_scope(
3378 &mut vars_in_scope,
3379 var_name,
3380 VariableType::Imported,
3381 )?;
3382 }
3383 let procedure_name = if self.replacement_scans_enabled
3392 && !self.procedure_resolves(procedure)
3393 {
3394 use uni_plugin::traits::catalog::{
3395 Replacement, ReplacementRequest,
3396 };
3397 let qname = Self::qname_from_user(procedure);
3398 match self
3399 .consult_replacement_scan(ReplacementRequest::Procedure(&qname))
3400 {
3401 Some(Replacement::Procedure(new_qname)) => {
3402 let rewritten = new_qname.to_string();
3403 if !self.procedure_resolves(&rewritten) {
3404 return Err(anyhow!(
3405 "ReplacementScanProvider rerouted procedure \
3406 `{}` to `{}`, which also did not resolve",
3407 procedure,
3408 rewritten
3409 ));
3410 }
3411 tracing::debug!(
3412 target: "uni.plugin.registry",
3413 from = %procedure,
3414 to = %rewritten,
3415 "procedure rerouted via ReplacementScanProvider"
3416 );
3417 rewritten
3418 }
3419 Some(other) => {
3420 return Err(anyhow!(
3421 "ReplacementScanProvider returned wrong variant \
3422 for Procedure request `{}`: expected \
3423 `Procedure`, got {:?}",
3424 procedure,
3425 other
3426 ));
3427 }
3428 None => procedure.clone(),
3429 }
3430 } else {
3431 procedure.clone()
3432 };
3433 let proc_plan = LogicalPlan::ProcedureCall {
3434 procedure_name,
3435 arguments: arguments.clone(),
3436 yield_items: yields.clone(),
3437 };
3438
3439 if matches!(plan, LogicalPlan::Empty) {
3440 plan = proc_plan;
3442 } else if yields.is_empty() {
3443 } else {
3446 plan = LogicalPlan::Apply {
3448 input: Box::new(plan),
3449 subquery: Box::new(proc_plan),
3450 input_filter: None,
3451 };
3452 }
3453 }
3454 CallKind::Subquery(query) => {
3455 let subquery_plan =
3456 self.rewrite_and_plan_typed(*query.clone(), &vars_in_scope)?;
3457
3458 let subquery_vars = Self::collect_plan_variables(&subquery_plan);
3460
3461 for var in subquery_vars {
3463 if !is_var_in_scope(&vars_in_scope, &var) {
3464 add_var_to_scope(
3465 &mut vars_in_scope,
3466 &var,
3467 VariableType::Scalar,
3468 )?;
3469 }
3470 }
3471
3472 plan = LogicalPlan::SubqueryCall {
3473 input: Box::new(plan),
3474 subquery: Box::new(subquery_plan),
3475 };
3476 }
3477 }
3478 }
3479 Clause::Merge(merge_clause) => {
3480 validate_merge_clause(&merge_clause, &vars_in_scope)?;
3481 let merge_labels = collect_pattern_labels(&merge_clause.pattern);
3484 self.reject_virtual_label_writes(&merge_labels, "MERGE")?;
3485
3486 plan = LogicalPlan::Merge {
3487 input: Box::new(plan),
3488 pattern: merge_clause.pattern.clone(),
3489 on_match: Some(SetClause {
3490 items: merge_clause.on_match.clone(),
3491 }),
3492 on_create: Some(SetClause {
3493 items: merge_clause.on_create.clone(),
3494 }),
3495 };
3496
3497 for path in &merge_clause.pattern.paths {
3498 if let Some(path_var) = &path.variable
3499 && !path_var.is_empty()
3500 && !is_var_in_scope(&vars_in_scope, path_var)
3501 {
3502 add_var_to_scope(&mut vars_in_scope, path_var, VariableType::Path)?;
3503 }
3504 for element in &path.elements {
3505 if let PatternElement::Node(n) = element {
3506 if let Some(v) = &n.variable
3507 && !is_var_in_scope(&vars_in_scope, v)
3508 {
3509 add_var_to_scope(&mut vars_in_scope, v, VariableType::Node)?;
3510 }
3511 } else if let PatternElement::Relationship(r) = element
3512 && let Some(v) = &r.variable
3513 && !is_var_in_scope(&vars_in_scope, v)
3514 {
3515 add_var_to_scope(&mut vars_in_scope, v, VariableType::Edge)?;
3516 }
3517 }
3518 }
3519 }
3520 Clause::Create(create_clause) => {
3521 let create_labels = collect_pattern_labels(&create_clause.pattern);
3524 self.reject_virtual_label_writes(&create_labels, "CREATE")?;
3525 let mut create_vars: Vec<&str> = Vec::new();
3532 for path in &create_clause.pattern.paths {
3533 let is_standalone_node = path.elements.len() == 1;
3534 for element in &path.elements {
3535 match element {
3536 PatternElement::Node(n) => {
3537 validate_property_variables(
3538 &n.properties,
3539 &vars_in_scope,
3540 &create_vars,
3541 )?;
3542
3543 if let Some(v) = n.variable.as_deref()
3544 && !v.is_empty()
3545 {
3546 let is_creation =
3548 !n.labels.is_empty() || n.properties.is_some();
3549
3550 if is_creation {
3551 check_not_already_bound(
3552 v,
3553 &vars_in_scope,
3554 &create_vars,
3555 )?;
3556 create_vars.push(v);
3557 } else if is_standalone_node
3558 && is_var_in_scope(&vars_in_scope, v)
3559 && !create_introduced_vars.contains(v)
3560 {
3561 return Err(anyhow!(
3566 "SyntaxError: VariableAlreadyBound - '{}'",
3567 v
3568 ));
3569 } else if !create_vars.contains(&v) {
3570 create_vars.push(v);
3572 }
3573 }
3575 }
3576 PatternElement::Relationship(r) => {
3577 validate_property_variables(
3578 &r.properties,
3579 &vars_in_scope,
3580 &create_vars,
3581 )?;
3582
3583 if let Some(v) = r.variable.as_deref()
3584 && !v.is_empty()
3585 {
3586 check_not_already_bound(v, &vars_in_scope, &create_vars)?;
3587 create_vars.push(v);
3588 }
3589
3590 if r.types.len() != 1 {
3592 return Err(anyhow!(
3593 "SyntaxError: NoSingleRelationshipType - Exactly one relationship type required for CREATE"
3594 ));
3595 }
3596 if r.direction == Direction::Both {
3597 return Err(anyhow!(
3598 "SyntaxError: RequiresDirectedRelationship - Only directed relationships are supported in CREATE"
3599 ));
3600 }
3601 if r.range.is_some() {
3602 return Err(anyhow!(
3603 "SyntaxError: CreatingVarLength - Variable length relationships cannot be created"
3604 ));
3605 }
3606 }
3607 PatternElement::Parenthesized { .. } => {}
3608 }
3609 }
3610 }
3611
3612 match &mut plan {
3614 LogicalPlan::CreateBatch { patterns, .. } => {
3615 patterns.push(create_clause.pattern.clone());
3617 }
3618 LogicalPlan::Create { input, pattern } => {
3619 let first_pattern = pattern.clone();
3621 plan = LogicalPlan::CreateBatch {
3622 input: input.clone(),
3623 patterns: vec![first_pattern, create_clause.pattern.clone()],
3624 };
3625 }
3626 _ => {
3627 plan = LogicalPlan::Create {
3629 input: Box::new(plan),
3630 pattern: create_clause.pattern.clone(),
3631 };
3632 }
3633 }
3634 for path in &create_clause.pattern.paths {
3636 for element in &path.elements {
3637 match element {
3638 PatternElement::Node(n) => {
3639 if let Some(var) = &n.variable
3640 && !var.is_empty()
3641 {
3642 create_introduced_vars.insert(var.clone());
3643 add_var_to_scope(
3644 &mut vars_in_scope,
3645 var,
3646 VariableType::Node,
3647 )?;
3648 }
3649 }
3650 PatternElement::Relationship(r) => {
3651 if let Some(var) = &r.variable
3652 && !var.is_empty()
3653 {
3654 create_introduced_vars.insert(var.clone());
3655 add_var_to_scope(
3656 &mut vars_in_scope,
3657 var,
3658 VariableType::Edge,
3659 )?;
3660 }
3661 }
3662 PatternElement::Parenthesized { .. } => {
3663 }
3665 }
3666 }
3667 }
3668 }
3669 Clause::Set(set_clause) => {
3670 for item in &set_clause.items {
3672 match item {
3673 SetItem::Property { value, .. }
3674 | SetItem::Variable { value, .. }
3675 | SetItem::VariablePlus { value, .. } => {
3676 validate_expression_variables(value, &vars_in_scope)?;
3677 validate_expression(value, &vars_in_scope)?;
3678 if contains_pattern_predicate(value) {
3679 return Err(anyhow!(
3680 "SyntaxError: UnexpectedSyntax - Pattern predicates are not allowed in SET"
3681 ));
3682 }
3683 }
3684 SetItem::Labels { .. } => {}
3685 }
3686 }
3687 plan = LogicalPlan::Set {
3688 input: Box::new(plan),
3689 items: set_clause.items.clone(),
3690 };
3691 }
3692 Clause::Remove(remove_clause) => {
3693 plan = LogicalPlan::Remove {
3694 input: Box::new(plan),
3695 items: remove_clause.items.clone(),
3696 };
3697 }
3698 Clause::Delete(delete_clause) => {
3699 for item in &delete_clause.items {
3701 if matches!(item, Expr::LabelCheck { .. }) {
3703 return Err(anyhow!(
3704 "SyntaxError: InvalidDelete - DELETE requires a simple variable reference, not a label expression"
3705 ));
3706 }
3707 let vars_used = collect_expr_variables(item);
3708 if vars_used.is_empty() {
3710 return Err(anyhow!(
3711 "SyntaxError: InvalidArgumentType - DELETE requires node or relationship, not a literal expression"
3712 ));
3713 }
3714 for var in &vars_used {
3715 if find_var_in_scope(&vars_in_scope, var).is_none() {
3717 return Err(anyhow!(
3718 "SyntaxError: UndefinedVariable - Variable '{}' not defined",
3719 var
3720 ));
3721 }
3722 }
3723 if let Expr::Variable(name) = item
3728 && let Some(info) = find_var_in_scope(&vars_in_scope, name)
3729 && matches!(
3730 info.var_type,
3731 VariableType::Scalar | VariableType::ScalarLiteral
3732 )
3733 {
3734 return Err(anyhow!(
3735 "SyntaxError: InvalidArgumentType - DELETE requires node or relationship, '{}' is a scalar value",
3736 name
3737 ));
3738 }
3739 }
3740 for item in &delete_clause.items {
3742 if let Expr::Variable(name) = item {
3743 deleted_vars.insert(name.clone());
3744 }
3745 }
3746 plan = LogicalPlan::Delete {
3747 input: Box::new(plan),
3748 items: delete_clause.items.clone(),
3749 detach: delete_clause.detach,
3750 };
3751 }
3752 Clause::With(with_clause) => {
3753 let (new_plan, new_vars) =
3754 self.plan_with_clause(&with_clause, plan, &vars_in_scope)?;
3755 plan = new_plan;
3756 vars_in_scope = new_vars;
3757 }
3758 Clause::WithRecursive(with_recursive) => {
3759 plan = self.plan_with_recursive(&with_recursive, plan, &vars_in_scope)?;
3761 add_var_to_scope(
3763 &mut vars_in_scope,
3764 &with_recursive.name,
3765 VariableType::Scalar,
3766 )?;
3767 }
3768 Clause::Return(return_clause) => {
3769 if !deleted_vars.is_empty() {
3771 for item in &return_clause.items {
3772 if let ReturnItem::Expr { expr, .. } = item {
3773 validate_no_deleted_entity_access(expr, &deleted_vars)?;
3774 }
3775 }
3776 }
3777 plan = self.plan_return_clause(&return_clause, plan, &vars_in_scope)?;
3778 } }
3780 }
3781
3782 let plan = match &plan {
3787 LogicalPlan::Create { .. }
3788 | LogicalPlan::CreateBatch { .. }
3789 | LogicalPlan::Delete { .. }
3790 | LogicalPlan::Set { .. }
3791 | LogicalPlan::Remove { .. }
3792 | LogicalPlan::Merge { .. } => LogicalPlan::Limit {
3793 input: Box::new(plan),
3794 skip: None,
3795 fetch: Some(0),
3796 },
3797 _ => plan,
3798 };
3799
3800 Ok(plan)
3801 }
3802
3803 fn collect_properties_from_expr(expr: &Expr, collected: &mut Vec<Expr>) {
3804 match expr {
3805 Expr::Property(_, _)
3806 if !collected
3807 .iter()
3808 .any(|e| e.to_string_repr() == expr.to_string_repr()) =>
3809 {
3810 collected.push(expr.clone());
3811 }
3812 Expr::Property(_, _) => {}
3813 Expr::Variable(_) => {
3814 }
3816 Expr::BinaryOp { left, right, .. } => {
3817 Self::collect_properties_from_expr(left, collected);
3818 Self::collect_properties_from_expr(right, collected);
3819 }
3820 Expr::FunctionCall {
3821 args, window_spec, ..
3822 } => {
3823 for arg in args {
3824 Self::collect_properties_from_expr(arg, collected);
3825 }
3826 if let Some(spec) = window_spec {
3827 for partition_expr in &spec.partition_by {
3828 Self::collect_properties_from_expr(partition_expr, collected);
3829 }
3830 for sort_item in &spec.order_by {
3831 Self::collect_properties_from_expr(&sort_item.expr, collected);
3832 }
3833 }
3834 }
3835 Expr::List(items) => {
3836 for item in items {
3837 Self::collect_properties_from_expr(item, collected);
3838 }
3839 }
3840 Expr::UnaryOp { expr: e, .. }
3841 | Expr::IsNull(e)
3842 | Expr::IsNotNull(e)
3843 | Expr::IsUnique(e) => {
3844 Self::collect_properties_from_expr(e, collected);
3845 }
3846 Expr::Case {
3847 expr,
3848 when_then,
3849 else_expr,
3850 } => {
3851 if let Some(e) = expr {
3852 Self::collect_properties_from_expr(e, collected);
3853 }
3854 for (w, t) in when_then {
3855 Self::collect_properties_from_expr(w, collected);
3856 Self::collect_properties_from_expr(t, collected);
3857 }
3858 if let Some(e) = else_expr {
3859 Self::collect_properties_from_expr(e, collected);
3860 }
3861 }
3862 Expr::In { expr, list } => {
3863 Self::collect_properties_from_expr(expr, collected);
3864 Self::collect_properties_from_expr(list, collected);
3865 }
3866 Expr::ArrayIndex { array, index } => {
3867 Self::collect_properties_from_expr(array, collected);
3868 Self::collect_properties_from_expr(index, collected);
3869 }
3870 Expr::ArraySlice { array, start, end } => {
3871 Self::collect_properties_from_expr(array, collected);
3872 if let Some(s) = start {
3873 Self::collect_properties_from_expr(s, collected);
3874 }
3875 if let Some(e) = end {
3876 Self::collect_properties_from_expr(e, collected);
3877 }
3878 }
3879 _ => {}
3880 }
3881 }
3882
3883 fn collect_window_functions(expr: &Expr, collected: &mut Vec<Expr>) {
3884 if let Expr::FunctionCall { window_spec, .. } = expr {
3885 if window_spec.is_some() {
3887 if !collected
3888 .iter()
3889 .any(|e| e.to_string_repr() == expr.to_string_repr())
3890 {
3891 collected.push(expr.clone());
3892 }
3893 return;
3894 }
3895 }
3896
3897 match expr {
3898 Expr::BinaryOp { left, right, .. } => {
3899 Self::collect_window_functions(left, collected);
3900 Self::collect_window_functions(right, collected);
3901 }
3902 Expr::FunctionCall { args, .. } => {
3903 for arg in args {
3904 Self::collect_window_functions(arg, collected);
3905 }
3906 }
3907 Expr::List(items) => {
3908 for i in items {
3909 Self::collect_window_functions(i, collected);
3910 }
3911 }
3912 Expr::Map(items) => {
3913 for (_, i) in items {
3914 Self::collect_window_functions(i, collected);
3915 }
3916 }
3917 Expr::IsNull(e) | Expr::IsNotNull(e) | Expr::UnaryOp { expr: e, .. } => {
3918 Self::collect_window_functions(e, collected);
3919 }
3920 Expr::Case {
3921 expr,
3922 when_then,
3923 else_expr,
3924 } => {
3925 if let Some(e) = expr {
3926 Self::collect_window_functions(e, collected);
3927 }
3928 for (w, t) in when_then {
3929 Self::collect_window_functions(w, collected);
3930 Self::collect_window_functions(t, collected);
3931 }
3932 if let Some(e) = else_expr {
3933 Self::collect_window_functions(e, collected);
3934 }
3935 }
3936 Expr::Reduce {
3937 init, list, expr, ..
3938 } => {
3939 Self::collect_window_functions(init, collected);
3940 Self::collect_window_functions(list, collected);
3941 Self::collect_window_functions(expr, collected);
3942 }
3943 Expr::Quantifier {
3944 list, predicate, ..
3945 } => {
3946 Self::collect_window_functions(list, collected);
3947 Self::collect_window_functions(predicate, collected);
3948 }
3949 Expr::In { expr, list } => {
3950 Self::collect_window_functions(expr, collected);
3951 Self::collect_window_functions(list, collected);
3952 }
3953 Expr::ArrayIndex { array, index } => {
3954 Self::collect_window_functions(array, collected);
3955 Self::collect_window_functions(index, collected);
3956 }
3957 Expr::ArraySlice { array, start, end } => {
3958 Self::collect_window_functions(array, collected);
3959 if let Some(s) = start {
3960 Self::collect_window_functions(s, collected);
3961 }
3962 if let Some(e) = end {
3963 Self::collect_window_functions(e, collected);
3964 }
3965 }
3966 Expr::Property(e, _) => Self::collect_window_functions(e, collected),
3967 Expr::CountSubquery(_) | Expr::Exists { .. } => {}
3968 _ => {}
3969 }
3970 }
3971
3972 fn transform_window_expr_properties(expr: Expr) -> Expr {
3981 let Expr::FunctionCall {
3982 name,
3983 args,
3984 window_spec: Some(spec),
3985 distinct,
3986 } = expr
3987 else {
3988 return expr;
3989 };
3990
3991 let transformed_args = args
3994 .into_iter()
3995 .map(Self::transform_property_to_variable)
3996 .collect();
3997
3998 let transformed_partition_by = spec
4000 .partition_by
4001 .into_iter()
4002 .map(Self::transform_property_to_variable)
4003 .collect();
4004
4005 let transformed_order_by = spec
4006 .order_by
4007 .into_iter()
4008 .map(|item| SortItem {
4009 expr: Self::transform_property_to_variable(item.expr),
4010 ascending: item.ascending,
4011 })
4012 .collect();
4013
4014 Expr::FunctionCall {
4015 name,
4016 args: transformed_args,
4017 window_spec: Some(WindowSpec {
4018 partition_by: transformed_partition_by,
4019 order_by: transformed_order_by,
4020 }),
4021 distinct,
4022 }
4023 }
4024
4025 fn transform_property_to_variable(expr: Expr) -> Expr {
4029 let Expr::Property(base, prop) = expr else {
4030 return expr;
4031 };
4032
4033 match *base {
4034 Expr::Variable(var) => Expr::Variable(format!("{}.{}", var, prop)),
4035 other => Expr::Property(Box::new(Self::transform_property_to_variable(other)), prop),
4036 }
4037 }
4038
4039 fn transform_valid_at_to_function(expr: Expr) -> Expr {
4044 match expr {
4045 Expr::ValidAt {
4046 entity,
4047 timestamp,
4048 start_prop,
4049 end_prop,
4050 } => {
4051 let start = start_prop.unwrap_or_else(|| "valid_from".to_string());
4052 let end = end_prop.unwrap_or_else(|| "valid_to".to_string());
4053
4054 Expr::FunctionCall {
4055 name: "uni.temporal.validAt".to_string(),
4056 args: vec![
4057 Self::transform_valid_at_to_function(*entity),
4058 Expr::Literal(CypherLiteral::String(start)),
4059 Expr::Literal(CypherLiteral::String(end)),
4060 Self::transform_valid_at_to_function(*timestamp),
4061 ],
4062 distinct: false,
4063 window_spec: None,
4064 }
4065 }
4066 Expr::BinaryOp { left, op, right } => Expr::BinaryOp {
4068 left: Box::new(Self::transform_valid_at_to_function(*left)),
4069 op,
4070 right: Box::new(Self::transform_valid_at_to_function(*right)),
4071 },
4072 Expr::UnaryOp { op, expr } => Expr::UnaryOp {
4073 op,
4074 expr: Box::new(Self::transform_valid_at_to_function(*expr)),
4075 },
4076 Expr::FunctionCall {
4077 name,
4078 args,
4079 distinct,
4080 window_spec,
4081 } => Expr::FunctionCall {
4082 name,
4083 args: args
4084 .into_iter()
4085 .map(Self::transform_valid_at_to_function)
4086 .collect(),
4087 distinct,
4088 window_spec,
4089 },
4090 Expr::Property(base, prop) => {
4091 Expr::Property(Box::new(Self::transform_valid_at_to_function(*base)), prop)
4092 }
4093 Expr::List(items) => Expr::List(
4094 items
4095 .into_iter()
4096 .map(Self::transform_valid_at_to_function)
4097 .collect(),
4098 ),
4099 Expr::In { expr, list } => Expr::In {
4100 expr: Box::new(Self::transform_valid_at_to_function(*expr)),
4101 list: Box::new(Self::transform_valid_at_to_function(*list)),
4102 },
4103 Expr::IsNull(e) => Expr::IsNull(Box::new(Self::transform_valid_at_to_function(*e))),
4104 Expr::IsNotNull(e) => {
4105 Expr::IsNotNull(Box::new(Self::transform_valid_at_to_function(*e)))
4106 }
4107 Expr::IsUnique(e) => Expr::IsUnique(Box::new(Self::transform_valid_at_to_function(*e))),
4108 other => other,
4110 }
4111 }
4112
4113 fn rewrite_id_to_vid(expr: Expr) -> Expr {
4122 match expr {
4123 Expr::FunctionCall {
4124 name,
4125 args,
4126 distinct,
4127 window_spec,
4128 } if args.len() == 1 && Self::metadata_function_column(&name).is_some() => {
4129 if let Expr::Variable(ref var) = args[0] {
4130 let column = Self::metadata_function_column(&name).unwrap().to_string();
4131 Expr::Property(Box::new(Expr::Variable(var.clone())), column)
4132 } else {
4133 Expr::FunctionCall {
4134 name,
4135 args,
4136 distinct,
4137 window_spec,
4138 }
4139 }
4140 }
4141 Expr::BinaryOp { left, op, right } => Expr::BinaryOp {
4142 left: Box::new(Self::rewrite_id_to_vid(*left)),
4143 op,
4144 right: Box::new(Self::rewrite_id_to_vid(*right)),
4145 },
4146 Expr::UnaryOp { op, expr: inner } => Expr::UnaryOp {
4147 op,
4148 expr: Box::new(Self::rewrite_id_to_vid(*inner)),
4149 },
4150 other => other,
4151 }
4152 }
4153
4154 fn metadata_function_column(name: &str) -> Option<&'static str> {
4157 if name.eq_ignore_ascii_case("id") {
4158 Some("_vid")
4159 } else if name.eq_ignore_ascii_case("created_at") {
4160 Some("_created_at")
4161 } else if name.eq_ignore_ascii_case("updated_at") {
4162 Some("_updated_at")
4163 } else {
4164 None
4165 }
4166 }
4167
4168 fn plan_match_clause(
4170 &self,
4171 match_clause: &MatchClause,
4172 plan: LogicalPlan,
4173 vars_in_scope: &mut Vec<VariableInfo>,
4174 ) -> Result<LogicalPlan> {
4175 let mut plan = plan;
4176
4177 if match_clause.pattern.paths.is_empty() {
4178 return Err(anyhow!("Empty pattern"));
4179 }
4180
4181 let vars_before_pattern = vars_in_scope.len();
4183
4184 for path in &match_clause.pattern.paths {
4185 if let Some(mode) = &path.shortest_path_mode {
4186 plan =
4187 self.plan_shortest_path(path, plan, vars_in_scope, mode, vars_before_pattern)?;
4188 } else {
4189 plan = self.plan_path(
4190 path,
4191 plan,
4192 vars_in_scope,
4193 match_clause.optional,
4194 vars_before_pattern,
4195 )?;
4196 }
4197 }
4198
4199 let optional_vars: HashSet<String> = if match_clause.optional {
4201 vars_in_scope[vars_before_pattern..]
4202 .iter()
4203 .map(|v| v.name.clone())
4204 .collect()
4205 } else {
4206 HashSet::new()
4207 };
4208
4209 if let Some(predicate) = &match_clause.where_clause {
4211 plan = self.plan_where_clause(predicate, plan, vars_in_scope, optional_vars)?;
4212 }
4213
4214 Ok(plan)
4215 }
4216
4217 fn plan_shortest_path(
4219 &self,
4220 path: &PathPattern,
4221 plan: LogicalPlan,
4222 vars_in_scope: &mut Vec<VariableInfo>,
4223 mode: &ShortestPathMode,
4224 _vars_before_pattern: usize,
4225 ) -> Result<LogicalPlan> {
4226 let mut plan = plan;
4227 let elements = &path.elements;
4228
4229 if elements.len() < 3 || elements.len().is_multiple_of(2) {
4231 return Err(anyhow!(
4232 "shortestPath requires at least one relationship: (a)-[*]->(b)"
4233 ));
4234 }
4235
4236 let source_node = match &elements[0] {
4237 PatternElement::Node(n) => n,
4238 _ => return Err(anyhow!("ShortestPath must start with a node")),
4239 };
4240 let rel = match &elements[1] {
4241 PatternElement::Relationship(r) => r,
4242 _ => {
4243 return Err(anyhow!(
4244 "ShortestPath middle element must be a relationship"
4245 ));
4246 }
4247 };
4248 let target_node = match &elements[2] {
4249 PatternElement::Node(n) => n,
4250 _ => return Err(anyhow!("ShortestPath must end with a node")),
4251 };
4252
4253 let source_var = source_node
4254 .variable
4255 .clone()
4256 .ok_or_else(|| anyhow!("Source node must have variable in shortestPath"))?;
4257 let target_var = target_node
4258 .variable
4259 .clone()
4260 .ok_or_else(|| anyhow!("Target node must have variable in shortestPath"))?;
4261 let path_var = path
4262 .variable
4263 .clone()
4264 .ok_or_else(|| anyhow!("shortestPath must be assigned to a variable"))?;
4265
4266 let source_bound = is_var_in_scope(vars_in_scope, &source_var);
4267 let target_bound = is_var_in_scope(vars_in_scope, &target_var);
4268
4269 if !source_bound {
4271 plan = self.plan_unbound_node(source_node, &source_var, plan, false)?;
4272 } else if let Some(prop_filter) =
4273 self.properties_to_expr(&source_var, &source_node.properties)
4274 {
4275 plan = LogicalPlan::Filter {
4276 input: Box::new(plan),
4277 predicate: prop_filter,
4278 optional_variables: HashSet::new(),
4279 };
4280 }
4281
4282 let target_label_id = if !target_bound {
4284 let target_label_name = target_node
4286 .labels
4287 .first()
4288 .ok_or_else(|| anyhow!("Target node must have label if not already bound"))?;
4289 let target_label_id =
4294 if let Some(meta) = self.schema.get_label_case_insensitive(target_label_name) {
4295 meta.id
4296 } else if let Some((vid, _)) = self.allocate_virtual_label(target_label_name)? {
4297 vid
4298 } else {
4299 return Err(anyhow!("Label {} not found", target_label_name));
4300 };
4301
4302 let target_scan = LogicalPlan::Scan {
4303 label_id: target_label_id,
4304 labels: target_node.labels.names().to_vec(),
4305 variable: target_var.clone(),
4306 filter: self.properties_to_expr(&target_var, &target_node.properties),
4307 optional: false,
4308 };
4309
4310 plan = Self::join_with_plan(plan, target_scan);
4311 target_label_id
4312 } else {
4313 if let Some(prop_filter) = self.properties_to_expr(&target_var, &target_node.properties)
4314 {
4315 plan = LogicalPlan::Filter {
4316 input: Box::new(plan),
4317 predicate: prop_filter,
4318 optional_variables: HashSet::new(),
4319 };
4320 }
4321 0 };
4323
4324 let edge_type_ids = if rel.types.is_empty() {
4326 self.schema.all_edge_type_ids()
4328 } else {
4329 let mut ids = Vec::new();
4330 for type_name in &rel.types {
4331 let id = if let Some(meta) = self.schema.edge_types.get(type_name) {
4332 meta.id
4333 } else if let Some((vid, _)) = self.allocate_virtual_edge_type(type_name)? {
4334 vid
4335 } else {
4336 return Err(anyhow!("Edge type {} not found", type_name));
4337 };
4338 ids.push(id);
4339 }
4340 ids
4341 };
4342
4343 let min_hops = rel.range.as_ref().and_then(|r| r.min).unwrap_or(1);
4345 let max_hops = rel.range.as_ref().and_then(|r| r.max).unwrap_or(u32::MAX);
4346
4347 let sp_plan = match mode {
4348 ShortestPathMode::Shortest => LogicalPlan::ShortestPath {
4349 input: Box::new(plan),
4350 edge_type_ids,
4351 direction: rel.direction.clone(),
4352 source_variable: source_var.clone(),
4353 target_variable: target_var.clone(),
4354 target_label_id,
4355 path_variable: path_var.clone(),
4356 min_hops,
4357 max_hops,
4358 },
4359 ShortestPathMode::AllShortest => LogicalPlan::AllShortestPaths {
4360 input: Box::new(plan),
4361 edge_type_ids,
4362 direction: rel.direction.clone(),
4363 source_variable: source_var.clone(),
4364 target_variable: target_var.clone(),
4365 target_label_id,
4366 path_variable: path_var.clone(),
4367 min_hops,
4368 max_hops,
4369 },
4370 };
4371
4372 if !source_bound {
4373 add_var_to_scope(vars_in_scope, &source_var, VariableType::Node)?;
4374 }
4375 if !target_bound {
4376 add_var_to_scope(vars_in_scope, &target_var, VariableType::Node)?;
4377 }
4378 add_var_to_scope(vars_in_scope, &path_var, VariableType::Path)?;
4379
4380 Ok(sp_plan)
4381 }
4382 pub fn plan_pattern(
4387 &self,
4388 pattern: &Pattern,
4389 initial_vars: &[VariableInfo],
4390 ) -> Result<LogicalPlan> {
4391 let mut vars_in_scope: Vec<VariableInfo> = initial_vars.to_vec();
4392 let vars_before_pattern = vars_in_scope.len();
4393 let mut plan = LogicalPlan::Empty;
4394 for path in &pattern.paths {
4395 plan = self.plan_path(path, plan, &mut vars_in_scope, false, vars_before_pattern)?;
4396 }
4397 Ok(plan)
4398 }
4399
4400 fn plan_path(
4402 &self,
4403 path: &PathPattern,
4404 plan: LogicalPlan,
4405 vars_in_scope: &mut Vec<VariableInfo>,
4406 optional: bool,
4407 vars_before_pattern: usize,
4408 ) -> Result<LogicalPlan> {
4409 let mut plan = plan;
4410 let elements = &path.elements;
4411 let mut i = 0;
4412
4413 let path_variable = path.variable.clone();
4414
4415 if let Some(pv) = &path_variable
4417 && !pv.is_empty()
4418 && is_var_in_scope(vars_in_scope, pv)
4419 {
4420 return Err(anyhow!(
4421 "SyntaxError: VariableAlreadyBound - Variable '{}' already defined",
4422 pv
4423 ));
4424 }
4425
4426 if let Some(pv) = &path_variable
4428 && !pv.is_empty()
4429 {
4430 for element in elements {
4431 match element {
4432 PatternElement::Node(n) => {
4433 if let Some(v) = &n.variable
4434 && v == pv
4435 {
4436 return Err(anyhow!(
4437 "SyntaxError: VariableAlreadyBound - Variable '{}' already defined",
4438 pv
4439 ));
4440 }
4441 }
4442 PatternElement::Relationship(r) => {
4443 if let Some(v) = &r.variable
4444 && v == pv
4445 {
4446 return Err(anyhow!(
4447 "SyntaxError: VariableAlreadyBound - Variable '{}' already defined",
4448 pv
4449 ));
4450 }
4451 }
4452 PatternElement::Parenthesized { .. } => {}
4453 }
4454 }
4455 }
4456
4457 let mut optional_pattern_vars: HashSet<String> = if optional {
4460 let mut vars = HashSet::new();
4461 for element in elements {
4462 match element {
4463 PatternElement::Node(n) => {
4464 if let Some(v) = &n.variable
4465 && !v.is_empty()
4466 && !is_var_in_scope(vars_in_scope, v)
4467 {
4468 vars.insert(v.clone());
4469 }
4470 }
4471 PatternElement::Relationship(r) => {
4472 if let Some(v) = &r.variable
4473 && !v.is_empty()
4474 && !is_var_in_scope(vars_in_scope, v)
4475 {
4476 vars.insert(v.clone());
4477 }
4478 }
4479 PatternElement::Parenthesized { pattern, .. } => {
4480 for nested_elem in &pattern.elements {
4482 match nested_elem {
4483 PatternElement::Node(n) => {
4484 if let Some(v) = &n.variable
4485 && !v.is_empty()
4486 && !is_var_in_scope(vars_in_scope, v)
4487 {
4488 vars.insert(v.clone());
4489 }
4490 }
4491 PatternElement::Relationship(r) => {
4492 if let Some(v) = &r.variable
4493 && !v.is_empty()
4494 && !is_var_in_scope(vars_in_scope, v)
4495 {
4496 vars.insert(v.clone());
4497 }
4498 }
4499 _ => {}
4500 }
4501 }
4502 }
4503 }
4504 }
4505 if let Some(pv) = &path_variable
4507 && !pv.is_empty()
4508 {
4509 vars.insert(pv.clone());
4510 }
4511 vars
4512 } else {
4513 HashSet::new()
4514 };
4515
4516 let path_bound_edge_vars: HashSet<String> = {
4521 let mut bound = HashSet::new();
4522 for element in elements {
4523 if let PatternElement::Relationship(rel) = element
4524 && let Some(ref var_name) = rel.variable
4525 && !var_name.is_empty()
4526 && vars_in_scope[..vars_before_pattern]
4527 .iter()
4528 .any(|v| v.name == *var_name)
4529 {
4530 bound.insert(var_name.clone());
4531 }
4532 }
4533 bound
4534 };
4535
4536 let mut had_traverses = false;
4538 let mut single_node_variable: Option<String> = None;
4540 let mut path_node_vars: Vec<String> = Vec::new();
4542 let mut path_edge_vars: Vec<String> = Vec::new();
4543 let mut last_outer_node_var: Option<String> = None;
4546
4547 while i < elements.len() {
4549 let element = &elements[i];
4550 match element {
4551 PatternElement::Node(n) => {
4552 let mut variable = n.variable.clone().unwrap_or_default();
4553 if variable.is_empty() {
4554 variable = self.next_anon_var();
4555 }
4556 if single_node_variable.is_none() {
4558 single_node_variable = Some(variable.clone());
4559 }
4560 let is_bound =
4561 !variable.is_empty() && is_var_in_scope(vars_in_scope, &variable);
4562 if optional && !is_bound {
4563 optional_pattern_vars.insert(variable.clone());
4564 }
4565
4566 if is_bound {
4567 if let Some(info) = find_var_in_scope(vars_in_scope, &variable)
4569 && !info.var_type.is_compatible_with(VariableType::Node)
4570 {
4571 return Err(anyhow!(
4572 "SyntaxError: VariableTypeConflict - Variable '{}' already defined as {:?}, cannot use as Node",
4573 variable,
4574 info.var_type
4575 ));
4576 }
4577 if let Some(node_filter) =
4578 self.node_filter_expr(&variable, &n.labels, &n.properties)
4579 {
4580 plan = LogicalPlan::Filter {
4581 input: Box::new(plan),
4582 predicate: node_filter,
4583 optional_variables: HashSet::new(),
4584 };
4585 }
4586 } else {
4587 plan = self.plan_unbound_node(n, &variable, plan, optional)?;
4588 if !variable.is_empty() {
4589 add_var_to_scope(vars_in_scope, &variable, VariableType::Node)?;
4590 }
4591 }
4592
4593 if path_variable.is_some() && path_node_vars.is_empty() {
4595 path_node_vars.push(variable.clone());
4596 }
4597
4598 let mut current_source_var = variable;
4600 last_outer_node_var = Some(current_source_var.clone());
4601 i += 1;
4602 while i < elements.len() {
4603 if let PatternElement::Relationship(r) = &elements[i] {
4604 if i + 1 < elements.len() {
4605 let target_node_part = &elements[i + 1];
4606 if let PatternElement::Node(n_target) = target_node_part {
4607 let is_vlp = r.range.is_some();
4610 let traverse_path_var =
4611 if is_vlp { path_variable.clone() } else { None };
4612
4613 if is_vlp
4618 && let Some(pv) = path_variable.as_ref()
4619 && !path_node_vars.is_empty()
4620 {
4621 plan = LogicalPlan::BindPath {
4622 input: Box::new(plan),
4623 node_variables: std::mem::take(&mut path_node_vars),
4624 edge_variables: std::mem::take(&mut path_edge_vars),
4625 path_variable: pv.clone(),
4626 };
4627 if !is_var_in_scope(vars_in_scope, pv) {
4628 add_var_to_scope(
4629 vars_in_scope,
4630 pv,
4631 VariableType::Path,
4632 )?;
4633 }
4634 }
4635
4636 let target_was_bound =
4638 n_target.variable.as_ref().is_some_and(|v| {
4639 !v.is_empty() && is_var_in_scope(vars_in_scope, v)
4640 });
4641 let (new_plan, target_var, effective_target) = self
4642 .plan_traverse_with_source(
4643 plan,
4644 vars_in_scope,
4645 TraverseParams {
4646 rel: r,
4647 target_node: n_target,
4648 optional,
4649 path_variable: traverse_path_var,
4650 optional_pattern_vars: optional_pattern_vars
4651 .clone(),
4652 },
4653 ¤t_source_var,
4654 vars_before_pattern,
4655 &path_bound_edge_vars,
4656 )?;
4657 plan = new_plan;
4658 if optional && !target_was_bound {
4659 optional_pattern_vars.insert(target_var.clone());
4660 }
4661
4662 if path_variable.is_some() && !is_vlp {
4664 if let Some(ev) = &r.variable {
4670 path_edge_vars.push(ev.clone());
4671 } else {
4672 path_edge_vars
4673 .push(format!("__eid_to_{}", effective_target));
4674 }
4675 path_node_vars.push(target_var.clone());
4676 }
4677
4678 current_source_var = target_var;
4679 last_outer_node_var = Some(current_source_var.clone());
4680 had_traverses = true;
4681 i += 2;
4682 } else {
4683 return Err(anyhow!("Relationship must be followed by a node"));
4684 }
4685 } else {
4686 return Err(anyhow!("Relationship cannot be the last element"));
4687 }
4688 } else {
4689 break;
4690 }
4691 }
4692 }
4693 PatternElement::Relationship(_) => {
4694 return Err(anyhow!("Pattern must start with a node"));
4695 }
4696 PatternElement::Parenthesized { pattern, range } => {
4697 if pattern.elements.len() < 3 || pattern.elements.len() % 2 == 0 {
4700 return Err(anyhow!(
4701 "Quantified pattern must have node-relationship-node structure (odd number >= 3 elements)"
4702 ));
4703 }
4704
4705 let source_node = match &pattern.elements[0] {
4706 PatternElement::Node(n) => n,
4707 _ => return Err(anyhow!("Quantified pattern must start with a node")),
4708 };
4709
4710 let mut qpp_rels: Vec<(&RelationshipPattern, &NodePattern)> = Vec::new();
4712 for pair_idx in (1..pattern.elements.len()).step_by(2) {
4713 let rel = match &pattern.elements[pair_idx] {
4714 PatternElement::Relationship(r) => r,
4715 _ => {
4716 return Err(anyhow!(
4717 "Quantified pattern element at position {} must be a relationship",
4718 pair_idx
4719 ));
4720 }
4721 };
4722 let node = match &pattern.elements[pair_idx + 1] {
4723 PatternElement::Node(n) => n,
4724 _ => {
4725 return Err(anyhow!(
4726 "Quantified pattern element at position {} must be a node",
4727 pair_idx + 1
4728 ));
4729 }
4730 };
4731 if rel.range.is_some() {
4733 return Err(anyhow!(
4734 "Nested quantifiers not supported: ((a)-[:REL*n]->(b)){{m}}"
4735 ));
4736 }
4737 qpp_rels.push((rel, node));
4738 }
4739
4740 let inner_target_node = qpp_rels.last().unwrap().1;
4744 let outer_target_node = if i + 1 < elements.len() {
4745 match &elements[i + 1] {
4746 PatternElement::Node(n) => Some(n),
4747 _ => None,
4748 }
4749 } else {
4750 None
4751 };
4752 let target_node = outer_target_node.unwrap_or(inner_target_node);
4755
4756 let use_simple_vlp = qpp_rels.len() == 1
4759 && inner_target_node
4760 .labels
4761 .first()
4762 .and_then(|l| self.schema.get_label_case_insensitive(l))
4763 .is_none();
4764
4765 let source_variable = if let Some(ref outer_src) = last_outer_node_var {
4770 if let Some(prop_filter) =
4773 self.properties_to_expr(outer_src, &source_node.properties)
4774 {
4775 plan = LogicalPlan::Filter {
4776 input: Box::new(plan),
4777 predicate: prop_filter,
4778 optional_variables: HashSet::new(),
4779 };
4780 }
4781 outer_src.clone()
4782 } else {
4783 let sv = source_node
4784 .variable
4785 .clone()
4786 .filter(|v| !v.is_empty())
4787 .unwrap_or_else(|| self.next_anon_var());
4788
4789 if is_var_in_scope(vars_in_scope, &sv) {
4790 if let Some(prop_filter) =
4792 self.properties_to_expr(&sv, &source_node.properties)
4793 {
4794 plan = LogicalPlan::Filter {
4795 input: Box::new(plan),
4796 predicate: prop_filter,
4797 optional_variables: HashSet::new(),
4798 };
4799 }
4800 } else {
4801 plan = self.plan_unbound_node(source_node, &sv, plan, optional)?;
4803 add_var_to_scope(vars_in_scope, &sv, VariableType::Node)?;
4804 if optional {
4805 optional_pattern_vars.insert(sv.clone());
4806 }
4807 }
4808 sv
4809 };
4810
4811 if use_simple_vlp {
4812 let mut relationship = qpp_rels[0].0.clone();
4814 relationship.range = range.clone();
4815
4816 let target_was_bound = target_node
4817 .variable
4818 .as_ref()
4819 .is_some_and(|v| !v.is_empty() && is_var_in_scope(vars_in_scope, v));
4820 let (new_plan, target_var, _effective_target) = self
4821 .plan_traverse_with_source(
4822 plan,
4823 vars_in_scope,
4824 TraverseParams {
4825 rel: &relationship,
4826 target_node,
4827 optional,
4828 path_variable: path_variable.clone(),
4829 optional_pattern_vars: optional_pattern_vars.clone(),
4830 },
4831 &source_variable,
4832 vars_before_pattern,
4833 &path_bound_edge_vars,
4834 )?;
4835 plan = new_plan;
4836 if optional && !target_was_bound {
4837 optional_pattern_vars.insert(target_var);
4838 }
4839 } else {
4840 let mut qpp_step_infos = Vec::new();
4842 let mut all_edge_type_ids = Vec::new();
4843
4844 for (rel, node) in &qpp_rels {
4845 let mut step_edge_type_ids = Vec::new();
4846 if rel.types.is_empty() {
4847 step_edge_type_ids = self.schema.all_edge_type_ids();
4848 } else {
4849 for type_name in &rel.types {
4850 if let Some(edge_meta) = self.schema.edge_types.get(type_name) {
4851 step_edge_type_ids.push(edge_meta.id);
4852 }
4853 }
4854 }
4855 all_edge_type_ids.extend_from_slice(&step_edge_type_ids);
4856
4857 let target_label = node.labels.first().and_then(|l| {
4858 self.schema.get_label_case_insensitive(l).map(|_| l.clone())
4859 });
4860
4861 qpp_step_infos.push(QppStepInfo {
4862 edge_type_ids: step_edge_type_ids,
4863 direction: rel.direction.clone(),
4864 target_label,
4865 });
4866 }
4867
4868 all_edge_type_ids.sort_unstable();
4870 all_edge_type_ids.dedup();
4871
4872 let hops_per_iter = qpp_step_infos.len();
4874 const QPP_DEFAULT_MAX_HOPS: usize = 100;
4875 let (min_iter, max_iter) = if let Some(range) = range {
4876 let min = range.min.unwrap_or(1) as usize;
4877 let max = range
4878 .max
4879 .map(|m| m as usize)
4880 .unwrap_or(QPP_DEFAULT_MAX_HOPS / hops_per_iter);
4881 (min, max)
4882 } else {
4883 (1, 1)
4884 };
4885 let min_hops = min_iter * hops_per_iter;
4886 let max_hops = max_iter * hops_per_iter;
4887
4888 let target_variable = target_node
4890 .variable
4891 .clone()
4892 .filter(|v| !v.is_empty())
4893 .unwrap_or_else(|| self.next_anon_var());
4894
4895 let target_is_bound = is_var_in_scope(vars_in_scope, &target_variable);
4896
4897 let target_label_meta = target_node
4899 .labels
4900 .first()
4901 .and_then(|l| self.schema.get_label_case_insensitive(l));
4902
4903 let mut scope_match_variables: HashSet<String> = vars_in_scope
4905 [vars_before_pattern..]
4906 .iter()
4907 .map(|v| v.name.clone())
4908 .collect();
4909 scope_match_variables.insert(target_variable.clone());
4910
4911 let rebound_target_var = if target_is_bound {
4913 Some(target_variable.clone())
4914 } else {
4915 None
4916 };
4917 let effective_target_var = if let Some(ref bv) = rebound_target_var {
4918 format!("__rebound_{}", bv)
4919 } else {
4920 target_variable.clone()
4921 };
4922
4923 plan = LogicalPlan::Traverse {
4924 input: Box::new(plan),
4925 edge_type_ids: all_edge_type_ids,
4926 direction: qpp_rels[0].0.direction.clone(),
4927 source_variable: source_variable.to_string(),
4928 target_variable: effective_target_var.clone(),
4929 target_label_id: target_label_meta.map(|m| m.id).unwrap_or(0),
4930 step_variable: None, min_hops,
4932 max_hops,
4933 optional,
4934 target_filter: self.node_filter_expr(
4935 &target_variable,
4936 &target_node.labels,
4937 &target_node.properties,
4938 ),
4939 path_variable: path_variable.clone(),
4940 edge_properties: HashSet::new(),
4941 is_variable_length: true,
4942 optional_pattern_vars: optional_pattern_vars.clone(),
4943 scope_match_variables,
4944 edge_filter_expr: None,
4945 path_mode: crate::query::df_graph::nfa::PathMode::Trail,
4946 qpp_steps: Some(qpp_step_infos),
4947 };
4948
4949 if let Some(ref btv) = rebound_target_var {
4951 let filter_pred = Expr::BinaryOp {
4953 left: Box::new(Expr::Property(
4954 Box::new(Expr::Variable(effective_target_var.clone())),
4955 "_vid".to_string(),
4956 )),
4957 op: BinaryOp::Eq,
4958 right: Box::new(Expr::Property(
4959 Box::new(Expr::Variable(btv.clone())),
4960 "_vid".to_string(),
4961 )),
4962 };
4963 plan = LogicalPlan::Filter {
4964 input: Box::new(plan),
4965 predicate: filter_pred,
4966 optional_variables: if optional {
4967 optional_pattern_vars.clone()
4968 } else {
4969 HashSet::new()
4970 },
4971 };
4972 }
4973
4974 if !target_is_bound {
4976 add_var_to_scope(vars_in_scope, &target_variable, VariableType::Node)?;
4977 }
4978
4979 if let Some(ref pv) = path_variable
4981 && !pv.is_empty()
4982 && !is_var_in_scope(vars_in_scope, pv)
4983 {
4984 add_var_to_scope(vars_in_scope, pv, VariableType::Path)?;
4985 }
4986 }
4987 had_traverses = true;
4988
4989 if outer_target_node.is_some() {
4991 i += 2; } else {
4993 i += 1;
4994 }
4995 }
4996 }
4997 }
4998
4999 if let Some(ref path_var) = path_variable
5002 && !path_var.is_empty()
5003 && !had_traverses
5004 && let Some(node_var) = single_node_variable
5005 {
5006 plan = LogicalPlan::BindZeroLengthPath {
5007 input: Box::new(plan),
5008 node_variable: node_var,
5009 path_variable: path_var.clone(),
5010 };
5011 add_var_to_scope(vars_in_scope, path_var, VariableType::Path)?;
5012 }
5013
5014 if let Some(ref path_var) = path_variable
5016 && !path_var.is_empty()
5017 && had_traverses
5018 && !path_node_vars.is_empty()
5019 && !is_var_in_scope(vars_in_scope, path_var)
5020 {
5021 plan = LogicalPlan::BindPath {
5022 input: Box::new(plan),
5023 node_variables: path_node_vars,
5024 edge_variables: path_edge_vars,
5025 path_variable: path_var.clone(),
5026 };
5027 add_var_to_scope(vars_in_scope, path_var, VariableType::Path)?;
5028 }
5029
5030 Ok(plan)
5031 }
5032
5033 fn plan_traverse_with_source(
5040 &self,
5041 plan: LogicalPlan,
5042 vars_in_scope: &mut Vec<VariableInfo>,
5043 params: TraverseParams<'_>,
5044 source_variable: &str,
5045 vars_before_pattern: usize,
5046 path_bound_edge_vars: &HashSet<String>,
5047 ) -> Result<(LogicalPlan, String, String)> {
5048 if let Some(Expr::Parameter(_)) = ¶ms.rel.properties {
5050 return Err(anyhow!(
5051 "SyntaxError: InvalidParameterUse - Parameters cannot be used as relationship predicates"
5052 ));
5053 }
5054
5055 let mut edge_type_ids = Vec::new();
5056 let mut dst_labels = Vec::new();
5057 let mut unknown_types = Vec::new();
5058
5059 if params.rel.types.is_empty() {
5060 edge_type_ids = self.schema.all_edge_type_ids();
5063 for meta in self.schema.edge_types.values() {
5064 dst_labels.extend(meta.dst_labels.iter().cloned());
5065 }
5066 } else {
5067 for type_name in ¶ms.rel.types {
5068 if let Some(edge_meta) = self.schema.edge_types.get(type_name) {
5069 edge_type_ids.push(edge_meta.id);
5071 dst_labels.extend(edge_meta.dst_labels.iter().cloned());
5072 } else if let Some((vid, _)) = self.allocate_virtual_edge_type(type_name)? {
5073 edge_type_ids.push(vid);
5079 } else {
5080 unknown_types.push(type_name.clone());
5082 }
5083 }
5084 }
5085
5086 edge_type_ids.sort_unstable();
5088 edge_type_ids.dedup();
5089 unknown_types.sort_unstable();
5090 unknown_types.dedup();
5091
5092 let mut target_variable = params.target_node.variable.clone().unwrap_or_default();
5093 if target_variable.is_empty() {
5094 target_variable = self.next_anon_var();
5095 }
5096 let target_is_bound =
5097 !target_variable.is_empty() && is_var_in_scope(vars_in_scope, &target_variable);
5098
5099 if let Some(rel_var) = ¶ms.rel.variable
5102 && !rel_var.is_empty()
5103 && rel_var == &target_variable
5104 {
5105 return Err(anyhow!(
5106 "SyntaxError: VariableTypeConflict - Variable '{}' already defined as relationship, cannot use as node",
5107 rel_var
5108 ));
5109 }
5110
5111 let mut bound_edge_var: Option<String> = None;
5116 let mut bound_edge_list_var: Option<String> = None;
5117 if let Some(rel_var) = ¶ms.rel.variable
5118 && !rel_var.is_empty()
5119 && let Some(info) = find_var_in_scope(vars_in_scope, rel_var)
5120 {
5121 let is_from_previous_clause = vars_in_scope[..vars_before_pattern]
5122 .iter()
5123 .any(|v| v.name == *rel_var);
5124
5125 if info.var_type == VariableType::Edge {
5126 if is_from_previous_clause {
5128 bound_edge_var = Some(rel_var.clone());
5131 } else {
5132 return Err(anyhow!(
5134 "SyntaxError: RelationshipUniquenessViolation - Relationship variable '{}' is already used in this pattern",
5135 rel_var
5136 ));
5137 }
5138 } else if params.rel.range.is_some()
5139 && is_from_previous_clause
5140 && matches!(
5141 info.var_type,
5142 VariableType::Scalar | VariableType::ScalarLiteral
5143 )
5144 {
5145 bound_edge_list_var = Some(rel_var.clone());
5148 } else if !info.var_type.is_compatible_with(VariableType::Edge) {
5149 return Err(anyhow!(
5150 "SyntaxError: VariableTypeConflict - Variable '{}' already defined as {:?}, cannot use as relationship",
5151 rel_var,
5152 info.var_type
5153 ));
5154 }
5155 }
5156
5157 if target_is_bound
5160 && let Some(info) = find_var_in_scope(vars_in_scope, &target_variable)
5161 && !info.var_type.is_compatible_with(VariableType::Node)
5162 {
5163 return Err(anyhow!(
5164 "SyntaxError: VariableTypeConflict - Variable '{}' already defined as {:?}, cannot use as Node",
5165 target_variable,
5166 info.var_type
5167 ));
5168 }
5169
5170 if !unknown_types.is_empty() && edge_type_ids.is_empty() {
5174 let is_variable_length = params.rel.range.is_some();
5177
5178 const DEFAULT_MAX_HOPS: usize = 100;
5179 let (min_hops, max_hops) = if let Some(range) = ¶ms.rel.range {
5180 let min = range.min.unwrap_or(1) as usize;
5181 let max = range.max.map(|m| m as usize).unwrap_or(DEFAULT_MAX_HOPS);
5182 (min, max)
5183 } else {
5184 (1, 1)
5185 };
5186
5187 let step_var = params.rel.variable.clone();
5193 let path_var = params.path_variable.clone();
5194
5195 let mut scope_match_variables: HashSet<String> = vars_in_scope[vars_before_pattern..]
5197 .iter()
5198 .map(|v| v.name.clone())
5199 .collect();
5200 if let Some(ref sv) = step_var {
5201 if bound_edge_var.is_none() {
5205 scope_match_variables.insert(sv.clone());
5206 }
5207 }
5208 scope_match_variables.insert(target_variable.clone());
5209 scope_match_variables.extend(
5215 path_bound_edge_vars
5216 .iter()
5217 .filter(|v| bound_edge_var.as_ref() != Some(*v))
5218 .cloned(),
5219 );
5220
5221 let mut plan = LogicalPlan::TraverseMainByType {
5222 type_names: unknown_types,
5223 input: Box::new(plan),
5224 direction: params.rel.direction.clone(),
5225 source_variable: source_variable.to_string(),
5226 target_variable: target_variable.clone(),
5227 step_variable: step_var.clone(),
5228 min_hops,
5229 max_hops,
5230 optional: params.optional,
5231 target_filter: self.node_filter_expr(
5232 &target_variable,
5233 ¶ms.target_node.labels,
5234 ¶ms.target_node.properties,
5235 ),
5236 path_variable: path_var.clone(),
5237 is_variable_length,
5238 optional_pattern_vars: params.optional_pattern_vars.clone(),
5239 scope_match_variables,
5240 edge_filter_expr: if is_variable_length {
5241 let filter_var = step_var
5242 .clone()
5243 .unwrap_or_else(|| "__anon_edge".to_string());
5244 self.properties_to_expr(&filter_var, ¶ms.rel.properties)
5245 } else {
5246 None
5247 },
5248 path_mode: crate::query::df_graph::nfa::PathMode::Trail,
5249 };
5250
5251 if target_is_bound
5255 && let Some(info) = find_var_in_scope(vars_in_scope, &target_variable)
5256 && info.var_type == VariableType::Imported
5257 {
5258 plan = Self::wrap_with_bound_target_filter(plan, &target_variable);
5259 }
5260
5261 if !is_variable_length
5266 && let Some(edge_var_name) = step_var.as_ref()
5267 && let Some(edge_prop_filter) =
5268 self.properties_to_expr(edge_var_name, ¶ms.rel.properties)
5269 {
5270 let filter_optional_vars = if params.optional {
5271 params.optional_pattern_vars.clone()
5272 } else {
5273 HashSet::new()
5274 };
5275 plan = LogicalPlan::Filter {
5276 input: Box::new(plan),
5277 predicate: edge_prop_filter,
5278 optional_variables: filter_optional_vars,
5279 };
5280 }
5281
5282 if let Some(sv) = &step_var {
5284 add_var_to_scope(vars_in_scope, sv, VariableType::Edge)?;
5285 if is_variable_length
5286 && let Some(info) = vars_in_scope.iter_mut().find(|v| v.name == *sv)
5287 {
5288 info.is_vlp = true;
5289 }
5290 }
5291 if let Some(pv) = &path_var
5292 && !is_var_in_scope(vars_in_scope, pv)
5293 {
5294 add_var_to_scope(vars_in_scope, pv, VariableType::Path)?;
5295 }
5296 if !is_var_in_scope(vars_in_scope, &target_variable) {
5297 add_var_to_scope(vars_in_scope, &target_variable, VariableType::Node)?;
5298 }
5299
5300 return Ok((plan, target_variable.clone(), target_variable));
5301 }
5302
5303 if !unknown_types.is_empty() {
5306 return Err(anyhow!(
5307 "Mixed known and unknown edge types not yet supported. Unknown: {:?}",
5308 unknown_types
5309 ));
5310 }
5311
5312 let mut virtual_target_label_id: Option<u16> = None;
5319 let target_label_meta = if let Some(label_name) = params.target_node.labels.first() {
5320 match self.schema.get_label_case_insensitive(label_name) {
5323 Some(meta) => Some(meta),
5324 None => {
5325 if let Some((vid, _)) = self.allocate_virtual_label(label_name)? {
5326 virtual_target_label_id = Some(vid);
5327 }
5328 None
5329 }
5330 }
5331 } else if !target_is_bound {
5332 let unique_dsts: Vec<_> = dst_labels
5334 .into_iter()
5335 .collect::<HashSet<_>>()
5336 .into_iter()
5337 .collect();
5338 if unique_dsts.len() == 1 {
5339 let label_name = &unique_dsts[0];
5340 self.schema.get_label_case_insensitive(label_name)
5341 } else {
5342 None
5346 }
5347 } else {
5348 None
5349 };
5350
5351 let is_variable_length = params.rel.range.is_some();
5353
5354 const DEFAULT_MAX_HOPS: usize = 100;
5357 let (min_hops, max_hops) = if let Some(range) = ¶ms.rel.range {
5358 let min = range.min.unwrap_or(1) as usize;
5359 let max = range.max.map(|m| m as usize).unwrap_or(DEFAULT_MAX_HOPS);
5360 (min, max)
5361 } else {
5362 (1, 1)
5363 };
5364
5365 let step_var = params.rel.variable.clone();
5370 let path_var = params.path_variable.clone();
5371
5372 let rebound_var = bound_edge_var
5375 .as_ref()
5376 .or(bound_edge_list_var.as_ref())
5377 .cloned();
5378 let effective_step_var = if let Some(ref bv) = rebound_var {
5379 Some(format!("__rebound_{}", bv))
5380 } else {
5381 step_var.clone()
5382 };
5383
5384 let rebound_target_var = if target_is_bound && !target_variable.is_empty() {
5388 let is_imported = find_var_in_scope(vars_in_scope, &target_variable)
5389 .map(|info| info.var_type == VariableType::Imported)
5390 .unwrap_or(false);
5391 if !is_imported {
5392 Some(target_variable.clone())
5393 } else {
5394 None
5395 }
5396 } else {
5397 None
5398 };
5399
5400 let effective_target_var = if let Some(ref bv) = rebound_target_var {
5401 format!("__rebound_{}", bv)
5402 } else {
5403 target_variable.clone()
5404 };
5405
5406 let mut scope_match_variables: HashSet<String> = vars_in_scope[vars_before_pattern..]
5412 .iter()
5413 .map(|v| v.name.clone())
5414 .collect();
5415 if let Some(ref sv) = effective_step_var {
5417 scope_match_variables.insert(sv.clone());
5418 }
5419 scope_match_variables.insert(effective_target_var.clone());
5421 scope_match_variables.extend(path_bound_edge_vars.iter().cloned());
5424
5425 let mut plan = LogicalPlan::Traverse {
5426 input: Box::new(plan),
5427 edge_type_ids,
5428 direction: params.rel.direction.clone(),
5429 source_variable: source_variable.to_string(),
5430 target_variable: effective_target_var.clone(),
5431 target_label_id: target_label_meta
5432 .map(|m| m.id)
5433 .or(virtual_target_label_id)
5434 .unwrap_or(0),
5435 step_variable: effective_step_var.clone(),
5436 min_hops,
5437 max_hops,
5438 optional: params.optional,
5439 target_filter: self.node_filter_expr(
5440 &target_variable,
5441 ¶ms.target_node.labels,
5442 ¶ms.target_node.properties,
5443 ),
5444 path_variable: path_var.clone(),
5445 edge_properties: HashSet::new(),
5446 is_variable_length,
5447 optional_pattern_vars: params.optional_pattern_vars.clone(),
5448 scope_match_variables,
5449 edge_filter_expr: if is_variable_length {
5450 let filter_var = effective_step_var
5456 .clone()
5457 .unwrap_or_else(|| "__anon_edge".to_string());
5458 self.properties_to_expr(&filter_var, ¶ms.rel.properties)
5459 } else {
5460 None
5461 },
5462 path_mode: crate::query::df_graph::nfa::PathMode::Trail,
5463 qpp_steps: None,
5464 };
5465
5466 let filter_optional_vars = if params.optional {
5469 params.optional_pattern_vars.clone()
5470 } else {
5471 HashSet::new()
5472 };
5473
5474 if !is_variable_length
5478 && let Some(edge_var_name) = effective_step_var.as_ref()
5479 && let Some(edge_prop_filter) =
5480 self.properties_to_expr(edge_var_name, ¶ms.rel.properties)
5481 {
5482 plan = LogicalPlan::Filter {
5483 input: Box::new(plan),
5484 predicate: edge_prop_filter,
5485 optional_variables: filter_optional_vars.clone(),
5486 };
5487 }
5488
5489 if target_is_bound
5493 && let Some(info) = find_var_in_scope(vars_in_scope, &target_variable)
5494 && info.var_type == VariableType::Imported
5495 {
5496 plan = Self::wrap_with_bound_target_filter(plan, &target_variable);
5497 }
5498
5499 if let Some(ref bv) = bound_edge_var {
5501 let temp_var = format!("__rebound_{}", bv);
5502 let bound_check = Expr::BinaryOp {
5503 left: Box::new(Expr::Property(
5504 Box::new(Expr::Variable(temp_var)),
5505 "_eid".to_string(),
5506 )),
5507 op: BinaryOp::Eq,
5508 right: Box::new(Expr::Property(
5509 Box::new(Expr::Variable(bv.clone())),
5510 "_eid".to_string(),
5511 )),
5512 };
5513 plan = LogicalPlan::Filter {
5514 input: Box::new(plan),
5515 predicate: bound_check,
5516 optional_variables: filter_optional_vars.clone(),
5517 };
5518 }
5519
5520 if let Some(ref bv) = bound_edge_list_var {
5523 let temp_var = format!("__rebound_{}", bv);
5524 let temp_eids = Expr::ListComprehension {
5525 variable: "__rebound_edge".to_string(),
5526 list: Box::new(Expr::Variable(temp_var)),
5527 where_clause: None,
5528 map_expr: Box::new(Expr::FunctionCall {
5529 name: "toInteger".to_string(),
5530 args: vec![Expr::Property(
5531 Box::new(Expr::Variable("__rebound_edge".to_string())),
5532 "_eid".to_string(),
5533 )],
5534 distinct: false,
5535 window_spec: None,
5536 }),
5537 };
5538 let bound_eids = Expr::ListComprehension {
5539 variable: "__bound_edge".to_string(),
5540 list: Box::new(Expr::Variable(bv.clone())),
5541 where_clause: None,
5542 map_expr: Box::new(Expr::FunctionCall {
5543 name: "toInteger".to_string(),
5544 args: vec![Expr::Property(
5545 Box::new(Expr::Variable("__bound_edge".to_string())),
5546 "_eid".to_string(),
5547 )],
5548 distinct: false,
5549 window_spec: None,
5550 }),
5551 };
5552 let bound_list_check = Expr::BinaryOp {
5553 left: Box::new(temp_eids),
5554 op: BinaryOp::Eq,
5555 right: Box::new(bound_eids),
5556 };
5557 plan = LogicalPlan::Filter {
5558 input: Box::new(plan),
5559 predicate: bound_list_check,
5560 optional_variables: filter_optional_vars.clone(),
5561 };
5562 }
5563
5564 if let Some(ref bv) = rebound_target_var {
5567 let temp_var = format!("__rebound_{}", bv);
5568 let bound_check = Expr::BinaryOp {
5569 left: Box::new(Expr::Property(
5570 Box::new(Expr::Variable(temp_var.clone())),
5571 "_vid".to_string(),
5572 )),
5573 op: BinaryOp::Eq,
5574 right: Box::new(Expr::Property(
5575 Box::new(Expr::Variable(bv.clone())),
5576 "_vid".to_string(),
5577 )),
5578 };
5579 let mut rebound_filter_vars = filter_optional_vars;
5586 if params.optional {
5587 rebound_filter_vars.insert(temp_var);
5588 }
5589 plan = LogicalPlan::Filter {
5590 input: Box::new(plan),
5591 predicate: bound_check,
5592 optional_variables: rebound_filter_vars,
5593 };
5594 }
5595
5596 if let Some(sv) = &step_var
5599 && bound_edge_var.is_none()
5600 && bound_edge_list_var.is_none()
5601 {
5602 add_var_to_scope(vars_in_scope, sv, VariableType::Edge)?;
5603 if is_variable_length
5604 && let Some(info) = vars_in_scope.iter_mut().find(|v| v.name == *sv)
5605 {
5606 info.is_vlp = true;
5607 }
5608 }
5609 if let Some(pv) = &path_var
5610 && !is_var_in_scope(vars_in_scope, pv)
5611 {
5612 add_var_to_scope(vars_in_scope, pv, VariableType::Path)?;
5613 }
5614 if !is_var_in_scope(vars_in_scope, &target_variable) {
5615 add_var_to_scope(vars_in_scope, &target_variable, VariableType::Node)?;
5616 }
5617
5618 Ok((plan, target_variable, effective_target_var))
5619 }
5620
5621 fn join_with_plan(existing: LogicalPlan, new: LogicalPlan) -> LogicalPlan {
5626 if matches!(existing, LogicalPlan::Empty) {
5627 new
5628 } else {
5629 LogicalPlan::CrossJoin {
5630 left: Box::new(existing),
5631 right: Box::new(new),
5632 }
5633 }
5634 }
5635
5636 fn split_node_property_filters_for_scan(
5643 &self,
5644 variable: &str,
5645 properties: &Option<Expr>,
5646 ) -> (Option<Expr>, Option<Expr>) {
5647 let entries = match properties {
5648 Some(Expr::Map(entries)) => entries,
5649 _ => return (None, None),
5650 };
5651
5652 if entries.is_empty() {
5653 return (None, None);
5654 }
5655
5656 let mut pushdown_entries = Vec::new();
5657 let mut residual_entries = Vec::new();
5658
5659 for (prop, val_expr) in entries {
5660 let vars = collect_expr_variables(val_expr);
5661 if vars.iter().all(|v| v == variable) {
5662 pushdown_entries.push((prop.clone(), val_expr.clone()));
5663 } else {
5664 residual_entries.push((prop.clone(), val_expr.clone()));
5665 }
5666 }
5667
5668 let pushdown_map = if pushdown_entries.is_empty() {
5669 None
5670 } else {
5671 Some(Expr::Map(pushdown_entries))
5672 };
5673 let residual_map = if residual_entries.is_empty() {
5674 None
5675 } else {
5676 Some(Expr::Map(residual_entries))
5677 };
5678
5679 (
5680 self.properties_to_expr(variable, &pushdown_map),
5681 self.properties_to_expr(variable, &residual_map),
5682 )
5683 }
5684
5685 fn label_branches_share_property_schema(&self, labels: &[String]) -> bool {
5703 if labels.len() < 2 {
5704 return true;
5705 }
5706 let mut iter = labels.iter();
5707 let first = iter.next().expect("len >= 2");
5708 let Some(first_props) = self.schema.properties.get(first) else {
5709 return false;
5710 };
5711 for label in iter {
5712 let Some(props) = self.schema.properties.get(label) else {
5713 return false;
5714 };
5715 if props.len() != first_props.len() {
5716 return false;
5717 }
5718 for (name, meta) in first_props {
5719 let Some(other_meta) = props.get(name) else {
5720 return false;
5721 };
5722 if meta.r#type != other_meta.r#type {
5723 return false;
5724 }
5725 }
5726 }
5727 true
5728 }
5729
5730 fn plan_unbound_node(
5732 &self,
5733 node: &NodePattern,
5734 variable: &str,
5735 plan: LogicalPlan,
5736 optional: bool,
5737 ) -> Result<LogicalPlan> {
5738 let properties = match &node.properties {
5740 Some(Expr::Map(entries)) => entries.as_slice(),
5741 Some(Expr::Parameter(_)) => {
5742 return Err(anyhow!(
5743 "SyntaxError: InvalidParameterUse - Parameters cannot be used as node predicates"
5744 ));
5745 }
5746 Some(_) => return Err(anyhow!("Node properties must be a Map")),
5747 None => &[],
5748 };
5749
5750 let has_existing_scope = !matches!(plan, LogicalPlan::Empty);
5751
5752 let apply_residual_filter = |input: LogicalPlan, residual: Option<Expr>| -> LogicalPlan {
5753 if let Some(predicate) = residual {
5754 LogicalPlan::Filter {
5755 input: Box::new(input),
5756 predicate,
5757 optional_variables: HashSet::new(),
5758 }
5759 } else {
5760 input
5761 }
5762 };
5763
5764 let (node_scan_filter, node_residual_filter) = if has_existing_scope {
5765 self.split_node_property_filters_for_scan(variable, &node.properties)
5766 } else {
5767 (self.properties_to_expr(variable, &node.properties), None)
5768 };
5769
5770 if node.labels.is_empty() {
5772 if let Some((_, ext_id_value)) = properties.iter().find(|(k, _)| k == "ext_id") {
5774 let ext_id = match ext_id_value {
5776 Expr::Literal(CypherLiteral::String(s)) => s.clone(),
5777 _ => {
5778 return Err(anyhow!("ext_id must be a string literal for direct lookup"));
5779 }
5780 };
5781
5782 let remaining_props: Vec<_> = properties
5784 .iter()
5785 .filter(|(k, _)| k != "ext_id")
5786 .cloned()
5787 .collect();
5788
5789 let remaining_expr = if remaining_props.is_empty() {
5790 None
5791 } else {
5792 Some(Expr::Map(remaining_props))
5793 };
5794
5795 let (prop_filter, residual_filter) = if has_existing_scope {
5796 self.split_node_property_filters_for_scan(variable, &remaining_expr)
5797 } else {
5798 (self.properties_to_expr(variable, &remaining_expr), None)
5799 };
5800
5801 let ext_id_lookup = LogicalPlan::ExtIdLookup {
5802 variable: variable.to_string(),
5803 ext_id,
5804 filter: prop_filter,
5805 optional,
5806 };
5807
5808 let joined = Self::join_with_plan(plan, ext_id_lookup);
5809 return Ok(apply_residual_filter(joined, residual_filter));
5810 }
5811
5812 let scan_all = LogicalPlan::ScanAll {
5814 variable: variable.to_string(),
5815 filter: node_scan_filter,
5816 optional,
5817 };
5818
5819 let joined = Self::join_with_plan(plan, scan_all);
5820 return Ok(apply_residual_filter(joined, node_residual_filter));
5821 }
5822
5823 if node.labels.is_proper_disjunction() {
5835 let label_names: Vec<String> = node.labels.names().to_vec();
5836
5837 let use_main_table_branches = !self.label_branches_share_property_schema(&label_names);
5854
5855 let mut branches: Vec<LogicalPlan> = Vec::with_capacity(label_names.len());
5856 for label_name in &label_names {
5857 let branch = if use_main_table_branches {
5858 LogicalPlan::ScanMainByLabels {
5859 labels: vec![label_name.clone()],
5860 variable: variable.to_string(),
5861 filter: node_scan_filter.clone(),
5862 optional,
5863 }
5864 } else {
5865 let meta = self
5866 .schema
5867 .get_label_case_insensitive(label_name)
5868 .expect("share_property_schema true implies all labels in schema");
5869 LogicalPlan::Scan {
5870 label_id: meta.id,
5871 labels: vec![label_name.clone()],
5872 variable: variable.to_string(),
5873 filter: node_scan_filter.clone(),
5874 optional,
5875 }
5876 };
5877 branches.push(branch);
5878 }
5879 let mut iter = branches.into_iter();
5882 let mut union_plan = iter
5883 .next()
5884 .expect("is_proper_disjunction implies at least 2 labels");
5885 for next in iter {
5886 union_plan = LogicalPlan::Union {
5887 left: Box::new(union_plan),
5888 right: Box::new(next),
5889 all: false,
5890 };
5891 }
5892 let joined = Self::join_with_plan(plan, union_plan);
5893 return Ok(apply_residual_filter(joined, node_residual_filter));
5894 }
5895
5896 let label_name = &node.labels[0];
5898
5899 if let Some(label_meta) = self.schema.get_label_case_insensitive(label_name) {
5901 let scan = LogicalPlan::Scan {
5903 label_id: label_meta.id,
5904 labels: node.labels.names().to_vec(),
5905 variable: variable.to_string(),
5906 filter: node_scan_filter,
5907 optional,
5908 };
5909
5910 let joined = Self::join_with_plan(plan, scan);
5911 Ok(apply_residual_filter(joined, node_residual_filter))
5912 } else {
5913 if let Some((virtual_id, _)) = self.allocate_virtual_label(label_name)? {
5921 let scan = LogicalPlan::Scan {
5922 label_id: virtual_id,
5923 labels: node.labels.names().to_vec(),
5924 variable: variable.to_string(),
5925 filter: node_scan_filter,
5926 optional,
5927 };
5928 let joined = Self::join_with_plan(plan, scan);
5929 return Ok(apply_residual_filter(joined, node_residual_filter));
5930 }
5931 if self.replacement_scans_enabled {
5932 return Err(anyhow!(
5933 "Label `{}` is not defined in schema and no \
5934 CatalogProvider or ReplacementScanProvider claimed it; \
5935 strict-mode (replacement_scans=true) requires the label \
5936 to resolve",
5937 label_name
5938 ));
5939 }
5940
5941 let scan_main = LogicalPlan::ScanMainByLabels {
5942 labels: node.labels.names().to_vec(),
5943 variable: variable.to_string(),
5944 filter: node_scan_filter,
5945 optional,
5946 };
5947
5948 let joined = Self::join_with_plan(plan, scan_main);
5949 Ok(apply_residual_filter(joined, node_residual_filter))
5950 }
5951 }
5952
5953 fn plan_where_clause(
5958 &self,
5959 predicate: &Expr,
5960 plan: LogicalPlan,
5961 vars_in_scope: &[VariableInfo],
5962 optional_vars: HashSet<String>,
5963 ) -> Result<LogicalPlan> {
5964 validate_no_aggregation_in_where(predicate)?;
5966
5967 validate_expression_variables(predicate, vars_in_scope)?;
5969
5970 validate_expression(predicate, vars_in_scope)?;
5972
5973 if let Expr::Variable(var_name) = predicate
5975 && let Some(info) = find_var_in_scope(vars_in_scope, var_name)
5976 && matches!(
5977 info.var_type,
5978 VariableType::Node | VariableType::Edge | VariableType::Path
5979 )
5980 {
5981 return Err(anyhow!(
5982 "SyntaxError: InvalidArgumentType - Type mismatch: expected Boolean but was {:?}",
5983 info.var_type
5984 ));
5985 }
5986
5987 let mut plan = plan;
5988
5989 let transformed_predicate = Self::transform_valid_at_to_function(predicate.clone());
5991
5992 let transformed_predicate = Self::rewrite_id_to_vid(transformed_predicate);
5994
5995 let mut current_predicate =
5996 self.rewrite_predicates_using_indexes(&transformed_predicate, &plan, vars_in_scope)?;
5997
5998 if let Some(extraction) = extract_vector_similarity(¤t_predicate) {
6000 let vs = &extraction.predicate;
6001 if Self::find_scan_label_id(&plan, &vs.variable).is_some() {
6002 plan = Self::replace_scan_with_knn(
6003 plan,
6004 &vs.variable,
6005 &vs.property,
6006 vs.query.clone(),
6007 vs.threshold,
6008 );
6009 if let Some(residual) = extraction.residual {
6010 current_predicate = residual;
6011 } else {
6012 current_predicate = Expr::TRUE;
6013 }
6014 }
6015 }
6016
6017 let conjuncts = Self::split_and_conjuncts(¤t_predicate);
6028 let mut keep: Vec<Expr> = Vec::with_capacity(conjuncts.len());
6029 for conj in conjuncts {
6030 let mut consumed = false;
6031 for var in vars_in_scope {
6032 if optional_vars.contains(&var.name) {
6033 continue;
6034 }
6035 if Self::is_scan_all_for(&plan, &var.name)
6037 && let Some(labels) = try_label_or_to_union(&conj, &var.name)
6038 {
6039 plan = self.replace_scan_all_with_label_union(plan, &var.name, &labels, false);
6040 consumed = true;
6041 break;
6042 }
6043 if let Some(types) = try_type_or_to_union(&conj, &var.name)
6045 && Self::merge_traverse_types_for(&plan, &var.name, &types).is_some()
6046 {
6047 let mut ids: Vec<u32> = Vec::with_capacity(types.len());
6048 let mut all_known = true;
6049 for t in &types {
6050 match self.schema.edge_types.get(t) {
6051 Some(meta) => ids.push(meta.id),
6052 None => {
6053 all_known = false;
6054 break;
6055 }
6056 }
6057 }
6058 if all_known {
6059 plan = Self::set_traverse_edge_type_ids(plan, &var.name, ids);
6060 consumed = true;
6061 break;
6062 }
6063 }
6064 }
6065 if !consumed {
6066 keep.push(conj);
6067 }
6068 }
6069 current_predicate = Self::combine_predicates(keep).unwrap_or(Expr::TRUE);
6070
6071 for var in vars_in_scope {
6076 if optional_vars.contains(&var.name) {
6078 continue;
6079 }
6080
6081 if Self::find_scan_label_id(&plan, &var.name).is_some() {
6083 let (pushable, residual) =
6084 Self::extract_variable_predicates(¤t_predicate, &var.name);
6085
6086 for pred in pushable {
6087 plan = Self::push_predicate_to_scan(plan, &var.name, pred);
6088 }
6089
6090 if let Some(r) = residual {
6091 current_predicate = r;
6092 } else {
6093 current_predicate = Expr::TRUE;
6094 }
6095 } else if Self::is_traverse_target(&plan, &var.name) {
6096 let (pushable, residual) =
6098 Self::extract_variable_predicates(¤t_predicate, &var.name);
6099
6100 for pred in pushable {
6101 plan = Self::push_predicate_to_traverse(plan, &var.name, pred);
6102 }
6103
6104 if let Some(r) = residual {
6105 current_predicate = r;
6106 } else {
6107 current_predicate = Expr::TRUE;
6108 }
6109 }
6110 }
6111
6112 plan = Self::push_predicates_to_apply(plan, &mut current_predicate);
6115
6116 if !current_predicate.is_true_literal() {
6118 plan = LogicalPlan::Filter {
6119 input: Box::new(plan),
6120 predicate: current_predicate,
6121 optional_variables: optional_vars,
6122 };
6123 }
6124
6125 Ok(plan)
6126 }
6127
6128 fn rewrite_predicates_using_indexes(
6129 &self,
6130 predicate: &Expr,
6131 plan: &LogicalPlan,
6132 vars_in_scope: &[VariableInfo],
6133 ) -> Result<Expr> {
6134 let mut rewritten = predicate.clone();
6135
6136 for var in vars_in_scope {
6137 if let Some(label_id) = Self::find_scan_label_id(plan, &var.name) {
6138 let label_name = self.schema.label_name_by_id(label_id).map(str::to_owned);
6140
6141 if let Some(label) = label_name
6142 && let Some(props) = self.schema.properties.get(&label)
6143 {
6144 for (gen_col, meta) in props {
6145 if meta.generation_expression.is_some() {
6146 if let Some(schema_expr) =
6148 self.gen_expr_cache.get(&(label.clone(), gen_col.clone()))
6149 {
6150 rewritten = Self::replace_expression(
6152 rewritten,
6153 schema_expr,
6154 &var.name,
6155 gen_col,
6156 );
6157 }
6158 }
6159 }
6160 }
6161 }
6162 }
6163 Ok(rewritten)
6164 }
6165
6166 fn replace_expression(expr: Expr, schema_expr: &Expr, query_var: &str, gen_col: &str) -> Expr {
6167 let schema_var = schema_expr.extract_variable();
6169
6170 if let Some(s_var) = schema_var {
6171 let target_expr = schema_expr.substitute_variable(&s_var, query_var);
6172
6173 if expr == target_expr {
6174 return Expr::Property(
6175 Box::new(Expr::Variable(query_var.to_string())),
6176 gen_col.to_string(),
6177 );
6178 }
6179 }
6180
6181 match expr {
6183 Expr::BinaryOp { left, op, right } => Expr::BinaryOp {
6184 left: Box::new(Self::replace_expression(
6185 *left,
6186 schema_expr,
6187 query_var,
6188 gen_col,
6189 )),
6190 op,
6191 right: Box::new(Self::replace_expression(
6192 *right,
6193 schema_expr,
6194 query_var,
6195 gen_col,
6196 )),
6197 },
6198 Expr::UnaryOp { op, expr } => Expr::UnaryOp {
6199 op,
6200 expr: Box::new(Self::replace_expression(
6201 *expr,
6202 schema_expr,
6203 query_var,
6204 gen_col,
6205 )),
6206 },
6207 Expr::FunctionCall {
6208 name,
6209 args,
6210 distinct,
6211 window_spec,
6212 } => Expr::FunctionCall {
6213 name,
6214 args: args
6215 .into_iter()
6216 .map(|a| Self::replace_expression(a, schema_expr, query_var, gen_col))
6217 .collect(),
6218 distinct,
6219 window_spec,
6220 },
6221 Expr::IsNull(expr) => Expr::IsNull(Box::new(Self::replace_expression(
6222 *expr,
6223 schema_expr,
6224 query_var,
6225 gen_col,
6226 ))),
6227 Expr::IsNotNull(expr) => Expr::IsNotNull(Box::new(Self::replace_expression(
6228 *expr,
6229 schema_expr,
6230 query_var,
6231 gen_col,
6232 ))),
6233 Expr::IsUnique(expr) => Expr::IsUnique(Box::new(Self::replace_expression(
6234 *expr,
6235 schema_expr,
6236 query_var,
6237 gen_col,
6238 ))),
6239 Expr::ArrayIndex {
6240 array: e,
6241 index: idx,
6242 } => Expr::ArrayIndex {
6243 array: Box::new(Self::replace_expression(
6244 *e,
6245 schema_expr,
6246 query_var,
6247 gen_col,
6248 )),
6249 index: Box::new(Self::replace_expression(
6250 *idx,
6251 schema_expr,
6252 query_var,
6253 gen_col,
6254 )),
6255 },
6256 Expr::ArraySlice { array, start, end } => Expr::ArraySlice {
6257 array: Box::new(Self::replace_expression(
6258 *array,
6259 schema_expr,
6260 query_var,
6261 gen_col,
6262 )),
6263 start: start.map(|s| {
6264 Box::new(Self::replace_expression(
6265 *s,
6266 schema_expr,
6267 query_var,
6268 gen_col,
6269 ))
6270 }),
6271 end: end.map(|e| {
6272 Box::new(Self::replace_expression(
6273 *e,
6274 schema_expr,
6275 query_var,
6276 gen_col,
6277 ))
6278 }),
6279 },
6280 Expr::List(exprs) => Expr::List(
6281 exprs
6282 .into_iter()
6283 .map(|e| Self::replace_expression(e, schema_expr, query_var, gen_col))
6284 .collect(),
6285 ),
6286 Expr::Map(entries) => Expr::Map(
6287 entries
6288 .into_iter()
6289 .map(|(k, v)| {
6290 (
6291 k,
6292 Self::replace_expression(v, schema_expr, query_var, gen_col),
6293 )
6294 })
6295 .collect(),
6296 ),
6297 Expr::Property(e, prop) => Expr::Property(
6298 Box::new(Self::replace_expression(
6299 *e,
6300 schema_expr,
6301 query_var,
6302 gen_col,
6303 )),
6304 prop,
6305 ),
6306 Expr::Case {
6307 expr: case_expr,
6308 when_then,
6309 else_expr,
6310 } => Expr::Case {
6311 expr: case_expr.map(|e| {
6312 Box::new(Self::replace_expression(
6313 *e,
6314 schema_expr,
6315 query_var,
6316 gen_col,
6317 ))
6318 }),
6319 when_then: when_then
6320 .into_iter()
6321 .map(|(w, t)| {
6322 (
6323 Self::replace_expression(w, schema_expr, query_var, gen_col),
6324 Self::replace_expression(t, schema_expr, query_var, gen_col),
6325 )
6326 })
6327 .collect(),
6328 else_expr: else_expr.map(|e| {
6329 Box::new(Self::replace_expression(
6330 *e,
6331 schema_expr,
6332 query_var,
6333 gen_col,
6334 ))
6335 }),
6336 },
6337 Expr::Reduce {
6338 accumulator,
6339 init,
6340 variable: reduce_var,
6341 list,
6342 expr: reduce_expr,
6343 } => Expr::Reduce {
6344 accumulator,
6345 init: Box::new(Self::replace_expression(
6346 *init,
6347 schema_expr,
6348 query_var,
6349 gen_col,
6350 )),
6351 variable: reduce_var,
6352 list: Box::new(Self::replace_expression(
6353 *list,
6354 schema_expr,
6355 query_var,
6356 gen_col,
6357 )),
6358 expr: Box::new(Self::replace_expression(
6359 *reduce_expr,
6360 schema_expr,
6361 query_var,
6362 gen_col,
6363 )),
6364 },
6365
6366 _ => expr,
6368 }
6369 }
6370
6371 fn is_scan_all_for(plan: &LogicalPlan, variable: &str) -> bool {
6377 match plan {
6378 LogicalPlan::ScanAll { variable: var, .. } => var == variable,
6379 LogicalPlan::Filter { input, .. }
6380 | LogicalPlan::Project { input, .. }
6381 | LogicalPlan::Sort { input, .. }
6382 | LogicalPlan::Limit { input, .. }
6383 | LogicalPlan::Aggregate { input, .. }
6384 | LogicalPlan::Apply { input, .. }
6385 | LogicalPlan::Traverse { input, .. } => Self::is_scan_all_for(input, variable),
6386 LogicalPlan::CrossJoin { left, right } => {
6387 Self::is_scan_all_for(left, variable) || Self::is_scan_all_for(right, variable)
6388 }
6389 LogicalPlan::Union { left, right, .. } => {
6390 Self::is_scan_all_for(left, variable) || Self::is_scan_all_for(right, variable)
6391 }
6392 _ => false,
6393 }
6394 }
6395
6396 fn replace_scan_all_with_label_union(
6401 &self,
6402 plan: LogicalPlan,
6403 variable: &str,
6404 labels: &[String],
6405 optional: bool,
6406 ) -> LogicalPlan {
6407 match plan {
6408 LogicalPlan::ScanAll {
6409 variable: var,
6410 filter,
6411 optional: scan_optional,
6412 } if var == variable => {
6413 let use_main_table_branches = !self.label_branches_share_property_schema(labels);
6419
6420 let mut branches: Vec<LogicalPlan> = Vec::with_capacity(labels.len());
6421 for label in labels {
6422 let branch = if use_main_table_branches {
6423 LogicalPlan::ScanMainByLabels {
6424 labels: vec![label.clone()],
6425 variable: variable.to_string(),
6426 filter: filter.clone(),
6427 optional: scan_optional || optional,
6428 }
6429 } else {
6430 let meta = self
6431 .schema
6432 .get_label_case_insensitive(label)
6433 .expect("share_property_schema true implies all labels in schema");
6434 LogicalPlan::Scan {
6435 label_id: meta.id,
6436 labels: vec![label.clone()],
6437 variable: variable.to_string(),
6438 filter: filter.clone(),
6439 optional: scan_optional || optional,
6440 }
6441 };
6442 branches.push(branch);
6443 }
6444 let mut iter = branches.into_iter();
6445 let mut union_plan = iter.next().expect("at least one label");
6446 for next in iter {
6447 union_plan = LogicalPlan::Union {
6448 left: Box::new(union_plan),
6449 right: Box::new(next),
6450 all: false,
6451 };
6452 }
6453 union_plan
6454 }
6455 LogicalPlan::Filter {
6456 input,
6457 predicate,
6458 optional_variables,
6459 } => LogicalPlan::Filter {
6460 input: Box::new(
6461 self.replace_scan_all_with_label_union(*input, variable, labels, optional),
6462 ),
6463 predicate,
6464 optional_variables,
6465 },
6466 LogicalPlan::Project { input, projections } => LogicalPlan::Project {
6467 input: Box::new(
6468 self.replace_scan_all_with_label_union(*input, variable, labels, optional),
6469 ),
6470 projections,
6471 },
6472 LogicalPlan::CrossJoin { left, right } => {
6473 if Self::is_scan_all_for(&left, variable) {
6474 LogicalPlan::CrossJoin {
6475 left: Box::new(
6476 self.replace_scan_all_with_label_union(
6477 *left, variable, labels, optional,
6478 ),
6479 ),
6480 right,
6481 }
6482 } else {
6483 LogicalPlan::CrossJoin {
6484 left,
6485 right: Box::new(
6486 self.replace_scan_all_with_label_union(
6487 *right, variable, labels, optional,
6488 ),
6489 ),
6490 }
6491 }
6492 }
6493 LogicalPlan::Traverse {
6494 input,
6495 edge_type_ids,
6496 direction,
6497 source_variable,
6498 target_variable,
6499 target_label_id,
6500 step_variable,
6501 min_hops,
6502 max_hops,
6503 optional: trav_optional,
6504 target_filter,
6505 path_variable,
6506 edge_properties,
6507 is_variable_length,
6508 optional_pattern_vars,
6509 scope_match_variables,
6510 edge_filter_expr,
6511 path_mode,
6512 qpp_steps,
6513 } => LogicalPlan::Traverse {
6514 input: Box::new(
6515 self.replace_scan_all_with_label_union(*input, variable, labels, optional),
6516 ),
6517 edge_type_ids,
6518 direction,
6519 source_variable,
6520 target_variable,
6521 target_label_id,
6522 step_variable,
6523 min_hops,
6524 max_hops,
6525 optional: trav_optional,
6526 target_filter,
6527 path_variable,
6528 edge_properties,
6529 is_variable_length,
6530 optional_pattern_vars,
6531 scope_match_variables,
6532 edge_filter_expr,
6533 path_mode,
6534 qpp_steps,
6535 },
6536 other => other,
6537 }
6538 }
6539
6540 fn merge_traverse_types_for(
6545 plan: &LogicalPlan,
6546 edge_var: &str,
6547 _types: &[String],
6548 ) -> Option<()> {
6549 match plan {
6550 LogicalPlan::Traverse {
6551 step_variable,
6552 input,
6553 ..
6554 } => {
6555 if step_variable.as_deref() == Some(edge_var) {
6556 Some(())
6557 } else {
6558 Self::merge_traverse_types_for(input, edge_var, _types)
6559 }
6560 }
6561 LogicalPlan::Filter { input, .. }
6562 | LogicalPlan::Project { input, .. }
6563 | LogicalPlan::Sort { input, .. }
6564 | LogicalPlan::Limit { input, .. }
6565 | LogicalPlan::Aggregate { input, .. }
6566 | LogicalPlan::Apply { input, .. } => {
6567 Self::merge_traverse_types_for(input, edge_var, _types)
6568 }
6569 LogicalPlan::CrossJoin { left, right } | LogicalPlan::Union { left, right, .. } => {
6570 Self::merge_traverse_types_for(left, edge_var, _types)
6571 .or_else(|| Self::merge_traverse_types_for(right, edge_var, _types))
6572 }
6573 _ => None,
6574 }
6575 }
6576
6577 fn set_traverse_edge_type_ids(
6580 plan: LogicalPlan,
6581 edge_var: &str,
6582 new_ids: Vec<u32>,
6583 ) -> LogicalPlan {
6584 match plan {
6585 LogicalPlan::Traverse {
6586 input,
6587 edge_type_ids,
6588 direction,
6589 source_variable,
6590 target_variable,
6591 target_label_id,
6592 step_variable,
6593 min_hops,
6594 max_hops,
6595 optional,
6596 target_filter,
6597 path_variable,
6598 edge_properties,
6599 is_variable_length,
6600 optional_pattern_vars,
6601 scope_match_variables,
6602 edge_filter_expr,
6603 path_mode,
6604 qpp_steps,
6605 } => {
6606 let matches_var = step_variable.as_deref() == Some(edge_var);
6607 let recursed_input = if matches_var {
6608 input
6609 } else {
6610 Box::new(Self::set_traverse_edge_type_ids(
6611 *input,
6612 edge_var,
6613 new_ids.clone(),
6614 ))
6615 };
6616 LogicalPlan::Traverse {
6617 input: recursed_input,
6618 edge_type_ids: if matches_var { new_ids } else { edge_type_ids },
6619 direction,
6620 source_variable,
6621 target_variable,
6622 target_label_id,
6623 step_variable,
6624 min_hops,
6625 max_hops,
6626 optional,
6627 target_filter,
6628 path_variable,
6629 edge_properties,
6630 is_variable_length,
6631 optional_pattern_vars,
6632 scope_match_variables,
6633 edge_filter_expr,
6634 path_mode,
6635 qpp_steps,
6636 }
6637 }
6638 LogicalPlan::Filter {
6639 input,
6640 predicate,
6641 optional_variables,
6642 } => LogicalPlan::Filter {
6643 input: Box::new(Self::set_traverse_edge_type_ids(*input, edge_var, new_ids)),
6644 predicate,
6645 optional_variables,
6646 },
6647 LogicalPlan::Project { input, projections } => LogicalPlan::Project {
6648 input: Box::new(Self::set_traverse_edge_type_ids(*input, edge_var, new_ids)),
6649 projections,
6650 },
6651 LogicalPlan::CrossJoin { left, right } => LogicalPlan::CrossJoin {
6652 left: Box::new(Self::set_traverse_edge_type_ids(
6653 *left,
6654 edge_var,
6655 new_ids.clone(),
6656 )),
6657 right: Box::new(Self::set_traverse_edge_type_ids(*right, edge_var, new_ids)),
6658 },
6659 other => other,
6660 }
6661 }
6662
6663 fn is_traverse_target(plan: &LogicalPlan, variable: &str) -> bool {
6665 match plan {
6666 LogicalPlan::Traverse {
6667 target_variable,
6668 input,
6669 ..
6670 } => target_variable == variable || Self::is_traverse_target(input, variable),
6671 LogicalPlan::Filter { input, .. }
6672 | LogicalPlan::Project { input, .. }
6673 | LogicalPlan::Sort { input, .. }
6674 | LogicalPlan::Limit { input, .. }
6675 | LogicalPlan::Aggregate { input, .. }
6676 | LogicalPlan::Apply { input, .. } => Self::is_traverse_target(input, variable),
6677 LogicalPlan::CrossJoin { left, right } => {
6678 Self::is_traverse_target(left, variable)
6679 || Self::is_traverse_target(right, variable)
6680 }
6681 _ => false,
6682 }
6683 }
6684
6685 fn push_predicate_to_traverse(
6687 plan: LogicalPlan,
6688 variable: &str,
6689 predicate: Expr,
6690 ) -> LogicalPlan {
6691 match plan {
6692 LogicalPlan::Traverse {
6693 input,
6694 edge_type_ids,
6695 direction,
6696 source_variable,
6697 target_variable,
6698 target_label_id,
6699 step_variable,
6700 min_hops,
6701 max_hops,
6702 optional,
6703 target_filter,
6704 path_variable,
6705 edge_properties,
6706 is_variable_length,
6707 optional_pattern_vars,
6708 scope_match_variables,
6709 edge_filter_expr,
6710 path_mode,
6711 qpp_steps,
6712 } => {
6713 if target_variable == variable {
6714 let new_filter = match target_filter {
6716 Some(existing) => Some(Expr::BinaryOp {
6717 left: Box::new(existing),
6718 op: BinaryOp::And,
6719 right: Box::new(predicate),
6720 }),
6721 None => Some(predicate),
6722 };
6723 LogicalPlan::Traverse {
6724 input,
6725 edge_type_ids,
6726 direction,
6727 source_variable,
6728 target_variable,
6729 target_label_id,
6730 step_variable,
6731 min_hops,
6732 max_hops,
6733 optional,
6734 target_filter: new_filter,
6735 path_variable,
6736 edge_properties,
6737 is_variable_length,
6738 optional_pattern_vars,
6739 scope_match_variables,
6740 edge_filter_expr,
6741 path_mode,
6742 qpp_steps,
6743 }
6744 } else {
6745 LogicalPlan::Traverse {
6747 input: Box::new(Self::push_predicate_to_traverse(
6748 *input, variable, predicate,
6749 )),
6750 edge_type_ids,
6751 direction,
6752 source_variable,
6753 target_variable,
6754 target_label_id,
6755 step_variable,
6756 min_hops,
6757 max_hops,
6758 optional,
6759 target_filter,
6760 path_variable,
6761 edge_properties,
6762 is_variable_length,
6763 optional_pattern_vars,
6764 scope_match_variables,
6765 edge_filter_expr,
6766 path_mode,
6767 qpp_steps,
6768 }
6769 }
6770 }
6771 LogicalPlan::Filter {
6772 input,
6773 predicate: p,
6774 optional_variables: opt_vars,
6775 } => LogicalPlan::Filter {
6776 input: Box::new(Self::push_predicate_to_traverse(
6777 *input, variable, predicate,
6778 )),
6779 predicate: p,
6780 optional_variables: opt_vars,
6781 },
6782 LogicalPlan::Project { input, projections } => LogicalPlan::Project {
6783 input: Box::new(Self::push_predicate_to_traverse(
6784 *input, variable, predicate,
6785 )),
6786 projections,
6787 },
6788 LogicalPlan::CrossJoin { left, right } => {
6789 if Self::is_traverse_target(&left, variable) {
6791 LogicalPlan::CrossJoin {
6792 left: Box::new(Self::push_predicate_to_traverse(
6793 *left, variable, predicate,
6794 )),
6795 right,
6796 }
6797 } else {
6798 LogicalPlan::CrossJoin {
6799 left,
6800 right: Box::new(Self::push_predicate_to_traverse(
6801 *right, variable, predicate,
6802 )),
6803 }
6804 }
6805 }
6806 other => other,
6807 }
6808 }
6809
6810 fn plan_with_clause(
6812 &self,
6813 with_clause: &WithClause,
6814 plan: LogicalPlan,
6815 vars_in_scope: &[VariableInfo],
6816 ) -> Result<(LogicalPlan, Vec<VariableInfo>)> {
6817 let mut plan = plan;
6818 let mut group_by: Vec<Expr> = Vec::new();
6819 let mut aggregates: Vec<Expr> = Vec::new();
6820 let mut compound_agg_exprs: Vec<Expr> = Vec::new();
6821 let mut has_agg = false;
6822 let mut projections = Vec::new();
6823 let mut new_vars: Vec<VariableInfo> = Vec::new();
6824 let mut projected_aggregate_reprs: HashSet<String> = HashSet::new();
6825 let mut projected_simple_reprs: HashSet<String> = HashSet::new();
6826 let mut projected_aliases: HashSet<String> = HashSet::new();
6827 let mut has_unaliased_non_variable_expr = false;
6828
6829 for item in &with_clause.items {
6830 match item {
6831 ReturnItem::All => {
6832 for v in vars_in_scope {
6834 projections.push((Expr::Variable(v.name.clone()), Some(v.name.clone())));
6835 projected_aliases.insert(v.name.clone());
6836 projected_simple_reprs.insert(v.name.clone());
6837 }
6838 new_vars.extend(vars_in_scope.iter().cloned());
6839 }
6840 ReturnItem::Expr { expr, alias, .. } => {
6841 if matches!(expr, Expr::Wildcard) {
6842 for v in vars_in_scope {
6843 projections
6844 .push((Expr::Variable(v.name.clone()), Some(v.name.clone())));
6845 projected_aliases.insert(v.name.clone());
6846 projected_simple_reprs.insert(v.name.clone());
6847 }
6848 new_vars.extend(vars_in_scope.iter().cloned());
6849 } else {
6850 validate_expression_variables(expr, vars_in_scope)?;
6852 validate_expression(expr, vars_in_scope)?;
6853 if contains_pattern_predicate(expr) {
6855 return Err(anyhow!(
6856 "SyntaxError: UnexpectedSyntax - Pattern predicates are not allowed in WITH"
6857 ));
6858 }
6859
6860 projections.push((expr.clone(), alias.clone()));
6861 if expr.is_aggregate() && !is_compound_aggregate(expr) {
6862 has_agg = true;
6864 aggregates.push(expr.clone());
6865 projected_aggregate_reprs.insert(expr.to_string_repr());
6866 } else if !is_window_function(expr)
6867 && (expr.is_aggregate() || contains_aggregate_recursive(expr))
6868 {
6869 has_agg = true;
6871 compound_agg_exprs.push(expr.clone());
6872 for inner in extract_inner_aggregates(expr) {
6873 let repr = inner.to_string_repr();
6874 if !projected_aggregate_reprs.contains(&repr) {
6875 aggregates.push(inner);
6876 projected_aggregate_reprs.insert(repr);
6877 }
6878 }
6879 } else if !group_by.contains(expr) {
6880 group_by.push(expr.clone());
6881 if matches!(expr, Expr::Variable(_) | Expr::Property(_, _)) {
6882 projected_simple_reprs.insert(expr.to_string_repr());
6883 }
6884 }
6885
6886 if let Some(a) = alias {
6889 if projected_aliases.contains(a) {
6890 return Err(anyhow!(
6891 "SyntaxError: ColumnNameConflict - Duplicate column name '{}' in WITH",
6892 a
6893 ));
6894 }
6895 let inferred = infer_with_output_type(expr, vars_in_scope);
6896 new_vars.push(VariableInfo::new(a.clone(), inferred));
6897 projected_aliases.insert(a.clone());
6898 } else if let Expr::Variable(v) = expr {
6899 if projected_aliases.contains(v) {
6900 return Err(anyhow!(
6901 "SyntaxError: ColumnNameConflict - Duplicate column name '{}' in WITH",
6902 v
6903 ));
6904 }
6905 if let Some(existing) = find_var_in_scope(vars_in_scope, v) {
6907 new_vars.push(existing.clone());
6908 } else {
6909 new_vars.push(VariableInfo::new(v.clone(), VariableType::Scalar));
6910 }
6911 projected_aliases.insert(v.clone());
6912 } else {
6913 has_unaliased_non_variable_expr = true;
6914 }
6915 }
6916 }
6917 }
6918 }
6919
6920 let projected_names: HashSet<&str> = new_vars.iter().map(|v| v.name.as_str()).collect();
6923 let mut passthrough_extras: Vec<String> = Vec::new();
6924 let mut seen_passthrough: HashSet<String> = HashSet::new();
6925
6926 if let Some(predicate) = &with_clause.where_clause {
6927 for name in collect_expr_variables(predicate) {
6928 if !projected_names.contains(name.as_str())
6929 && find_var_in_scope(vars_in_scope, &name).is_some()
6930 && seen_passthrough.insert(name.clone())
6931 {
6932 passthrough_extras.push(name);
6933 }
6934 }
6935 }
6936
6937 if !has_agg && let Some(order_by) = &with_clause.order_by {
6940 for item in order_by {
6941 for name in collect_expr_variables(&item.expr) {
6942 if !projected_names.contains(name.as_str())
6943 && find_var_in_scope(vars_in_scope, &name).is_some()
6944 && seen_passthrough.insert(name.clone())
6945 {
6946 passthrough_extras.push(name);
6947 }
6948 }
6949 }
6950 }
6951
6952 let needs_cleanup = !passthrough_extras.is_empty();
6953 for extra in &passthrough_extras {
6954 projections.push((Expr::Variable(extra.clone()), Some(extra.clone())));
6955 }
6956
6957 if has_agg {
6960 let group_by_reprs: HashSet<String> =
6961 group_by.iter().map(|e| e.to_string_repr()).collect();
6962 for expr in &compound_agg_exprs {
6963 let mut refs = Vec::new();
6964 collect_non_aggregate_refs(expr, false, &mut refs);
6965 for r in &refs {
6966 let is_covered = match r {
6967 NonAggregateRef::Var(v) => group_by_reprs.contains(v),
6968 NonAggregateRef::Property { repr, .. } => group_by_reprs.contains(repr),
6969 };
6970 if !is_covered {
6971 return Err(anyhow!(
6972 "SyntaxError: AmbiguousAggregationExpression - Expression mixes aggregation with non-grouped reference"
6973 ));
6974 }
6975 }
6976 }
6977 }
6978
6979 if has_agg {
6980 plan = LogicalPlan::Aggregate {
6981 input: Box::new(plan),
6982 group_by,
6983 aggregates,
6984 };
6985
6986 let rename_projections: Vec<(Expr, Option<String>)> = projections
6989 .iter()
6990 .map(|(expr, alias)| {
6991 if expr.is_aggregate() && !is_compound_aggregate(expr) {
6992 (Expr::Variable(aggregate_column_name(expr)), alias.clone())
6994 } else if is_compound_aggregate(expr)
6995 || (!expr.is_aggregate() && contains_aggregate_recursive(expr))
6996 {
6997 (replace_aggregates_with_columns(expr), alias.clone())
7000 } else {
7001 (Expr::Variable(expr.to_string_repr()), alias.clone())
7002 }
7003 })
7004 .collect();
7005 plan = LogicalPlan::Project {
7006 input: Box::new(plan),
7007 projections: rename_projections,
7008 };
7009 } else if !projections.is_empty() {
7010 plan = LogicalPlan::Project {
7011 input: Box::new(plan),
7012 projections: projections.clone(),
7013 };
7014 }
7015
7016 if let Some(predicate) = &with_clause.where_clause {
7018 plan = LogicalPlan::Filter {
7019 input: Box::new(plan),
7020 predicate: predicate.clone(),
7021 optional_variables: HashSet::new(),
7022 };
7023 }
7024
7025 if let Some(order_by) = &with_clause.order_by {
7029 let with_order_aliases: HashMap<String, Expr> = projections
7032 .iter()
7033 .flat_map(|(expr, alias)| {
7034 let output_col = if let Some(a) = alias {
7035 a.clone()
7036 } else if expr.is_aggregate() && !is_compound_aggregate(expr) {
7037 aggregate_column_name(expr)
7038 } else {
7039 expr.to_string_repr()
7040 };
7041
7042 let mut entries = Vec::new();
7043 if let Some(a) = alias {
7045 entries.push((a.clone(), Expr::Variable(output_col.clone())));
7046 }
7047 entries.push((expr.to_string_repr(), Expr::Variable(output_col)));
7049 entries
7050 })
7051 .collect();
7052
7053 let order_by_scope: Vec<VariableInfo> = {
7054 let mut scope = new_vars.clone();
7055 for v in vars_in_scope {
7056 if !is_var_in_scope(&scope, &v.name) {
7057 scope.push(v.clone());
7058 }
7059 }
7060 scope
7061 };
7062 for item in order_by {
7063 validate_expression_variables(&item.expr, &order_by_scope)?;
7064 validate_expression(&item.expr, &order_by_scope)?;
7065 let has_aggregate_in_item = contains_aggregate_recursive(&item.expr);
7066 if has_aggregate_in_item && !has_agg {
7067 return Err(anyhow!(
7068 "SyntaxError: InvalidAggregation - Aggregation functions not allowed in ORDER BY of WITH"
7069 ));
7070 }
7071 if has_agg && has_aggregate_in_item {
7072 validate_with_order_by_aggregate_item(
7073 &item.expr,
7074 &projected_aggregate_reprs,
7075 &projected_simple_reprs,
7076 &projected_aliases,
7077 )?;
7078 }
7079 }
7080 let rewritten_order_by: Vec<SortItem> = order_by
7081 .iter()
7082 .map(|item| {
7083 let mut expr =
7084 rewrite_order_by_expr_with_aliases(&item.expr, &with_order_aliases);
7085 if has_agg {
7086 expr = replace_aggregates_with_columns(&expr);
7089 expr = rewrite_order_by_expr_with_aliases(&expr, &with_order_aliases);
7092 }
7093 SortItem {
7094 expr,
7095 ascending: item.ascending,
7096 }
7097 })
7098 .collect();
7099 plan = LogicalPlan::Sort {
7100 input: Box::new(plan),
7101 order_by: rewritten_order_by,
7102 };
7103 }
7104
7105 if has_unaliased_non_variable_expr {
7110 return Err(anyhow!(
7111 "SyntaxError: NoExpressionAlias - All non-variable expressions in WITH must be aliased"
7112 ));
7113 }
7114
7115 let skip = with_clause
7117 .skip
7118 .as_ref()
7119 .map(|e| parse_non_negative_integer(e, "SKIP", &self.params))
7120 .transpose()?
7121 .flatten();
7122 let fetch = with_clause
7123 .limit
7124 .as_ref()
7125 .map(|e| parse_non_negative_integer(e, "LIMIT", &self.params))
7126 .transpose()?
7127 .flatten();
7128
7129 if skip.is_some() || fetch.is_some() {
7130 plan = LogicalPlan::Limit {
7131 input: Box::new(plan),
7132 skip,
7133 fetch,
7134 };
7135 }
7136
7137 if needs_cleanup {
7139 let cleanup_projections: Vec<(Expr, Option<String>)> = new_vars
7140 .iter()
7141 .map(|v| (Expr::Variable(v.name.clone()), Some(v.name.clone())))
7142 .collect();
7143 plan = LogicalPlan::Project {
7144 input: Box::new(plan),
7145 projections: cleanup_projections,
7146 };
7147 }
7148
7149 if with_clause.distinct {
7150 plan = LogicalPlan::Distinct {
7151 input: Box::new(plan),
7152 };
7153 }
7154
7155 Ok((plan, new_vars))
7156 }
7157
7158 fn plan_with_recursive(
7159 &self,
7160 with_recursive: &WithRecursiveClause,
7161 _prev_plan: LogicalPlan,
7162 vars_in_scope: &[VariableInfo],
7163 ) -> Result<LogicalPlan> {
7164 match &*with_recursive.query {
7166 Query::Union { left, right, .. } => {
7167 let initial_plan = self.rewrite_and_plan_typed(*left.clone(), vars_in_scope)?;
7169
7170 let mut recursive_scope = vars_in_scope.to_vec();
7173 recursive_scope.push(VariableInfo::new(
7174 with_recursive.name.clone(),
7175 VariableType::Scalar,
7176 ));
7177 let recursive_plan =
7178 self.rewrite_and_plan_typed(*right.clone(), &recursive_scope)?;
7179
7180 Ok(LogicalPlan::RecursiveCTE {
7181 cte_name: with_recursive.name.clone(),
7182 initial: Box::new(initial_plan),
7183 recursive: Box::new(recursive_plan),
7184 })
7185 }
7186 _ => Err(anyhow::anyhow!(
7187 "WITH RECURSIVE requires a UNION query with anchor and recursive parts"
7188 )),
7189 }
7190 }
7191
7192 pub fn properties_to_expr(&self, variable: &str, properties: &Option<Expr>) -> Option<Expr> {
7193 let entries = match properties {
7194 Some(Expr::Map(entries)) => entries,
7195 _ => return None,
7196 };
7197
7198 if entries.is_empty() {
7199 return None;
7200 }
7201 let mut final_expr = None;
7202 for (prop, val_expr) in entries {
7203 let eq_expr = Expr::BinaryOp {
7204 left: Box::new(Expr::Property(
7205 Box::new(Expr::Variable(variable.to_string())),
7206 prop.clone(),
7207 )),
7208 op: BinaryOp::Eq,
7209 right: Box::new(val_expr.clone()),
7210 };
7211
7212 if let Some(e) = final_expr {
7213 final_expr = Some(Expr::BinaryOp {
7214 left: Box::new(e),
7215 op: BinaryOp::And,
7216 right: Box::new(eq_expr),
7217 });
7218 } else {
7219 final_expr = Some(eq_expr);
7220 }
7221 }
7222 final_expr
7223 }
7224
7225 pub fn node_filter_expr(
7230 &self,
7231 variable: &str,
7232 labels: &[String],
7233 properties: &Option<Expr>,
7234 ) -> Option<Expr> {
7235 let mut final_expr = None;
7236
7237 for label in labels {
7239 let label_check = Expr::FunctionCall {
7240 name: "hasLabel".to_string(),
7241 args: vec![
7242 Expr::Variable(variable.to_string()),
7243 Expr::Literal(CypherLiteral::String(label.clone())),
7244 ],
7245 distinct: false,
7246 window_spec: None,
7247 };
7248
7249 final_expr = match final_expr {
7250 Some(e) => Some(Expr::BinaryOp {
7251 left: Box::new(e),
7252 op: BinaryOp::And,
7253 right: Box::new(label_check),
7254 }),
7255 None => Some(label_check),
7256 };
7257 }
7258
7259 if let Some(prop_expr) = self.properties_to_expr(variable, properties) {
7261 final_expr = match final_expr {
7262 Some(e) => Some(Expr::BinaryOp {
7263 left: Box::new(e),
7264 op: BinaryOp::And,
7265 right: Box::new(prop_expr),
7266 }),
7267 None => Some(prop_expr),
7268 };
7269 }
7270
7271 final_expr
7272 }
7273
7274 fn wrap_with_bound_target_filter(plan: LogicalPlan, target_variable: &str) -> LogicalPlan {
7279 let bound_check = Expr::BinaryOp {
7285 left: Box::new(Expr::Property(
7286 Box::new(Expr::Variable(target_variable.to_string())),
7287 "_vid".to_string(),
7288 )),
7289 op: BinaryOp::Eq,
7290 right: Box::new(Expr::Variable(format!("{}._vid", target_variable))),
7291 };
7292 LogicalPlan::Filter {
7293 input: Box::new(plan),
7294 predicate: bound_check,
7295 optional_variables: HashSet::new(),
7296 }
7297 }
7298
7299 fn replace_scan_with_knn(
7301 plan: LogicalPlan,
7302 variable: &str,
7303 property: &str,
7304 query: Expr,
7305 threshold: Option<f32>,
7306 ) -> LogicalPlan {
7307 match plan {
7308 LogicalPlan::Scan {
7309 label_id,
7310 labels,
7311 variable: scan_var,
7312 filter,
7313 optional,
7314 } => {
7315 if scan_var == variable {
7316 let knn = LogicalPlan::VectorKnn {
7324 label_id,
7325 variable: variable.to_string(),
7326 property: property.to_string(),
7327 query,
7328 k: 100, threshold,
7330 };
7331
7332 if let Some(f) = filter {
7333 LogicalPlan::Filter {
7334 input: Box::new(knn),
7335 predicate: f,
7336 optional_variables: HashSet::new(),
7337 }
7338 } else {
7339 knn
7340 }
7341 } else {
7342 LogicalPlan::Scan {
7343 label_id,
7344 labels,
7345 variable: scan_var,
7346 filter,
7347 optional,
7348 }
7349 }
7350 }
7351 LogicalPlan::Filter {
7352 input,
7353 predicate,
7354 optional_variables,
7355 } => LogicalPlan::Filter {
7356 input: Box::new(Self::replace_scan_with_knn(
7357 *input, variable, property, query, threshold,
7358 )),
7359 predicate,
7360 optional_variables,
7361 },
7362 LogicalPlan::Project { input, projections } => LogicalPlan::Project {
7363 input: Box::new(Self::replace_scan_with_knn(
7364 *input, variable, property, query, threshold,
7365 )),
7366 projections,
7367 },
7368 LogicalPlan::Limit { input, skip, fetch } => {
7369 LogicalPlan::Limit {
7374 input: Box::new(Self::replace_scan_with_knn(
7375 *input, variable, property, query, threshold,
7376 )),
7377 skip,
7378 fetch,
7379 }
7380 }
7381 LogicalPlan::CrossJoin { left, right } => LogicalPlan::CrossJoin {
7382 left: Box::new(Self::replace_scan_with_knn(
7383 *left,
7384 variable,
7385 property,
7386 query.clone(),
7387 threshold,
7388 )),
7389 right: Box::new(Self::replace_scan_with_knn(
7390 *right, variable, property, query, threshold,
7391 )),
7392 },
7393 other => other,
7394 }
7395 }
7396
7397 fn find_scan_label_id(plan: &LogicalPlan, variable: &str) -> Option<u16> {
7399 match plan {
7400 LogicalPlan::Scan {
7401 label_id,
7402 variable: var,
7403 ..
7404 } if var == variable => Some(*label_id),
7405 LogicalPlan::ScanAll { variable: var, .. } if var == variable => Some(0),
7406 LogicalPlan::Filter { input, .. }
7407 | LogicalPlan::Project { input, .. }
7408 | LogicalPlan::Sort { input, .. }
7409 | LogicalPlan::Limit { input, .. }
7410 | LogicalPlan::Aggregate { input, .. }
7411 | LogicalPlan::Apply { input, .. } => Self::find_scan_label_id(input, variable),
7412 LogicalPlan::CrossJoin { left, right } => Self::find_scan_label_id(left, variable)
7413 .or_else(|| Self::find_scan_label_id(right, variable)),
7414 LogicalPlan::Traverse { input, .. } => Self::find_scan_label_id(input, variable),
7415 _ => None,
7416 }
7417 }
7418
7419 fn push_predicate_to_scan(plan: LogicalPlan, variable: &str, predicate: Expr) -> LogicalPlan {
7421 match plan {
7422 LogicalPlan::Scan {
7423 label_id,
7424 labels,
7425 variable: var,
7426 filter,
7427 optional,
7428 } if var == variable => {
7429 let new_filter = match filter {
7431 Some(existing) => Some(Expr::BinaryOp {
7432 left: Box::new(existing),
7433 op: BinaryOp::And,
7434 right: Box::new(predicate),
7435 }),
7436 None => Some(predicate),
7437 };
7438 LogicalPlan::Scan {
7439 label_id,
7440 labels,
7441 variable: var,
7442 filter: new_filter,
7443 optional,
7444 }
7445 }
7446 LogicalPlan::ScanAll {
7447 variable: var,
7448 filter,
7449 optional,
7450 } if var == variable => {
7451 let new_filter = match filter {
7452 Some(existing) => Some(Expr::BinaryOp {
7453 left: Box::new(existing),
7454 op: BinaryOp::And,
7455 right: Box::new(predicate),
7456 }),
7457 None => Some(predicate),
7458 };
7459 LogicalPlan::ScanAll {
7460 variable: var,
7461 filter: new_filter,
7462 optional,
7463 }
7464 }
7465 LogicalPlan::Filter {
7466 input,
7467 predicate: p,
7468 optional_variables: opt_vars,
7469 } => LogicalPlan::Filter {
7470 input: Box::new(Self::push_predicate_to_scan(*input, variable, predicate)),
7471 predicate: p,
7472 optional_variables: opt_vars,
7473 },
7474 LogicalPlan::Project { input, projections } => LogicalPlan::Project {
7475 input: Box::new(Self::push_predicate_to_scan(*input, variable, predicate)),
7476 projections,
7477 },
7478 LogicalPlan::CrossJoin { left, right } => {
7479 if Self::find_scan_label_id(&left, variable).is_some() {
7481 LogicalPlan::CrossJoin {
7482 left: Box::new(Self::push_predicate_to_scan(*left, variable, predicate)),
7483 right,
7484 }
7485 } else {
7486 LogicalPlan::CrossJoin {
7487 left,
7488 right: Box::new(Self::push_predicate_to_scan(*right, variable, predicate)),
7489 }
7490 }
7491 }
7492 LogicalPlan::Traverse {
7493 input,
7494 edge_type_ids,
7495 direction,
7496 source_variable,
7497 target_variable,
7498 target_label_id,
7499 step_variable,
7500 min_hops,
7501 max_hops,
7502 optional,
7503 target_filter,
7504 path_variable,
7505 edge_properties,
7506 is_variable_length,
7507 optional_pattern_vars,
7508 scope_match_variables,
7509 edge_filter_expr,
7510 path_mode,
7511 qpp_steps,
7512 } => LogicalPlan::Traverse {
7513 input: Box::new(Self::push_predicate_to_scan(*input, variable, predicate)),
7514 edge_type_ids,
7515 direction,
7516 source_variable,
7517 target_variable,
7518 target_label_id,
7519 step_variable,
7520 min_hops,
7521 max_hops,
7522 optional,
7523 target_filter,
7524 path_variable,
7525 edge_properties,
7526 is_variable_length,
7527 optional_pattern_vars,
7528 scope_match_variables,
7529 edge_filter_expr,
7530 path_mode,
7531 qpp_steps,
7532 },
7533 other => other,
7534 }
7535 }
7536
7537 fn extract_variable_predicates(predicate: &Expr, variable: &str) -> (Vec<Expr>, Option<Expr>) {
7539 let analyzer = PredicateAnalyzer::new();
7540 let analysis = analyzer.analyze(predicate, variable);
7541
7542 let residual = if analysis.residual.is_empty() {
7544 None
7545 } else {
7546 let mut iter = analysis.residual.into_iter();
7547 let first = iter.next().unwrap();
7548 Some(iter.fold(first, |acc, e| Expr::BinaryOp {
7549 left: Box::new(acc),
7550 op: BinaryOp::And,
7551 right: Box::new(e),
7552 }))
7553 };
7554
7555 (analysis.pushable, residual)
7556 }
7557
7558 fn split_and_conjuncts(expr: &Expr) -> Vec<Expr> {
7564 match expr {
7565 Expr::BinaryOp {
7566 left,
7567 op: BinaryOp::And,
7568 right,
7569 } => {
7570 let mut result = Self::split_and_conjuncts(left);
7571 result.extend(Self::split_and_conjuncts(right));
7572 result
7573 }
7574 _ => vec![expr.clone()],
7575 }
7576 }
7577
7578 fn combine_predicates(predicates: Vec<Expr>) -> Option<Expr> {
7580 if predicates.is_empty() {
7581 return None;
7582 }
7583 let mut result = predicates[0].clone();
7584 for pred in predicates.iter().skip(1) {
7585 result = Expr::BinaryOp {
7586 left: Box::new(result),
7587 op: BinaryOp::And,
7588 right: Box::new(pred.clone()),
7589 };
7590 }
7591 Some(result)
7592 }
7593
7594 fn collect_expr_variables(expr: &Expr) -> HashSet<String> {
7596 let mut vars = HashSet::new();
7597 Self::collect_expr_variables_impl(expr, &mut vars);
7598 vars
7599 }
7600
7601 fn collect_expr_variables_impl(expr: &Expr, vars: &mut HashSet<String>) {
7602 match expr {
7603 Expr::Variable(name) => {
7604 vars.insert(name.clone());
7605 }
7606 Expr::Property(inner, _) => {
7607 if let Expr::Variable(name) = inner.as_ref() {
7608 vars.insert(name.clone());
7609 } else {
7610 Self::collect_expr_variables_impl(inner, vars);
7611 }
7612 }
7613 Expr::BinaryOp { left, right, .. } => {
7614 Self::collect_expr_variables_impl(left, vars);
7615 Self::collect_expr_variables_impl(right, vars);
7616 }
7617 Expr::UnaryOp { expr, .. } => Self::collect_expr_variables_impl(expr, vars),
7618 Expr::IsNull(e) | Expr::IsNotNull(e) => Self::collect_expr_variables_impl(e, vars),
7619 Expr::FunctionCall { args, .. } => {
7620 for arg in args {
7621 Self::collect_expr_variables_impl(arg, vars);
7622 }
7623 }
7624 Expr::List(items) => {
7625 for item in items {
7626 Self::collect_expr_variables_impl(item, vars);
7627 }
7628 }
7629 Expr::Case {
7630 expr,
7631 when_then,
7632 else_expr,
7633 } => {
7634 if let Some(e) = expr {
7635 Self::collect_expr_variables_impl(e, vars);
7636 }
7637 for (w, t) in when_then {
7638 Self::collect_expr_variables_impl(w, vars);
7639 Self::collect_expr_variables_impl(t, vars);
7640 }
7641 if let Some(e) = else_expr {
7642 Self::collect_expr_variables_impl(e, vars);
7643 }
7644 }
7645 Expr::LabelCheck { expr, .. } => Self::collect_expr_variables_impl(expr, vars),
7646 _ => {}
7649 }
7650 }
7651
7652 fn collect_plan_variables(plan: &LogicalPlan) -> HashSet<String> {
7654 let mut vars = HashSet::new();
7655 Self::collect_plan_variables_impl(plan, &mut vars);
7656 vars
7657 }
7658
7659 fn collect_plan_variables_impl(plan: &LogicalPlan, vars: &mut HashSet<String>) {
7660 match plan {
7661 LogicalPlan::Scan { variable, .. } => {
7662 vars.insert(variable.clone());
7663 }
7664 LogicalPlan::Traverse {
7665 target_variable,
7666 step_variable,
7667 input,
7668 path_variable,
7669 ..
7670 } => {
7671 vars.insert(target_variable.clone());
7672 if let Some(sv) = step_variable {
7673 vars.insert(sv.clone());
7674 }
7675 if let Some(pv) = path_variable {
7676 vars.insert(pv.clone());
7677 }
7678 Self::collect_plan_variables_impl(input, vars);
7679 }
7680 LogicalPlan::Filter { input, .. } => Self::collect_plan_variables_impl(input, vars),
7681 LogicalPlan::Project { input, projections } => {
7682 for (expr, alias) in projections {
7683 if let Some(a) = alias {
7684 vars.insert(a.clone());
7685 } else if let Expr::Variable(v) = expr {
7686 vars.insert(v.clone());
7687 }
7688 }
7689 Self::collect_plan_variables_impl(input, vars);
7690 }
7691 LogicalPlan::Apply {
7692 input, subquery, ..
7693 } => {
7694 Self::collect_plan_variables_impl(input, vars);
7695 Self::collect_plan_variables_impl(subquery, vars);
7696 }
7697 LogicalPlan::CrossJoin { left, right } => {
7698 Self::collect_plan_variables_impl(left, vars);
7699 Self::collect_plan_variables_impl(right, vars);
7700 }
7701 LogicalPlan::Unwind {
7702 input, variable, ..
7703 } => {
7704 vars.insert(variable.clone());
7705 Self::collect_plan_variables_impl(input, vars);
7706 }
7707 LogicalPlan::Aggregate { input, .. } => {
7708 Self::collect_plan_variables_impl(input, vars);
7709 }
7710 LogicalPlan::Distinct { input } => {
7711 Self::collect_plan_variables_impl(input, vars);
7712 }
7713 LogicalPlan::Sort { input, .. } => {
7714 Self::collect_plan_variables_impl(input, vars);
7715 }
7716 LogicalPlan::Limit { input, .. } => {
7717 Self::collect_plan_variables_impl(input, vars);
7718 }
7719 LogicalPlan::VectorKnn { variable, .. } => {
7720 vars.insert(variable.clone());
7721 }
7722 LogicalPlan::ProcedureCall { yield_items, .. } => {
7723 for (name, alias) in yield_items {
7724 vars.insert(alias.clone().unwrap_or_else(|| name.clone()));
7725 }
7726 }
7727 LogicalPlan::ShortestPath {
7728 input,
7729 path_variable,
7730 ..
7731 } => {
7732 vars.insert(path_variable.clone());
7733 Self::collect_plan_variables_impl(input, vars);
7734 }
7735 LogicalPlan::AllShortestPaths {
7736 input,
7737 path_variable,
7738 ..
7739 } => {
7740 vars.insert(path_variable.clone());
7741 Self::collect_plan_variables_impl(input, vars);
7742 }
7743 LogicalPlan::RecursiveCTE {
7744 initial, recursive, ..
7745 } => {
7746 Self::collect_plan_variables_impl(initial, vars);
7747 Self::collect_plan_variables_impl(recursive, vars);
7748 }
7749 LogicalPlan::SubqueryCall {
7750 input, subquery, ..
7751 } => {
7752 Self::collect_plan_variables_impl(input, vars);
7753 Self::collect_plan_variables_impl(subquery, vars);
7754 }
7755 _ => {}
7756 }
7757 }
7758
7759 fn extract_apply_input_predicates(
7762 predicate: &Expr,
7763 input_variables: &HashSet<String>,
7764 subquery_new_variables: &HashSet<String>,
7765 ) -> (Vec<Expr>, Vec<Expr>) {
7766 let conjuncts = Self::split_and_conjuncts(predicate);
7767 let mut input_preds = Vec::new();
7768 let mut remaining = Vec::new();
7769
7770 for conj in conjuncts {
7771 let vars = Self::collect_expr_variables(&conj);
7772
7773 let refs_input_only = vars.iter().all(|v| input_variables.contains(v));
7775 let refs_any_subquery = vars.iter().any(|v| subquery_new_variables.contains(v));
7776
7777 if refs_input_only && !refs_any_subquery && !vars.is_empty() {
7778 input_preds.push(conj);
7779 } else {
7780 remaining.push(conj);
7781 }
7782 }
7783
7784 (input_preds, remaining)
7785 }
7786
7787 fn push_predicates_to_apply(plan: LogicalPlan, current_predicate: &mut Expr) -> LogicalPlan {
7790 match plan {
7791 LogicalPlan::Apply {
7792 input,
7793 subquery,
7794 input_filter,
7795 } => {
7796 let input_vars = Self::collect_plan_variables(&input);
7798
7799 let subquery_vars = Self::collect_plan_variables(&subquery);
7801 let new_subquery_vars: HashSet<String> =
7802 subquery_vars.difference(&input_vars).cloned().collect();
7803
7804 let (input_preds, remaining) = Self::extract_apply_input_predicates(
7806 current_predicate,
7807 &input_vars,
7808 &new_subquery_vars,
7809 );
7810
7811 *current_predicate = if remaining.is_empty() {
7813 Expr::TRUE
7814 } else {
7815 Self::combine_predicates(remaining).unwrap()
7816 };
7817
7818 let new_input_filter = if input_preds.is_empty() {
7820 input_filter
7821 } else {
7822 let extracted = Self::combine_predicates(input_preds).unwrap();
7823 match input_filter {
7824 Some(existing) => Some(Expr::BinaryOp {
7825 left: Box::new(existing),
7826 op: BinaryOp::And,
7827 right: Box::new(extracted),
7828 }),
7829 None => Some(extracted),
7830 }
7831 };
7832
7833 let new_input = Self::push_predicates_to_apply(*input, current_predicate);
7835
7836 LogicalPlan::Apply {
7837 input: Box::new(new_input),
7838 subquery,
7839 input_filter: new_input_filter,
7840 }
7841 }
7842 LogicalPlan::Filter {
7844 input,
7845 predicate,
7846 optional_variables,
7847 } => LogicalPlan::Filter {
7848 input: Box::new(Self::push_predicates_to_apply(*input, current_predicate)),
7849 predicate,
7850 optional_variables,
7851 },
7852 LogicalPlan::Project { input, projections } => LogicalPlan::Project {
7853 input: Box::new(Self::push_predicates_to_apply(*input, current_predicate)),
7854 projections,
7855 },
7856 LogicalPlan::Sort { input, order_by } => LogicalPlan::Sort {
7857 input: Box::new(Self::push_predicates_to_apply(*input, current_predicate)),
7858 order_by,
7859 },
7860 LogicalPlan::Limit { input, skip, fetch } => LogicalPlan::Limit {
7861 input: Box::new(Self::push_predicates_to_apply(*input, current_predicate)),
7862 skip,
7863 fetch,
7864 },
7865 LogicalPlan::Aggregate {
7866 input,
7867 group_by,
7868 aggregates,
7869 } => LogicalPlan::Aggregate {
7870 input: Box::new(Self::push_predicates_to_apply(*input, current_predicate)),
7871 group_by,
7872 aggregates,
7873 },
7874 LogicalPlan::CrossJoin { left, right } => LogicalPlan::CrossJoin {
7875 left: Box::new(Self::push_predicates_to_apply(*left, current_predicate)),
7876 right: Box::new(Self::push_predicates_to_apply(*right, current_predicate)),
7877 },
7878 LogicalPlan::Traverse {
7879 input,
7880 edge_type_ids,
7881 direction,
7882 source_variable,
7883 target_variable,
7884 target_label_id,
7885 step_variable,
7886 min_hops,
7887 max_hops,
7888 optional,
7889 target_filter,
7890 path_variable,
7891 edge_properties,
7892 is_variable_length,
7893 optional_pattern_vars,
7894 scope_match_variables,
7895 edge_filter_expr,
7896 path_mode,
7897 qpp_steps,
7898 } => LogicalPlan::Traverse {
7899 input: Box::new(Self::push_predicates_to_apply(*input, current_predicate)),
7900 edge_type_ids,
7901 direction,
7902 source_variable,
7903 target_variable,
7904 target_label_id,
7905 step_variable,
7906 min_hops,
7907 max_hops,
7908 optional,
7909 target_filter,
7910 path_variable,
7911 edge_properties,
7912 is_variable_length,
7913 optional_pattern_vars,
7914 scope_match_variables,
7915 edge_filter_expr,
7916 path_mode,
7917 qpp_steps,
7918 },
7919 other => other,
7920 }
7921 }
7922}
7923
7924pub fn aggregate_column_name(expr: &Expr) -> String {
7931 expr.to_string_repr()
7932}
7933
7934#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
7936pub struct ExplainOutput {
7937 pub plan_text: String,
7939 pub index_usage: Vec<IndexUsage>,
7941 pub cost_estimates: CostEstimates,
7943 pub warnings: Vec<String>,
7945 pub suggestions: Vec<IndexSuggestion>,
7947}
7948
7949#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
7951pub struct IndexSuggestion {
7952 pub label_or_type: String,
7954 pub property: String,
7956 pub index_type: String,
7958 pub reason: String,
7960 pub create_statement: String,
7962}
7963
7964#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
7966pub struct IndexUsage {
7967 pub label_or_type: String,
7968 pub property: String,
7969 pub index_type: String,
7970 pub used: bool,
7972 pub reason: Option<String>,
7974}
7975
7976#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
7978pub struct CostEstimates {
7979 pub estimated_rows: f64,
7981 pub estimated_cost: f64,
7983}
7984
7985impl QueryPlanner {
7986 pub fn explain_plan(&self, ast: Query) -> Result<ExplainOutput> {
7988 let plan = self.plan(ast)?;
7989 self.explain_logical_plan(&plan)
7990 }
7991
7992 pub fn explain_logical_plan(&self, plan: &LogicalPlan) -> Result<ExplainOutput> {
7994 let index_usage = self.analyze_index_usage(plan)?;
7995 let cost_estimates = self.estimate_costs(plan)?;
7996 let suggestions = self.collect_index_suggestions(plan);
7997 let warnings = Vec::new();
7998 let plan_text = format!("{:#?}", plan);
7999
8000 Ok(ExplainOutput {
8001 plan_text,
8002 index_usage,
8003 cost_estimates,
8004 warnings,
8005 suggestions,
8006 })
8007 }
8008
8009 fn analyze_index_usage(&self, plan: &LogicalPlan) -> Result<Vec<IndexUsage>> {
8010 let mut usage = Vec::new();
8011 self.collect_index_usage(plan, &mut usage);
8012 Ok(usage)
8013 }
8014
8015 fn collect_index_usage(&self, plan: &LogicalPlan, usage: &mut Vec<IndexUsage>) {
8016 match plan {
8017 LogicalPlan::Scan {
8018 label_id,
8019 filter: Some(filter),
8020 ..
8021 } => {
8022 if let Some(label_name) = self.schema.label_name_by_id(*label_id) {
8026 let analyzer = crate::query::pushdown::IndexAwareAnalyzer::new(&self.schema);
8027 if let LogicalPlan::Scan { variable, .. } = plan {
8030 let strategy = analyzer.analyze(filter, variable, *label_id);
8031 for prop in strategy.hash_index_columns {
8032 usage.push(IndexUsage {
8033 label_or_type: label_name.to_string(),
8034 property: prop,
8035 index_type: "HASH".to_string(),
8036 used: true,
8037 reason: Some(
8038 "Hash index point lookup pushed into Lance scan".to_string(),
8039 ),
8040 });
8041 }
8042 }
8043 }
8044 }
8045 LogicalPlan::Scan { .. } => {}
8046 LogicalPlan::VectorKnn {
8047 label_id, property, ..
8048 } => {
8049 let label_name = self.schema.label_name_by_id(*label_id).unwrap_or("?");
8050 usage.push(IndexUsage {
8051 label_or_type: label_name.to_string(),
8052 property: property.clone(),
8053 index_type: "VECTOR".to_string(),
8054 used: true,
8055 reason: None,
8056 });
8057 }
8058 LogicalPlan::Explain { plan } => self.collect_index_usage(plan, usage),
8059 LogicalPlan::Filter { input, .. } => self.collect_index_usage(input, usage),
8060 LogicalPlan::Project { input, .. } => self.collect_index_usage(input, usage),
8061 LogicalPlan::Limit { input, .. } => self.collect_index_usage(input, usage),
8062 LogicalPlan::Sort { input, .. } => self.collect_index_usage(input, usage),
8063 LogicalPlan::Aggregate { input, .. } => self.collect_index_usage(input, usage),
8064 LogicalPlan::Traverse { input, .. } => self.collect_index_usage(input, usage),
8065 LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right } => {
8066 self.collect_index_usage(left, usage);
8067 self.collect_index_usage(right, usage);
8068 }
8069 _ => {}
8070 }
8071 }
8072
8073 fn estimate_costs(&self, _plan: &LogicalPlan) -> Result<CostEstimates> {
8074 Ok(CostEstimates {
8075 estimated_rows: 100.0,
8076 estimated_cost: 10.0,
8077 })
8078 }
8079
8080 fn collect_index_suggestions(&self, plan: &LogicalPlan) -> Vec<IndexSuggestion> {
8086 let mut suggestions = Vec::new();
8087 self.collect_temporal_suggestions(plan, &mut suggestions);
8088 suggestions
8089 }
8090
8091 fn collect_temporal_suggestions(
8093 &self,
8094 plan: &LogicalPlan,
8095 suggestions: &mut Vec<IndexSuggestion>,
8096 ) {
8097 match plan {
8098 LogicalPlan::Filter {
8099 input, predicate, ..
8100 } => {
8101 self.detect_temporal_pattern(predicate, suggestions);
8103 self.collect_temporal_suggestions(input, suggestions);
8105 }
8106 LogicalPlan::Explain { plan } => self.collect_temporal_suggestions(plan, suggestions),
8107 LogicalPlan::Project { input, .. } => {
8108 self.collect_temporal_suggestions(input, suggestions)
8109 }
8110 LogicalPlan::Limit { input, .. } => {
8111 self.collect_temporal_suggestions(input, suggestions)
8112 }
8113 LogicalPlan::Sort { input, .. } => {
8114 self.collect_temporal_suggestions(input, suggestions)
8115 }
8116 LogicalPlan::Aggregate { input, .. } => {
8117 self.collect_temporal_suggestions(input, suggestions)
8118 }
8119 LogicalPlan::Traverse { input, .. } => {
8120 self.collect_temporal_suggestions(input, suggestions)
8121 }
8122 LogicalPlan::Union { left, right, .. } | LogicalPlan::CrossJoin { left, right } => {
8123 self.collect_temporal_suggestions(left, suggestions);
8124 self.collect_temporal_suggestions(right, suggestions);
8125 }
8126 _ => {}
8127 }
8128 }
8129
8130 fn detect_temporal_pattern(&self, expr: &Expr, suggestions: &mut Vec<IndexSuggestion>) {
8136 match expr {
8137 Expr::FunctionCall { name, args, .. }
8139 if (name.eq_ignore_ascii_case("uni.temporal.validAt")
8140 || name.eq_ignore_ascii_case("validAt"))
8141 && args.len() >= 2 =>
8142 {
8143 let start_prop = if let Some(Expr::Literal(CypherLiteral::String(s))) = args.get(1)
8145 {
8146 s.clone()
8147 } else {
8148 "valid_from".to_string()
8149 };
8150
8151 if let Some(var) = args.first().and_then(|e| e.extract_variable()) {
8153 self.suggest_temporal_index(&var, &start_prop, suggestions);
8154 }
8155 }
8156
8157 Expr::BinaryOp {
8159 left,
8160 op: BinaryOp::And,
8161 right,
8162 } => {
8163 if let Expr::BinaryOp {
8165 left: prop_expr,
8166 op: BinaryOp::LtEq,
8167 ..
8168 } = left.as_ref()
8169 && let Expr::Property(base, prop_name) = prop_expr.as_ref()
8170 && (prop_name == "valid_from"
8171 || prop_name.contains("start")
8172 || prop_name.contains("from")
8173 || prop_name.contains("begin"))
8174 && let Some(var) = base.extract_variable()
8175 {
8176 self.suggest_temporal_index(&var, prop_name, suggestions);
8177 }
8178
8179 self.detect_temporal_pattern(left.as_ref(), suggestions);
8181 self.detect_temporal_pattern(right.as_ref(), suggestions);
8182 }
8183
8184 Expr::BinaryOp { left, right, .. } => {
8186 self.detect_temporal_pattern(left.as_ref(), suggestions);
8187 self.detect_temporal_pattern(right.as_ref(), suggestions);
8188 }
8189
8190 _ => {}
8191 }
8192 }
8193
8194 fn suggest_temporal_index(
8196 &self,
8197 _variable: &str,
8198 property: &str,
8199 suggestions: &mut Vec<IndexSuggestion>,
8200 ) {
8201 let mut has_index = false;
8204
8205 for index in &self.schema.indexes {
8206 if let IndexDefinition::Scalar(config) = index
8207 && config.properties.contains(&property.to_string())
8208 {
8209 has_index = true;
8210 break;
8211 }
8212 }
8213
8214 if !has_index {
8215 let already_suggested = suggestions.iter().any(|s| s.property == property);
8217 if !already_suggested {
8218 suggestions.push(IndexSuggestion {
8219 label_or_type: "(detected from temporal query)".to_string(),
8220 property: property.to_string(),
8221 index_type: "SCALAR (BTree)".to_string(),
8222 reason: format!(
8223 "Temporal queries using '{}' can benefit from a scalar index for range scans",
8224 property
8225 ),
8226 create_statement: format!(
8227 "CREATE INDEX idx_{} FOR (n:YourLabel) ON (n.{})",
8228 property, property
8229 ),
8230 });
8231 }
8232 }
8233 }
8234
8235 fn normalize_expression_for_storage(expr: &Expr) -> String {
8240 match expr {
8241 Expr::Property(base, prop) if matches!(**base, Expr::Variable(_)) => prop.clone(),
8242 _ => {
8243 let expr_str = expr.to_string_repr();
8245 Self::strip_variable_prefix(&expr_str)
8246 }
8247 }
8248 }
8249
8250 fn strip_variable_prefix(expr_str: &str) -> String {
8253 use regex::Regex;
8254 let re = Regex::new(r"\b\w+\.(\w+)").unwrap();
8256 re.replace_all(expr_str, "$1").to_string()
8257 }
8258
8259 fn plan_schema_command(&self, cmd: SchemaCommand) -> Result<LogicalPlan> {
8261 match cmd {
8262 SchemaCommand::CreateVectorIndex(c) => {
8263 let opt = |key: &str| {
8265 c.options
8266 .get(key)
8267 .and_then(|v| v.as_str())
8268 .and_then(|s| s.parse::<u32>().ok())
8269 };
8270 let opt_u8 = |key: &str| -> Option<u8> {
8271 c.options
8272 .get(key)
8273 .and_then(|v| v.as_str())
8274 .and_then(|s| s.parse::<u8>().ok())
8275 };
8276 let index_type = match c.options.get("type").and_then(|v| v.as_str()) {
8277 Some("flat") => VectorIndexType::Flat,
8278 Some("ivf_flat") => VectorIndexType::IvfFlat {
8279 num_partitions: opt("partitions").unwrap_or(256),
8280 },
8281 Some("ivf_sq") => VectorIndexType::IvfSq {
8282 num_partitions: opt("partitions").unwrap_or(256),
8283 },
8284 Some("ivf_rq") => VectorIndexType::IvfRq {
8285 num_partitions: opt("partitions").unwrap_or(256),
8286 num_bits: opt_u8("num_bits"),
8287 },
8288 Some("hnsw_flat") => VectorIndexType::HnswFlat {
8289 m: opt("m").unwrap_or(16),
8290 ef_construction: opt("ef_construction").unwrap_or(200),
8291 num_partitions: opt("partitions"),
8292 },
8293 Some("hnsw") | Some("hnsw_sq") => VectorIndexType::HnswSq {
8294 m: opt("m").unwrap_or(16),
8295 ef_construction: opt("ef_construction").unwrap_or(200),
8296 num_partitions: opt("partitions"),
8297 },
8298 Some("hnsw_pq") => VectorIndexType::HnswPq {
8299 m: opt("m").unwrap_or(16),
8300 ef_construction: opt("ef_construction").unwrap_or(200),
8301 num_sub_vectors: opt("sub_vectors").unwrap_or(16),
8302 num_partitions: opt("partitions"),
8303 },
8304 _ => VectorIndexType::IvfPq {
8305 num_partitions: opt("partitions").unwrap_or(256),
8306 num_sub_vectors: opt("sub_vectors").unwrap_or(16),
8307 bits_per_subvector: opt_u8("num_bits").unwrap_or(8),
8308 },
8309 };
8310
8311 let embedding_config = if let Some(emb_val) = c.options.get("embedding") {
8313 Self::parse_embedding_config(emb_val)?
8314 } else {
8315 None
8316 };
8317
8318 let config = VectorIndexConfig {
8319 name: c.name,
8320 label: c.label,
8321 property: c.property,
8322 metric: DistanceMetric::Cosine,
8323 index_type,
8324 embedding_config,
8325 metadata: Default::default(),
8326 };
8327 Ok(LogicalPlan::CreateVectorIndex {
8328 config,
8329 if_not_exists: c.if_not_exists,
8330 })
8331 }
8332 SchemaCommand::CreateFullTextIndex(cfg) => Ok(LogicalPlan::CreateFullTextIndex {
8333 config: FullTextIndexConfig {
8334 name: cfg.name,
8335 label: cfg.label,
8336 properties: cfg.properties,
8337 tokenizer: TokenizerConfig::Standard,
8338 with_positions: true,
8339 metadata: Default::default(),
8340 },
8341 if_not_exists: cfg.if_not_exists,
8342 }),
8343 SchemaCommand::CreateScalarIndex(cfg) => {
8344 let properties: Vec<String> = cfg
8346 .expressions
8347 .iter()
8348 .map(Self::normalize_expression_for_storage)
8349 .collect();
8350
8351 Ok(LogicalPlan::CreateScalarIndex {
8352 config: ScalarIndexConfig {
8353 name: cfg.name,
8354 label: cfg.label,
8355 properties,
8356 index_type: ScalarIndexType::BTree,
8357 where_clause: cfg.where_clause.map(|e| e.to_string_repr()),
8358 metadata: Default::default(),
8359 },
8360 if_not_exists: cfg.if_not_exists,
8361 })
8362 }
8363 SchemaCommand::CreateJsonFtsIndex(cfg) => {
8364 let with_positions = cfg
8365 .options
8366 .get("with_positions")
8367 .and_then(|v| v.as_bool())
8368 .unwrap_or(false);
8369 Ok(LogicalPlan::CreateJsonFtsIndex {
8370 config: JsonFtsIndexConfig {
8371 name: cfg.name,
8372 label: cfg.label,
8373 column: cfg.column,
8374 paths: Vec::new(),
8375 with_positions,
8376 metadata: Default::default(),
8377 },
8378 if_not_exists: cfg.if_not_exists,
8379 })
8380 }
8381 SchemaCommand::DropIndex(drop) => Ok(LogicalPlan::DropIndex {
8382 name: drop.name,
8383 if_exists: false, }),
8385 SchemaCommand::CreateConstraint(c) => Ok(LogicalPlan::CreateConstraint(c)),
8386 SchemaCommand::DropConstraint(c) => Ok(LogicalPlan::DropConstraint(c)),
8387 SchemaCommand::CreateLabel(c) => Ok(LogicalPlan::CreateLabel(c)),
8388 SchemaCommand::CreateEdgeType(c) => Ok(LogicalPlan::CreateEdgeType(c)),
8389 SchemaCommand::AlterLabel(c) => Ok(LogicalPlan::AlterLabel(c)),
8390 SchemaCommand::AlterEdgeType(c) => Ok(LogicalPlan::AlterEdgeType(c)),
8391 SchemaCommand::DropLabel(c) => Ok(LogicalPlan::DropLabel(c)),
8392 SchemaCommand::DropEdgeType(c) => Ok(LogicalPlan::DropEdgeType(c)),
8393 SchemaCommand::ShowConstraints(c) => Ok(LogicalPlan::ShowConstraints(c)),
8394 SchemaCommand::ShowIndexes(c) => Ok(LogicalPlan::ShowIndexes { filter: c.filter }),
8395 SchemaCommand::ShowDatabase => Ok(LogicalPlan::ShowDatabase),
8396 SchemaCommand::ShowConfig => Ok(LogicalPlan::ShowConfig),
8397 SchemaCommand::ShowStatistics => Ok(LogicalPlan::ShowStatistics),
8398 SchemaCommand::Vacuum => Ok(LogicalPlan::Vacuum),
8399 SchemaCommand::Checkpoint => Ok(LogicalPlan::Checkpoint),
8400 SchemaCommand::Backup { path } => Ok(LogicalPlan::Backup {
8401 destination: path,
8402 options: HashMap::new(),
8403 }),
8404 SchemaCommand::CopyTo(cmd) => Ok(LogicalPlan::CopyTo {
8405 label: cmd.label,
8406 path: cmd.path,
8407 format: cmd.format,
8408 options: cmd.options,
8409 }),
8410 SchemaCommand::CopyFrom(cmd) => Ok(LogicalPlan::CopyFrom {
8411 label: cmd.label,
8412 path: cmd.path,
8413 format: cmd.format,
8414 options: cmd.options,
8415 }),
8416 }
8417 }
8418
8419 fn parse_embedding_config(emb_val: &Value) -> Result<Option<EmbeddingConfig>> {
8420 let obj = emb_val
8421 .as_object()
8422 .ok_or_else(|| anyhow!("embedding option must be an object"))?;
8423
8424 let alias = obj
8426 .get("alias")
8427 .and_then(|v| v.as_str())
8428 .ok_or_else(|| anyhow!("embedding.alias is required"))?;
8429
8430 let source_properties = obj
8432 .get("source")
8433 .and_then(|v| v.as_array())
8434 .ok_or_else(|| anyhow!("embedding.source is required and must be an array"))?
8435 .iter()
8436 .filter_map(|v| v.as_str().map(|s| s.to_string()))
8437 .collect::<Vec<_>>();
8438
8439 if source_properties.is_empty() {
8440 return Err(anyhow!(
8441 "embedding.source must contain at least one property"
8442 ));
8443 }
8444
8445 let batch_size = obj
8446 .get("batch_size")
8447 .and_then(|v| v.as_u64())
8448 .map(|v| v as usize)
8449 .unwrap_or(32);
8450
8451 let document_prefix = obj
8452 .get("document_prefix")
8453 .and_then(|v| v.as_str())
8454 .map(|s| s.to_string());
8455
8456 let query_prefix = obj
8457 .get("query_prefix")
8458 .and_then(|v| v.as_str())
8459 .map(|s| s.to_string());
8460
8461 Ok(Some(EmbeddingConfig {
8462 alias: alias.to_string(),
8463 source_properties,
8464 batch_size,
8465 document_prefix,
8466 query_prefix,
8467 }))
8468 }
8469}
8470
8471pub fn collect_properties_from_plan(plan: &LogicalPlan) -> HashMap<String, HashSet<String>> {
8478 let mut properties: HashMap<String, HashSet<String>> = HashMap::new();
8479 collect_properties_recursive(plan, &mut properties);
8480 properties
8481}
8482
8483fn collect_properties_recursive(
8485 plan: &LogicalPlan,
8486 properties: &mut HashMap<String, HashSet<String>>,
8487) {
8488 match plan {
8489 LogicalPlan::Window {
8490 input,
8491 window_exprs,
8492 } => {
8493 for expr in window_exprs {
8495 collect_properties_from_expr_into(expr, properties);
8496 }
8497 collect_properties_recursive(input, properties);
8498 }
8499 LogicalPlan::Project { input, projections } => {
8500 for (expr, _alias) in projections {
8501 collect_properties_from_expr_into(expr, properties);
8502 }
8503 collect_properties_recursive(input, properties);
8504 }
8505 LogicalPlan::Sort { input, order_by } => {
8506 for sort_item in order_by {
8507 collect_properties_from_expr_into(&sort_item.expr, properties);
8508 }
8509 collect_properties_recursive(input, properties);
8510 }
8511 LogicalPlan::Filter {
8512 input, predicate, ..
8513 } => {
8514 collect_properties_from_expr_into(predicate, properties);
8515 collect_properties_recursive(input, properties);
8516 }
8517 LogicalPlan::Aggregate {
8518 input,
8519 group_by,
8520 aggregates,
8521 } => {
8522 for expr in group_by {
8523 collect_properties_from_expr_into(expr, properties);
8524 }
8525 for expr in aggregates {
8526 collect_properties_from_expr_into(expr, properties);
8527 }
8528 collect_properties_recursive(input, properties);
8529 }
8530 LogicalPlan::Scan {
8531 filter: Some(expr), ..
8532 } => {
8533 collect_properties_from_expr_into(expr, properties);
8534 }
8535 LogicalPlan::Scan { filter: None, .. } => {}
8536 LogicalPlan::ExtIdLookup {
8537 filter: Some(expr), ..
8538 } => {
8539 collect_properties_from_expr_into(expr, properties);
8540 }
8541 LogicalPlan::ExtIdLookup { filter: None, .. } => {}
8542 LogicalPlan::ScanAll {
8543 filter: Some(expr), ..
8544 } => {
8545 collect_properties_from_expr_into(expr, properties);
8546 }
8547 LogicalPlan::ScanAll { filter: None, .. } => {}
8548 LogicalPlan::ScanMainByLabels {
8549 filter: Some(expr), ..
8550 } => {
8551 collect_properties_from_expr_into(expr, properties);
8552 }
8553 LogicalPlan::ScanMainByLabels { filter: None, .. } => {}
8554 LogicalPlan::TraverseMainByType {
8555 input,
8556 target_filter,
8557 ..
8558 } => {
8559 if let Some(expr) = target_filter {
8560 collect_properties_from_expr_into(expr, properties);
8561 }
8562 collect_properties_recursive(input, properties);
8563 }
8564 LogicalPlan::Traverse {
8565 input,
8566 target_filter,
8567 step_variable: _,
8568 ..
8569 } => {
8570 if let Some(expr) = target_filter {
8571 collect_properties_from_expr_into(expr, properties);
8572 }
8573 collect_properties_recursive(input, properties);
8577 }
8578 LogicalPlan::Unwind { input, expr, .. } => {
8579 collect_properties_from_expr_into(expr, properties);
8580 collect_properties_recursive(input, properties);
8581 }
8582 LogicalPlan::Create { input, pattern } => {
8583 mark_pattern_variables(pattern, properties);
8588 collect_properties_recursive(input, properties);
8589 }
8590 LogicalPlan::CreateBatch { input, patterns } => {
8591 for pattern in patterns {
8592 mark_pattern_variables(pattern, properties);
8593 }
8594 collect_properties_recursive(input, properties);
8595 }
8596 LogicalPlan::Merge {
8597 input,
8598 pattern,
8599 on_match,
8600 on_create,
8601 } => {
8602 mark_pattern_variables(pattern, properties);
8603 if let Some(set_clause) = on_match {
8604 mark_set_item_variables(&set_clause.items, properties);
8605 }
8606 if let Some(set_clause) = on_create {
8607 mark_set_item_variables(&set_clause.items, properties);
8608 }
8609 collect_properties_recursive(input, properties);
8610 }
8611 LogicalPlan::Set { input, items } => {
8612 mark_set_item_variables(items, properties);
8613 collect_properties_recursive(input, properties);
8614 }
8615 LogicalPlan::Remove { input, items } => {
8616 for item in items {
8617 match item {
8618 RemoveItem::Property(expr) => {
8619 collect_properties_from_expr_into(expr, properties);
8622 if let Expr::Property(base, _) = expr
8623 && let Expr::Variable(var) = base.as_ref()
8624 {
8625 properties
8626 .entry(var.clone())
8627 .or_default()
8628 .insert("*".to_string());
8629 }
8630 }
8631 RemoveItem::Labels { variable, .. } => {
8632 properties
8634 .entry(variable.clone())
8635 .or_default()
8636 .insert("*".to_string());
8637 }
8638 }
8639 }
8640 collect_properties_recursive(input, properties);
8641 }
8642 LogicalPlan::Delete { input, items, .. } => {
8643 for expr in items {
8644 collect_properties_from_expr_into(expr, properties);
8645 }
8646 collect_properties_recursive(input, properties);
8647 }
8648 LogicalPlan::Foreach {
8649 input, list, body, ..
8650 } => {
8651 collect_properties_from_expr_into(list, properties);
8652 for plan in body {
8653 collect_properties_recursive(plan, properties);
8654 }
8655 collect_properties_recursive(input, properties);
8656 }
8657 LogicalPlan::Limit { input, .. } => {
8658 collect_properties_recursive(input, properties);
8659 }
8660 LogicalPlan::CrossJoin { left, right } => {
8661 collect_properties_recursive(left, properties);
8662 collect_properties_recursive(right, properties);
8663 }
8664 LogicalPlan::Apply {
8665 input,
8666 subquery,
8667 input_filter,
8668 } => {
8669 if let Some(expr) = input_filter {
8670 collect_properties_from_expr_into(expr, properties);
8671 }
8672 collect_properties_recursive(input, properties);
8673 collect_properties_recursive(subquery, properties);
8674 }
8675 LogicalPlan::Union { left, right, .. } => {
8676 collect_properties_recursive(left, properties);
8677 collect_properties_recursive(right, properties);
8678 }
8679 LogicalPlan::RecursiveCTE {
8680 initial, recursive, ..
8681 } => {
8682 collect_properties_recursive(initial, properties);
8683 collect_properties_recursive(recursive, properties);
8684 }
8685 LogicalPlan::ProcedureCall { arguments, .. } => {
8686 for arg in arguments {
8687 collect_properties_from_expr_into(arg, properties);
8688 }
8689 }
8690 LogicalPlan::VectorKnn { query, .. } => {
8691 collect_properties_from_expr_into(query, properties);
8692 }
8693 LogicalPlan::InvertedIndexLookup { terms, .. } => {
8694 collect_properties_from_expr_into(terms, properties);
8695 }
8696 LogicalPlan::ShortestPath { input, .. } => {
8697 collect_properties_recursive(input, properties);
8698 }
8699 LogicalPlan::AllShortestPaths { input, .. } => {
8700 collect_properties_recursive(input, properties);
8701 }
8702 LogicalPlan::Distinct { input } => {
8703 collect_properties_recursive(input, properties);
8704 }
8705 LogicalPlan::QuantifiedPattern {
8706 input,
8707 pattern_plan,
8708 ..
8709 } => {
8710 collect_properties_recursive(input, properties);
8711 collect_properties_recursive(pattern_plan, properties);
8712 }
8713 LogicalPlan::BindZeroLengthPath { input, .. } => {
8714 collect_properties_recursive(input, properties);
8715 }
8716 LogicalPlan::BindPath { input, .. } => {
8717 collect_properties_recursive(input, properties);
8718 }
8719 LogicalPlan::SubqueryCall { input, subquery } => {
8720 collect_properties_recursive(input, properties);
8721 collect_properties_recursive(subquery, properties);
8722 }
8723 LogicalPlan::LocyProject {
8724 input, projections, ..
8725 } => {
8726 for (expr, _alias) in projections {
8727 match expr {
8728 Expr::Variable(name) if !name.contains('.') => {
8732 properties
8733 .entry(name.clone())
8734 .or_default()
8735 .insert("_vid".to_string());
8736 }
8737 _ => collect_properties_from_expr_into(expr, properties),
8738 }
8739 }
8740 collect_properties_recursive(input, properties);
8741 }
8742 LogicalPlan::LocyFold {
8743 input,
8744 fold_bindings,
8745 ..
8746 } => {
8747 for (_name, expr) in fold_bindings {
8748 collect_properties_from_expr_into(expr, properties);
8749 }
8750 collect_properties_recursive(input, properties);
8751 }
8752 LogicalPlan::LocyBestBy {
8753 input, criteria, ..
8754 } => {
8755 for (expr, _asc) in criteria {
8756 collect_properties_from_expr_into(expr, properties);
8757 }
8758 collect_properties_recursive(input, properties);
8759 }
8760 LogicalPlan::LocyPriority { input, .. } => {
8761 collect_properties_recursive(input, properties);
8762 }
8763 LogicalPlan::LocyModelInvoke { input, .. } => {
8764 collect_properties_recursive(input, properties);
8770 }
8771 _ => {}
8773 }
8774}
8775
8776fn mark_set_item_variables(items: &[SetItem], properties: &mut HashMap<String, HashSet<String>>) {
8778 for item in items {
8779 match item {
8780 SetItem::Property { expr, value } => {
8781 collect_properties_from_expr_into(expr, properties);
8793 collect_properties_from_expr_into(value, properties);
8794 if let Expr::Property(base, _) = expr
8795 && let Expr::Variable(var) = base.as_ref()
8796 {
8797 properties
8798 .entry(var.clone())
8799 .or_default()
8800 .insert(STRUCT_ONLY_SENTINEL.to_string());
8801 }
8802 }
8803 SetItem::Labels { variable, .. } => {
8804 properties
8806 .entry(variable.clone())
8807 .or_default()
8808 .insert("*".to_string());
8809 }
8810 SetItem::Variable { variable, value } | SetItem::VariablePlus { variable, value } => {
8811 properties
8813 .entry(variable.clone())
8814 .or_default()
8815 .insert("*".to_string());
8816 collect_properties_from_expr_into(value, properties);
8817 }
8818 }
8819 }
8820}
8821
8822fn mark_pattern_variables(pattern: &Pattern, properties: &mut HashMap<String, HashSet<String>>) {
8827 for path in &pattern.paths {
8828 if let Some(ref v) = path.variable {
8829 properties
8830 .entry(v.clone())
8831 .or_default()
8832 .insert("*".to_string());
8833 }
8834 for element in &path.elements {
8835 match element {
8836 PatternElement::Node(n) => {
8837 if let Some(ref v) = n.variable {
8838 properties
8839 .entry(v.clone())
8840 .or_default()
8841 .insert("*".to_string());
8842 }
8843 if let Some(ref props) = n.properties {
8845 collect_properties_from_expr_into(props, properties);
8846 }
8847 }
8848 PatternElement::Relationship(r) => {
8849 if let Some(ref v) = r.variable {
8850 properties
8851 .entry(v.clone())
8852 .or_default()
8853 .insert("*".to_string());
8854 }
8855 if let Some(ref props) = r.properties {
8856 collect_properties_from_expr_into(props, properties);
8857 }
8858 }
8859 PatternElement::Parenthesized { pattern, .. } => {
8860 let sub = Pattern {
8861 paths: vec![pattern.as_ref().clone()],
8862 };
8863 mark_pattern_variables(&sub, properties);
8864 }
8865 }
8866 }
8867 }
8868}
8869
8870fn collect_properties_from_expr_into(
8872 expr: &Expr,
8873 properties: &mut HashMap<String, HashSet<String>>,
8874) {
8875 match expr {
8876 Expr::PatternComprehension {
8877 where_clause,
8878 map_expr,
8879 ..
8880 } => {
8881 if let Some(where_expr) = where_clause {
8885 collect_properties_from_expr_into(where_expr, properties);
8886 }
8887 collect_properties_from_expr_into(map_expr, properties);
8888 }
8889 Expr::Variable(name) => {
8890 if let Some((var, prop)) = name.split_once('.') {
8892 properties
8893 .entry(var.to_string())
8894 .or_default()
8895 .insert(prop.to_string());
8896 } else {
8897 properties
8899 .entry(name.clone())
8900 .or_default()
8901 .insert("*".to_string());
8902 }
8903 }
8904 Expr::Property(base, name) => {
8905 if let Expr::Variable(var) = base.as_ref() {
8907 properties
8908 .entry(var.clone())
8909 .or_default()
8910 .insert(name.clone());
8911 } else {
8914 collect_properties_from_expr_into(base, properties);
8916 }
8917 }
8918 Expr::BinaryOp { left, right, .. } => {
8919 collect_properties_from_expr_into(left, properties);
8920 collect_properties_from_expr_into(right, properties);
8921 }
8922 Expr::FunctionCall {
8923 name,
8924 args,
8925 window_spec,
8926 ..
8927 } => {
8928 analyze_function_property_requirements(name, args, properties);
8930
8931 for arg in args {
8933 collect_properties_from_expr_into(arg, properties);
8934 }
8935
8936 if let Some(spec) = window_spec {
8938 for part_expr in &spec.partition_by {
8939 collect_properties_from_expr_into(part_expr, properties);
8940 }
8941 for sort_item in &spec.order_by {
8942 collect_properties_from_expr_into(&sort_item.expr, properties);
8943 }
8944 }
8945 }
8946 Expr::UnaryOp { expr, .. } => {
8947 collect_properties_from_expr_into(expr, properties);
8948 }
8949 Expr::List(items) => {
8950 for item in items {
8951 collect_properties_from_expr_into(item, properties);
8952 }
8953 }
8954 Expr::Map(entries) => {
8955 for (_key, value) in entries {
8956 collect_properties_from_expr_into(value, properties);
8957 }
8958 }
8959 Expr::ListComprehension {
8960 list,
8961 where_clause,
8962 map_expr,
8963 ..
8964 } => {
8965 collect_properties_from_expr_into(list, properties);
8966 if let Some(where_expr) = where_clause {
8967 collect_properties_from_expr_into(where_expr, properties);
8968 }
8969 collect_properties_from_expr_into(map_expr, properties);
8970 }
8971 Expr::Case {
8972 expr,
8973 when_then,
8974 else_expr,
8975 } => {
8976 if let Some(scrutinee_expr) = expr {
8977 collect_properties_from_expr_into(scrutinee_expr, properties);
8978 }
8979 for (when, then) in when_then {
8980 collect_properties_from_expr_into(when, properties);
8981 collect_properties_from_expr_into(then, properties);
8982 }
8983 if let Some(default_expr) = else_expr {
8984 collect_properties_from_expr_into(default_expr, properties);
8985 }
8986 }
8987 Expr::Quantifier {
8988 list, predicate, ..
8989 } => {
8990 collect_properties_from_expr_into(list, properties);
8991 collect_properties_from_expr_into(predicate, properties);
8992 }
8993 Expr::Reduce {
8994 init, list, expr, ..
8995 } => {
8996 collect_properties_from_expr_into(init, properties);
8997 collect_properties_from_expr_into(list, properties);
8998 collect_properties_from_expr_into(expr, properties);
8999 }
9000 Expr::Exists { query, .. } => {
9001 collect_properties_from_subquery(query, properties);
9006 }
9007 Expr::CountSubquery(query) | Expr::CollectSubquery(query) => {
9008 collect_properties_from_subquery(query, properties);
9009 }
9010 Expr::IsNull(expr) | Expr::IsNotNull(expr) | Expr::IsUnique(expr) => {
9011 collect_properties_from_expr_into(expr, properties);
9012 }
9013 Expr::In { expr, list } => {
9014 collect_properties_from_expr_into(expr, properties);
9015 collect_properties_from_expr_into(list, properties);
9016 }
9017 Expr::ArrayIndex { array, index } => {
9018 if let Expr::Variable(var) = array.as_ref() {
9019 if let Expr::Literal(CypherLiteral::String(prop_name)) = index.as_ref() {
9020 properties
9022 .entry(var.clone())
9023 .or_default()
9024 .insert(prop_name.clone());
9025 } else {
9026 properties
9028 .entry(var.clone())
9029 .or_default()
9030 .insert("*".to_string());
9031 }
9032 }
9033 collect_properties_from_expr_into(array, properties);
9034 collect_properties_from_expr_into(index, properties);
9035 }
9036 Expr::ArraySlice { array, start, end } => {
9037 collect_properties_from_expr_into(array, properties);
9038 if let Some(start_expr) = start {
9039 collect_properties_from_expr_into(start_expr, properties);
9040 }
9041 if let Some(end_expr) = end {
9042 collect_properties_from_expr_into(end_expr, properties);
9043 }
9044 }
9045 Expr::ValidAt {
9046 entity,
9047 timestamp,
9048 start_prop,
9049 end_prop,
9050 } => {
9051 if let Expr::Variable(var) = entity.as_ref() {
9053 if let Some(prop) = start_prop {
9054 properties
9055 .entry(var.clone())
9056 .or_default()
9057 .insert(prop.clone());
9058 }
9059 if let Some(prop) = end_prop {
9060 properties
9061 .entry(var.clone())
9062 .or_default()
9063 .insert(prop.clone());
9064 }
9065 }
9066 collect_properties_from_expr_into(entity, properties);
9067 collect_properties_from_expr_into(timestamp, properties);
9068 }
9069 Expr::MapProjection { base, items } => {
9070 collect_properties_from_expr_into(base, properties);
9071 for item in items {
9072 match item {
9073 uni_cypher::ast::MapProjectionItem::Property(prop) => {
9074 if let Expr::Variable(var) = base.as_ref() {
9075 properties
9076 .entry(var.clone())
9077 .or_default()
9078 .insert(prop.clone());
9079 }
9080 }
9081 uni_cypher::ast::MapProjectionItem::AllProperties => {
9082 if let Expr::Variable(var) = base.as_ref() {
9083 properties
9084 .entry(var.clone())
9085 .or_default()
9086 .insert("*".to_string());
9087 }
9088 }
9089 uni_cypher::ast::MapProjectionItem::LiteralEntry(_, expr) => {
9090 collect_properties_from_expr_into(expr, properties);
9091 }
9092 uni_cypher::ast::MapProjectionItem::Variable(_) => {}
9093 }
9094 }
9095 }
9096 Expr::LabelCheck { expr, .. } => {
9097 collect_properties_from_expr_into(expr, properties);
9098 }
9099 Expr::Parameter(name) => {
9103 properties
9104 .entry(name.clone())
9105 .or_default()
9106 .insert("*".to_string());
9107 }
9108 Expr::Literal(_) | Expr::Wildcard => {}
9110 }
9111}
9112
9113fn collect_properties_from_subquery(
9119 query: &Query,
9120 properties: &mut HashMap<String, HashSet<String>>,
9121) {
9122 match query {
9123 Query::Single(stmt) => {
9124 for clause in &stmt.clauses {
9125 match clause {
9126 Clause::Match(m) => {
9127 if let Some(ref wc) = m.where_clause {
9128 collect_properties_from_expr_into(wc, properties);
9129 }
9130 }
9131 Clause::With(w) => {
9132 for item in &w.items {
9133 if let ReturnItem::Expr { expr, .. } = item {
9134 collect_properties_from_expr_into(expr, properties);
9135 }
9136 }
9137 if let Some(ref wc) = w.where_clause {
9138 collect_properties_from_expr_into(wc, properties);
9139 }
9140 }
9141 Clause::Return(r) => {
9142 for item in &r.items {
9143 if let ReturnItem::Expr { expr, .. } = item {
9144 collect_properties_from_expr_into(expr, properties);
9145 }
9146 }
9147 }
9148 _ => {}
9149 }
9150 }
9151 }
9152 Query::Union { left, right, .. } => {
9153 collect_properties_from_subquery(left, properties);
9154 collect_properties_from_subquery(right, properties);
9155 }
9156 _ => {}
9157 }
9158}
9159
9160fn analyze_function_property_requirements(
9170 name: &str,
9171 args: &[Expr],
9172 properties: &mut HashMap<String, HashSet<String>>,
9173) {
9174 use crate::query::function_props::get_function_spec;
9175
9176 fn mark_wildcard(var: &str, properties: &mut HashMap<String, HashSet<String>>) {
9178 properties
9179 .entry(var.to_string())
9180 .or_default()
9181 .insert("*".to_string());
9182 }
9183
9184 if name.eq_ignore_ascii_case("created_at") || name.eq_ignore_ascii_case("updated_at") {
9187 if let Some(Expr::Variable(var)) = args.first() {
9188 let col = if name.eq_ignore_ascii_case("created_at") {
9189 "_created_at"
9190 } else {
9191 "_updated_at"
9192 };
9193 properties
9194 .entry(var.clone())
9195 .or_default()
9196 .insert(col.to_string());
9197 }
9198 return;
9199 }
9200
9201 let Some(spec) = get_function_spec(name) else {
9202 for arg in args {
9204 if let Expr::Variable(var) = arg {
9205 mark_wildcard(var, properties);
9206 }
9207 }
9208 return;
9209 };
9210
9211 for &(prop_arg_idx, entity_arg_idx) in spec.property_name_args {
9213 let entity_arg = args.get(entity_arg_idx);
9214 let prop_arg = args.get(prop_arg_idx);
9215
9216 match (entity_arg, prop_arg) {
9217 (Some(Expr::Variable(var)), Some(Expr::Literal(CypherLiteral::String(prop)))) => {
9218 properties
9219 .entry(var.clone())
9220 .or_default()
9221 .insert(prop.clone());
9222 }
9223 (Some(Expr::Variable(var)), Some(Expr::Parameter(_))) => {
9224 mark_wildcard(var, properties);
9226 }
9227 _ => {}
9228 }
9229 }
9230
9231 if spec.needs_full_entity {
9233 for &idx in spec.entity_args {
9234 if let Some(Expr::Variable(var)) = args.get(idx) {
9235 mark_wildcard(var, properties);
9236 }
9237 }
9238 }
9239}
9240
9241pub trait ForkIndexLookup {
9250 fn fork_index_for(
9251 &self,
9252 label: &str,
9253 column: &str,
9254 ) -> Option<uni_store::fork::ForkLocalIndexKind>;
9255
9256 fn fork_index_for_label_id(
9263 &self,
9264 _label_id: u16,
9265 _column: &str,
9266 ) -> Option<uni_store::fork::ForkLocalIndexKind> {
9267 None
9268 }
9269}
9270
9271impl ForkIndexLookup for uni_store::storage::StorageManager {
9272 fn fork_index_for(
9273 &self,
9274 label: &str,
9275 column: &str,
9276 ) -> Option<uni_store::fork::ForkLocalIndexKind> {
9277 self.fork_index_exists(label, column)
9278 }
9279
9280 fn fork_index_for_label_id(
9281 &self,
9282 label_id: u16,
9283 column: &str,
9284 ) -> Option<uni_store::fork::ForkLocalIndexKind> {
9285 let schema = self.schema_manager().schema();
9286 let label_name = schema.label_name_by_id(label_id)?;
9287 self.fork_index_exists(label_name, column)
9288 }
9289}
9290
9291#[must_use]
9301pub fn rewrite_for_fork_fusion<L: ForkIndexLookup>(plan: LogicalPlan, lookup: &L) -> LogicalPlan {
9302 rewrite_node(plan, lookup)
9303}
9304
9305fn rewrite_node<L: ForkIndexLookup>(plan: LogicalPlan, lookup: &L) -> LogicalPlan {
9306 match plan {
9307 LogicalPlan::Scan {
9308 label_id,
9309 labels,
9310 variable,
9311 filter,
9312 optional,
9313 } => {
9314 let kind = if labels.len() == 1
9318 && let Some(col) = filter
9319 .as_ref()
9320 .and_then(|f| equality_target_column(f, &variable))
9321 && let Some(idx_kind) = lookup.fork_index_for(&labels[0], &col)
9322 {
9323 into_fusion_kind(idx_kind)
9324 } else {
9325 None
9326 };
9327 match kind {
9328 Some(kind) => LogicalPlan::FusedIndexScan {
9329 label_id,
9330 labels,
9331 variable,
9332 filter,
9333 optional,
9334 kind,
9335 },
9336 None => LogicalPlan::Scan {
9337 label_id,
9338 labels,
9339 variable,
9340 filter,
9341 optional,
9342 },
9343 }
9344 }
9345 LogicalPlan::ProcedureCall {
9358 procedure_name,
9359 arguments,
9360 yield_items,
9361 } => {
9362 let kind = procedure_call_fusion_kind(&procedure_name, &arguments, lookup);
9363 let inner = LogicalPlan::ProcedureCall {
9364 procedure_name,
9365 arguments,
9366 yield_items,
9367 };
9368 match kind {
9369 Some(kind) => LogicalPlan::FusedIndexScanWrapped {
9370 inner: Box::new(inner),
9371 kind,
9372 },
9373 None => inner,
9374 }
9375 }
9376 LogicalPlan::VectorKnn {
9377 label_id,
9378 variable,
9379 property,
9380 query,
9381 k,
9382 threshold,
9383 } => {
9384 if let Some(idx_kind) = lookup.fork_index_for_label_id(label_id, &property)
9385 && let Some(kind) = into_fusion_kind(idx_kind)
9386 {
9387 LogicalPlan::FusedIndexScanWrapped {
9388 inner: Box::new(LogicalPlan::VectorKnn {
9389 label_id,
9390 variable,
9391 property,
9392 query,
9393 k,
9394 threshold,
9395 }),
9396 kind,
9397 }
9398 } else {
9399 LogicalPlan::VectorKnn {
9400 label_id,
9401 variable,
9402 property,
9403 query,
9404 k,
9405 threshold,
9406 }
9407 }
9408 }
9409 LogicalPlan::InvertedIndexLookup {
9410 label_id,
9411 variable,
9412 property,
9413 terms,
9414 } => {
9415 if let Some(idx_kind) = lookup.fork_index_for_label_id(label_id, &property)
9416 && let Some(kind) = into_fusion_kind(idx_kind)
9417 {
9418 LogicalPlan::FusedIndexScanWrapped {
9419 inner: Box::new(LogicalPlan::InvertedIndexLookup {
9420 label_id,
9421 variable,
9422 property,
9423 terms,
9424 }),
9425 kind,
9426 }
9427 } else {
9428 LogicalPlan::InvertedIndexLookup {
9429 label_id,
9430 variable,
9431 property,
9432 terms,
9433 }
9434 }
9435 }
9436 LogicalPlan::Filter {
9441 input,
9442 predicate,
9443 optional_variables,
9444 } => LogicalPlan::Filter {
9445 input: Box::new(rewrite_node(*input, lookup)),
9446 predicate,
9447 optional_variables,
9448 },
9449 LogicalPlan::Project { input, projections } => LogicalPlan::Project {
9450 input: Box::new(rewrite_node(*input, lookup)),
9451 projections,
9452 },
9453 LogicalPlan::Limit { input, skip, fetch } => LogicalPlan::Limit {
9454 input: Box::new(rewrite_node(*input, lookup)),
9455 skip,
9456 fetch,
9457 },
9458 LogicalPlan::Sort { input, order_by } => {
9459 let new_input = match (*input, &order_by[..]) {
9466 (
9467 LogicalPlan::Scan {
9468 label_id,
9469 labels,
9470 variable,
9471 filter,
9472 optional,
9473 },
9474 [single_sort],
9475 ) if labels.len() == 1
9476 && let Some(col) = column_of_scan_variable(&single_sort.expr, &variable)
9477 && let Some(uni_store::fork::ForkLocalIndexKind::Sorted) =
9478 lookup.fork_index_for(&labels[0], &col) =>
9479 {
9480 LogicalPlan::FusedIndexScan {
9481 label_id,
9482 labels,
9483 variable,
9484 filter,
9485 optional,
9486 kind: FusionKind::SortedKWayMerge,
9487 }
9488 }
9489 (other_input, _) => rewrite_node(other_input, lookup),
9490 };
9491 LogicalPlan::Sort {
9492 input: Box::new(new_input),
9493 order_by,
9494 }
9495 }
9496 LogicalPlan::Union { left, right, all } => LogicalPlan::Union {
9497 left: Box::new(rewrite_node(*left, lookup)),
9498 right: Box::new(rewrite_node(*right, lookup)),
9499 all,
9500 },
9501 other => other,
9505 }
9506}
9507
9508fn procedure_call_fusion_kind<L: ForkIndexLookup>(
9521 procedure_name: &str,
9522 arguments: &[Expr],
9523 lookup: &L,
9524) -> Option<FusionKind> {
9525 if arguments.len() < 2 {
9526 return None;
9527 }
9528 let label = match &arguments[0] {
9529 Expr::Literal(uni_cypher::ast::CypherLiteral::String(s)) => s.as_str(),
9530 _ => return None,
9531 };
9532 let column = match &arguments[1] {
9533 Expr::Literal(uni_cypher::ast::CypherLiteral::String(s)) => s.as_str(),
9534 _ => return None,
9535 };
9536 let expected = match procedure_name {
9537 "uni.vector.query" => uni_store::fork::ForkLocalIndexKind::Vector,
9538 "uni.fts.query" => uni_store::fork::ForkLocalIndexKind::FullText,
9539 _ => return None,
9540 };
9541 let registered = lookup.fork_index_for(label, column)?;
9542 if registered != expected {
9543 return None;
9544 }
9545 into_fusion_kind(registered)
9546}
9547
9548fn into_fusion_kind(kind: uni_store::fork::ForkLocalIndexKind) -> Option<FusionKind> {
9552 use uni_store::fork::ForkLocalIndexKind as K;
9553 match kind {
9554 K::VidUid => Some(FusionKind::VidUidForkFirst),
9555 K::ScalarBtree => Some(FusionKind::BtreeUnion),
9556 K::Sorted => Some(FusionKind::SortedKWayMerge),
9557 K::Vector => Some(FusionKind::AnnRerank),
9558 K::FullText => Some(FusionKind::Bm25Rrf),
9559 _ => None,
9564 }
9565}
9566
9567fn equality_target_column(filter: &Expr, scan_variable: &str) -> Option<String> {
9573 let (lhs, rhs) = match filter {
9574 Expr::BinaryOp {
9575 left,
9576 op: uni_cypher::ast::BinaryOp::Eq,
9577 right,
9578 } => (left.as_ref(), right.as_ref()),
9579 _ => return None,
9580 };
9581 if let Some(col) = column_of_scan_variable(lhs, scan_variable)
9583 && is_constant_or_param(rhs)
9584 {
9585 return Some(col);
9586 }
9587 if let Some(col) = column_of_scan_variable(rhs, scan_variable)
9588 && is_constant_or_param(lhs)
9589 {
9590 return Some(col);
9591 }
9592 None
9593}
9594
9595fn column_of_scan_variable(expr: &Expr, scan_variable: &str) -> Option<String> {
9596 if let Expr::Property(base, prop) = expr
9597 && let Expr::Variable(v) = base.as_ref()
9598 && v == scan_variable
9599 {
9600 return Some(prop.clone());
9601 }
9602 None
9603}
9604
9605fn is_constant_or_param(expr: &Expr) -> bool {
9606 matches!(expr, Expr::Literal(_) | Expr::Parameter(_))
9607}
9608
9609#[cfg(test)]
9610mod pushdown_tests {
9611 use super::*;
9612
9613 #[test]
9614 fn test_validat_extracts_property_names() {
9615 let mut properties = HashMap::new();
9617
9618 let args = vec![
9619 Expr::Variable("e".to_string()),
9620 Expr::Literal(CypherLiteral::String("start".to_string())),
9621 Expr::Literal(CypherLiteral::String("end".to_string())),
9622 Expr::Variable("ts".to_string()),
9623 ];
9624
9625 analyze_function_property_requirements("uni.temporal.validAt", &args, &mut properties);
9626
9627 assert!(properties.contains_key("e"));
9628 let e_props: HashSet<String> = ["start".to_string(), "end".to_string()]
9629 .iter()
9630 .cloned()
9631 .collect();
9632 assert_eq!(properties.get("e").unwrap(), &e_props);
9633 }
9634
9635 #[test]
9636 fn test_keys_requires_wildcard() {
9637 let mut properties = HashMap::new();
9639
9640 let args = vec![Expr::Variable("n".to_string())];
9641
9642 analyze_function_property_requirements("keys", &args, &mut properties);
9643
9644 assert!(properties.contains_key("n"));
9645 let n_props: HashSet<String> = ["*".to_string()].iter().cloned().collect();
9646 assert_eq!(properties.get("n").unwrap(), &n_props);
9647 }
9648
9649 #[test]
9650 fn test_properties_requires_wildcard() {
9651 let mut properties = HashMap::new();
9653
9654 let args = vec![Expr::Variable("n".to_string())];
9655
9656 analyze_function_property_requirements("properties", &args, &mut properties);
9657
9658 assert!(properties.contains_key("n"));
9659 let n_props: HashSet<String> = ["*".to_string()].iter().cloned().collect();
9660 assert_eq!(properties.get("n").unwrap(), &n_props);
9661 }
9662
9663 #[test]
9664 fn test_unknown_function_conservative() {
9665 let mut properties = HashMap::new();
9667
9668 let args = vec![Expr::Variable("e".to_string())];
9669
9670 analyze_function_property_requirements("customUdf", &args, &mut properties);
9671
9672 assert!(properties.contains_key("e"));
9673 let e_props: HashSet<String> = ["*".to_string()].iter().cloned().collect();
9674 assert_eq!(properties.get("e").unwrap(), &e_props);
9675 }
9676
9677 #[test]
9678 fn test_parameter_property_name() {
9679 let mut properties = HashMap::new();
9681
9682 let args = vec![
9683 Expr::Variable("e".to_string()),
9684 Expr::Parameter("start".to_string()),
9685 Expr::Parameter("end".to_string()),
9686 Expr::Variable("ts".to_string()),
9687 ];
9688
9689 analyze_function_property_requirements("uni.temporal.validAt", &args, &mut properties);
9690
9691 assert!(properties.contains_key("e"));
9692 assert!(properties.get("e").unwrap().contains("*"));
9693 }
9694
9695 #[test]
9696 fn test_validat_expr_extracts_properties() {
9697 let mut properties = HashMap::new();
9699
9700 let validat_expr = Expr::ValidAt {
9701 entity: Box::new(Expr::Variable("e".to_string())),
9702 timestamp: Box::new(Expr::Variable("ts".to_string())),
9703 start_prop: Some("valid_from".to_string()),
9704 end_prop: Some("valid_to".to_string()),
9705 };
9706
9707 collect_properties_from_expr_into(&validat_expr, &mut properties);
9708
9709 assert!(properties.contains_key("e"));
9710 assert!(properties.get("e").unwrap().contains("valid_from"));
9711 assert!(properties.get("e").unwrap().contains("valid_to"));
9712 }
9713
9714 #[test]
9715 fn test_array_index_requires_wildcard() {
9716 let mut properties = HashMap::new();
9718
9719 let array_index_expr = Expr::ArrayIndex {
9720 array: Box::new(Expr::Variable("e".to_string())),
9721 index: Box::new(Expr::Variable("prop".to_string())),
9722 };
9723
9724 collect_properties_from_expr_into(&array_index_expr, &mut properties);
9725
9726 assert!(properties.contains_key("e"));
9727 assert!(properties.get("e").unwrap().contains("*"));
9728 }
9729
9730 #[test]
9731 fn test_property_access_extraction() {
9732 let mut properties = HashMap::new();
9734
9735 let prop_access = Expr::Property(
9736 Box::new(Expr::Variable("e".to_string())),
9737 "name".to_string(),
9738 );
9739
9740 collect_properties_from_expr_into(&prop_access, &mut properties);
9741
9742 assert!(properties.contains_key("e"));
9743 assert!(properties.get("e").unwrap().contains("name"));
9744 }
9745}