1use crate::query::df_expr::{TranslationContext, VariableKind, cypher_expr_to_df};
5use crate::query::df_graph::GraphExecutionContext;
6use crate::query::df_graph::common::{execute_subplan, extract_row_params};
7use crate::query::df_graph::comprehension::ListComprehensionExecExpr;
8use crate::query::df_graph::pattern_comprehension::{
9 PatternComprehensionExecExpr, analyze_pattern, build_inner_schema, collect_inner_properties,
10};
11use crate::query::df_graph::quantifier::{QuantifierExecExpr, QuantifierType};
12use crate::query::df_graph::reduce::ReduceExecExpr;
13use crate::query::df_graph::similar_to_expr::SimilarToExecExpr;
14use crate::query::planner::QueryPlanner;
15use crate::query::similar_to::SimilarToError;
16use anyhow::{Result, anyhow};
17use arrow_array::builder::BooleanBuilder;
18use arrow_schema::{DataType, Field, Schema};
19use datafusion::execution::context::SessionState;
20use datafusion::physical_expr::expressions::binary;
21use datafusion::physical_plan::PhysicalExpr;
22use datafusion::physical_planner::PhysicalPlanner;
23use datafusion::prelude::SessionContext;
24use parking_lot::RwLock;
25use std::collections::{HashMap, HashSet};
26use std::sync::Arc;
27use uni_common::Value;
28use uni_common::core::schema::{DistanceMetric, IndexDefinition, Schema as UniSchema};
29use uni_cypher::ast::{
30 BinaryOp, Clause, CypherLiteral, Expr, MatchClause, Query, ReturnClause, ReturnItem, SortItem,
31 Statement, UnaryOp, UnwindClause, WithClause,
32};
33use uni_store::storage::manager::StorageManager;
34
35fn is_cypher_value_type(dt: Option<&DataType>) -> bool {
38 dt.is_some_and(|t| matches!(t, DataType::LargeBinary | DataType::FixedSizeBinary(24)))
39}
40
41fn resolve_list_element_type(
51 list_data_type: &DataType,
52 large_binary_fallback: DataType,
53 context: &str,
54) -> Result<DataType> {
55 match list_data_type {
56 DataType::List(field) | DataType::LargeList(field) => Ok(field.data_type().clone()),
57 DataType::Null => Ok(DataType::Null),
58 DataType::LargeBinary => Ok(large_binary_fallback),
59 _ => Err(anyhow!(
60 "{} input must be a list, got {:?}",
61 context,
62 list_data_type
63 )),
64 }
65}
66
/// Physical expression that re-encodes a `List`/`LargeList` column produced by
/// its child into the opaque `LargeBinary` Cypher-value representation (via
/// `typed_large_list_to_cv_array` in `df_graph::common`).
#[derive(Debug)]
struct LargeListToCypherValueExpr {
    /// Expression whose list-valued output gets re-encoded.
    child: Arc<dyn PhysicalExpr>,
}
75
impl LargeListToCypherValueExpr {
    /// Wraps `child`, whose evaluation result is expected to be a list
    /// (or an already-encoded `LargeBinary`) array.
    fn new(child: Arc<dyn PhysicalExpr>) -> Self {
        Self { child }
    }
}

impl std::fmt::Display for LargeListToCypherValueExpr {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "LargeListToCypherValue({})", self.child)
    }
}
87
impl PartialEq for LargeListToCypherValueExpr {
    fn eq(&self, other: &Self) -> bool {
        // Identity (pointer) equality on the child Arc, not structural
        // equality: two separately-built but identical expressions compare
        // unequal. This is deliberately conservative and stays consistent
        // with the type's Hash impl, which hashes all instances alike.
        Arc::ptr_eq(&self.child, &other.child)
    }
}

impl Eq for LargeListToCypherValueExpr {}
95
96impl std::hash::Hash for LargeListToCypherValueExpr {
97 fn hash<H: std::hash::Hasher>(&self, _state: &mut H) {
98 std::any::type_name::<Self>().hash(_state);
100 }
101}
102
103impl PartialEq<dyn std::any::Any> for LargeListToCypherValueExpr {
104 fn eq(&self, other: &dyn std::any::Any) -> bool {
105 other
106 .downcast_ref::<Self>()
107 .map(|x| self == x)
108 .unwrap_or(false)
109 }
110}
111
impl PhysicalExpr for LargeListToCypherValueExpr {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Output is always the opaque Cypher-value encoding, regardless of the
    /// child's concrete list type.
    fn data_type(&self, _input_schema: &Schema) -> datafusion::error::Result<DataType> {
        Ok(DataType::LargeBinary)
    }

    fn nullable(&self, input_schema: &Schema) -> datafusion::error::Result<bool> {
        self.child.nullable(input_schema)
    }

    /// Evaluates the child and converts its result to `LargeBinary` Cypher
    /// values: `List` is first widened to `LargeList`, an already-`LargeBinary`
    /// result passes through untouched, and anything else is an error.
    fn evaluate(
        &self,
        batch: &arrow_array::RecordBatch,
    ) -> datafusion::error::Result<datafusion::logical_expr::ColumnarValue> {
        use datafusion::arrow::compute::cast;
        use datafusion::logical_expr::ColumnarValue;

        let child_result = self.child.evaluate(batch)?;
        let child_array = child_result.into_array(batch.num_rows())?;

        // Normalize List -> LargeList so a single downcast path suffices below.
        let list_array = if let DataType::List(field) = child_array.data_type() {
            let target_type = DataType::LargeList(field.clone());
            cast(&child_array, &target_type).map_err(|e| {
                datafusion::error::DataFusionError::Execution(format!(
                    "List to LargeList cast failed: {e}"
                ))
            })?
        } else {
            child_array.clone()
        };

        // Already in the opaque encoding: nothing to do.
        if list_array.data_type() == &DataType::LargeBinary {
            return Ok(ColumnarValue::Array(list_array));
        }

        if let Some(large_list) = list_array
            .as_any()
            .downcast_ref::<datafusion::arrow::array::LargeListArray>()
        {
            let cv_array =
                crate::query::df_graph::common::typed_large_list_to_cv_array(large_list)?;
            Ok(ColumnarValue::Array(cv_array))
        } else {
            Err(datafusion::error::DataFusionError::Execution(format!(
                "Expected List or LargeList, got {:?}",
                list_array.data_type()
            )))
        }
    }

    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
        vec![&self.child]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn PhysicalExpr>>,
    ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
        if children.len() != 1 {
            return Err(datafusion::error::DataFusionError::Execution(
                "LargeListToCypherValueExpr expects exactly 1 child".to_string(),
            ));
        }
        Ok(Arc::new(LargeListToCypherValueExpr::new(
            children[0].clone(),
        )))
    }

    fn fmt_sql(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "LargeListToCypherValue({})", self.child)
    }
}
190
/// Compiles Cypher AST expressions directly into DataFusion `PhysicalExpr`s,
/// providing custom implementations for constructs that have no standard
/// DataFusion representation (list/pattern comprehensions, quantifiers,
/// reduce, EXISTS subqueries, `similar_to`).
pub struct CypherPhysicalExprCompiler<'a> {
    // DataFusion session state: physical planning and UDF lookup.
    state: &'a SessionState,
    // Variable kinds/labels/parameters used by Cypher -> DataFusion translation.
    translation_ctx: Option<&'a TranslationContext>,
    // Graph context; required for pattern comprehension, EXISTS and similar_to.
    graph_ctx: Option<Arc<GraphExecutionContext>>,
    // Graph schema; required alongside `graph_ctx`.
    uni_schema: Option<Arc<UniSchema>>,
    // Session context; required only for EXISTS subquery execution.
    session_ctx: Option<Arc<RwLock<SessionContext>>>,
    // Storage access; required only for EXISTS subquery execution.
    storage: Option<Arc<StorageManager>>,
    // Query parameters forwarded into EXISTS subqueries.
    params: HashMap<String, Value>,
}
204
205impl<'a> CypherPhysicalExprCompiler<'a> {
    /// Creates a compiler with only planning state attached; graph and
    /// subquery capabilities are added later via `with_graph_ctx` /
    /// `with_subquery_ctx`.
    pub fn new(state: &'a SessionState, translation_ctx: Option<&'a TranslationContext>) -> Self {
        Self {
            state,
            translation_ctx,
            graph_ctx: None,
            uni_schema: None,
            session_ctx: None,
            storage: None,
            params: HashMap::new(),
        }
    }
217
    /// Returns a compiler whose translation context has `exclude_vars` removed
    /// from `variable_kinds`, so that loop/accumulator variables introduced by
    /// comprehensions, quantifiers and reduce shadow same-named outer graph
    /// variables instead of still being treated as nodes/edges.
    ///
    /// `scoped_ctx_slot` is caller-provided storage for the rebuilt context;
    /// it must outlive the returned compiler, which borrows from it (`'b`).
    fn scoped_compiler<'b>(
        &'b self,
        exclude_vars: &[&str],
        scoped_ctx_slot: &'b mut Option<TranslationContext>,
    ) -> CypherPhysicalExprCompiler<'b>
    where
        'a: 'b,
    {
        // Only rebuild the context when one of the excluded variables actually
        // appears in it; otherwise reuse the existing borrow unchanged.
        let needs_scoping = self.translation_ctx.is_some_and(|ctx| {
            exclude_vars
                .iter()
                .any(|v| ctx.variable_kinds.contains_key(*v))
        });

        let ctx_ref = if needs_scoping {
            let ctx = self.translation_ctx.unwrap();
            let mut new_kinds = ctx.variable_kinds.clone();
            for v in exclude_vars {
                new_kinds.remove(*v);
            }
            // Clone everything but the excluded variable kinds.
            *scoped_ctx_slot = Some(TranslationContext {
                parameters: ctx.parameters.clone(),
                outer_values: ctx.outer_values.clone(),
                variable_labels: ctx.variable_labels.clone(),
                variable_kinds: new_kinds,
                node_variable_hints: ctx.node_variable_hints.clone(),
                mutation_edge_hints: ctx.mutation_edge_hints.clone(),
                statement_time: ctx.statement_time,
            });
            scoped_ctx_slot.as_ref()
        } else {
            self.translation_ctx
        };

        CypherPhysicalExprCompiler {
            state: self.state,
            translation_ctx: ctx_ref,
            graph_ctx: self.graph_ctx.clone(),
            uni_schema: self.uni_schema.clone(),
            session_ctx: self.session_ctx.clone(),
            storage: self.storage.clone(),
            params: self.params.clone(),
        }
    }
