1use crate::query::df_expr::{TranslationContext, VariableKind, cypher_expr_to_df};
5use crate::query::df_graph::GraphExecutionContext;
6use crate::query::df_graph::common::{execute_subplan_with_outer_vars, extract_row_params};
7use crate::query::df_graph::comprehension::ListComprehensionExecExpr;
8use crate::query::df_graph::pattern_comprehension::{
9 PatternComprehensionExecExpr, analyze_pattern, build_inner_schema, collect_inner_properties,
10};
11use crate::query::df_graph::pattern_exists::{
12 PatternExistsExecExpr, extract_pattern_from_exists_query, extract_target_property_predicates,
13};
14use crate::query::df_graph::quantifier::{QuantifierExecExpr, QuantifierType};
15use crate::query::df_graph::raw_bytes_marker;
16use crate::query::df_graph::reduce::ReduceExecExpr;
17use crate::query::df_graph::similar_to_expr::SimilarToExecExpr;
18use crate::query::planner::QueryPlanner;
19use crate::query::similar_to::SimilarToError;
20use anyhow::{Result, anyhow};
21use arrow_array::builder::BooleanBuilder;
22use arrow_schema::{DataType, Field, Schema};
23use datafusion::execution::context::SessionState;
24use datafusion::physical_expr::expressions::binary;
25use datafusion::physical_plan::PhysicalExpr;
26use datafusion::physical_planner::PhysicalPlanner;
27use datafusion::prelude::SessionContext;
28use parking_lot::RwLock;
29use std::collections::{HashMap, HashSet};
30use std::sync::Arc;
31use uni_common::Value;
32use uni_common::core::schema::{DistanceMetric, IndexDefinition, Schema as UniSchema};
33use uni_cypher::ast::{
34 BinaryOp, Clause, CypherLiteral, Expr, MatchClause, Query, ReturnClause, ReturnItem, SortItem,
35 Statement, UnaryOp, UnwindClause, WithClause,
36};
37use uni_store::storage::manager::StorageManager;
38
39fn is_cypher_value_type(dt: Option<&DataType>) -> bool {
42 dt.is_some_and(|t| matches!(t, DataType::LargeBinary | DataType::FixedSizeBinary(24)))
43}
44
45fn resolve_list_element_type(
55 list_data_type: &DataType,
56 large_binary_fallback: DataType,
57 context: &str,
58) -> Result<DataType> {
59 match list_data_type {
60 DataType::List(field) | DataType::LargeList(field) => Ok(field.data_type().clone()),
61 DataType::Null => Ok(DataType::Null),
62 DataType::LargeBinary => Ok(large_binary_fallback),
63 _ => Err(anyhow!(
64 "{} input must be a list, got {:?}",
65 context,
66 list_data_type
67 )),
68 }
69}
70
71#[derive(Debug)]
76struct LargeListToCypherValueExpr {
77 child: Arc<dyn PhysicalExpr>,
78}
79
80impl LargeListToCypherValueExpr {
81 fn new(child: Arc<dyn PhysicalExpr>) -> Self {
82 Self { child }
83 }
84}
85
86impl std::fmt::Display for LargeListToCypherValueExpr {
87 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
88 write!(f, "LargeListToCypherValue({})", self.child)
89 }
90}
91
92impl PartialEq for LargeListToCypherValueExpr {
93 fn eq(&self, other: &Self) -> bool {
94 Arc::ptr_eq(&self.child, &other.child)
95 }
96}
97
98impl Eq for LargeListToCypherValueExpr {}
99
100impl std::hash::Hash for LargeListToCypherValueExpr {
101 fn hash<H: std::hash::Hasher>(&self, _state: &mut H) {
102 std::any::type_name::<Self>().hash(_state);
104 }
105}
106
107impl PartialEq<dyn std::any::Any> for LargeListToCypherValueExpr {
108 fn eq(&self, other: &dyn std::any::Any) -> bool {
109 other
110 .downcast_ref::<Self>()
111 .map(|x| self == x)
112 .unwrap_or(false)
113 }
114}
115
116impl PhysicalExpr for LargeListToCypherValueExpr {
117 fn as_any(&self) -> &dyn std::any::Any {
118 self
119 }
120
121 fn data_type(&self, _input_schema: &Schema) -> datafusion::error::Result<DataType> {
122 Ok(DataType::LargeBinary)
123 }
124
125 fn nullable(&self, input_schema: &Schema) -> datafusion::error::Result<bool> {
126 self.child.nullable(input_schema)
127 }
128
129 fn evaluate(
130 &self,
131 batch: &arrow_array::RecordBatch,
132 ) -> datafusion::error::Result<datafusion::logical_expr::ColumnarValue> {
133 use datafusion::arrow::compute::cast;
134 use datafusion::logical_expr::ColumnarValue;
135
136 let child_result = self.child.evaluate(batch)?;
137 let child_array = child_result.into_array(batch.num_rows())?;
138
139 let list_array = if let DataType::List(field) = child_array.data_type() {
141 let target_type = DataType::LargeList(field.clone());
142 cast(&child_array, &target_type).map_err(|e| {
143 datafusion::error::DataFusionError::Execution(format!(
144 "List to LargeList cast failed: {e}"
145 ))
146 })?
147 } else {
148 child_array.clone()
149 };
150
151 if list_array.data_type() == &DataType::LargeBinary {
153 return Ok(ColumnarValue::Array(list_array));
154 }
155
156 if let Some(large_list) = list_array
158 .as_any()
159 .downcast_ref::<datafusion::arrow::array::LargeListArray>()
160 {
161 let cv_array =
162 crate::query::df_graph::common::typed_large_list_to_cv_array(large_list)?;
163 Ok(ColumnarValue::Array(cv_array))
164 } else {
165 Err(datafusion::error::DataFusionError::Execution(format!(
166 "Expected List or LargeList, got {:?}",
167 list_array.data_type()
168 )))
169 }
170 }
171
172 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
173 vec![&self.child]
174 }
175
176 fn with_new_children(
177 self: Arc<Self>,
178 children: Vec<Arc<dyn PhysicalExpr>>,
179 ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
180 if children.len() != 1 {
181 return Err(datafusion::error::DataFusionError::Execution(
182 "LargeListToCypherValueExpr expects exactly 1 child".to_string(),
183 ));
184 }
185 Ok(Arc::new(LargeListToCypherValueExpr::new(
186 children[0].clone(),
187 )))
188 }
189
190 fn fmt_sql(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
191 write!(f, "LargeListToCypherValue({})", self.child)
192 }
193}
194
195pub struct CypherPhysicalExprCompiler<'a> {
197 state: &'a SessionState,
198 translation_ctx: Option<&'a TranslationContext>,
199 graph_ctx: Option<Arc<GraphExecutionContext>>,
200 uni_schema: Option<Arc<UniSchema>>,
201 session_ctx: Option<Arc<RwLock<SessionContext>>>,
203 storage: Option<Arc<StorageManager>>,
205 params: HashMap<String, Value>,
207 outer_entity_vars: HashSet<String>,
211}
212
213impl<'a> CypherPhysicalExprCompiler<'a> {
214 pub fn new(state: &'a SessionState, translation_ctx: Option<&'a TranslationContext>) -> Self {
215 Self {
216 state,
217 translation_ctx,
218 graph_ctx: None,
219 uni_schema: None,
220 session_ctx: None,
221 storage: None,
222 params: HashMap::new(),
223 outer_entity_vars: HashSet::new(),
224 }
225 }
226
227 fn scoped_compiler<'b>(
239 &'b self,
240 exclude_vars: &[&str],
241 scoped_ctx_slot: &'b mut Option<TranslationContext>,
242 ) -> CypherPhysicalExprCompiler<'b>
243 where
244 'a: 'b,
245 {
246 let needs_scoping = self.translation_ctx.is_some_and(|ctx| {
247 exclude_vars
248 .iter()
249 .any(|v| ctx.variable_kinds.contains_key(*v))
250 });
251
252 let ctx_ref = if needs_scoping {
253 let ctx = self.translation_ctx.unwrap();
254 let mut new_kinds = ctx.variable_kinds.clone();
255 for v in exclude_vars {
256 new_kinds.remove(*v);
257 }
258 *scoped_ctx_slot = Some(TranslationContext {
259 parameters: ctx.parameters.clone(),
260 outer_values: ctx.outer_values.clone(),
261 variable_labels: ctx.variable_labels.clone(),
262 variable_kinds: new_kinds,
263 node_variable_hints: ctx.node_variable_hints.clone(),
264 mutation_edge_hints: ctx.mutation_edge_hints.clone(),
265 statement_time: ctx.statement_time,
266 });
267 scoped_ctx_slot.as_ref()
268 } else {
269 self.translation_ctx
270 };
271
272 CypherPhysicalExprCompiler {
273 state: self.state,
274 translation_ctx: ctx_ref,
275 graph_ctx: self.graph_ctx.clone(),
276 uni_schema: self.uni_schema.clone(),
277 session_ctx: self.session_ctx.clone(),
278 storage: self.storage.clone(),
279 params: self.params.clone(),
280 outer_entity_vars: self.outer_entity_vars.clone(),
281 }
282 }
283
284 pub fn with_graph_ctx(
286 mut self,
287 graph_ctx: Arc<GraphExecutionContext>,
288 uni_schema: Arc<UniSchema>,
289 ) -> Self {
290 self.graph_ctx = Some(graph_ctx);
291 self.uni_schema = Some(uni_schema);
292 self
293 }
294
295 pub fn with_subquery_ctx(
297 mut self,
298 graph_ctx: Arc<GraphExecutionContext>,
299 uni_schema: Arc<UniSchema>,
300 session_ctx: Arc<RwLock<SessionContext>>,
301 storage: Arc<StorageManager>,
302 params: HashMap<String, Value>,
303 outer_entity_vars: HashSet<String>,
304 ) -> Self {
305 self.graph_ctx = Some(graph_ctx);
306 self.uni_schema = Some(uni_schema);
307 self.session_ctx = Some(session_ctx);
308 self.storage = Some(storage);
309 self.params = params;
310 self.outer_entity_vars = outer_entity_vars;
311 self
312 }
313
    /// Compiles a Cypher expression into a DataFusion physical expression
    /// evaluated against `input_schema`.
    ///
    /// Cypher-specific constructs are routed to dedicated compilers; any
    /// expression that does not (transitively) contain one is handed to the
    /// standard DataFusion translation via `compile_standard`.
    ///
    /// NOTE: arm order matters. Guarded arms (markable list literals, the
    /// `coalesce` CypherValue rewrite) must precede the general arms for the
    /// same variant.
    ///
    /// # Errors
    /// Propagates any translation/planning error from the sub-compilers, and
    /// rejects list/map literals that contain custom expressions.
    pub fn compile(&self, expr: &Expr, input_schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
        match expr {
            Expr::ListComprehension {
                variable,
                list,
                where_clause,
                map_expr,
            } => self.compile_list_comprehension(
                variable,
                list,
                where_clause.as_deref(),
                map_expr,
                input_schema,
            ),
            Expr::Quantifier {
                quantifier,
                variable,
                list,
                predicate,
            } => self.compile_quantifier(quantifier, variable, list, predicate, input_schema),
            Expr::Reduce {
                accumulator,
                init,
                variable,
                list,
                expr: expression,
            } => self.compile_reduce(accumulator, init, variable, list, expression, input_schema),
            Expr::BinaryOp { left, op, right } => {
                self.compile_binary_op_dispatch(left, op, right, input_schema)
            }
            Expr::UnaryOp { op, expr: inner } => {
                if matches!(op, UnaryOp::Not) {
                    // NOT always compiles its operand here; a CypherValue
                    // (LargeBinary) operand is first converted to a boolean.
                    let mut inner_phy = self.compile(inner, input_schema)?;
                    if let Ok(DataType::LargeBinary) = inner_phy.data_type(input_schema) {
                        inner_phy = self.wrap_with_cv_to_bool(inner_phy)?;
                    }
                    self.compile_unary_op(op, inner_phy, input_schema)
                } else if Self::contains_custom_expr(inner) {
                    let inner_phy = self.compile(inner, input_schema)?;
                    self.compile_unary_op(op, inner_phy, input_schema)
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }
            Expr::IsNull(inner) => {
                if Self::contains_custom_expr(inner) {
                    let inner_phy = self.compile(inner, input_schema)?;
                    Ok(datafusion::physical_expr::expressions::is_null(inner_phy)
                        .map_err(|e| anyhow!("Failed to create is_null: {}", e))?)
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }
            Expr::IsNotNull(inner) => {
                if Self::contains_custom_expr(inner) {
                    let inner_phy = self.compile(inner, input_schema)?;
                    Ok(
                        datafusion::physical_expr::expressions::is_not_null(inner_phy)
                            .map_err(|e| anyhow!("Failed to create is_not_null: {}", e))?,
                    )
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }
            Expr::In {
                expr: left,
                list: right,
            } => {
                if Self::contains_custom_expr(left) || Self::contains_custom_expr(right) {
                    // With custom operands, IN is evaluated by the `_cypher_in` UDF.
                    // Operands whose type cannot be determined are assumed LargeBinary.
                    let left_phy = self.compile(left, input_schema)?;
                    let right_phy = self.compile(right, input_schema)?;

                    let left_type = left_phy
                        .data_type(input_schema)
                        .unwrap_or(DataType::LargeBinary);
                    let right_type = right_phy
                        .data_type(input_schema)
                        .unwrap_or(DataType::LargeBinary);

                    self.plan_binary_udf("_cypher_in", left_phy, right_phy, left_type, right_type)?
                        .ok_or_else(|| anyhow!("_cypher_in UDF not found"))
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }

            // List literals accepted by `raw_bytes_marker::is_markable_list` are
            // compiled normally and then wrapped in a RawBytesMarkerExpr.
            Expr::List(_) if raw_bytes_marker::is_markable_list(expr, input_schema) => {
                let inner = self.compile_standard(expr, input_schema)?;
                Ok(Arc::new(raw_bytes_marker::RawBytesMarkerExpr::list_child(
                    inner,
                )))
            }

            Expr::List(items) if items.iter().any(Self::contains_custom_expr) => Err(anyhow!(
                "List literals containing comprehensions not yet supported in compiler"
            )),
            Expr::Map(entries) if entries.iter().any(|(_, v)| Self::contains_custom_expr(v)) => {
                Err(anyhow!(
                    "Map literals containing comprehensions not yet supported in compiler"
                ))
            }

            Expr::Property(base, prop) => self.compile_property_access(base, prop, input_schema),

            Expr::ArrayIndex { array, index } => {
                self.compile_array_index(array, index, input_schema)
            }

            Expr::PatternComprehension {
                path_variable,
                pattern,
                where_clause,
                map_expr,
            } => self.compile_pattern_comprehension(
                path_variable,
                pattern,
                where_clause.as_deref(),
                map_expr,
                input_schema,
            ),

            Expr::Exists {
                query,
                from_pattern_predicate,
            } => {
                if *from_pattern_predicate {
                    // Pattern predicates first try the vectorized path; any
                    // failure is logged at debug level and falls back to a
                    // general EXISTS subquery.
                    match self.compile_pattern_exists(query, input_schema) {
                        Ok(expr) => Ok(expr),
                        Err(e) => {
                            log::debug!(
                                "Pattern exists vectorization failed, falling back to subquery: {e}"
                            );
                            self.compile_exists(query)
                        }
                    }
                } else {
                    self.compile_exists(query)
                }
            }

            // coalesce(a, ..., z) whose arguments need a common CypherValue
            // representation (per `coalesce_needs_cv_unify`) is rewritten to
            //   CASE WHEN a IS NOT NULL THEN _cypher_scalar_to_cv(a) ...
            //        ELSE _cypher_scalar_to_cv(z) END
            // before standard compilation.
            Expr::FunctionCall { name, args, .. }
                if name.eq_ignore_ascii_case("coalesce")
                    && args.len() > 1
                    && raw_bytes_marker::coalesce_needs_cv_unify(args, input_schema) =>
            {
                let cv_wrap = |a: &Expr| Expr::FunctionCall {
                    name: "_cypher_scalar_to_cv".to_string(),
                    args: vec![a.clone()],
                    distinct: false,
                    window_spec: None,
                };
                // Cannot fail: the guard requires more than one argument.
                let (last, init) = args.split_last().unwrap();
                let when_then = init
                    .iter()
                    .map(|a| (Expr::IsNotNull(Box::new(a.clone())), cv_wrap(a)))
                    .collect();
                let case = Expr::Case {
                    expr: None,
                    when_then,
                    else_expr: Some(Box::new(cv_wrap(last))),
                };
                self.compile_standard(&case, input_schema)
            }
            Expr::FunctionCall {
                name,
                args,
                distinct,
                ..
            } => {
                if name.eq_ignore_ascii_case("similar_to") {
                    return self.compile_similar_to(args, input_schema);
                }
                // Custom arguments or markable list arguments need the
                // argument-by-argument compilation path.
                if args.iter().any(Self::contains_custom_expr)
                    || args
                        .iter()
                        .any(|a| raw_bytes_marker::is_markable_list(a, input_schema))
                {
                    self.compile_function_with_custom_args(name, args, *distinct, input_schema)
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }

            Expr::Case {
                expr: case_operand,
                when_then,
                else_expr,
            } => {
                // Any custom expression in the operand, a WHEN/THEN pair, or
                // ELSE requires the dedicated CASE compiler.
                let has_custom = case_operand
                    .as_deref()
                    .is_some_and(Self::contains_custom_expr)
                    || when_then.iter().any(|(w, t)| {
                        Self::contains_custom_expr(w) || Self::contains_custom_expr(t)
                    })
                    || else_expr.as_deref().is_some_and(Self::contains_custom_expr);

                if has_custom {
                    self.compile_case(case_operand, when_then, else_expr, input_schema)
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }

            Expr::LabelCheck { .. } => self.compile_standard(expr, input_schema),

            _ => self.compile_standard(expr, input_schema),
        }
    }
555
556 fn compile_binary_op_dispatch(
558 &self,
559 left: &Expr,
560 op: &BinaryOp,
561 right: &Expr,
562 input_schema: &Schema,
563 ) -> Result<Arc<dyn PhysicalExpr>> {
564 if matches!(op, BinaryOp::Eq | BinaryOp::NotEq)
565 && let (Expr::Variable(lv), Expr::Variable(rv)) = (left, right)
566 && let Some(ctx) = self.translation_ctx
567 && let (Some(lk), Some(rk)) = (ctx.variable_kinds.get(lv), ctx.variable_kinds.get(rv))
568 {
569 let identity_prop = match (lk, rk) {
570 (VariableKind::Node, VariableKind::Node) => Some("_vid"),
571 (VariableKind::Edge, VariableKind::Edge) => Some("_eid"),
572 _ => None,
573 };
574
575 if let Some(id_prop) = identity_prop {
576 return self.compile_standard(
577 &Expr::BinaryOp {
578 left: Box::new(Expr::Property(
579 Box::new(Expr::Variable(lv.clone())),
580 id_prop.to_string(),
581 )),
582 op: *op,
583 right: Box::new(Expr::Property(
584 Box::new(Expr::Variable(rv.clone())),
585 id_prop.to_string(),
586 )),
587 },
588 input_schema,
589 );
590 }
591 }
592
593 if matches!(op, BinaryOp::Xor | BinaryOp::Pow) {
597 return self.compile_standard(
598 &Expr::BinaryOp {
599 left: Box::new(left.clone()),
600 op: *op,
601 right: Box::new(right.clone()),
602 },
603 input_schema,
604 );
605 }
606
607 if Self::contains_custom_expr(left) || Self::contains_custom_expr(right) {
608 let left_phy = self.compile(left, input_schema)?;
609 let right_phy = self.compile(right, input_schema)?;
610 return self.compile_binary_op(op, left_phy, right_phy, input_schema);
611 }
612
613 if *op == BinaryOp::Add && (Self::is_list_producing(left) || Self::is_list_producing(right))
617 {
618 return self.compile_standard(
619 &Expr::BinaryOp {
620 left: Box::new(left.clone()),
621 op: *op,
622 right: Box::new(right.clone()),
623 },
624 input_schema,
625 );
626 }
627
628 let left_phy = self.compile(left, input_schema)?;
633 let right_phy = self.compile(right, input_schema)?;
634 let left_dt = left_phy.data_type(input_schema).ok();
635 let right_dt = right_phy.data_type(input_schema).ok();
636 let has_cv =
637 is_cypher_value_type(left_dt.as_ref()) || is_cypher_value_type(right_dt.as_ref());
638
639 if has_cv {
640 self.compile_binary_op(op, left_phy, right_phy, input_schema)
642 } else {
643 self.compile_standard(
646 &Expr::BinaryOp {
647 left: Box::new(left.clone()),
648 op: *op,
649 right: Box::new(right.clone()),
650 },
651 input_schema,
652 )
653 }
654 }
655
656 fn try_compile_struct_field(
661 &self,
662 var_name: &str,
663 field_name: &str,
664 input_schema: &Schema,
665 ) -> Option<Arc<dyn PhysicalExpr>> {
666 let col_idx = input_schema.index_of(var_name).ok()?;
667 let DataType::Struct(struct_fields) = input_schema.field(col_idx).data_type() else {
668 return None;
669 };
670
671 if let Some(field_idx) = struct_fields.iter().position(|f| f.name() == field_name) {
673 let output_type = struct_fields[field_idx].data_type().clone();
674 let col_expr: Arc<dyn PhysicalExpr> = Arc::new(
675 datafusion::physical_expr::expressions::Column::new(var_name, col_idx),
676 );
677 Some(Arc::new(StructFieldAccessExpr::new(
678 col_expr,
679 field_idx,
680 output_type,
681 )))
682 } else {
683 Some(Arc::new(
684 datafusion::physical_expr::expressions::Literal::new(
685 datafusion::common::ScalarValue::Null,
686 ),
687 ))
688 }
689 }
690
691 fn compile_property_access(
693 &self,
694 base: &Expr,
695 prop: &str,
696 input_schema: &Schema,
697 ) -> Result<Arc<dyn PhysicalExpr>> {
698 if let Expr::Variable(var_name) = base {
699 if let Some(expr) = self.try_compile_struct_field(var_name, prop, input_schema) {
701 return Ok(expr);
702 }
703 let flat_col = format!("{}.{}", var_name, prop);
705 if let Ok(col_idx) = input_schema.index_of(&flat_col) {
706 return Ok(Arc::new(
707 datafusion::physical_expr::expressions::Column::new(&flat_col, col_idx),
708 ));
709 }
710 }
711 self.compile_standard(
712 &Expr::Property(Box::new(base.clone()), prop.to_string()),
713 input_schema,
714 )
715 }
716
717 fn compile_array_index(
719 &self,
720 array: &Expr,
721 index: &Expr,
722 input_schema: &Schema,
723 ) -> Result<Arc<dyn PhysicalExpr>> {
724 if let Expr::Variable(var_name) = array
725 && let Expr::Literal(CypherLiteral::String(prop)) = index
726 && let Some(expr) = self.try_compile_struct_field(var_name, prop, input_schema)
727 {
728 return Ok(expr);
729 }
730 if raw_bytes_marker::is_markable_list(array, input_schema) {
734 return self.compile_function_with_custom_args(
735 "index",
736 &[array.clone(), index.clone()],
737 false,
738 input_schema,
739 );
740 }
741 self.compile_standard(
742 &Expr::ArrayIndex {
743 array: Box::new(array.clone()),
744 index: Box::new(index.clone()),
745 },
746 input_schema,
747 )
748 }
749
750 fn compile_exists(&self, query: &Query) -> Result<Arc<dyn PhysicalExpr>> {
752 if has_mutation_clause(query) {
754 return Err(anyhow!(
755 "SyntaxError: InvalidClauseComposition - EXISTS subquery cannot contain updating clauses"
756 ));
757 }
758
759 let err = |dep: &str| anyhow!("EXISTS requires {}", dep);
760
761 let graph_ctx = self
762 .graph_ctx
763 .clone()
764 .ok_or_else(|| err("GraphExecutionContext"))?;
765 let uni_schema = self.uni_schema.clone().ok_or_else(|| err("UniSchema"))?;
766 let session_ctx = self
767 .session_ctx
768 .clone()
769 .ok_or_else(|| err("SessionContext"))?;
770 let storage = self.storage.clone().ok_or_else(|| err("StorageManager"))?;
771
772 Ok(Arc::new(ExistsExecExpr::new(
773 query.clone(),
774 graph_ctx,
775 session_ctx,
776 storage,
777 uni_schema,
778 self.params.clone(),
779 self.outer_entity_vars.clone(),
780 )))
781 }
782
783 fn compile_pattern_exists(
787 &self,
788 query: &Query,
789 input_schema: &Schema,
790 ) -> Result<Arc<dyn PhysicalExpr>> {
791 let pattern = extract_pattern_from_exists_query(query)?;
792
793 let graph_ctx = self
794 .graph_ctx
795 .as_ref()
796 .ok_or_else(|| anyhow!("Pattern exists requires GraphExecutionContext"))?;
797 let uni_schema = self
798 .uni_schema
799 .as_ref()
800 .ok_or_else(|| anyhow!("Pattern exists requires UniSchema"))?;
801
802 let (anchor_col, steps) = analyze_pattern(&pattern, input_schema, uni_schema)?;
803
804 let mut bound_target_columns: Vec<Option<String>> = Vec::with_capacity(steps.len());
809 for step in &steps {
810 if let Some(var) = &step.target_variable {
811 let vid_col = format!("{}._vid", var);
812 if input_schema.column_with_name(&vid_col).is_some() {
813 bound_target_columns.push(Some(vid_col));
814 } else if self.outer_entity_vars.contains(var) {
815 anyhow::bail!(
817 "Pattern target '{}' is a correlated reference from an outer scope",
818 var
819 );
820 } else {
821 bound_target_columns.push(None);
823 }
824 } else {
825 bound_target_columns.push(None);
826 }
827 }
828
829 let property_preds = extract_target_property_predicates(&pattern, &steps, uni_schema)?;
830
831 Ok(Arc::new(PatternExistsExecExpr::new(
832 graph_ctx.clone(),
833 anchor_col,
834 steps,
835 Arc::new(input_schema.clone()),
836 property_preds,
837 bound_target_columns,
838 self.params.clone(),
839 )))
840 }
841
842 fn compile_similar_to(
844 &self,
845 args: &[Expr],
846 input_schema: &Schema,
847 ) -> Result<Arc<dyn PhysicalExpr>> {
848 if args.len() < 2 || args.len() > 3 {
849 return Err(SimilarToError::InvalidArity { count: args.len() }.into());
850 }
851
852 let graph_ctx = self
853 .graph_ctx
854 .clone()
855 .ok_or(SimilarToError::NoGraphContext)?;
856
857 let source_variable = extract_source_variable(&args[0]);
859 let source_property_names = extract_property_names(&args[0]);
860
861 let (source_exprs, query_exprs) = normalize_similar_to_args(&args[0], &args[1]);
864
865 let source_children: Vec<Arc<dyn PhysicalExpr>> = source_exprs
867 .iter()
868 .map(|e| self.compile(e, input_schema))
869 .collect::<Result<Vec<_>>>()?;
870 let query_children: Vec<Arc<dyn PhysicalExpr>> = query_exprs
871 .iter()
872 .map(|e| self.compile(e, input_schema))
873 .collect::<Result<Vec<_>>>()?;
874
875 let options_child = if args.len() == 3 {
877 Some(self.compile(&args[2], input_schema)?)
878 } else {
879 None
880 };
881
882 let source_metrics: Vec<Option<DistanceMetric>> = source_property_names
884 .iter()
885 .map(|prop_name| {
886 prop_name.as_ref().and_then(|prop| {
887 self.uni_schema
888 .as_ref()
889 .and_then(|schema| resolve_metric_for_property(schema, prop))
890 })
891 })
892 .collect();
893
894 Ok(Arc::new(SimilarToExecExpr::new(
895 source_children,
896 query_children,
897 options_child,
898 graph_ctx,
899 source_variable,
900 source_property_names,
901 source_metrics,
902 )))
903 }
904
905 fn needs_vid_extraction_for_variable(
907 variable: &str,
908 map_expr: &Expr,
909 where_clause: Option<&Expr>,
910 ) -> bool {
911 fn expr_has_pattern_comp_referencing(expr: &Expr, var: &str) -> bool {
912 match expr {
913 Expr::PatternComprehension { pattern, .. } => {
914 pattern.paths.iter().any(|path| {
916 path.elements.iter().any(|elem| match elem {
917 uni_cypher::ast::PatternElement::Node(n) => {
918 n.variable.as_deref() == Some(var)
919 }
920 uni_cypher::ast::PatternElement::Relationship(r) => {
921 r.variable.as_deref() == Some(var)
922 }
923 _ => false,
924 })
925 })
926 }
927 Expr::FunctionCall { args, .. } => args
928 .iter()
929 .any(|a| expr_has_pattern_comp_referencing(a, var)),
930 Expr::BinaryOp { left, right, .. } => {
931 expr_has_pattern_comp_referencing(left, var)
932 || expr_has_pattern_comp_referencing(right, var)
933 }
934 Expr::UnaryOp { expr: e, .. } | Expr::Property(e, _) => {
935 expr_has_pattern_comp_referencing(e, var)
936 }
937 Expr::List(items) => items
938 .iter()
939 .any(|i| expr_has_pattern_comp_referencing(i, var)),
940 Expr::ListComprehension {
941 list,
942 map_expr,
943 where_clause,
944 ..
945 } => {
946 expr_has_pattern_comp_referencing(list, var)
947 || expr_has_pattern_comp_referencing(map_expr, var)
948 || where_clause
949 .as_ref()
950 .is_some_and(|w| expr_has_pattern_comp_referencing(w, var))
951 }
952 _ => false,
953 }
954 }
955
956 expr_has_pattern_comp_referencing(map_expr, variable)
957 || where_clause.is_some_and(|w| expr_has_pattern_comp_referencing(w, variable))
958 }
959
960 pub fn contains_custom_expr(expr: &Expr) -> bool {
962 match expr {
963 Expr::ListComprehension { .. } => true,
964 Expr::Quantifier { .. } => true,
965 Expr::Reduce { .. } => true,
966 Expr::PatternComprehension { .. } => true,
967 Expr::BinaryOp { left, right, .. } => {
968 Self::contains_custom_expr(left) || Self::contains_custom_expr(right)
969 }
970 Expr::UnaryOp { expr, .. } => Self::contains_custom_expr(expr),
971 Expr::FunctionCall { name, args, .. } => {
972 name.eq_ignore_ascii_case("similar_to")
973 || args.iter().any(Self::contains_custom_expr)
974 }
975 Expr::Case {
976 when_then,
977 else_expr,
978 ..
979 } => {
980 when_then
981 .iter()
982 .any(|(w, t)| Self::contains_custom_expr(w) || Self::contains_custom_expr(t))
983 || else_expr.as_deref().is_some_and(Self::contains_custom_expr)
984 }
985 Expr::List(items) => items.iter().any(Self::contains_custom_expr),
986 Expr::Map(entries) => entries.iter().any(|(_, v)| Self::contains_custom_expr(v)),
987 Expr::IsNull(e) | Expr::IsNotNull(e) => Self::contains_custom_expr(e),
988 Expr::In { expr: l, list: r } => {
989 Self::contains_custom_expr(l) || Self::contains_custom_expr(r)
990 }
991 Expr::Exists { .. } => true,
992 Expr::LabelCheck { expr, .. } => Self::contains_custom_expr(expr),
993 _ => false,
994 }
995 }
996
997 fn is_list_producing(expr: &Expr) -> bool {
1000 match expr {
1001 Expr::List(_) => true,
1002 Expr::ListComprehension { .. } => true,
1003 Expr::ArraySlice { .. } => true,
1004 Expr::BinaryOp {
1006 left,
1007 op: BinaryOp::Add,
1008 right,
1009 } => Self::is_list_producing(left) || Self::is_list_producing(right),
1010 Expr::FunctionCall { name, .. } => {
1011 matches!(
1013 name.as_str(),
1014 "range"
1015 | "tail"
1016 | "reverse"
1017 | "collect"
1018 | "keys"
1019 | "labels"
1020 | "nodes"
1021 | "relationships"
1022 )
1023 }
1024 _ => false,
1025 }
1026 }
1027
1028 fn compile_standard(
1029 &self,
1030 expr: &Expr,
1031 input_schema: &Schema,
1032 ) -> Result<Arc<dyn PhysicalExpr>> {
1033 let resolved = Self::resolve_flat_column_properties(expr, input_schema);
1038 let df_expr = cypher_expr_to_df(&resolved, self.translation_ctx)?;
1039 let resolved_expr = self.resolve_udfs(df_expr)?;
1040
1041 let df_schema = datafusion::common::DFSchema::try_from(input_schema.clone())?;
1042
1043 let coerced_expr = crate::query::df_expr::apply_type_coercion(&resolved_expr, &df_schema)?;
1045
1046 let coerced_expr = self.resolve_udfs(coerced_expr)?;
1048
1049 let planner = datafusion::physical_planner::DefaultPhysicalPlanner::default();
1050 planner
1051 .create_physical_expr(&coerced_expr, &df_schema, self.state)
1052 .map_err(|e| anyhow!("DataFusion planning failed: {}", e))
1053 }
1054
1055 fn resolve_flat_column_properties(expr: &Expr, schema: &Schema) -> Expr {
1061 match expr {
1062 Expr::Property(base, prop) => {
1063 if let Expr::Variable(var) = base.as_ref() {
1064 let flat_col = format!("{}.{}", var, prop);
1065 if schema.index_of(&flat_col).is_ok() {
1066 return Expr::Variable(flat_col);
1067 }
1068 }
1069 Expr::Property(
1071 Box::new(Self::resolve_flat_column_properties(base, schema)),
1072 prop.clone(),
1073 )
1074 }
1075 Expr::BinaryOp { left, op, right } => Expr::BinaryOp {
1076 left: Box::new(Self::resolve_flat_column_properties(left, schema)),
1077 op: *op,
1078 right: Box::new(Self::resolve_flat_column_properties(right, schema)),
1079 },
1080 Expr::FunctionCall {
1081 name,
1082 args,
1083 distinct,
1084 window_spec,
1085 } => Expr::FunctionCall {
1086 name: name.clone(),
1087 args: args
1088 .iter()
1089 .map(|a| Self::resolve_flat_column_properties(a, schema))
1090 .collect(),
1091 distinct: *distinct,
1092 window_spec: window_spec.clone(),
1093 },
1094 Expr::UnaryOp { op, expr: inner } => Expr::UnaryOp {
1095 op: *op,
1096 expr: Box::new(Self::resolve_flat_column_properties(inner, schema)),
1097 },
1098 Expr::List(items) => Expr::List(
1099 items
1100 .iter()
1101 .map(|i| Self::resolve_flat_column_properties(i, schema))
1102 .collect(),
1103 ),
1104 other => other.clone(),
1106 }
1107 }
1108
1109 fn resolve_udfs(
1114 &self,
1115 expr: datafusion::logical_expr::Expr,
1116 ) -> Result<datafusion::logical_expr::Expr> {
1117 use datafusion::common::tree_node::{Transformed, TreeNode};
1118 use datafusion::logical_expr::Expr as DfExpr;
1119
1120 let result = expr
1121 .transform_up(|node| {
1122 if let DfExpr::ScalarFunction(ref func) = node {
1123 let udf_name = func.func.name();
1124 if let Some(registered_udf) = self.state.scalar_functions().get(udf_name) {
1125 return Ok(Transformed::yes(DfExpr::ScalarFunction(
1126 datafusion::logical_expr::expr::ScalarFunction {
1127 func: registered_udf.clone(),
1128 args: func.args.clone(),
1129 },
1130 )));
1131 }
1132 }
1133 Ok(Transformed::no(node))
1134 })
1135 .map_err(|e| anyhow!("Failed to resolve UDFs: {}", e))?;
1136 Ok(result.data)
1137 }
1138
1139 fn compile_list_comprehension(
1140 &self,
1141 variable: &str,
1142 list: &Expr,
1143 where_clause: Option<&Expr>,
1144 map_expr: &Expr,
1145 input_schema: &Schema,
1146 ) -> Result<Arc<dyn PhysicalExpr>> {
1147 let input_list_phy = self.compile(list, input_schema)?;
1148
1149 let list_data_type = input_list_phy.data_type(input_schema)?;
1151 let inner_data_type = resolve_list_element_type(
1152 &list_data_type,
1153 DataType::LargeBinary,
1154 "List comprehension",
1155 )?;
1156
1157 let mut fields = input_schema.fields().to_vec();
1159 let loop_var_field = Arc::new(Field::new(variable, inner_data_type.clone(), true));
1160
1161 if let Some(pos) = fields.iter().position(|f| f.name() == variable) {
1162 fields[pos] = loop_var_field;
1163 } else {
1164 fields.push(loop_var_field);
1165 }
1166
1167 let needs_vid_extraction =
1169 Self::needs_vid_extraction_for_variable(variable, map_expr, where_clause);
1170 if needs_vid_extraction && inner_data_type == DataType::LargeBinary {
1171 let vid_field = Arc::new(Field::new(
1173 format!("{}._vid", variable),
1174 DataType::UInt64,
1175 true,
1176 ));
1177 fields.push(vid_field);
1178 }
1179
1180 let inner_schema = Arc::new(Schema::new(fields));
1181
1182 let mut scoped_ctx = None;
1184 let inner_compiler = self.scoped_compiler(&[variable], &mut scoped_ctx);
1185
1186 let predicate_phy = if let Some(pred) = where_clause {
1187 Some(inner_compiler.compile(pred, &inner_schema)?)
1188 } else {
1189 None
1190 };
1191
1192 let map_phy = inner_compiler.compile(map_expr, &inner_schema)?;
1193 let output_item_type = map_phy.data_type(&inner_schema)?;
1194
1195 Ok(Arc::new(ListComprehensionExecExpr::new(
1196 input_list_phy,
1197 map_phy,
1198 predicate_phy,
1199 variable.to_string(),
1200 Arc::new(input_schema.clone()),
1201 output_item_type,
1202 needs_vid_extraction,
1203 )))
1204 }
1205
1206 fn compile_reduce(
1207 &self,
1208 accumulator: &str,
1209 initial: &Expr,
1210 variable: &str,
1211 list: &Expr,
1212 reduce_expr: &Expr,
1213 input_schema: &Schema,
1214 ) -> Result<Arc<dyn PhysicalExpr>> {
1215 let list_phy = self.compile(list, input_schema)?;
1216
1217 let initial_phy = self.compile(initial, input_schema)?;
1218 let acc_type = initial_phy.data_type(input_schema)?;
1219
1220 let list_data_type = list_phy.data_type(input_schema)?;
1221 let inner_data_type =
1224 resolve_list_element_type(&list_data_type, acc_type.clone(), "Reduce")?;
1225
1226 let mut fields = input_schema.fields().to_vec();
1228
1229 let acc_field = Arc::new(Field::new(accumulator, acc_type, true));
1230 if let Some(pos) = fields.iter().position(|f| f.name() == accumulator) {
1231 fields[pos] = acc_field;
1232 } else {
1233 fields.push(acc_field);
1234 }
1235
1236 let var_field = Arc::new(Field::new(variable, inner_data_type, true));
1237 if let Some(pos) = fields.iter().position(|f| f.name() == variable) {
1238 fields[pos] = var_field;
1239 } else {
1240 fields.push(var_field);
1241 }
1242
1243 let inner_schema = Arc::new(Schema::new(fields));
1244
1245 let mut scoped_ctx = None;
1247 let reduce_compiler = self.scoped_compiler(&[accumulator, variable], &mut scoped_ctx);
1248
1249 let reduce_phy = reduce_compiler.compile(reduce_expr, &inner_schema)?;
1250 let output_type = reduce_phy.data_type(&inner_schema)?;
1251
1252 Ok(Arc::new(ReduceExecExpr::new(
1253 accumulator.to_string(),
1254 initial_phy,
1255 variable.to_string(),
1256 list_phy,
1257 reduce_phy,
1258 Arc::new(input_schema.clone()),
1259 output_type,
1260 )))
1261 }
1262
1263 fn compile_quantifier(
1264 &self,
1265 quantifier: &uni_cypher::ast::Quantifier,
1266 variable: &str,
1267 list: &Expr,
1268 predicate: &Expr,
1269 input_schema: &Schema,
1270 ) -> Result<Arc<dyn PhysicalExpr>> {
1271 let input_list_phy = self.compile(list, input_schema)?;
1272
1273 let list_data_type = input_list_phy.data_type(input_schema)?;
1275 let inner_data_type =
1276 resolve_list_element_type(&list_data_type, DataType::LargeBinary, "Quantifier")?;
1277
1278 let mut fields = input_schema.fields().to_vec();
1282 let loop_var_field = Arc::new(Field::new(variable, inner_data_type, true));
1283
1284 if let Some(pos) = fields.iter().position(|f| f.name() == variable) {
1286 fields[pos] = loop_var_field;
1287 } else {
1288 fields.push(loop_var_field);
1289 }
1290
1291 let inner_schema = Arc::new(Schema::new(fields));
1292
1293 let mut scoped_ctx = None;
1297 let pred_compiler = self.scoped_compiler(&[variable], &mut scoped_ctx);
1298
1299 let mut predicate_phy = pred_compiler.compile(predicate, &inner_schema)?;
1300
1301 if let Ok(DataType::LargeBinary) = predicate_phy.data_type(&inner_schema) {
1303 predicate_phy = self.wrap_with_cv_to_bool(predicate_phy)?;
1304 }
1305
1306 let qt = match quantifier {
1307 uni_cypher::ast::Quantifier::All => QuantifierType::All,
1308 uni_cypher::ast::Quantifier::Any => QuantifierType::Any,
1309 uni_cypher::ast::Quantifier::Single => QuantifierType::Single,
1310 uni_cypher::ast::Quantifier::None => QuantifierType::None,
1311 };
1312
1313 Ok(Arc::new(QuantifierExecExpr::new(
1314 input_list_phy,
1315 predicate_phy,
1316 variable.to_string(),
1317 Arc::new(input_schema.clone()),
1318 qt,
1319 )))
1320 }
1321
1322 fn compile_pattern_comprehension(
1323 &self,
1324 path_variable: &Option<String>,
1325 pattern: &uni_cypher::ast::Pattern,
1326 where_clause: Option<&Expr>,
1327 map_expr: &Expr,
1328 input_schema: &Schema,
1329 ) -> Result<Arc<dyn PhysicalExpr>> {
1330 let err = |dep: &str| anyhow!("Pattern comprehension requires {}", dep);
1331
1332 let graph_ctx = self
1333 .graph_ctx
1334 .as_ref()
1335 .ok_or_else(|| err("GraphExecutionContext"))?;
1336 let uni_schema = self.uni_schema.as_ref().ok_or_else(|| err("UniSchema"))?;
1337
1338 let (anchor_col, steps) = analyze_pattern(pattern, input_schema, uni_schema)?;
1340
1341 let (vertex_props, edge_props) = collect_inner_properties(where_clause, map_expr, &steps);
1343
1344 let inner_schema = build_inner_schema(
1346 input_schema,
1347 &steps,
1348 &vertex_props,
1349 &edge_props,
1350 path_variable.as_deref(),
1351 );
1352
1353 let pred_phy = where_clause
1355 .map(|p| self.compile(p, &inner_schema))
1356 .transpose()?;
1357 let map_phy = self.compile(map_expr, &inner_schema)?;
1358 let output_type = map_phy.data_type(&inner_schema)?;
1359
1360 Ok(Arc::new(PatternComprehensionExecExpr::new(
1362 graph_ctx.clone(),
1363 anchor_col,
1364 steps,
1365 path_variable.clone(),
1366 pred_phy,
1367 map_phy,
1368 Arc::new(input_schema.clone()),
1369 Arc::new(inner_schema),
1370 output_type,
1371 vertex_props,
1372 edge_props,
1373 )))
1374 }
1375
1376 fn compile_function_with_custom_args(
1382 &self,
1383 name: &str,
1384 args: &[Expr],
1385 _distinct: bool,
1386 input_schema: &Schema,
1387 ) -> Result<Arc<dyn PhysicalExpr>> {
1388 let compiled_args: Vec<Arc<dyn PhysicalExpr>> = args
1390 .iter()
1391 .map(|arg| self.compile(arg, input_schema))
1392 .collect::<Result<Vec<_>>>()?;
1393
1394 let udf_name = Self::cypher_fn_to_udf(name);
1396 let udf = self
1397 .state
1398 .scalar_functions()
1399 .get(udf_name.as_str())
1400 .ok_or_else(|| {
1401 anyhow!(
1402 "UDF '{}' not found in registry for function '{}'",
1403 udf_name,
1404 name
1405 )
1406 })?;
1407
1408 let placeholders: &[&str] = &["__arg0__", "__arg1__", "__arg2__", "__argN__"];
1410 let operand_types: Vec<(&str, DataType)> = compiled_args
1411 .iter()
1412 .enumerate()
1413 .map(|(i, arg)| {
1414 let dt = arg.data_type(input_schema).unwrap_or(DataType::LargeBinary);
1415 let placeholder = placeholders[i.min(3)];
1416 (placeholder, dt)
1417 })
1418 .collect();
1419
1420 let dummy_cols: Vec<datafusion::logical_expr::Expr> = operand_types
1421 .iter()
1422 .map(|(name, _)| {
1423 datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1424 None::<String>,
1425 *name,
1426 ))
1427 })
1428 .collect();
1429
1430 let udf_expr = datafusion::logical_expr::Expr::ScalarFunction(
1431 datafusion::logical_expr::expr::ScalarFunction {
1432 func: udf.clone(),
1433 args: dummy_cols,
1434 },
1435 );
1436
1437 self.plan_udf_physical_expr(
1439 &udf_expr,
1440 &operand_types,
1441 compiled_args,
1442 &format!("function {}", name),
1443 )
1444 }
1445
1446 fn cypher_fn_to_udf(name: &str) -> String {
1451 match name.to_uppercase().as_str() {
1452 "SIZE" | "LENGTH" => "_cypher_size".to_string(),
1453 "REVERSE" => "_cypher_reverse".to_string(),
1454 "TOSTRING" => "tostring".to_string(),
1455 "TOBOOLEAN" | "TOBOOL" | "TOBOOLEANORNULL" => "toboolean".to_string(),
1456 "TOINTEGER" | "TOINT" | "TOINTEGERORNULL" => "tointeger".to_string(),
1457 "TOFLOAT" | "TOFLOATORNULL" => "tofloat".to_string(),
1458 "HEAD" => "head".to_string(),
1459 "LAST" => "last".to_string(),
1460 "TAIL" => "_cypher_tail".to_string(),
1461 "KEYS" => "keys".to_string(),
1462 "TYPE" => "type".to_string(),
1463 "PROPERTIES" => "properties".to_string(),
1464 "LABELS" => "labels".to_string(),
1465 "COALESCE" => "coalesce".to_string(),
1466 "ID" => "id".to_string(),
1467 _ => name.to_lowercase(),
1469 }
1470 }
1471
1472 fn compile_case(
1481 &self,
1482 operand: &Option<Box<Expr>>,
1483 when_then: &[(Expr, Expr)],
1484 else_expr: &Option<Box<Expr>>,
1485 input_schema: &Schema,
1486 ) -> Result<Arc<dyn PhysicalExpr>> {
1487 let operand_phy = operand
1488 .as_deref()
1489 .map(|e| self.compile(e, input_schema))
1490 .transpose()?;
1491
1492 let mut when_then_phy: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> = when_then
1493 .iter()
1494 .map(|(w, t)| {
1495 let w_phy = self.compile(w, input_schema)?;
1496 let t_phy = self.compile(t, input_schema)?;
1497 Ok((w_phy, t_phy))
1498 })
1499 .collect::<Result<Vec<_>>>()?;
1500
1501 let mut else_phy = else_expr
1502 .as_deref()
1503 .map(|e| self.compile(e, input_schema))
1504 .transpose()?;
1505
1506 for (w_phy, _) in &mut when_then_phy {
1508 if matches!(w_phy.data_type(input_schema), Ok(DataType::LargeBinary)) {
1509 *w_phy = self.wrap_with_cv_to_bool(w_phy.clone())?;
1510 }
1511 }
1512
1513 let branch_types: Vec<DataType> = when_then_phy
1515 .iter()
1516 .map(|(_, t)| t.data_type(input_schema))
1517 .chain(else_phy.iter().map(|e| e.data_type(input_schema)))
1518 .filter_map(Result::ok)
1519 .collect();
1520
1521 let has_large_binary = branch_types.contains(&DataType::LargeBinary);
1522 let has_list = branch_types
1523 .iter()
1524 .any(|dt| matches!(dt, DataType::List(_) | DataType::LargeList(_)));
1525
1526 if has_large_binary && has_list {
1528 for (_, t_phy) in &mut when_then_phy {
1529 if let Ok(dt) = t_phy.data_type(input_schema)
1530 && matches!(dt, DataType::List(_) | DataType::LargeList(_))
1531 {
1532 *t_phy = Arc::new(LargeListToCypherValueExpr::new(t_phy.clone()));
1533 }
1534 }
1535 if let Some(e_phy) = else_phy.take() {
1536 if let Ok(dt) = e_phy.data_type(input_schema)
1537 && matches!(dt, DataType::List(_) | DataType::LargeList(_))
1538 {
1539 else_phy = Some(Arc::new(LargeListToCypherValueExpr::new(e_phy)));
1540 } else {
1541 else_phy = Some(e_phy);
1542 }
1543 }
1544 }
1545
1546 let case_expr = datafusion::physical_expr::expressions::CaseExpr::try_new(
1547 operand_phy,
1548 when_then_phy,
1549 else_phy,
1550 )
1551 .map_err(|e| anyhow!("Failed to create CASE expression: {}", e))?;
1552
1553 Ok(Arc::new(case_expr))
1554 }
1555
1556 fn compile_binary_op(
1557 &self,
1558 op: &BinaryOp,
1559 left: Arc<dyn PhysicalExpr>,
1560 right: Arc<dyn PhysicalExpr>,
1561 input_schema: &Schema,
1562 ) -> Result<Arc<dyn PhysicalExpr>> {
1563 use datafusion::logical_expr::Operator;
1564
1565 let string_op = match op {
1567 BinaryOp::StartsWith => Some(StringOp::StartsWith),
1568 BinaryOp::EndsWith => Some(StringOp::EndsWith),
1569 BinaryOp::Contains => Some(StringOp::Contains),
1570 _ => None,
1571 };
1572 if let Some(sop) = string_op {
1573 return Ok(Arc::new(CypherStringMatchExpr::new(left, right, sop)));
1574 }
1575
1576 let df_op = match op {
1577 BinaryOp::Add => Operator::Plus,
1578 BinaryOp::Sub => Operator::Minus,
1579 BinaryOp::Mul => Operator::Multiply,
1580 BinaryOp::Div => Operator::Divide,
1581 BinaryOp::Mod => Operator::Modulo,
1582 BinaryOp::Eq => Operator::Eq,
1583 BinaryOp::NotEq => Operator::NotEq,
1584 BinaryOp::Gt => Operator::Gt,
1585 BinaryOp::GtEq => Operator::GtEq,
1586 BinaryOp::Lt => Operator::Lt,
1587 BinaryOp::LtEq => Operator::LtEq,
1588 BinaryOp::And => Operator::And,
1589 BinaryOp::Or => Operator::Or,
1590 BinaryOp::Xor => {
1591 return Err(anyhow!(
1592 "XOR not supported via binary helper, use bitwise_xor"
1593 ));
1594 }
1595 BinaryOp::Regex => Operator::RegexMatch,
1596 BinaryOp::ApproxEq => {
1597 return Err(anyhow!(
1598 "ApproxEq (~=) not yet supported in physical compiler"
1599 ));
1600 }
1601 BinaryOp::Pow => return Err(anyhow!("POW not yet supported in physical compiler")),
1602 _ => return Err(anyhow!("Unsupported binary op in compiler: {:?}", op)),
1603 };
1604
1605 let mut left = left;
1609 let mut right = right;
1610 let mut left_type = left.data_type(input_schema).ok();
1611 let mut right_type = right.data_type(input_schema).ok();
1612
1613 let left_is_list = matches!(
1616 left_type.as_ref(),
1617 Some(DataType::List(_) | DataType::LargeList(_))
1618 );
1619 let right_is_list = matches!(
1620 right_type.as_ref(),
1621 Some(DataType::List(_) | DataType::LargeList(_))
1622 );
1623
1624 if left_is_list && is_cypher_value_type(right_type.as_ref()) {
1625 left = Arc::new(LargeListToCypherValueExpr::new(left));
1626 left_type = Some(DataType::LargeBinary);
1627 } else if right_is_list && is_cypher_value_type(left_type.as_ref()) {
1628 right = Arc::new(LargeListToCypherValueExpr::new(right));
1629 right_type = Some(DataType::LargeBinary);
1630 }
1631
1632 let has_cv =
1633 is_cypher_value_type(left_type.as_ref()) || is_cypher_value_type(right_type.as_ref());
1634
1635 if has_cv {
1636 if let Some(result) = self.compile_cv_comparison(
1637 df_op,
1638 left.clone(),
1639 right.clone(),
1640 &left_type,
1641 &right_type,
1642 )? {
1643 return Ok(result);
1644 }
1645 if let Some(result) = self.compile_cv_list_concat(
1646 left.clone(),
1647 right.clone(),
1648 &left_type,
1649 &right_type,
1650 df_op,
1651 )? {
1652 return Ok(result);
1653 }
1654 if let Some(result) = self.compile_cv_arithmetic(
1655 df_op,
1656 left.clone(),
1657 right.clone(),
1658 &left_type,
1659 &right_type,
1660 input_schema,
1661 )? {
1662 return Ok(result);
1663 }
1664 }
1665
1666 binary(left, df_op, right, input_schema)
1668 .map_err(|e| anyhow!("Failed to create binary expression: {}", e))
1669 }
1670
1671 fn compile_cv_comparison(
1673 &self,
1674 df_op: datafusion::logical_expr::Operator,
1675 left: Arc<dyn PhysicalExpr>,
1676 right: Arc<dyn PhysicalExpr>,
1677 left_type: &Option<DataType>,
1678 right_type: &Option<DataType>,
1679 ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1680 use datafusion::logical_expr::Operator;
1681
1682 let udf_name = match df_op {
1683 Operator::Eq => "_cypher_equal",
1684 Operator::NotEq => "_cypher_not_equal",
1685 Operator::Gt => "_cypher_gt",
1686 Operator::GtEq => "_cypher_gt_eq",
1687 Operator::Lt => "_cypher_lt",
1688 Operator::LtEq => "_cypher_lt_eq",
1689 _ => return Ok(None),
1690 };
1691
1692 self.plan_binary_udf(
1693 udf_name,
1694 left,
1695 right,
1696 left_type.clone().unwrap_or(DataType::LargeBinary),
1697 right_type.clone().unwrap_or(DataType::LargeBinary),
1698 )
1699 }
1700
1701 fn compile_cv_list_concat(
1703 &self,
1704 left: Arc<dyn PhysicalExpr>,
1705 right: Arc<dyn PhysicalExpr>,
1706 left_type: &Option<DataType>,
1707 right_type: &Option<DataType>,
1708 df_op: datafusion::logical_expr::Operator,
1709 ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1710 use datafusion::logical_expr::Operator;
1711
1712 if df_op != Operator::Plus {
1713 return Ok(None);
1714 }
1715
1716 let is_list = |t: &Option<DataType>| {
1718 t.as_ref()
1719 .is_some_and(|dt| matches!(dt, DataType::LargeBinary | DataType::List(_)))
1720 };
1721
1722 if !is_list(left_type) && !is_list(right_type) {
1723 return Ok(None);
1724 }
1725
1726 self.plan_binary_udf(
1727 "_cypher_list_concat",
1728 left,
1729 right,
1730 left_type.clone().unwrap_or(DataType::LargeBinary),
1731 right_type.clone().unwrap_or(DataType::LargeBinary),
1732 )
1733 }
1734
1735 fn compile_cv_arithmetic(
1740 &self,
1741 df_op: datafusion::logical_expr::Operator,
1742 left: Arc<dyn PhysicalExpr>,
1743 right: Arc<dyn PhysicalExpr>,
1744 left_type: &Option<DataType>,
1745 right_type: &Option<DataType>,
1746 _input_schema: &Schema,
1747 ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1748 use datafusion::logical_expr::Operator;
1749
1750 let udf_name = match df_op {
1751 Operator::Plus => "_cypher_add",
1752 Operator::Minus => "_cypher_sub",
1753 Operator::Multiply => "_cypher_mul",
1754 Operator::Divide => "_cypher_div",
1755 Operator::Modulo => "_cypher_mod",
1756 _ => return Ok(None),
1757 };
1758
1759 self.plan_binary_udf(
1760 udf_name,
1761 left,
1762 right,
1763 left_type.clone().unwrap_or(DataType::LargeBinary),
1764 right_type.clone().unwrap_or(DataType::LargeBinary),
1765 )
1766 }
1767
1768 fn plan_udf_physical_expr(
1770 &self,
1771 udf_expr: &datafusion::logical_expr::Expr,
1772 operand_types: &[(&str, DataType)],
1773 children: Vec<Arc<dyn PhysicalExpr>>,
1774 error_context: &str,
1775 ) -> Result<Arc<dyn PhysicalExpr>> {
1776 let tmp_schema = Schema::new(
1777 operand_types
1778 .iter()
1779 .map(|(name, dt)| Arc::new(Field::new(*name, dt.clone(), true)))
1780 .collect::<Vec<_>>(),
1781 );
1782 let df_schema = datafusion::common::DFSchema::try_from(tmp_schema)?;
1783 let planner = datafusion::physical_planner::DefaultPhysicalPlanner::default();
1784 let udf_phy = planner
1785 .create_physical_expr(udf_expr, &df_schema, self.state)
1786 .map_err(|e| anyhow!("Failed to create {} expr: {}", error_context, e))?;
1787 udf_phy
1788 .with_new_children(children)
1789 .map_err(|e| anyhow!("Failed to rebind {} children: {}", error_context, e))
1790 }
1791
1792 fn wrap_with_cv_to_bool(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
1796 let Some(udf) = self.state.scalar_functions().get("_cv_to_bool") else {
1797 return Err(anyhow!("_cv_to_bool UDF not found"));
1798 };
1799
1800 let dummy_col = datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1801 None::<String>,
1802 "__cv__",
1803 ));
1804 let udf_expr = datafusion::logical_expr::Expr::ScalarFunction(
1805 datafusion::logical_expr::expr::ScalarFunction {
1806 func: udf.clone(),
1807 args: vec![dummy_col],
1808 },
1809 );
1810
1811 self.plan_udf_physical_expr(
1812 &udf_expr,
1813 &[("__cv__", DataType::LargeBinary)],
1814 vec![expr],
1815 "CypherValue to bool",
1816 )
1817 }
1818
1819 fn plan_binary_udf(
1821 &self,
1822 udf_name: &str,
1823 left: Arc<dyn PhysicalExpr>,
1824 right: Arc<dyn PhysicalExpr>,
1825 left_type: DataType,
1826 right_type: DataType,
1827 ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1828 let Some(udf) = self.state.scalar_functions().get(udf_name) else {
1829 return Ok(None);
1830 };
1831 let udf_expr = datafusion::logical_expr::Expr::ScalarFunction(
1832 datafusion::logical_expr::expr::ScalarFunction {
1833 func: udf.clone(),
1834 args: vec![
1835 datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1836 None::<String>,
1837 "__left__",
1838 )),
1839 datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1840 None::<String>,
1841 "__right__",
1842 )),
1843 ],
1844 },
1845 );
1846 let result = self.plan_udf_physical_expr(
1847 &udf_expr,
1848 &[("__left__", left_type), ("__right__", right_type)],
1849 vec![left, right],
1850 udf_name,
1851 )?;
1852 Ok(Some(result))
1853 }
1854
1855 fn compile_unary_op(
1856 &self,
1857 op: &UnaryOp,
1858 expr: Arc<dyn PhysicalExpr>,
1859 input_schema: &Schema,
1860 ) -> Result<Arc<dyn PhysicalExpr>> {
1861 match op {
1862 UnaryOp::Not => datafusion::physical_expr::expressions::not(expr),
1863 UnaryOp::Neg => datafusion::physical_expr::expressions::negative(expr, input_schema),
1864 }
1865 .map_err(|e| anyhow!("Failed to create unary expression: {}", e))
1866 }
1867}
1868
1869fn extract_source_variable(expr: &Expr) -> Option<String> {
1877 match expr {
1878 Expr::Property(inner, _) => extract_source_variable(inner),
1879 Expr::Variable(name) => Some(name.clone()),
1880 Expr::List(items) => items.first().and_then(extract_source_variable),
1881 _ => None,
1882 }
1883}
1884
1885fn extract_property_names(expr: &Expr) -> Vec<Option<String>> {
1889 match expr {
1890 Expr::Property(_, prop) => vec![Some(prop.clone())],
1891 Expr::List(items) => items
1892 .iter()
1893 .map(|item| {
1894 if let Expr::Property(_, prop) = item {
1895 Some(prop.clone())
1896 } else {
1897 None
1898 }
1899 })
1900 .collect(),
1901 _ => vec![None],
1902 }
1903}
1904
1905fn normalize_similar_to_args<'e>(
1910 sources: &'e Expr,
1911 queries: &'e Expr,
1912) -> (Vec<&'e Expr>, Vec<&'e Expr>) {
1913 match (sources, queries) {
1914 (Expr::List(src_items), Expr::List(qry_items)) if src_items.len() == qry_items.len() => {
1916 (src_items.iter().collect(), qry_items.iter().collect())
1917 }
1918 (Expr::List(src_items), _) if src_items.len() > 1 => {
1920 let queries_broadcast: Vec<&Expr> = vec![queries; src_items.len()];
1921 (src_items.iter().collect(), queries_broadcast)
1922 }
1923 _ => (vec![sources], vec![queries]),
1925 }
1926}
1927
1928use datafusion::physical_plan::DisplayAs;
1929use datafusion::physical_plan::DisplayFormatType;
1930
/// Cypher string-matching predicates evaluated by `CypherStringMatchExpr`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum StringOp {
    /// `lhs STARTS WITH rhs`
    StartsWith,
    /// `lhs ENDS WITH rhs`
    EndsWith,
    /// `lhs CONTAINS rhs`
    Contains,
}

