1use crate::query::df_expr::{TranslationContext, VariableKind, cypher_expr_to_df};
5use crate::query::df_graph::GraphExecutionContext;
6use crate::query::df_graph::common::{execute_subplan_with_outer_vars, extract_row_params};
7use crate::query::df_graph::comprehension::ListComprehensionExecExpr;
8use crate::query::df_graph::pattern_comprehension::{
9 PatternComprehensionExecExpr, analyze_pattern, build_inner_schema, collect_inner_properties,
10};
11use crate::query::df_graph::pattern_exists::{
12 PatternExistsExecExpr, extract_pattern_from_exists_query, extract_target_property_predicates,
13};
14use crate::query::df_graph::quantifier::{QuantifierExecExpr, QuantifierType};
15use crate::query::df_graph::reduce::ReduceExecExpr;
16use crate::query::df_graph::similar_to_expr::SimilarToExecExpr;
17use crate::query::planner::QueryPlanner;
18use crate::query::similar_to::SimilarToError;
19use anyhow::{Result, anyhow};
20use arrow_array::builder::BooleanBuilder;
21use arrow_schema::{DataType, Field, Schema};
22use datafusion::execution::context::SessionState;
23use datafusion::physical_expr::expressions::binary;
24use datafusion::physical_plan::PhysicalExpr;
25use datafusion::physical_planner::PhysicalPlanner;
26use datafusion::prelude::SessionContext;
27use parking_lot::RwLock;
28use std::collections::{HashMap, HashSet};
29use std::sync::Arc;
30use uni_common::Value;
31use uni_common::core::schema::{DistanceMetric, IndexDefinition, Schema as UniSchema};
32use uni_cypher::ast::{
33 BinaryOp, Clause, CypherLiteral, Expr, MatchClause, Query, ReturnClause, ReturnItem, SortItem,
34 Statement, UnaryOp, UnwindClause, WithClause,
35};
36use uni_store::storage::manager::StorageManager;
37
38fn is_cypher_value_type(dt: Option<&DataType>) -> bool {
41 dt.is_some_and(|t| matches!(t, DataType::LargeBinary | DataType::FixedSizeBinary(24)))
42}
43
44fn resolve_list_element_type(
54 list_data_type: &DataType,
55 large_binary_fallback: DataType,
56 context: &str,
57) -> Result<DataType> {
58 match list_data_type {
59 DataType::List(field) | DataType::LargeList(field) => Ok(field.data_type().clone()),
60 DataType::Null => Ok(DataType::Null),
61 DataType::LargeBinary => Ok(large_binary_fallback),
62 _ => Err(anyhow!(
63 "{} input must be a list, got {:?}",
64 context,
65 list_data_type
66 )),
67 }
68}
69
/// Physical expression that wraps a single child expression.
///
/// Its `PhysicalExpr` implementation in this file always reports
/// `DataType::LargeBinary` as the output type and converts the child's
/// `List` / `LargeList` result via `typed_large_list_to_cv_array`.
#[derive(Debug)]
struct LargeListToCypherValueExpr {
    /// Expression whose evaluated array is converted.
    child: Arc<dyn PhysicalExpr>,
}
78
79impl LargeListToCypherValueExpr {
80 fn new(child: Arc<dyn PhysicalExpr>) -> Self {
81 Self { child }
82 }
83}
84
85impl std::fmt::Display for LargeListToCypherValueExpr {
86 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
87 write!(f, "LargeListToCypherValue({})", self.child)
88 }
89}
90
91impl PartialEq for LargeListToCypherValueExpr {
92 fn eq(&self, other: &Self) -> bool {
93 Arc::ptr_eq(&self.child, &other.child)
94 }
95}
96
97impl Eq for LargeListToCypherValueExpr {}
98
99impl std::hash::Hash for LargeListToCypherValueExpr {
100 fn hash<H: std::hash::Hasher>(&self, _state: &mut H) {
101 std::any::type_name::<Self>().hash(_state);
103 }
104}
105
106impl PartialEq<dyn std::any::Any> for LargeListToCypherValueExpr {
107 fn eq(&self, other: &dyn std::any::Any) -> bool {
108 other
109 .downcast_ref::<Self>()
110 .map(|x| self == x)
111 .unwrap_or(false)
112 }
113}
114
115impl PhysicalExpr for LargeListToCypherValueExpr {
116 fn as_any(&self) -> &dyn std::any::Any {
117 self
118 }
119
120 fn data_type(&self, _input_schema: &Schema) -> datafusion::error::Result<DataType> {
121 Ok(DataType::LargeBinary)
122 }
123
124 fn nullable(&self, input_schema: &Schema) -> datafusion::error::Result<bool> {
125 self.child.nullable(input_schema)
126 }
127
128 fn evaluate(
129 &self,
130 batch: &arrow_array::RecordBatch,
131 ) -> datafusion::error::Result<datafusion::logical_expr::ColumnarValue> {
132 use datafusion::arrow::compute::cast;
133 use datafusion::logical_expr::ColumnarValue;
134
135 let child_result = self.child.evaluate(batch)?;
136 let child_array = child_result.into_array(batch.num_rows())?;
137
138 let list_array = if let DataType::List(field) = child_array.data_type() {
140 let target_type = DataType::LargeList(field.clone());
141 cast(&child_array, &target_type).map_err(|e| {
142 datafusion::error::DataFusionError::Execution(format!(
143 "List to LargeList cast failed: {e}"
144 ))
145 })?
146 } else {
147 child_array.clone()
148 };
149
150 if list_array.data_type() == &DataType::LargeBinary {
152 return Ok(ColumnarValue::Array(list_array));
153 }
154
155 if let Some(large_list) = list_array
157 .as_any()
158 .downcast_ref::<datafusion::arrow::array::LargeListArray>()
159 {
160 let cv_array =
161 crate::query::df_graph::common::typed_large_list_to_cv_array(large_list)?;
162 Ok(ColumnarValue::Array(cv_array))
163 } else {
164 Err(datafusion::error::DataFusionError::Execution(format!(
165 "Expected List or LargeList, got {:?}",
166 list_array.data_type()
167 )))
168 }
169 }
170
171 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
172 vec![&self.child]
173 }
174
175 fn with_new_children(
176 self: Arc<Self>,
177 children: Vec<Arc<dyn PhysicalExpr>>,
178 ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
179 if children.len() != 1 {
180 return Err(datafusion::error::DataFusionError::Execution(
181 "LargeListToCypherValueExpr expects exactly 1 child".to_string(),
182 ));
183 }
184 Ok(Arc::new(LargeListToCypherValueExpr::new(
185 children[0].clone(),
186 )))
187 }
188
189 fn fmt_sql(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
190 write!(f, "LargeListToCypherValue({})", self.child)
191 }
192}
193
/// Compiles Cypher AST expressions (`Expr`) into DataFusion physical expressions.
///
/// Only `state` and `translation_ctx` are set by `new`; the remaining fields
/// are filled in by the `with_graph_ctx` / `with_subquery_ctx` builders.
pub struct CypherPhysicalExprCompiler<'a> {
    /// Session state used to look up registered scalar functions and to
    /// create physical expressions.
    state: &'a SessionState,
    /// Optional translation context handed to `cypher_expr_to_df`; its
    /// `variable_kinds` are also consulted during compilation.
    translation_ctx: Option<&'a TranslationContext>,
    /// Graph execution context; required by EXISTS, pattern and `similar_to`
    /// compilation.
    graph_ctx: Option<Arc<GraphExecutionContext>>,
    /// Graph schema; required by EXISTS and pattern compilation.
    uni_schema: Option<Arc<UniSchema>>,
    /// Session context; required by EXISTS subquery compilation.
    session_ctx: Option<Arc<RwLock<SessionContext>>>,
    /// Storage manager; required by EXISTS subquery compilation.
    storage: Option<Arc<StorageManager>>,
    /// Query parameters forwarded to the EXISTS / pattern-exists expressions.
    params: HashMap<String, Value>,
    /// Names of entity variables bound in an outer scope.
    outer_entity_vars: HashSet<String>,
}
211
212impl<'a> CypherPhysicalExprCompiler<'a> {
213 pub fn new(state: &'a SessionState, translation_ctx: Option<&'a TranslationContext>) -> Self {
214 Self {
215 state,
216 translation_ctx,
217 graph_ctx: None,
218 uni_schema: None,
219 session_ctx: None,
220 storage: None,
221 params: HashMap::new(),
222 outer_entity_vars: HashSet::new(),
223 }
224 }
225
226 fn scoped_compiler<'b>(
238 &'b self,
239 exclude_vars: &[&str],
240 scoped_ctx_slot: &'b mut Option<TranslationContext>,
241 ) -> CypherPhysicalExprCompiler<'b>
242 where
243 'a: 'b,
244 {
245 let needs_scoping = self.translation_ctx.is_some_and(|ctx| {
246 exclude_vars
247 .iter()
248 .any(|v| ctx.variable_kinds.contains_key(*v))
249 });
250
251 let ctx_ref = if needs_scoping {
252 let ctx = self.translation_ctx.unwrap();
253 let mut new_kinds = ctx.variable_kinds.clone();
254 for v in exclude_vars {
255 new_kinds.remove(*v);
256 }
257 *scoped_ctx_slot = Some(TranslationContext {
258 parameters: ctx.parameters.clone(),
259 outer_values: ctx.outer_values.clone(),
260 variable_labels: ctx.variable_labels.clone(),
261 variable_kinds: new_kinds,
262 node_variable_hints: ctx.node_variable_hints.clone(),
263 mutation_edge_hints: ctx.mutation_edge_hints.clone(),
264 statement_time: ctx.statement_time,
265 });
266 scoped_ctx_slot.as_ref()
267 } else {
268 self.translation_ctx
269 };
270
271 CypherPhysicalExprCompiler {
272 state: self.state,
273 translation_ctx: ctx_ref,
274 graph_ctx: self.graph_ctx.clone(),
275 uni_schema: self.uni_schema.clone(),
276 session_ctx: self.session_ctx.clone(),
277 storage: self.storage.clone(),
278 params: self.params.clone(),
279 outer_entity_vars: self.outer_entity_vars.clone(),
280 }
281 }
282
283 pub fn with_graph_ctx(
285 mut self,
286 graph_ctx: Arc<GraphExecutionContext>,
287 uni_schema: Arc<UniSchema>,
288 ) -> Self {
289 self.graph_ctx = Some(graph_ctx);
290 self.uni_schema = Some(uni_schema);
291 self
292 }
293
294 pub fn with_subquery_ctx(
296 mut self,
297 graph_ctx: Arc<GraphExecutionContext>,
298 uni_schema: Arc<UniSchema>,
299 session_ctx: Arc<RwLock<SessionContext>>,
300 storage: Arc<StorageManager>,
301 params: HashMap<String, Value>,
302 outer_entity_vars: HashSet<String>,
303 ) -> Self {
304 self.graph_ctx = Some(graph_ctx);
305 self.uni_schema = Some(uni_schema);
306 self.session_ctx = Some(session_ctx);
307 self.storage = Some(storage);
308 self.params = params;
309 self.outer_entity_vars = outer_entity_vars;
310 self
311 }
312
313 pub fn compile(&self, expr: &Expr, input_schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
315 match expr {
316 Expr::ListComprehension {
317 variable,
318 list,
319 where_clause,
320 map_expr,
321 } => self.compile_list_comprehension(
322 variable,
323 list,
324 where_clause.as_deref(),
325 map_expr,
326 input_schema,
327 ),
328 Expr::Quantifier {
329 quantifier,
330 variable,
331 list,
332 predicate,
333 } => self.compile_quantifier(quantifier, variable, list, predicate, input_schema),
334 Expr::Reduce {
335 accumulator,
336 init,
337 variable,
338 list,
339 expr: expression,
340 } => self.compile_reduce(accumulator, init, variable, list, expression, input_schema),
341 Expr::BinaryOp { left, op, right } => {
343 self.compile_binary_op_dispatch(left, op, right, input_schema)
344 }
345 Expr::UnaryOp { op, expr: inner } => {
346 if matches!(op, UnaryOp::Not) {
347 let mut inner_phy = self.compile(inner, input_schema)?;
348 if let Ok(DataType::LargeBinary) = inner_phy.data_type(input_schema) {
349 inner_phy = self.wrap_with_cv_to_bool(inner_phy)?;
350 }
351 self.compile_unary_op(op, inner_phy, input_schema)
352 } else if Self::contains_custom_expr(inner) {
353 let inner_phy = self.compile(inner, input_schema)?;
354 self.compile_unary_op(op, inner_phy, input_schema)
355 } else {
356 self.compile_standard(expr, input_schema)
357 }
358 }
359 Expr::IsNull(inner) => {
360 if Self::contains_custom_expr(inner) {
361 let inner_phy = self.compile(inner, input_schema)?;
362 Ok(datafusion::physical_expr::expressions::is_null(inner_phy)
363 .map_err(|e| anyhow!("Failed to create is_null: {}", e))?)
364 } else {
365 self.compile_standard(expr, input_schema)
366 }
367 }
368 Expr::IsNotNull(inner) => {
369 if Self::contains_custom_expr(inner) {
370 let inner_phy = self.compile(inner, input_schema)?;
371 Ok(
372 datafusion::physical_expr::expressions::is_not_null(inner_phy)
373 .map_err(|e| anyhow!("Failed to create is_not_null: {}", e))?,
374 )
375 } else {
376 self.compile_standard(expr, input_schema)
377 }
378 }
379 Expr::In {
381 expr: left,
382 list: right,
383 } => {
384 if Self::contains_custom_expr(left) || Self::contains_custom_expr(right) {
385 let left_phy = self.compile(left, input_schema)?;
386 let right_phy = self.compile(right, input_schema)?;
387
388 let left_type = left_phy
389 .data_type(input_schema)
390 .unwrap_or(DataType::LargeBinary);
391 let right_type = right_phy
392 .data_type(input_schema)
393 .unwrap_or(DataType::LargeBinary);
394
395 self.plan_binary_udf("_cypher_in", left_phy, right_phy, left_type, right_type)?
396 .ok_or_else(|| anyhow!("_cypher_in UDF not found"))
397 } else {
398 self.compile_standard(expr, input_schema)
399 }
400 }
401
402 Expr::List(items) if items.iter().any(Self::contains_custom_expr) => Err(anyhow!(
404 "List literals containing comprehensions not yet supported in compiler"
405 )),
406 Expr::Map(entries) if entries.iter().any(|(_, v)| Self::contains_custom_expr(v)) => {
407 Err(anyhow!(
408 "Map literals containing comprehensions not yet supported in compiler"
409 ))
410 }
411
412 Expr::Property(base, prop) => self.compile_property_access(base, prop, input_schema),
414
415 Expr::ArrayIndex { array, index } => {
417 self.compile_array_index(array, index, input_schema)
418 }
419
420 Expr::PatternComprehension {
422 path_variable,
423 pattern,
424 where_clause,
425 map_expr,
426 } => self.compile_pattern_comprehension(
427 path_variable,
428 pattern,
429 where_clause.as_deref(),
430 map_expr,
431 input_schema,
432 ),
433
434 Expr::Exists {
436 query,
437 from_pattern_predicate,
438 } => {
439 if *from_pattern_predicate {
440 match self.compile_pattern_exists(query, input_schema) {
441 Ok(expr) => Ok(expr),
442 Err(e) => {
443 log::debug!(
444 "Pattern exists vectorization failed, falling back to subquery: {e}"
445 );
446 self.compile_exists(query)
447 }
448 }
449 } else {
450 self.compile_exists(query)
451 }
452 }
453
454 Expr::FunctionCall {
456 name,
457 args,
458 distinct,
459 ..
460 } => {
461 if name.eq_ignore_ascii_case("similar_to") {
462 return self.compile_similar_to(args, input_schema);
463 }
464 if args.iter().any(Self::contains_custom_expr) {
465 self.compile_function_with_custom_args(name, args, *distinct, input_schema)
466 } else {
467 self.compile_standard(expr, input_schema)
468 }
469 }
470
471 Expr::Case {
473 expr: case_operand,
474 when_then,
475 else_expr,
476 } => {
477 let has_custom = case_operand
479 .as_deref()
480 .is_some_and(Self::contains_custom_expr)
481 || when_then.iter().any(|(w, t)| {
482 Self::contains_custom_expr(w) || Self::contains_custom_expr(t)
483 })
484 || else_expr.as_deref().is_some_and(Self::contains_custom_expr);
485
486 if has_custom {
487 self.compile_case(case_operand, when_then, else_expr, input_schema)
490 } else {
491 self.compile_standard(expr, input_schema)
496 }
497 }
498
499 Expr::LabelCheck { .. } => self.compile_standard(expr, input_schema),
501
502 _ => self.compile_standard(expr, input_schema),
504 }
505 }
506
507 fn compile_binary_op_dispatch(
509 &self,
510 left: &Expr,
511 op: &BinaryOp,
512 right: &Expr,
513 input_schema: &Schema,
514 ) -> Result<Arc<dyn PhysicalExpr>> {
515 if matches!(op, BinaryOp::Eq | BinaryOp::NotEq)
516 && let (Expr::Variable(lv), Expr::Variable(rv)) = (left, right)
517 && let Some(ctx) = self.translation_ctx
518 && let (Some(lk), Some(rk)) = (ctx.variable_kinds.get(lv), ctx.variable_kinds.get(rv))
519 {
520 let identity_prop = match (lk, rk) {
521 (VariableKind::Node, VariableKind::Node) => Some("_vid"),
522 (VariableKind::Edge, VariableKind::Edge) => Some("_eid"),
523 _ => None,
524 };
525
526 if let Some(id_prop) = identity_prop {
527 return self.compile_standard(
528 &Expr::BinaryOp {
529 left: Box::new(Expr::Property(
530 Box::new(Expr::Variable(lv.clone())),
531 id_prop.to_string(),
532 )),
533 op: *op,
534 right: Box::new(Expr::Property(
535 Box::new(Expr::Variable(rv.clone())),
536 id_prop.to_string(),
537 )),
538 },
539 input_schema,
540 );
541 }
542 }
543
544 if matches!(op, BinaryOp::Xor | BinaryOp::Pow) {
548 return self.compile_standard(
549 &Expr::BinaryOp {
550 left: Box::new(left.clone()),
551 op: *op,
552 right: Box::new(right.clone()),
553 },
554 input_schema,
555 );
556 }
557
558 if Self::contains_custom_expr(left) || Self::contains_custom_expr(right) {
559 let left_phy = self.compile(left, input_schema)?;
560 let right_phy = self.compile(right, input_schema)?;
561 return self.compile_binary_op(op, left_phy, right_phy, input_schema);
562 }
563
564 if *op == BinaryOp::Add && (Self::is_list_producing(left) || Self::is_list_producing(right))
568 {
569 return self.compile_standard(
570 &Expr::BinaryOp {
571 left: Box::new(left.clone()),
572 op: *op,
573 right: Box::new(right.clone()),
574 },
575 input_schema,
576 );
577 }
578
579 let left_phy = self.compile(left, input_schema)?;
584 let right_phy = self.compile(right, input_schema)?;
585 let left_dt = left_phy.data_type(input_schema).ok();
586 let right_dt = right_phy.data_type(input_schema).ok();
587 let has_cv =
588 is_cypher_value_type(left_dt.as_ref()) || is_cypher_value_type(right_dt.as_ref());
589
590 if has_cv {
591 self.compile_binary_op(op, left_phy, right_phy, input_schema)
593 } else {
594 self.compile_standard(
597 &Expr::BinaryOp {
598 left: Box::new(left.clone()),
599 op: *op,
600 right: Box::new(right.clone()),
601 },
602 input_schema,
603 )
604 }
605 }
606
607 fn try_compile_struct_field(
612 &self,
613 var_name: &str,
614 field_name: &str,
615 input_schema: &Schema,
616 ) -> Option<Arc<dyn PhysicalExpr>> {
617 let col_idx = input_schema.index_of(var_name).ok()?;
618 let DataType::Struct(struct_fields) = input_schema.field(col_idx).data_type() else {
619 return None;
620 };
621
622 if let Some(field_idx) = struct_fields.iter().position(|f| f.name() == field_name) {
624 let output_type = struct_fields[field_idx].data_type().clone();
625 let col_expr: Arc<dyn PhysicalExpr> = Arc::new(
626 datafusion::physical_expr::expressions::Column::new(var_name, col_idx),
627 );
628 Some(Arc::new(StructFieldAccessExpr::new(
629 col_expr,
630 field_idx,
631 output_type,
632 )))
633 } else {
634 Some(Arc::new(
635 datafusion::physical_expr::expressions::Literal::new(
636 datafusion::common::ScalarValue::Null,
637 ),
638 ))
639 }
640 }
641
642 fn compile_property_access(
644 &self,
645 base: &Expr,
646 prop: &str,
647 input_schema: &Schema,
648 ) -> Result<Arc<dyn PhysicalExpr>> {
649 if let Expr::Variable(var_name) = base {
650 if let Some(expr) = self.try_compile_struct_field(var_name, prop, input_schema) {
652 return Ok(expr);
653 }
654 let flat_col = format!("{}.{}", var_name, prop);
656 if let Ok(col_idx) = input_schema.index_of(&flat_col) {
657 return Ok(Arc::new(
658 datafusion::physical_expr::expressions::Column::new(&flat_col, col_idx),
659 ));
660 }
661 }
662 self.compile_standard(
663 &Expr::Property(Box::new(base.clone()), prop.to_string()),
664 input_schema,
665 )
666 }
667
668 fn compile_array_index(
670 &self,
671 array: &Expr,
672 index: &Expr,
673 input_schema: &Schema,
674 ) -> Result<Arc<dyn PhysicalExpr>> {
675 if let Expr::Variable(var_name) = array
676 && let Expr::Literal(CypherLiteral::String(prop)) = index
677 && let Some(expr) = self.try_compile_struct_field(var_name, prop, input_schema)
678 {
679 return Ok(expr);
680 }
681 self.compile_standard(
682 &Expr::ArrayIndex {
683 array: Box::new(array.clone()),
684 index: Box::new(index.clone()),
685 },
686 input_schema,
687 )
688 }
689
690 fn compile_exists(&self, query: &Query) -> Result<Arc<dyn PhysicalExpr>> {
692 if has_mutation_clause(query) {
694 return Err(anyhow!(
695 "SyntaxError: InvalidClauseComposition - EXISTS subquery cannot contain updating clauses"
696 ));
697 }
698
699 let err = |dep: &str| anyhow!("EXISTS requires {}", dep);
700
701 let graph_ctx = self
702 .graph_ctx
703 .clone()
704 .ok_or_else(|| err("GraphExecutionContext"))?;
705 let uni_schema = self.uni_schema.clone().ok_or_else(|| err("UniSchema"))?;
706 let session_ctx = self
707 .session_ctx
708 .clone()
709 .ok_or_else(|| err("SessionContext"))?;
710 let storage = self.storage.clone().ok_or_else(|| err("StorageManager"))?;
711
712 Ok(Arc::new(ExistsExecExpr::new(
713 query.clone(),
714 graph_ctx,
715 session_ctx,
716 storage,
717 uni_schema,
718 self.params.clone(),
719 self.outer_entity_vars.clone(),
720 )))
721 }
722
723 fn compile_pattern_exists(
727 &self,
728 query: &Query,
729 input_schema: &Schema,
730 ) -> Result<Arc<dyn PhysicalExpr>> {
731 let pattern = extract_pattern_from_exists_query(query)?;
732
733 let graph_ctx = self
734 .graph_ctx
735 .as_ref()
736 .ok_or_else(|| anyhow!("Pattern exists requires GraphExecutionContext"))?;
737 let uni_schema = self
738 .uni_schema
739 .as_ref()
740 .ok_or_else(|| anyhow!("Pattern exists requires UniSchema"))?;
741
742 let (anchor_col, steps) = analyze_pattern(&pattern, input_schema, uni_schema)?;
743
744 let mut bound_target_columns: Vec<Option<String>> = Vec::with_capacity(steps.len());
749 for step in &steps {
750 if let Some(var) = &step.target_variable {
751 let vid_col = format!("{}._vid", var);
752 if input_schema.column_with_name(&vid_col).is_some() {
753 bound_target_columns.push(Some(vid_col));
754 } else if self.outer_entity_vars.contains(var) {
755 anyhow::bail!(
757 "Pattern target '{}' is a correlated reference from an outer scope",
758 var
759 );
760 } else {
761 bound_target_columns.push(None);
763 }
764 } else {
765 bound_target_columns.push(None);
766 }
767 }
768
769 let property_preds = extract_target_property_predicates(&pattern, &steps, uni_schema)?;
770
771 Ok(Arc::new(PatternExistsExecExpr::new(
772 graph_ctx.clone(),
773 anchor_col,
774 steps,
775 Arc::new(input_schema.clone()),
776 property_preds,
777 bound_target_columns,
778 self.params.clone(),
779 )))
780 }
781
782 fn compile_similar_to(
784 &self,
785 args: &[Expr],
786 input_schema: &Schema,
787 ) -> Result<Arc<dyn PhysicalExpr>> {
788 if args.len() < 2 || args.len() > 3 {
789 return Err(SimilarToError::InvalidArity { count: args.len() }.into());
790 }
791
792 let graph_ctx = self
793 .graph_ctx
794 .clone()
795 .ok_or(SimilarToError::NoGraphContext)?;
796
797 let source_variable = extract_source_variable(&args[0]);
799 let source_property_names = extract_property_names(&args[0]);
800
801 let (source_exprs, query_exprs) = normalize_similar_to_args(&args[0], &args[1]);
804
805 let source_children: Vec<Arc<dyn PhysicalExpr>> = source_exprs
807 .iter()
808 .map(|e| self.compile(e, input_schema))
809 .collect::<Result<Vec<_>>>()?;
810 let query_children: Vec<Arc<dyn PhysicalExpr>> = query_exprs
811 .iter()
812 .map(|e| self.compile(e, input_schema))
813 .collect::<Result<Vec<_>>>()?;
814
815 let options_child = if args.len() == 3 {
817 Some(self.compile(&args[2], input_schema)?)
818 } else {
819 None
820 };
821
822 let source_metrics: Vec<Option<DistanceMetric>> = source_property_names
824 .iter()
825 .map(|prop_name| {
826 prop_name.as_ref().and_then(|prop| {
827 self.uni_schema
828 .as_ref()
829 .and_then(|schema| resolve_metric_for_property(schema, prop))
830 })
831 })
832 .collect();
833
834 Ok(Arc::new(SimilarToExecExpr::new(
835 source_children,
836 query_children,
837 options_child,
838 graph_ctx,
839 source_variable,
840 source_property_names,
841 source_metrics,
842 )))
843 }
844
845 fn needs_vid_extraction_for_variable(
847 variable: &str,
848 map_expr: &Expr,
849 where_clause: Option<&Expr>,
850 ) -> bool {
851 fn expr_has_pattern_comp_referencing(expr: &Expr, var: &str) -> bool {
852 match expr {
853 Expr::PatternComprehension { pattern, .. } => {
854 pattern.paths.iter().any(|path| {
856 path.elements.iter().any(|elem| match elem {
857 uni_cypher::ast::PatternElement::Node(n) => {
858 n.variable.as_deref() == Some(var)
859 }
860 uni_cypher::ast::PatternElement::Relationship(r) => {
861 r.variable.as_deref() == Some(var)
862 }
863 _ => false,
864 })
865 })
866 }
867 Expr::FunctionCall { args, .. } => args
868 .iter()
869 .any(|a| expr_has_pattern_comp_referencing(a, var)),
870 Expr::BinaryOp { left, right, .. } => {
871 expr_has_pattern_comp_referencing(left, var)
872 || expr_has_pattern_comp_referencing(right, var)
873 }
874 Expr::UnaryOp { expr: e, .. } | Expr::Property(e, _) => {
875 expr_has_pattern_comp_referencing(e, var)
876 }
877 Expr::List(items) => items
878 .iter()
879 .any(|i| expr_has_pattern_comp_referencing(i, var)),
880 Expr::ListComprehension {
881 list,
882 map_expr,
883 where_clause,
884 ..
885 } => {
886 expr_has_pattern_comp_referencing(list, var)
887 || expr_has_pattern_comp_referencing(map_expr, var)
888 || where_clause
889 .as_ref()
890 .is_some_and(|w| expr_has_pattern_comp_referencing(w, var))
891 }
892 _ => false,
893 }
894 }
895
896 expr_has_pattern_comp_referencing(map_expr, variable)
897 || where_clause.is_some_and(|w| expr_has_pattern_comp_referencing(w, variable))
898 }
899
900 pub fn contains_custom_expr(expr: &Expr) -> bool {
902 match expr {
903 Expr::ListComprehension { .. } => true,
904 Expr::Quantifier { .. } => true,
905 Expr::Reduce { .. } => true,
906 Expr::PatternComprehension { .. } => true,
907 Expr::BinaryOp { left, right, .. } => {
908 Self::contains_custom_expr(left) || Self::contains_custom_expr(right)
909 }
910 Expr::UnaryOp { expr, .. } => Self::contains_custom_expr(expr),
911 Expr::FunctionCall { name, args, .. } => {
912 name.eq_ignore_ascii_case("similar_to")
913 || args.iter().any(Self::contains_custom_expr)
914 }
915 Expr::Case {
916 when_then,
917 else_expr,
918 ..
919 } => {
920 when_then
921 .iter()
922 .any(|(w, t)| Self::contains_custom_expr(w) || Self::contains_custom_expr(t))
923 || else_expr.as_deref().is_some_and(Self::contains_custom_expr)
924 }
925 Expr::List(items) => items.iter().any(Self::contains_custom_expr),
926 Expr::Map(entries) => entries.iter().any(|(_, v)| Self::contains_custom_expr(v)),
927 Expr::IsNull(e) | Expr::IsNotNull(e) => Self::contains_custom_expr(e),
928 Expr::In { expr: l, list: r } => {
929 Self::contains_custom_expr(l) || Self::contains_custom_expr(r)
930 }
931 Expr::Exists { .. } => true,
932 Expr::LabelCheck { expr, .. } => Self::contains_custom_expr(expr),
933 _ => false,
934 }
935 }
936
937 fn is_list_producing(expr: &Expr) -> bool {
940 match expr {
941 Expr::List(_) => true,
942 Expr::ListComprehension { .. } => true,
943 Expr::ArraySlice { .. } => true,
944 Expr::BinaryOp {
946 left,
947 op: BinaryOp::Add,
948 right,
949 } => Self::is_list_producing(left) || Self::is_list_producing(right),
950 Expr::FunctionCall { name, .. } => {
951 matches!(
953 name.as_str(),
954 "range"
955 | "tail"
956 | "reverse"
957 | "collect"
958 | "keys"
959 | "labels"
960 | "nodes"
961 | "relationships"
962 )
963 }
964 _ => false,
965 }
966 }
967
968 fn compile_standard(
969 &self,
970 expr: &Expr,
971 input_schema: &Schema,
972 ) -> Result<Arc<dyn PhysicalExpr>> {
973 let resolved = Self::resolve_flat_column_properties(expr, input_schema);
978 let df_expr = cypher_expr_to_df(&resolved, self.translation_ctx)?;
979 let resolved_expr = self.resolve_udfs(df_expr)?;
980
981 let df_schema = datafusion::common::DFSchema::try_from(input_schema.clone())?;
982
983 let coerced_expr = crate::query::df_expr::apply_type_coercion(&resolved_expr, &df_schema)?;
985
986 let coerced_expr = self.resolve_udfs(coerced_expr)?;
988
989 let planner = datafusion::physical_planner::DefaultPhysicalPlanner::default();
990 planner
991 .create_physical_expr(&coerced_expr, &df_schema, self.state)
992 .map_err(|e| anyhow!("DataFusion planning failed: {}", e))
993 }
994
995 fn resolve_flat_column_properties(expr: &Expr, schema: &Schema) -> Expr {
1001 match expr {
1002 Expr::Property(base, prop) => {
1003 if let Expr::Variable(var) = base.as_ref() {
1004 let flat_col = format!("{}.{}", var, prop);
1005 if schema.index_of(&flat_col).is_ok() {
1006 return Expr::Variable(flat_col);
1007 }
1008 }
1009 Expr::Property(
1011 Box::new(Self::resolve_flat_column_properties(base, schema)),
1012 prop.clone(),
1013 )
1014 }
1015 Expr::BinaryOp { left, op, right } => Expr::BinaryOp {
1016 left: Box::new(Self::resolve_flat_column_properties(left, schema)),
1017 op: *op,
1018 right: Box::new(Self::resolve_flat_column_properties(right, schema)),
1019 },
1020 Expr::FunctionCall {
1021 name,
1022 args,
1023 distinct,
1024 window_spec,
1025 } => Expr::FunctionCall {
1026 name: name.clone(),
1027 args: args
1028 .iter()
1029 .map(|a| Self::resolve_flat_column_properties(a, schema))
1030 .collect(),
1031 distinct: *distinct,
1032 window_spec: window_spec.clone(),
1033 },
1034 Expr::UnaryOp { op, expr: inner } => Expr::UnaryOp {
1035 op: *op,
1036 expr: Box::new(Self::resolve_flat_column_properties(inner, schema)),
1037 },
1038 Expr::List(items) => Expr::List(
1039 items
1040 .iter()
1041 .map(|i| Self::resolve_flat_column_properties(i, schema))
1042 .collect(),
1043 ),
1044 other => other.clone(),
1046 }
1047 }
1048
1049 fn resolve_udfs(
1054 &self,
1055 expr: datafusion::logical_expr::Expr,
1056 ) -> Result<datafusion::logical_expr::Expr> {
1057 use datafusion::common::tree_node::{Transformed, TreeNode};
1058 use datafusion::logical_expr::Expr as DfExpr;
1059
1060 let result = expr
1061 .transform_up(|node| {
1062 if let DfExpr::ScalarFunction(ref func) = node {
1063 let udf_name = func.func.name();
1064 if let Some(registered_udf) = self.state.scalar_functions().get(udf_name) {
1065 return Ok(Transformed::yes(DfExpr::ScalarFunction(
1066 datafusion::logical_expr::expr::ScalarFunction {
1067 func: registered_udf.clone(),
1068 args: func.args.clone(),
1069 },
1070 )));
1071 }
1072 }
1073 Ok(Transformed::no(node))
1074 })
1075 .map_err(|e| anyhow!("Failed to resolve UDFs: {}", e))?;
1076 Ok(result.data)
1077 }
1078
1079 fn compile_list_comprehension(
1080 &self,
1081 variable: &str,
1082 list: &Expr,
1083 where_clause: Option<&Expr>,
1084 map_expr: &Expr,
1085 input_schema: &Schema,
1086 ) -> Result<Arc<dyn PhysicalExpr>> {
1087 let input_list_phy = self.compile(list, input_schema)?;
1088
1089 let list_data_type = input_list_phy.data_type(input_schema)?;
1091 let inner_data_type = resolve_list_element_type(
1092 &list_data_type,
1093 DataType::LargeBinary,
1094 "List comprehension",
1095 )?;
1096
1097 let mut fields = input_schema.fields().to_vec();
1099 let loop_var_field = Arc::new(Field::new(variable, inner_data_type.clone(), true));
1100
1101 if let Some(pos) = fields.iter().position(|f| f.name() == variable) {
1102 fields[pos] = loop_var_field;
1103 } else {
1104 fields.push(loop_var_field);
1105 }
1106
1107 let needs_vid_extraction =
1109 Self::needs_vid_extraction_for_variable(variable, map_expr, where_clause);
1110 if needs_vid_extraction && inner_data_type == DataType::LargeBinary {
1111 let vid_field = Arc::new(Field::new(
1113 format!("{}._vid", variable),
1114 DataType::UInt64,
1115 true,
1116 ));
1117 fields.push(vid_field);
1118 }
1119
1120 let inner_schema = Arc::new(Schema::new(fields));
1121
1122 let mut scoped_ctx = None;
1124 let inner_compiler = self.scoped_compiler(&[variable], &mut scoped_ctx);
1125
1126 let predicate_phy = if let Some(pred) = where_clause {
1127 Some(inner_compiler.compile(pred, &inner_schema)?)
1128 } else {
1129 None
1130 };
1131
1132 let map_phy = inner_compiler.compile(map_expr, &inner_schema)?;
1133 let output_item_type = map_phy.data_type(&inner_schema)?;
1134
1135 Ok(Arc::new(ListComprehensionExecExpr::new(
1136 input_list_phy,
1137 map_phy,
1138 predicate_phy,
1139 variable.to_string(),
1140 Arc::new(input_schema.clone()),
1141 output_item_type,
1142 needs_vid_extraction,
1143 )))
1144 }
1145
1146 fn compile_reduce(
1147 &self,
1148 accumulator: &str,
1149 initial: &Expr,
1150 variable: &str,
1151 list: &Expr,
1152 reduce_expr: &Expr,
1153 input_schema: &Schema,
1154 ) -> Result<Arc<dyn PhysicalExpr>> {
1155 let list_phy = self.compile(list, input_schema)?;
1156
1157 let initial_phy = self.compile(initial, input_schema)?;
1158 let acc_type = initial_phy.data_type(input_schema)?;
1159
1160 let list_data_type = list_phy.data_type(input_schema)?;
1161 let inner_data_type =
1164 resolve_list_element_type(&list_data_type, acc_type.clone(), "Reduce")?;
1165
1166 let mut fields = input_schema.fields().to_vec();
1168
1169 let acc_field = Arc::new(Field::new(accumulator, acc_type, true));
1170 if let Some(pos) = fields.iter().position(|f| f.name() == accumulator) {
1171 fields[pos] = acc_field;
1172 } else {
1173 fields.push(acc_field);
1174 }
1175
1176 let var_field = Arc::new(Field::new(variable, inner_data_type, true));
1177 if let Some(pos) = fields.iter().position(|f| f.name() == variable) {
1178 fields[pos] = var_field;
1179 } else {
1180 fields.push(var_field);
1181 }
1182
1183 let inner_schema = Arc::new(Schema::new(fields));
1184
1185 let mut scoped_ctx = None;
1187 let reduce_compiler = self.scoped_compiler(&[accumulator, variable], &mut scoped_ctx);
1188
1189 let reduce_phy = reduce_compiler.compile(reduce_expr, &inner_schema)?;
1190 let output_type = reduce_phy.data_type(&inner_schema)?;
1191
1192 Ok(Arc::new(ReduceExecExpr::new(
1193 accumulator.to_string(),
1194 initial_phy,
1195 variable.to_string(),
1196 list_phy,
1197 reduce_phy,
1198 Arc::new(input_schema.clone()),
1199 output_type,
1200 )))
1201 }
1202
1203 fn compile_quantifier(
1204 &self,
1205 quantifier: &uni_cypher::ast::Quantifier,
1206 variable: &str,
1207 list: &Expr,
1208 predicate: &Expr,
1209 input_schema: &Schema,
1210 ) -> Result<Arc<dyn PhysicalExpr>> {
1211 let input_list_phy = self.compile(list, input_schema)?;
1212
1213 let list_data_type = input_list_phy.data_type(input_schema)?;
1215 let inner_data_type =
1216 resolve_list_element_type(&list_data_type, DataType::LargeBinary, "Quantifier")?;
1217
1218 let mut fields = input_schema.fields().to_vec();
1222 let loop_var_field = Arc::new(Field::new(variable, inner_data_type, true));
1223
1224 if let Some(pos) = fields.iter().position(|f| f.name() == variable) {
1226 fields[pos] = loop_var_field;
1227 } else {
1228 fields.push(loop_var_field);
1229 }
1230
1231 let inner_schema = Arc::new(Schema::new(fields));
1232
1233 let mut scoped_ctx = None;
1237 let pred_compiler = self.scoped_compiler(&[variable], &mut scoped_ctx);
1238
1239 let mut predicate_phy = pred_compiler.compile(predicate, &inner_schema)?;
1240
1241 if let Ok(DataType::LargeBinary) = predicate_phy.data_type(&inner_schema) {
1243 predicate_phy = self.wrap_with_cv_to_bool(predicate_phy)?;
1244 }
1245
1246 let qt = match quantifier {
1247 uni_cypher::ast::Quantifier::All => QuantifierType::All,
1248 uni_cypher::ast::Quantifier::Any => QuantifierType::Any,
1249 uni_cypher::ast::Quantifier::Single => QuantifierType::Single,
1250 uni_cypher::ast::Quantifier::None => QuantifierType::None,
1251 };
1252
1253 Ok(Arc::new(QuantifierExecExpr::new(
1254 input_list_phy,
1255 predicate_phy,
1256 variable.to_string(),
1257 Arc::new(input_schema.clone()),
1258 qt,
1259 )))
1260 }
1261
1262 fn compile_pattern_comprehension(
1263 &self,
1264 path_variable: &Option<String>,
1265 pattern: &uni_cypher::ast::Pattern,
1266 where_clause: Option<&Expr>,
1267 map_expr: &Expr,
1268 input_schema: &Schema,
1269 ) -> Result<Arc<dyn PhysicalExpr>> {
1270 let err = |dep: &str| anyhow!("Pattern comprehension requires {}", dep);
1271
1272 let graph_ctx = self
1273 .graph_ctx
1274 .as_ref()
1275 .ok_or_else(|| err("GraphExecutionContext"))?;
1276 let uni_schema = self.uni_schema.as_ref().ok_or_else(|| err("UniSchema"))?;
1277
1278 let (anchor_col, steps) = analyze_pattern(pattern, input_schema, uni_schema)?;
1280
1281 let (vertex_props, edge_props) = collect_inner_properties(where_clause, map_expr, &steps);
1283
1284 let inner_schema = build_inner_schema(
1286 input_schema,
1287 &steps,
1288 &vertex_props,
1289 &edge_props,
1290 path_variable.as_deref(),
1291 );
1292
1293 let pred_phy = where_clause
1295 .map(|p| self.compile(p, &inner_schema))
1296 .transpose()?;
1297 let map_phy = self.compile(map_expr, &inner_schema)?;
1298 let output_type = map_phy.data_type(&inner_schema)?;
1299
1300 Ok(Arc::new(PatternComprehensionExecExpr::new(
1302 graph_ctx.clone(),
1303 anchor_col,
1304 steps,
1305 path_variable.clone(),
1306 pred_phy,
1307 map_phy,
1308 Arc::new(input_schema.clone()),
1309 Arc::new(inner_schema),
1310 output_type,
1311 vertex_props,
1312 edge_props,
1313 )))
1314 }
1315
1316 fn compile_function_with_custom_args(
1322 &self,
1323 name: &str,
1324 args: &[Expr],
1325 _distinct: bool,
1326 input_schema: &Schema,
1327 ) -> Result<Arc<dyn PhysicalExpr>> {
1328 let compiled_args: Vec<Arc<dyn PhysicalExpr>> = args
1330 .iter()
1331 .map(|arg| self.compile(arg, input_schema))
1332 .collect::<Result<Vec<_>>>()?;
1333
1334 let udf_name = Self::cypher_fn_to_udf(name);
1336 let udf = self
1337 .state
1338 .scalar_functions()
1339 .get(udf_name.as_str())
1340 .ok_or_else(|| {
1341 anyhow!(
1342 "UDF '{}' not found in registry for function '{}'",
1343 udf_name,
1344 name
1345 )
1346 })?;
1347
1348 let placeholders: &[&str] = &["__arg0__", "__arg1__", "__arg2__", "__argN__"];
1350 let operand_types: Vec<(&str, DataType)> = compiled_args
1351 .iter()
1352 .enumerate()
1353 .map(|(i, arg)| {
1354 let dt = arg.data_type(input_schema).unwrap_or(DataType::LargeBinary);
1355 let placeholder = placeholders[i.min(3)];
1356 (placeholder, dt)
1357 })
1358 .collect();
1359
1360 let dummy_cols: Vec<datafusion::logical_expr::Expr> = operand_types
1361 .iter()
1362 .map(|(name, _)| {
1363 datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1364 None::<String>,
1365 *name,
1366 ))
1367 })
1368 .collect();
1369
1370 let udf_expr = datafusion::logical_expr::Expr::ScalarFunction(
1371 datafusion::logical_expr::expr::ScalarFunction {
1372 func: udf.clone(),
1373 args: dummy_cols,
1374 },
1375 );
1376
1377 self.plan_udf_physical_expr(
1379 &udf_expr,
1380 &operand_types,
1381 compiled_args,
1382 &format!("function {}", name),
1383 )
1384 }
1385
1386 fn cypher_fn_to_udf(name: &str) -> String {
1391 match name.to_uppercase().as_str() {
1392 "SIZE" | "LENGTH" => "_cypher_size".to_string(),
1393 "REVERSE" => "_cypher_reverse".to_string(),
1394 "TOSTRING" => "tostring".to_string(),
1395 "TOBOOLEAN" | "TOBOOL" | "TOBOOLEANORNULL" => "toboolean".to_string(),
1396 "TOINTEGER" | "TOINT" | "TOINTEGERORNULL" => "tointeger".to_string(),
1397 "TOFLOAT" | "TOFLOATORNULL" => "tofloat".to_string(),
1398 "HEAD" => "head".to_string(),
1399 "LAST" => "last".to_string(),
1400 "TAIL" => "tail".to_string(),
1401 "KEYS" => "keys".to_string(),
1402 "TYPE" => "type".to_string(),
1403 "PROPERTIES" => "properties".to_string(),
1404 "LABELS" => "labels".to_string(),
1405 "COALESCE" => "coalesce".to_string(),
1406 "ID" => "id".to_string(),
1407 _ => name.to_lowercase(),
1409 }
1410 }
1411
1412 fn compile_case(
1421 &self,
1422 operand: &Option<Box<Expr>>,
1423 when_then: &[(Expr, Expr)],
1424 else_expr: &Option<Box<Expr>>,
1425 input_schema: &Schema,
1426 ) -> Result<Arc<dyn PhysicalExpr>> {
1427 let operand_phy = operand
1428 .as_deref()
1429 .map(|e| self.compile(e, input_schema))
1430 .transpose()?;
1431
1432 let mut when_then_phy: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> = when_then
1433 .iter()
1434 .map(|(w, t)| {
1435 let w_phy = self.compile(w, input_schema)?;
1436 let t_phy = self.compile(t, input_schema)?;
1437 Ok((w_phy, t_phy))
1438 })
1439 .collect::<Result<Vec<_>>>()?;
1440
1441 let mut else_phy = else_expr
1442 .as_deref()
1443 .map(|e| self.compile(e, input_schema))
1444 .transpose()?;
1445
1446 for (w_phy, _) in &mut when_then_phy {
1448 if matches!(w_phy.data_type(input_schema), Ok(DataType::LargeBinary)) {
1449 *w_phy = self.wrap_with_cv_to_bool(w_phy.clone())?;
1450 }
1451 }
1452
1453 let branch_types: Vec<DataType> = when_then_phy
1455 .iter()
1456 .map(|(_, t)| t.data_type(input_schema))
1457 .chain(else_phy.iter().map(|e| e.data_type(input_schema)))
1458 .filter_map(Result::ok)
1459 .collect();
1460
1461 let has_large_binary = branch_types.contains(&DataType::LargeBinary);
1462 let has_list = branch_types
1463 .iter()
1464 .any(|dt| matches!(dt, DataType::List(_) | DataType::LargeList(_)));
1465
1466 if has_large_binary && has_list {
1468 for (_, t_phy) in &mut when_then_phy {
1469 if let Ok(dt) = t_phy.data_type(input_schema)
1470 && matches!(dt, DataType::List(_) | DataType::LargeList(_))
1471 {
1472 *t_phy = Arc::new(LargeListToCypherValueExpr::new(t_phy.clone()));
1473 }
1474 }
1475 if let Some(e_phy) = else_phy.take() {
1476 if let Ok(dt) = e_phy.data_type(input_schema)
1477 && matches!(dt, DataType::List(_) | DataType::LargeList(_))
1478 {
1479 else_phy = Some(Arc::new(LargeListToCypherValueExpr::new(e_phy)));
1480 } else {
1481 else_phy = Some(e_phy);
1482 }
1483 }
1484 }
1485
1486 let case_expr = datafusion::physical_expr::expressions::CaseExpr::try_new(
1487 operand_phy,
1488 when_then_phy,
1489 else_phy,
1490 )
1491 .map_err(|e| anyhow!("Failed to create CASE expression: {}", e))?;
1492
1493 Ok(Arc::new(case_expr))
1494 }
1495
1496 fn compile_binary_op(
1497 &self,
1498 op: &BinaryOp,
1499 left: Arc<dyn PhysicalExpr>,
1500 right: Arc<dyn PhysicalExpr>,
1501 input_schema: &Schema,
1502 ) -> Result<Arc<dyn PhysicalExpr>> {
1503 use datafusion::logical_expr::Operator;
1504
1505 let string_op = match op {
1507 BinaryOp::StartsWith => Some(StringOp::StartsWith),
1508 BinaryOp::EndsWith => Some(StringOp::EndsWith),
1509 BinaryOp::Contains => Some(StringOp::Contains),
1510 _ => None,
1511 };
1512 if let Some(sop) = string_op {
1513 return Ok(Arc::new(CypherStringMatchExpr::new(left, right, sop)));
1514 }
1515
1516 let df_op = match op {
1517 BinaryOp::Add => Operator::Plus,
1518 BinaryOp::Sub => Operator::Minus,
1519 BinaryOp::Mul => Operator::Multiply,
1520 BinaryOp::Div => Operator::Divide,
1521 BinaryOp::Mod => Operator::Modulo,
1522 BinaryOp::Eq => Operator::Eq,
1523 BinaryOp::NotEq => Operator::NotEq,
1524 BinaryOp::Gt => Operator::Gt,
1525 BinaryOp::GtEq => Operator::GtEq,
1526 BinaryOp::Lt => Operator::Lt,
1527 BinaryOp::LtEq => Operator::LtEq,
1528 BinaryOp::And => Operator::And,
1529 BinaryOp::Or => Operator::Or,
1530 BinaryOp::Xor => {
1531 return Err(anyhow!(
1532 "XOR not supported via binary helper, use bitwise_xor"
1533 ));
1534 }
1535 BinaryOp::Regex => Operator::RegexMatch,
1536 BinaryOp::ApproxEq => {
1537 return Err(anyhow!(
1538 "ApproxEq (~=) not yet supported in physical compiler"
1539 ));
1540 }
1541 BinaryOp::Pow => return Err(anyhow!("POW not yet supported in physical compiler")),
1542 _ => return Err(anyhow!("Unsupported binary op in compiler: {:?}", op)),
1543 };
1544
1545 let mut left = left;
1549 let mut right = right;
1550 let mut left_type = left.data_type(input_schema).ok();
1551 let mut right_type = right.data_type(input_schema).ok();
1552
1553 let left_is_list = matches!(
1556 left_type.as_ref(),
1557 Some(DataType::List(_) | DataType::LargeList(_))
1558 );
1559 let right_is_list = matches!(
1560 right_type.as_ref(),
1561 Some(DataType::List(_) | DataType::LargeList(_))
1562 );
1563
1564 if left_is_list && is_cypher_value_type(right_type.as_ref()) {
1565 left = Arc::new(LargeListToCypherValueExpr::new(left));
1566 left_type = Some(DataType::LargeBinary);
1567 } else if right_is_list && is_cypher_value_type(left_type.as_ref()) {
1568 right = Arc::new(LargeListToCypherValueExpr::new(right));
1569 right_type = Some(DataType::LargeBinary);
1570 }
1571
1572 let has_cv =
1573 is_cypher_value_type(left_type.as_ref()) || is_cypher_value_type(right_type.as_ref());
1574
1575 if has_cv {
1576 if let Some(result) = self.compile_cv_comparison(
1577 df_op,
1578 left.clone(),
1579 right.clone(),
1580 &left_type,
1581 &right_type,
1582 )? {
1583 return Ok(result);
1584 }
1585 if let Some(result) = self.compile_cv_list_concat(
1586 left.clone(),
1587 right.clone(),
1588 &left_type,
1589 &right_type,
1590 df_op,
1591 )? {
1592 return Ok(result);
1593 }
1594 if let Some(result) = self.compile_cv_arithmetic(
1595 df_op,
1596 left.clone(),
1597 right.clone(),
1598 &left_type,
1599 &right_type,
1600 input_schema,
1601 )? {
1602 return Ok(result);
1603 }
1604 }
1605
1606 binary(left, df_op, right, input_schema)
1608 .map_err(|e| anyhow!("Failed to create binary expression: {}", e))
1609 }
1610
1611 fn compile_cv_comparison(
1613 &self,
1614 df_op: datafusion::logical_expr::Operator,
1615 left: Arc<dyn PhysicalExpr>,
1616 right: Arc<dyn PhysicalExpr>,
1617 left_type: &Option<DataType>,
1618 right_type: &Option<DataType>,
1619 ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1620 use datafusion::logical_expr::Operator;
1621
1622 let udf_name = match df_op {
1623 Operator::Eq => "_cypher_equal",
1624 Operator::NotEq => "_cypher_not_equal",
1625 Operator::Gt => "_cypher_gt",
1626 Operator::GtEq => "_cypher_gt_eq",
1627 Operator::Lt => "_cypher_lt",
1628 Operator::LtEq => "_cypher_lt_eq",
1629 _ => return Ok(None),
1630 };
1631
1632 self.plan_binary_udf(
1633 udf_name,
1634 left,
1635 right,
1636 left_type.clone().unwrap_or(DataType::LargeBinary),
1637 right_type.clone().unwrap_or(DataType::LargeBinary),
1638 )
1639 }
1640
1641 fn compile_cv_list_concat(
1643 &self,
1644 left: Arc<dyn PhysicalExpr>,
1645 right: Arc<dyn PhysicalExpr>,
1646 left_type: &Option<DataType>,
1647 right_type: &Option<DataType>,
1648 df_op: datafusion::logical_expr::Operator,
1649 ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1650 use datafusion::logical_expr::Operator;
1651
1652 if df_op != Operator::Plus {
1653 return Ok(None);
1654 }
1655
1656 let is_list = |t: &Option<DataType>| {
1658 t.as_ref()
1659 .is_some_and(|dt| matches!(dt, DataType::LargeBinary | DataType::List(_)))
1660 };
1661
1662 if !is_list(left_type) && !is_list(right_type) {
1663 return Ok(None);
1664 }
1665
1666 self.plan_binary_udf(
1667 "_cypher_list_concat",
1668 left,
1669 right,
1670 left_type.clone().unwrap_or(DataType::LargeBinary),
1671 right_type.clone().unwrap_or(DataType::LargeBinary),
1672 )
1673 }
1674
1675 fn compile_cv_arithmetic(
1680 &self,
1681 df_op: datafusion::logical_expr::Operator,
1682 left: Arc<dyn PhysicalExpr>,
1683 right: Arc<dyn PhysicalExpr>,
1684 left_type: &Option<DataType>,
1685 right_type: &Option<DataType>,
1686 _input_schema: &Schema,
1687 ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1688 use datafusion::logical_expr::Operator;
1689
1690 let udf_name = match df_op {
1691 Operator::Plus => "_cypher_add",
1692 Operator::Minus => "_cypher_sub",
1693 Operator::Multiply => "_cypher_mul",
1694 Operator::Divide => "_cypher_div",
1695 Operator::Modulo => "_cypher_mod",
1696 _ => return Ok(None),
1697 };
1698
1699 self.plan_binary_udf(
1700 udf_name,
1701 left,
1702 right,
1703 left_type.clone().unwrap_or(DataType::LargeBinary),
1704 right_type.clone().unwrap_or(DataType::LargeBinary),
1705 )
1706 }
1707
1708 fn plan_udf_physical_expr(
1710 &self,
1711 udf_expr: &datafusion::logical_expr::Expr,
1712 operand_types: &[(&str, DataType)],
1713 children: Vec<Arc<dyn PhysicalExpr>>,
1714 error_context: &str,
1715 ) -> Result<Arc<dyn PhysicalExpr>> {
1716 let tmp_schema = Schema::new(
1717 operand_types
1718 .iter()
1719 .map(|(name, dt)| Arc::new(Field::new(*name, dt.clone(), true)))
1720 .collect::<Vec<_>>(),
1721 );
1722 let df_schema = datafusion::common::DFSchema::try_from(tmp_schema)?;
1723 let planner = datafusion::physical_planner::DefaultPhysicalPlanner::default();
1724 let udf_phy = planner
1725 .create_physical_expr(udf_expr, &df_schema, self.state)
1726 .map_err(|e| anyhow!("Failed to create {} expr: {}", error_context, e))?;
1727 udf_phy
1728 .with_new_children(children)
1729 .map_err(|e| anyhow!("Failed to rebind {} children: {}", error_context, e))
1730 }
1731
1732 fn wrap_with_cv_to_bool(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
1736 let Some(udf) = self.state.scalar_functions().get("_cv_to_bool") else {
1737 return Err(anyhow!("_cv_to_bool UDF not found"));
1738 };
1739
1740 let dummy_col = datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1741 None::<String>,
1742 "__cv__",
1743 ));
1744 let udf_expr = datafusion::logical_expr::Expr::ScalarFunction(
1745 datafusion::logical_expr::expr::ScalarFunction {
1746 func: udf.clone(),
1747 args: vec![dummy_col],
1748 },
1749 );
1750
1751 self.plan_udf_physical_expr(
1752 &udf_expr,
1753 &[("__cv__", DataType::LargeBinary)],
1754 vec![expr],
1755 "CypherValue to bool",
1756 )
1757 }
1758
1759 fn plan_binary_udf(
1761 &self,
1762 udf_name: &str,
1763 left: Arc<dyn PhysicalExpr>,
1764 right: Arc<dyn PhysicalExpr>,
1765 left_type: DataType,
1766 right_type: DataType,
1767 ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1768 let Some(udf) = self.state.scalar_functions().get(udf_name) else {
1769 return Ok(None);
1770 };
1771 let udf_expr = datafusion::logical_expr::Expr::ScalarFunction(
1772 datafusion::logical_expr::expr::ScalarFunction {
1773 func: udf.clone(),
1774 args: vec![
1775 datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1776 None::<String>,
1777 "__left__",
1778 )),
1779 datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1780 None::<String>,
1781 "__right__",
1782 )),
1783 ],
1784 },
1785 );
1786 let result = self.plan_udf_physical_expr(
1787 &udf_expr,
1788 &[("__left__", left_type), ("__right__", right_type)],
1789 vec![left, right],
1790 udf_name,
1791 )?;
1792 Ok(Some(result))
1793 }
1794
1795 fn compile_unary_op(
1796 &self,
1797 op: &UnaryOp,
1798 expr: Arc<dyn PhysicalExpr>,
1799 input_schema: &Schema,
1800 ) -> Result<Arc<dyn PhysicalExpr>> {
1801 match op {
1802 UnaryOp::Not => datafusion::physical_expr::expressions::not(expr),
1803 UnaryOp::Neg => datafusion::physical_expr::expressions::negative(expr, input_schema),
1804 }
1805 .map_err(|e| anyhow!("Failed to create unary expression: {}", e))
1806 }
1807}
1808
1809fn extract_source_variable(expr: &Expr) -> Option<String> {
1817 match expr {
1818 Expr::Property(inner, _) => extract_source_variable(inner),
1819 Expr::Variable(name) => Some(name.clone()),
1820 Expr::List(items) => items.first().and_then(extract_source_variable),
1821 _ => None,
1822 }
1823}
1824
1825fn extract_property_names(expr: &Expr) -> Vec<Option<String>> {
1829 match expr {
1830 Expr::Property(_, prop) => vec![Some(prop.clone())],
1831 Expr::List(items) => items
1832 .iter()
1833 .map(|item| {
1834 if let Expr::Property(_, prop) = item {
1835 Some(prop.clone())
1836 } else {
1837 None
1838 }
1839 })
1840 .collect(),
1841 _ => vec![None],
1842 }
1843}
1844
1845fn normalize_similar_to_args<'e>(
1850 sources: &'e Expr,
1851 queries: &'e Expr,
1852) -> (Vec<&'e Expr>, Vec<&'e Expr>) {
1853 match (sources, queries) {
1854 (Expr::List(src_items), Expr::List(qry_items)) if src_items.len() == qry_items.len() => {
1856 (src_items.iter().collect(), qry_items.iter().collect())
1857 }
1858 (Expr::List(src_items), _) if src_items.len() > 1 => {
1860 let queries_broadcast: Vec<&Expr> = vec![queries; src_items.len()];
1861 (src_items.iter().collect(), queries_broadcast)
1862 }
1863 _ => (vec![sources], vec![queries]),
1865 }
1866}
1867
1868use datafusion::physical_plan::DisplayAs;
1869use datafusion::physical_plan::DisplayFormatType;
1870
/// The Cypher string-matching predicates handled by `CypherStringMatchExpr`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum StringOp {
    /// `STARTS WITH`
    StartsWith,
    /// `ENDS WITH`
    EndsWith,
    /// `CONTAINS`
    Contains,
}

