uni_query/query/df_graph/
pattern_exists.rs1use std::any::Any;
17use std::collections::HashMap;
18use std::fmt::{self, Display, Formatter};
19use std::hash::Hash;
20use std::sync::Arc;
21
22use arrow_array::{Array, BooleanArray, RecordBatch, UInt64Array};
23use arrow_schema::{DataType, Schema};
24use datafusion::common::Result as DFResult;
25use datafusion::error::DataFusionError;
26use datafusion::logical_expr::ColumnarValue;
27use datafusion::physical_plan::PhysicalExpr;
28use uni_common::core::id::Vid;
29use uni_common::core::schema::Schema as UniSchema;
30use uni_common::value::Value;
31use uni_cypher::ast::{Expr as CypherExpr, Pattern, PatternElement, Query};
32use uni_store::storage::direction::Direction;
33
34use super::GraphExecutionContext;
35use super::pattern_comprehension::TraversalStep;
36use crate::query::df_graph::common::column_as_vid_array;
37
38#[derive(Debug, Clone)]
44pub struct PropertyPredicate {
45 pub property_name: String,
47 pub param_name: Option<String>,
49 pub literal_value: Option<Value>,
51}
52
53#[derive(Debug)]
58pub struct PatternExistsExecExpr {
59 graph_ctx: Arc<GraphExecutionContext>,
61 anchor_column: String,
63 traversal_steps: Vec<TraversalStep>,
65 input_schema: Arc<Schema>,
67 target_property_predicates: Vec<Vec<PropertyPredicate>>,
69 bound_target_columns: Vec<Option<String>>,
74 params: HashMap<String, Value>,
76}
77
78impl Clone for PatternExistsExecExpr {
79 fn clone(&self) -> Self {
80 Self {
81 graph_ctx: self.graph_ctx.clone(),
82 anchor_column: self.anchor_column.clone(),
83 traversal_steps: self.traversal_steps.clone(),
84 input_schema: self.input_schema.clone(),
85 target_property_predicates: self.target_property_predicates.clone(),
86 bound_target_columns: self.bound_target_columns.clone(),
87 params: self.params.clone(),
88 }
89 }
90}
91
92impl PatternExistsExecExpr {
93 pub fn new(
95 graph_ctx: Arc<GraphExecutionContext>,
96 anchor_column: String,
97 traversal_steps: Vec<TraversalStep>,
98 input_schema: Arc<Schema>,
99 target_property_predicates: Vec<Vec<PropertyPredicate>>,
100 bound_target_columns: Vec<Option<String>>,
101 params: HashMap<String, Value>,
102 ) -> Self {
103 Self {
104 graph_ctx,
105 anchor_column,
106 traversal_steps,
107 input_schema,
108 target_property_predicates,
109 bound_target_columns,
110 params,
111 }
112 }
113
114 fn resolve_predicate_value(&self, pred: &PropertyPredicate) -> Option<Value> {
116 if let Some(ref val) = pred.literal_value {
117 Some(val.clone())
118 } else if let Some(ref param_name) = pred.param_name {
119 self.params.get(param_name).cloned()
120 } else {
121 None
122 }
123 }
124}
125
126impl Display for PatternExistsExecExpr {
127 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
128 write!(
129 f,
130 "PatternExists(anchor={}, steps={})",
131 self.anchor_column,
132 self.traversal_steps.len()
133 )
134 }
135}
136
137impl PartialEq for PatternExistsExecExpr {
138 fn eq(&self, other: &Self) -> bool {
139 self.anchor_column == other.anchor_column && Arc::ptr_eq(&self.graph_ctx, &other.graph_ctx)
140 }
141}
142
143impl Eq for PatternExistsExecExpr {}
144
145impl Hash for PatternExistsExecExpr {
146 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
147 self.anchor_column.hash(state);
148 self.traversal_steps.len().hash(state);
149 }
150}
151
152impl PartialEq<dyn Any> for PatternExistsExecExpr {
153 fn eq(&self, other: &dyn Any) -> bool {
154 other
155 .downcast_ref::<Self>()
156 .map(|x| self == x)
157 .unwrap_or(false)
158 }
159}
160
161impl PhysicalExpr for PatternExistsExecExpr {
162 fn as_any(&self) -> &dyn Any {
163 self
164 }
165
166 fn data_type(&self, _input_schema: &Schema) -> DFResult<DataType> {
167 Ok(DataType::Boolean)
168 }
169
170 fn nullable(&self, _input_schema: &Schema) -> DFResult<bool> {
171 Ok(true)
172 }
173
174 fn evaluate(&self, batch: &RecordBatch) -> DFResult<ColumnarValue> {
175 let num_rows = batch.num_rows();
176
177 let anchor_col = if let Some(col) = batch.column_by_name(&self.anchor_column) {
179 col
180 } else if let Some(var_name) = self.anchor_column.strip_suffix("._vid") {
181 batch.column_by_name(var_name).ok_or_else(|| {
182 DataFusionError::Execution(format!(
183 "PatternExists: anchor column '{}' not found in batch schema: {:?}",
184 self.anchor_column,
185 batch
186 .schema()
187 .fields()
188 .iter()
189 .map(|f| f.name().as_str())
190 .collect::<Vec<_>>()
191 ))
192 })?
193 } else {
194 return Err(DataFusionError::Execution(format!(
195 "PatternExists: anchor column '{}' not found in batch schema: {:?}",
196 self.anchor_column,
197 batch
198 .schema()
199 .fields()
200 .iter()
201 .map(|f| f.name().as_str())
202 .collect::<Vec<_>>()
203 )));
204 };
205 let anchor_vid_cow = column_as_vid_array(anchor_col.as_ref())?;
206 let anchor_vids: &UInt64Array = &anchor_vid_cow;
207
208 for step in &self.traversal_steps {
210 std::thread::scope(|s| {
211 s.spawn(|| {
212 let rt = tokio::runtime::Builder::new_current_thread()
213 .enable_all()
214 .build()
215 .map_err(|e| {
216 DataFusionError::Execution(format!("Runtime creation failed: {e}"))
217 })?;
218 rt.block_on(
219 self.graph_ctx
220 .ensure_adjacency_warmed(&step.edge_type_ids, step.direction),
221 )
222 .map_err(|e| DataFusionError::Execution(format!("CSR warming failed: {e}")))
223 })
224 .join()
225 .unwrap_or_else(|_| {
226 Err(DataFusionError::Execution(
227 "CSR warming thread panicked".to_string(),
228 ))
229 })
230 })?;
231 }
232
233 let mut result = vec![false; num_rows];
235 let query_ctx = self.graph_ctx.query_context();
236
237 let mut frontier: Vec<(u32, u64)> = Vec::with_capacity(num_rows);
239 for (row_idx, vid_opt) in anchor_vids.iter().enumerate() {
240 if let Some(vid_u64) = vid_opt {
241 frontier.push((row_idx as u32, vid_u64));
242 }
243 }
244
245 for (step_idx, step) in self.traversal_steps.iter().enumerate() {
246 if frontier.is_empty() {
247 break;
248 }
249
250 let is_last_step = step_idx == self.traversal_steps.len() - 1;
251 let has_property_preds = step_idx < self.target_property_predicates.len()
252 && !self.target_property_predicates[step_idx].is_empty();
253 let is_undirected = step.direction == Direction::Both;
254
255 let bound_target_vids: Option<std::borrow::Cow<'_, UInt64Array>> =
258 if let Some(Some(col_name)) = self.bound_target_columns.get(step_idx) {
259 let col = batch.column_by_name(col_name).or_else(|| {
260 col_name
261 .strip_suffix("._vid")
262 .and_then(|v| batch.column_by_name(v))
263 });
264 col.map(|c| column_as_vid_array(c.as_ref())).transpose()?
265 } else {
266 None
267 };
268
269 let resolved_preds: Vec<(String, Value)> = if has_property_preds {
271 self.target_property_predicates[step_idx]
272 .iter()
273 .filter_map(|p| {
274 self.resolve_predicate_value(p)
275 .map(|v| (p.property_name.clone(), v))
276 })
277 .collect()
278 } else {
279 Vec::new()
280 };
281
282 let mut next_frontier: Vec<(u32, u64)> = Vec::new();
284
285 let passes_label_filter = |target_vid: Vid| -> bool {
289 if let Some(ref label_name) = step.target_label_name
290 && let Some(vertex_labels) =
291 self.graph_ctx.resolve_vertex_labels(target_vid, &query_ctx)
292 && !vertex_labels.contains(label_name)
293 {
294 return false;
295 }
296 true
297 };
298
299 if !resolved_preds.is_empty() {
300 let mut candidates: Vec<(u32, Vid)> = Vec::new();
302
303 for &(row_idx, src_vid_u64) in &frontier {
304 if result[row_idx as usize] {
305 continue;
306 }
307 let vid = Vid::from(src_vid_u64);
308 let mut seen_eids = std::collections::HashSet::new();
309
310 for &edge_type in &step.edge_type_ids {
311 let neighbors =
312 self.graph_ctx.get_neighbors(vid, edge_type, step.direction);
313
314 for (target_vid, eid) in neighbors {
315 if is_undirected && !seen_eids.insert(eid.as_u64()) {
316 continue;
317 }
318 if !passes_label_filter(target_vid) {
319 continue;
320 }
321 if let Some(ref bound_vids) = bound_target_vids
323 && !bound_vids.is_null(row_idx as usize)
324 && target_vid.as_u64() != bound_vids.value(row_idx as usize)
325 {
326 continue;
327 }
328 candidates.push((row_idx, target_vid));
329 }
330 }
331 }
332
333 if !candidates.is_empty() {
335 let unique_vids: Vec<Vid> = {
336 let mut v: Vec<Vid> = candidates.iter().map(|c| c.1).collect();
337 v.sort_unstable();
338 v.dedup();
339 v
340 };
341
342 let prop_names: Vec<&str> =
343 resolved_preds.iter().map(|(n, _)| n.as_str()).collect();
344
345 let props_map = std::thread::scope(|s| {
346 s.spawn(|| {
347 let rt = tokio::runtime::Builder::new_current_thread()
348 .enable_all()
349 .build()
350 .map_err(|e| {
351 DataFusionError::Execution(format!(
352 "Runtime creation failed: {e}"
353 ))
354 })?;
355 rt.block_on(self.graph_ctx.property_manager().get_batch_vertex_props(
356 &unique_vids,
357 &prop_names,
358 Some(&query_ctx),
359 ))
360 .map_err(|e| {
361 DataFusionError::Execution(format!("Vertex prop load failed: {e}"))
362 })
363 })
364 .join()
365 .unwrap_or_else(|_| {
366 Err(DataFusionError::Execution(
367 "Vertex prop load thread panicked".to_string(),
368 ))
369 })
370 })?;
371
372 for (row_idx, target_vid) in &candidates {
373 if result[*row_idx as usize] {
374 continue;
375 }
376
377 let matches = if let Some(props) = props_map.get(target_vid) {
378 resolved_preds
379 .iter()
380 .all(|(name, expected)| match props.get(name) {
381 Some(actual) => actual == expected,
382 None => matches!(expected, Value::Null),
383 })
384 } else {
385 resolved_preds
386 .iter()
387 .all(|(_, expected)| matches!(expected, Value::Null))
388 };
389
390 if matches {
391 if is_last_step {
392 result[*row_idx as usize] = true;
393 } else {
394 next_frontier.push((*row_idx, target_vid.as_u64()));
395 }
396 }
397 }
398 }
399 } else {
400 for &(row_idx, src_vid_u64) in &frontier {
402 if result[row_idx as usize] {
403 continue;
404 }
405 let vid = Vid::from(src_vid_u64);
406 let mut found = false;
407 let mut seen_eids = std::collections::HashSet::new();
408
409 let expected_target: Option<u64> = bound_target_vids.as_ref().and_then(|bv| {
411 if bv.is_null(row_idx as usize) {
412 None
413 } else {
414 Some(bv.value(row_idx as usize))
415 }
416 });
417
418 'edge_types: for &edge_type in &step.edge_type_ids {
419 let neighbors =
420 self.graph_ctx.get_neighbors(vid, edge_type, step.direction);
421
422 for (target_vid, eid) in neighbors {
423 if is_undirected && !seen_eids.insert(eid.as_u64()) {
424 continue;
425 }
426 if !passes_label_filter(target_vid) {
427 continue;
428 }
429 if let Some(expected) = expected_target
431 && target_vid.as_u64() != expected
432 {
433 continue;
434 }
435
436 if is_last_step {
437 found = true;
438 break 'edge_types;
439 } else {
440 next_frontier.push((row_idx, target_vid.as_u64()));
441 }
442 }
443 }
444
445 if found {
446 result[row_idx as usize] = true;
447 }
448 }
449 }
450
451 frontier = next_frontier;
452 }
453
454 let bool_array = BooleanArray::from(result);
456 Ok(ColumnarValue::Array(Arc::new(bool_array)))
457 }
458
459 fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
460 write!(f, "PatternExists({})", self.anchor_column)
461 }
462
463 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
464 vec![]
465 }
466
467 fn with_new_children(
468 self: Arc<Self>,
469 _children: Vec<Arc<dyn PhysicalExpr>>,
470 ) -> DFResult<Arc<dyn PhysicalExpr>> {
471 Ok(self)
472 }
473}
474
475pub fn extract_pattern_from_exists_query(query: &Query) -> anyhow::Result<Pattern> {
485 match query {
486 Query::Single(stmt) if stmt.clauses.len() == 1 => {
487 if let uni_cypher::ast::Clause::Match(m) = &stmt.clauses[0] {
488 for path in &m.pattern.paths {
490 for elem in &path.elements {
491 if let PatternElement::Relationship(rel) = elem
492 && rel.range.is_some()
493 {
494 anyhow::bail!(
495 "Variable-length paths in pattern predicates require subquery evaluation"
496 );
497 }
498 }
499 }
500 Ok(m.pattern.clone())
501 } else {
502 anyhow::bail!("Expected Match clause in pattern predicate EXISTS query")
503 }
504 }
505 _ => anyhow::bail!("Pattern predicate EXISTS query has unexpected structure"),
506 }
507}
508
509pub fn extract_target_property_predicates(
519 pattern: &Pattern,
520 steps: &[TraversalStep],
521 _uni_schema: &UniSchema,
522) -> anyhow::Result<Vec<Vec<PropertyPredicate>>> {
523 if pattern.paths.is_empty() {
524 return Ok(vec![Vec::new(); steps.len()]);
525 }
526
527 let elements = &pattern.paths[0].elements;
528
529 let anchor_idx = elements
532 .iter()
533 .position(|e| matches!(e, PatternElement::Node(_)))
534 .unwrap_or(0);
535
536 let mut result = Vec::with_capacity(steps.len());
537
538 for step_i in 0..steps.len() {
539 let target_elem_idx = anchor_idx + 2 * (step_i + 1);
541 let preds = if target_elem_idx < elements.len() {
542 if let PatternElement::Node(node) = &elements[target_elem_idx] {
543 extract_node_property_predicates(node)?
544 } else {
545 Vec::new()
546 }
547 } else {
548 Vec::new()
549 };
550 result.push(preds);
551 }
552
553 Ok(result)
554}
555
556fn extract_node_property_predicates(
558 node: &uni_cypher::ast::NodePattern,
559) -> anyhow::Result<Vec<PropertyPredicate>> {
560 let Some(ref props_expr) = node.properties else {
561 return Ok(Vec::new());
562 };
563
564 let CypherExpr::Map(entries) = props_expr else {
565 anyhow::bail!("Node properties must be a map literal for pattern exists optimization");
566 };
567
568 let mut predicates = Vec::with_capacity(entries.len());
569 for (key, value_expr) in entries {
570 match value_expr {
571 CypherExpr::Parameter(param_name) => {
572 predicates.push(PropertyPredicate {
573 property_name: key.clone(),
574 param_name: Some(param_name.clone()),
575 literal_value: None,
576 });
577 }
578 CypherExpr::Literal(lit) => {
579 predicates.push(PropertyPredicate {
580 property_name: key.clone(),
581 param_name: None,
582 literal_value: Some(lit.to_value()),
583 });
584 }
585 _ => {
586 anyhow::bail!(
587 "Unsupported property value expression in pattern exists: {:?}",
588 value_expr
589 );
590 }
591 }
592 }
593
594 Ok(predicates)
595}