1use crate::query::df_expr::{TranslationContext, VariableKind, cypher_expr_to_df};
5use crate::query::df_graph::GraphExecutionContext;
6use crate::query::df_graph::common::{execute_subplan, extract_row_params};
7use crate::query::df_graph::comprehension::ListComprehensionExecExpr;
8use crate::query::df_graph::pattern_comprehension::{
9 PatternComprehensionExecExpr, analyze_pattern, build_inner_schema, collect_inner_properties,
10};
11use crate::query::df_graph::quantifier::{QuantifierExecExpr, QuantifierType};
12use crate::query::df_graph::reduce::ReduceExecExpr;
13use crate::query::planner::QueryPlanner;
14use anyhow::{Result, anyhow};
15use arrow_array::builder::BooleanBuilder;
16use arrow_schema::{DataType, Field, Schema};
17use datafusion::execution::context::SessionState;
18use datafusion::physical_expr::expressions::binary;
19use datafusion::physical_plan::PhysicalExpr;
20use datafusion::physical_planner::PhysicalPlanner;
21use datafusion::prelude::SessionContext;
22use parking_lot::RwLock;
23use std::collections::{HashMap, HashSet};
24use std::sync::Arc;
25use uni_common::Value;
26use uni_common::core::schema::Schema as UniSchema;
27use uni_cypher::ast::{
28 BinaryOp, Clause, CypherLiteral, Expr, MatchClause, Query, ReturnClause, ReturnItem, SortItem,
29 Statement, UnaryOp, UnwindClause, WithClause,
30};
31use uni_store::storage::manager::StorageManager;
32
33fn is_cypher_value_type(dt: Option<&DataType>) -> bool {
35 dt.is_some_and(|t| *t == DataType::LargeBinary)
36}
37
38fn resolve_list_element_type(
48 list_data_type: &DataType,
49 large_binary_fallback: DataType,
50 context: &str,
51) -> Result<DataType> {
52 match list_data_type {
53 DataType::List(field) | DataType::LargeList(field) => Ok(field.data_type().clone()),
54 DataType::Null => Ok(DataType::Null),
55 DataType::LargeBinary => Ok(large_binary_fallback),
56 _ => Err(anyhow!(
57 "{} input must be a list, got {:?}",
58 context,
59 list_data_type
60 )),
61 }
62}
63
/// Physical expression adapter that re-encodes a `List`/`LargeList` result of
/// `child` as a `LargeBinary` CypherValue array, so heterogeneous CASE
/// branches (native list vs CypherValue) can share a single output type.
#[derive(Debug)]
struct LargeListToCypherValueExpr {
    // The wrapped expression whose list output is converted.
    child: Arc<dyn PhysicalExpr>,
}

impl LargeListToCypherValueExpr {
    // Wraps `child`; conversion happens lazily in `evaluate`.
    fn new(child: Arc<dyn PhysicalExpr>) -> Self {
        Self { child }
    }
}
78
// Human-readable form shown in DataFusion plan/EXPLAIN output.
impl std::fmt::Display for LargeListToCypherValueExpr {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "LargeListToCypherValue({})", self.child)
    }
}
84
// Equality is by child *pointer* identity, not structural equality: two
// separately built but identical expressions compare unequal. This coarse
// notion is adequate for DataFusion's expression comparison needs.
impl PartialEq for LargeListToCypherValueExpr {
    fn eq(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.child, &other.child)
    }
}

impl Eq for LargeListToCypherValueExpr {}
92
impl std::hash::Hash for LargeListToCypherValueExpr {
    // Every instance hashes to the same value (the type name). That is
    // deliberately coarse but consistent with the pointer-identity
    // `PartialEq` above, keeping the Eq/Hash contract valid.
    fn hash<H: std::hash::Hasher>(&self, _state: &mut H) {
        std::any::type_name::<Self>().hash(_state);
    }
}
99
100impl PartialEq<dyn std::any::Any> for LargeListToCypherValueExpr {
101 fn eq(&self, other: &dyn std::any::Any) -> bool {
102 other
103 .downcast_ref::<Self>()
104 .map(|x| self == x)
105 .unwrap_or(false)
106 }
107}
108
impl PhysicalExpr for LargeListToCypherValueExpr {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    // Output is always the opaque CypherValue encoding, regardless of the
    // child's list element type.
    fn data_type(&self, _input_schema: &Schema) -> datafusion::error::Result<DataType> {
        Ok(DataType::LargeBinary)
    }

    fn nullable(&self, input_schema: &Schema) -> datafusion::error::Result<bool> {
        self.child.nullable(input_schema)
    }

    /// Evaluates the child, then converts a `List`/`LargeList` result to a
    /// CypherValue (`LargeBinary`) array. A child that already produced
    /// `LargeBinary` is passed through unchanged; any other type is an error.
    fn evaluate(
        &self,
        batch: &arrow_array::RecordBatch,
    ) -> datafusion::error::Result<datafusion::logical_expr::ColumnarValue> {
        use datafusion::arrow::compute::cast;
        use datafusion::logical_expr::ColumnarValue;

        let child_result = self.child.evaluate(batch)?;
        let child_array = child_result.into_array(batch.num_rows())?;

        // Normalize List -> LargeList first so a single downcast below
        // handles both input flavors.
        let list_array = if let DataType::List(field) = child_array.data_type() {
            let target_type = DataType::LargeList(field.clone());
            cast(&child_array, &target_type).map_err(|e| {
                datafusion::error::DataFusionError::Execution(format!(
                    "List to LargeList cast failed: {e}"
                ))
            })?
        } else {
            child_array.clone()
        };

        // Already CypherValue-encoded: nothing to convert.
        if list_array.data_type() == &DataType::LargeBinary {
            return Ok(ColumnarValue::Array(list_array));
        }

        if let Some(large_list) = list_array
            .as_any()
            .downcast_ref::<datafusion::arrow::array::LargeListArray>()
        {
            let cv_array =
                crate::query::df_graph::common::typed_large_list_to_cv_array(large_list)?;
            Ok(ColumnarValue::Array(cv_array))
        } else {
            Err(datafusion::error::DataFusionError::Execution(format!(
                "Expected List or LargeList, got {:?}",
                list_array.data_type()
            )))
        }
    }

    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
        vec![&self.child]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn PhysicalExpr>>,
    ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
        if children.len() != 1 {
            return Err(datafusion::error::DataFusionError::Execution(
                "LargeListToCypherValueExpr expects exactly 1 child".to_string(),
            ));
        }
        Ok(Arc::new(LargeListToCypherValueExpr::new(
            children[0].clone(),
        )))
    }

    fn fmt_sql(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "LargeListToCypherValue({})", self.child)
    }
}
187
/// Compiles Cypher AST expressions directly into DataFusion `PhysicalExpr`s,
/// routing constructs DataFusion cannot express natively (list/pattern
/// comprehensions, quantifiers, REDUCE, EXISTS) to custom exec expressions
/// and delegating everything else to the standard logical-planning path.
pub struct CypherPhysicalExprCompiler<'a> {
    // DataFusion session state: source of registered UDFs and planner config.
    state: &'a SessionState,
    // Cypher->DataFusion translation context (variable kinds, labels, params).
    translation_ctx: Option<&'a TranslationContext>,
    // Set via `with_graph_ctx`/`with_subquery_ctx`; required for pattern
    // comprehensions and EXISTS subqueries.
    graph_ctx: Option<Arc<GraphExecutionContext>>,
    uni_schema: Option<Arc<UniSchema>>,
    // Only populated via `with_subquery_ctx`; needed to run EXISTS subplans.
    session_ctx: Option<Arc<RwLock<SessionContext>>>,
    storage: Option<Arc<StorageManager>>,
    // Outer query parameters, forwarded into subquery execution.
    params: HashMap<String, Value>,
}
201
202impl<'a> CypherPhysicalExprCompiler<'a> {
203 pub fn new(state: &'a SessionState, translation_ctx: Option<&'a TranslationContext>) -> Self {
204 Self {
205 state,
206 translation_ctx,
207 graph_ctx: None,
208 uni_schema: None,
209 session_ctx: None,
210 storage: None,
211 params: HashMap::new(),
212 }
213 }
214
    /// Returns a compiler whose translation context has `exclude_vars`
    /// removed from `variable_kinds`.
    ///
    /// Comprehension/quantifier/REDUCE loop variables shadow outer variables
    /// of the same name; stripping their recorded kind prevents the inner
    /// expression from treating the loop variable as an outer node/edge.
    ///
    /// `scoped_ctx_slot` is caller-owned storage that keeps the rebuilt
    /// context alive as long as the returned compiler, which only borrows it.
    /// When none of the excluded names appear in the context, the original
    /// context is reused without cloning.
    fn scoped_compiler<'b>(
        &'b self,
        exclude_vars: &[&str],
        scoped_ctx_slot: &'b mut Option<TranslationContext>,
    ) -> CypherPhysicalExprCompiler<'b>
    where
        'a: 'b,
    {
        // Only rebuild the context when shadowing actually occurs.
        let needs_scoping = self.translation_ctx.is_some_and(|ctx| {
            exclude_vars
                .iter()
                .any(|v| ctx.variable_kinds.contains_key(*v))
        });

        let ctx_ref = if needs_scoping {
            let ctx = self.translation_ctx.unwrap();
            let mut new_kinds = ctx.variable_kinds.clone();
            for v in exclude_vars {
                new_kinds.remove(*v);
            }
            *scoped_ctx_slot = Some(TranslationContext {
                parameters: ctx.parameters.clone(),
                outer_values: ctx.outer_values.clone(),
                variable_labels: ctx.variable_labels.clone(),
                variable_kinds: new_kinds,
                node_variable_hints: ctx.node_variable_hints.clone(),
                mutation_edge_hints: ctx.mutation_edge_hints.clone(),
                statement_time: ctx.statement_time,
            });
            scoped_ctx_slot.as_ref()
        } else {
            self.translation_ctx
        };

        CypherPhysicalExprCompiler {
            state: self.state,
            translation_ctx: ctx_ref,
            graph_ctx: self.graph_ctx.clone(),
            uni_schema: self.uni_schema.clone(),
            session_ctx: self.session_ctx.clone(),
            storage: self.storage.clone(),
            params: self.params.clone(),
        }
    }
