1use crate::query::df_expr::{TranslationContext, VariableKind, cypher_expr_to_df};
5use crate::query::df_graph::GraphExecutionContext;
6use crate::query::df_graph::common::{execute_subplan, extract_row_params};
7use crate::query::df_graph::comprehension::ListComprehensionExecExpr;
8use crate::query::df_graph::pattern_comprehension::{
9 PatternComprehensionExecExpr, analyze_pattern, build_inner_schema, collect_inner_properties,
10};
11use crate::query::df_graph::quantifier::{QuantifierExecExpr, QuantifierType};
12use crate::query::df_graph::reduce::ReduceExecExpr;
13use crate::query::df_graph::similar_to_expr::SimilarToExecExpr;
14use crate::query::planner::QueryPlanner;
15use crate::query::similar_to::SimilarToError;
16use anyhow::{Result, anyhow};
17use arrow_array::builder::BooleanBuilder;
18use arrow_schema::{DataType, Field, Schema};
19use datafusion::execution::context::SessionState;
20use datafusion::physical_expr::expressions::binary;
21use datafusion::physical_plan::PhysicalExpr;
22use datafusion::physical_planner::PhysicalPlanner;
23use datafusion::prelude::SessionContext;
24use parking_lot::RwLock;
25use std::collections::{HashMap, HashSet};
26use std::sync::Arc;
27use uni_common::Value;
28use uni_common::core::schema::{DistanceMetric, IndexDefinition, Schema as UniSchema};
29use uni_cypher::ast::{
30 BinaryOp, Clause, CypherLiteral, Expr, MatchClause, Query, ReturnClause, ReturnItem, SortItem,
31 Statement, UnaryOp, UnwindClause, WithClause,
32};
33use uni_store::storage::manager::StorageManager;
34
35fn is_cypher_value_type(dt: Option<&DataType>) -> bool {
37 dt.is_some_and(|t| *t == DataType::LargeBinary)
38}
39
40fn resolve_list_element_type(
50 list_data_type: &DataType,
51 large_binary_fallback: DataType,
52 context: &str,
53) -> Result<DataType> {
54 match list_data_type {
55 DataType::List(field) | DataType::LargeList(field) => Ok(field.data_type().clone()),
56 DataType::Null => Ok(DataType::Null),
57 DataType::LargeBinary => Ok(large_binary_fallback),
58 _ => Err(anyhow!(
59 "{} input must be a list, got {:?}",
60 context,
61 list_data_type
62 )),
63 }
64}
65
/// Physical expression that converts a `List`/`LargeList` child result into
/// the `LargeBinary` Cypher-value representation (see `evaluate`).
#[derive(Debug)]
struct LargeListToCypherValueExpr {
    // Expression producing the list (or already-LargeBinary) input.
    child: Arc<dyn PhysicalExpr>,
}
74
75impl LargeListToCypherValueExpr {
76 fn new(child: Arc<dyn PhysicalExpr>) -> Self {
77 Self { child }
78 }
79}
80
81impl std::fmt::Display for LargeListToCypherValueExpr {
82 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
83 write!(f, "LargeListToCypherValue({})", self.child)
84 }
85}
86
// Equality is pointer identity on the child: two wrappers compare equal only
// when they share the same underlying child expression `Arc`. NOTE(review):
// structural equality of the children may be intended — confirm against
// DataFusion's `dyn PhysicalExpr` comparison requirements.
impl PartialEq for LargeListToCypherValueExpr {
    fn eq(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.child, &other.child)
    }
}
92
// Pointer-identity equality (above) is reflexive, symmetric, and transitive,
// so `Eq` is sound.
impl Eq for LargeListToCypherValueExpr {}
94
95impl std::hash::Hash for LargeListToCypherValueExpr {
96 fn hash<H: std::hash::Hasher>(&self, _state: &mut H) {
97 std::any::type_name::<Self>().hash(_state);
99 }
100}
101
102impl PartialEq<dyn std::any::Any> for LargeListToCypherValueExpr {
103 fn eq(&self, other: &dyn std::any::Any) -> bool {
104 other
105 .downcast_ref::<Self>()
106 .map(|x| self == x)
107 .unwrap_or(false)
108 }
109}
110
impl PhysicalExpr for LargeListToCypherValueExpr {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Output is always `LargeBinary` (the Cypher-value encoding),
    /// regardless of the child's list type.
    fn data_type(&self, _input_schema: &Schema) -> datafusion::error::Result<DataType> {
        Ok(DataType::LargeBinary)
    }

    /// Nullability is inherited from the child expression.
    fn nullable(&self, input_schema: &Schema) -> datafusion::error::Result<bool> {
        self.child.nullable(input_schema)
    }

    /// Evaluates the child and converts its `List`/`LargeList` result into a
    /// `LargeBinary` Cypher-value array. A child that already produces
    /// `LargeBinary` is passed through untouched.
    fn evaluate(
        &self,
        batch: &arrow_array::RecordBatch,
    ) -> datafusion::error::Result<datafusion::logical_expr::ColumnarValue> {
        use datafusion::arrow::compute::cast;
        use datafusion::logical_expr::ColumnarValue;

        let child_result = self.child.evaluate(batch)?;
        let child_array = child_result.into_array(batch.num_rows())?;

        // Normalize `List` to `LargeList` so a single downcast path below
        // handles both variants.
        let list_array = if let DataType::List(field) = child_array.data_type() {
            let target_type = DataType::LargeList(field.clone());
            cast(&child_array, &target_type).map_err(|e| {
                datafusion::error::DataFusionError::Execution(format!(
                    "List to LargeList cast failed: {e}"
                ))
            })?
        } else {
            // Cheap: `ArrayRef` is an `Arc`, so this is a refcount bump.
            child_array.clone()
        };

        // Already in the Cypher-value representation: nothing to convert.
        if list_array.data_type() == &DataType::LargeBinary {
            return Ok(ColumnarValue::Array(list_array));
        }

        if let Some(large_list) = list_array
            .as_any()
            .downcast_ref::<datafusion::arrow::array::LargeListArray>()
        {
            let cv_array =
                crate::query::df_graph::common::typed_large_list_to_cv_array(large_list)?;
            Ok(ColumnarValue::Array(cv_array))
        } else {
            Err(datafusion::error::DataFusionError::Execution(format!(
                "Expected List or LargeList, got {:?}",
                list_array.data_type()
            )))
        }
    }

    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
        vec![&self.child]
    }

    /// Rebuilds the expression with a replacement child (exactly one).
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn PhysicalExpr>>,
    ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
        if children.len() != 1 {
            return Err(datafusion::error::DataFusionError::Execution(
                "LargeListToCypherValueExpr expects exactly 1 child".to_string(),
            ));
        }
        Ok(Arc::new(LargeListToCypherValueExpr::new(
            children[0].clone(),
        )))
    }

    fn fmt_sql(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "LargeListToCypherValue({})", self.child)
    }
}
189
/// Compiles Cypher AST expressions into DataFusion physical expressions.
///
/// Custom constructs (list/pattern comprehensions, quantifiers, `reduce`,
/// `EXISTS`, `similar_to`) get bespoke `PhysicalExpr` implementations;
/// everything else is lowered through the standard DataFusion planner
/// (see `compile_standard`).
pub struct CypherPhysicalExprCompiler<'a> {
    // DataFusion session state; source of registered UDFs and the planner.
    state: &'a SessionState,
    // Variable kinds/labels/parameters for the current translation scope.
    translation_ctx: Option<&'a TranslationContext>,
    // Required by pattern comprehensions, EXISTS, and similar_to.
    graph_ctx: Option<Arc<GraphExecutionContext>>,
    uni_schema: Option<Arc<UniSchema>>,
    // Session and storage are only needed to execute EXISTS subqueries
    // (set via `with_subquery_ctx`).
    session_ctx: Option<Arc<RwLock<SessionContext>>>,
    storage: Option<Arc<StorageManager>>,
    // Query parameters forwarded into subquery execution.
    params: HashMap<String, Value>,
}
203
204impl<'a> CypherPhysicalExprCompiler<'a> {
205 pub fn new(state: &'a SessionState, translation_ctx: Option<&'a TranslationContext>) -> Self {
206 Self {
207 state,
208 translation_ctx,
209 graph_ctx: None,
210 uni_schema: None,
211 session_ctx: None,
212 storage: None,
213 params: HashMap::new(),
214 }
215 }
216
    /// Returns a compiler for an inner scope in which `exclude_vars` (e.g. a
    /// comprehension's loop variable or `reduce` accumulator) no longer carry
    /// their outer node/edge variable kind, so kind-based rewrites (such as
    /// the identity comparison in `compile_binary_op_dispatch`) do not apply
    /// to the shadowed names.
    ///
    /// `scoped_ctx_slot` is caller-owned storage for the adjusted context so
    /// that the borrow handed to the returned compiler outlives this call.
    fn scoped_compiler<'b>(
        &'b self,
        exclude_vars: &[&str],
        scoped_ctx_slot: &'b mut Option<TranslationContext>,
    ) -> CypherPhysicalExprCompiler<'b>
    where
        'a: 'b,
    {
        // Only clone the context when an excluded name actually has a kind.
        let needs_scoping = self.translation_ctx.is_some_and(|ctx| {
            exclude_vars
                .iter()
                .any(|v| ctx.variable_kinds.contains_key(*v))
        });

        let ctx_ref = if needs_scoping {
            // Safe: `needs_scoping` implies `translation_ctx` is Some.
            let ctx = self.translation_ctx.unwrap();
            let mut new_kinds = ctx.variable_kinds.clone();
            for v in exclude_vars {
                new_kinds.remove(*v);
            }
            *scoped_ctx_slot = Some(TranslationContext {
                parameters: ctx.parameters.clone(),
                outer_values: ctx.outer_values.clone(),
                variable_labels: ctx.variable_labels.clone(),
                variable_kinds: new_kinds,
                node_variable_hints: ctx.node_variable_hints.clone(),
                mutation_edge_hints: ctx.mutation_edge_hints.clone(),
                statement_time: ctx.statement_time,
            });
            scoped_ctx_slot.as_ref()
        } else {
            // No shadowed kinds: reuse the outer context unchanged.
            self.translation_ctx
        };

        CypherPhysicalExprCompiler {
            state: self.state,
            translation_ctx: ctx_ref,
            graph_ctx: self.graph_ctx.clone(),
            uni_schema: self.uni_schema.clone(),
            session_ctx: self.session_ctx.clone(),
            storage: self.storage.clone(),
            params: self.params.clone(),
        }
    }