impl std::fmt::Display for StringOp {
    /// Renders the operator as its Cypher keyword(s).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let keyword = match self {
            Self::StartsWith => "STARTS WITH",
            Self::EndsWith => "ENDS WITH",
            Self::Contains => "CONTAINS",
        };
        f.write_str(keyword)
    }
}
1947
1948#[derive(Debug, Eq)]
1949struct CypherStringMatchExpr {
1950 left: Arc<dyn PhysicalExpr>,
1951 right: Arc<dyn PhysicalExpr>,
1952 op: StringOp,
1953}
1954
1955impl PartialEq for CypherStringMatchExpr {
1956 fn eq(&self, other: &Self) -> bool {
1957 self.op == other.op && self.left.eq(&other.left) && self.right.eq(&other.right)
1958 }
1959}
1960
1961impl std::hash::Hash for CypherStringMatchExpr {
1962 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1963 self.op.hash(state);
1964 self.left.hash(state);
1965 self.right.hash(state);
1966 }
1967}
1968
1969impl CypherStringMatchExpr {
1970 fn new(left: Arc<dyn PhysicalExpr>, right: Arc<dyn PhysicalExpr>, op: StringOp) -> Self {
1971 Self { left, right, op }
1972 }
1973}
1974
1975impl std::fmt::Display for CypherStringMatchExpr {
1976 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1977 write!(f, "{} {} {}", self.left, self.op, self.right)
1978 }
1979}
1980
1981impl DisplayAs for CypherStringMatchExpr {
1982 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1983 write!(f, "{}", self)
1984 }
1985}
1986
1987impl PhysicalExpr for CypherStringMatchExpr {
1988 fn as_any(&self) -> &dyn std::any::Any {
1989 self
1990 }
1991
1992 fn data_type(
1993 &self,
1994 _input_schema: &Schema,
1995 ) -> datafusion::error::Result<arrow_schema::DataType> {
1996 Ok(arrow_schema::DataType::Boolean)
1997 }
1998
1999 fn nullable(&self, _input_schema: &Schema) -> datafusion::error::Result<bool> {
2000 Ok(true)
2001 }
2002
2003 fn evaluate(
2004 &self,
2005 batch: &arrow_array::RecordBatch,
2006 ) -> datafusion::error::Result<datafusion::physical_plan::ColumnarValue> {
2007 use crate::query::df_udfs::invoke_cypher_string_op;
2008 use arrow_schema::Field;
2009 use datafusion::config::ConfigOptions;
2010 use datafusion::logical_expr::ScalarFunctionArgs;
2011
2012 let left_val = self.left.evaluate(batch)?;
2013 let right_val = self.right.evaluate(batch)?;
2014
2015 let args = ScalarFunctionArgs {
2016 args: vec![left_val, right_val],
2017 number_rows: batch.num_rows(),
2018 return_field: Arc::new(Field::new("result", arrow_schema::DataType::Boolean, true)),
2019 config_options: Arc::new(ConfigOptions::default()),
2020 arg_fields: vec![], };
2022
2023 match self.op {
2024 StringOp::StartsWith => {
2025 invoke_cypher_string_op(&args, "starts_with", |s, p| s.starts_with(p))
2026 }
2027 StringOp::EndsWith => {
2028 invoke_cypher_string_op(&args, "ends_with", |s, p| s.ends_with(p))
2029 }
2030 StringOp::Contains => invoke_cypher_string_op(&args, "contains", |s, p| s.contains(p)),
2031 }
2032 }
2033
2034 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
2035 vec![&self.left, &self.right]
2036 }
2037
2038 fn with_new_children(
2039 self: Arc<Self>,
2040 children: Vec<Arc<dyn PhysicalExpr>>,
2041 ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
2042 Ok(Arc::new(CypherStringMatchExpr::new(
2043 children[0].clone(),
2044 children[1].clone(),
2045 self.op,
2046 )))
2047 }
2048
2049 fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2050 write!(f, "{}", self)
2051 }
2052}
2053
2054impl PartialEq<dyn PhysicalExpr> for CypherStringMatchExpr {
2055 fn eq(&self, other: &dyn PhysicalExpr) -> bool {
2056 if let Some(other) = other.as_any().downcast_ref::<CypherStringMatchExpr>() {
2057 self == other
2058 } else {
2059 false
2060 }
2061 }
2062}
2063
2064#[derive(Debug, Eq)]
2069struct StructFieldAccessExpr {
2070 input: Arc<dyn PhysicalExpr>,
2072 field_idx: usize,
2074 output_type: arrow_schema::DataType,
2076}
2077
2078impl PartialEq for StructFieldAccessExpr {
2079 fn eq(&self, other: &Self) -> bool {
2080 self.field_idx == other.field_idx
2081 && self.input.eq(&other.input)
2082 && self.output_type == other.output_type
2083 }
2084}
2085
2086impl std::hash::Hash for StructFieldAccessExpr {
2087 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
2088 self.input.hash(state);
2089 self.field_idx.hash(state);
2090 }
2091}
2092
2093impl StructFieldAccessExpr {
2094 fn new(
2095 input: Arc<dyn PhysicalExpr>,
2096 field_idx: usize,
2097 output_type: arrow_schema::DataType,
2098 ) -> Self {
2099 Self {
2100 input,
2101 field_idx,
2102 output_type,
2103 }
2104 }
2105}
2106
2107impl std::fmt::Display for StructFieldAccessExpr {
2108 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2109 write!(f, "{}[{}]", self.input, self.field_idx)
2110 }
2111}
2112
2113impl DisplayAs for StructFieldAccessExpr {
2114 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2115 write!(f, "{}", self)
2116 }
2117}
2118
2119impl PartialEq<dyn PhysicalExpr> for StructFieldAccessExpr {
2120 fn eq(&self, other: &dyn PhysicalExpr) -> bool {
2121 if let Some(other) = other.as_any().downcast_ref::<Self>() {
2122 self.field_idx == other.field_idx && self.input.eq(&other.input)
2123 } else {
2124 false
2125 }
2126 }
2127}
2128
2129impl PhysicalExpr for StructFieldAccessExpr {
2130 fn as_any(&self) -> &dyn std::any::Any {
2131 self
2132 }
2133
2134 fn data_type(
2135 &self,
2136 _input_schema: &Schema,
2137 ) -> datafusion::error::Result<arrow_schema::DataType> {
2138 Ok(self.output_type.clone())
2139 }
2140
2141 fn nullable(&self, _input_schema: &Schema) -> datafusion::error::Result<bool> {
2142 Ok(true)
2143 }
2144
2145 fn evaluate(
2146 &self,
2147 batch: &arrow_array::RecordBatch,
2148 ) -> datafusion::error::Result<datafusion::physical_plan::ColumnarValue> {
2149 use arrow_array::StructArray;
2150
2151 let input_val = self.input.evaluate(batch)?;
2152 let array = input_val.into_array(batch.num_rows())?;
2153
2154 let struct_array = array
2155 .as_any()
2156 .downcast_ref::<StructArray>()
2157 .ok_or_else(|| {
2158 datafusion::error::DataFusionError::Execution(
2159 "StructFieldAccessExpr: input is not a StructArray".to_string(),
2160 )
2161 })?;
2162
2163 let field_col = struct_array.column(self.field_idx).clone();
2164 Ok(datafusion::physical_plan::ColumnarValue::Array(field_col))
2165 }
2166
2167 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
2168 vec![&self.input]
2169 }
2170
2171 fn with_new_children(
2172 self: Arc<Self>,
2173 children: Vec<Arc<dyn PhysicalExpr>>,
2174 ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
2175 Ok(Arc::new(StructFieldAccessExpr::new(
2176 children[0].clone(),
2177 self.field_idx,
2178 self.output_type.clone(),
2179 )))
2180 }
2181
2182 fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2183 write!(f, "{}", self)
2184 }
2185}
2186
2187struct ExistsExecExpr {
2200 query: Query,
2201 graph_ctx: Arc<GraphExecutionContext>,
2202 session_ctx: Arc<RwLock<SessionContext>>,
2203 storage: Arc<StorageManager>,
2204 uni_schema: Arc<UniSchema>,
2205 params: HashMap<String, Value>,
2206 outer_entity_vars: HashSet<String>,
2209}
2210
2211impl std::fmt::Debug for ExistsExecExpr {
2212 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2213 f.debug_struct("ExistsExecExpr").finish_non_exhaustive()
2214 }
2215}
2216
2217impl ExistsExecExpr {
2218 fn new(
2219 query: Query,
2220 graph_ctx: Arc<GraphExecutionContext>,
2221 session_ctx: Arc<RwLock<SessionContext>>,
2222 storage: Arc<StorageManager>,
2223 uni_schema: Arc<UniSchema>,
2224 params: HashMap<String, Value>,
2225 outer_entity_vars: HashSet<String>,
2226 ) -> Self {
2227 Self {
2228 query,
2229 graph_ctx,
2230 session_ctx,
2231 storage,
2232 uni_schema,
2233 params,
2234 outer_entity_vars,
2235 }
2236 }
2237}
2238
2239impl std::fmt::Display for ExistsExecExpr {
2240 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2241 write!(f, "EXISTS(<subquery>)")
2242 }
2243}
2244
2245impl PartialEq<dyn PhysicalExpr> for ExistsExecExpr {
2246 fn eq(&self, _other: &dyn PhysicalExpr) -> bool {
2247 false
2248 }
2249}
2250
2251impl PartialEq for ExistsExecExpr {
2252 fn eq(&self, _other: &Self) -> bool {
2253 false
2254 }
2255}
2256
2257impl Eq for ExistsExecExpr {}
2258
2259impl std::hash::Hash for ExistsExecExpr {
2260 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
2261 "ExistsExecExpr".hash(state);
2262 }
2263}
2264
2265impl DisplayAs for ExistsExecExpr {
2266 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2267 write!(f, "{}", self)
2268 }
2269}
2270
impl PhysicalExpr for ExistsExecExpr {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// EXISTS always yields a Boolean column.
    fn data_type(
        &self,
        _input_schema: &Schema,
    ) -> datafusion::error::Result<arrow_schema::DataType> {
        Ok(DataType::Boolean)
    }