270
271 pub fn with_graph_ctx(
273 mut self,
274 graph_ctx: Arc<GraphExecutionContext>,
275 uni_schema: Arc<UniSchema>,
276 ) -> Self {
277 self.graph_ctx = Some(graph_ctx);
278 self.uni_schema = Some(uni_schema);
279 self
280 }
281
282 pub fn with_subquery_ctx(
284 mut self,
285 graph_ctx: Arc<GraphExecutionContext>,
286 uni_schema: Arc<UniSchema>,
287 session_ctx: Arc<RwLock<SessionContext>>,
288 storage: Arc<StorageManager>,
289 params: HashMap<String, Value>,
290 ) -> Self {
291 self.graph_ctx = Some(graph_ctx);
292 self.uni_schema = Some(uni_schema);
293 self.session_ctx = Some(session_ctx);
294 self.storage = Some(storage);
295 self.params = params;
296 self
297 }
298
    /// Entry point: compiles a Cypher expression against `input_schema`.
    ///
    /// Custom constructs (comprehensions, quantifiers, REDUCE, EXISTS) get
    /// dedicated exec expressions; composite expressions that merely
    /// *contain* such constructs are compiled recursively so the custom
    /// pieces survive; everything else falls through to `compile_standard`.
    pub fn compile(&self, expr: &Expr, input_schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
        match expr {
            // --- Constructs with dedicated exec expressions ---
            Expr::ListComprehension {
                variable,
                list,
                where_clause,
                map_expr,
            } => self.compile_list_comprehension(
                variable,
                list,
                where_clause.as_deref(),
                map_expr,
                input_schema,
            ),
            Expr::Quantifier {
                quantifier,
                variable,
                list,
                predicate,
            } => self.compile_quantifier(quantifier, variable, list, predicate, input_schema),
            Expr::Reduce {
                accumulator,
                init,
                variable,
                list,
                expr: expression,
            } => self.compile_reduce(accumulator, init, variable, list, expression, input_schema),
            Expr::BinaryOp { left, op, right } => {
                self.compile_binary_op_dispatch(left, op, right, input_schema)
            }
            Expr::UnaryOp { op, expr: inner } => {
                if matches!(op, UnaryOp::Not) {
                    let mut inner_phy = self.compile(inner, input_schema)?;
                    // NOT over an opaque CypherValue needs an explicit
                    // CypherValue->bool bridge first.
                    if let Ok(DataType::LargeBinary) = inner_phy.data_type(input_schema) {
                        inner_phy = self.wrap_with_cv_to_bool(inner_phy)?;
                    }
                    self.compile_unary_op(op, inner_phy, input_schema)
                } else if Self::contains_custom_expr(inner) {
                    let inner_phy = self.compile(inner, input_schema)?;
                    self.compile_unary_op(op, inner_phy, input_schema)
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }
            Expr::IsNull(inner) => {
                if Self::contains_custom_expr(inner) {
                    let inner_phy = self.compile(inner, input_schema)?;
                    Ok(datafusion::physical_expr::expressions::is_null(inner_phy)
                        .map_err(|e| anyhow!("Failed to create is_null: {}", e))?)
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }
            Expr::IsNotNull(inner) => {
                if Self::contains_custom_expr(inner) {
                    let inner_phy = self.compile(inner, input_schema)?;
                    Ok(
                        datafusion::physical_expr::expressions::is_not_null(inner_phy)
                            .map_err(|e| anyhow!("Failed to create is_not_null: {}", e))?,
                    )
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }
            Expr::In {
                expr: left,
                list: right,
            } => {
                if Self::contains_custom_expr(left) || Self::contains_custom_expr(right) {
                    let left_phy = self.compile(left, input_schema)?;
                    let right_phy = self.compile(right, input_schema)?;

                    // Unknown operand types degrade to the opaque
                    // CypherValue encoding for the UDF signature.
                    let left_type = left_phy
                        .data_type(input_schema)
                        .unwrap_or(DataType::LargeBinary);
                    let right_type = right_phy
                        .data_type(input_schema)
                        .unwrap_or(DataType::LargeBinary);

                    self.plan_binary_udf("_cypher_in", left_phy, right_phy, left_type, right_type)?
                        .ok_or_else(|| anyhow!("_cypher_in UDF not found"))
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }

            // Comprehensions nested inside list/map literals are not handled
            // by this path yet: fail loudly rather than miscompile.
            Expr::List(items) if items.iter().any(Self::contains_custom_expr) => Err(anyhow!(
                "List literals containing comprehensions not yet supported in compiler"
            )),
            Expr::Map(entries) if entries.iter().any(|(_, v)| Self::contains_custom_expr(v)) => {
                Err(anyhow!(
                    "Map literals containing comprehensions not yet supported in compiler"
                ))
            }

            Expr::Property(base, prop) => self.compile_property_access(base, prop, input_schema),

            Expr::ArrayIndex { array, index } => {
                self.compile_array_index(array, index, input_schema)
            }

            Expr::PatternComprehension {
                path_variable,
                pattern,
                where_clause,
                map_expr,
            } => self.compile_pattern_comprehension(
                path_variable,
                pattern,
                where_clause.as_deref(),
                map_expr,
                input_schema,
            ),

            Expr::Exists { query, .. } => self.compile_exists(query),

            Expr::FunctionCall {
                name,
                args,
                distinct,
                ..
            } => {
                if args.iter().any(Self::contains_custom_expr) {
                    self.compile_function_with_custom_args(name, args, *distinct, input_schema)
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }

            Expr::Case {
                expr: case_operand,
                when_then,
                else_expr,
            } => {
                // Only take the manual CASE path when some part of the CASE
                // contains a custom construct.
                let has_custom = case_operand
                    .as_deref()
                    .is_some_and(Self::contains_custom_expr)
                    || when_then.iter().any(|(w, t)| {
                        Self::contains_custom_expr(w) || Self::contains_custom_expr(t)
                    })
                    || else_expr.as_deref().is_some_and(Self::contains_custom_expr);

                if has_custom {
                    self.compile_case(case_operand, when_then, else_expr, input_schema)
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }

            Expr::LabelCheck { .. } => self.compile_standard(expr, input_schema),

            // Everything else is plain and handled by the standard planner.
            _ => self.compile_standard(expr, input_schema),
        }
    }
472
    /// Chooses the compilation strategy for a binary operation:
    /// 1. node/edge identity comparisons are rewritten to `_vid`/`_eid`
    ///    column comparisons;
    /// 2. XOR/POW always go through the standard path;
    /// 3. operands containing custom constructs are compiled recursively;
    /// 4. list-producing `+` (concatenation) goes through the standard path;
    /// 5. otherwise, CypherValue-typed operands take the custom binary-op
    ///    path, and plain operands the standard one.
    fn compile_binary_op_dispatch(
        &self,
        left: &Expr,
        op: &BinaryOp,
        right: &Expr,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        // Entity equality: compare by identity column instead of by value.
        if matches!(op, BinaryOp::Eq | BinaryOp::NotEq)
            && let (Expr::Variable(lv), Expr::Variable(rv)) = (left, right)
            && let Some(ctx) = self.translation_ctx
            && let (Some(lk), Some(rk)) = (ctx.variable_kinds.get(lv), ctx.variable_kinds.get(rv))
        {
            let identity_prop = match (lk, rk) {
                (VariableKind::Node, VariableKind::Node) => Some("_vid"),
                (VariableKind::Edge, VariableKind::Edge) => Some("_eid"),
                // Mixed node/edge comparison: fall through to normal handling.
                _ => None,
            };

            if let Some(id_prop) = identity_prop {
                return self.compile_standard(
                    &Expr::BinaryOp {
                        left: Box::new(Expr::Property(
                            Box::new(Expr::Variable(lv.clone())),
                            id_prop.to_string(),
                        )),
                        op: *op,
                        right: Box::new(Expr::Property(
                            Box::new(Expr::Variable(rv.clone())),
                            id_prop.to_string(),
                        )),
                    },
                    input_schema,
                );
            }
        }

        if matches!(op, BinaryOp::Xor | BinaryOp::Pow) {
            return self.compile_standard(
                &Expr::BinaryOp {
                    left: Box::new(left.clone()),
                    op: *op,
                    right: Box::new(right.clone()),
                },
                input_schema,
            );
        }

        if Self::contains_custom_expr(left) || Self::contains_custom_expr(right) {
            let left_phy = self.compile(left, input_schema)?;
            let right_phy = self.compile(right, input_schema)?;
            return self.compile_binary_op(op, left_phy, right_phy, input_schema);
        }

        // `+` over lists means concatenation, which the standard path knows
        // how to plan.
        if *op == BinaryOp::Add && (Self::is_list_producing(left) || Self::is_list_producing(right))
        {
            return self.compile_standard(
                &Expr::BinaryOp {
                    left: Box::new(left.clone()),
                    op: *op,
                    right: Box::new(right.clone()),
                },
                input_schema,
            );
        }

        // Decide by operand types: a CypherValue (LargeBinary) on either
        // side needs the custom binary-op handling.
        let left_phy = self.compile(left, input_schema)?;
        let right_phy = self.compile(right, input_schema)?;
        let left_dt = left_phy.data_type(input_schema).ok();
        let right_dt = right_phy.data_type(input_schema).ok();
        let has_cv =
            is_cypher_value_type(left_dt.as_ref()) || is_cypher_value_type(right_dt.as_ref());

        if has_cv {
            self.compile_binary_op(op, left_phy, right_phy, input_schema)
        } else {
            self.compile_standard(
                &Expr::BinaryOp {
                    left: Box::new(left.clone()),
                    op: *op,
                    right: Box::new(right.clone()),
                },
                input_schema,
            )
        }
    }
572
    /// If `var_name` resolves to a struct-typed column, compiles access to
    /// `field_name` inside it.
    ///
    /// Returns `None` when the column is absent or not a struct, so the
    /// caller can try other strategies. When the column IS a struct but
    /// lacks the field, returns a NULL literal: per Cypher semantics a
    /// missing property evaluates to null rather than erroring.
    fn try_compile_struct_field(
        &self,
        var_name: &str,
        field_name: &str,
        input_schema: &Schema,
    ) -> Option<Arc<dyn PhysicalExpr>> {
        let col_idx = input_schema.index_of(var_name).ok()?;
        let DataType::Struct(struct_fields) = input_schema.field(col_idx).data_type() else {
            return None;
        };

        if let Some(field_idx) = struct_fields.iter().position(|f| f.name() == field_name) {
            let output_type = struct_fields[field_idx].data_type().clone();
            let col_expr: Arc<dyn PhysicalExpr> = Arc::new(
                datafusion::physical_expr::expressions::Column::new(var_name, col_idx),
            );
            Some(Arc::new(StructFieldAccessExpr::new(
                col_expr,
                field_idx,
                output_type,
            )))
        } else {
            // Struct exists but field doesn't: Cypher missing-property -> NULL.
            Some(Arc::new(
                datafusion::physical_expr::expressions::Literal::new(
                    datafusion::common::ScalarValue::Null,
                ),
            ))
        }
    }
607
608 fn compile_property_access(
610 &self,
611 base: &Expr,
612 prop: &str,
613 input_schema: &Schema,
614 ) -> Result<Arc<dyn PhysicalExpr>> {
615 if let Expr::Variable(var_name) = base {
616 if let Some(expr) = self.try_compile_struct_field(var_name, prop, input_schema) {
618 return Ok(expr);
619 }
620 let flat_col = format!("{}.{}", var_name, prop);
622 if let Ok(col_idx) = input_schema.index_of(&flat_col) {
623 return Ok(Arc::new(
624 datafusion::physical_expr::expressions::Column::new(&flat_col, col_idx),
625 ));
626 }
627 }
628 self.compile_standard(
629 &Expr::Property(Box::new(base.clone()), prop.to_string()),
630 input_schema,
631 )
632 }
633
634 fn compile_array_index(
636 &self,
637 array: &Expr,
638 index: &Expr,
639 input_schema: &Schema,
640 ) -> Result<Arc<dyn PhysicalExpr>> {
641 if let Expr::Variable(var_name) = array
642 && let Expr::Literal(CypherLiteral::String(prop)) = index
643 && let Some(expr) = self.try_compile_struct_field(var_name, prop, input_schema)
644 {
645 return Ok(expr);
646 }
647 self.compile_standard(
648 &Expr::ArrayIndex {
649 array: Box::new(array.clone()),
650 index: Box::new(index.clone()),
651 },
652 input_schema,
653 )
654 }
655
    /// Compiles an EXISTS subquery into an `ExistsExecExpr`.
    ///
    /// Rejects subqueries containing updating clauses (per openCypher this
    /// is a syntax error), and requires that the compiler was constructed
    /// with `with_subquery_ctx` so all execution dependencies are present.
    fn compile_exists(&self, query: &Query) -> Result<Arc<dyn PhysicalExpr>> {
        if has_mutation_clause(query) {
            return Err(anyhow!(
                "SyntaxError: InvalidClauseComposition - EXISTS subquery cannot contain updating clauses"
            ));
        }

        // Uniform error for any missing execution dependency.
        let err = |dep: &str| anyhow!("EXISTS requires {}", dep);

        let graph_ctx = self
            .graph_ctx
            .clone()
            .ok_or_else(|| err("GraphExecutionContext"))?;
        let uni_schema = self.uni_schema.clone().ok_or_else(|| err("UniSchema"))?;
        let session_ctx = self
            .session_ctx
            .clone()
            .ok_or_else(|| err("SessionContext"))?;
        let storage = self.storage.clone().ok_or_else(|| err("StorageManager"))?;

        Ok(Arc::new(ExistsExecExpr::new(
            query.clone(),
            graph_ctx,
            session_ctx,
            storage,
            uni_schema,
            self.params.clone(),
        )))
    }
687
    /// Returns true when a comprehension's map/where expression contains a
    /// nested pattern comprehension that references `variable` as a node or
    /// relationship in its pattern. In that case the outer loop must expose
    /// the variable's `_vid` column alongside its CypherValue so the nested
    /// pattern can anchor on it.
    fn needs_vid_extraction_for_variable(
        variable: &str,
        map_expr: &Expr,
        where_clause: Option<&Expr>,
    ) -> bool {
        // Recursive walk looking for a PatternComprehension whose pattern
        // mentions `var`; other arms just recurse into subexpressions.
        fn expr_has_pattern_comp_referencing(expr: &Expr, var: &str) -> bool {
            match expr {
                Expr::PatternComprehension { pattern, .. } => {
                    pattern.paths.iter().any(|path| {
                        path.elements.iter().any(|elem| match elem {
                            uni_cypher::ast::PatternElement::Node(n) => {
                                n.variable.as_deref() == Some(var)
                            }
                            uni_cypher::ast::PatternElement::Relationship(r) => {
                                r.variable.as_deref() == Some(var)
                            }
                            _ => false,
                        })
                    })
                }
                Expr::FunctionCall { args, .. } => args
                    .iter()
                    .any(|a| expr_has_pattern_comp_referencing(a, var)),
                Expr::BinaryOp { left, right, .. } => {
                    expr_has_pattern_comp_referencing(left, var)
                        || expr_has_pattern_comp_referencing(right, var)
                }
                Expr::UnaryOp { expr: e, .. } | Expr::Property(e, _) => {
                    expr_has_pattern_comp_referencing(e, var)
                }
                Expr::List(items) => items
                    .iter()
                    .any(|i| expr_has_pattern_comp_referencing(i, var)),
                Expr::ListComprehension {
                    list,
                    map_expr,
                    where_clause,
                    ..
                } => {
                    expr_has_pattern_comp_referencing(list, var)
                        || expr_has_pattern_comp_referencing(map_expr, var)
                        || where_clause
                            .as_ref()
                            .is_some_and(|w| expr_has_pattern_comp_referencing(w, var))
                }
                // NOTE(review): other variants (Case, Map, In, ...) are not
                // walked; a pattern comprehension nested there would be
                // missed — confirm this is intentional.
                _ => false,
            }
        }

        expr_has_pattern_comp_referencing(map_expr, variable)
            || where_clause.is_some_and(|w| expr_has_pattern_comp_referencing(w, variable))
    }
742
    /// Returns true when `expr` contains (at any depth) a construct that
    /// must be compiled through this compiler's custom exec expressions:
    /// list/pattern comprehensions, quantifiers, REDUCE, or EXISTS.
    pub fn contains_custom_expr(expr: &Expr) -> bool {
        match expr {
            // Direct custom constructs.
            Expr::ListComprehension { .. } => true,
            Expr::Quantifier { .. } => true,
            Expr::Reduce { .. } => true,
            Expr::PatternComprehension { .. } => true,
            // Composite expressions: recurse into children.
            Expr::BinaryOp { left, right, .. } => {
                Self::contains_custom_expr(left) || Self::contains_custom_expr(right)
            }
            Expr::UnaryOp { expr, .. } => Self::contains_custom_expr(expr),
            Expr::FunctionCall { args, .. } => args.iter().any(Self::contains_custom_expr),
            Expr::Case {
                when_then,
                else_expr,
                ..
            } => {
                when_then
                    .iter()
                    .any(|(w, t)| Self::contains_custom_expr(w) || Self::contains_custom_expr(t))
                    || else_expr.as_deref().is_some_and(Self::contains_custom_expr)
            }
            Expr::List(items) => items.iter().any(Self::contains_custom_expr),
            Expr::Map(entries) => entries.iter().any(|(_, v)| Self::contains_custom_expr(v)),
            Expr::IsNull(e) | Expr::IsNotNull(e) => Self::contains_custom_expr(e),
            Expr::In { expr: l, list: r } => {
                Self::contains_custom_expr(l) || Self::contains_custom_expr(r)
            }
            Expr::Exists { .. } => true,
            Expr::LabelCheck { expr, .. } => Self::contains_custom_expr(expr),
            _ => false,
        }
    }
776
777 fn is_list_producing(expr: &Expr) -> bool {
780 match expr {
781 Expr::List(_) => true,
782 Expr::ListComprehension { .. } => true,
783 Expr::ArraySlice { .. } => true,
784 Expr::BinaryOp {
786 left,
787 op: BinaryOp::Add,
788 right,
789 } => Self::is_list_producing(left) || Self::is_list_producing(right),
790 Expr::FunctionCall { name, .. } => {
791 matches!(
793 name.as_str(),
794 "range"
795 | "tail"
796 | "reverse"
797 | "collect"
798 | "keys"
799 | "labels"
800 | "nodes"
801 | "relationships"
802 )
803 }
804 _ => false,
805 }
806 }
807
    /// Fallback path: translate the Cypher expression into a DataFusion
    /// logical expression, apply type coercion, and lower it with the
    /// default physical planner.
    fn compile_standard(
        &self,
        expr: &Expr,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        let df_expr = cypher_expr_to_df(expr, self.translation_ctx)?;
        let resolved_expr = self.resolve_udfs(df_expr)?;

        let df_schema = datafusion::common::DFSchema::try_from(input_schema.clone())?;

        let coerced_expr = crate::query::df_expr::apply_type_coercion(&resolved_expr, &df_schema)?;

        // Resolve UDFs again after coercion — presumably coercion can
        // rewrite function nodes to unregistered instances; confirm against
        // apply_type_coercion.
        let coerced_expr = self.resolve_udfs(coerced_expr)?;

        let planner = datafusion::physical_planner::DefaultPhysicalPlanner::default();
        planner
            .create_physical_expr(&coerced_expr, &df_schema, self.state)
            .map_err(|e| anyhow!("DataFusion planning failed: {}", e))
    }
829
    /// Rewrites every `ScalarFunction` node in `expr` to use the UDF
    /// instance registered in the session state under the same name (when
    /// one exists). Arguments are preserved; unregistered functions are
    /// left untouched.
    fn resolve_udfs(
        &self,
        expr: datafusion::logical_expr::Expr,
    ) -> Result<datafusion::logical_expr::Expr> {
        use datafusion::common::tree_node::{Transformed, TreeNode};
        use datafusion::logical_expr::Expr as DfExpr;

        // Bottom-up traversal so nested function arguments are rewritten
        // before their parents.
        let result = expr
            .transform_up(|node| {
                if let DfExpr::ScalarFunction(ref func) = node {
                    let udf_name = func.func.name();
                    if let Some(registered_udf) = self.state.scalar_functions().get(udf_name) {
                        return Ok(Transformed::yes(DfExpr::ScalarFunction(
                            datafusion::logical_expr::expr::ScalarFunction {
                                func: registered_udf.clone(),
                                args: func.args.clone(),
                            },
                        )));
                    }
                }
                Ok(Transformed::no(node))
            })
            .map_err(|e| anyhow!("Failed to resolve UDFs: {}", e))?;
        Ok(result.data)
    }
859
    /// Compiles `[variable IN list WHERE pred | map_expr]` into a
    /// `ListComprehensionExecExpr`.
    ///
    /// Builds an inner schema = input schema + the loop variable (shadowing
    /// any same-named column), optionally plus a `{variable}._vid` column
    /// when a nested pattern comprehension anchors on the loop variable,
    /// then compiles the predicate and map expressions against it with the
    /// loop variable scoped out of the translation context.
    fn compile_list_comprehension(
        &self,
        variable: &str,
        list: &Expr,
        where_clause: Option<&Expr>,
        map_expr: &Expr,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        let input_list_phy = self.compile(list, input_schema)?;

        let list_data_type = input_list_phy.data_type(input_schema)?;
        let inner_data_type = resolve_list_element_type(
            &list_data_type,
            DataType::LargeBinary,
            "List comprehension",
        )?;

        let mut fields = input_schema.fields().to_vec();
        let loop_var_field = Arc::new(Field::new(variable, inner_data_type.clone(), true));

        // The loop variable shadows an existing column of the same name.
        if let Some(pos) = fields.iter().position(|f| f.name() == variable) {
            fields[pos] = loop_var_field;
        } else {
            fields.push(loop_var_field);
        }

        let needs_vid_extraction =
            Self::needs_vid_extraction_for_variable(variable, map_expr, where_clause);
        if needs_vid_extraction && inner_data_type == DataType::LargeBinary {
            // Nested pattern comprehensions need the node id to anchor on.
            let vid_field = Arc::new(Field::new(
                format!("{}._vid", variable),
                DataType::UInt64,
                true,
            ));
            fields.push(vid_field);
        }

        let inner_schema = Arc::new(Schema::new(fields));

        // Scope the loop variable out of the translation context so inner
        // expressions don't treat it as an outer node/edge.
        let mut scoped_ctx = None;
        let inner_compiler = self.scoped_compiler(&[variable], &mut scoped_ctx);

        let predicate_phy = if let Some(pred) = where_clause {
            Some(inner_compiler.compile(pred, &inner_schema)?)
        } else {
            None
        };

        let map_phy = inner_compiler.compile(map_expr, &inner_schema)?;
        let output_item_type = map_phy.data_type(&inner_schema)?;

        Ok(Arc::new(ListComprehensionExecExpr::new(
            input_list_phy,
            map_phy,
            predicate_phy,
            variable.to_string(),
            Arc::new(input_schema.clone()),
            output_item_type,
            needs_vid_extraction,
        )))
    }
926
    /// Compiles `reduce(acc = init, variable IN list | expr)` into a
    /// `ReduceExecExpr`.
    ///
    /// The inner schema extends the input schema with both the accumulator
    /// (typed from the initial expression) and the loop variable (typed from
    /// the list element type, falling back to the accumulator type for
    /// opaque CypherValue lists); both shadow same-named columns.
    fn compile_reduce(
        &self,
        accumulator: &str,
        initial: &Expr,
        variable: &str,
        list: &Expr,
        reduce_expr: &Expr,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        let list_phy = self.compile(list, input_schema)?;

        let initial_phy = self.compile(initial, input_schema)?;
        let acc_type = initial_phy.data_type(input_schema)?;

        let list_data_type = list_phy.data_type(input_schema)?;
        let inner_data_type =
            resolve_list_element_type(&list_data_type, acc_type.clone(), "Reduce")?;

        let mut fields = input_schema.fields().to_vec();

        // Accumulator column, shadowing any existing column of that name.
        let acc_field = Arc::new(Field::new(accumulator, acc_type, true));
        if let Some(pos) = fields.iter().position(|f| f.name() == accumulator) {
            fields[pos] = acc_field;
        } else {
            fields.push(acc_field);
        }

        // Loop variable column, same shadowing rule.
        let var_field = Arc::new(Field::new(variable, inner_data_type, true));
        if let Some(pos) = fields.iter().position(|f| f.name() == variable) {
            fields[pos] = var_field;
        } else {
            fields.push(var_field);
        }

        let inner_schema = Arc::new(Schema::new(fields));

        // Scope both bound names out of the translation context.
        let mut scoped_ctx = None;
        let reduce_compiler = self.scoped_compiler(&[accumulator, variable], &mut scoped_ctx);

        let reduce_phy = reduce_compiler.compile(reduce_expr, &inner_schema)?;
        let output_type = reduce_phy.data_type(&inner_schema)?;

        Ok(Arc::new(ReduceExecExpr::new(
            accumulator.to_string(),
            initial_phy,
            variable.to_string(),
            list_phy,
            reduce_phy,
            Arc::new(input_schema.clone()),
            output_type,
        )))
    }
983
    /// Compiles `all/any/single/none(variable IN list WHERE predicate)` into
    /// a `QuantifierExecExpr`.
    ///
    /// Like list comprehensions, the inner schema adds the loop variable
    /// (shadowing same-named columns), and the predicate is compiled with
    /// the loop variable scoped out of the translation context. A predicate
    /// that evaluates to an opaque CypherValue is bridged to boolean.
    fn compile_quantifier(
        &self,
        quantifier: &uni_cypher::ast::Quantifier,
        variable: &str,
        list: &Expr,
        predicate: &Expr,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        let input_list_phy = self.compile(list, input_schema)?;

        let list_data_type = input_list_phy.data_type(input_schema)?;
        let inner_data_type =
            resolve_list_element_type(&list_data_type, DataType::LargeBinary, "Quantifier")?;

        let mut fields = input_schema.fields().to_vec();
        let loop_var_field = Arc::new(Field::new(variable, inner_data_type, true));

        // The loop variable shadows an existing column of the same name.
        if let Some(pos) = fields.iter().position(|f| f.name() == variable) {
            fields[pos] = loop_var_field;
        } else {
            fields.push(loop_var_field);
        }

        let inner_schema = Arc::new(Schema::new(fields));

        let mut scoped_ctx = None;
        let pred_compiler = self.scoped_compiler(&[variable], &mut scoped_ctx);

        let mut predicate_phy = pred_compiler.compile(predicate, &inner_schema)?;

        // Quantifiers need a boolean predicate; convert CypherValue results.
        if let Ok(DataType::LargeBinary) = predicate_phy.data_type(&inner_schema) {
            predicate_phy = self.wrap_with_cv_to_bool(predicate_phy)?;
        }

        let qt = match quantifier {
            uni_cypher::ast::Quantifier::All => QuantifierType::All,
            uni_cypher::ast::Quantifier::Any => QuantifierType::Any,
            uni_cypher::ast::Quantifier::Single => QuantifierType::Single,
            uni_cypher::ast::Quantifier::None => QuantifierType::None,
        };

        Ok(Arc::new(QuantifierExecExpr::new(
            input_list_phy,
            predicate_phy,
            variable.to_string(),
            Arc::new(input_schema.clone()),
            qt,
        )))
    }
1042
    /// Compiles `[path = (pattern) WHERE pred | map_expr]` into a
    /// `PatternComprehensionExecExpr`.
    ///
    /// Requires a graph execution context and schema (set via
    /// `with_graph_ctx`/`with_subquery_ctx`). The pattern is analyzed into
    /// an anchor column plus traversal steps; only the vertex/edge
    /// properties actually referenced by the predicate/map expression are
    /// materialized into the inner schema.
    fn compile_pattern_comprehension(
        &self,
        path_variable: &Option<String>,
        pattern: &uni_cypher::ast::Pattern,
        where_clause: Option<&Expr>,
        map_expr: &Expr,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        let err = |dep: &str| anyhow!("Pattern comprehension requires {}", dep);

        let graph_ctx = self
            .graph_ctx
            .as_ref()
            .ok_or_else(|| err("GraphExecutionContext"))?;
        let uni_schema = self.uni_schema.as_ref().ok_or_else(|| err("UniSchema"))?;

        let (anchor_col, steps) = analyze_pattern(pattern, input_schema, uni_schema)?;

        // Only collect properties the inner expressions actually use.
        let (vertex_props, edge_props) = collect_inner_properties(where_clause, map_expr, &steps);

        let inner_schema = build_inner_schema(
            input_schema,
            &steps,
            &vertex_props,
            &edge_props,
            path_variable.as_deref(),
        );

        let pred_phy = where_clause
            .map(|p| self.compile(p, &inner_schema))
            .transpose()?;
        let map_phy = self.compile(map_expr, &inner_schema)?;
        let output_type = map_phy.data_type(&inner_schema)?;

        Ok(Arc::new(PatternComprehensionExecExpr::new(
            graph_ctx.clone(),
            anchor_col,
            steps,
            path_variable.clone(),
            pred_phy,
            map_phy,
            Arc::new(input_schema.clone()),
            Arc::new(inner_schema),
            output_type,
            vertex_props,
            edge_props,
        )))
    }
1096
1097 fn compile_function_with_custom_args(
1103 &self,
1104 name: &str,
1105 args: &[Expr],
1106 _distinct: bool,
1107 input_schema: &Schema,
1108 ) -> Result<Arc<dyn PhysicalExpr>> {
1109 let compiled_args: Vec<Arc<dyn PhysicalExpr>> = args
1111 .iter()
1112 .map(|arg| self.compile(arg, input_schema))
1113 .collect::<Result<Vec<_>>>()?;
1114
1115 let udf_name = Self::cypher_fn_to_udf(name);
1117 let udf = self
1118 .state
1119 .scalar_functions()
1120 .get(udf_name.as_str())
1121 .ok_or_else(|| {
1122 anyhow!(
1123 "UDF '{}' not found in registry for function '{}'",
1124 udf_name,
1125 name
1126 )
1127 })?;
1128
1129 let operand_types: Vec<(&str, DataType)> = compiled_args
1131 .iter()
1132 .enumerate()
1133 .map(|(i, arg)| {
1134 let dt = arg.data_type(input_schema).unwrap_or(DataType::LargeBinary);
1135 let placeholder: &str = match i {
1137 0 => "__arg0__",
1138 1 => "__arg1__",
1139 2 => "__arg2__",
1140 _ => "__argN__",
1141 };
1142 (placeholder, dt)
1143 })
1144 .collect();
1145
1146 let dummy_cols: Vec<datafusion::logical_expr::Expr> = operand_types
1148 .iter()
1149 .map(|(name, _)| {
1150 datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1151 None::<String>,
1152 *name,
1153 ))
1154 })
1155 .collect();
1156
1157 let udf_expr = datafusion::logical_expr::Expr::ScalarFunction(
1158 datafusion::logical_expr::expr::ScalarFunction {
1159 func: udf.clone(),
1160 args: dummy_cols,
1161 },
1162 );
1163
1164 self.plan_udf_physical_expr(
1166 &udf_expr,
1167 &operand_types
1168 .iter()
1169 .map(|(n, dt)| (*n, dt.clone()))
1170 .collect::<Vec<_>>(),
1171 compiled_args,
1172 &format!("function {}", name),
1173 )
1174 }
1175
1176 fn cypher_fn_to_udf(name: &str) -> String {
1181 match name.to_uppercase().as_str() {
1182 "SIZE" | "LENGTH" => "_cypher_size".to_string(),
1183 "REVERSE" => "_cypher_reverse".to_string(),
1184 "TOSTRING" => "tostring".to_string(),
1185 "TOBOOLEAN" | "TOBOOL" | "TOBOOLEANORNULL" => "toboolean".to_string(),
1186 "TOINTEGER" | "TOINT" | "TOINTEGERORNULL" => "tointeger".to_string(),
1187 "TOFLOAT" | "TOFLOATORNULL" => "tofloat".to_string(),
1188 "HEAD" => "head".to_string(),
1189 "LAST" => "last".to_string(),
1190 "TAIL" => "tail".to_string(),
1191 "KEYS" => "keys".to_string(),
1192 "TYPE" => "type".to_string(),
1193 "PROPERTIES" => "properties".to_string(),
1194 "LABELS" => "labels".to_string(),
1195 "COALESCE" => "coalesce".to_string(),
1196 "ID" => "id".to_string(),
1197 _ => name.to_lowercase(),
1199 }
1200 }
1201
    /// Compiles a Cypher CASE expression — both the "simple" form
    /// (`CASE x WHEN …`, `operand` is `Some`) and the "searched" form
    /// (`CASE WHEN …`, `operand` is `None`) — into a DataFusion `CaseExpr`.
    ///
    /// Two type fix-ups run before constructing the `CaseExpr`:
    /// 1. WHEN predicates typed as LargeBinary (the opaque CypherValue
    ///    encoding) are wrapped with `_cv_to_bool` so CaseExpr sees Boolean.
    /// 2. If the THEN/ELSE branches mix LargeBinary with native Arrow list
    ///    types, the list branches are re-encoded as CypherValue so every
    ///    branch yields the same result type.
    fn compile_case(
        &self,
        operand: &Option<Box<Expr>>,
        when_then: &[(Expr, Expr)],
        else_expr: &Option<Box<Expr>>,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        // Optional CASE operand (simple form only).
        let operand_phy = operand
            .as_deref()
            .map(|e| self.compile(e, input_schema))
            .transpose()?;

        let mut when_then_phy: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> = when_then
            .iter()
            .map(|(w, t)| {
                let w_phy = self.compile(w, input_schema)?;
                let t_phy = self.compile(t, input_schema)?;
                Ok((w_phy, t_phy))
            })
            .collect::<Result<Vec<_>>>()?;

        let mut else_phy = else_expr
            .as_deref()
            .map(|e| self.compile(e, input_schema))
            .transpose()?;

        // Fix-up 1: coerce CypherValue-typed WHEN predicates to Boolean.
        for (w_phy, _) in &mut when_then_phy {
            let should_wrap = match w_phy.data_type(input_schema) {
                Ok(dt) => dt == DataType::LargeBinary,
                Err(_) => {
                    // Type resolution can fail for some expressions; leave
                    // the predicate untouched in that case.
                    false
                }
            };
            if should_wrap {
                *w_phy = self.wrap_with_cv_to_bool(w_phy.clone())?;
            }
        }

        // Collect resolvable result types of all THEN branches plus ELSE;
        // branches whose type cannot be resolved are simply skipped.
        let mut branch_types: Vec<DataType> = Vec::new();
        for (_, t_phy) in &when_then_phy {
            if let Ok(dt) = t_phy.data_type(input_schema) {
                branch_types.push(dt);
            }
        }
        if let Some(ref e_phy) = else_phy
            && let Ok(dt) = e_phy.data_type(input_schema)
        {
            branch_types.push(dt);
        }

        let has_large_binary = branch_types
            .iter()
            .any(|dt| matches!(dt, DataType::LargeBinary));
        let has_list = branch_types
            .iter()
            .any(|dt| matches!(dt, DataType::List(_) | DataType::LargeList(_)));

        // Fix-up 2: when branches mix CypherValue and native lists, unify
        // on the CypherValue encoding by wrapping the list branches.
        if has_large_binary && has_list {
            for (_, t_phy) in &mut when_then_phy {
                if let Ok(dt) = t_phy.data_type(input_schema)
                    && matches!(dt, DataType::List(_) | DataType::LargeList(_))
                {
                    *t_phy = Arc::new(LargeListToCypherValueExpr::new(t_phy.clone()));
                }
            }
            if let Some(e_phy) = else_phy.take() {
                if let Ok(dt) = e_phy.data_type(input_schema)
                    && matches!(dt, DataType::List(_) | DataType::LargeList(_))
                {
                    else_phy = Some(Arc::new(LargeListToCypherValueExpr::new(e_phy)));
                } else {
                    // Not a list branch: put the ELSE expression back as-is.
                    else_phy = Some(e_phy);
                }
            }
        }

        let case_expr = datafusion::physical_expr::expressions::CaseExpr::try_new(
            operand_phy,
            when_then_phy,
            else_phy,
        )
        .map_err(|e| anyhow!("Failed to create CASE expression: {}", e))?;

        Ok(Arc::new(case_expr))
    }
1304
    /// Compiles a Cypher binary operator over two already-compiled operands.
    ///
    /// Resolution order (each stage falls through to the next):
    /// 1. STARTS WITH / ENDS WITH / CONTAINS → `CypherStringMatchExpr`.
    /// 2. Translate the Cypher operator to a DataFusion `Operator`
    ///    (XOR, `~=`, and POW are rejected on this path).
    /// 3. If exactly one side is a native Arrow list and the other is the
    ///    CypherValue encoding (LargeBinary), re-encode the list side so
    ///    both sides share a representation.
    /// 4. If either side is CypherValue, try the CV-aware UDF routes:
    ///    comparison, then list concat, then arithmetic.
    /// 5. Otherwise build a plain DataFusion binary expression.
    fn compile_binary_op(
        &self,
        op: &BinaryOp,
        left: Arc<dyn PhysicalExpr>,
        right: Arc<dyn PhysicalExpr>,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        use datafusion::logical_expr::Operator;

        // Stage 1: string predicates get a dedicated physical expression.
        let string_op = match op {
            BinaryOp::StartsWith => Some(StringOp::StartsWith),
            BinaryOp::EndsWith => Some(StringOp::EndsWith),
            BinaryOp::Contains => Some(StringOp::Contains),
            _ => None,
        };
        if let Some(sop) = string_op {
            return Ok(Arc::new(CypherStringMatchExpr::new(left, right, sop)));
        }

        // Stage 2: operator translation; unsupported operators error out.
        let df_op = match op {
            BinaryOp::Add => Operator::Plus,
            BinaryOp::Sub => Operator::Minus,
            BinaryOp::Mul => Operator::Multiply,
            BinaryOp::Div => Operator::Divide,
            BinaryOp::Mod => Operator::Modulo,
            BinaryOp::Eq => Operator::Eq,
            BinaryOp::NotEq => Operator::NotEq,
            BinaryOp::Gt => Operator::Gt,
            BinaryOp::GtEq => Operator::GtEq,
            BinaryOp::Lt => Operator::Lt,
            BinaryOp::LtEq => Operator::LtEq,
            BinaryOp::And => Operator::And,
            BinaryOp::Or => Operator::Or,
            BinaryOp::Xor => {
                return Err(anyhow!(
                    "XOR not supported via binary helper, use bitwise_xor"
                ));
            }
            BinaryOp::Regex => Operator::RegexMatch,
            BinaryOp::ApproxEq => {
                return Err(anyhow!(
                    "ApproxEq (~=) not yet supported in physical compiler"
                ));
            }
            BinaryOp::Pow => return Err(anyhow!("POW not yet supported in physical compiler")),
            _ => return Err(anyhow!("Unsupported binary op in compiler: {:?}", op)),
        };

        // Stage 3: align representations when one side is a native list
        // and the other is CypherValue-encoded.
        let mut left = left;
        let mut right = right;
        let left_type = left.data_type(input_schema).ok();
        let right_type = right.data_type(input_schema).ok();

        let left_is_list = matches!(
            left_type.as_ref(),
            Some(DataType::List(_) | DataType::LargeList(_))
        );
        let right_is_list = matches!(
            right_type.as_ref(),
            Some(DataType::List(_) | DataType::LargeList(_))
        );
        let left_is_binary = matches!(left_type.as_ref(), Some(DataType::LargeBinary));
        let right_is_binary = matches!(right_type.as_ref(), Some(DataType::LargeBinary));

        if left_is_list && right_is_binary {
            left = Arc::new(LargeListToCypherValueExpr::new(left));
        } else if right_is_list && left_is_binary {
            right = Arc::new(LargeListToCypherValueExpr::new(right));
        }

        // Re-resolve types after the possible re-encoding above.
        let left_type = left.data_type(input_schema).ok();
        let right_type = right.data_type(input_schema).ok();
        let has_cv =
            is_cypher_value_type(left_type.as_ref()) || is_cypher_value_type(right_type.as_ref());

        // Stage 4: CV-aware UDF routes; each returns None when it does not
        // apply (or the UDF is not registered), letting the next one try.
        if has_cv {
            if let Some(result) = self.compile_cv_comparison(
                df_op,
                left.clone(),
                right.clone(),
                &left_type,
                &right_type,
            )? {
                return Ok(result);
            }
            if let Some(result) = self.compile_cv_list_concat(
                left.clone(),
                right.clone(),
                &left_type,
                &right_type,
                df_op,
            )? {
                return Ok(result);
            }
            if let Some(result) = self.compile_cv_arithmetic(
                df_op,
                left.clone(),
                right.clone(),
                &left_type,
                &right_type,
                input_schema,
            )? {
                return Ok(result);
            }
        }

        // Stage 5: plain DataFusion binary expression.
        binary(left, df_op, right, input_schema)
            .map_err(|e| anyhow!("Failed to create binary expression: {}", e))
    }
1422
1423 fn compile_cv_comparison(
1425 &self,
1426 df_op: datafusion::logical_expr::Operator,
1427 left: Arc<dyn PhysicalExpr>,
1428 right: Arc<dyn PhysicalExpr>,
1429 left_type: &Option<DataType>,
1430 right_type: &Option<DataType>,
1431 ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1432 use datafusion::logical_expr::Operator;
1433
1434 let udf_name = match df_op {
1435 Operator::Eq => "_cypher_equal",
1436 Operator::NotEq => "_cypher_not_equal",
1437 Operator::Gt => "_cypher_gt",
1438 Operator::GtEq => "_cypher_gt_eq",
1439 Operator::Lt => "_cypher_lt",
1440 Operator::LtEq => "_cypher_lt_eq",
1441 _ => return Ok(None),
1442 };
1443
1444 self.plan_binary_udf(
1445 udf_name,
1446 left,
1447 right,
1448 left_type.clone().unwrap_or(DataType::LargeBinary),
1449 right_type.clone().unwrap_or(DataType::LargeBinary),
1450 )
1451 }
1452
1453 fn compile_cv_list_concat(
1455 &self,
1456 left: Arc<dyn PhysicalExpr>,
1457 right: Arc<dyn PhysicalExpr>,
1458 left_type: &Option<DataType>,
1459 right_type: &Option<DataType>,
1460 df_op: datafusion::logical_expr::Operator,
1461 ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1462 use datafusion::logical_expr::Operator;
1463
1464 if df_op != Operator::Plus {
1465 return Ok(None);
1466 }
1467
1468 let is_list = |t: &Option<DataType>| {
1470 t.as_ref()
1471 .is_some_and(|dt| matches!(dt, DataType::LargeBinary | DataType::List(_)))
1472 };
1473
1474 if !is_list(left_type) && !is_list(right_type) {
1475 return Ok(None);
1476 }
1477
1478 self.plan_binary_udf(
1479 "_cypher_list_concat",
1480 left,
1481 right,
1482 left_type.clone().unwrap_or(DataType::LargeBinary),
1483 right_type.clone().unwrap_or(DataType::LargeBinary),
1484 )
1485 }
1486
1487 fn compile_cv_arithmetic(
1492 &self,
1493 df_op: datafusion::logical_expr::Operator,
1494 left: Arc<dyn PhysicalExpr>,
1495 right: Arc<dyn PhysicalExpr>,
1496 left_type: &Option<DataType>,
1497 right_type: &Option<DataType>,
1498 _input_schema: &Schema,
1499 ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1500 use datafusion::logical_expr::Operator;
1501
1502 let udf_name = match df_op {
1503 Operator::Plus => "_cypher_add",
1504 Operator::Minus => "_cypher_sub",
1505 Operator::Multiply => "_cypher_mul",
1506 Operator::Divide => "_cypher_div",
1507 Operator::Modulo => "_cypher_mod",
1508 _ => return Ok(None),
1509 };
1510
1511 self.plan_binary_udf(
1512 udf_name,
1513 left,
1514 right,
1515 left_type.clone().unwrap_or(DataType::LargeBinary),
1516 right_type.clone().unwrap_or(DataType::LargeBinary),
1517 )
1518 }
1519
1520 fn plan_udf_physical_expr(
1522 &self,
1523 udf_expr: &datafusion::logical_expr::Expr,
1524 operand_types: &[(&str, DataType)],
1525 children: Vec<Arc<dyn PhysicalExpr>>,
1526 error_context: &str,
1527 ) -> Result<Arc<dyn PhysicalExpr>> {
1528 let tmp_schema = Schema::new(
1529 operand_types
1530 .iter()
1531 .map(|(name, dt)| Arc::new(Field::new(*name, dt.clone(), true)))
1532 .collect::<Vec<_>>(),
1533 );
1534 let df_schema = datafusion::common::DFSchema::try_from(tmp_schema)?;
1535 let planner = datafusion::physical_planner::DefaultPhysicalPlanner::default();
1536 let udf_phy = planner
1537 .create_physical_expr(udf_expr, &df_schema, self.state)
1538 .map_err(|e| anyhow!("Failed to create {} expr: {}", error_context, e))?;
1539 udf_phy
1540 .with_new_children(children)
1541 .map_err(|e| anyhow!("Failed to rebind {} children: {}", error_context, e))
1542 }
1543
1544 fn wrap_with_cv_to_bool(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
1548 let Some(udf) = self.state.scalar_functions().get("_cv_to_bool") else {
1549 return Err(anyhow!("_cv_to_bool UDF not found"));
1550 };
1551
1552 let dummy_col = datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1553 None::<String>,
1554 "__cv__",
1555 ));
1556 let udf_expr = datafusion::logical_expr::Expr::ScalarFunction(
1557 datafusion::logical_expr::expr::ScalarFunction {
1558 func: udf.clone(),
1559 args: vec![dummy_col],
1560 },
1561 );
1562
1563 self.plan_udf_physical_expr(
1564 &udf_expr,
1565 &[("__cv__", DataType::LargeBinary)],
1566 vec![expr],
1567 "CypherValue to bool",
1568 )
1569 }
1570
1571 fn plan_binary_udf(
1573 &self,
1574 udf_name: &str,
1575 left: Arc<dyn PhysicalExpr>,
1576 right: Arc<dyn PhysicalExpr>,
1577 left_type: DataType,
1578 right_type: DataType,
1579 ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1580 let Some(udf) = self.state.scalar_functions().get(udf_name) else {
1581 return Ok(None);
1582 };
1583 let udf_expr = datafusion::logical_expr::Expr::ScalarFunction(
1584 datafusion::logical_expr::expr::ScalarFunction {
1585 func: udf.clone(),
1586 args: vec![
1587 datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1588 None::<String>,
1589 "__left__",
1590 )),
1591 datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1592 None::<String>,
1593 "__right__",
1594 )),
1595 ],
1596 },
1597 );
1598 let result = self.plan_udf_physical_expr(
1599 &udf_expr,
1600 &[("__left__", left_type), ("__right__", right_type)],
1601 vec![left, right],
1602 udf_name,
1603 )?;
1604 Ok(Some(result))
1605 }
1606
1607 fn compile_unary_op(
1608 &self,
1609 op: &UnaryOp,
1610 expr: Arc<dyn PhysicalExpr>,
1611 input_schema: &Schema,
1612 ) -> Result<Arc<dyn PhysicalExpr>> {
1613 match op {
1614 UnaryOp::Not => datafusion::physical_expr::expressions::not(expr),
1615 UnaryOp::Neg => datafusion::physical_expr::expressions::negative(expr, input_schema),
1616 }
1617 .map_err(|e| anyhow!("Failed to create unary expression: {}", e))
1618 }
1619}
1620
1621use datafusion::physical_plan::DisplayAs;
1622use datafusion::physical_plan::DisplayFormatType;
1623
/// The three Cypher substring predicates that have no direct DataFusion
/// operator equivalent; evaluated by `CypherStringMatchExpr`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum StringOp {
    StartsWith,
    EndsWith,
    Contains,
}
1630
1631impl std::fmt::Display for StringOp {
1632 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1633 match self {
1634 StringOp::StartsWith => write!(f, "STARTS WITH"),
1635 StringOp::EndsWith => write!(f, "ENDS WITH"),
1636 StringOp::Contains => write!(f, "CONTAINS"),
1637 }
1638 }
1639}
1640
/// Physical expression implementing Cypher's STARTS WITH / ENDS WITH /
/// CONTAINS over two string-producing operands.
#[derive(Debug, Eq)]
struct CypherStringMatchExpr {
    /// Haystack operand.
    left: Arc<dyn PhysicalExpr>,
    /// Needle operand (prefix / suffix / substring).
    right: Arc<dyn PhysicalExpr>,
    /// Which of the three predicates to apply.
    op: StringOp,
}