272
273 pub fn with_graph_ctx(
275 mut self,
276 graph_ctx: Arc<GraphExecutionContext>,
277 uni_schema: Arc<UniSchema>,
278 ) -> Self {
279 self.graph_ctx = Some(graph_ctx);
280 self.uni_schema = Some(uni_schema);
281 self
282 }
283
284 pub fn with_subquery_ctx(
286 mut self,
287 graph_ctx: Arc<GraphExecutionContext>,
288 uni_schema: Arc<UniSchema>,
289 session_ctx: Arc<RwLock<SessionContext>>,
290 storage: Arc<StorageManager>,
291 params: HashMap<String, Value>,
292 ) -> Self {
293 self.graph_ctx = Some(graph_ctx);
294 self.uni_schema = Some(uni_schema);
295 self.session_ctx = Some(session_ctx);
296 self.storage = Some(storage);
297 self.params = params;
298 self
299 }
300
    /// Entry point: compiles a Cypher expression against `input_schema`.
    ///
    /// Constructs with custom runtime semantics (comprehensions, quantifiers,
    /// `reduce`, `EXISTS`, `similar_to`) are routed to dedicated compilers;
    /// expressions that merely *contain* such constructs compile their
    /// children recursively; everything else falls through to
    /// `compile_standard` (the normal DataFusion lowering).
    pub fn compile(&self, expr: &Expr, input_schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
        match expr {
            Expr::ListComprehension {
                variable,
                list,
                where_clause,
                map_expr,
            } => self.compile_list_comprehension(
                variable,
                list,
                where_clause.as_deref(),
                map_expr,
                input_schema,
            ),
            Expr::Quantifier {
                quantifier,
                variable,
                list,
                predicate,
            } => self.compile_quantifier(quantifier, variable, list, predicate, input_schema),
            Expr::Reduce {
                accumulator,
                init,
                variable,
                list,
                expr: expression,
            } => self.compile_reduce(accumulator, init, variable, list, expression, input_schema),
            Expr::BinaryOp { left, op, right } => {
                self.compile_binary_op_dispatch(left, op, right, input_schema)
            }
            Expr::UnaryOp { op, expr: inner } => {
                if matches!(op, UnaryOp::Not) {
                    let mut inner_phy = self.compile(inner, input_schema)?;
                    // A LargeBinary operand is an encoded Cypher value; coerce
                    // it to boolean before applying NOT.
                    if let Ok(DataType::LargeBinary) = inner_phy.data_type(input_schema) {
                        inner_phy = self.wrap_with_cv_to_bool(inner_phy)?;
                    }
                    self.compile_unary_op(op, inner_phy, input_schema)
                } else if Self::contains_custom_expr(inner) {
                    let inner_phy = self.compile(inner, input_schema)?;
                    self.compile_unary_op(op, inner_phy, input_schema)
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }
            Expr::IsNull(inner) => {
                if Self::contains_custom_expr(inner) {
                    let inner_phy = self.compile(inner, input_schema)?;
                    Ok(datafusion::physical_expr::expressions::is_null(inner_phy)
                        .map_err(|e| anyhow!("Failed to create is_null: {}", e))?)
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }
            Expr::IsNotNull(inner) => {
                if Self::contains_custom_expr(inner) {
                    let inner_phy = self.compile(inner, input_schema)?;
                    Ok(
                        datafusion::physical_expr::expressions::is_not_null(inner_phy)
                            .map_err(|e| anyhow!("Failed to create is_not_null: {}", e))?,
                    )
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }
            Expr::In {
                expr: left,
                list: right,
            } => {
                if Self::contains_custom_expr(left) || Self::contains_custom_expr(right) {
                    let left_phy = self.compile(left, input_schema)?;
                    let right_phy = self.compile(right, input_schema)?;

                    // Unknown types default to LargeBinary (Cypher value).
                    let left_type = left_phy
                        .data_type(input_schema)
                        .unwrap_or(DataType::LargeBinary);
                    let right_type = right_phy
                        .data_type(input_schema)
                        .unwrap_or(DataType::LargeBinary);

                    self.plan_binary_udf("_cypher_in", left_phy, right_phy, left_type, right_type)?
                        .ok_or_else(|| anyhow!("_cypher_in UDF not found"))
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }

            // Not yet supported: literals whose elements need custom exprs.
            Expr::List(items) if items.iter().any(Self::contains_custom_expr) => Err(anyhow!(
                "List literals containing comprehensions not yet supported in compiler"
            )),
            Expr::Map(entries) if entries.iter().any(|(_, v)| Self::contains_custom_expr(v)) => {
                Err(anyhow!(
                    "Map literals containing comprehensions not yet supported in compiler"
                ))
            }

            Expr::Property(base, prop) => self.compile_property_access(base, prop, input_schema),

            Expr::ArrayIndex { array, index } => {
                self.compile_array_index(array, index, input_schema)
            }

            Expr::PatternComprehension {
                path_variable,
                pattern,
                where_clause,
                map_expr,
            } => self.compile_pattern_comprehension(
                path_variable,
                pattern,
                where_clause.as_deref(),
                map_expr,
                input_schema,
            ),

            Expr::Exists { query, .. } => self.compile_exists(query),

            Expr::FunctionCall {
                name,
                args,
                distinct,
                ..
            } => {
                // similar_to always takes the custom path (vector search).
                if name.eq_ignore_ascii_case("similar_to") {
                    return self.compile_similar_to(args, input_schema);
                }
                if args.iter().any(Self::contains_custom_expr) {
                    self.compile_function_with_custom_args(name, args, *distinct, input_schema)
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }

            Expr::Case {
                expr: case_operand,
                when_then,
                else_expr,
            } => {
                // Custom path only if any operand/branch needs a custom expr.
                let has_custom = case_operand
                    .as_deref()
                    .is_some_and(Self::contains_custom_expr)
                    || when_then.iter().any(|(w, t)| {
                        Self::contains_custom_expr(w) || Self::contains_custom_expr(t)
                    })
                    || else_expr.as_deref().is_some_and(Self::contains_custom_expr);

                if has_custom {
                    self.compile_case(case_operand, when_then, else_expr, input_schema)
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }

            Expr::LabelCheck { .. } => self.compile_standard(expr, input_schema),

            _ => self.compile_standard(expr, input_schema),
        }
    }
477
    /// Routes a binary operation to one of three lowerings, in priority
    /// order: (1) node/edge identity comparison rewritten to `_vid`/`_eid`
    /// property equality, (2) operands with custom exprs or Cypher-value
    /// (LargeBinary) types via `compile_binary_op`, (3) the standard
    /// DataFusion path.
    fn compile_binary_op_dispatch(
        &self,
        left: &Expr,
        op: &BinaryOp,
        right: &Expr,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        // Two graph variables of the same kind compared with =/<> are
        // rewritten to compare their internal ids instead of whole values.
        if matches!(op, BinaryOp::Eq | BinaryOp::NotEq)
            && let (Expr::Variable(lv), Expr::Variable(rv)) = (left, right)
            && let Some(ctx) = self.translation_ctx
            && let (Some(lk), Some(rk)) = (ctx.variable_kinds.get(lv), ctx.variable_kinds.get(rv))
        {
            let identity_prop = match (lk, rk) {
                (VariableKind::Node, VariableKind::Node) => Some("_vid"),
                (VariableKind::Edge, VariableKind::Edge) => Some("_eid"),
                _ => None,
            };

            if let Some(id_prop) = identity_prop {
                return self.compile_standard(
                    &Expr::BinaryOp {
                        left: Box::new(Expr::Property(
                            Box::new(Expr::Variable(lv.clone())),
                            id_prop.to_string(),
                        )),
                        op: *op,
                        right: Box::new(Expr::Property(
                            Box::new(Expr::Variable(rv.clone())),
                            id_prop.to_string(),
                        )),
                    },
                    input_schema,
                );
            }
        }

        // XOR and POW always go through the standard lowering.
        if matches!(op, BinaryOp::Xor | BinaryOp::Pow) {
            return self.compile_standard(
                &Expr::BinaryOp {
                    left: Box::new(left.clone()),
                    op: *op,
                    right: Box::new(right.clone()),
                },
                input_schema,
            );
        }

        if Self::contains_custom_expr(left) || Self::contains_custom_expr(right) {
            let left_phy = self.compile(left, input_schema)?;
            let right_phy = self.compile(right, input_schema)?;
            return self.compile_binary_op(op, left_phy, right_phy, input_schema);
        }

        // List concatenation (`+` with a list operand) is handled by the
        // standard lowering.
        if *op == BinaryOp::Add && (Self::is_list_producing(left) || Self::is_list_producing(right))
        {
            return self.compile_standard(
                &Expr::BinaryOp {
                    left: Box::new(left.clone()),
                    op: *op,
                    right: Box::new(right.clone()),
                },
                input_schema,
            );
        }

        // Compile both sides to inspect their types: if either is a
        // Cypher-value (LargeBinary), use the custom binary op.
        let left_phy = self.compile(left, input_schema)?;
        let right_phy = self.compile(right, input_schema)?;
        let left_dt = left_phy.data_type(input_schema).ok();
        let right_dt = right_phy.data_type(input_schema).ok();
        let has_cv =
            is_cypher_value_type(left_dt.as_ref()) || is_cypher_value_type(right_dt.as_ref());

        if has_cv {
            self.compile_binary_op(op, left_phy, right_phy, input_schema)
        } else {
            self.compile_standard(
                &Expr::BinaryOp {
                    left: Box::new(left.clone()),
                    op: *op,
                    right: Box::new(right.clone()),
                },
                input_schema,
            )
        }
    }
577
    /// If `var_name` is a struct-typed column in `input_schema`, compiles
    /// access to `field_name` within it.
    ///
    /// Returns `None` when the column is absent or not a struct. When the
    /// column IS a struct but lacks the field, returns a NULL literal —
    /// Cypher treats a missing property as null rather than an error.
    fn try_compile_struct_field(
        &self,
        var_name: &str,
        field_name: &str,
        input_schema: &Schema,
    ) -> Option<Arc<dyn PhysicalExpr>> {
        let col_idx = input_schema.index_of(var_name).ok()?;
        let DataType::Struct(struct_fields) = input_schema.field(col_idx).data_type() else {
            return None;
        };

        if let Some(field_idx) = struct_fields.iter().position(|f| f.name() == field_name) {
            let output_type = struct_fields[field_idx].data_type().clone();
            let col_expr: Arc<dyn PhysicalExpr> = Arc::new(
                datafusion::physical_expr::expressions::Column::new(var_name, col_idx),
            );
            Some(Arc::new(StructFieldAccessExpr::new(
                col_expr,
                field_idx,
                output_type,
            )))
        } else {
            // Struct exists but field doesn't: evaluate to NULL.
            Some(Arc::new(
                datafusion::physical_expr::expressions::Literal::new(
                    datafusion::common::ScalarValue::Null,
                ),
            ))
        }
    }
612
613 fn compile_property_access(
615 &self,
616 base: &Expr,
617 prop: &str,
618 input_schema: &Schema,
619 ) -> Result<Arc<dyn PhysicalExpr>> {
620 if let Expr::Variable(var_name) = base {
621 if let Some(expr) = self.try_compile_struct_field(var_name, prop, input_schema) {
623 return Ok(expr);
624 }
625 let flat_col = format!("{}.{}", var_name, prop);
627 if let Ok(col_idx) = input_schema.index_of(&flat_col) {
628 return Ok(Arc::new(
629 datafusion::physical_expr::expressions::Column::new(&flat_col, col_idx),
630 ));
631 }
632 }
633 self.compile_standard(
634 &Expr::Property(Box::new(base.clone()), prop.to_string()),
635 input_schema,
636 )
637 }
638
639 fn compile_array_index(
641 &self,
642 array: &Expr,
643 index: &Expr,
644 input_schema: &Schema,
645 ) -> Result<Arc<dyn PhysicalExpr>> {
646 if let Expr::Variable(var_name) = array
647 && let Expr::Literal(CypherLiteral::String(prop)) = index
648 && let Some(expr) = self.try_compile_struct_field(var_name, prop, input_schema)
649 {
650 return Ok(expr);
651 }
652 self.compile_standard(
653 &Expr::ArrayIndex {
654 array: Box::new(array.clone()),
655 index: Box::new(index.clone()),
656 },
657 input_schema,
658 )
659 }
660
    /// Compiles `EXISTS { ... }` into an `ExistsExecExpr` that evaluates the
    /// subquery at execution time.
    ///
    /// Rejects subqueries containing updating clauses, and fails if any of
    /// the graph/session/storage dependencies (attached via
    /// `with_subquery_ctx`) are missing.
    fn compile_exists(&self, query: &Query) -> Result<Arc<dyn PhysicalExpr>> {
        if has_mutation_clause(query) {
            return Err(anyhow!(
                "SyntaxError: InvalidClauseComposition - EXISTS subquery cannot contain updating clauses"
            ));
        }

        // Uniform error for any missing dependency.
        let err = |dep: &str| anyhow!("EXISTS requires {}", dep);

        let graph_ctx = self
            .graph_ctx
            .clone()
            .ok_or_else(|| err("GraphExecutionContext"))?;
        let uni_schema = self.uni_schema.clone().ok_or_else(|| err("UniSchema"))?;
        let session_ctx = self
            .session_ctx
            .clone()
            .ok_or_else(|| err("SessionContext"))?;
        let storage = self.storage.clone().ok_or_else(|| err("StorageManager"))?;

        Ok(Arc::new(ExistsExecExpr::new(
            query.clone(),
            graph_ctx,
            session_ctx,
            storage,
            uni_schema,
            self.params.clone(),
        )))
    }
692
    /// Compiles a `similar_to(source, query[, options])` call into a
    /// `SimilarToExecExpr`.
    ///
    /// Requires 2 or 3 arguments and a graph context. Source/query arguments
    /// are normalized into parallel expression lists, and a distance metric
    /// is resolved per source property from the schema when available.
    fn compile_similar_to(
        &self,
        args: &[Expr],
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        if args.len() < 2 || args.len() > 3 {
            return Err(SimilarToError::InvalidArity { count: args.len() }.into());
        }

        let graph_ctx = self
            .graph_ctx
            .clone()
            .ok_or(SimilarToError::NoGraphContext)?;

        // Metadata extracted from the first (source) argument.
        let source_variable = extract_source_variable(&args[0]);
        let source_property_names = extract_property_names(&args[0]);

        let (source_exprs, query_exprs) = normalize_similar_to_args(&args[0], &args[1]);

        let source_children: Vec<Arc<dyn PhysicalExpr>> = source_exprs
            .iter()
            .map(|e| self.compile(e, input_schema))
            .collect::<Result<Vec<_>>>()?;
        let query_children: Vec<Arc<dyn PhysicalExpr>> = query_exprs
            .iter()
            .map(|e| self.compile(e, input_schema))
            .collect::<Result<Vec<_>>>()?;

        // Optional third argument: options map.
        let options_child = if args.len() == 3 {
            Some(self.compile(&args[2], input_schema)?)
        } else {
            None
        };

        // Per-property distance metric, when the schema defines one.
        let source_metrics: Vec<Option<DistanceMetric>> = source_property_names
            .iter()
            .map(|prop_name| {
                prop_name.as_ref().and_then(|prop| {
                    self.uni_schema
                        .as_ref()
                        .and_then(|schema| resolve_metric_for_property(schema, prop))
                })
            })
            .collect();

        Ok(Arc::new(SimilarToExecExpr::new(
            source_children,
            query_children,
            options_child,
            graph_ctx,
            source_variable,
            source_property_names,
            source_metrics,
        )))
    }
755
756 fn needs_vid_extraction_for_variable(
758 variable: &str,
759 map_expr: &Expr,
760 where_clause: Option<&Expr>,
761 ) -> bool {
762 fn expr_has_pattern_comp_referencing(expr: &Expr, var: &str) -> bool {
763 match expr {
764 Expr::PatternComprehension { pattern, .. } => {
765 pattern.paths.iter().any(|path| {
767 path.elements.iter().any(|elem| match elem {
768 uni_cypher::ast::PatternElement::Node(n) => {
769 n.variable.as_deref() == Some(var)
770 }
771 uni_cypher::ast::PatternElement::Relationship(r) => {
772 r.variable.as_deref() == Some(var)
773 }
774 _ => false,
775 })
776 })
777 }
778 Expr::FunctionCall { args, .. } => args
779 .iter()
780 .any(|a| expr_has_pattern_comp_referencing(a, var)),
781 Expr::BinaryOp { left, right, .. } => {
782 expr_has_pattern_comp_referencing(left, var)
783 || expr_has_pattern_comp_referencing(right, var)
784 }
785 Expr::UnaryOp { expr: e, .. } | Expr::Property(e, _) => {
786 expr_has_pattern_comp_referencing(e, var)
787 }
788 Expr::List(items) => items
789 .iter()
790 .any(|i| expr_has_pattern_comp_referencing(i, var)),
791 Expr::ListComprehension {
792 list,
793 map_expr,
794 where_clause,
795 ..
796 } => {
797 expr_has_pattern_comp_referencing(list, var)
798 || expr_has_pattern_comp_referencing(map_expr, var)
799 || where_clause
800 .as_ref()
801 .is_some_and(|w| expr_has_pattern_comp_referencing(w, var))
802 }
803 _ => false,
804 }
805 }
806
807 expr_has_pattern_comp_referencing(map_expr, variable)
808 || where_clause.is_some_and(|w| expr_has_pattern_comp_referencing(w, variable))
809 }
810
811 pub fn contains_custom_expr(expr: &Expr) -> bool {
813 match expr {
814 Expr::ListComprehension { .. } => true,
815 Expr::Quantifier { .. } => true,
816 Expr::Reduce { .. } => true,
817 Expr::PatternComprehension { .. } => true,
818 Expr::BinaryOp { left, right, .. } => {
819 Self::contains_custom_expr(left) || Self::contains_custom_expr(right)
820 }
821 Expr::UnaryOp { expr, .. } => Self::contains_custom_expr(expr),
822 Expr::FunctionCall { name, args, .. } => {
823 name.eq_ignore_ascii_case("similar_to")
824 || args.iter().any(Self::contains_custom_expr)
825 }
826 Expr::Case {
827 when_then,
828 else_expr,
829 ..
830 } => {
831 when_then
832 .iter()
833 .any(|(w, t)| Self::contains_custom_expr(w) || Self::contains_custom_expr(t))
834 || else_expr.as_deref().is_some_and(Self::contains_custom_expr)
835 }
836 Expr::List(items) => items.iter().any(Self::contains_custom_expr),
837 Expr::Map(entries) => entries.iter().any(|(_, v)| Self::contains_custom_expr(v)),
838 Expr::IsNull(e) | Expr::IsNotNull(e) => Self::contains_custom_expr(e),
839 Expr::In { expr: l, list: r } => {
840 Self::contains_custom_expr(l) || Self::contains_custom_expr(r)
841 }
842 Expr::Exists { .. } => true,
843 Expr::LabelCheck { expr, .. } => Self::contains_custom_expr(expr),
844 _ => false,
845 }
846 }
847
848 fn is_list_producing(expr: &Expr) -> bool {
851 match expr {
852 Expr::List(_) => true,
853 Expr::ListComprehension { .. } => true,
854 Expr::ArraySlice { .. } => true,
855 Expr::BinaryOp {
857 left,
858 op: BinaryOp::Add,
859 right,
860 } => Self::is_list_producing(left) || Self::is_list_producing(right),
861 Expr::FunctionCall { name, .. } => {
862 matches!(
864 name.as_str(),
865 "range"
866 | "tail"
867 | "reverse"
868 | "collect"
869 | "keys"
870 | "labels"
871 | "nodes"
872 | "relationships"
873 )
874 }
875 _ => false,
876 }
877 }
878
    /// Fallback path: lowers `expr` through the normal Cypher→DataFusion
    /// logical translation, applies type coercion, and plans it with the
    /// default physical planner.
    fn compile_standard(
        &self,
        expr: &Expr,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        // Rewrite `var.prop` into a reference to a flattened "var.prop"
        // column when the schema has one.
        let resolved = Self::resolve_flat_column_properties(expr, input_schema);
        let df_expr = cypher_expr_to_df(&resolved, self.translation_ctx)?;
        let resolved_expr = self.resolve_udfs(df_expr)?;

        let df_schema = datafusion::common::DFSchema::try_from(input_schema.clone())?;

        let coerced_expr = crate::query::df_expr::apply_type_coercion(&resolved_expr, &df_schema)?;

        // Resolve UDFs again after coercion — NOTE(review): presumably
        // coercion can introduce/rewrite function nodes; confirm.
        let coerced_expr = self.resolve_udfs(coerced_expr)?;

        let planner = datafusion::physical_planner::DefaultPhysicalPlanner::default();
        planner
            .create_physical_expr(&coerced_expr, &df_schema, self.state)
            .map_err(|e| anyhow!("DataFusion planning failed: {}", e))
    }
905
    /// Rewrites `var.prop` property accesses into plain variable references
    /// when `schema` contains a flattened column literally named
    /// `"var.prop"`; recurses through binary/unary ops, function calls, and
    /// lists. Other variants are returned unchanged (no recursion into
    /// maps/CASE — their property accesses go through the normal path).
    fn resolve_flat_column_properties(expr: &Expr, schema: &Schema) -> Expr {
        match expr {
            Expr::Property(base, prop) => {
                if let Expr::Variable(var) = base.as_ref() {
                    let flat_col = format!("{}.{}", var, prop);
                    if schema.index_of(&flat_col).is_ok() {
                        // Matched a flattened column: reference it directly.
                        return Expr::Variable(flat_col);
                    }
                }
                Expr::Property(
                    Box::new(Self::resolve_flat_column_properties(base, schema)),
                    prop.clone(),
                )
            }
            Expr::BinaryOp { left, op, right } => Expr::BinaryOp {
                left: Box::new(Self::resolve_flat_column_properties(left, schema)),
                op: *op,
                right: Box::new(Self::resolve_flat_column_properties(right, schema)),
            },
            Expr::FunctionCall {
                name,
                args,
                distinct,
                window_spec,
            } => Expr::FunctionCall {
                name: name.clone(),
                args: args
                    .iter()
                    .map(|a| Self::resolve_flat_column_properties(a, schema))
                    .collect(),
                distinct: *distinct,
                window_spec: window_spec.clone(),
            },
            Expr::UnaryOp { op, expr: inner } => Expr::UnaryOp {
                op: *op,
                expr: Box::new(Self::resolve_flat_column_properties(inner, schema)),
            },
            Expr::List(items) => Expr::List(
                items
                    .iter()
                    .map(|i| Self::resolve_flat_column_properties(i, schema))
                    .collect(),
            ),
            other => other.clone(),
        }
    }
959
    /// Rebinds every scalar-function node in `expr` to the UDF instance
    /// registered (by name) in this session's state; functions without a
    /// registered counterpart are left untouched.
    fn resolve_udfs(
        &self,
        expr: datafusion::logical_expr::Expr,
    ) -> Result<datafusion::logical_expr::Expr> {
        use datafusion::common::tree_node::{Transformed, TreeNode};
        use datafusion::logical_expr::Expr as DfExpr;

        let result = expr
            .transform_up(|node| {
                if let DfExpr::ScalarFunction(ref func) = node {
                    let udf_name = func.func.name();
                    if let Some(registered_udf) = self.state.scalar_functions().get(udf_name) {
                        // Swap in the session's UDF, keeping the same args.
                        return Ok(Transformed::yes(DfExpr::ScalarFunction(
                            datafusion::logical_expr::expr::ScalarFunction {
                                func: registered_udf.clone(),
                                args: func.args.clone(),
                            },
                        )));
                    }
                }
                Ok(Transformed::no(node))
            })
            .map_err(|e| anyhow!("Failed to resolve UDFs: {}", e))?;
        Ok(result.data)
    }
989
    /// Compiles `[variable IN list WHERE pred | map_expr]` into a
    /// `ListComprehensionExecExpr`.
    ///
    /// Builds an inner schema where `variable` is bound to the list's
    /// element type (shadowing any outer column of the same name), then
    /// compiles the predicate and map expressions against it with a scoped
    /// compiler so the loop variable loses any outer node/edge kind.
    fn compile_list_comprehension(
        &self,
        variable: &str,
        list: &Expr,
        where_clause: Option<&Expr>,
        map_expr: &Expr,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        let input_list_phy = self.compile(list, input_schema)?;

        let list_data_type = input_list_phy.data_type(input_schema)?;
        let inner_data_type = resolve_list_element_type(
            &list_data_type,
            DataType::LargeBinary,
            "List comprehension",
        )?;

        // Inner schema = input schema with the loop variable bound
        // (replacing a same-named column, otherwise appended).
        let mut fields = input_schema.fields().to_vec();
        let loop_var_field = Arc::new(Field::new(variable, inner_data_type.clone(), true));

        if let Some(pos) = fields.iter().position(|f| f.name() == variable) {
            fields[pos] = loop_var_field;
        } else {
            fields.push(loop_var_field);
        }

        // A nested pattern comprehension referencing the loop variable needs
        // its vertex id materialized as "{variable}._vid".
        let needs_vid_extraction =
            Self::needs_vid_extraction_for_variable(variable, map_expr, where_clause);
        if needs_vid_extraction && inner_data_type == DataType::LargeBinary {
            let vid_field = Arc::new(Field::new(
                format!("{}._vid", variable),
                DataType::UInt64,
                true,
            ));
            fields.push(vid_field);
        }

        let inner_schema = Arc::new(Schema::new(fields));

        // Scoped compiler: the loop variable must not keep its outer kind.
        let mut scoped_ctx = None;
        let inner_compiler = self.scoped_compiler(&[variable], &mut scoped_ctx);

        let predicate_phy = if let Some(pred) = where_clause {
            Some(inner_compiler.compile(pred, &inner_schema)?)
        } else {
            None
        };

        let map_phy = inner_compiler.compile(map_expr, &inner_schema)?;
        let output_item_type = map_phy.data_type(&inner_schema)?;

        Ok(Arc::new(ListComprehensionExecExpr::new(
            input_list_phy,
            map_phy,
            predicate_phy,
            variable.to_string(),
            Arc::new(input_schema.clone()),
            output_item_type,
            needs_vid_extraction,
        )))
    }
1056
    /// Compiles `reduce(accumulator = initial, variable IN list | expr)`
    /// into a `ReduceExecExpr`.
    ///
    /// The accumulator's type comes from the initial expression and also
    /// serves as the LargeBinary fallback for the list element type. Both
    /// the accumulator and the loop variable are bound in the inner schema
    /// and excluded from the scoped translation context.
    fn compile_reduce(
        &self,
        accumulator: &str,
        initial: &Expr,
        variable: &str,
        list: &Expr,
        reduce_expr: &Expr,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        let list_phy = self.compile(list, input_schema)?;

        let initial_phy = self.compile(initial, input_schema)?;
        let acc_type = initial_phy.data_type(input_schema)?;

        let list_data_type = list_phy.data_type(input_schema)?;
        let inner_data_type =
            resolve_list_element_type(&list_data_type, acc_type.clone(), "Reduce")?;

        // Inner schema = input schema + accumulator + loop variable,
        // each replacing a same-named column or being appended.
        let mut fields = input_schema.fields().to_vec();

        let acc_field = Arc::new(Field::new(accumulator, acc_type, true));
        if let Some(pos) = fields.iter().position(|f| f.name() == accumulator) {
            fields[pos] = acc_field;
        } else {
            fields.push(acc_field);
        }

        let var_field = Arc::new(Field::new(variable, inner_data_type, true));
        if let Some(pos) = fields.iter().position(|f| f.name() == variable) {
            fields[pos] = var_field;
        } else {
            fields.push(var_field);
        }

        let inner_schema = Arc::new(Schema::new(fields));

        // Both bound names lose any outer node/edge kind inside the body.
        let mut scoped_ctx = None;
        let reduce_compiler = self.scoped_compiler(&[accumulator, variable], &mut scoped_ctx);

        let reduce_phy = reduce_compiler.compile(reduce_expr, &inner_schema)?;
        let output_type = reduce_phy.data_type(&inner_schema)?;

        Ok(Arc::new(ReduceExecExpr::new(
            accumulator.to_string(),
            initial_phy,
            variable.to_string(),
            list_phy,
            reduce_phy,
            Arc::new(input_schema.clone()),
            output_type,
        )))
    }
1113
    /// Compiles `all/any/single/none(variable IN list WHERE predicate)` into
    /// a `QuantifierExecExpr`.
    ///
    /// Binds the loop variable in an inner schema and compiles the predicate
    /// with a scoped compiler; a LargeBinary (Cypher value) predicate result
    /// is coerced to boolean.
    fn compile_quantifier(
        &self,
        quantifier: &uni_cypher::ast::Quantifier,
        variable: &str,
        list: &Expr,
        predicate: &Expr,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        let input_list_phy = self.compile(list, input_schema)?;

        let list_data_type = input_list_phy.data_type(input_schema)?;
        let inner_data_type =
            resolve_list_element_type(&list_data_type, DataType::LargeBinary, "Quantifier")?;

        // Inner schema = input schema with the loop variable bound
        // (replacing a same-named column, otherwise appended).
        let mut fields = input_schema.fields().to_vec();
        let loop_var_field = Arc::new(Field::new(variable, inner_data_type, true));

        if let Some(pos) = fields.iter().position(|f| f.name() == variable) {
            fields[pos] = loop_var_field;
        } else {
            fields.push(loop_var_field);
        }

        let inner_schema = Arc::new(Schema::new(fields));

        // Scoped compiler: the loop variable must not keep its outer kind.
        let mut scoped_ctx = None;
        let pred_compiler = self.scoped_compiler(&[variable], &mut scoped_ctx);

        let mut predicate_phy = pred_compiler.compile(predicate, &inner_schema)?;

        // Encoded Cypher value → boolean for the quantifier test.
        if let Ok(DataType::LargeBinary) = predicate_phy.data_type(&inner_schema) {
            predicate_phy = self.wrap_with_cv_to_bool(predicate_phy)?;
        }

        let qt = match quantifier {
            uni_cypher::ast::Quantifier::All => QuantifierType::All,
            uni_cypher::ast::Quantifier::Any => QuantifierType::Any,
            uni_cypher::ast::Quantifier::Single => QuantifierType::Single,
            uni_cypher::ast::Quantifier::None => QuantifierType::None,
        };

        Ok(Arc::new(QuantifierExecExpr::new(
            input_list_phy,
            predicate_phy,
            variable.to_string(),
            Arc::new(input_schema.clone()),
            qt,
        )))
    }
1172
    /// Compiles a pattern comprehension `[path = (pattern) WHERE pred | map]`
    /// into a `PatternComprehensionExecExpr`.
    ///
    /// Analyzes the pattern for an anchor column and traversal steps,
    /// collects the vertex/edge properties the predicate and map expressions
    /// need, builds the corresponding inner schema, and compiles both
    /// expressions against it. Requires a graph context and schema.
    fn compile_pattern_comprehension(
        &self,
        path_variable: &Option<String>,
        pattern: &uni_cypher::ast::Pattern,
        where_clause: Option<&Expr>,
        map_expr: &Expr,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        let err = |dep: &str| anyhow!("Pattern comprehension requires {}", dep);

        let graph_ctx = self
            .graph_ctx
            .as_ref()
            .ok_or_else(|| err("GraphExecutionContext"))?;
        let uni_schema = self.uni_schema.as_ref().ok_or_else(|| err("UniSchema"))?;

        // Anchor column in the input schema + traversal steps to execute.
        let (anchor_col, steps) = analyze_pattern(pattern, input_schema, uni_schema)?;

        // Only materialize the properties the inner expressions reference.
        let (vertex_props, edge_props) = collect_inner_properties(where_clause, map_expr, &steps);

        let inner_schema = build_inner_schema(
            input_schema,
            &steps,
            &vertex_props,
            &edge_props,
            path_variable.as_deref(),
        );

        let pred_phy = where_clause
            .map(|p| self.compile(p, &inner_schema))
            .transpose()?;
        let map_phy = self.compile(map_expr, &inner_schema)?;
        let output_type = map_phy.data_type(&inner_schema)?;

        Ok(Arc::new(PatternComprehensionExecExpr::new(
            graph_ctx.clone(),
            anchor_col,
            steps,
            path_variable.clone(),
            pred_phy,
            map_phy,
            Arc::new(input_schema.clone()),
            Arc::new(inner_schema),
            output_type,
            vertex_props,
            edge_props,
        )))
    }
1226
1227 fn compile_function_with_custom_args(
1233 &self,
1234 name: &str,
1235 args: &[Expr],
1236 _distinct: bool,
1237 input_schema: &Schema,
1238 ) -> Result<Arc<dyn PhysicalExpr>> {
1239 let compiled_args: Vec<Arc<dyn PhysicalExpr>> = args
1241 .iter()
1242 .map(|arg| self.compile(arg, input_schema))
1243 .collect::<Result<Vec<_>>>()?;
1244
1245 let udf_name = Self::cypher_fn_to_udf(name);
1247 let udf = self
1248 .state
1249 .scalar_functions()
1250 .get(udf_name.as_str())
1251 .ok_or_else(|| {
1252 anyhow!(
1253 "UDF '{}' not found in registry for function '{}'",
1254 udf_name,
1255 name
1256 )
1257 })?;
1258
1259 let placeholders: &[&str] = &["__arg0__", "__arg1__", "__arg2__", "__argN__"];
1261 let operand_types: Vec<(&str, DataType)> = compiled_args
1262 .iter()
1263 .enumerate()
1264 .map(|(i, arg)| {
1265 let dt = arg.data_type(input_schema).unwrap_or(DataType::LargeBinary);
1266 let placeholder = placeholders[i.min(3)];
1267 (placeholder, dt)
1268 })
1269 .collect();
1270
1271 let dummy_cols: Vec<datafusion::logical_expr::Expr> = operand_types
1272 .iter()
1273 .map(|(name, _)| {
1274 datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1275 None::<String>,
1276 *name,
1277 ))
1278 })
1279 .collect();
1280
1281 let udf_expr = datafusion::logical_expr::Expr::ScalarFunction(
1282 datafusion::logical_expr::expr::ScalarFunction {
1283 func: udf.clone(),
1284 args: dummy_cols,
1285 },
1286 );
1287
1288 self.plan_udf_physical_expr(
1290 &udf_expr,
1291 &operand_types,
1292 compiled_args,
1293 &format!("function {}", name),
1294 )
1295 }
1296
1297 fn cypher_fn_to_udf(name: &str) -> String {
1302 match name.to_uppercase().as_str() {
1303 "SIZE" | "LENGTH" => "_cypher_size".to_string(),
1304 "REVERSE" => "_cypher_reverse".to_string(),
1305 "TOSTRING" => "tostring".to_string(),
1306 "TOBOOLEAN" | "TOBOOL" | "TOBOOLEANORNULL" => "toboolean".to_string(),
1307 "TOINTEGER" | "TOINT" | "TOINTEGERORNULL" => "tointeger".to_string(),
1308 "TOFLOAT" | "TOFLOATORNULL" => "tofloat".to_string(),
1309 "HEAD" => "head".to_string(),
1310 "LAST" => "last".to_string(),
1311 "TAIL" => "tail".to_string(),
1312 "KEYS" => "keys".to_string(),
1313 "TYPE" => "type".to_string(),
1314 "PROPERTIES" => "properties".to_string(),
1315 "LABELS" => "labels".to_string(),
1316 "COALESCE" => "coalesce".to_string(),
1317 "ID" => "id".to_string(),
1318 _ => name.to_lowercase(),
1320 }
1321 }
1322
    /// Compiles a CASE expression (both the simple `CASE x WHEN ...` and the
    /// searched `CASE WHEN cond ...` forms).
    ///
    /// WHEN conditions producing encoded CypherValues (LargeBinary) are
    /// wrapped with `_cv_to_bool`; when result branches mix LargeBinary and
    /// native list types, the list branches are converted to CypherValues so
    /// every branch shares a single output type.
    fn compile_case(
        &self,
        operand: &Option<Box<Expr>>,
        when_then: &[(Expr, Expr)],
        else_expr: &Option<Box<Expr>>,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        let operand_phy = operand
            .as_deref()
            .map(|e| self.compile(e, input_schema))
            .transpose()?;

        let mut when_then_phy: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> = when_then
            .iter()
            .map(|(w, t)| {
                let w_phy = self.compile(w, input_schema)?;
                let t_phy = self.compile(t, input_schema)?;
                Ok((w_phy, t_phy))
            })
            .collect::<Result<Vec<_>>>()?;

        let mut else_phy = else_expr
            .as_deref()
            .map(|e| self.compile(e, input_schema))
            .transpose()?;

        // WHEN conditions must be Boolean; decode CypherValue conditions.
        for (w_phy, _) in &mut when_then_phy {
            if matches!(w_phy.data_type(input_schema), Ok(DataType::LargeBinary)) {
                *w_phy = self.wrap_with_cv_to_bool(w_phy.clone())?;
            }
        }

        // Collect the result types of all THEN branches plus the ELSE branch
        // (types that fail to resolve are simply skipped).
        let branch_types: Vec<DataType> = when_then_phy
            .iter()
            .map(|(_, t)| t.data_type(input_schema))
            .chain(else_phy.iter().map(|e| e.data_type(input_schema)))
            .filter_map(Result::ok)
            .collect();

        let has_large_binary = branch_types.contains(&DataType::LargeBinary);
        let has_list = branch_types
            .iter()
            .any(|dt| matches!(dt, DataType::List(_) | DataType::LargeList(_)));

        // Mixed CypherValue/list branches: unify by encoding list branches as
        // CypherValues so CaseExpr sees one consistent result type.
        if has_large_binary && has_list {
            for (_, t_phy) in &mut when_then_phy {
                if let Ok(dt) = t_phy.data_type(input_schema)
                    && matches!(dt, DataType::List(_) | DataType::LargeList(_))
                {
                    *t_phy = Arc::new(LargeListToCypherValueExpr::new(t_phy.clone()));
                }
            }
            // take() then restore: the ELSE branch is either wrapped or put
            // back unchanged.
            if let Some(e_phy) = else_phy.take() {
                if let Ok(dt) = e_phy.data_type(input_schema)
                    && matches!(dt, DataType::List(_) | DataType::LargeList(_))
                {
                    else_phy = Some(Arc::new(LargeListToCypherValueExpr::new(e_phy)));
                } else {
                    else_phy = Some(e_phy);
                }
            }
        }

        let case_expr = datafusion::physical_expr::expressions::CaseExpr::try_new(
            operand_phy,
            when_then_phy,
            else_phy,
        )
        .map_err(|e| anyhow!("Failed to create CASE expression: {}", e))?;

        Ok(Arc::new(case_expr))
    }
1406
    /// Compiles a Cypher binary operator applied to two pre-compiled operands.
    ///
    /// Resolution order:
    /// 1. STARTS WITH / ENDS WITH / CONTAINS -> `CypherStringMatchExpr`.
    /// 2. Map the Cypher operator onto a DataFusion `Operator` (rejecting
    ///    unsupported ones).
    /// 3. If one side is a native list and the other an encoded CypherValue,
    ///    encode the list side so both share a representation.
    /// 4. Route CypherValue operands through the `_cypher_*` UDFs
    ///    (comparison, then list concat, then arithmetic).
    /// 5. Otherwise fall back to DataFusion's native binary expression.
    fn compile_binary_op(
        &self,
        op: &BinaryOp,
        left: Arc<dyn PhysicalExpr>,
        right: Arc<dyn PhysicalExpr>,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        use datafusion::logical_expr::Operator;

        // String predicates bypass DataFusion operators entirely.
        let string_op = match op {
            BinaryOp::StartsWith => Some(StringOp::StartsWith),
            BinaryOp::EndsWith => Some(StringOp::EndsWith),
            BinaryOp::Contains => Some(StringOp::Contains),
            _ => None,
        };
        if let Some(sop) = string_op {
            return Ok(Arc::new(CypherStringMatchExpr::new(left, right, sop)));
        }

        let df_op = match op {
            BinaryOp::Add => Operator::Plus,
            BinaryOp::Sub => Operator::Minus,
            BinaryOp::Mul => Operator::Multiply,
            BinaryOp::Div => Operator::Divide,
            BinaryOp::Mod => Operator::Modulo,
            BinaryOp::Eq => Operator::Eq,
            BinaryOp::NotEq => Operator::NotEq,
            BinaryOp::Gt => Operator::Gt,
            BinaryOp::GtEq => Operator::GtEq,
            BinaryOp::Lt => Operator::Lt,
            BinaryOp::LtEq => Operator::LtEq,
            BinaryOp::And => Operator::And,
            BinaryOp::Or => Operator::Or,
            BinaryOp::Xor => {
                return Err(anyhow!(
                    "XOR not supported via binary helper, use bitwise_xor"
                ));
            }
            BinaryOp::Regex => Operator::RegexMatch,
            BinaryOp::ApproxEq => {
                return Err(anyhow!(
                    "ApproxEq (~=) not yet supported in physical compiler"
                ));
            }
            BinaryOp::Pow => return Err(anyhow!("POW not yet supported in physical compiler")),
            _ => return Err(anyhow!("Unsupported binary op in compiler: {:?}", op)),
        };

        let mut left = left;
        let mut right = right;
        let mut left_type = left.data_type(input_schema).ok();
        let mut right_type = right.data_type(input_schema).ok();

        let left_is_list = matches!(
            left_type.as_ref(),
            Some(DataType::List(_) | DataType::LargeList(_))
        );
        let right_is_list = matches!(
            right_type.as_ref(),
            Some(DataType::List(_) | DataType::LargeList(_))
        );

        // Mixed list/CypherValue operands: encode the list side so the
        // CypherValue UDFs below see two LargeBinary inputs.
        if left_is_list && is_cypher_value_type(right_type.as_ref()) {
            left = Arc::new(LargeListToCypherValueExpr::new(left));
            left_type = Some(DataType::LargeBinary);
        } else if right_is_list && is_cypher_value_type(left_type.as_ref()) {
            right = Arc::new(LargeListToCypherValueExpr::new(right));
            right_type = Some(DataType::LargeBinary);
        }

        let has_cv =
            is_cypher_value_type(left_type.as_ref()) || is_cypher_value_type(right_type.as_ref());

        // CypherValue-aware routing; each helper returns Ok(None) to decline,
        // letting the next one try.
        if has_cv {
            if let Some(result) = self.compile_cv_comparison(
                df_op,
                left.clone(),
                right.clone(),
                &left_type,
                &right_type,
            )? {
                return Ok(result);
            }
            if let Some(result) = self.compile_cv_list_concat(
                left.clone(),
                right.clone(),
                &left_type,
                &right_type,
                df_op,
            )? {
                return Ok(result);
            }
            if let Some(result) = self.compile_cv_arithmetic(
                df_op,
                left.clone(),
                right.clone(),
                &left_type,
                &right_type,
                input_schema,
            )? {
                return Ok(result);
            }
        }

        // Final fallback: DataFusion's native binary expression.
        binary(left, df_op, right, input_schema)
            .map_err(|e| anyhow!("Failed to create binary expression: {}", e))
    }
1521
1522 fn compile_cv_comparison(
1524 &self,
1525 df_op: datafusion::logical_expr::Operator,
1526 left: Arc<dyn PhysicalExpr>,
1527 right: Arc<dyn PhysicalExpr>,
1528 left_type: &Option<DataType>,
1529 right_type: &Option<DataType>,
1530 ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1531 use datafusion::logical_expr::Operator;
1532
1533 let udf_name = match df_op {
1534 Operator::Eq => "_cypher_equal",
1535 Operator::NotEq => "_cypher_not_equal",
1536 Operator::Gt => "_cypher_gt",
1537 Operator::GtEq => "_cypher_gt_eq",
1538 Operator::Lt => "_cypher_lt",
1539 Operator::LtEq => "_cypher_lt_eq",
1540 _ => return Ok(None),
1541 };
1542
1543 self.plan_binary_udf(
1544 udf_name,
1545 left,
1546 right,
1547 left_type.clone().unwrap_or(DataType::LargeBinary),
1548 right_type.clone().unwrap_or(DataType::LargeBinary),
1549 )
1550 }
1551
1552 fn compile_cv_list_concat(
1554 &self,
1555 left: Arc<dyn PhysicalExpr>,
1556 right: Arc<dyn PhysicalExpr>,
1557 left_type: &Option<DataType>,
1558 right_type: &Option<DataType>,
1559 df_op: datafusion::logical_expr::Operator,
1560 ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1561 use datafusion::logical_expr::Operator;
1562
1563 if df_op != Operator::Plus {
1564 return Ok(None);
1565 }
1566
1567 let is_list = |t: &Option<DataType>| {
1569 t.as_ref()
1570 .is_some_and(|dt| matches!(dt, DataType::LargeBinary | DataType::List(_)))
1571 };
1572
1573 if !is_list(left_type) && !is_list(right_type) {
1574 return Ok(None);
1575 }
1576
1577 self.plan_binary_udf(
1578 "_cypher_list_concat",
1579 left,
1580 right,
1581 left_type.clone().unwrap_or(DataType::LargeBinary),
1582 right_type.clone().unwrap_or(DataType::LargeBinary),
1583 )
1584 }
1585
1586 fn compile_cv_arithmetic(
1591 &self,
1592 df_op: datafusion::logical_expr::Operator,
1593 left: Arc<dyn PhysicalExpr>,
1594 right: Arc<dyn PhysicalExpr>,
1595 left_type: &Option<DataType>,
1596 right_type: &Option<DataType>,
1597 _input_schema: &Schema,
1598 ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1599 use datafusion::logical_expr::Operator;
1600
1601 let udf_name = match df_op {
1602 Operator::Plus => "_cypher_add",
1603 Operator::Minus => "_cypher_sub",
1604 Operator::Multiply => "_cypher_mul",
1605 Operator::Divide => "_cypher_div",
1606 Operator::Modulo => "_cypher_mod",
1607 _ => return Ok(None),
1608 };
1609
1610 self.plan_binary_udf(
1611 udf_name,
1612 left,
1613 right,
1614 left_type.clone().unwrap_or(DataType::LargeBinary),
1615 right_type.clone().unwrap_or(DataType::LargeBinary),
1616 )
1617 }
1618
1619 fn plan_udf_physical_expr(
1621 &self,
1622 udf_expr: &datafusion::logical_expr::Expr,
1623 operand_types: &[(&str, DataType)],
1624 children: Vec<Arc<dyn PhysicalExpr>>,
1625 error_context: &str,
1626 ) -> Result<Arc<dyn PhysicalExpr>> {
1627 let tmp_schema = Schema::new(
1628 operand_types
1629 .iter()
1630 .map(|(name, dt)| Arc::new(Field::new(*name, dt.clone(), true)))
1631 .collect::<Vec<_>>(),
1632 );
1633 let df_schema = datafusion::common::DFSchema::try_from(tmp_schema)?;
1634 let planner = datafusion::physical_planner::DefaultPhysicalPlanner::default();
1635 let udf_phy = planner
1636 .create_physical_expr(udf_expr, &df_schema, self.state)
1637 .map_err(|e| anyhow!("Failed to create {} expr: {}", error_context, e))?;
1638 udf_phy
1639 .with_new_children(children)
1640 .map_err(|e| anyhow!("Failed to rebind {} children: {}", error_context, e))
1641 }
1642
1643 fn wrap_with_cv_to_bool(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
1647 let Some(udf) = self.state.scalar_functions().get("_cv_to_bool") else {
1648 return Err(anyhow!("_cv_to_bool UDF not found"));
1649 };
1650
1651 let dummy_col = datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1652 None::<String>,
1653 "__cv__",
1654 ));
1655 let udf_expr = datafusion::logical_expr::Expr::ScalarFunction(
1656 datafusion::logical_expr::expr::ScalarFunction {
1657 func: udf.clone(),
1658 args: vec![dummy_col],
1659 },
1660 );
1661
1662 self.plan_udf_physical_expr(
1663 &udf_expr,
1664 &[("__cv__", DataType::LargeBinary)],
1665 vec![expr],
1666 "CypherValue to bool",
1667 )
1668 }
1669
1670 fn plan_binary_udf(
1672 &self,
1673 udf_name: &str,
1674 left: Arc<dyn PhysicalExpr>,
1675 right: Arc<dyn PhysicalExpr>,
1676 left_type: DataType,
1677 right_type: DataType,
1678 ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1679 let Some(udf) = self.state.scalar_functions().get(udf_name) else {
1680 return Ok(None);
1681 };
1682 let udf_expr = datafusion::logical_expr::Expr::ScalarFunction(
1683 datafusion::logical_expr::expr::ScalarFunction {
1684 func: udf.clone(),
1685 args: vec![
1686 datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1687 None::<String>,
1688 "__left__",
1689 )),
1690 datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1691 None::<String>,
1692 "__right__",
1693 )),
1694 ],
1695 },
1696 );
1697 let result = self.plan_udf_physical_expr(
1698 &udf_expr,
1699 &[("__left__", left_type), ("__right__", right_type)],
1700 vec![left, right],
1701 udf_name,
1702 )?;
1703 Ok(Some(result))
1704 }
1705
1706 fn compile_unary_op(
1707 &self,
1708 op: &UnaryOp,
1709 expr: Arc<dyn PhysicalExpr>,
1710 input_schema: &Schema,
1711 ) -> Result<Arc<dyn PhysicalExpr>> {
1712 match op {
1713 UnaryOp::Not => datafusion::physical_expr::expressions::not(expr),
1714 UnaryOp::Neg => datafusion::physical_expr::expressions::negative(expr, input_schema),
1715 }
1716 .map_err(|e| anyhow!("Failed to create unary expression: {}", e))
1717 }
1718}
1719
1720fn extract_source_variable(expr: &Expr) -> Option<String> {
1728 match expr {
1729 Expr::Property(inner, _) => extract_source_variable(inner),
1730 Expr::Variable(name) => Some(name.clone()),
1731 Expr::List(items) => items.first().and_then(extract_source_variable),
1732 _ => None,
1733 }
1734}
1735
1736fn extract_property_names(expr: &Expr) -> Vec<Option<String>> {
1740 match expr {
1741 Expr::Property(_, prop) => vec![Some(prop.clone())],
1742 Expr::List(items) => items
1743 .iter()
1744 .map(|item| {
1745 if let Expr::Property(_, prop) = item {
1746 Some(prop.clone())
1747 } else {
1748 None
1749 }
1750 })
1751 .collect(),
1752 _ => vec![None],
1753 }
1754}
1755
1756fn normalize_similar_to_args<'e>(
1761 sources: &'e Expr,
1762 queries: &'e Expr,
1763) -> (Vec<&'e Expr>, Vec<&'e Expr>) {
1764 match (sources, queries) {
1765 (Expr::List(src_items), Expr::List(qry_items)) if src_items.len() == qry_items.len() => {
1767 (src_items.iter().collect(), qry_items.iter().collect())
1768 }
1769 (Expr::List(src_items), _) if src_items.len() > 1 => {
1771 let queries_broadcast: Vec<&Expr> = vec![queries; src_items.len()];
1772 (src_items.iter().collect(), queries_broadcast)
1773 }
1774 _ => (vec![sources], vec![queries]),
1776 }
1777}
1778
1779use datafusion::physical_plan::DisplayAs;
1780use datafusion::physical_plan::DisplayFormatType;
1781
/// The three Cypher substring predicates compiled by `CypherStringMatchExpr`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum StringOp {
    StartsWith,
    EndsWith,
    Contains,
}
1788
1789impl std::fmt::Display for StringOp {
1790 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1791 match self {
1792 StringOp::StartsWith => write!(f, "STARTS WITH"),
1793 StringOp::EndsWith => write!(f, "ENDS WITH"),
1794 StringOp::Contains => write!(f, "CONTAINS"),
1795 }
1796 }
1797}
1798
/// Physical expression implementing Cypher's string predicates
/// (STARTS WITH / ENDS WITH / CONTAINS) over two operand expressions,
/// producing a nullable Boolean column.
#[derive(Debug, Eq)]
struct CypherStringMatchExpr {
    left: Arc<dyn PhysicalExpr>,
    right: Arc<dyn PhysicalExpr>,
    op: StringOp,
}
1805
impl PartialEq for CypherStringMatchExpr {
    /// Structural equality: same operator and (recursively) equal operands.
    fn eq(&self, other: &Self) -> bool {
        self.op == other.op && self.left.eq(&other.left) && self.right.eq(&other.right)
    }
}