    // NOTE(review): reported as nullable, although `evaluate` only ever
    // appends non-null values.
    fn nullable(&self, _input_schema: &Schema) -> datafusion::error::Result<bool> {
        Ok(true)
    }

    /// Evaluates the correlated EXISTS subquery once per input row.
    ///
    /// 1. Infer which batch columns stand for entity variables (heuristics
    ///    below).
    /// 2. Rewrite the subquery with `rewrite_query_correlated` for those
    ///    variables, and plan it once with them in scope.
    /// 3. On a scoped helper thread that builds its own current-thread Tokio
    ///    runtime, execute the plan once per row with that row's values bound
    ///    as parameters. Append `true` when any result batch is non-empty.
    ///
    /// # Errors
    /// The following are all reported as `DataFusionError::Execution`:
    /// * planning failures;
    /// * runtime construction failures;
    /// * subplan execution errors;
    /// * a panic on the helper thread.
    fn evaluate(
        &self,
        batch: &arrow_array::RecordBatch,
    ) -> datafusion::error::Result<datafusion::physical_plan::ColumnarValue> {
        let num_rows = batch.num_rows();
        let mut builder = BooleanBuilder::with_capacity(num_rows);

        let schema = batch.schema();
        // Entity-variable detection heuristics; a column qualifies if any of
        // these hold:
        //  * it is named `<var>._vid` (records `<var>`);
        //  * it is Struct-typed (records the column name);
        //  * it is an undotted, non-underscore-prefixed Int64/UInt64 column
        //    (records the column name).
        // NOTE(review): the integer rule may also capture plain integer
        // columns that are not entity ids.
        let mut entity_vars: HashSet<String> = HashSet::new();
        for field in schema.fields() {
            let name = field.name();
            if let Some(base) = name.strip_suffix("._vid") {
                entity_vars.insert(base.to_string());
            }
            if matches!(field.data_type(), DataType::Struct(_)) {
                entity_vars.insert(name.to_string());
            }
            if !name.contains('.')
                && !name.starts_with('_')
                && matches!(field.data_type(), DataType::Int64 | DataType::UInt64)
            {
                entity_vars.insert(name.to_string());
            }
        }
        // Collected from a HashSet, so the order is unspecified.
        let vars_in_scope: Vec<String> = entity_vars.iter().cloned().collect();

        let rewritten_query = rewrite_query_correlated(&self.query, &entity_vars);

        // Planned once per `evaluate` call and reused for every row.
        let planner = QueryPlanner::new(self.uni_schema.clone());
        let logical_plan = match planner.plan_with_scope(rewritten_query, vars_in_scope) {
            Ok(plan) => plan,
            Err(e) => {
                return Err(datafusion::error::DataFusionError::Execution(format!(
                    "EXISTS subquery planning failed: {}",
                    e
                )));
            }
        };

        let graph_ctx = self.graph_ctx.clone();
        let session_ctx = self.session_ctx.clone();
        let storage = self.storage.clone();
        let uni_schema = self.uni_schema.clone();
        let base_params = self.params.clone();

        // Execution runs on a scoped thread with a fresh current-thread
        // runtime, presumably so `block_on` is never called from inside the
        // caller's async runtime. The scope lets the thread borrow `batch`,
        // `builder` and the locals above.
        // NOTE(review): a new runtime is built on every `evaluate` call.
        let result = std::thread::scope(|s| {
            s.spawn(|| {
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .map_err(|e| {
                        datafusion::error::DataFusionError::Execution(format!(
                            "Failed to create runtime for EXISTS: {}",
                            e
                        ))
                    })?;

                // Entity variables from enclosing scopes plus those detected
                // in this batch.
                let mut combined_entity_vars = self.outer_entity_vars.clone();
                combined_entity_vars.extend(entity_vars.iter().cloned());

                for row_idx in 0..num_rows {
                    // Row values layered over the base parameters
                    // (presumably keyed by column name — see
                    // `extract_row_params`).
                    let row_params = extract_row_params(batch, row_idx);
                    let mut sub_params = base_params.clone();
                    sub_params.extend(row_params);

                    // Bind each entity variable directly to its `._vid`
                    // value when one is present.
                    for var in &entity_vars {
                        let vid_key = format!("{}._vid", var);
                        if let Some(vid_val) = sub_params.get(&vid_key).cloned() {
                            sub_params.insert(var.clone(), vid_val);
                        }
                    }

                    // NOTE(review): the empty map and trailing `None` are
                    // optional inputs of `execute_subplan_with_outer_vars`;
                    // their meaning is defined at that function.
                    let (batches, _plan) = rt.block_on(execute_subplan_with_outer_vars(
                        &logical_plan,
                        &sub_params,
                        &HashMap::new(), &graph_ctx,
                        &session_ctx,
                        &storage,
                        &uni_schema,
                        &combined_entity_vars,
                        None, ))?;

                    let has_rows = batches.iter().any(|b| b.num_rows() > 0);
                    builder.append_value(has_rows);
                }

                Ok::<_, datafusion::error::DataFusionError>(())
            })
            .join()
            // A panic on the helper thread becomes an execution error.
            .unwrap_or_else(|_| {
                Err(datafusion::error::DataFusionError::Execution(
                    "EXISTS subquery thread panicked".to_string(),
                ))
            })
        });

        // Any failure from the thread (including runtime creation) is wrapped
        // once more with this prefix.
        if let Err(e) = result {
            return Err(datafusion::error::DataFusionError::Execution(format!(
                "EXISTS subquery execution failed: {}",
                e
            )));
        }

        Ok(datafusion::physical_plan::ColumnarValue::Array(Arc::new(
            builder.finish(),
        )))
    }