impl PartialEq for CypherStringMatchExpr {
    fn eq(&self, other: &Self) -> bool {
        self.op == other.op && self.left.eq(&other.left) && self.right.eq(&other.right)
    }
}

impl std::hash::Hash for CypherStringMatchExpr {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.op.hash(state);
        self.left.hash(state);
        self.right.hash(state);
    }
}

impl CypherStringMatchExpr {
    /// Builds the predicate `left <op> right`.
    fn new(left: Arc<dyn PhysicalExpr>, right: Arc<dyn PhysicalExpr>, op: StringOp) -> Self {
        Self { left, right, op }
    }
}

impl std::fmt::Display for CypherStringMatchExpr {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{} {} {}", self.left, self.op, self.right)
    }
}

impl DisplayAs for CypherStringMatchExpr {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}", self)
    }
}
1679
1680impl PhysicalExpr for CypherStringMatchExpr {
1681 fn as_any(&self) -> &dyn std::any::Any {
1682 self
1683 }
1684
1685 fn data_type(
1686 &self,
1687 _input_schema: &Schema,
1688 ) -> datafusion::error::Result<arrow_schema::DataType> {
1689 Ok(arrow_schema::DataType::Boolean)
1690 }
1691
1692 fn nullable(&self, _input_schema: &Schema) -> datafusion::error::Result<bool> {
1693 Ok(true)
1694 }
1695
1696 fn evaluate(
1697 &self,
1698 batch: &arrow_array::RecordBatch,
1699 ) -> datafusion::error::Result<datafusion::physical_plan::ColumnarValue> {
1700 use crate::query::df_udfs::invoke_cypher_string_op;
1701 use arrow_schema::Field;
1702 use datafusion::config::ConfigOptions;
1703 use datafusion::logical_expr::ScalarFunctionArgs;
1704
1705 let left_val = self.left.evaluate(batch)?;
1706 let right_val = self.right.evaluate(batch)?;
1707
1708 let args = ScalarFunctionArgs {
1709 args: vec![left_val, right_val],
1710 number_rows: batch.num_rows(),
1711 return_field: Arc::new(Field::new("result", arrow_schema::DataType::Boolean, true)),
1712 config_options: Arc::new(ConfigOptions::default()),
1713 arg_fields: vec![], };
1715
1716 match self.op {
1717 StringOp::StartsWith => {
1718 invoke_cypher_string_op(&args, "starts_with", |s, p| s.starts_with(p))
1719 }
1720 StringOp::EndsWith => {
1721 invoke_cypher_string_op(&args, "ends_with", |s, p| s.ends_with(p))
1722 }
1723 StringOp::Contains => invoke_cypher_string_op(&args, "contains", |s, p| s.contains(p)),
1724 }
1725 }
1726
1727 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
1728 vec![&self.left, &self.right]
1729 }
1730
1731 fn with_new_children(
1732 self: Arc<Self>,
1733 children: Vec<Arc<dyn PhysicalExpr>>,
1734 ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
1735 Ok(Arc::new(CypherStringMatchExpr::new(
1736 children[0].clone(),
1737 children[1].clone(),
1738 self.op,
1739 )))
1740 }
1741
1742 fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1743 write!(f, "{}", self)
1744 }
1745}
1746
1747impl PartialEq<dyn PhysicalExpr> for CypherStringMatchExpr {
1748 fn eq(&self, other: &dyn PhysicalExpr) -> bool {
1749 if let Some(other) = other.as_any().downcast_ref::<CypherStringMatchExpr>() {
1750 self == other
1751 } else {
1752 false
1753 }
1754 }
1755}
1756
/// Physical expression that extracts one field (by positional index) from
/// a struct-typed child expression.
#[derive(Debug, Eq)]
struct StructFieldAccessExpr {
    /// Expression producing the struct to project from.
    input: Arc<dyn PhysicalExpr>,
    /// Positional index of the field within the struct.
    field_idx: usize,
    /// Declared Arrow type of the extracted field.
    output_type: arrow_schema::DataType,
}