/// Renders the operator as its Cypher keyword.
impl std::fmt::Display for StringOp {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let keyword = match self {
            StringOp::StartsWith => "STARTS WITH",
            StringOp::EndsWith => "ENDS WITH",
            StringOp::Contains => "CONTAINS",
        };
        f.write_str(keyword)
    }
}
1887
/// Physical expression for a Cypher string predicate (`STARTS WITH`,
/// `ENDS WITH`, `CONTAINS`). Its reported data type is `Boolean`.
#[derive(Debug, Eq)]
struct CypherStringMatchExpr {
    /// Left operand; evaluated first and passed as the first argument.
    left: Arc<dyn PhysicalExpr>,
    /// Right operand; passed as the second argument.
    right: Arc<dyn PhysicalExpr>,
    /// Which string predicate to apply.
    op: StringOp,
}
1894
1895impl PartialEq for CypherStringMatchExpr {
1896 fn eq(&self, other: &Self) -> bool {
1897 self.op == other.op && self.left.eq(&other.left) && self.right.eq(&other.right)
1898 }
1899}
1900
1901impl std::hash::Hash for CypherStringMatchExpr {
1902 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1903 self.op.hash(state);
1904 self.left.hash(state);
1905 self.right.hash(state);
1906 }
1907}
1908
1909impl CypherStringMatchExpr {
1910 fn new(left: Arc<dyn PhysicalExpr>, right: Arc<dyn PhysicalExpr>, op: StringOp) -> Self {
1911 Self { left, right, op }
1912 }
1913}
1914
1915impl std::fmt::Display for CypherStringMatchExpr {
1916 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1917 write!(f, "{} {} {}", self.left, self.op, self.right)
1918 }
1919}
1920
1921impl DisplayAs for CypherStringMatchExpr {
1922 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1923 write!(f, "{}", self)
1924 }
1925}
1926
1927impl PhysicalExpr for CypherStringMatchExpr {
1928 fn as_any(&self) -> &dyn std::any::Any {
1929 self
1930 }
1931
1932 fn data_type(
1933 &self,
1934 _input_schema: &Schema,
1935 ) -> datafusion::error::Result<arrow_schema::DataType> {
1936 Ok(arrow_schema::DataType::Boolean)
1937 }
1938
1939 fn nullable(&self, _input_schema: &Schema) -> datafusion::error::Result<bool> {
1940 Ok(true)
1941 }
1942
1943 fn evaluate(
1944 &self,
1945 batch: &arrow_array::RecordBatch,
1946 ) -> datafusion::error::Result<datafusion::physical_plan::ColumnarValue> {
1947 use crate::query::df_udfs::invoke_cypher_string_op;
1948 use arrow_schema::Field;
1949 use datafusion::config::ConfigOptions;
1950 use datafusion::logical_expr::ScalarFunctionArgs;
1951
1952 let left_val = self.left.evaluate(batch)?;
1953 let right_val = self.right.evaluate(batch)?;
1954
1955 let args = ScalarFunctionArgs {
1956 args: vec![left_val, right_val],
1957 number_rows: batch.num_rows(),
1958 return_field: Arc::new(Field::new("result", arrow_schema::DataType::Boolean, true)),
1959 config_options: Arc::new(ConfigOptions::default()),
1960 arg_fields: vec![], };
1962
1963 match self.op {
1964 StringOp::StartsWith => {
1965 invoke_cypher_string_op(&args, "starts_with", |s, p| s.starts_with(p))
1966 }
1967 StringOp::EndsWith => {
1968 invoke_cypher_string_op(&args, "ends_with", |s, p| s.ends_with(p))
1969 }
1970 StringOp::Contains => invoke_cypher_string_op(&args, "contains", |s, p| s.contains(p)),
1971 }
1972 }
1973
1974 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
1975 vec![&self.left, &self.right]
1976 }
1977
1978 fn with_new_children(
1979 self: Arc<Self>,
1980 children: Vec<Arc<dyn PhysicalExpr>>,
1981 ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
1982 Ok(Arc::new(CypherStringMatchExpr::new(
1983 children[0].clone(),
1984 children[1].clone(),
1985 self.op,
1986 )))
1987 }
1988
1989 fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1990 write!(f, "{}", self)
1991 }
1992}
1993
1994impl PartialEq<dyn PhysicalExpr> for CypherStringMatchExpr {
1995 fn eq(&self, other: &dyn PhysicalExpr) -> bool {
1996 if let Some(other) = other.as_any().downcast_ref::<CypherStringMatchExpr>() {
1997 self == other
1998 } else {
1999 false
2000 }
2001 }
2002}
2003
/// Physical expression that extracts one child column, by position, from the
/// struct array produced by `input`.
#[derive(Debug, Eq)]
struct StructFieldAccessExpr {
    /// Expression expected to evaluate to a `StructArray`.
    input: Arc<dyn PhysicalExpr>,
    /// Position of the extracted column within the struct.
    field_idx: usize,
    /// Data type reported by `data_type` for the extracted column.
    output_type: arrow_schema::DataType,
}
2017
2018impl PartialEq for StructFieldAccessExpr {
2019 fn eq(&self, other: &Self) -> bool {
2020 self.field_idx == other.field_idx
2021 && self.input.eq(&other.input)
2022 && self.output_type == other.output_type
2023 }
2024}
2025
2026impl std::hash::Hash for StructFieldAccessExpr {
2027 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
2028 self.input.hash(state);
2029 self.field_idx.hash(state);
2030 }
2031}
2032
2033impl StructFieldAccessExpr {
2034 fn new(
2035 input: Arc<dyn PhysicalExpr>,
2036 field_idx: usize,
2037 output_type: arrow_schema::DataType,
2038 ) -> Self {
2039 Self {
2040 input,
2041 field_idx,
2042 output_type,
2043 }
2044 }
2045}
2046
2047impl std::fmt::Display for StructFieldAccessExpr {
2048 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2049 write!(f, "{}[{}]", self.input, self.field_idx)
2050 }
2051}
2052
2053impl DisplayAs for StructFieldAccessExpr {
2054 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2055 write!(f, "{}", self)
2056 }
2057}
2058
2059impl PartialEq<dyn PhysicalExpr> for StructFieldAccessExpr {
2060 fn eq(&self, other: &dyn PhysicalExpr) -> bool {
2061 if let Some(other) = other.as_any().downcast_ref::<Self>() {
2062 self.field_idx == other.field_idx && self.input.eq(&other.input)
2063 } else {
2064 false
2065 }
2066 }
2067}
2068
2069impl PhysicalExpr for StructFieldAccessExpr {
2070 fn as_any(&self) -> &dyn std::any::Any {
2071 self
2072 }
2073
2074 fn data_type(
2075 &self,
2076 _input_schema: &Schema,
2077 ) -> datafusion::error::Result<arrow_schema::DataType> {
2078 Ok(self.output_type.clone())
2079 }
2080
2081 fn nullable(&self, _input_schema: &Schema) -> datafusion::error::Result<bool> {
2082 Ok(true)
2083 }
2084
2085 fn evaluate(
2086 &self,
2087 batch: &arrow_array::RecordBatch,
2088 ) -> datafusion::error::Result<datafusion::physical_plan::ColumnarValue> {
2089 use arrow_array::StructArray;
2090
2091 let input_val = self.input.evaluate(batch)?;
2092 let array = input_val.into_array(batch.num_rows())?;
2093
2094 let struct_array = array
2095 .as_any()
2096 .downcast_ref::<StructArray>()
2097 .ok_or_else(|| {
2098 datafusion::error::DataFusionError::Execution(
2099 "StructFieldAccessExpr: input is not a StructArray".to_string(),
2100 )
2101 })?;
2102
2103 let field_col = struct_array.column(self.field_idx).clone();
2104 Ok(datafusion::physical_plan::ColumnarValue::Array(field_col))
2105 }
2106
2107 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
2108 vec![&self.input]
2109 }
2110
2111 fn with_new_children(
2112 self: Arc<Self>,
2113 children: Vec<Arc<dyn PhysicalExpr>>,
2114 ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
2115 Ok(Arc::new(StructFieldAccessExpr::new(
2116 children[0].clone(),
2117 self.field_idx,
2118 self.output_type.clone(),
2119 )))
2120 }
2121
2122 fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2123 write!(f, "{}", self)
2124 }
2125}
2126
/// Physical expression for a Cypher `EXISTS { … }` subquery.
///
/// Evaluation plans the subquery and executes it once per input row,
/// producing a boolean per row.
struct ExistsExecExpr {
    /// The subquery; rewritten and planned on each `evaluate` call.
    query: Query,
    /// Graph execution context handed to subplan execution.
    graph_ctx: Arc<GraphExecutionContext>,
    /// Session context handed to subplan execution.
    session_ctx: Arc<RwLock<SessionContext>>,
    /// Storage manager handed to subplan execution.
    storage: Arc<StorageManager>,
    /// Schema used to construct the planner and handed to subplan execution.
    uni_schema: Arc<UniSchema>,
    /// Base parameters; per-row values are layered on top for each execution.
    params: HashMap<String, Value>,
    /// Entity variables from the enclosing scope, merged with those inferred
    /// from the batch schema before execution.
    outer_entity_vars: HashSet<String>,
}
2150
2151impl std::fmt::Debug for ExistsExecExpr {
2152 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2153 f.debug_struct("ExistsExecExpr").finish_non_exhaustive()
2154 }
2155}
2156
2157impl ExistsExecExpr {
2158 fn new(
2159 query: Query,
2160 graph_ctx: Arc<GraphExecutionContext>,
2161 session_ctx: Arc<RwLock<SessionContext>>,
2162 storage: Arc<StorageManager>,
2163 uni_schema: Arc<UniSchema>,
2164 params: HashMap<String, Value>,
2165 outer_entity_vars: HashSet<String>,
2166 ) -> Self {
2167 Self {
2168 query,
2169 graph_ctx,
2170 session_ctx,
2171 storage,
2172 uni_schema,
2173 params,
2174 outer_entity_vars,
2175 }
2176 }
2177}
2178
2179impl std::fmt::Display for ExistsExecExpr {
2180 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2181 write!(f, "EXISTS(<subquery>)")
2182 }
2183}
2184
2185impl PartialEq<dyn PhysicalExpr> for ExistsExecExpr {
2186 fn eq(&self, _other: &dyn PhysicalExpr) -> bool {
2187 false
2188 }
2189}
2190
2191impl PartialEq for ExistsExecExpr {
2192 fn eq(&self, _other: &Self) -> bool {
2193 false
2194 }
2195}
2196
2197impl Eq for ExistsExecExpr {}
2198
2199impl std::hash::Hash for ExistsExecExpr {
2200 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
2201 "ExistsExecExpr".hash(state);
2202 }
2203}
2204
2205impl DisplayAs for ExistsExecExpr {
2206 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2207 write!(f, "{}", self)
2208 }
2209}
2210
/// `PhysicalExpr` implementation: plans the subquery once per batch and runs
/// it once per row, yielding `true` for rows whose subquery returns any row.
impl PhysicalExpr for ExistsExecExpr {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Always `Boolean`.
    fn data_type(
        &self,
        _input_schema: &Schema,
    ) -> datafusion::error::Result<arrow_schema::DataType> {
        Ok(DataType::Boolean)
    }