273
274 pub fn with_graph_ctx(
276 mut self,
277 graph_ctx: Arc<GraphExecutionContext>,
278 uni_schema: Arc<UniSchema>,
279 ) -> Self {
280 self.graph_ctx = Some(graph_ctx);
281 self.uni_schema = Some(uni_schema);
282 self
283 }
284
285 pub fn with_subquery_ctx(
287 mut self,
288 graph_ctx: Arc<GraphExecutionContext>,
289 uni_schema: Arc<UniSchema>,
290 session_ctx: Arc<RwLock<SessionContext>>,
291 storage: Arc<StorageManager>,
292 params: HashMap<String, Value>,
293 ) -> Self {
294 self.graph_ctx = Some(graph_ctx);
295 self.uni_schema = Some(uni_schema);
296 self.session_ctx = Some(session_ctx);
297 self.storage = Some(storage);
298 self.params = params;
299 self
300 }
301
    /// Compiles a Cypher expression against `input_schema` into a DataFusion
    /// `PhysicalExpr`.
    ///
    /// Constructs with no standard DataFusion representation (comprehensions,
    /// quantifiers, reduce, EXISTS, `similar_to`) get dedicated exec-expr
    /// implementations; composite expressions recurse here only when a custom
    /// construct is nested inside them, otherwise they fall through to
    /// `compile_standard` (DataFusion's own planner).
    pub fn compile(&self, expr: &Expr, input_schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
        match expr {
            // Custom constructs: dedicated exec expressions.
            Expr::ListComprehension {
                variable,
                list,
                where_clause,
                map_expr,
            } => self.compile_list_comprehension(
                variable,
                list,
                where_clause.as_deref(),
                map_expr,
                input_schema,
            ),
            Expr::Quantifier {
                quantifier,
                variable,
                list,
                predicate,
            } => self.compile_quantifier(quantifier, variable, list, predicate, input_schema),
            Expr::Reduce {
                accumulator,
                init,
                variable,
                list,
                expr: expression,
            } => self.compile_reduce(accumulator, init, variable, list, expression, input_schema),
            Expr::BinaryOp { left, op, right } => {
                self.compile_binary_op_dispatch(left, op, right, input_schema)
            }
            Expr::UnaryOp { op, expr: inner } => {
                if matches!(op, UnaryOp::Not) {
                    // NOT always goes through the custom path; opaque
                    // Cypher-value operands are first coerced to boolean.
                    let mut inner_phy = self.compile(inner, input_schema)?;
                    if let Ok(DataType::LargeBinary) = inner_phy.data_type(input_schema) {
                        inner_phy = self.wrap_with_cv_to_bool(inner_phy)?;
                    }
                    self.compile_unary_op(op, inner_phy, input_schema)
                } else if Self::contains_custom_expr(inner) {
                    let inner_phy = self.compile(inner, input_schema)?;
                    self.compile_unary_op(op, inner_phy, input_schema)
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }
            // IS [NOT] NULL only needs special handling when the operand
            // itself requires a custom expression.
            Expr::IsNull(inner) => {
                if Self::contains_custom_expr(inner) {
                    let inner_phy = self.compile(inner, input_schema)?;
                    Ok(datafusion::physical_expr::expressions::is_null(inner_phy)
                        .map_err(|e| anyhow!("Failed to create is_null: {}", e))?)
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }
            Expr::IsNotNull(inner) => {
                if Self::contains_custom_expr(inner) {
                    let inner_phy = self.compile(inner, input_schema)?;
                    Ok(
                        datafusion::physical_expr::expressions::is_not_null(inner_phy)
                            .map_err(|e| anyhow!("Failed to create is_not_null: {}", e))?,
                    )
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }
            Expr::In {
                expr: left,
                list: right,
            } => {
                if Self::contains_custom_expr(left) || Self::contains_custom_expr(right) {
                    let left_phy = self.compile(left, input_schema)?;
                    let right_phy = self.compile(right, input_schema)?;

                    // Operand types feed UDF signature resolution; default to
                    // the opaque Cypher-value type when unknown.
                    let left_type = left_phy
                        .data_type(input_schema)
                        .unwrap_or(DataType::LargeBinary);
                    let right_type = right_phy
                        .data_type(input_schema)
                        .unwrap_or(DataType::LargeBinary);

                    self.plan_binary_udf("_cypher_in", left_phy, right_phy, left_type, right_type)?
                        .ok_or_else(|| anyhow!("_cypher_in UDF not found"))
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }

            // Known limitations: comprehensions nested directly inside
            // list/map literals are rejected explicitly.
            Expr::List(items) if items.iter().any(Self::contains_custom_expr) => Err(anyhow!(
                "List literals containing comprehensions not yet supported in compiler"
            )),
            Expr::Map(entries) if entries.iter().any(|(_, v)| Self::contains_custom_expr(v)) => {
                Err(anyhow!(
                    "Map literals containing comprehensions not yet supported in compiler"
                ))
            }

            // Property/index access: try direct column strategies first.
            Expr::Property(base, prop) => self.compile_property_access(base, prop, input_schema),

            Expr::ArrayIndex { array, index } => {
                self.compile_array_index(array, index, input_schema)
            }

            Expr::PatternComprehension {
                path_variable,
                pattern,
                where_clause,
                map_expr,
            } => self.compile_pattern_comprehension(
                path_variable,
                pattern,
                where_clause.as_deref(),
                map_expr,
                input_schema,
            ),

            Expr::Exists { query, .. } => self.compile_exists(query),

            Expr::FunctionCall {
                name,
                args,
                distinct,
                ..
            } => {
                // similar_to is always custom; other calls only when an
                // argument needs a custom expression.
                if name.eq_ignore_ascii_case("similar_to") {
                    return self.compile_similar_to(args, input_schema);
                }
                if args.iter().any(Self::contains_custom_expr) {
                    self.compile_function_with_custom_args(name, args, *distinct, input_schema)
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }

            Expr::Case {
                expr: case_operand,
                when_then,
                else_expr,
            } => {
                // CASE needs the custom path when any part (operand, WHEN/THEN
                // pairs, or ELSE) contains a custom construct.
                let has_custom = case_operand
                    .as_deref()
                    .is_some_and(Self::contains_custom_expr)
                    || when_then.iter().any(|(w, t)| {
                        Self::contains_custom_expr(w) || Self::contains_custom_expr(t)
                    })
                    || else_expr.as_deref().is_some_and(Self::contains_custom_expr);

                if has_custom {
                    self.compile_case(case_operand, when_then, else_expr, input_schema)
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }

            Expr::LabelCheck { .. } => self.compile_standard(expr, input_schema),

            // Everything else is plain DataFusion-plannable.
            _ => self.compile_standard(expr, input_schema),
        }
    }
478
    /// Routes a binary operation to the right compilation strategy.
    ///
    /// Order matters: (1) node/node or edge/edge variable (in)equality is
    /// rewritten to an id comparison; (2) XOR/POW always use the standard
    /// planner; (3) custom sub-expressions force the custom binary path;
    /// (4) list concatenation via `+` uses the standard planner; (5) otherwise
    /// the operand types decide — opaque Cypher-value operands need the custom
    /// binary op, plain Arrow types go to the standard planner.
    fn compile_binary_op_dispatch(
        &self,
        left: &Expr,
        op: &BinaryOp,
        right: &Expr,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        // (1) `n = m` / `n <> m` between two graph variables of the same kind
        // becomes a cheap `_vid`/`_eid` comparison instead of comparing the
        // full entity representations.
        if matches!(op, BinaryOp::Eq | BinaryOp::NotEq)
            && let (Expr::Variable(lv), Expr::Variable(rv)) = (left, right)
            && let Some(ctx) = self.translation_ctx
            && let (Some(lk), Some(rk)) = (ctx.variable_kinds.get(lv), ctx.variable_kinds.get(rv))
        {
            let identity_prop = match (lk, rk) {
                (VariableKind::Node, VariableKind::Node) => Some("_vid"),
                (VariableKind::Edge, VariableKind::Edge) => Some("_eid"),
                _ => None,
            };

            if let Some(id_prop) = identity_prop {
                return self.compile_standard(
                    &Expr::BinaryOp {
                        left: Box::new(Expr::Property(
                            Box::new(Expr::Variable(lv.clone())),
                            id_prop.to_string(),
                        )),
                        op: *op,
                        right: Box::new(Expr::Property(
                            Box::new(Expr::Variable(rv.clone())),
                            id_prop.to_string(),
                        )),
                    },
                    input_schema,
                );
            }
        }

        // (2) XOR and POW are handled by the standard translation layer.
        if matches!(op, BinaryOp::Xor | BinaryOp::Pow) {
            return self.compile_standard(
                &Expr::BinaryOp {
                    left: Box::new(left.clone()),
                    op: *op,
                    right: Box::new(right.clone()),
                },
                input_schema,
            );
        }

        // (3) A custom construct on either side means the sides must be
        // compiled here (the standard planner cannot express them).
        if Self::contains_custom_expr(left) || Self::contains_custom_expr(right) {
            let left_phy = self.compile(left, input_schema)?;
            let right_phy = self.compile(right, input_schema)?;
            return self.compile_binary_op(op, left_phy, right_phy, input_schema);
        }

        // (4) List concatenation (`list + x`) has dedicated standard handling.
        if *op == BinaryOp::Add && (Self::is_list_producing(left) || Self::is_list_producing(right))
        {
            return self.compile_standard(
                &Expr::BinaryOp {
                    left: Box::new(left.clone()),
                    op: *op,
                    right: Box::new(right.clone()),
                },
                input_schema,
            );
        }

        // (5) Decide by operand type: opaque Cypher values need the custom op.
        let left_phy = self.compile(left, input_schema)?;
        let right_phy = self.compile(right, input_schema)?;
        let left_dt = left_phy.data_type(input_schema).ok();
        let right_dt = right_phy.data_type(input_schema).ok();
        let has_cv =
            is_cypher_value_type(left_dt.as_ref()) || is_cypher_value_type(right_dt.as_ref());

        if has_cv {
            self.compile_binary_op(op, left_phy, right_phy, input_schema)
        } else {
            self.compile_standard(
                &Expr::BinaryOp {
                    left: Box::new(left.clone()),
                    op: *op,
                    right: Box::new(right.clone()),
                },
                input_schema,
            )
        }
    }
578
    /// If `var_name` resolves to a struct-typed column, builds an accessor for
    /// `field_name` inside it.
    ///
    /// Contract: returns `None` when the column is missing or not a struct (so
    /// the caller can try other strategies); returns `Some(NULL literal)` when
    /// the struct exists but lacks the field — matching Cypher's semantics of
    /// absent properties evaluating to null.
    fn try_compile_struct_field(
        &self,
        var_name: &str,
        field_name: &str,
        input_schema: &Schema,
    ) -> Option<Arc<dyn PhysicalExpr>> {
        let col_idx = input_schema.index_of(var_name).ok()?;
        let DataType::Struct(struct_fields) = input_schema.field(col_idx).data_type() else {
            return None;
        };

        if let Some(field_idx) = struct_fields.iter().position(|f| f.name() == field_name) {
            let output_type = struct_fields[field_idx].data_type().clone();
            let col_expr: Arc<dyn PhysicalExpr> = Arc::new(
                datafusion::physical_expr::expressions::Column::new(var_name, col_idx),
            );
            Some(Arc::new(StructFieldAccessExpr::new(
                col_expr,
                field_idx,
                output_type,
            )))
        } else {
            // Field not present on the struct: absent property -> NULL.
            Some(Arc::new(
                datafusion::physical_expr::expressions::Literal::new(
                    datafusion::common::ScalarValue::Null,
                ),
            ))
        }
    }
613
614 fn compile_property_access(
616 &self,
617 base: &Expr,
618 prop: &str,
619 input_schema: &Schema,
620 ) -> Result<Arc<dyn PhysicalExpr>> {
621 if let Expr::Variable(var_name) = base {
622 if let Some(expr) = self.try_compile_struct_field(var_name, prop, input_schema) {
624 return Ok(expr);
625 }
626 let flat_col = format!("{}.{}", var_name, prop);
628 if let Ok(col_idx) = input_schema.index_of(&flat_col) {
629 return Ok(Arc::new(
630 datafusion::physical_expr::expressions::Column::new(&flat_col, col_idx),
631 ));
632 }
633 }
634 self.compile_standard(
635 &Expr::Property(Box::new(base.clone()), prop.to_string()),
636 input_schema,
637 )
638 }
639
640 fn compile_array_index(
642 &self,
643 array: &Expr,
644 index: &Expr,
645 input_schema: &Schema,
646 ) -> Result<Arc<dyn PhysicalExpr>> {
647 if let Expr::Variable(var_name) = array
648 && let Expr::Literal(CypherLiteral::String(prop)) = index
649 && let Some(expr) = self.try_compile_struct_field(var_name, prop, input_schema)
650 {
651 return Ok(expr);
652 }
653 self.compile_standard(
654 &Expr::ArrayIndex {
655 array: Box::new(array.clone()),
656 index: Box::new(index.clone()),
657 },
658 input_schema,
659 )
660 }
661
    /// Compiles an `EXISTS { ... }` subquery into an `ExistsExecExpr` that
    /// executes the inner query per input row.
    ///
    /// Rejects updating clauses inside the subquery, and requires graph,
    /// session and storage contexts to have been attached via
    /// `with_subquery_ctx`.
    fn compile_exists(&self, query: &Query) -> Result<Arc<dyn PhysicalExpr>> {
        if has_mutation_clause(query) {
            return Err(anyhow!(
                "SyntaxError: InvalidClauseComposition - EXISTS subquery cannot contain updating clauses"
            ));
        }

        // Shared error constructor for the missing-dependency cases below.
        let err = |dep: &str| anyhow!("EXISTS requires {}", dep);

        let graph_ctx = self
            .graph_ctx
            .clone()
            .ok_or_else(|| err("GraphExecutionContext"))?;
        let uni_schema = self.uni_schema.clone().ok_or_else(|| err("UniSchema"))?;
        let session_ctx = self
            .session_ctx
            .clone()
            .ok_or_else(|| err("SessionContext"))?;
        let storage = self.storage.clone().ok_or_else(|| err("StorageManager"))?;

        Ok(Arc::new(ExistsExecExpr::new(
            query.clone(),
            graph_ctx,
            session_ctx,
            storage,
            uni_schema,
            self.params.clone(),
        )))
    }
693
    /// Compiles a `similar_to(source, query[, options])` call into a
    /// `SimilarToExecExpr`.
    ///
    /// Requires a graph context. The first argument determines the anchor
    /// variable and the vector property names, whose distance metrics are
    /// resolved from the graph schema when available.
    fn compile_similar_to(
        &self,
        args: &[Expr],
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        // Arity: similar_to(source, query) or similar_to(source, query, options).
        if args.len() < 2 || args.len() > 3 {
            return Err(SimilarToError::InvalidArity { count: args.len() }.into());
        }

        let graph_ctx = self
            .graph_ctx
            .clone()
            .ok_or(SimilarToError::NoGraphContext)?;

        let source_variable = extract_source_variable(&args[0]);
        let source_property_names = extract_property_names(&args[0]);

        // Pair up source/query expressions (handles single-vs-multi property forms).
        let (source_exprs, query_exprs) = normalize_similar_to_args(&args[0], &args[1]);

        let source_children: Vec<Arc<dyn PhysicalExpr>> = source_exprs
            .iter()
            .map(|e| self.compile(e, input_schema))
            .collect::<Result<Vec<_>>>()?;
        let query_children: Vec<Arc<dyn PhysicalExpr>> = query_exprs
            .iter()
            .map(|e| self.compile(e, input_schema))
            .collect::<Result<Vec<_>>>()?;

        let options_child = if args.len() == 3 {
            Some(self.compile(&args[2], input_schema)?)
        } else {
            None
        };

        // Resolve a distance metric per source property from the graph schema,
        // when both the property name and the schema are known.
        let source_metrics: Vec<Option<DistanceMetric>> = source_property_names
            .iter()
            .map(|prop_name| {
                prop_name.as_ref().and_then(|prop| {
                    self.uni_schema
                        .as_ref()
                        .and_then(|schema| resolve_metric_for_property(schema, prop))
                })
            })
            .collect();

        Ok(Arc::new(SimilarToExecExpr::new(
            source_children,
            query_children,
            options_child,
            graph_ctx,
            source_variable,
            source_property_names,
            source_metrics,
        )))
    }
756
    /// Determines whether the comprehension loop variable `variable` is used as
    /// a graph entity inside a nested pattern comprehension (within `map_expr`
    /// or `where_clause`). When it is, the enclosing list comprehension must
    /// also materialize a `<variable>._vid` column so the nested pattern can
    /// anchor on the loop element.
    ///
    /// NOTE(review): the walker does not descend into Map, Case, In, IsNull,
    /// or ArrayIndex sub-expressions — presumably a pattern comprehension
    /// cannot appear in those positions today; confirm if the AST grows.
    fn needs_vid_extraction_for_variable(
        variable: &str,
        map_expr: &Expr,
        where_clause: Option<&Expr>,
    ) -> bool {
        // Recursive walker: true if `expr` contains a pattern comprehension
        // whose pattern references `var` as a node or relationship variable.
        fn expr_has_pattern_comp_referencing(expr: &Expr, var: &str) -> bool {
            match expr {
                Expr::PatternComprehension { pattern, .. } => {
                    pattern.paths.iter().any(|path| {
                        path.elements.iter().any(|elem| match elem {
                            uni_cypher::ast::PatternElement::Node(n) => {
                                n.variable.as_deref() == Some(var)
                            }
                            uni_cypher::ast::PatternElement::Relationship(r) => {
                                r.variable.as_deref() == Some(var)
                            }
                            _ => false,
                        })
                    })
                }
                Expr::FunctionCall { args, .. } => args
                    .iter()
                    .any(|a| expr_has_pattern_comp_referencing(a, var)),
                Expr::BinaryOp { left, right, .. } => {
                    expr_has_pattern_comp_referencing(left, var)
                        || expr_has_pattern_comp_referencing(right, var)
                }
                Expr::UnaryOp { expr: e, .. } | Expr::Property(e, _) => {
                    expr_has_pattern_comp_referencing(e, var)
                }
                Expr::List(items) => items
                    .iter()
                    .any(|i| expr_has_pattern_comp_referencing(i, var)),
                Expr::ListComprehension {
                    list,
                    map_expr,
                    where_clause,
                    ..
                } => {
                    expr_has_pattern_comp_referencing(list, var)
                        || expr_has_pattern_comp_referencing(map_expr, var)
                        || where_clause
                            .as_ref()
                            .is_some_and(|w| expr_has_pattern_comp_referencing(w, var))
                }
                _ => false,
            }
        }

        expr_has_pattern_comp_referencing(map_expr, variable)
            || where_clause.is_some_and(|w| expr_has_pattern_comp_referencing(w, variable))
    }
811
812 pub fn contains_custom_expr(expr: &Expr) -> bool {
814 match expr {
815 Expr::ListComprehension { .. } => true,
816 Expr::Quantifier { .. } => true,
817 Expr::Reduce { .. } => true,
818 Expr::PatternComprehension { .. } => true,
819 Expr::BinaryOp { left, right, .. } => {
820 Self::contains_custom_expr(left) || Self::contains_custom_expr(right)
821 }
822 Expr::UnaryOp { expr, .. } => Self::contains_custom_expr(expr),
823 Expr::FunctionCall { name, args, .. } => {
824 name.eq_ignore_ascii_case("similar_to")
825 || args.iter().any(Self::contains_custom_expr)
826 }
827 Expr::Case {
828 when_then,
829 else_expr,
830 ..
831 } => {
832 when_then
833 .iter()
834 .any(|(w, t)| Self::contains_custom_expr(w) || Self::contains_custom_expr(t))
835 || else_expr.as_deref().is_some_and(Self::contains_custom_expr)
836 }
837 Expr::List(items) => items.iter().any(Self::contains_custom_expr),
838 Expr::Map(entries) => entries.iter().any(|(_, v)| Self::contains_custom_expr(v)),
839 Expr::IsNull(e) | Expr::IsNotNull(e) => Self::contains_custom_expr(e),
840 Expr::In { expr: l, list: r } => {
841 Self::contains_custom_expr(l) || Self::contains_custom_expr(r)
842 }
843 Expr::Exists { .. } => true,
844 Expr::LabelCheck { expr, .. } => Self::contains_custom_expr(expr),
845 _ => false,
846 }
847 }
848
849 fn is_list_producing(expr: &Expr) -> bool {
852 match expr {
853 Expr::List(_) => true,
854 Expr::ListComprehension { .. } => true,
855 Expr::ArraySlice { .. } => true,
856 Expr::BinaryOp {
858 left,
859 op: BinaryOp::Add,
860 right,
861 } => Self::is_list_producing(left) || Self::is_list_producing(right),
862 Expr::FunctionCall { name, .. } => {
863 matches!(
865 name.as_str(),
866 "range"
867 | "tail"
868 | "reverse"
869 | "collect"
870 | "keys"
871 | "labels"
872 | "nodes"
873 | "relationships"
874 )
875 }
876 _ => false,
877 }
878 }
879
    /// Fallback path: converts the Cypher AST to a DataFusion logical
    /// expression and plans it with the default physical planner.
    ///
    /// Pipeline: rewrite `var.prop` to flat `"var.prop"` columns already in
    /// the schema, translate to a logical expression, swap in session UDFs,
    /// apply type coercion, resolve UDFs again (kept from the original
    /// ordering — presumably coercion can rewrap function nodes; confirm),
    /// then create the physical expression.
    fn compile_standard(
        &self,
        expr: &Expr,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        let resolved = Self::resolve_flat_column_properties(expr, input_schema);
        let df_expr = cypher_expr_to_df(&resolved, self.translation_ctx)?;
        let resolved_expr = self.resolve_udfs(df_expr)?;

        let df_schema = datafusion::common::DFSchema::try_from(input_schema.clone())?;

        let coerced_expr = crate::query::df_expr::apply_type_coercion(&resolved_expr, &df_schema)?;

        let coerced_expr = self.resolve_udfs(coerced_expr)?;

        let planner = datafusion::physical_planner::DefaultPhysicalPlanner::default();
        planner
            .create_physical_expr(&coerced_expr, &df_schema, self.state)
            .map_err(|e| anyhow!("DataFusion planning failed: {}", e))
    }
906
    /// Rewrites `Expr::Property(Variable(v), p)` into `Expr::Variable("v.p")`
    /// wherever the input schema already contains a flattened `"v.p"` column,
    /// recursing through property chains, binary/unary ops, function calls,
    /// and list literals.
    ///
    /// NOTE(review): Map, Case, In, IsNull and ArrayIndex nodes fall into the
    /// catch-all `other` arm and are returned unchanged, so the rewrite does
    /// not reach property accesses nested inside them — confirm whether any
    /// caller depends on that.
    fn resolve_flat_column_properties(expr: &Expr, schema: &Schema) -> Expr {
        match expr {
            Expr::Property(base, prop) => {
                if let Expr::Variable(var) = base.as_ref() {
                    let flat_col = format!("{}.{}", var, prop);
                    if schema.index_of(&flat_col).is_ok() {
                        return Expr::Variable(flat_col);
                    }
                }
                // No flat column: keep the Property node but recurse into the base.
                Expr::Property(
                    Box::new(Self::resolve_flat_column_properties(base, schema)),
                    prop.clone(),
                )
            }
            Expr::BinaryOp { left, op, right } => Expr::BinaryOp {
                left: Box::new(Self::resolve_flat_column_properties(left, schema)),
                op: *op,
                right: Box::new(Self::resolve_flat_column_properties(right, schema)),
            },
            Expr::FunctionCall {
                name,
                args,
                distinct,
                window_spec,
            } => Expr::FunctionCall {
                name: name.clone(),
                args: args
                    .iter()
                    .map(|a| Self::resolve_flat_column_properties(a, schema))
                    .collect(),
                distinct: *distinct,
                window_spec: window_spec.clone(),
            },
            Expr::UnaryOp { op, expr: inner } => Expr::UnaryOp {
                op: *op,
                expr: Box::new(Self::resolve_flat_column_properties(inner, schema)),
            },
            Expr::List(items) => Expr::List(
                items
                    .iter()
                    .map(|i| Self::resolve_flat_column_properties(i, schema))
                    .collect(),
            ),
            other => other.clone(),
        }
    }
960
961 fn resolve_udfs(
966 &self,
967 expr: datafusion::logical_expr::Expr,
968 ) -> Result<datafusion::logical_expr::Expr> {
969 use datafusion::common::tree_node::{Transformed, TreeNode};
970 use datafusion::logical_expr::Expr as DfExpr;
971
972 let result = expr
973 .transform_up(|node| {
974 if let DfExpr::ScalarFunction(ref func) = node {
975 let udf_name = func.func.name();
976 if let Some(registered_udf) = self.state.scalar_functions().get(udf_name) {
977 return Ok(Transformed::yes(DfExpr::ScalarFunction(
978 datafusion::logical_expr::expr::ScalarFunction {
979 func: registered_udf.clone(),
980 args: func.args.clone(),
981 },
982 )));
983 }
984 }
985 Ok(Transformed::no(node))
986 })
987 .map_err(|e| anyhow!("Failed to resolve UDFs: {}", e))?;
988 Ok(result.data)
989 }
990
    /// Compiles `[variable IN list WHERE pred | map_expr]` into a
    /// `ListComprehensionExecExpr`.
    ///
    /// The map/predicate are compiled against an "inner" schema: the input
    /// schema plus a column for the loop variable (replacing any same-named
    /// column), and — when a nested pattern comprehension anchors on the loop
    /// variable and elements are opaque Cypher values — an extra
    /// `<variable>._vid` UInt64 column.
    fn compile_list_comprehension(
        &self,
        variable: &str,
        list: &Expr,
        where_clause: Option<&Expr>,
        map_expr: &Expr,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        let input_list_phy = self.compile(list, input_schema)?;

        // Element type of the list; opaque lists fall back to LargeBinary.
        let list_data_type = input_list_phy.data_type(input_schema)?;
        let inner_data_type = resolve_list_element_type(
            &list_data_type,
            DataType::LargeBinary,
            "List comprehension",
        )?;

        // Inner schema: input columns + loop variable (shadowing any existing
        // column of the same name).
        let mut fields = input_schema.fields().to_vec();
        let loop_var_field = Arc::new(Field::new(variable, inner_data_type.clone(), true));

        if let Some(pos) = fields.iter().position(|f| f.name() == variable) {
            fields[pos] = loop_var_field;
        } else {
            fields.push(loop_var_field);
        }

        // Nested pattern comprehensions need a vertex id to anchor on.
        let needs_vid_extraction =
            Self::needs_vid_extraction_for_variable(variable, map_expr, where_clause);
        if needs_vid_extraction && inner_data_type == DataType::LargeBinary {
            let vid_field = Arc::new(Field::new(
                format!("{}._vid", variable),
                DataType::UInt64,
                true,
            ));
            fields.push(vid_field);
        }

        let inner_schema = Arc::new(Schema::new(fields));

        // Shadow the loop variable in the translation context so it is not
        // treated as an outer node/edge variable.
        let mut scoped_ctx = None;
        let inner_compiler = self.scoped_compiler(&[variable], &mut scoped_ctx);

        let predicate_phy = if let Some(pred) = where_clause {
            Some(inner_compiler.compile(pred, &inner_schema)?)
        } else {
            None
        };

        let map_phy = inner_compiler.compile(map_expr, &inner_schema)?;
        let output_item_type = map_phy.data_type(&inner_schema)?;

        Ok(Arc::new(ListComprehensionExecExpr::new(
            input_list_phy,
            map_phy,
            predicate_phy,
            variable.to_string(),
            Arc::new(input_schema.clone()),
            output_item_type,
            needs_vid_extraction,
        )))
    }
1057
    /// Compiles `reduce(accumulator = initial, variable IN list | reduce_expr)`
    /// into a `ReduceExecExpr`.
    ///
    /// The reduce expression is compiled against an inner schema that adds
    /// columns for both the accumulator (typed from the initial expression)
    /// and the loop variable (typed from the list's element type), each
    /// shadowing any same-named input column.
    fn compile_reduce(
        &self,
        accumulator: &str,
        initial: &Expr,
        variable: &str,
        list: &Expr,
        reduce_expr: &Expr,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        let list_phy = self.compile(list, input_schema)?;

        let initial_phy = self.compile(initial, input_schema)?;
        let acc_type = initial_phy.data_type(input_schema)?;

        // For opaque (LargeBinary) lists, assume elements share the
        // accumulator's type.
        let list_data_type = list_phy.data_type(input_schema)?;
        let inner_data_type =
            resolve_list_element_type(&list_data_type, acc_type.clone(), "Reduce")?;

        let mut fields = input_schema.fields().to_vec();

        // Accumulator column (shadows any same-named input column).
        let acc_field = Arc::new(Field::new(accumulator, acc_type, true));
        if let Some(pos) = fields.iter().position(|f| f.name() == accumulator) {
            fields[pos] = acc_field;
        } else {
            fields.push(acc_field);
        }

        // Loop-variable column (same shadowing rule).
        let var_field = Arc::new(Field::new(variable, inner_data_type, true));
        if let Some(pos) = fields.iter().position(|f| f.name() == variable) {
            fields[pos] = var_field;
        } else {
            fields.push(var_field);
        }

        let inner_schema = Arc::new(Schema::new(fields));

        // Shadow both bindings in the translation context.
        let mut scoped_ctx = None;
        let reduce_compiler = self.scoped_compiler(&[accumulator, variable], &mut scoped_ctx);

        let reduce_phy = reduce_compiler.compile(reduce_expr, &inner_schema)?;
        let output_type = reduce_phy.data_type(&inner_schema)?;

        Ok(Arc::new(ReduceExecExpr::new(
            accumulator.to_string(),
            initial_phy,
            variable.to_string(),
            list_phy,
            reduce_phy,
            Arc::new(input_schema.clone()),
            output_type,
        )))
    }
1114
    /// Compiles `all/any/single/none(variable IN list WHERE predicate)` into a
    /// `QuantifierExecExpr`.
    ///
    /// The predicate is compiled against an inner schema containing the loop
    /// variable; when the predicate yields an opaque Cypher value it is
    /// wrapped to coerce to boolean.
    fn compile_quantifier(
        &self,
        quantifier: &uni_cypher::ast::Quantifier,
        variable: &str,
        list: &Expr,
        predicate: &Expr,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        let input_list_phy = self.compile(list, input_schema)?;

        let list_data_type = input_list_phy.data_type(input_schema)?;
        let inner_data_type =
            resolve_list_element_type(&list_data_type, DataType::LargeBinary, "Quantifier")?;

        // Inner schema: input columns + loop variable (shadowing any existing
        // column of the same name).
        let mut fields = input_schema.fields().to_vec();
        let loop_var_field = Arc::new(Field::new(variable, inner_data_type, true));

        if let Some(pos) = fields.iter().position(|f| f.name() == variable) {
            fields[pos] = loop_var_field;
        } else {
            fields.push(loop_var_field);
        }

        let inner_schema = Arc::new(Schema::new(fields));

        // Shadow the loop variable in the translation context so it is not
        // treated as an outer node/edge variable.
        let mut scoped_ctx = None;
        let pred_compiler = self.scoped_compiler(&[variable], &mut scoped_ctx);

        let mut predicate_phy = pred_compiler.compile(predicate, &inner_schema)?;

        // Opaque Cypher-value predicates must be coerced to boolean.
        if let Ok(DataType::LargeBinary) = predicate_phy.data_type(&inner_schema) {
            predicate_phy = self.wrap_with_cv_to_bool(predicate_phy)?;
        }

        let qt = match quantifier {
            uni_cypher::ast::Quantifier::All => QuantifierType::All,
            uni_cypher::ast::Quantifier::Any => QuantifierType::Any,
            uni_cypher::ast::Quantifier::Single => QuantifierType::Single,
            uni_cypher::ast::Quantifier::None => QuantifierType::None,
        };

        Ok(Arc::new(QuantifierExecExpr::new(
            input_list_phy,
            predicate_phy,
            variable.to_string(),
            Arc::new(input_schema.clone()),
            qt,
        )))
    }
1173
    /// Compiles a pattern comprehension `[(pattern) WHERE pred | map_expr]`
    /// (optionally with a path variable) into a `PatternComprehensionExecExpr`.
    ///
    /// Requires graph context and schema. The pattern is analyzed into an
    /// anchor column plus traversal steps; the predicate/map expressions are
    /// compiled against an inner schema that exposes the traversed entities'
    /// referenced properties.
    ///
    /// NOTE(review): unlike list comprehensions, this does not call
    /// `scoped_compiler` — pattern-introduced variables appear to be resolved
    /// via the inner schema instead; confirm that shadowing of same-named
    /// outer graph variables is not needed here.
    fn compile_pattern_comprehension(
        &self,
        path_variable: &Option<String>,
        pattern: &uni_cypher::ast::Pattern,
        where_clause: Option<&Expr>,
        map_expr: &Expr,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        let err = |dep: &str| anyhow!("Pattern comprehension requires {}", dep);

        let graph_ctx = self
            .graph_ctx
            .as_ref()
            .ok_or_else(|| err("GraphExecutionContext"))?;
        let uni_schema = self.uni_schema.as_ref().ok_or_else(|| err("UniSchema"))?;

        // Anchor column in the input batch + traversal steps to execute per row.
        let (anchor_col, steps) = analyze_pattern(pattern, input_schema, uni_schema)?;

        // Only properties actually referenced by the predicate/map need to be
        // materialized in the inner schema.
        let (vertex_props, edge_props) = collect_inner_properties(where_clause, map_expr, &steps);

        let inner_schema = build_inner_schema(
            input_schema,
            &steps,
            &vertex_props,
            &edge_props,
            path_variable.as_deref(),
        );

        let pred_phy = where_clause
            .map(|p| self.compile(p, &inner_schema))
            .transpose()?;
        let map_phy = self.compile(map_expr, &inner_schema)?;
        let output_type = map_phy.data_type(&inner_schema)?;

        Ok(Arc::new(PatternComprehensionExecExpr::new(
            graph_ctx.clone(),
            anchor_col,
            steps,
            path_variable.clone(),
            pred_phy,
            map_phy,
            Arc::new(input_schema.clone()),
            Arc::new(inner_schema),
            output_type,
            vertex_props,
            edge_props,
        )))
    }
1227
1228 fn compile_function_with_custom_args(
1234 &self,
1235 name: &str,
1236 args: &[Expr],
1237 _distinct: bool,
1238 input_schema: &Schema,
1239 ) -> Result<Arc<dyn PhysicalExpr>> {
1240 let compiled_args: Vec<Arc<dyn PhysicalExpr>> = args
1242 .iter()
1243 .map(|arg| self.compile(arg, input_schema))
1244 .collect::<Result<Vec<_>>>()?;
1245
1246 let udf_name = Self::cypher_fn_to_udf(name);
1248 let udf = self
1249 .state
1250 .scalar_functions()
1251 .get(udf_name.as_str())
1252 .ok_or_else(|| {
1253 anyhow!(
1254 "UDF '{}' not found in registry for function '{}'",
1255 udf_name,
1256 name
1257 )
1258 })?;
1259
1260 let placeholders: &[&str] = &["__arg0__", "__arg1__", "__arg2__", "__argN__"];
1262 let operand_types: Vec<(&str, DataType)> = compiled_args
1263 .iter()
1264 .enumerate()
1265 .map(|(i, arg)| {
1266 let dt = arg.data_type(input_schema).unwrap_or(DataType::LargeBinary);
1267 let placeholder = placeholders[i.min(3)];
1268 (placeholder, dt)
1269 })
1270 .collect();
1271
1272 let dummy_cols: Vec<datafusion::logical_expr::Expr> = operand_types
1273 .iter()
1274 .map(|(name, _)| {
1275 datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1276 None::<String>,
1277 *name,
1278 ))
1279 })
1280 .collect();
1281
1282 let udf_expr = datafusion::logical_expr::Expr::ScalarFunction(
1283 datafusion::logical_expr::expr::ScalarFunction {
1284 func: udf.clone(),
1285 args: dummy_cols,
1286 },
1287 );
1288
1289 self.plan_udf_physical_expr(
1291 &udf_expr,
1292 &operand_types,
1293 compiled_args,
1294 &format!("function {}", name),
1295 )
1296 }
1297
1298 fn cypher_fn_to_udf(name: &str) -> String {
1303 match name.to_uppercase().as_str() {
1304 "SIZE" | "LENGTH" => "_cypher_size".to_string(),
1305 "REVERSE" => "_cypher_reverse".to_string(),
1306 "TOSTRING" => "tostring".to_string(),
1307 "TOBOOLEAN" | "TOBOOL" | "TOBOOLEANORNULL" => "toboolean".to_string(),
1308 "TOINTEGER" | "TOINT" | "TOINTEGERORNULL" => "tointeger".to_string(),
1309 "TOFLOAT" | "TOFLOATORNULL" => "tofloat".to_string(),
1310 "HEAD" => "head".to_string(),
1311 "LAST" => "last".to_string(),
1312 "TAIL" => "tail".to_string(),
1313 "KEYS" => "keys".to_string(),
1314 "TYPE" => "type".to_string(),
1315 "PROPERTIES" => "properties".to_string(),
1316 "LABELS" => "labels".to_string(),
1317 "COALESCE" => "coalesce".to_string(),
1318 "ID" => "id".to_string(),
1319 _ => name.to_lowercase(),
1321 }
1322 }
1323
    /// Compiles a Cypher CASE expression (both the "simple" form with an
    /// operand and the "searched" form without) into a DataFusion `CaseExpr`.
    ///
    /// Two coercions are applied before building the `CaseExpr`:
    /// 1. WHEN predicates that evaluate to LargeBinary (serialized
    ///    CypherValue) are wrapped with `_cv_to_bool`.
    /// 2. If the THEN/ELSE branches mix LargeBinary with native list types,
    ///    the list branches are converted to CypherValue so all branches
    ///    share one output type.
    fn compile_case(
        &self,
        operand: &Option<Box<Expr>>,
        when_then: &[(Expr, Expr)],
        else_expr: &Option<Box<Expr>>,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        // Simple-form operand is optional; compile errors propagate via `?`.
        let operand_phy = operand
            .as_deref()
            .map(|e| self.compile(e, input_schema))
            .transpose()?;

        let mut when_then_phy: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> = when_then
            .iter()
            .map(|(w, t)| {
                let w_phy = self.compile(w, input_schema)?;
                let t_phy = self.compile(t, input_schema)?;
                Ok((w_phy, t_phy))
            })
            .collect::<Result<Vec<_>>>()?;

        let mut else_phy = else_expr
            .as_deref()
            .map(|e| self.compile(e, input_schema))
            .transpose()?;

        // Coercion 1: CypherValue WHEN predicates must become Boolean.
        for (w_phy, _) in &mut when_then_phy {
            if matches!(w_phy.data_type(input_schema), Ok(DataType::LargeBinary)) {
                *w_phy = self.wrap_with_cv_to_bool(w_phy.clone())?;
            }
        }

        // Gather the resolvable result types of all branches (THEN + ELSE).
        let branch_types: Vec<DataType> = when_then_phy
            .iter()
            .map(|(_, t)| t.data_type(input_schema))
            .chain(else_phy.iter().map(|e| e.data_type(input_schema)))
            .filter_map(Result::ok)
            .collect();

        let has_large_binary = branch_types.contains(&DataType::LargeBinary);
        let has_list = branch_types
            .iter()
            .any(|dt| matches!(dt, DataType::List(_) | DataType::LargeList(_)));

        // Coercion 2: unify mixed CypherValue / native-list branches by
        // lifting the list branches to CypherValue.
        if has_large_binary && has_list {
            for (_, t_phy) in &mut when_then_phy {
                if let Ok(dt) = t_phy.data_type(input_schema)
                    && matches!(dt, DataType::List(_) | DataType::LargeList(_))
                {
                    *t_phy = Arc::new(LargeListToCypherValueExpr::new(t_phy.clone()));
                }
            }
            // `take` + re-insert lets us replace or keep the ELSE branch.
            if let Some(e_phy) = else_phy.take() {
                if let Ok(dt) = e_phy.data_type(input_schema)
                    && matches!(dt, DataType::List(_) | DataType::LargeList(_))
                {
                    else_phy = Some(Arc::new(LargeListToCypherValueExpr::new(e_phy)));
                } else {
                    else_phy = Some(e_phy);
                }
            }
        }

        let case_expr = datafusion::physical_expr::expressions::CaseExpr::try_new(
            operand_phy,
            when_then_phy,
            else_phy,
        )
        .map_err(|e| anyhow!("Failed to create CASE expression: {}", e))?;

        Ok(Arc::new(case_expr))
    }
1407
    /// Compiles a Cypher binary operator over two already-compiled operands.
    ///
    /// Dispatch order (first match wins):
    /// 1. String predicates (STARTS WITH / ENDS WITH / CONTAINS) get a
    ///    dedicated `CypherStringMatchExpr`.
    /// 2. The operator is mapped to a DataFusion `Operator`; XOR, ~= and POW
    ///    are rejected here.
    /// 3. If either operand is a serialized CypherValue, the `_cypher_*`
    ///    comparison / list-concat / arithmetic UDFs are tried in that order.
    /// 4. Otherwise a plain DataFusion binary expression is built.
    fn compile_binary_op(
        &self,
        op: &BinaryOp,
        left: Arc<dyn PhysicalExpr>,
        right: Arc<dyn PhysicalExpr>,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        use datafusion::logical_expr::Operator;

        // Step 1: Cypher string predicates bypass DataFusion operators entirely.
        let string_op = match op {
            BinaryOp::StartsWith => Some(StringOp::StartsWith),
            BinaryOp::EndsWith => Some(StringOp::EndsWith),
            BinaryOp::Contains => Some(StringOp::Contains),
            _ => None,
        };
        if let Some(sop) = string_op {
            return Ok(Arc::new(CypherStringMatchExpr::new(left, right, sop)));
        }

        // Step 2: translate to a DataFusion operator or fail early.
        let df_op = match op {
            BinaryOp::Add => Operator::Plus,
            BinaryOp::Sub => Operator::Minus,
            BinaryOp::Mul => Operator::Multiply,
            BinaryOp::Div => Operator::Divide,
            BinaryOp::Mod => Operator::Modulo,
            BinaryOp::Eq => Operator::Eq,
            BinaryOp::NotEq => Operator::NotEq,
            BinaryOp::Gt => Operator::Gt,
            BinaryOp::GtEq => Operator::GtEq,
            BinaryOp::Lt => Operator::Lt,
            BinaryOp::LtEq => Operator::LtEq,
            BinaryOp::And => Operator::And,
            BinaryOp::Or => Operator::Or,
            BinaryOp::Xor => {
                return Err(anyhow!(
                    "XOR not supported via binary helper, use bitwise_xor"
                ));
            }
            BinaryOp::Regex => Operator::RegexMatch,
            BinaryOp::ApproxEq => {
                return Err(anyhow!(
                    "ApproxEq (~=) not yet supported in physical compiler"
                ));
            }
            BinaryOp::Pow => return Err(anyhow!("POW not yet supported in physical compiler")),
            _ => return Err(anyhow!("Unsupported binary op in compiler: {:?}", op)),
        };

        // Step 3 setup: operand types may be rewritten below, so keep them
        // in mutable locals alongside the (possibly wrapped) operands.
        let mut left = left;
        let mut right = right;
        let mut left_type = left.data_type(input_schema).ok();
        let mut right_type = right.data_type(input_schema).ok();

        let left_is_list = matches!(
            left_type.as_ref(),
            Some(DataType::List(_) | DataType::LargeList(_))
        );
        let right_is_list = matches!(
            right_type.as_ref(),
            Some(DataType::List(_) | DataType::LargeList(_))
        );

        // Mixed list/CypherValue operands: lift the native list side to a
        // CypherValue so both sides share a representation.
        if left_is_list && is_cypher_value_type(right_type.as_ref()) {
            left = Arc::new(LargeListToCypherValueExpr::new(left));
            left_type = Some(DataType::LargeBinary);
        } else if right_is_list && is_cypher_value_type(left_type.as_ref()) {
            right = Arc::new(LargeListToCypherValueExpr::new(right));
            right_type = Some(DataType::LargeBinary);
        }

        let has_cv =
            is_cypher_value_type(left_type.as_ref()) || is_cypher_value_type(right_type.as_ref());

        // Step 3: CypherValue operands go through dedicated UDFs. Each helper
        // returns Ok(None) when it does not apply, falling through to the next.
        if has_cv {
            if let Some(result) = self.compile_cv_comparison(
                df_op,
                left.clone(),
                right.clone(),
                &left_type,
                &right_type,
            )? {
                return Ok(result);
            }
            if let Some(result) = self.compile_cv_list_concat(
                left.clone(),
                right.clone(),
                &left_type,
                &right_type,
                df_op,
            )? {
                return Ok(result);
            }
            if let Some(result) = self.compile_cv_arithmetic(
                df_op,
                left.clone(),
                right.clone(),
                &left_type,
                &right_type,
                input_schema,
            )? {
                return Ok(result);
            }
        }

        // Step 4: plain native binary expression.
        binary(left, df_op, right, input_schema)
            .map_err(|e| anyhow!("Failed to create binary expression: {}", e))
    }
1522
1523 fn compile_cv_comparison(
1525 &self,
1526 df_op: datafusion::logical_expr::Operator,
1527 left: Arc<dyn PhysicalExpr>,
1528 right: Arc<dyn PhysicalExpr>,
1529 left_type: &Option<DataType>,
1530 right_type: &Option<DataType>,
1531 ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1532 use datafusion::logical_expr::Operator;
1533
1534 let udf_name = match df_op {
1535 Operator::Eq => "_cypher_equal",
1536 Operator::NotEq => "_cypher_not_equal",
1537 Operator::Gt => "_cypher_gt",
1538 Operator::GtEq => "_cypher_gt_eq",
1539 Operator::Lt => "_cypher_lt",
1540 Operator::LtEq => "_cypher_lt_eq",
1541 _ => return Ok(None),
1542 };
1543
1544 self.plan_binary_udf(
1545 udf_name,
1546 left,
1547 right,
1548 left_type.clone().unwrap_or(DataType::LargeBinary),
1549 right_type.clone().unwrap_or(DataType::LargeBinary),
1550 )
1551 }
1552
1553 fn compile_cv_list_concat(
1555 &self,
1556 left: Arc<dyn PhysicalExpr>,
1557 right: Arc<dyn PhysicalExpr>,
1558 left_type: &Option<DataType>,
1559 right_type: &Option<DataType>,
1560 df_op: datafusion::logical_expr::Operator,
1561 ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1562 use datafusion::logical_expr::Operator;
1563
1564 if df_op != Operator::Plus {
1565 return Ok(None);
1566 }
1567
1568 let is_list = |t: &Option<DataType>| {
1570 t.as_ref()
1571 .is_some_and(|dt| matches!(dt, DataType::LargeBinary | DataType::List(_)))
1572 };
1573
1574 if !is_list(left_type) && !is_list(right_type) {
1575 return Ok(None);
1576 }
1577
1578 self.plan_binary_udf(
1579 "_cypher_list_concat",
1580 left,
1581 right,
1582 left_type.clone().unwrap_or(DataType::LargeBinary),
1583 right_type.clone().unwrap_or(DataType::LargeBinary),
1584 )
1585 }
1586
1587 fn compile_cv_arithmetic(
1592 &self,
1593 df_op: datafusion::logical_expr::Operator,
1594 left: Arc<dyn PhysicalExpr>,
1595 right: Arc<dyn PhysicalExpr>,
1596 left_type: &Option<DataType>,
1597 right_type: &Option<DataType>,
1598 _input_schema: &Schema,
1599 ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1600 use datafusion::logical_expr::Operator;
1601
1602 let udf_name = match df_op {
1603 Operator::Plus => "_cypher_add",
1604 Operator::Minus => "_cypher_sub",
1605 Operator::Multiply => "_cypher_mul",
1606 Operator::Divide => "_cypher_div",
1607 Operator::Modulo => "_cypher_mod",
1608 _ => return Ok(None),
1609 };
1610
1611 self.plan_binary_udf(
1612 udf_name,
1613 left,
1614 right,
1615 left_type.clone().unwrap_or(DataType::LargeBinary),
1616 right_type.clone().unwrap_or(DataType::LargeBinary),
1617 )
1618 }
1619
    /// Plans a logical UDF call into a physical expression, then rebinds its
    /// dummy-column children to the real compiled operands.
    ///
    /// `udf_expr` must reference columns whose names/types match
    /// `operand_types`; `children` must be in the same order and of the same
    /// length. `error_context` is interpolated into error messages.
    fn plan_udf_physical_expr(
        &self,
        udf_expr: &datafusion::logical_expr::Expr,
        operand_types: &[(&str, DataType)],
        children: Vec<Arc<dyn PhysicalExpr>>,
        error_context: &str,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        // Throwaway schema containing only the placeholder columns, so the
        // planner can type-check the UDF call.
        let tmp_schema = Schema::new(
            operand_types
                .iter()
                .map(|(name, dt)| Arc::new(Field::new(*name, dt.clone(), true)))
                .collect::<Vec<_>>(),
        );
        let df_schema = datafusion::common::DFSchema::try_from(tmp_schema)?;
        let planner = datafusion::physical_planner::DefaultPhysicalPlanner::default();
        let udf_phy = planner
            .create_physical_expr(udf_expr, &df_schema, self.state)
            .map_err(|e| anyhow!("Failed to create {} expr: {}", error_context, e))?;
        // Swap the placeholder column refs for the real operand expressions.
        udf_phy
            .with_new_children(children)
            .map_err(|e| anyhow!("Failed to rebind {} children: {}", error_context, e))
    }
1643
1644 fn wrap_with_cv_to_bool(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
1648 let Some(udf) = self.state.scalar_functions().get("_cv_to_bool") else {
1649 return Err(anyhow!("_cv_to_bool UDF not found"));
1650 };
1651
1652 let dummy_col = datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1653 None::<String>,
1654 "__cv__",
1655 ));
1656 let udf_expr = datafusion::logical_expr::Expr::ScalarFunction(
1657 datafusion::logical_expr::expr::ScalarFunction {
1658 func: udf.clone(),
1659 args: vec![dummy_col],
1660 },
1661 );
1662
1663 self.plan_udf_physical_expr(
1664 &udf_expr,
1665 &[("__cv__", DataType::LargeBinary)],
1666 vec![expr],
1667 "CypherValue to bool",
1668 )
1669 }
1670
    /// Plans a two-argument UDF call over `left`/`right`.
    ///
    /// Returns `Ok(None)` (rather than an error) when no UDF is registered
    /// under `udf_name`, letting callers fall through to other strategies.
    fn plan_binary_udf(
        &self,
        udf_name: &str,
        left: Arc<dyn PhysicalExpr>,
        right: Arc<dyn PhysicalExpr>,
        left_type: DataType,
        right_type: DataType,
    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
        let Some(udf) = self.state.scalar_functions().get(udf_name) else {
            return Ok(None);
        };
        // Two dummy columns stand in for the operands during planning; they
        // are rebound to the real expressions by plan_udf_physical_expr.
        let udf_expr = datafusion::logical_expr::Expr::ScalarFunction(
            datafusion::logical_expr::expr::ScalarFunction {
                func: udf.clone(),
                args: vec![
                    datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
                        None::<String>,
                        "__left__",
                    )),
                    datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
                        None::<String>,
                        "__right__",
                    )),
                ],
            },
        );
        let result = self.plan_udf_physical_expr(
            &udf_expr,
            &[("__left__", left_type), ("__right__", right_type)],
            vec![left, right],
            udf_name,
        )?;
        Ok(Some(result))
    }
