uni_query/query/df_graph/
pattern_exists.rs1use std::any::Any;
17use std::collections::HashMap;
18use std::fmt::{self, Display, Formatter};
19use std::hash::Hash;
20use std::sync::Arc;
21
22use arrow_array::{Array, BooleanArray, RecordBatch, UInt64Array};
23use arrow_schema::{DataType, Schema};
24use datafusion::common::Result as DFResult;
25use datafusion::error::DataFusionError;
26use datafusion::logical_expr::ColumnarValue;
27use datafusion::physical_plan::PhysicalExpr;
28use uni_common::core::id::Vid;
29use uni_common::core::schema::Schema as UniSchema;
30use uni_common::value::Value;
31use uni_cypher::ast::{Expr as CypherExpr, Pattern, PatternElement, Query};
32use uni_store::runtime::l0_visibility;
33use uni_store::storage::direction::Direction;
34
35use super::GraphExecutionContext;
36use super::pattern_comprehension::TraversalStep;
37use crate::query::df_graph::common::column_as_vid_array;
38
39#[derive(Debug, Clone)]
45pub struct PropertyPredicate {
46 pub property_name: String,
48 pub param_name: Option<String>,
50 pub literal_value: Option<Value>,
52}
53
54#[derive(Debug)]
59pub struct PatternExistsExecExpr {
60 graph_ctx: Arc<GraphExecutionContext>,
62 anchor_column: String,
64 traversal_steps: Vec<TraversalStep>,
66 input_schema: Arc<Schema>,
68 target_property_predicates: Vec<Vec<PropertyPredicate>>,
70 bound_target_columns: Vec<Option<String>>,
75 params: HashMap<String, Value>,
77}
78
79impl Clone for PatternExistsExecExpr {
80 fn clone(&self) -> Self {
81 Self {
82 graph_ctx: self.graph_ctx.clone(),
83 anchor_column: self.anchor_column.clone(),
84 traversal_steps: self.traversal_steps.clone(),
85 input_schema: self.input_schema.clone(),
86 target_property_predicates: self.target_property_predicates.clone(),
87 bound_target_columns: self.bound_target_columns.clone(),
88 params: self.params.clone(),
89 }
90 }
91}
92
93impl PatternExistsExecExpr {
94 pub fn new(
96 graph_ctx: Arc<GraphExecutionContext>,
97 anchor_column: String,
98 traversal_steps: Vec<TraversalStep>,
99 input_schema: Arc<Schema>,
100 target_property_predicates: Vec<Vec<PropertyPredicate>>,
101 bound_target_columns: Vec<Option<String>>,
102 params: HashMap<String, Value>,
103 ) -> Self {
104 Self {
105 graph_ctx,
106 anchor_column,
107 traversal_steps,
108 input_schema,
109 target_property_predicates,
110 bound_target_columns,
111 params,
112 }
113 }
114
115 fn resolve_predicate_value(&self, pred: &PropertyPredicate) -> Option<Value> {
117 if let Some(ref val) = pred.literal_value {
118 Some(val.clone())
119 } else if let Some(ref param_name) = pred.param_name {
120 self.params.get(param_name).cloned()
121 } else {
122 None
123 }
124 }
125}
126
127impl Display for PatternExistsExecExpr {
128 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
129 write!(
130 f,
131 "PatternExists(anchor={}, steps={})",
132 self.anchor_column,
133 self.traversal_steps.len()
134 )
135 }
136}
137
138impl PartialEq for PatternExistsExecExpr {
139 fn eq(&self, other: &Self) -> bool {
140 self.anchor_column == other.anchor_column && Arc::ptr_eq(&self.graph_ctx, &other.graph_ctx)
141 }
142}
143
144impl Eq for PatternExistsExecExpr {}
145
146impl Hash for PatternExistsExecExpr {
147 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
148 self.anchor_column.hash(state);
149 self.traversal_steps.len().hash(state);
150 }
151}
152
153impl PartialEq<dyn Any> for PatternExistsExecExpr {
154 fn eq(&self, other: &dyn Any) -> bool {
155 other
156 .downcast_ref::<Self>()
157 .map(|x| self == x)
158 .unwrap_or(false)
159 }
160}
161
162impl PhysicalExpr for PatternExistsExecExpr {
163 fn as_any(&self) -> &dyn Any {
164 self
165 }
166
167 fn data_type(&self, _input_schema: &Schema) -> DFResult<DataType> {
168 Ok(DataType::Boolean)
169 }
170
171 fn nullable(&self, _input_schema: &Schema) -> DFResult<bool> {
172 Ok(true)
173 }
174
175 fn evaluate(&self, batch: &RecordBatch) -> DFResult<ColumnarValue> {
176 let num_rows = batch.num_rows();
177
178 let anchor_col = if let Some(col) = batch.column_by_name(&self.anchor_column) {
180 col
181 } else if let Some(var_name) = self.anchor_column.strip_suffix("._vid") {
182 batch.column_by_name(var_name).ok_or_else(|| {
183 DataFusionError::Execution(format!(
184 "PatternExists: anchor column '{}' not found in batch schema: {:?}",
185 self.anchor_column,
186 batch
187 .schema()
188 .fields()
189 .iter()
190 .map(|f| f.name().as_str())
191 .collect::<Vec<_>>()
192 ))
193 })?
194 } else {
195 return Err(DataFusionError::Execution(format!(
196 "PatternExists: anchor column '{}' not found in batch schema: {:?}",
197 self.anchor_column,
198 batch
199 .schema()
200 .fields()
201 .iter()
202 .map(|f| f.name().as_str())
203 .collect::<Vec<_>>()
204 )));
205 };
206 let anchor_vid_cow = column_as_vid_array(anchor_col.as_ref())?;
207 let anchor_vids: &UInt64Array = &anchor_vid_cow;
208
209 for step in &self.traversal_steps {
211 std::thread::scope(|s| {
212 s.spawn(|| {
213 let rt = tokio::runtime::Builder::new_current_thread()
214 .enable_all()
215 .build()
216 .map_err(|e| {
217 DataFusionError::Execution(format!("Runtime creation failed: {e}"))
218 })?;
219 rt.block_on(
220 self.graph_ctx
221 .ensure_adjacency_warmed(&step.edge_type_ids, step.direction),
222 )
223 .map_err(|e| DataFusionError::Execution(format!("CSR warming failed: {e}")))
224 })
225 .join()
226 .unwrap_or_else(|_| {
227 Err(DataFusionError::Execution(
228 "CSR warming thread panicked".to_string(),
229 ))
230 })
231 })?;
232 }
233
234 let mut result = vec![false; num_rows];
236 let query_ctx = self.graph_ctx.query_context();
237
238 let mut frontier: Vec<(u32, u64)> = Vec::with_capacity(num_rows);
240 for (row_idx, vid_opt) in anchor_vids.iter().enumerate() {
241 if let Some(vid_u64) = vid_opt {
242 frontier.push((row_idx as u32, vid_u64));
243 }
244 }
245
246 for (step_idx, step) in self.traversal_steps.iter().enumerate() {
247 if frontier.is_empty() {
248 break;
249 }
250
251 let is_last_step = step_idx == self.traversal_steps.len() - 1;
252 let has_property_preds = step_idx < self.target_property_predicates.len()
253 && !self.target_property_predicates[step_idx].is_empty();
254 let is_undirected = step.direction == Direction::Both;
255
256 let bound_target_vids: Option<std::borrow::Cow<'_, UInt64Array>> =
259 if let Some(Some(col_name)) = self.bound_target_columns.get(step_idx) {
260 let col = batch.column_by_name(col_name).or_else(|| {
261 col_name
262 .strip_suffix("._vid")
263 .and_then(|v| batch.column_by_name(v))
264 });
265 col.map(|c| column_as_vid_array(c.as_ref())).transpose()?
266 } else {
267 None
268 };
269
270 let resolved_preds: Vec<(String, Value)> = if has_property_preds {
272 self.target_property_predicates[step_idx]
273 .iter()
274 .filter_map(|p| {
275 self.resolve_predicate_value(p)
276 .map(|v| (p.property_name.clone(), v))
277 })
278 .collect()
279 } else {
280 Vec::new()
281 };
282
283 let mut next_frontier: Vec<(u32, u64)> = Vec::new();
285
286 let passes_label_filter = |target_vid: Vid| -> bool {
288 if let Some(ref label_name) = step.target_label_name
289 && let Some(vertex_labels) =
290 l0_visibility::get_vertex_labels_optional(target_vid, &query_ctx)
291 && !vertex_labels.contains(label_name)
292 {
293 return false;
294 }
295 true
296 };
297
298 if !resolved_preds.is_empty() {
299 let mut candidates: Vec<(u32, Vid)> = Vec::new();
301
302 for &(row_idx, src_vid_u64) in &frontier {
303 if result[row_idx as usize] {
304 continue;
305 }
306 let vid = Vid::from(src_vid_u64);
307 let mut seen_eids = std::collections::HashSet::new();
308
309 for &edge_type in &step.edge_type_ids {
310 let neighbors =
311 self.graph_ctx.get_neighbors(vid, edge_type, step.direction);
312
313 for (target_vid, eid) in neighbors {
314 if is_undirected && !seen_eids.insert(eid.as_u64()) {
315 continue;
316 }
317 if !passes_label_filter(target_vid) {
318 continue;
319 }
320 if let Some(ref bound_vids) = bound_target_vids
322 && !bound_vids.is_null(row_idx as usize)
323 && target_vid.as_u64() != bound_vids.value(row_idx as usize)
324 {
325 continue;
326 }
327 candidates.push((row_idx, target_vid));
328 }
329 }
330 }
331
332 if !candidates.is_empty() {
334 let unique_vids: Vec<Vid> = {
335 let mut v: Vec<Vid> = candidates.iter().map(|c| c.1).collect();
336 v.sort_unstable();
337 v.dedup();
338 v
339 };
340
341 let prop_names: Vec<&str> =
342 resolved_preds.iter().map(|(n, _)| n.as_str()).collect();
343
344 let props_map = std::thread::scope(|s| {
345 s.spawn(|| {
346 let rt = tokio::runtime::Builder::new_current_thread()
347 .enable_all()
348 .build()
349 .map_err(|e| {
350 DataFusionError::Execution(format!(
351 "Runtime creation failed: {e}"
352 ))
353 })?;
354 rt.block_on(self.graph_ctx.property_manager().get_batch_vertex_props(
355 &unique_vids,
356 &prop_names,
357 Some(&query_ctx),
358 ))
359 .map_err(|e| {
360 DataFusionError::Execution(format!("Vertex prop load failed: {e}"))
361 })
362 })
363 .join()
364 .unwrap_or_else(|_| {
365 Err(DataFusionError::Execution(
366 "Vertex prop load thread panicked".to_string(),
367 ))
368 })
369 })?;
370
371 for (row_idx, target_vid) in &candidates {
372 if result[*row_idx as usize] {
373 continue;
374 }
375
376 let matches = if let Some(props) = props_map.get(target_vid) {
377 resolved_preds
378 .iter()
379 .all(|(name, expected)| match props.get(name) {
380 Some(actual) => actual == expected,
381 None => matches!(expected, Value::Null),
382 })
383 } else {
384 resolved_preds
385 .iter()
386 .all(|(_, expected)| matches!(expected, Value::Null))
387 };
388
389 if matches {
390 if is_last_step {
391 result[*row_idx as usize] = true;
392 } else {
393 next_frontier.push((*row_idx, target_vid.as_u64()));
394 }
395 }
396 }
397 }
398 } else {
399 for &(row_idx, src_vid_u64) in &frontier {
401 if result[row_idx as usize] {
402 continue;
403 }
404 let vid = Vid::from(src_vid_u64);
405 let mut found = false;
406 let mut seen_eids = std::collections::HashSet::new();
407
408 let expected_target: Option<u64> = bound_target_vids.as_ref().and_then(|bv| {
410 if bv.is_null(row_idx as usize) {
411 None
412 } else {
413 Some(bv.value(row_idx as usize))
414 }
415 });
416
417 'edge_types: for &edge_type in &step.edge_type_ids {
418 let neighbors =
419 self.graph_ctx.get_neighbors(vid, edge_type, step.direction);
420
421 for (target_vid, eid) in neighbors {
422 if is_undirected && !seen_eids.insert(eid.as_u64()) {
423 continue;
424 }
425 if !passes_label_filter(target_vid) {
426 continue;
427 }
428 if let Some(expected) = expected_target
430 && target_vid.as_u64() != expected
431 {
432 continue;
433 }
434
435 if is_last_step {
436 found = true;
437 break 'edge_types;
438 } else {
439 next_frontier.push((row_idx, target_vid.as_u64()));
440 }
441 }
442 }
443
444 if found {
445 result[row_idx as usize] = true;
446 }
447 }
448 }
449
450 frontier = next_frontier;
451 }
452
453 let bool_array = BooleanArray::from(result);
455 Ok(ColumnarValue::Array(Arc::new(bool_array)))
456 }
457
458 fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
459 write!(f, "PatternExists({})", self.anchor_column)
460 }
461
462 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
463 vec![]
464 }
465
466 fn with_new_children(
467 self: Arc<Self>,
468 _children: Vec<Arc<dyn PhysicalExpr>>,
469 ) -> DFResult<Arc<dyn PhysicalExpr>> {
470 Ok(self)
471 }
472}
473
474pub fn extract_pattern_from_exists_query(query: &Query) -> anyhow::Result<Pattern> {
484 match query {
485 Query::Single(stmt) if stmt.clauses.len() == 1 => {
486 if let uni_cypher::ast::Clause::Match(m) = &stmt.clauses[0] {
487 for path in &m.pattern.paths {
489 for elem in &path.elements {
490 if let PatternElement::Relationship(rel) = elem
491 && rel.range.is_some()
492 {
493 anyhow::bail!(
494 "Variable-length paths in pattern predicates require subquery evaluation"
495 );
496 }
497 }
498 }
499 Ok(m.pattern.clone())
500 } else {
501 anyhow::bail!("Expected Match clause in pattern predicate EXISTS query")
502 }
503 }
504 _ => anyhow::bail!("Pattern predicate EXISTS query has unexpected structure"),
505 }
506}
507
508pub fn extract_target_property_predicates(
518 pattern: &Pattern,
519 steps: &[TraversalStep],
520 _uni_schema: &UniSchema,
521) -> anyhow::Result<Vec<Vec<PropertyPredicate>>> {
522 if pattern.paths.is_empty() {
523 return Ok(vec![Vec::new(); steps.len()]);
524 }
525
526 let elements = &pattern.paths[0].elements;
527
528 let anchor_idx = elements
531 .iter()
532 .position(|e| matches!(e, PatternElement::Node(_)))
533 .unwrap_or(0);
534
535 let mut result = Vec::with_capacity(steps.len());
536
537 for step_i in 0..steps.len() {
538 let target_elem_idx = anchor_idx + 2 * (step_i + 1);
540 let preds = if target_elem_idx < elements.len() {
541 if let PatternElement::Node(node) = &elements[target_elem_idx] {
542 extract_node_property_predicates(node)?
543 } else {
544 Vec::new()
545 }
546 } else {
547 Vec::new()
548 };
549 result.push(preds);
550 }
551
552 Ok(result)
553}
554
555fn extract_node_property_predicates(
557 node: &uni_cypher::ast::NodePattern,
558) -> anyhow::Result<Vec<PropertyPredicate>> {
559 let Some(ref props_expr) = node.properties else {
560 return Ok(Vec::new());
561 };
562
563 let CypherExpr::Map(entries) = props_expr else {
564 anyhow::bail!("Node properties must be a map literal for pattern exists optimization");
565 };
566
567 let mut predicates = Vec::with_capacity(entries.len());
568 for (key, value_expr) in entries {
569 match value_expr {
570 CypherExpr::Parameter(param_name) => {
571 predicates.push(PropertyPredicate {
572 property_name: key.clone(),
573 param_name: Some(param_name.clone()),
574 literal_value: None,
575 });
576 }
577 CypherExpr::Literal(lit) => {
578 predicates.push(PropertyPredicate {
579 property_name: key.clone(),
580 param_name: None,
581 literal_value: Some(lit.to_value()),
582 });
583 }
584 _ => {
585 anyhow::bail!(
586 "Unsupported property value expression in pattern exists: {:?}",
587 value_expr
588 );
589 }
590 }
591 }
592
593 Ok(predicates)
594}