impl PartialEq for StructFieldAccessExpr {
    fn eq(&self, other: &Self) -> bool {
        self.field_idx == other.field_idx
            && self.input.eq(&other.input)
            && self.output_type == other.output_type
    }
}

impl std::hash::Hash for StructFieldAccessExpr {
    // `output_type` is omitted from the hash; hashing a subset of the
    // Eq-compared fields is sound (equal values still hash equally).
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.input.hash(state);
        self.field_idx.hash(state);
    }
}

impl StructFieldAccessExpr {
    /// Creates a field access over `input` at `field_idx`, reporting
    /// `output_type` as the result type.
    fn new(
        input: Arc<dyn PhysicalExpr>,
        field_idx: usize,
        output_type: arrow_schema::DataType,
    ) -> Self {
        Self {
            input,
            field_idx,
            output_type,
        }
    }
}

impl std::fmt::Display for StructFieldAccessExpr {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}[{}]", self.input, self.field_idx)
    }
}

impl DisplayAs for StructFieldAccessExpr {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}", self)
    }
}
1811
1812impl PartialEq<dyn PhysicalExpr> for StructFieldAccessExpr {
1813 fn eq(&self, other: &dyn PhysicalExpr) -> bool {
1814 if let Some(other) = other.as_any().downcast_ref::<Self>() {
1815 self.field_idx == other.field_idx && self.input.eq(&other.input)
1816 } else {
1817 false
1818 }
1819 }
1820}
1821
1822impl PhysicalExpr for StructFieldAccessExpr {
1823 fn as_any(&self) -> &dyn std::any::Any {
1824 self
1825 }
1826
1827 fn data_type(
1828 &self,
1829 _input_schema: &Schema,
1830 ) -> datafusion::error::Result<arrow_schema::DataType> {
1831 Ok(self.output_type.clone())
1832 }
1833
1834 fn nullable(&self, _input_schema: &Schema) -> datafusion::error::Result<bool> {
1835 Ok(true)
1836 }
1837
1838 fn evaluate(
1839 &self,
1840 batch: &arrow_array::RecordBatch,
1841 ) -> datafusion::error::Result<datafusion::physical_plan::ColumnarValue> {
1842 use arrow_array::StructArray;
1843
1844 let input_val = self.input.evaluate(batch)?;
1845 let array = input_val.into_array(batch.num_rows())?;
1846
1847 let struct_array = array
1848 .as_any()
1849 .downcast_ref::<StructArray>()
1850 .ok_or_else(|| {
1851 datafusion::error::DataFusionError::Execution(
1852 "StructFieldAccessExpr: input is not a StructArray".to_string(),
1853 )
1854 })?;
1855
1856 let field_col = struct_array.column(self.field_idx).clone();
1857 Ok(datafusion::physical_plan::ColumnarValue::Array(field_col))
1858 }
1859
1860 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
1861 vec![&self.input]
1862 }
1863
1864 fn with_new_children(
1865 self: Arc<Self>,
1866 children: Vec<Arc<dyn PhysicalExpr>>,
1867 ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
1868 Ok(Arc::new(StructFieldAccessExpr::new(
1869 children[0].clone(),
1870 self.field_idx,
1871 self.output_type.clone(),
1872 )))
1873 }
1874
1875 fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1876 write!(f, "{}", self)
1877 }
1878}
1879
/// Physical expression that evaluates an `EXISTS { … }` subquery once per
/// input row, producing a Boolean column. Carries everything required to
/// plan and execute the subquery independently of the outer plan.
struct ExistsExecExpr {
    /// The subquery AST (before correlated-variable rewriting).
    query: Query,
    /// Graph execution context used when running the subplan.
    graph_ctx: Arc<GraphExecutionContext>,
    session_ctx: Arc<RwLock<SessionContext>>,
    storage: Arc<StorageManager>,
    uni_schema: Arc<UniSchema>,
    /// Outer query parameters; extended per row with correlated values.
    params: HashMap<String, Value>,
}