1706
1707 fn compile_unary_op(
1708 &self,
1709 op: &UnaryOp,
1710 expr: Arc<dyn PhysicalExpr>,
1711 input_schema: &Schema,
1712 ) -> Result<Arc<dyn PhysicalExpr>> {
1713 match op {
1714 UnaryOp::Not => datafusion::physical_expr::expressions::not(expr),
1715 UnaryOp::Neg => datafusion::physical_expr::expressions::negative(expr, input_schema),
1716 }
1717 .map_err(|e| anyhow!("Failed to create unary expression: {}", e))
1718 }
1719}
1720
1721fn extract_source_variable(expr: &Expr) -> Option<String> {
1729 match expr {
1730 Expr::Property(inner, _) => extract_source_variable(inner),
1731 Expr::Variable(name) => Some(name.clone()),
1732 Expr::List(items) => items.first().and_then(extract_source_variable),
1733 _ => None,
1734 }
1735}
1736
1737fn extract_property_names(expr: &Expr) -> Vec<Option<String>> {
1741 match expr {
1742 Expr::Property(_, prop) => vec![Some(prop.clone())],
1743 Expr::List(items) => items
1744 .iter()
1745 .map(|item| {
1746 if let Expr::Property(_, prop) = item {
1747 Some(prop.clone())
1748 } else {
1749 None
1750 }
1751 })
1752 .collect(),
1753 _ => vec![None],
1754 }
1755}
1756
1757fn normalize_similar_to_args<'e>(
1762 sources: &'e Expr,
1763 queries: &'e Expr,
1764) -> (Vec<&'e Expr>, Vec<&'e Expr>) {
1765 match (sources, queries) {
1766 (Expr::List(src_items), Expr::List(qry_items)) if src_items.len() == qry_items.len() => {
1768 (src_items.iter().collect(), qry_items.iter().collect())
1769 }
1770 (Expr::List(src_items), _) if src_items.len() > 1 => {
1772 let queries_broadcast: Vec<&Expr> = vec![queries; src_items.len()];
1773 (src_items.iter().collect(), queries_broadcast)
1774 }
1775 _ => (vec![sources], vec![queries]),
1777 }
1778}
1779
1780use datafusion::physical_plan::DisplayAs;
1781use datafusion::physical_plan::DisplayFormatType;
1782
/// The three Cypher string predicates evaluated by `CypherStringMatchExpr`
/// instead of a native DataFusion operator.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum StringOp {
    StartsWith,
    EndsWith,
    Contains,
}
1789
1790impl std::fmt::Display for StringOp {
1791 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1792 match self {
1793 StringOp::StartsWith => write!(f, "STARTS WITH"),
1794 StringOp::EndsWith => write!(f, "ENDS WITH"),
1795 StringOp::Contains => write!(f, "CONTAINS"),
1796 }
1797 }
1798}
1799
/// Physical expression implementing Cypher's STARTS WITH / ENDS WITH /
/// CONTAINS over two child expressions; always evaluates to Boolean.
#[derive(Debug, Eq)]
struct CypherStringMatchExpr {
    left: Arc<dyn PhysicalExpr>,
    right: Arc<dyn PhysicalExpr>,
    op: StringOp,
}
1806
// Equality compares the operator plus both children via `PhysicalExpr::eq`.
impl PartialEq for CypherStringMatchExpr {
    fn eq(&self, other: &Self) -> bool {
        self.op == other.op && self.left.eq(&other.left) && self.right.eq(&other.right)
    }
}
1812
// Hash covers the same fields as `PartialEq`, keeping Eq/Hash consistent.
impl std::hash::Hash for CypherStringMatchExpr {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.op.hash(state);
        self.left.hash(state);
        self.right.hash(state);
    }
}
1820
impl CypherStringMatchExpr {
    /// Builds a string-match expression; `left` is the subject string and
    /// `right` the pattern operand.
    fn new(left: Arc<dyn PhysicalExpr>, right: Arc<dyn PhysicalExpr>, op: StringOp) -> Self {
        Self { left, right, op }
    }
}
1826
// Renders as `<left> <OP> <right>`, e.g. `a STARTS WITH b`.
impl std::fmt::Display for CypherStringMatchExpr {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{} {} {}", self.left, self.op, self.right)
    }
}
1832
// Plan-display output reuses the `Display` rendering for all format types.
impl DisplayAs for CypherStringMatchExpr {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}", self)
    }
}
1838
impl PhysicalExpr for CypherStringMatchExpr {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    // The result of a string predicate is always Boolean.
    fn data_type(
        &self,
        _input_schema: &Schema,
    ) -> datafusion::error::Result<arrow_schema::DataType> {
        Ok(arrow_schema::DataType::Boolean)
    }