    /// Reported as nullable, although `evaluate` only appends non-null values.
    fn nullable(&self, _input_schema: &Schema) -> datafusion::error::Result<bool> {
        Ok(true)
    }

    /// Evaluates the EXISTS subquery for every row of `batch`.
    ///
    /// # Errors
    /// Returns an execution error when planning fails, when the helper
    /// runtime cannot be created, when any per-row execution fails, or when
    /// the worker thread panics.
    fn evaluate(
        &self,
        batch: &arrow_array::RecordBatch,
    ) -> datafusion::error::Result<datafusion::physical_plan::ColumnarValue> {
        let num_rows = batch.num_rows();
        let mut builder = BooleanBuilder::with_capacity(num_rows);

        // Infer which outer names count as entity variables from the batch
        // schema. A column contributes when:
        //   * its name ends in `._vid` (the prefix is the variable), or
        //   * it is struct-typed (the column name is the variable), or
        //   * it is a dot-free, non-underscore-prefixed Int64/UInt64 column.
        // NOTE(review): the last rule presumably targets bare id columns, but
        // it matches any such integer column — confirm.
        let schema = batch.schema();
        let mut entity_vars: HashSet<String> = HashSet::new();
        for field in schema.fields() {
            let name = field.name();
            if let Some(base) = name.strip_suffix("._vid") {
                entity_vars.insert(base.to_string());
            }
            if matches!(field.data_type(), DataType::Struct(_)) {
                entity_vars.insert(name.to_string());
            }
            if !name.contains('.')
                && !name.starts_with('_')
                && matches!(field.data_type(), DataType::Int64 | DataType::UInt64)
            {
                entity_vars.insert(name.to_string());
            }
        }
        // Order follows HashSet iteration and is therefore unspecified.
        let vars_in_scope: Vec<String> = entity_vars.iter().cloned().collect();

        // Rewrite the subquery with respect to the inferred outer variables.
        let rewritten_query = rewrite_query_correlated(&self.query, &entity_vars);

        // Plan once per batch; the same plan is reused for every row below.
        let planner = QueryPlanner::new(self.uni_schema.clone());
        let logical_plan = match planner.plan_with_scope(rewritten_query, vars_in_scope) {
            Ok(plan) => plan,
            Err(e) => {
                return Err(datafusion::error::DataFusionError::Execution(format!(
                    "EXISTS subquery planning failed: {}",
                    e
                )));
            }
        };

        let graph_ctx = self.graph_ctx.clone();
        let session_ctx = self.session_ctx.clone();
        let storage = self.storage.clone();
        let uni_schema = self.uni_schema.clone();
        let base_params = self.params.clone();

        // Run the async subplan on a scoped worker thread that owns its own
        // current-thread Tokio runtime and blocks on each execution.
        // NOTE(review): presumably this avoids blocking inside a runtime the
        // caller may already be running on — confirm.
        let result = std::thread::scope(|s| {
            s.spawn(|| {
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .map_err(|e| {
                        datafusion::error::DataFusionError::Execution(format!(
                            "Failed to create runtime for EXISTS: {}",
                            e
                        ))
                    })?;

                // Entity variables known at construction plus those inferred
                // from this batch.
                let mut combined_entity_vars = self.outer_entity_vars.clone();
                combined_entity_vars.extend(entity_vars.iter().cloned());

                for row_idx in 0..num_rows {
                    // Row values are layered over the base parameters; on a
                    // key clash the row value replaces the base value.
                    let row_params = extract_row_params(batch, row_idx);
                    let mut sub_params = base_params.clone();
                    sub_params.extend(row_params);

                    // Expose `<var>._vid` under the bare variable name too,
                    // overwriting any existing entry for `<var>`.
                    for var in &entity_vars {
                        let vid_key = format!("{}._vid", var);
                        if let Some(vid_val) = sub_params.get(&vid_key).cloned() {
                            sub_params.insert(var.clone(), vid_val);
                        }
                    }

                    let batches = rt.block_on(execute_subplan_with_outer_vars(
                        &logical_plan,
                        &sub_params,
                        // Third argument: always an empty map here.
                        &HashMap::new(),
                        &graph_ctx,
                        &session_ctx,
                        &storage,
                        &uni_schema,
                        &combined_entity_vars,
                        // Last argument: always `None` here.
                        None,
                    ))?;

                    // EXISTS holds when any returned batch has at least one row.
                    let has_rows = batches.iter().any(|b| b.num_rows() > 0);
                    builder.append_value(has_rows);
                }

                Ok::<_, datafusion::error::DataFusionError>(())
            })
            .join()
            // A panic in the worker becomes an ordinary execution error.
            .unwrap_or_else(|_| {
                Err(datafusion::error::DataFusionError::Execution(
                    "EXISTS subquery thread panicked".to_string(),
                ))
            })
        });

        if let Err(e) = result {
            return Err(datafusion::error::DataFusionError::Execution(format!(
                "EXISTS subquery execution failed: {}",
                e
            )));
        }

        Ok(datafusion::physical_plan::ColumnarValue::Array(Arc::new(
            builder.finish(),
        )))
    }