impl std::hash::Hash for CypherStringMatchExpr {
    /// Hashes exactly the fields PartialEq compares, keeping Hash/Eq
    /// consistent.
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.op.hash(state);
        self.left.hash(state);
        self.right.hash(state);
    }
}
1819
impl CypherStringMatchExpr {
    /// Builds a string-predicate expression over two operand expressions.
    fn new(left: Arc<dyn PhysicalExpr>, right: Arc<dyn PhysicalExpr>, op: StringOp) -> Self {
        Self { left, right, op }
    }
}

impl std::fmt::Display for CypherStringMatchExpr {
    // Rendered as `<left> <OP> <right>`, e.g. `name STARTS WITH 'x'`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{} {} {}", self.left, self.op, self.right)
    }
}

impl DisplayAs for CypherStringMatchExpr {
    // Plan display delegates to the Display impl regardless of format type.
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}", self)
    }
}
1837
impl PhysicalExpr for CypherStringMatchExpr {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// String predicates always produce a Boolean.
    fn data_type(
        &self,
        _input_schema: &Schema,
    ) -> datafusion::error::Result<arrow_schema::DataType> {
        Ok(arrow_schema::DataType::Boolean)
    }

    /// Conservatively nullable.
    fn nullable(&self, _input_schema: &Schema) -> datafusion::error::Result<bool> {
        Ok(true)
    }