    fn nullable(&self, _input_schema: &Schema) -> datafusion::error::Result<bool> {
        Ok(true)
    }

    /// Evaluates both children, then delegates the per-row string test to
    /// `invoke_cypher_string_op` with the matching `str` predicate closure.
    fn evaluate(
        &self,
        batch: &arrow_array::RecordBatch,
    ) -> datafusion::error::Result<datafusion::physical_plan::ColumnarValue> {
        use crate::query::df_udfs::invoke_cypher_string_op;
        use arrow_schema::Field;
        use datafusion::config::ConfigOptions;
        use datafusion::logical_expr::ScalarFunctionArgs;

        let left_val = self.left.evaluate(batch)?;
        let right_val = self.right.evaluate(batch)?;

        // Package the operands in the ScalarFunctionArgs shape that
        // invoke_cypher_string_op expects; the return field is Boolean.
        let args = ScalarFunctionArgs {
            args: vec![left_val, right_val],
            number_rows: batch.num_rows(),
            return_field: Arc::new(Field::new("result", arrow_schema::DataType::Boolean, true)),
            config_options: Arc::new(ConfigOptions::default()),
            arg_fields: vec![], };

        match self.op {
            StringOp::StartsWith => {
                invoke_cypher_string_op(&args, "starts_with", |s, p| s.starts_with(p))
            }
            StringOp::EndsWith => {
                invoke_cypher_string_op(&args, "ends_with", |s, p| s.ends_with(p))
            }
            StringOp::Contains => invoke_cypher_string_op(&args, "contains", |s, p| s.contains(p)),
        }
    }

    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
        vec![&self.left, &self.right]
    }

    // Rebuilds the expression with replacement children, keeping the operator.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn PhysicalExpr>>,
    ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
        Ok(Arc::new(CypherStringMatchExpr::new(
            children[0].clone(),
            children[1].clone(),
            self.op,
        )))
    }

    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self)
    }
}
1905
1906impl PartialEq<dyn PhysicalExpr> for CypherStringMatchExpr {
1907 fn eq(&self, other: &dyn PhysicalExpr) -> bool {
1908 if let Some(other) = other.as_any().downcast_ref::<CypherStringMatchExpr>() {
1909 self == other
1910 } else {
1911 false
1912 }
1913 }
1914}
1915
/// Physical expression that projects one column out of a struct-typed child
/// expression by positional index.
#[derive(Debug, Eq)]
struct StructFieldAccessExpr {
    // Child expression; must evaluate to a StructArray (checked in evaluate).
    input: Arc<dyn PhysicalExpr>,
    // Positional index of the struct field to extract.
    field_idx: usize,
    // Declared data type of the extracted field.
    output_type: arrow_schema::DataType,
}
1929
// Equality compares field index, child expression, and output type.
impl PartialEq for StructFieldAccessExpr {
    fn eq(&self, other: &Self) -> bool {
        self.field_idx == other.field_idx
            && self.input.eq(&other.input)
            && self.output_type == other.output_type
    }
}
1937
// Hash intentionally omits `output_type` (DataType may not be hashed here);
// omitting a field keeps the Eq/Hash contract valid, just with more collisions.
impl std::hash::Hash for StructFieldAccessExpr {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.input.hash(state);
        self.field_idx.hash(state);
    }
}
1944
impl StructFieldAccessExpr {
    /// Builds a field-access expression over `input` selecting the struct
    /// column at `field_idx`, with `output_type` as its declared type.
    fn new(
        input: Arc<dyn PhysicalExpr>,
        field_idx: usize,
        output_type: arrow_schema::DataType,
    ) -> Self {
        Self {
            input,
            field_idx,
            output_type,
        }
    }
}
1958
// Renders as `<input>[<field_idx>]`.
impl std::fmt::Display for StructFieldAccessExpr {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}[{}]", self.input, self.field_idx)
    }
}
1964
// Plan-display output reuses the `Display` rendering for all format types.
impl DisplayAs for StructFieldAccessExpr {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}", self)
    }
}
1970
1971impl PartialEq<dyn PhysicalExpr> for StructFieldAccessExpr {
1972 fn eq(&self, other: &dyn PhysicalExpr) -> bool {
1973 if let Some(other) = other.as_any().downcast_ref::<Self>() {
1974 self.field_idx == other.field_idx && self.input.eq(&other.input)
1975 } else {
1976 false
1977 }
1978 }
1979}
1980
1981impl PhysicalExpr for StructFieldAccessExpr {
1982 fn as_any(&self) -> &dyn std::any::Any {
1983 self
1984 }
1985
1986 fn data_type(
1987 &self,
1988 _input_schema: &Schema,
1989 ) -> datafusion::error::Result<arrow_schema::DataType> {
1990 Ok(self.output_type.clone())
1991 }
1992
1993 fn nullable(&self, _input_schema: &Schema) -> datafusion::error::Result<bool> {
1994 Ok(true)
1995 }
1996
1997 fn evaluate(
1998 &self,
1999 batch: &arrow_array::RecordBatch,
2000 ) -> datafusion::error::Result<datafusion::physical_plan::ColumnarValue> {
2001 use arrow_array::StructArray;
2002
2003 let input_val = self.input.evaluate(batch)?;
2004 let array = input_val.into_array(batch.num_rows())?;
2005
2006 let struct_array = array
2007 .as_any()
2008 .downcast_ref::<StructArray>()
2009 .ok_or_else(|| {
2010 datafusion::error::DataFusionError::Execution(
2011 "StructFieldAccessExpr: input is not a StructArray".to_string(),
2012 )
2013 })?;
2014
2015 let field_col = struct_array.column(self.field_idx).clone();
2016 Ok(datafusion::physical_plan::ColumnarValue::Array(field_col))
2017 }
2018
2019 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
2020 vec![&self.input]
2021 }
2022
2023 fn with_new_children(
2024 self: Arc<Self>,
2025 children: Vec<Arc<dyn PhysicalExpr>>,
2026 ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
2027 Ok(Arc::new(StructFieldAccessExpr::new(
2028 children[0].clone(),
2029 self.field_idx,
2030 self.output_type.clone(),
2031 )))
2032 }
2033
2034 fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2035 write!(f, "{}", self)
2036 }
2037}
2038
/// Physical expression for an EXISTS subquery predicate. `evaluate` plans
/// the stored query once per batch and re-executes it once per input row,
/// injecting the row's values (and entity ids) as parameters.
struct ExistsExecExpr {
    // The EXISTS subquery AST, rewritten per batch for correlation.
    query: Query,
    graph_ctx: Arc<GraphExecutionContext>,
    session_ctx: Arc<RwLock<SessionContext>>,
    storage: Arc<StorageManager>,
    uni_schema: Arc<UniSchema>,
    // Outer-query parameters, merged with per-row parameters at evaluation.
    params: HashMap<String, Value>,
}
2059
// Manual Debug: several fields (contexts, storage) do not implement Debug,
// so only the struct name is printed.
impl std::fmt::Debug for ExistsExecExpr {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ExistsExecExpr").finish_non_exhaustive()
    }
}
2065
impl ExistsExecExpr {
    /// Bundles the subquery AST with everything needed to execute it later:
    /// execution contexts, storage, schema, and the outer parameter map.
    fn new(
        query: Query,
        graph_ctx: Arc<GraphExecutionContext>,
        session_ctx: Arc<RwLock<SessionContext>>,
        storage: Arc<StorageManager>,
        uni_schema: Arc<UniSchema>,
        params: HashMap<String, Value>,
    ) -> Self {
        Self {
            query,
            graph_ctx,
            session_ctx,
            storage,
            uni_schema,
            params,
        }
    }
}
2085
// Display elides the subquery body; the AST has no compact rendering here.
impl std::fmt::Display for ExistsExecExpr {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "EXISTS(<subquery>)")
    }
}
2091
// Always unequal: two EXISTS expressions are never treated as interchangeable,
// presumably to prevent expression deduplication across subqueries — confirm.
impl PartialEq<dyn PhysicalExpr> for ExistsExecExpr {
    fn eq(&self, _other: &dyn PhysicalExpr) -> bool {
        false
    }
}
2097
// Always unequal, matching the PartialEq<dyn PhysicalExpr> impl.
impl PartialEq for ExistsExecExpr {
    fn eq(&self, _other: &Self) -> bool {
        false
    }
}
2103
// NOTE(review): eq() always returns false, so this Eq impl technically breaks
// reflexivity; it appears to exist only to satisfy trait bounds — confirm.
impl Eq for ExistsExecExpr {}
2105
// Constant hash: consistent with the always-false equality above.
impl std::hash::Hash for ExistsExecExpr {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        "ExistsExecExpr".hash(state);
    }
}
2111
// Plan-display output reuses the `Display` rendering for all format types.
impl DisplayAs for ExistsExecExpr {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}", self)
    }
}
2117
impl PhysicalExpr for ExistsExecExpr {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn data_type(
        &self,
        _input_schema: &Schema,
    ) -> datafusion::error::Result<arrow_schema::DataType> {
        Ok(DataType::Boolean)
    }

