Skip to main content

uni_query/query/df_graph/
pattern_exists.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Vectorized EXISTS evaluation for pattern predicates in WHERE clauses.
5//!
6//! When a WHERE clause contains a bare pattern predicate like
7//! `(m)-[:SENT_BY]->(:Participant {name: $ename})`, the parser wraps it in
8//! `Expr::Exists { from_pattern_predicate: true }`. Instead of running a full
9//! plan-and-execute cycle per row (the generic `ExistsExecExpr` path), this
10//! module evaluates the pattern using batch CSR neighbor lookups — the same
11//! approach used by `PatternComprehensionExecExpr`.
12//!
13//! Returns a `BooleanArray` where each element indicates whether at least one
14//! matching path exists for that row's anchor node.
15
16use std::any::Any;
17use std::collections::HashMap;
18use std::fmt::{self, Display, Formatter};
19use std::hash::Hash;
20use std::sync::Arc;
21
22use arrow_array::{Array, BooleanArray, RecordBatch, UInt64Array};
23use arrow_schema::{DataType, Schema};
24use datafusion::common::Result as DFResult;
25use datafusion::error::DataFusionError;
26use datafusion::logical_expr::ColumnarValue;
27use datafusion::physical_plan::PhysicalExpr;
28use uni_common::core::id::Vid;
29use uni_common::core::schema::Schema as UniSchema;
30use uni_common::value::Value;
31use uni_cypher::ast::{Expr as CypherExpr, Pattern, PatternElement, Query};
32use uni_store::runtime::l0_visibility;
33use uni_store::storage::direction::Direction;
34
35use super::GraphExecutionContext;
36use super::pattern_comprehension::TraversalStep;
37use crate::query::df_graph::common::column_as_vid_array;
38
39/// A property equality predicate extracted from a node pattern's property map.
40///
41/// For `(:Label {name: $param})`, this stores `property_name = "name"` and
42/// `param_name = Some("param")`. For `(:Label {status: 'active'})`, it stores
43/// `literal_value = Some(Value::String("active"))`.
44#[derive(Debug, Clone)]
45pub struct PropertyPredicate {
46    /// Property name to check on the target node.
47    pub property_name: String,
48    /// Parameter name to resolve at evaluate time, if the value came from `$param`.
49    pub param_name: Option<String>,
50    /// Literal value known at compile time.
51    pub literal_value: Option<Value>,
52}
53
54/// Vectorized EXISTS expression for simple pattern predicates.
55///
56/// Evaluates `(anchor)-[:EDGE]->(:Label {props})` patterns using batch CSR
57/// lookups instead of per-row subquery execution.
58#[derive(Debug)]
59pub struct PatternExistsExecExpr {
60    /// Shared graph context for CSR lookups and property materialization.
61    graph_ctx: Arc<GraphExecutionContext>,
62    /// Column name for anchor VIDs (e.g., `"m._vid"`).
63    anchor_column: String,
64    /// Traversal steps describing each hop in the pattern.
65    traversal_steps: Vec<TraversalStep>,
66    /// Schema of the outer input batch.
67    input_schema: Arc<Schema>,
68    /// Property predicates per traversal step (indexed by step).
69    target_property_predicates: Vec<Vec<PropertyPredicate>>,
70    /// Per-step bound target VID column name, if the target variable is also in
71    /// the outer scope (e.g., `MATCH (n),(m) WHERE (n)-[:R]->(m)` — `m` is bound).
72    /// When `Some`, the step checks adjacency to that specific VID instead of
73    /// checking for the existence of *any* neighbor.
74    bound_target_columns: Vec<Option<String>>,
75    /// Parameters from the outer query for resolving `$param` references.
76    params: HashMap<String, Value>,
77}
78
79impl Clone for PatternExistsExecExpr {
80    fn clone(&self) -> Self {
81        Self {
82            graph_ctx: self.graph_ctx.clone(),
83            anchor_column: self.anchor_column.clone(),
84            traversal_steps: self.traversal_steps.clone(),
85            input_schema: self.input_schema.clone(),
86            target_property_predicates: self.target_property_predicates.clone(),
87            bound_target_columns: self.bound_target_columns.clone(),
88            params: self.params.clone(),
89        }
90    }
91}
92
93impl PatternExistsExecExpr {
94    /// Creates a new vectorized pattern exists expression.
95    pub fn new(
96        graph_ctx: Arc<GraphExecutionContext>,
97        anchor_column: String,
98        traversal_steps: Vec<TraversalStep>,
99        input_schema: Arc<Schema>,
100        target_property_predicates: Vec<Vec<PropertyPredicate>>,
101        bound_target_columns: Vec<Option<String>>,
102        params: HashMap<String, Value>,
103    ) -> Self {
104        Self {
105            graph_ctx,
106            anchor_column,
107            traversal_steps,
108            input_schema,
109            target_property_predicates,
110            bound_target_columns,
111            params,
112        }
113    }
114
115    /// Resolves a `PropertyPredicate` to a concrete `Value` at evaluate time.
116    fn resolve_predicate_value(&self, pred: &PropertyPredicate) -> Option<Value> {
117        if let Some(ref val) = pred.literal_value {
118            Some(val.clone())
119        } else if let Some(ref param_name) = pred.param_name {
120            self.params.get(param_name).cloned()
121        } else {
122            None
123        }
124    }
125}
126
127impl Display for PatternExistsExecExpr {
128    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
129        write!(
130            f,
131            "PatternExists(anchor={}, steps={})",
132            self.anchor_column,
133            self.traversal_steps.len()
134        )
135    }
136}
137
138impl PartialEq for PatternExistsExecExpr {
139    fn eq(&self, other: &Self) -> bool {
140        self.anchor_column == other.anchor_column && Arc::ptr_eq(&self.graph_ctx, &other.graph_ctx)
141    }
142}
143
144impl Eq for PatternExistsExecExpr {}
145
146impl Hash for PatternExistsExecExpr {
147    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
148        self.anchor_column.hash(state);
149        self.traversal_steps.len().hash(state);
150    }
151}
152
153impl PartialEq<dyn Any> for PatternExistsExecExpr {
154    fn eq(&self, other: &dyn Any) -> bool {
155        other
156            .downcast_ref::<Self>()
157            .map(|x| self == x)
158            .unwrap_or(false)
159    }
160}
161
162impl PhysicalExpr for PatternExistsExecExpr {
163    fn as_any(&self) -> &dyn Any {
164        self
165    }
166
167    fn data_type(&self, _input_schema: &Schema) -> DFResult<DataType> {
168        Ok(DataType::Boolean)
169    }
170
171    fn nullable(&self, _input_schema: &Schema) -> DFResult<bool> {
172        Ok(true)
173    }
174
175    fn evaluate(&self, batch: &RecordBatch) -> DFResult<ColumnarValue> {
176        let num_rows = batch.num_rows();
177
178        // Step 1: Extract anchor VIDs from the input batch.
179        let anchor_col = if let Some(col) = batch.column_by_name(&self.anchor_column) {
180            col
181        } else if let Some(var_name) = self.anchor_column.strip_suffix("._vid") {
182            batch.column_by_name(var_name).ok_or_else(|| {
183                DataFusionError::Execution(format!(
184                    "PatternExists: anchor column '{}' not found in batch schema: {:?}",
185                    self.anchor_column,
186                    batch
187                        .schema()
188                        .fields()
189                        .iter()
190                        .map(|f| f.name().as_str())
191                        .collect::<Vec<_>>()
192                ))
193            })?
194        } else {
195            return Err(DataFusionError::Execution(format!(
196                "PatternExists: anchor column '{}' not found in batch schema: {:?}",
197                self.anchor_column,
198                batch
199                    .schema()
200                    .fields()
201                    .iter()
202                    .map(|f| f.name().as_str())
203                    .collect::<Vec<_>>()
204            )));
205        };
206        let anchor_vid_cow = column_as_vid_array(anchor_col.as_ref())?;
207        let anchor_vids: &UInt64Array = &anchor_vid_cow;
208
209        // Step 2: Warm CSR for all edge types in the traversal.
210        for step in &self.traversal_steps {
211            std::thread::scope(|s| {
212                s.spawn(|| {
213                    let rt = tokio::runtime::Builder::new_current_thread()
214                        .enable_all()
215                        .build()
216                        .map_err(|e| {
217                            DataFusionError::Execution(format!("Runtime creation failed: {e}"))
218                        })?;
219                    rt.block_on(
220                        self.graph_ctx
221                            .ensure_adjacency_warmed(&step.edge_type_ids, step.direction),
222                    )
223                    .map_err(|e| DataFusionError::Execution(format!("CSR warming failed: {e}")))
224                })
225                .join()
226                .unwrap_or_else(|_| {
227                    Err(DataFusionError::Execution(
228                        "CSR warming thread panicked".to_string(),
229                    ))
230                })
231            })?;
232        }
233
234        // Step 3: Evaluate pattern existence per row using batch CSR lookups.
235        let mut result = vec![false; num_rows];
236        let query_ctx = self.graph_ctx.query_context();
237
238        // Build initial frontier: (row_index, vid) pairs for non-null anchors.
239        let mut frontier: Vec<(u32, u64)> = Vec::with_capacity(num_rows);
240        for (row_idx, vid_opt) in anchor_vids.iter().enumerate() {
241            if let Some(vid_u64) = vid_opt {
242                frontier.push((row_idx as u32, vid_u64));
243            }
244        }
245
246        for (step_idx, step) in self.traversal_steps.iter().enumerate() {
247            if frontier.is_empty() {
248                break;
249            }
250
251            let is_last_step = step_idx == self.traversal_steps.len() - 1;
252            let has_property_preds = step_idx < self.target_property_predicates.len()
253                && !self.target_property_predicates[step_idx].is_empty();
254            let is_undirected = step.direction == Direction::Both;
255
256            // If the target variable is bound in the outer scope, extract its VIDs
257            // so we check adjacency to a specific node rather than any neighbor.
258            let bound_target_vids: Option<std::borrow::Cow<'_, UInt64Array>> =
259                if let Some(Some(col_name)) = self.bound_target_columns.get(step_idx) {
260                    let col = batch.column_by_name(col_name).or_else(|| {
261                        col_name
262                            .strip_suffix("._vid")
263                            .and_then(|v| batch.column_by_name(v))
264                    });
265                    col.map(|c| column_as_vid_array(c.as_ref())).transpose()?
266                } else {
267                    None
268                };
269
270            // Resolve property predicate values for this step.
271            let resolved_preds: Vec<(String, Value)> = if has_property_preds {
272                self.target_property_predicates[step_idx]
273                    .iter()
274                    .filter_map(|p| {
275                        self.resolve_predicate_value(p)
276                            .map(|v| (p.property_name.clone(), v))
277                    })
278                    .collect()
279            } else {
280                Vec::new()
281            };
282
283            // Expand neighbors for each frontier entry.
284            let mut next_frontier: Vec<(u32, u64)> = Vec::new();
285
286            // Helper closure: check if a target VID passes the label filter.
287            let passes_label_filter = |target_vid: Vid| -> bool {
288                if let Some(ref label_name) = step.target_label_name
289                    && let Some(vertex_labels) =
290                        l0_visibility::get_vertex_labels_optional(target_vid, &query_ctx)
291                    && !vertex_labels.contains(label_name)
292                {
293                    return false;
294                }
295                true
296            };
297
298            if !resolved_preds.is_empty() {
299                // Property predicates present: collect candidates, batch-load, filter.
300                let mut candidates: Vec<(u32, Vid)> = Vec::new();
301
302                for &(row_idx, src_vid_u64) in &frontier {
303                    if result[row_idx as usize] {
304                        continue;
305                    }
306                    let vid = Vid::from(src_vid_u64);
307                    let mut seen_eids = std::collections::HashSet::new();
308
309                    for &edge_type in &step.edge_type_ids {
310                        let neighbors =
311                            self.graph_ctx.get_neighbors(vid, edge_type, step.direction);
312
313                        for (target_vid, eid) in neighbors {
314                            if is_undirected && !seen_eids.insert(eid.as_u64()) {
315                                continue;
316                            }
317                            if !passes_label_filter(target_vid) {
318                                continue;
319                            }
320                            // Bound target check: skip neighbors that aren't the expected VID.
321                            if let Some(ref bound_vids) = bound_target_vids
322                                && !bound_vids.is_null(row_idx as usize)
323                                && target_vid.as_u64() != bound_vids.value(row_idx as usize)
324                            {
325                                continue;
326                            }
327                            candidates.push((row_idx, target_vid));
328                        }
329                    }
330                }
331
332                // Batch-load properties for all candidate target VIDs.
333                if !candidates.is_empty() {
334                    let unique_vids: Vec<Vid> = {
335                        let mut v: Vec<Vid> = candidates.iter().map(|c| c.1).collect();
336                        v.sort_unstable();
337                        v.dedup();
338                        v
339                    };
340
341                    let prop_names: Vec<&str> =
342                        resolved_preds.iter().map(|(n, _)| n.as_str()).collect();
343
344                    let props_map = std::thread::scope(|s| {
345                        s.spawn(|| {
346                            let rt = tokio::runtime::Builder::new_current_thread()
347                                .enable_all()
348                                .build()
349                                .map_err(|e| {
350                                    DataFusionError::Execution(format!(
351                                        "Runtime creation failed: {e}"
352                                    ))
353                                })?;
354                            rt.block_on(self.graph_ctx.property_manager().get_batch_vertex_props(
355                                &unique_vids,
356                                &prop_names,
357                                Some(&query_ctx),
358                            ))
359                            .map_err(|e| {
360                                DataFusionError::Execution(format!("Vertex prop load failed: {e}"))
361                            })
362                        })
363                        .join()
364                        .unwrap_or_else(|_| {
365                            Err(DataFusionError::Execution(
366                                "Vertex prop load thread panicked".to_string(),
367                            ))
368                        })
369                    })?;
370
371                    for (row_idx, target_vid) in &candidates {
372                        if result[*row_idx as usize] {
373                            continue;
374                        }
375
376                        let matches = if let Some(props) = props_map.get(target_vid) {
377                            resolved_preds
378                                .iter()
379                                .all(|(name, expected)| match props.get(name) {
380                                    Some(actual) => actual == expected,
381                                    None => matches!(expected, Value::Null),
382                                })
383                        } else {
384                            resolved_preds
385                                .iter()
386                                .all(|(_, expected)| matches!(expected, Value::Null))
387                        };
388
389                        if matches {
390                            if is_last_step {
391                                result[*row_idx as usize] = true;
392                            } else {
393                                next_frontier.push((*row_idx, target_vid.as_u64()));
394                            }
395                        }
396                    }
397                }
398            } else {
399                // No property predicates — CSR expansion with label + bound target filter.
400                for &(row_idx, src_vid_u64) in &frontier {
401                    if result[row_idx as usize] {
402                        continue;
403                    }
404                    let vid = Vid::from(src_vid_u64);
405                    let mut found = false;
406                    let mut seen_eids = std::collections::HashSet::new();
407
408                    // For bound targets, extract the expected VID for this row.
409                    let expected_target: Option<u64> = bound_target_vids.as_ref().and_then(|bv| {
410                        if bv.is_null(row_idx as usize) {
411                            None
412                        } else {
413                            Some(bv.value(row_idx as usize))
414                        }
415                    });
416
417                    'edge_types: for &edge_type in &step.edge_type_ids {
418                        let neighbors =
419                            self.graph_ctx.get_neighbors(vid, edge_type, step.direction);
420
421                        for (target_vid, eid) in neighbors {
422                            if is_undirected && !seen_eids.insert(eid.as_u64()) {
423                                continue;
424                            }
425                            if !passes_label_filter(target_vid) {
426                                continue;
427                            }
428                            // Bound target check.
429                            if let Some(expected) = expected_target
430                                && target_vid.as_u64() != expected
431                            {
432                                continue;
433                            }
434
435                            if is_last_step {
436                                found = true;
437                                break 'edge_types;
438                            } else {
439                                next_frontier.push((row_idx, target_vid.as_u64()));
440                            }
441                        }
442                    }
443
444                    if found {
445                        result[row_idx as usize] = true;
446                    }
447                }
448            }
449
450            frontier = next_frontier;
451        }
452
453        // Step 4: Build BooleanArray from the result vector.
454        let bool_array = BooleanArray::from(result);
455        Ok(ColumnarValue::Array(Arc::new(bool_array)))
456    }
457
458    fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
459        write!(f, "PatternExists({})", self.anchor_column)
460    }
461
462    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
463        vec![]
464    }
465
466    fn with_new_children(
467        self: Arc<Self>,
468        _children: Vec<Arc<dyn PhysicalExpr>>,
469    ) -> DFResult<Arc<dyn PhysicalExpr>> {
470        Ok(self)
471    }
472}
473
474// ═══════════════════════════════════════════════════════════════════════════════
475// Helper functions for extracting pattern structure from EXISTS queries
476// ═══════════════════════════════════════════════════════════════════════════════
477
478/// Extracts the `Pattern` from a pattern-predicate `Query`.
479///
480/// Pattern predicate queries are always `Query::Single` with exactly one
481/// `Clause::Match` containing the pattern. Returns `Err` if the structure
482/// doesn't match (triggers fallback to `ExistsExecExpr`).
483pub fn extract_pattern_from_exists_query(query: &Query) -> anyhow::Result<Pattern> {
484    match query {
485        Query::Single(stmt) if stmt.clauses.len() == 1 => {
486            if let uni_cypher::ast::Clause::Match(m) = &stmt.clauses[0] {
487                // Reject variable-length paths — they require full subquery execution.
488                for path in &m.pattern.paths {
489                    for elem in &path.elements {
490                        if let PatternElement::Relationship(rel) = elem
491                            && rel.range.is_some()
492                        {
493                            anyhow::bail!(
494                                "Variable-length paths in pattern predicates require subquery evaluation"
495                            );
496                        }
497                    }
498                }
499                Ok(m.pattern.clone())
500            } else {
501                anyhow::bail!("Expected Match clause in pattern predicate EXISTS query")
502            }
503        }
504        _ => anyhow::bail!("Pattern predicate EXISTS query has unexpected structure"),
505    }
506}
507
508/// Extracts property predicates from target nodes in the pattern.
509///
510/// Walks the pattern elements and, for each target node in each traversal step,
511/// extracts property map entries as `PropertyPredicate` values. Only literal
512/// values and `$param` references are supported; anything else triggers
513/// fallback.
514///
515/// The returned vec is indexed by step — `result[i]` contains predicates for
516/// step `i`'s target node.
517pub fn extract_target_property_predicates(
518    pattern: &Pattern,
519    steps: &[TraversalStep],
520    _uni_schema: &UniSchema,
521) -> anyhow::Result<Vec<Vec<PropertyPredicate>>> {
522    if pattern.paths.is_empty() {
523        return Ok(vec![Vec::new(); steps.len()]);
524    }
525
526    let elements = &pattern.paths[0].elements;
527
528    // Find the anchor index (first node whose variable is in an outer scope).
529    // We walk forward from the anchor: elements[anchor+1] = rel, elements[anchor+2] = target, etc.
530    let anchor_idx = elements
531        .iter()
532        .position(|e| matches!(e, PatternElement::Node(_)))
533        .unwrap_or(0);
534
535    let mut result = Vec::with_capacity(steps.len());
536
537    for step_i in 0..steps.len() {
538        // Target node is at anchor_idx + 2*(step_i+1)
539        let target_elem_idx = anchor_idx + 2 * (step_i + 1);
540        let preds = if target_elem_idx < elements.len() {
541            if let PatternElement::Node(node) = &elements[target_elem_idx] {
542                extract_node_property_predicates(node)?
543            } else {
544                Vec::new()
545            }
546        } else {
547            Vec::new()
548        };
549        result.push(preds);
550    }
551
552    Ok(result)
553}
554
555/// Extracts property predicates from a single `NodePattern`'s properties map.
556fn extract_node_property_predicates(
557    node: &uni_cypher::ast::NodePattern,
558) -> anyhow::Result<Vec<PropertyPredicate>> {
559    let Some(ref props_expr) = node.properties else {
560        return Ok(Vec::new());
561    };
562
563    let CypherExpr::Map(entries) = props_expr else {
564        anyhow::bail!("Node properties must be a map literal for pattern exists optimization");
565    };
566
567    let mut predicates = Vec::with_capacity(entries.len());
568    for (key, value_expr) in entries {
569        match value_expr {
570            CypherExpr::Parameter(param_name) => {
571                predicates.push(PropertyPredicate {
572                    property_name: key.clone(),
573                    param_name: Some(param_name.clone()),
574                    literal_value: None,
575                });
576            }
577            CypherExpr::Literal(lit) => {
578                predicates.push(PropertyPredicate {
579                    property_name: key.clone(),
580                    param_name: None,
581                    literal_value: Some(lit.to_value()),
582                });
583            }
584            _ => {
585                anyhow::bail!(
586                    "Unsupported property value expression in pattern exists: {:?}",
587                    value_expr
588                );
589            }
590        }
591    }
592
593    Ok(predicates)
594}