    /// Evaluates both operands, then dispatches to the shared
    /// `invoke_cypher_string_op` helper with a closure implementing the
    /// concrete predicate.
    fn evaluate(
        &self,
        batch: &arrow_array::RecordBatch,
    ) -> datafusion::error::Result<datafusion::physical_plan::ColumnarValue> {
        use crate::query::df_udfs::invoke_cypher_string_op;
        use arrow_schema::Field;
        use datafusion::config::ConfigOptions;
        use datafusion::logical_expr::ScalarFunctionArgs;

        let left_val = self.left.evaluate(batch)?;
        let right_val = self.right.evaluate(batch)?;

        // Synthesize the ScalarFunctionArgs shape the UDF helper expects;
        // arg_fields left empty — presumably unused by the helper, TODO confirm.
        let args = ScalarFunctionArgs {
            args: vec![left_val, right_val],
            number_rows: batch.num_rows(),
            return_field: Arc::new(Field::new("result", arrow_schema::DataType::Boolean, true)),
            config_options: Arc::new(ConfigOptions::default()),
            arg_fields: vec![], };

        match self.op {
            StringOp::StartsWith => {
                invoke_cypher_string_op(&args, "starts_with", |s, p| s.starts_with(p))
            }
            StringOp::EndsWith => {
                invoke_cypher_string_op(&args, "ends_with", |s, p| s.ends_with(p))
            }
            StringOp::Contains => invoke_cypher_string_op(&args, "contains", |s, p| s.contains(p)),
        }
    }

    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
        vec![&self.left, &self.right]
    }

    /// Rebuilds the expression with replacement operands (expects exactly two).
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn PhysicalExpr>>,
    ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
        Ok(Arc::new(CypherStringMatchExpr::new(
            children[0].clone(),
            children[1].clone(),
            self.op,
        )))
    }

    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self)
    }
}
1904
// Comparison against an arbitrary PhysicalExpr: equal only when the other
// expression is also a CypherStringMatchExpr with equal fields.
impl PartialEq<dyn PhysicalExpr> for CypherStringMatchExpr {
    fn eq(&self, other: &dyn PhysicalExpr) -> bool {
        if let Some(other) = other.as_any().downcast_ref::<CypherStringMatchExpr>() {
            self == other
        } else {
            false
        }
    }
}
1914
/// Physical expression that projects a single field out of a struct-typed
/// column by positional index.
#[derive(Debug, Eq)]
struct StructFieldAccessExpr {
    // Expression producing the StructArray input.
    input: Arc<dyn PhysicalExpr>,
    // Positional index of the field to extract.
    field_idx: usize,
    // Declared data type of the extracted field, reported by data_type().
    output_type: arrow_schema::DataType,
}
1928
impl PartialEq for StructFieldAccessExpr {
    /// Structural equality over input, field index, and declared output type.
    fn eq(&self, other: &Self) -> bool {
        self.field_idx == other.field_idx
            && self.input.eq(&other.input)
            && self.output_type == other.output_type
    }
}