    /// Leaf expression: the subquery is not represented as child expressions.
    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
        vec![]
    }

    /// Accepts only an empty child list and returns `self` unchanged.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn PhysicalExpr>>,
    ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
        if !children.is_empty() {
            return Err(datafusion::error::DataFusionError::Plan(
                "ExistsExecExpr has no children".to_string(),
            ));
        }
        Ok(self)
    }

    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self)
    }
}
2439
2440fn has_mutation_clause(query: &Query) -> bool {
2447 match query {
2448 Query::Single(stmt) => stmt.clauses.iter().any(|c| {
2449 matches!(
2450 c,
2451 Clause::Create(_)
2452 | Clause::Delete(_)
2453 | Clause::Set(_)
2454 | Clause::Remove(_)
2455 | Clause::Merge(_)
2456 ) || has_mutation_in_clause_exprs(c)
2457 }),
2458 Query::Union { left, right, .. } => has_mutation_clause(left) || has_mutation_clause(right),
2459 _ => false,
2460 }
2461}
2462
2463fn has_mutation_in_clause_exprs(clause: &Clause) -> bool {
2465 let check_expr = |e: &Expr| -> bool { has_mutation_in_expr(e) };
2466
2467 match clause {
2468 Clause::Match(m) => m.where_clause.as_ref().is_some_and(check_expr),
2469 Clause::With(w) => {
2470 w.where_clause.as_ref().is_some_and(check_expr)
2471 || w.items.iter().any(|item| match item {
2472 ReturnItem::Expr { expr, .. } => has_mutation_in_expr(expr),
2473 ReturnItem::All => false,
2474 })
2475 }
2476 Clause::Return(r) => r.items.iter().any(|item| match item {
2477 ReturnItem::Expr { expr, .. } => has_mutation_in_expr(expr),
2478 ReturnItem::All => false,
2479 }),
2480 _ => false,
2481 }
2482}
2483
2484fn has_mutation_in_expr(expr: &Expr) -> bool {
2486 match expr {
2487 Expr::Exists { query, .. } => has_mutation_clause(query),
2488 _ => {
2489 let mut found = false;
2490 expr.for_each_child(&mut |child| {
2491 if has_mutation_in_expr(child) {
2492 found = true;
2493 }
2494 });
2495 found
2496 }
2497 }
2498}
2499
2500fn rewrite_query_correlated(query: &Query, outer_vars: &HashSet<String>) -> Query {
2506 match query {
2507 Query::Single(stmt) => Query::Single(Statement {
2508 clauses: stmt
2509 .clauses
2510 .iter()
2511 .map(|c| rewrite_clause_correlated(c, outer_vars))
2512 .collect(),
2513 }),
2514 Query::Union { left, right, all } => Query::Union {
2515 left: Box::new(rewrite_query_correlated(left, outer_vars)),
2516 right: Box::new(rewrite_query_correlated(right, outer_vars)),
2517 all: *all,
2518 },
2519 other => other.clone(),
2520 }
2521}
2522
2523fn rewrite_clause_correlated(clause: &Clause, outer_vars: &HashSet<String>) -> Clause {
2525 match clause {
2526 Clause::Match(m) => Clause::Match(MatchClause {
2527 optional: m.optional,
2528 pattern: m.pattern.clone(),
2529 where_clause: m
2530 .where_clause
2531 .as_ref()
2532 .map(|e| rewrite_expr_correlated(e, outer_vars)),
2533 for_update: m.for_update,
2534 }),
2535 Clause::With(w) => Clause::With(WithClause {
2536 distinct: w.distinct,
2537 items: w
2538 .items
2539 .iter()
2540 .map(|item| rewrite_return_item(item, outer_vars))
2541 .collect(),
2542 order_by: w.order_by.as_ref().map(|items| {
2543 items
2544 .iter()
2545 .map(|si| SortItem {
2546 expr: rewrite_expr_correlated(&si.expr, outer_vars),
2547 ascending: si.ascending,
2548 })
2549 .collect()
2550 }),
2551 skip: w
2552 .skip
2553 .as_ref()
2554 .map(|e| rewrite_expr_correlated(e, outer_vars)),
2555 limit: w
2556 .limit
2557 .as_ref()
2558 .map(|e| rewrite_expr_correlated(e, outer_vars)),
2559 where_clause: w
2560 .where_clause
2561 .as_ref()
2562 .map(|e| rewrite_expr_correlated(e, outer_vars)),
2563 }),
2564 Clause::Return(r) => Clause::Return(ReturnClause {
2565 distinct: r.distinct,
2566 items: r
2567 .items
2568 .iter()
2569 .map(|item| rewrite_return_item(item, outer_vars))
2570 .collect(),
2571 order_by: r.order_by.as_ref().map(|items| {
2572 items
2573 .iter()
2574 .map(|si| SortItem {
2575 expr: rewrite_expr_correlated(&si.expr, outer_vars),
2576 ascending: si.ascending,
2577 })
2578 .collect()
2579 }),
2580 skip: r
2581 .skip
2582 .as_ref()
2583 .map(|e| rewrite_expr_correlated(e, outer_vars)),
2584 limit: r
2585 .limit
2586 .as_ref()
2587 .map(|e| rewrite_expr_correlated(e, outer_vars)),
2588 }),
2589 Clause::Unwind(u) => Clause::Unwind(UnwindClause {
2590 expr: rewrite_expr_correlated(&u.expr, outer_vars),
2591 variable: u.variable.clone(),
2592 }),
2593 other => other.clone(),
2594 }
2595}
2596
2597fn rewrite_return_item(item: &ReturnItem, outer_vars: &HashSet<String>) -> ReturnItem {
2598 match item {
2599 ReturnItem::All => ReturnItem::All,
2600 ReturnItem::Expr {
2601 expr,
2602 alias,
2603 source_text,
2604 } => ReturnItem::Expr {
2605 expr: rewrite_expr_correlated(expr, outer_vars),
2606 alias: alias.clone(),
2607 source_text: source_text.clone(),
2608 },
2609 }
2610}
2611
2612fn rewrite_expr_correlated(expr: &Expr, outer_vars: &HashSet<String>) -> Expr {
2615 match expr {
2616 Expr::Property(base, key) => {
2618 if let Expr::Variable(v) = base.as_ref()
2619 && outer_vars.contains(v)
2620 {
2621 return Expr::Parameter(format!("{}.{}", v, key));
2622 }
2623 Expr::Property(
2624 Box::new(rewrite_expr_correlated(base, outer_vars)),
2625 key.clone(),
2626 )
2627 }
2628 Expr::Exists {
2630 query,
2631 from_pattern_predicate,
2632 } => Expr::Exists {
2633 query: Box::new(rewrite_query_correlated(query, outer_vars)),
2634 from_pattern_predicate: *from_pattern_predicate,
2635 },
2636 Expr::CountSubquery(query) => {
2638 Expr::CountSubquery(Box::new(rewrite_query_correlated(query, outer_vars)))
2639 }
2640 Expr::CollectSubquery(query) => {
2641 Expr::CollectSubquery(Box::new(rewrite_query_correlated(query, outer_vars)))
2642 }
2643 other => other
2645 .clone()
2646 .map_children(&mut |child| rewrite_expr_correlated(&child, outer_vars)),
2647 }
2648}
2649
2650fn resolve_metric_for_property(schema: &UniSchema, property: &str) -> Option<DistanceMetric> {
2654 for idx in &schema.indexes {
2655 if let IndexDefinition::Vector(config) = idx
2656 && config.property == property
2657 {
2658 return Some(config.metric.clone());
2659 }
2660 }
2661 None
2662}