// Manual Debug implementation that elides all fields.
impl std::fmt::Debug for ExistsExecExpr {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ExistsExecExpr").finish_non_exhaustive()
    }
}

impl ExistsExecExpr {
    /// Bundles the subquery with the contexts needed to execute it.
    fn new(
        query: Query,
        graph_ctx: Arc<GraphExecutionContext>,
        session_ctx: Arc<RwLock<SessionContext>>,
        storage: Arc<StorageManager>,
        uni_schema: Arc<UniSchema>,
        params: HashMap<String, Value>,
    ) -> Self {
        Self {
            query,
            graph_ctx,
            session_ctx,
            storage,
            uni_schema,
            params,
        }
    }
}

impl std::fmt::Display for ExistsExecExpr {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "EXISTS(<subquery>)")
    }
}

// NOTE(review): equality is hard-wired to `false`, so no two EXISTS
// expressions (not even the same instance) ever compare equal. Combined
// with the `Eq` impl below this technically violates Eq's reflexivity
// contract — presumably intentional so the planner never deduplicates
// subquery expressions; confirm.
impl PartialEq<dyn PhysicalExpr> for ExistsExecExpr {
    fn eq(&self, _other: &dyn PhysicalExpr) -> bool {
        false
    }
}

impl PartialEq for ExistsExecExpr {
    fn eq(&self, _other: &Self) -> bool {
        false
    }
}