impl std::hash::Hash for StructFieldAccessExpr {
    /// `output_type` is deliberately omitted: values equal under PartialEq
    /// still hash identically, which is all Hash requires.
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.input.hash(state);
        self.field_idx.hash(state);
    }
}
1943
impl StructFieldAccessExpr {
    /// `output_type` should match the actual type of field `field_idx` in the
    /// struct produced by `input`; it is reported as-is by `data_type`.
    fn new(
        input: Arc<dyn PhysicalExpr>,
        field_idx: usize,
        output_type: arrow_schema::DataType,
    ) -> Self {
        Self {
            input,
            field_idx,
            output_type,
        }
    }
}

impl std::fmt::Display for StructFieldAccessExpr {
    // Rendered as `<input>[<idx>]`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}[{}]", self.input, self.field_idx)
    }
}

impl DisplayAs for StructFieldAccessExpr {
    // Plan display delegates to the Display impl regardless of format type.
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}", self)
    }
}
1969
// Comparison against an arbitrary PhysicalExpr. Note it compares only input
// and field_idx — unlike PartialEq<Self> above, which also checks output_type.
impl PartialEq<dyn PhysicalExpr> for StructFieldAccessExpr {
    fn eq(&self, other: &dyn PhysicalExpr) -> bool {
        if let Some(other) = other.as_any().downcast_ref::<Self>() {
            self.field_idx == other.field_idx && self.input.eq(&other.input)
        } else {
            false
        }
    }
}
1979
impl PhysicalExpr for StructFieldAccessExpr {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Reports the declared field type supplied at construction.
    fn data_type(
        &self,
        _input_schema: &Schema,
    ) -> datafusion::error::Result<arrow_schema::DataType> {
        Ok(self.output_type.clone())
    }