    /// This expression has no child expressions.
    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
        vec![]
    }

    /// Returns `self` unchanged; any non-empty `children` is a plan error.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn PhysicalExpr>>,
    ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
        if !children.is_empty() {
            return Err(datafusion::error::DataFusionError::Plan(
                "ExistsExecExpr has no children".to_string(),
            ));
        }
        Ok(self)
    }

    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self)
    }
}
2379
2380fn has_mutation_clause(query: &Query) -> bool {
2387 match query {
2388 Query::Single(stmt) => stmt.clauses.iter().any(|c| {
2389 matches!(
2390 c,
2391 Clause::Create(_)
2392 | Clause::Delete(_)
2393 | Clause::Set(_)
2394 | Clause::Remove(_)
2395 | Clause::Merge(_)
2396 ) || has_mutation_in_clause_exprs(c)
2397 }),
2398 Query::Union { left, right, .. } => has_mutation_clause(left) || has_mutation_clause(right),
2399 _ => false,
2400 }
2401}
2402
2403fn has_mutation_in_clause_exprs(clause: &Clause) -> bool {
2405 let check_expr = |e: &Expr| -> bool { has_mutation_in_expr(e) };
2406
2407 match clause {
2408 Clause::Match(m) => m.where_clause.as_ref().is_some_and(check_expr),
2409 Clause::With(w) => {
2410 w.where_clause.as_ref().is_some_and(check_expr)
2411 || w.items.iter().any(|item| match item {
2412 ReturnItem::Expr { expr, .. } => has_mutation_in_expr(expr),
2413 ReturnItem::All => false,
2414 })
2415 }
2416 Clause::Return(r) => r.items.iter().any(|item| match item {
2417 ReturnItem::Expr { expr, .. } => has_mutation_in_expr(expr),
2418 ReturnItem::All => false,
2419 }),
2420 _ => false,
2421 }
2422}
2423
2424fn has_mutation_in_expr(expr: &Expr) -> bool {
2426 match expr {
2427 Expr::Exists { query, .. } => has_mutation_clause(query),
2428 _ => {
2429 let mut found = false;
2430 expr.for_each_child(&mut |child| {
2431 if has_mutation_in_expr(child) {
2432 found = true;
2433 }
2434 });
2435 found
2436 }
2437 }
2438}
2439
2440fn rewrite_query_correlated(query: &Query, outer_vars: &HashSet<String>) -> Query {
2446 match query {
2447 Query::Single(stmt) => Query::Single(Statement {
2448 clauses: stmt
2449 .clauses
2450 .iter()
2451 .map(|c| rewrite_clause_correlated(c, outer_vars))
2452 .collect(),
2453 }),
2454 Query::Union { left, right, all } => Query::Union {
2455 left: Box::new(rewrite_query_correlated(left, outer_vars)),
2456 right: Box::new(rewrite_query_correlated(right, outer_vars)),
2457 all: *all,
2458 },
2459 other => other.clone(),
2460 }
2461}
2462
2463fn rewrite_clause_correlated(clause: &Clause, outer_vars: &HashSet<String>) -> Clause {
2465 match clause {
2466 Clause::Match(m) => Clause::Match(MatchClause {
2467 optional: m.optional,
2468 pattern: m.pattern.clone(),
2469 where_clause: m
2470 .where_clause
2471 .as_ref()
2472 .map(|e| rewrite_expr_correlated(e, outer_vars)),
2473 for_update: m.for_update,
2474 }),
2475 Clause::With(w) => Clause::With(WithClause {
2476 distinct: w.distinct,
2477 items: w
2478 .items
2479 .iter()
2480 .map(|item| rewrite_return_item(item, outer_vars))
2481 .collect(),
2482 order_by: w.order_by.as_ref().map(|items| {
2483 items
2484 .iter()
2485 .map(|si| SortItem {
2486 expr: rewrite_expr_correlated(&si.expr, outer_vars),
2487 ascending: si.ascending,
2488 })
2489 .collect()
2490 }),
2491 skip: w
2492 .skip
2493 .as_ref()
2494 .map(|e| rewrite_expr_correlated(e, outer_vars)),
2495 limit: w
2496 .limit
2497 .as_ref()
2498 .map(|e| rewrite_expr_correlated(e, outer_vars)),
2499 where_clause: w
2500 .where_clause
2501 .as_ref()
2502 .map(|e| rewrite_expr_correlated(e, outer_vars)),
2503 }),
2504 Clause::Return(r) => Clause::Return(ReturnClause {
2505 distinct: r.distinct,
2506 items: r
2507 .items
2508 .iter()
2509 .map(|item| rewrite_return_item(item, outer_vars))
2510 .collect(),
2511 order_by: r.order_by.as_ref().map(|items| {
2512 items
2513 .iter()
2514 .map(|si| SortItem {
2515 expr: rewrite_expr_correlated(&si.expr, outer_vars),
2516 ascending: si.ascending,
2517 })
2518 .collect()
2519 }),
2520 skip: r
2521 .skip
2522 .as_ref()
2523 .map(|e| rewrite_expr_correlated(e, outer_vars)),
2524 limit: r
2525 .limit
2526 .as_ref()
2527 .map(|e| rewrite_expr_correlated(e, outer_vars)),
2528 }),
2529 Clause::Unwind(u) => Clause::Unwind(UnwindClause {
2530 expr: rewrite_expr_correlated(&u.expr, outer_vars),
2531 variable: u.variable.clone(),
2532 }),
2533 other => other.clone(),
2534 }
2535}
2536
2537fn rewrite_return_item(item: &ReturnItem, outer_vars: &HashSet<String>) -> ReturnItem {
2538 match item {
2539 ReturnItem::All => ReturnItem::All,
2540 ReturnItem::Expr {
2541 expr,
2542 alias,
2543 source_text,
2544 } => ReturnItem::Expr {
2545 expr: rewrite_expr_correlated(expr, outer_vars),
2546 alias: alias.clone(),
2547 source_text: source_text.clone(),
2548 },
2549 }
2550}
2551
2552fn rewrite_expr_correlated(expr: &Expr, outer_vars: &HashSet<String>) -> Expr {
2555 match expr {
2556 Expr::Property(base, key) => {
2558 if let Expr::Variable(v) = base.as_ref()
2559 && outer_vars.contains(v)
2560 {
2561 return Expr::Parameter(format!("{}.{}", v, key));
2562 }
2563 Expr::Property(
2564 Box::new(rewrite_expr_correlated(base, outer_vars)),
2565 key.clone(),
2566 )
2567 }
2568 Expr::Exists {
2570 query,
2571 from_pattern_predicate,
2572 } => Expr::Exists {
2573 query: Box::new(rewrite_query_correlated(query, outer_vars)),
2574 from_pattern_predicate: *from_pattern_predicate,
2575 },
2576 Expr::CountSubquery(query) => {
2578 Expr::CountSubquery(Box::new(rewrite_query_correlated(query, outer_vars)))
2579 }
2580 Expr::CollectSubquery(query) => {
2581 Expr::CollectSubquery(Box::new(rewrite_query_correlated(query, outer_vars)))
2582 }
2583 other => other
2585 .clone()
2586 .map_children(&mut |child| rewrite_expr_correlated(&child, outer_vars)),
2587 }
2588}
2589
2590fn resolve_metric_for_property(schema: &UniSchema, property: &str) -> Option<DistanceMetric> {
2594 for idx in &schema.indexes {
2595 if let IndexDefinition::Vector(config) = idx
2596 && config.property == property
2597 {
2598 return Some(config.metric.clone());
2599 }
2600 }
2601 None
2602}