Skip to main content

uni_query/query/df_graph/
pattern_exists.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Vectorized EXISTS evaluation for pattern predicates in WHERE clauses.
5//!
6//! When a WHERE clause contains a bare pattern predicate like
7//! `(m)-[:SENT_BY]->(:Participant {name: $ename})`, the parser wraps it in
8//! `Expr::Exists { from_pattern_predicate: true }`. Instead of running a full
9//! plan-and-execute cycle per row (the generic `ExistsExecExpr` path), this
10//! module evaluates the pattern using batch CSR neighbor lookups — the same
11//! approach used by `PatternComprehensionExecExpr`.
12//!
13//! Returns a `BooleanArray` where each element indicates whether at least one
14//! matching path exists for that row's anchor node.
15
16use std::any::Any;
17use std::collections::HashMap;
18use std::fmt::{self, Display, Formatter};
19use std::hash::Hash;
20use std::sync::Arc;
21
22use arrow_array::{Array, BooleanArray, RecordBatch, UInt64Array};
23use arrow_schema::{DataType, Schema};
24use datafusion::common::Result as DFResult;
25use datafusion::error::DataFusionError;
26use datafusion::logical_expr::ColumnarValue;
27use datafusion::physical_plan::PhysicalExpr;
28use uni_common::core::id::Vid;
29use uni_common::core::schema::Schema as UniSchema;
30use uni_common::value::Value;
31use uni_cypher::ast::{Expr as CypherExpr, Pattern, PatternElement, Query};
32use uni_store::storage::direction::Direction;
33
34use super::GraphExecutionContext;
35use super::pattern_comprehension::TraversalStep;
36use crate::query::df_graph::common::column_as_vid_array;
37
38/// A property equality predicate extracted from a node pattern's property map.
39///
40/// For `(:Label {name: $param})`, this stores `property_name = "name"` and
41/// `param_name = Some("param")`. For `(:Label {status: 'active'})`, it stores
42/// `literal_value = Some(Value::String("active"))`.
43#[derive(Debug, Clone)]
44pub struct PropertyPredicate {
45    /// Property name to check on the target node.
46    pub property_name: String,
47    /// Parameter name to resolve at evaluate time, if the value came from `$param`.
48    pub param_name: Option<String>,
49    /// Literal value known at compile time.
50    pub literal_value: Option<Value>,
51}
52
53/// Vectorized EXISTS expression for simple pattern predicates.
54///
55/// Evaluates `(anchor)-[:EDGE]->(:Label {props})` patterns using batch CSR
56/// lookups instead of per-row subquery execution.
57#[derive(Debug)]
58pub struct PatternExistsExecExpr {
59    /// Shared graph context for CSR lookups and property materialization.
60    graph_ctx: Arc<GraphExecutionContext>,
61    /// Column name for anchor VIDs (e.g., `"m._vid"`).
62    anchor_column: String,
63    /// Traversal steps describing each hop in the pattern.
64    traversal_steps: Vec<TraversalStep>,
65    /// Schema of the outer input batch.
66    input_schema: Arc<Schema>,
67    /// Property predicates per traversal step (indexed by step).
68    target_property_predicates: Vec<Vec<PropertyPredicate>>,
69    /// Per-step bound target VID column name, if the target variable is also in
70    /// the outer scope (e.g., `MATCH (n),(m) WHERE (n)-[:R]->(m)` — `m` is bound).
71    /// When `Some`, the step checks adjacency to that specific VID instead of
72    /// checking for the existence of *any* neighbor.
73    bound_target_columns: Vec<Option<String>>,
74    /// Parameters from the outer query for resolving `$param` references.
75    params: HashMap<String, Value>,
76}
77
78impl Clone for PatternExistsExecExpr {
79    fn clone(&self) -> Self {
80        Self {
81            graph_ctx: self.graph_ctx.clone(),
82            anchor_column: self.anchor_column.clone(),
83            traversal_steps: self.traversal_steps.clone(),
84            input_schema: self.input_schema.clone(),
85            target_property_predicates: self.target_property_predicates.clone(),
86            bound_target_columns: self.bound_target_columns.clone(),
87            params: self.params.clone(),
88        }
89    }
90}
91
92impl PatternExistsExecExpr {
93    /// Creates a new vectorized pattern exists expression.
94    pub fn new(
95        graph_ctx: Arc<GraphExecutionContext>,
96        anchor_column: String,
97        traversal_steps: Vec<TraversalStep>,
98        input_schema: Arc<Schema>,
99        target_property_predicates: Vec<Vec<PropertyPredicate>>,
100        bound_target_columns: Vec<Option<String>>,
101        params: HashMap<String, Value>,
102    ) -> Self {
103        Self {
104            graph_ctx,
105            anchor_column,
106            traversal_steps,
107            input_schema,
108            target_property_predicates,
109            bound_target_columns,
110            params,
111        }
112    }
113
114    /// Resolves a `PropertyPredicate` to a concrete `Value` at evaluate time.
115    fn resolve_predicate_value(&self, pred: &PropertyPredicate) -> Option<Value> {
116        if let Some(ref val) = pred.literal_value {
117            Some(val.clone())
118        } else if let Some(ref param_name) = pred.param_name {
119            self.params.get(param_name).cloned()
120        } else {
121            None
122        }
123    }
124}
125
126impl Display for PatternExistsExecExpr {
127    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
128        write!(
129            f,
130            "PatternExists(anchor={}, steps={})",
131            self.anchor_column,
132            self.traversal_steps.len()
133        )
134    }
135}
136
137impl PartialEq for PatternExistsExecExpr {
138    fn eq(&self, other: &Self) -> bool {
139        self.anchor_column == other.anchor_column && Arc::ptr_eq(&self.graph_ctx, &other.graph_ctx)
140    }
141}
142
143impl Eq for PatternExistsExecExpr {}
144
145impl Hash for PatternExistsExecExpr {
146    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
147        self.anchor_column.hash(state);
148        self.traversal_steps.len().hash(state);
149    }
150}
151
152impl PartialEq<dyn Any> for PatternExistsExecExpr {
153    fn eq(&self, other: &dyn Any) -> bool {
154        other
155            .downcast_ref::<Self>()
156            .map(|x| self == x)
157            .unwrap_or(false)
158    }
159}
160
161impl PhysicalExpr for PatternExistsExecExpr {
162    fn as_any(&self) -> &dyn Any {
163        self
164    }
165
166    fn data_type(&self, _input_schema: &Schema) -> DFResult<DataType> {
167        Ok(DataType::Boolean)
168    }
169
170    fn nullable(&self, _input_schema: &Schema) -> DFResult<bool> {
171        Ok(true)
172    }
173
174    fn evaluate(&self, batch: &RecordBatch) -> DFResult<ColumnarValue> {
175        let num_rows = batch.num_rows();
176
177        // Step 1: Extract anchor VIDs from the input batch.
178        let anchor_col = if let Some(col) = batch.column_by_name(&self.anchor_column) {
179            col
180        } else if let Some(var_name) = self.anchor_column.strip_suffix("._vid") {
181            batch.column_by_name(var_name).ok_or_else(|| {
182                DataFusionError::Execution(format!(
183                    "PatternExists: anchor column '{}' not found in batch schema: {:?}",
184                    self.anchor_column,
185                    batch
186                        .schema()
187                        .fields()
188                        .iter()
189                        .map(|f| f.name().as_str())
190                        .collect::<Vec<_>>()
191                ))
192            })?
193        } else {
194            return Err(DataFusionError::Execution(format!(
195                "PatternExists: anchor column '{}' not found in batch schema: {:?}",
196                self.anchor_column,
197                batch
198                    .schema()
199                    .fields()
200                    .iter()
201                    .map(|f| f.name().as_str())
202                    .collect::<Vec<_>>()
203            )));
204        };
205        let anchor_vid_cow = column_as_vid_array(anchor_col.as_ref())?;
206        let anchor_vids: &UInt64Array = &anchor_vid_cow;
207
208        // Step 2: Warm CSR for all edge types in the traversal.
209        for step in &self.traversal_steps {
210            std::thread::scope(|s| {
211                s.spawn(|| {
212                    let rt = tokio::runtime::Builder::new_current_thread()
213                        .enable_all()
214                        .build()
215                        .map_err(|e| {
216                            DataFusionError::Execution(format!("Runtime creation failed: {e}"))
217                        })?;
218                    rt.block_on(
219                        self.graph_ctx
220                            .ensure_adjacency_warmed(&step.edge_type_ids, step.direction),
221                    )
222                    .map_err(|e| DataFusionError::Execution(format!("CSR warming failed: {e}")))
223                })
224                .join()
225                .unwrap_or_else(|_| {
226                    Err(DataFusionError::Execution(
227                        "CSR warming thread panicked".to_string(),
228                    ))
229                })
230            })?;
231        }
232
233        // Step 3: Evaluate pattern existence per row using batch CSR lookups.
234        let mut result = vec![false; num_rows];
235        let query_ctx = self.graph_ctx.query_context();
236
237        // Build initial frontier: (row_index, vid) pairs for non-null anchors.
238        let mut frontier: Vec<(u32, u64)> = Vec::with_capacity(num_rows);
239        for (row_idx, vid_opt) in anchor_vids.iter().enumerate() {
240            if let Some(vid_u64) = vid_opt {
241                frontier.push((row_idx as u32, vid_u64));
242            }
243        }
244
245        for (step_idx, step) in self.traversal_steps.iter().enumerate() {
246            if frontier.is_empty() {
247                break;
248            }
249
250            let is_last_step = step_idx == self.traversal_steps.len() - 1;
251            let has_property_preds = step_idx < self.target_property_predicates.len()
252                && !self.target_property_predicates[step_idx].is_empty();
253            let is_undirected = step.direction == Direction::Both;
254
255            // If the target variable is bound in the outer scope, extract its VIDs
256            // so we check adjacency to a specific node rather than any neighbor.
257            let bound_target_vids: Option<std::borrow::Cow<'_, UInt64Array>> =
258                if let Some(Some(col_name)) = self.bound_target_columns.get(step_idx) {
259                    let col = batch.column_by_name(col_name).or_else(|| {
260                        col_name
261                            .strip_suffix("._vid")
262                            .and_then(|v| batch.column_by_name(v))
263                    });
264                    col.map(|c| column_as_vid_array(c.as_ref())).transpose()?
265                } else {
266                    None
267                };
268
269            // Resolve property predicate values for this step.
270            let resolved_preds: Vec<(String, Value)> = if has_property_preds {
271                self.target_property_predicates[step_idx]
272                    .iter()
273                    .filter_map(|p| {
274                        self.resolve_predicate_value(p)
275                            .map(|v| (p.property_name.clone(), v))
276                    })
277                    .collect()
278            } else {
279                Vec::new()
280            };
281
282            // Expand neighbors for each frontier entry.
283            let mut next_frontier: Vec<(u32, u64)> = Vec::new();
284
285            // Helper closure: check if a target VID passes the label filter.
286            // Resolves labels from the L0 chain then the persisted index, so
287            // Lance-only vertices (e.g. on a fork) are matched correctly.
288            let passes_label_filter = |target_vid: Vid| -> bool {
289                if let Some(ref label_name) = step.target_label_name
290                    && let Some(vertex_labels) =
291                        self.graph_ctx.resolve_vertex_labels(target_vid, &query_ctx)
292                    && !vertex_labels.contains(label_name)
293                {
294                    return false;
295                }
296                true
297            };
298
299            if !resolved_preds.is_empty() {
300                // Property predicates present: collect candidates, batch-load, filter.
301                let mut candidates: Vec<(u32, Vid)> = Vec::new();
302
303                for &(row_idx, src_vid_u64) in &frontier {
304                    if result[row_idx as usize] {
305                        continue;
306                    }
307                    let vid = Vid::from(src_vid_u64);
308                    let mut seen_eids = std::collections::HashSet::new();
309
310                    for &edge_type in &step.edge_type_ids {
311                        let neighbors =
312                            self.graph_ctx.get_neighbors(vid, edge_type, step.direction);
313
314                        for (target_vid, eid) in neighbors {
315                            if is_undirected && !seen_eids.insert(eid.as_u64()) {
316                                continue;
317                            }
318                            if !passes_label_filter(target_vid) {
319                                continue;
320                            }
321                            // Bound target check: skip neighbors that aren't the expected VID.
322                            if let Some(ref bound_vids) = bound_target_vids
323                                && !bound_vids.is_null(row_idx as usize)
324                                && target_vid.as_u64() != bound_vids.value(row_idx as usize)
325                            {
326                                continue;
327                            }
328                            candidates.push((row_idx, target_vid));
329                        }
330                    }
331                }
332
333                // Batch-load properties for all candidate target VIDs.
334                if !candidates.is_empty() {
335                    let unique_vids: Vec<Vid> = {
336                        let mut v: Vec<Vid> = candidates.iter().map(|c| c.1).collect();
337                        v.sort_unstable();
338                        v.dedup();
339                        v
340                    };
341
342                    let prop_names: Vec<&str> =
343                        resolved_preds.iter().map(|(n, _)| n.as_str()).collect();
344
345                    let props_map = std::thread::scope(|s| {
346                        s.spawn(|| {
347                            let rt = tokio::runtime::Builder::new_current_thread()
348                                .enable_all()
349                                .build()
350                                .map_err(|e| {
351                                    DataFusionError::Execution(format!(
352                                        "Runtime creation failed: {e}"
353                                    ))
354                                })?;
355                            rt.block_on(self.graph_ctx.property_manager().get_batch_vertex_props(
356                                &unique_vids,
357                                &prop_names,
358                                Some(&query_ctx),
359                            ))
360                            .map_err(|e| {
361                                DataFusionError::Execution(format!("Vertex prop load failed: {e}"))
362                            })
363                        })
364                        .join()
365                        .unwrap_or_else(|_| {
366                            Err(DataFusionError::Execution(
367                                "Vertex prop load thread panicked".to_string(),
368                            ))
369                        })
370                    })?;
371
372                    for (row_idx, target_vid) in &candidates {
373                        if result[*row_idx as usize] {
374                            continue;
375                        }
376
377                        let matches = if let Some(props) = props_map.get(target_vid) {
378                            resolved_preds
379                                .iter()
380                                .all(|(name, expected)| match props.get(name) {
381                                    Some(actual) => actual == expected,
382                                    None => matches!(expected, Value::Null),
383                                })
384                        } else {
385                            resolved_preds
386                                .iter()
387                                .all(|(_, expected)| matches!(expected, Value::Null))
388                        };
389
390                        if matches {
391                            if is_last_step {
392                                result[*row_idx as usize] = true;
393                            } else {
394                                next_frontier.push((*row_idx, target_vid.as_u64()));
395                            }
396                        }
397                    }
398                }
399            } else {
400                // No property predicates — CSR expansion with label + bound target filter.
401                for &(row_idx, src_vid_u64) in &frontier {
402                    if result[row_idx as usize] {
403                        continue;
404                    }
405                    let vid = Vid::from(src_vid_u64);
406                    let mut found = false;
407                    let mut seen_eids = std::collections::HashSet::new();
408
409                    // For bound targets, extract the expected VID for this row.
410                    let expected_target: Option<u64> = bound_target_vids.as_ref().and_then(|bv| {
411                        if bv.is_null(row_idx as usize) {
412                            None
413                        } else {
414                            Some(bv.value(row_idx as usize))
415                        }
416                    });
417
418                    'edge_types: for &edge_type in &step.edge_type_ids {
419                        let neighbors =
420                            self.graph_ctx.get_neighbors(vid, edge_type, step.direction);
421
422                        for (target_vid, eid) in neighbors {
423                            if is_undirected && !seen_eids.insert(eid.as_u64()) {
424                                continue;
425                            }
426                            if !passes_label_filter(target_vid) {
427                                continue;
428                            }
429                            // Bound target check.
430                            if let Some(expected) = expected_target
431                                && target_vid.as_u64() != expected
432                            {
433                                continue;
434                            }
435
436                            if is_last_step {
437                                found = true;
438                                break 'edge_types;
439                            } else {
440                                next_frontier.push((row_idx, target_vid.as_u64()));
441                            }
442                        }
443                    }
444
445                    if found {
446                        result[row_idx as usize] = true;
447                    }
448                }
449            }
450
451            frontier = next_frontier;
452        }
453
454        // Step 4: Build BooleanArray from the result vector.
455        let bool_array = BooleanArray::from(result);
456        Ok(ColumnarValue::Array(Arc::new(bool_array)))
457    }
458
459    fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
460        write!(f, "PatternExists({})", self.anchor_column)
461    }
462
463    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
464        vec![]
465    }
466
467    fn with_new_children(
468        self: Arc<Self>,
469        _children: Vec<Arc<dyn PhysicalExpr>>,
470    ) -> DFResult<Arc<dyn PhysicalExpr>> {
471        Ok(self)
472    }
473}
474
475// ═══════════════════════════════════════════════════════════════════════════════
476// Helper functions for extracting pattern structure from EXISTS queries
477// ═══════════════════════════════════════════════════════════════════════════════
478
479/// Extracts the `Pattern` from a pattern-predicate `Query`.
480///
481/// Pattern predicate queries are always `Query::Single` with exactly one
482/// `Clause::Match` containing the pattern. Returns `Err` if the structure
483/// doesn't match (triggers fallback to `ExistsExecExpr`).
484pub fn extract_pattern_from_exists_query(query: &Query) -> anyhow::Result<Pattern> {
485    match query {
486        Query::Single(stmt) if stmt.clauses.len() == 1 => {
487            if let uni_cypher::ast::Clause::Match(m) = &stmt.clauses[0] {
488                // Reject variable-length paths — they require full subquery execution.
489                for path in &m.pattern.paths {
490                    for elem in &path.elements {
491                        if let PatternElement::Relationship(rel) = elem
492                            && rel.range.is_some()
493                        {
494                            anyhow::bail!(
495                                "Variable-length paths in pattern predicates require subquery evaluation"
496                            );
497                        }
498                    }
499                }
500                Ok(m.pattern.clone())
501            } else {
502                anyhow::bail!("Expected Match clause in pattern predicate EXISTS query")
503            }
504        }
505        _ => anyhow::bail!("Pattern predicate EXISTS query has unexpected structure"),
506    }
507}
508
509/// Extracts property predicates from target nodes in the pattern.
510///
511/// Walks the pattern elements and, for each target node in each traversal step,
512/// extracts property map entries as `PropertyPredicate` values. Only literal
513/// values and `$param` references are supported; anything else triggers
514/// fallback.
515///
516/// The returned vec is indexed by step — `result[i]` contains predicates for
517/// step `i`'s target node.
518pub fn extract_target_property_predicates(
519    pattern: &Pattern,
520    steps: &[TraversalStep],
521    _uni_schema: &UniSchema,
522) -> anyhow::Result<Vec<Vec<PropertyPredicate>>> {
523    if pattern.paths.is_empty() {
524        return Ok(vec![Vec::new(); steps.len()]);
525    }
526
527    let elements = &pattern.paths[0].elements;
528
529    // Find the anchor index (first node whose variable is in an outer scope).
530    // We walk forward from the anchor: elements[anchor+1] = rel, elements[anchor+2] = target, etc.
531    let anchor_idx = elements
532        .iter()
533        .position(|e| matches!(e, PatternElement::Node(_)))
534        .unwrap_or(0);
535
536    let mut result = Vec::with_capacity(steps.len());
537
538    for step_i in 0..steps.len() {
539        // Target node is at anchor_idx + 2*(step_i+1)
540        let target_elem_idx = anchor_idx + 2 * (step_i + 1);
541        let preds = if target_elem_idx < elements.len() {
542            if let PatternElement::Node(node) = &elements[target_elem_idx] {
543                extract_node_property_predicates(node)?
544            } else {
545                Vec::new()
546            }
547        } else {
548            Vec::new()
549        };
550        result.push(preds);
551    }
552
553    Ok(result)
554}
555
556/// Extracts property predicates from a single `NodePattern`'s properties map.
557fn extract_node_property_predicates(
558    node: &uni_cypher::ast::NodePattern,
559) -> anyhow::Result<Vec<PropertyPredicate>> {
560    let Some(ref props_expr) = node.properties else {
561        return Ok(Vec::new());
562    };
563
564    let CypherExpr::Map(entries) = props_expr else {
565        anyhow::bail!("Node properties must be a map literal for pattern exists optimization");
566    };
567
568    let mut predicates = Vec::with_capacity(entries.len());
569    for (key, value_expr) in entries {
570        match value_expr {
571            CypherExpr::Parameter(param_name) => {
572                predicates.push(PropertyPredicate {
573                    property_name: key.clone(),
574                    param_name: Some(param_name.clone()),
575                    literal_value: None,
576                });
577            }
578            CypherExpr::Literal(lit) => {
579                predicates.push(PropertyPredicate {
580                    property_name: key.clone(),
581                    param_name: None,
582                    literal_value: Some(lit.to_value()),
583                });
584            }
585            _ => {
586                anyhow::bail!(
587                    "Unsupported property value expression in pattern exists: {:?}",
588                    value_expr
589                );
590            }
591        }
592    }
593
594    Ok(predicates)
595}