    /// Conservatively nullable.
    fn nullable(&self, _input_schema: &Schema) -> datafusion::error::Result<bool> {
        Ok(true)
    }

    /// Evaluates the input, downcasts it to a StructArray, and returns the
    /// child column at `field_idx`. Errors if the input is not struct-typed.
    fn evaluate(
        &self,
        batch: &arrow_array::RecordBatch,
    ) -> datafusion::error::Result<datafusion::physical_plan::ColumnarValue> {
        use arrow_array::StructArray;

        let input_val = self.input.evaluate(batch)?;
        // Scalars are expanded to a full-length array before the downcast.
        let array = input_val.into_array(batch.num_rows())?;

        let struct_array = array
            .as_any()
            .downcast_ref::<StructArray>()
            .ok_or_else(|| {
                datafusion::error::DataFusionError::Execution(
                    "StructFieldAccessExpr: input is not a StructArray".to_string(),
                )
            })?;

        // NOTE(review): column() panics if field_idx is out of bounds —
        // assumed validated where this expression is constructed; confirm.
        let field_col = struct_array.column(self.field_idx).clone();
        Ok(datafusion::physical_plan::ColumnarValue::Array(field_col))
    }

    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
        vec![&self.input]
    }

    /// Rebuilds the expression with a replacement input (expects exactly one).
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn PhysicalExpr>>,
    ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
        Ok(Arc::new(StructFieldAccessExpr::new(
            children[0].clone(),
            self.field_idx,
            self.output_type.clone(),
        )))
    }

    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self)
    }
}
2037
/// Physical expression evaluating an `EXISTS { ... }` subquery: the subquery
/// is rewritten for correlated outer variables and planned once per batch,
/// then executed once per row with that row's values bound as parameters;
/// the result is `true` when the subquery produces any row.
struct ExistsExecExpr {
    /// Subquery AST; correlated-rewritten at evaluate time.
    query: Query,
    graph_ctx: Arc<GraphExecutionContext>,
    session_ctx: Arc<RwLock<SessionContext>>,
    storage: Arc<StorageManager>,
    uni_schema: Arc<UniSchema>,
    /// Outer query parameters, merged with per-row bindings before execution.
    params: HashMap<String, Value>,
}
2058
// Manual Debug that deliberately prints only the type name — the captured
// execution contexts are not useful (or possible) to dump here.
impl std::fmt::Debug for ExistsExecExpr {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ExistsExecExpr").finish_non_exhaustive()
    }
}
2064
impl ExistsExecExpr {
    /// Captures the subquery and all execution dependencies; no planning or
    /// execution happens until `evaluate` is called.
    fn new(
        query: Query,
        graph_ctx: Arc<GraphExecutionContext>,
        session_ctx: Arc<RwLock<SessionContext>>,
        storage: Arc<StorageManager>,
        uni_schema: Arc<UniSchema>,
        params: HashMap<String, Value>,
    ) -> Self {
        Self {
            query,
            graph_ctx,
            session_ctx,
            storage,
            uni_schema,
            params,
        }
    }
}
2084
impl std::fmt::Display for ExistsExecExpr {
    /// Opaque rendering; the subquery AST itself is not formatted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "EXISTS(<subquery>)")
    }
}