impl Eq for ExistsExecExpr {}

// Constant hash, consistent with the always-false equality above.
impl std::hash::Hash for ExistsExecExpr {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        "ExistsExecExpr".hash(state);
    }
}

impl DisplayAs for ExistsExecExpr {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}", self)
    }
}
1958
impl PhysicalExpr for ExistsExecExpr {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// EXISTS always yields a (nullable) Boolean regardless of input.
    fn data_type(
        &self,
        _input_schema: &Schema,
    ) -> datafusion::error::Result<arrow_schema::DataType> {
        Ok(DataType::Boolean)
    }

    fn nullable(&self, _input_schema: &Schema) -> datafusion::error::Result<bool> {
        Ok(true)
    }

    /// Evaluates the EXISTS subquery once per input row: the subquery is
    /// rewritten so outer-variable property accesses become parameters,
    /// planned once, then executed per row with that row's values bound as
    /// parameters. The result is `true` iff the subquery returns any rows.
    #[allow(clippy::manual_try_fold)]
    fn evaluate(
        &self,
        batch: &arrow_array::RecordBatch,
    ) -> datafusion::error::Result<datafusion::physical_plan::ColumnarValue> {
        let num_rows = batch.num_rows();
        let mut builder = BooleanBuilder::with_capacity(num_rows);

        // Heuristically identify input columns that represent graph
        // entities the subquery may be correlated on: columns that have a
        // `<var>._vid` companion, struct-typed columns, and bare integer
        // columns (presumably vertex ids — note the Int64/UInt64 test can
        // also pick up plain integer projections; TODO confirm).
        let schema = batch.schema();
        let mut entity_vars: HashSet<String> = HashSet::new();
        for field in schema.fields() {
            let name = field.name();
            if let Some(base) = name.strip_suffix("._vid") {
                entity_vars.insert(base.to_string());
            }
            if matches!(field.data_type(), DataType::Struct(_)) {
                entity_vars.insert(name.to_string());
            }
            if !name.contains('.')
                && !name.starts_with('_')
                && matches!(field.data_type(), DataType::Int64 | DataType::UInt64)
            {
                entity_vars.insert(name.to_string());
            }
        }
        let vars_in_scope: Vec<String> = entity_vars.iter().cloned().collect();

        // Turn outer-variable property accesses into parameter references.
        let rewritten_query = rewrite_query_correlated(&self.query, &entity_vars);

        // Plan the subquery once; per-row values are injected as params.
        let planner = QueryPlanner::new(self.uni_schema.clone());
        let logical_plan = match planner.plan_with_scope(rewritten_query, vars_in_scope) {
            Ok(plan) => plan,
            Err(e) => {
                return Err(datafusion::error::DataFusionError::Execution(format!(
                    "EXISTS subquery planning failed: {}",
                    e
                )));
            }
        };

        let graph_ctx = self.graph_ctx.clone();
        let session_ctx = self.session_ctx.clone();
        let storage = self.storage.clone();
        let uni_schema = self.uni_schema.clone();
        let base_params = self.params.clone();

        // Execute on a dedicated scoped thread with its own current-thread
        // runtime — presumably to avoid calling block_on from inside an
        // already-running tokio runtime (TODO confirm). `thread::scope`
        // lets the closure borrow `builder` and `batch` without 'static.
        let result = std::thread::scope(|s| {
            s.spawn(|| {
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .map_err(|e| {
                        datafusion::error::DataFusionError::Execution(format!(
                            "Failed to create runtime for EXISTS: {}",
                            e
                        ))
                    })?;

                for row_idx in 0..num_rows {
                    // Bind this row's column values as subquery parameters,
                    // layered over the outer query's parameters.
                    let row_params = extract_row_params(batch, row_idx);
                    let mut sub_params = base_params.clone();
                    sub_params.extend(row_params);

                    // Alias each entity's `<var>._vid` value under the bare
                    // variable name so the subquery can reference it.
                    for var in &entity_vars {
                        let vid_key = format!("{}._vid", var);
                        if let Some(vid_val) = sub_params.get(&vid_key).cloned() {
                            sub_params.insert(var.clone(), vid_val);
                        }
                    }

                    let batches = rt.block_on(execute_subplan(
                        &logical_plan,
                        &sub_params,
                        &HashMap::new(), &graph_ctx,
                        &session_ctx,
                        &storage,
                        &uni_schema,
                    ))?;

                    // EXISTS is true iff the subquery produced any row.
                    let has_rows = batches.iter().any(|b| b.num_rows() > 0);
                    builder.append_value(has_rows);
                }

                Ok::<_, datafusion::error::DataFusionError>(())
            })
            .join()
            .unwrap_or_else(|_| {
                Err(datafusion::error::DataFusionError::Execution(
                    "EXISTS subquery thread panicked".to_string(),
                ))
            })
        });

        if let Err(e) = result {
            return Err(datafusion::error::DataFusionError::Execution(format!(
                "EXISTS subquery execution failed: {}",
                e
            )));
        }

        Ok(datafusion::physical_plan::ColumnarValue::Array(Arc::new(
            builder.finish(),
        )))
    }

    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn PhysicalExpr>>,
    ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
        if !children.is_empty() {
            return Err(datafusion::error::DataFusionError::Plan(
                "ExistsExecExpr has no children".to_string(),
            ));
        }
        Ok(self)
    }

    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self)
    }
}
2121
2122fn has_mutation_clause(query: &Query) -> bool {
2129 match query {
2130 Query::Single(stmt) => stmt.clauses.iter().any(|c| {
2131 matches!(
2132 c,
2133 Clause::Create(_)
2134 | Clause::Delete(_)
2135 | Clause::Set(_)
2136 | Clause::Remove(_)
2137 | Clause::Merge(_)
2138 ) || has_mutation_in_clause_exprs(c)
2139 }),
2140 Query::Union { left, right, .. } => has_mutation_clause(left) || has_mutation_clause(right),
2141 _ => false,
2142 }
2143}
2144
2145fn has_mutation_in_clause_exprs(clause: &Clause) -> bool {
2147 let check_expr = |e: &Expr| -> bool { has_mutation_in_expr(e) };
2148
2149 match clause {
2150 Clause::Match(m) => m.where_clause.as_ref().is_some_and(check_expr),
2151 Clause::With(w) => {
2152 w.where_clause.as_ref().is_some_and(check_expr)
2153 || w.items.iter().any(|item| match item {
2154 ReturnItem::Expr { expr, .. } => has_mutation_in_expr(expr),
2155 ReturnItem::All => false,
2156 })
2157 }
2158 Clause::Return(r) => r.items.iter().any(|item| match item {
2159 ReturnItem::Expr { expr, .. } => has_mutation_in_expr(expr),
2160 ReturnItem::All => false,
2161 }),
2162 _ => false,
2163 }
2164}
2165
2166fn has_mutation_in_expr(expr: &Expr) -> bool {
2168 match expr {
2169 Expr::Exists { query, .. } => has_mutation_clause(query),
2170 _ => {
2171 let mut found = false;
2172 expr.for_each_child(&mut |child| {
2173 if has_mutation_in_expr(child) {
2174 found = true;
2175 }
2176 });
2177 found
2178 }
2179 }
2180}
2181
2182fn rewrite_query_correlated(query: &Query, outer_vars: &HashSet<String>) -> Query {
2188 match query {
2189 Query::Single(stmt) => Query::Single(Statement {
2190 clauses: stmt
2191 .clauses
2192 .iter()
2193 .map(|c| rewrite_clause_correlated(c, outer_vars))
2194 .collect(),
2195 }),
2196 Query::Union { left, right, all } => Query::Union {
2197 left: Box::new(rewrite_query_correlated(left, outer_vars)),
2198 right: Box::new(rewrite_query_correlated(right, outer_vars)),
2199 all: *all,
2200 },
2201 other => other.clone(),
2202 }
2203}
2204
2205fn rewrite_clause_correlated(clause: &Clause, outer_vars: &HashSet<String>) -> Clause {
2207 match clause {
2208 Clause::Match(m) => Clause::Match(MatchClause {
2209 optional: m.optional,
2210 pattern: m.pattern.clone(),
2211 where_clause: m
2212 .where_clause
2213 .as_ref()
2214 .map(|e| rewrite_expr_correlated(e, outer_vars)),
2215 }),
2216 Clause::With(w) => Clause::With(WithClause {
2217 distinct: w.distinct,
2218 items: w
2219 .items
2220 .iter()
2221 .map(|item| rewrite_return_item(item, outer_vars))
2222 .collect(),
2223 order_by: w.order_by.as_ref().map(|items| {
2224 items
2225 .iter()
2226 .map(|si| SortItem {
2227 expr: rewrite_expr_correlated(&si.expr, outer_vars),
2228 ascending: si.ascending,
2229 })
2230 .collect()
2231 }),
2232 skip: w
2233 .skip
2234 .as_ref()
2235 .map(|e| rewrite_expr_correlated(e, outer_vars)),
2236 limit: w
2237 .limit
2238 .as_ref()
2239 .map(|e| rewrite_expr_correlated(e, outer_vars)),
2240 where_clause: w
2241 .where_clause
2242 .as_ref()
2243 .map(|e| rewrite_expr_correlated(e, outer_vars)),
2244 }),
2245 Clause::Return(r) => Clause::Return(ReturnClause {
2246 distinct: r.distinct,
2247 items: r
2248 .items
2249 .iter()
2250 .map(|item| rewrite_return_item(item, outer_vars))
2251 .collect(),
2252 order_by: r.order_by.as_ref().map(|items| {
2253 items
2254 .iter()
2255 .map(|si| SortItem {
2256 expr: rewrite_expr_correlated(&si.expr, outer_vars),
2257 ascending: si.ascending,
2258 })
2259 .collect()
2260 }),
2261 skip: r
2262 .skip
2263 .as_ref()
2264 .map(|e| rewrite_expr_correlated(e, outer_vars)),
2265 limit: r
2266 .limit
2267 .as_ref()
2268 .map(|e| rewrite_expr_correlated(e, outer_vars)),
2269 }),
2270 Clause::Unwind(u) => Clause::Unwind(UnwindClause {
2271 expr: rewrite_expr_correlated(&u.expr, outer_vars),
2272 variable: u.variable.clone(),
2273 }),
2274 other => other.clone(),
2275 }
2276}
2277
2278fn rewrite_return_item(item: &ReturnItem, outer_vars: &HashSet<String>) -> ReturnItem {
2279 match item {
2280 ReturnItem::All => ReturnItem::All,
2281 ReturnItem::Expr {
2282 expr,
2283 alias,
2284 source_text,
2285 } => ReturnItem::Expr {
2286 expr: rewrite_expr_correlated(expr, outer_vars),
2287 alias: alias.clone(),
2288 source_text: source_text.clone(),
2289 },
2290 }
2291}
2292
/// Rewrites an expression so property accesses on outer-scope variables
/// (`n.prop` where `n` is bound in the outer query) become parameter
/// references named `"n.prop"`, which the EXISTS evaluator fills in per
/// outer row. Nested subqueries are rewritten recursively; all other
/// expressions are rebuilt by rewriting their children.
fn rewrite_expr_correlated(expr: &Expr, outer_vars: &HashSet<String>) -> Expr {
    match expr {
        Expr::Property(base, key) => {
            // Only a direct `var.key` on an outer variable becomes a
            // parameter; any other base is rewritten recursively.
            if let Expr::Variable(v) = base.as_ref()
                && outer_vars.contains(v)
            {
                return Expr::Parameter(format!("{}.{}", v, key));
            }
            Expr::Property(
                Box::new(rewrite_expr_correlated(base, outer_vars)),
                key.clone(),
            )
        }
        // Nested subqueries carry their own clauses; recurse into them.
        Expr::Exists {
            query,
            from_pattern_predicate,
        } => Expr::Exists {
            query: Box::new(rewrite_query_correlated(query, outer_vars)),
            from_pattern_predicate: *from_pattern_predicate,
        },
        Expr::CountSubquery(query) => {
            Expr::CountSubquery(Box::new(rewrite_query_correlated(query, outer_vars)))
        }
        Expr::CollectSubquery(query) => {
            Expr::CollectSubquery(Box::new(rewrite_query_correlated(query, outer_vars)))
        }
        // Default: rebuild the node with all children rewritten.
        other => other
            .clone()
            .map_children(&mut |child| rewrite_expr_correlated(&child, outer_vars)),
    }
}