    fn nullable(&self, _input_schema: &Schema) -> datafusion::error::Result<bool> {
        Ok(true)
    }

    /// Evaluates EXISTS per row: plans the subquery once for the batch, then
    /// executes it once per input row with the row's values injected as
    /// parameters. A row's result is `true` iff the subquery returns any row.
    fn evaluate(
        &self,
        batch: &arrow_array::RecordBatch,
    ) -> datafusion::error::Result<datafusion::physical_plan::ColumnarValue> {
        let num_rows = batch.num_rows();
        let mut builder = BooleanBuilder::with_capacity(num_rows);

        // Heuristically identify outer "entity" variables from the batch
        // schema: columns with a "._vid" companion, struct-typed columns, and
        // bare non-underscore Int64/UInt64 columns (presumed vertex ids —
        // confirm against upstream plan output).
        let schema = batch.schema();
        let mut entity_vars: HashSet<String> = HashSet::new();
        for field in schema.fields() {
            let name = field.name();
            if let Some(base) = name.strip_suffix("._vid") {
                entity_vars.insert(base.to_string());
            }
            if matches!(field.data_type(), DataType::Struct(_)) {
                entity_vars.insert(name.to_string());
            }
            if !name.contains('.')
                && !name.starts_with('_')
                && matches!(field.data_type(), DataType::Int64 | DataType::UInt64)
            {
                entity_vars.insert(name.to_string());
            }
        }
        let vars_in_scope: Vec<String> = entity_vars.iter().cloned().collect();

        // Turn references to outer variables into parameter placeholders so
        // the subquery can be executed with per-row values.
        let rewritten_query = rewrite_query_correlated(&self.query, &entity_vars);

        // Plan once per batch; the same logical plan is reused for every row.
        let planner = QueryPlanner::new(self.uni_schema.clone());
        let logical_plan = match planner.plan_with_scope(rewritten_query, vars_in_scope) {
            Ok(plan) => plan,
            Err(e) => {
                return Err(datafusion::error::DataFusionError::Execution(format!(
                    "EXISTS subquery planning failed: {}",
                    e
                )));
            }
        };

        let graph_ctx = self.graph_ctx.clone();
        let session_ctx = self.session_ctx.clone();
        let storage = self.storage.clone();
        let uni_schema = self.uni_schema.clone();
        let base_params = self.params.clone();

        // Run the async subqueries on a fresh current-thread runtime inside a
        // scoped thread — presumably because evaluate() may already be inside
        // an async context where block_on would panic (confirm); the scope
        // lets the closure borrow `batch`, `builder`, and `entity_vars`.
        let result = std::thread::scope(|s| {
            s.spawn(|| {
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .map_err(|e| {
                        datafusion::error::DataFusionError::Execution(format!(
                            "Failed to create runtime for EXISTS: {}",
                            e
                        ))
                    })?;

                for row_idx in 0..num_rows {
                    // Per-row parameters override the outer base parameters.
                    let row_params = extract_row_params(batch, row_idx);
                    let mut sub_params = base_params.clone();
                    sub_params.extend(row_params);

                    // Alias each entity's "._vid" value under the bare
                    // variable name so the rewritten query can resolve it.
                    for var in &entity_vars {
                        let vid_key = format!("{}._vid", var);
                        if let Some(vid_val) = sub_params.get(&vid_key).cloned() {
                            sub_params.insert(var.clone(), vid_val);
                        }
                    }

                    let batches = rt.block_on(execute_subplan(
                        &logical_plan,
                        &sub_params,
                        &HashMap::new(), &graph_ctx,
                        &session_ctx,
                        &storage,
                        &uni_schema,
                    ))?;

                    // EXISTS semantics: true iff any batch has at least one row.
                    let has_rows = batches.iter().any(|b| b.num_rows() > 0);
                    builder.append_value(has_rows);
                }

                Ok::<_, datafusion::error::DataFusionError>(())
            })
            .join()
            .unwrap_or_else(|_| {
                Err(datafusion::error::DataFusionError::Execution(
                    "EXISTS subquery thread panicked".to_string(),
                ))
            })
        });

        if let Err(e) = result {
            return Err(datafusion::error::DataFusionError::Execution(format!(
                "EXISTS subquery execution failed: {}",
                e
            )));
        }

        Ok(datafusion::physical_plan::ColumnarValue::Array(Arc::new(
            builder.finish(),
        )))
    }

    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
        vec![]
    }

    // The subquery is opaque to DataFusion, so this expression has no
    // children and rejects any attempt to supply some.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn PhysicalExpr>>,
    ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
        if !children.is_empty() {
            return Err(datafusion::error::DataFusionError::Plan(
                "ExistsExecExpr has no children".to_string(),
            ));
        }
        Ok(self)
    }

    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self)
    }
}
2279
2280fn has_mutation_clause(query: &Query) -> bool {
2287 match query {
2288 Query::Single(stmt) => stmt.clauses.iter().any(|c| {
2289 matches!(
2290 c,
2291 Clause::Create(_)
2292 | Clause::Delete(_)
2293 | Clause::Set(_)
2294 | Clause::Remove(_)
2295 | Clause::Merge(_)
2296 ) || has_mutation_in_clause_exprs(c)
2297 }),
2298 Query::Union { left, right, .. } => has_mutation_clause(left) || has_mutation_clause(right),
2299 _ => false,
2300 }
2301}
2302
2303fn has_mutation_in_clause_exprs(clause: &Clause) -> bool {
2305 let check_expr = |e: &Expr| -> bool { has_mutation_in_expr(e) };
2306
2307 match clause {
2308 Clause::Match(m) => m.where_clause.as_ref().is_some_and(check_expr),
2309 Clause::With(w) => {
2310 w.where_clause.as_ref().is_some_and(check_expr)
2311 || w.items.iter().any(|item| match item {
2312 ReturnItem::Expr { expr, .. } => has_mutation_in_expr(expr),
2313 ReturnItem::All => false,
2314 })
2315 }
2316 Clause::Return(r) => r.items.iter().any(|item| match item {
2317 ReturnItem::Expr { expr, .. } => has_mutation_in_expr(expr),
2318 ReturnItem::All => false,
2319 }),
2320 _ => false,
2321 }
2322}
2323
2324fn has_mutation_in_expr(expr: &Expr) -> bool {
2326 match expr {
2327 Expr::Exists { query, .. } => has_mutation_clause(query),
2328 _ => {
2329 let mut found = false;
2330 expr.for_each_child(&mut |child| {
2331 if has_mutation_in_expr(child) {
2332 found = true;
2333 }
2334 });
2335 found
2336 }
2337 }
2338}
2339
2340fn rewrite_query_correlated(query: &Query, outer_vars: &HashSet<String>) -> Query {
2346 match query {
2347 Query::Single(stmt) => Query::Single(Statement {
2348 clauses: stmt
2349 .clauses
2350 .iter()
2351 .map(|c| rewrite_clause_correlated(c, outer_vars))
2352 .collect(),
2353 }),
2354 Query::Union { left, right, all } => Query::Union {
2355 left: Box::new(rewrite_query_correlated(left, outer_vars)),
2356 right: Box::new(rewrite_query_correlated(right, outer_vars)),
2357 all: *all,
2358 },
2359 other => other.clone(),
2360 }
2361}
2362
2363fn rewrite_clause_correlated(clause: &Clause, outer_vars: &HashSet<String>) -> Clause {
2365 match clause {
2366 Clause::Match(m) => Clause::Match(MatchClause {
2367 optional: m.optional,
2368 pattern: m.pattern.clone(),
2369 where_clause: m
2370 .where_clause
2371 .as_ref()
2372 .map(|e| rewrite_expr_correlated(e, outer_vars)),
2373 }),
2374 Clause::With(w) => Clause::With(WithClause {
2375 distinct: w.distinct,
2376 items: w
2377 .items
2378 .iter()
2379 .map(|item| rewrite_return_item(item, outer_vars))
2380 .collect(),
2381 order_by: w.order_by.as_ref().map(|items| {
2382 items
2383 .iter()
2384 .map(|si| SortItem {
2385 expr: rewrite_expr_correlated(&si.expr, outer_vars),
2386 ascending: si.ascending,
2387 })
2388 .collect()
2389 }),
2390 skip: w
2391 .skip
2392 .as_ref()
2393 .map(|e| rewrite_expr_correlated(e, outer_vars)),
2394 limit: w
2395 .limit
2396 .as_ref()
2397 .map(|e| rewrite_expr_correlated(e, outer_vars)),
2398 where_clause: w
2399 .where_clause
2400 .as_ref()
2401 .map(|e| rewrite_expr_correlated(e, outer_vars)),
2402 }),
2403 Clause::Return(r) => Clause::Return(ReturnClause {
2404 distinct: r.distinct,
2405 items: r
2406 .items
2407 .iter()
2408 .map(|item| rewrite_return_item(item, outer_vars))
2409 .collect(),
2410 order_by: r.order_by.as_ref().map(|items| {
2411 items
2412 .iter()
2413 .map(|si| SortItem {
2414 expr: rewrite_expr_correlated(&si.expr, outer_vars),
2415 ascending: si.ascending,
2416 })
2417 .collect()
2418 }),
2419 skip: r
2420 .skip
2421 .as_ref()
2422 .map(|e| rewrite_expr_correlated(e, outer_vars)),
2423 limit: r
2424 .limit
2425 .as_ref()
2426 .map(|e| rewrite_expr_correlated(e, outer_vars)),
2427 }),
2428 Clause::Unwind(u) => Clause::Unwind(UnwindClause {
2429 expr: rewrite_expr_correlated(&u.expr, outer_vars),
2430 variable: u.variable.clone(),
2431 }),
2432 other => other.clone(),
2433 }
2434}
2435
2436fn rewrite_return_item(item: &ReturnItem, outer_vars: &HashSet<String>) -> ReturnItem {
2437 match item {
2438 ReturnItem::All => ReturnItem::All,
2439 ReturnItem::Expr {
2440 expr,
2441 alias,
2442 source_text,
2443 } => ReturnItem::Expr {
2444 expr: rewrite_expr_correlated(expr, outer_vars),
2445 alias: alias.clone(),
2446 source_text: source_text.clone(),
2447 },
2448 }
2449}
2450
/// The core correlation rewrite: turns `var.key` into the parameter
/// `$"var.key"` whenever `var` is an outer-scope variable, so the per-row
/// value can be injected at execution time (see `ExistsExecExpr::evaluate`).
/// Nested subqueries are rewritten with the same outer-variable set; all
/// other expressions are rebuilt with rewritten children.
fn rewrite_expr_correlated(expr: &Expr, outer_vars: &HashSet<String>) -> Expr {
    match expr {
        Expr::Property(base, key) => {
            // Only a direct `outer_var.key` access becomes a parameter;
            // deeper bases are rewritten recursively instead.
            if let Expr::Variable(v) = base.as_ref()
                && outer_vars.contains(v)
            {
                return Expr::Parameter(format!("{}.{}", v, key));
            }
            Expr::Property(
                Box::new(rewrite_expr_correlated(base, outer_vars)),
                key.clone(),
            )
        }
        // Subquery-bearing expressions recurse through the query rewriter.
        Expr::Exists {
            query,
            from_pattern_predicate,
        } => Expr::Exists {
            query: Box::new(rewrite_query_correlated(query, outer_vars)),
            from_pattern_predicate: *from_pattern_predicate,
        },
        Expr::CountSubquery(query) => {
            Expr::CountSubquery(Box::new(rewrite_query_correlated(query, outer_vars)))
        }
        Expr::CollectSubquery(query) => {
            Expr::CollectSubquery(Box::new(rewrite_query_correlated(query, outer_vars)))
        }
        // Everything else: clone and rewrite all children in place.
        other => other
            .clone()
            .map_children(&mut |child| rewrite_expr_correlated(&child, outer_vars)),
    }
}
2488
2489fn resolve_metric_for_property(schema: &UniSchema, property: &str) -> Option<DistanceMetric> {
2493 for idx in &schema.indexes {
2494 if let IndexDefinition::Vector(config) = idx
2495 && config.property == property
2496 {
2497 return Some(config.metric.clone());
2498 }
2499 }
2500 None
2501}