// Equality: EXISTS subquery expressions are conservatively treated as always
// distinct, so expression-level comparisons never merge two of them.
// NOTE(review): Eq formally requires reflexivity (x == x), which this
// violates — confirm nothing relies on reflexive equality for these nodes.
impl PartialEq<dyn PhysicalExpr> for ExistsExecExpr {
    fn eq(&self, _other: &dyn PhysicalExpr) -> bool {
        false
    }
}

impl PartialEq for ExistsExecExpr {
    fn eq(&self, _other: &Self) -> bool {
        false
    }
}

impl Eq for ExistsExecExpr {}

impl std::hash::Hash for ExistsExecExpr {
    /// Constant hash, consistent with the always-unequal PartialEq above.
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        "ExistsExecExpr".hash(state);
    }
}

impl DisplayAs for ExistsExecExpr {
    // Plan display delegates to the Display impl regardless of format type.
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}", self)
    }
}
2116
impl PhysicalExpr for ExistsExecExpr {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// EXISTS always yields a Boolean.
    fn data_type(
        &self,
        _input_schema: &Schema,
    ) -> datafusion::error::Result<arrow_schema::DataType> {
        Ok(DataType::Boolean)
    }

    /// Conservatively nullable.
    fn nullable(&self, _input_schema: &Schema) -> datafusion::error::Result<bool> {
        Ok(true)
    }

    /// Evaluates the subquery once per input row, appending `true` when the
    /// subquery returns at least one row for that row's bindings.
    fn evaluate(
        &self,
        batch: &arrow_array::RecordBatch,
    ) -> datafusion::error::Result<datafusion::physical_plan::ColumnarValue> {
        let num_rows = batch.num_rows();
        let mut builder = BooleanBuilder::with_capacity(num_rows);

        // Heuristically detect outer-scope "entity" variables from the batch
        // schema: columns with a `._vid` companion, struct-typed columns, and
        // bare non-internal Int64/UInt64 columns (treated as id columns).
        let schema = batch.schema();
        let mut entity_vars: HashSet<String> = HashSet::new();
        for field in schema.fields() {
            let name = field.name();
            if let Some(base) = name.strip_suffix("._vid") {
                entity_vars.insert(base.to_string());
            }
            if matches!(field.data_type(), DataType::Struct(_)) {
                entity_vars.insert(name.to_string());
            }
            if !name.contains('.')
                && !name.starts_with('_')
                && matches!(field.data_type(), DataType::Int64 | DataType::UInt64)
            {
                entity_vars.insert(name.to_string());
            }
        }
        let vars_in_scope: Vec<String> = entity_vars.iter().cloned().collect();

        // Rewrite correlated references to the detected outer variables into
        // parameters, then plan once; only parameter values vary per row.
        let rewritten_query = rewrite_query_correlated(&self.query, &entity_vars);

        let planner = QueryPlanner::new(self.uni_schema.clone());
        let logical_plan = match planner.plan_with_scope(rewritten_query, vars_in_scope) {
            Ok(plan) => plan,
            Err(e) => {
                return Err(datafusion::error::DataFusionError::Execution(format!(
                    "EXISTS subquery planning failed: {}",
                    e
                )));
            }
        };

        let graph_ctx = self.graph_ctx.clone();
        let session_ctx = self.session_ctx.clone();
        let storage = self.storage.clone();
        let uni_schema = self.uni_schema.clone();
        let base_params = self.params.clone();

        // Run the subqueries on a dedicated scoped thread with its own
        // current-thread Tokio runtime — presumably to avoid block_on inside
        // an already-running runtime; confirm against the call sites.
        let result = std::thread::scope(|s| {
            s.spawn(|| {
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .map_err(|e| {
                        datafusion::error::DataFusionError::Execution(format!(
                            "Failed to create runtime for EXISTS: {}",
                            e
                        ))
                    })?;

                for row_idx in 0..num_rows {
                    // Per-row parameters override the shared base parameters.
                    let row_params = extract_row_params(batch, row_idx);
                    let mut sub_params = base_params.clone();
                    sub_params.extend(row_params);

                    // Alias each entity's `var._vid` value under the bare
                    // variable name so the rewritten query can bind it.
                    for var in &entity_vars {
                        let vid_key = format!("{}._vid", var);
                        if let Some(vid_val) = sub_params.get(&vid_key).cloned() {
                            sub_params.insert(var.clone(), vid_val);
                        }
                    }

                    let batches = rt.block_on(execute_subplan(
                        &logical_plan,
                        &sub_params,
                        &HashMap::new(), &graph_ctx,
                        &session_ctx,
                        &storage,
                        &uni_schema,
                    ))?;

                    // EXISTS semantics: any produced row means true.
                    let has_rows = batches.iter().any(|b| b.num_rows() > 0);
                    builder.append_value(has_rows);
                }

                Ok::<_, datafusion::error::DataFusionError>(())
            })
            .join()
            .unwrap_or_else(|_| {
                Err(datafusion::error::DataFusionError::Execution(
                    "EXISTS subquery thread panicked".to_string(),
                ))
            })
        });

        if let Err(e) = result {
            return Err(datafusion::error::DataFusionError::Execution(format!(
                "EXISTS subquery execution failed: {}",
                e
            )));
        }

        Ok(datafusion::physical_plan::ColumnarValue::Array(Arc::new(
            builder.finish(),
        )))
    }

    /// Leaf expression: the subquery is not exposed as a child.
    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn PhysicalExpr>>,
    ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
        if !children.is_empty() {
            return Err(datafusion::error::DataFusionError::Plan(
                "ExistsExecExpr has no children".to_string(),
            ));
        }
        Ok(self)
    }

    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self)
    }
}
2278
2279fn has_mutation_clause(query: &Query) -> bool {
2286 match query {
2287 Query::Single(stmt) => stmt.clauses.iter().any(|c| {
2288 matches!(
2289 c,
2290 Clause::Create(_)
2291 | Clause::Delete(_)
2292 | Clause::Set(_)
2293 | Clause::Remove(_)
2294 | Clause::Merge(_)
2295 ) || has_mutation_in_clause_exprs(c)
2296 }),
2297 Query::Union { left, right, .. } => has_mutation_clause(left) || has_mutation_clause(right),
2298 _ => false,
2299 }
2300}
2301
2302fn has_mutation_in_clause_exprs(clause: &Clause) -> bool {
2304 let check_expr = |e: &Expr| -> bool { has_mutation_in_expr(e) };
2305
2306 match clause {
2307 Clause::Match(m) => m.where_clause.as_ref().is_some_and(check_expr),
2308 Clause::With(w) => {
2309 w.where_clause.as_ref().is_some_and(check_expr)
2310 || w.items.iter().any(|item| match item {
2311 ReturnItem::Expr { expr, .. } => has_mutation_in_expr(expr),
2312 ReturnItem::All => false,
2313 })
2314 }
2315 Clause::Return(r) => r.items.iter().any(|item| match item {
2316 ReturnItem::Expr { expr, .. } => has_mutation_in_expr(expr),
2317 ReturnItem::All => false,
2318 }),
2319 _ => false,
2320 }
2321}
2322
2323fn has_mutation_in_expr(expr: &Expr) -> bool {
2325 match expr {
2326 Expr::Exists { query, .. } => has_mutation_clause(query),
2327 _ => {
2328 let mut found = false;
2329 expr.for_each_child(&mut |child| {
2330 if has_mutation_in_expr(child) {
2331 found = true;
2332 }
2333 });
2334 found
2335 }
2336 }
2337}
2338
2339fn rewrite_query_correlated(query: &Query, outer_vars: &HashSet<String>) -> Query {
2345 match query {
2346 Query::Single(stmt) => Query::Single(Statement {
2347 clauses: stmt
2348 .clauses
2349 .iter()
2350 .map(|c| rewrite_clause_correlated(c, outer_vars))
2351 .collect(),
2352 }),
2353 Query::Union { left, right, all } => Query::Union {
2354 left: Box::new(rewrite_query_correlated(left, outer_vars)),
2355 right: Box::new(rewrite_query_correlated(right, outer_vars)),
2356 all: *all,
2357 },
2358 other => other.clone(),
2359 }
2360}
2361
/// Applies the correlated-variable rewrite to every expression position of a
/// clause: MATCH/WITH WHERE predicates, WITH/RETURN items, ORDER BY keys,
/// SKIP/LIMIT, and the UNWIND source. Clauses without expression positions
/// handled here are cloned unchanged (patterns are not rewritten).
fn rewrite_clause_correlated(clause: &Clause, outer_vars: &HashSet<String>) -> Clause {
    match clause {
        Clause::Match(m) => Clause::Match(MatchClause {
            optional: m.optional,
            pattern: m.pattern.clone(),
            where_clause: m
                .where_clause
                .as_ref()
                .map(|e| rewrite_expr_correlated(e, outer_vars)),
        }),
        Clause::With(w) => Clause::With(WithClause {
            distinct: w.distinct,
            items: w
                .items
                .iter()
                .map(|item| rewrite_return_item(item, outer_vars))
                .collect(),
            order_by: w.order_by.as_ref().map(|items| {
                items
                    .iter()
                    .map(|si| SortItem {
                        expr: rewrite_expr_correlated(&si.expr, outer_vars),
                        ascending: si.ascending,
                    })
                    .collect()
            }),
            skip: w
                .skip
                .as_ref()
                .map(|e| rewrite_expr_correlated(e, outer_vars)),
            limit: w
                .limit
                .as_ref()
                .map(|e| rewrite_expr_correlated(e, outer_vars)),
            where_clause: w
                .where_clause
                .as_ref()
                .map(|e| rewrite_expr_correlated(e, outer_vars)),
        }),
        Clause::Return(r) => Clause::Return(ReturnClause {
            distinct: r.distinct,
            items: r
                .items
                .iter()
                .map(|item| rewrite_return_item(item, outer_vars))
                .collect(),
            order_by: r.order_by.as_ref().map(|items| {
                items
                    .iter()
                    .map(|si| SortItem {
                        expr: rewrite_expr_correlated(&si.expr, outer_vars),
                        ascending: si.ascending,
                    })
                    .collect()
            }),
            skip: r
                .skip
                .as_ref()
                .map(|e| rewrite_expr_correlated(e, outer_vars)),
            limit: r
                .limit
                .as_ref()
                .map(|e| rewrite_expr_correlated(e, outer_vars)),
        }),
        Clause::Unwind(u) => Clause::Unwind(UnwindClause {
            expr: rewrite_expr_correlated(&u.expr, outer_vars),
            variable: u.variable.clone(),
        }),
        other => other.clone(),
    }
}
2434
2435fn rewrite_return_item(item: &ReturnItem, outer_vars: &HashSet<String>) -> ReturnItem {
2436 match item {
2437 ReturnItem::All => ReturnItem::All,
2438 ReturnItem::Expr {
2439 expr,
2440 alias,
2441 source_text,
2442 } => ReturnItem::Expr {
2443 expr: rewrite_expr_correlated(expr, outer_vars),
2444 alias: alias.clone(),
2445 source_text: source_text.clone(),
2446 },
2447 }
2448}
2449
/// Rewrites an expression for correlated execution: a property access on an
/// outer-scope variable (`v.key` where `v` is in `outer_vars`) becomes a
/// parameter named `"v.key"`, whose value is bound per row at evaluate time.
/// Nested subqueries are rewritten recursively; everything else maps the
/// rewrite over its children.
fn rewrite_expr_correlated(expr: &Expr, outer_vars: &HashSet<String>) -> Expr {
    match expr {
        Expr::Property(base, key) => {
            // Direct `outer_var.key` access -> parameter reference.
            if let Expr::Variable(v) = base.as_ref()
                && outer_vars.contains(v)
            {
                return Expr::Parameter(format!("{}.{}", v, key));
            }
            // Otherwise keep the property access, rewriting its base.
            Expr::Property(
                Box::new(rewrite_expr_correlated(base, outer_vars)),
                key.clone(),
            )
        }
        Expr::Exists {
            query,
            from_pattern_predicate,
        } => Expr::Exists {
            query: Box::new(rewrite_query_correlated(query, outer_vars)),
            from_pattern_predicate: *from_pattern_predicate,
        },
        Expr::CountSubquery(query) => {
            Expr::CountSubquery(Box::new(rewrite_query_correlated(query, outer_vars)))
        }
        Expr::CollectSubquery(query) => {
            Expr::CollectSubquery(Box::new(rewrite_query_correlated(query, outer_vars)))
        }
        other => other
            .clone()
            .map_children(&mut |child| rewrite_expr_correlated(&child, outer_vars)),
    }
}
2487
2488fn resolve_metric_for_property(schema: &UniSchema, property: &str) -> Option<DistanceMetric> {
2492 for idx in &schema.indexes {
2493 if let IndexDefinition::Vector(config) = idx
2494 && config.property == property
2495 {
2496 return Some(config.metric.clone());
2497 }
2498 }
2499 None
2500}