Skip to main content

uni_query/query/df_graph/
pattern_comprehension.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Vectorized pattern comprehension for Cypher `[(a)-[:REL]->(b) WHERE pred | expr]`.
5//!
6//! Implements a `PhysicalExpr` that:
7//! 1. Extracts anchor VIDs from the input batch
8//! 2. Expands neighbors via CSR adjacency (synchronous)
9//! 3. Materializes needed properties by blocking on async loads from a scoped thread
10//! 4. Applies optional predicate filter
11//! 5. Evaluates the map expression
12//! 6. Reconstructs a `LargeList` column grouped by parent row
13
14use std::any::Any;
15use std::collections::{HashMap, HashSet};
16use std::fmt::{self, Display, Formatter};
17use std::hash::Hash;
18use std::sync::Arc;
19
20use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch, UInt32Array, UInt64Array};
21use arrow_schema::{DataType, Field, Schema};
22use datafusion::arrow::buffer::{OffsetBuffer, ScalarBuffer};
23use datafusion::arrow::compute::{cast, filter, filter_record_batch, take};
24use datafusion::common::Result as DFResult;
25use datafusion::error::DataFusionError;
26use datafusion::logical_expr::ColumnarValue;
27use datafusion::physical_plan::PhysicalExpr;
28use uni_common::core::id::{Eid, Vid};
29use uni_common::core::schema::Schema as UniSchema;
30use uni_cypher::ast::{
31    Direction as AstDirection, Expr, NodePattern, Pattern, PatternElement, RelationshipPattern,
32};
33use uni_store::QueryContext;
34use uni_store::storage::direction::Direction;
35
36use super::GraphExecutionContext;
37use crate::query::df_graph::common::{build_path_struct_field, column_as_vid_array};
38use crate::query::df_graph::scan::build_property_column_static;
39
40/// A single hop derived from the pattern's elements.
41#[derive(Debug, Clone)]
42pub struct TraversalStep {
43    /// Resolved edge type IDs for this hop.
44    pub edge_type_ids: Vec<u32>,
45    /// Direction of the hop.
46    pub direction: Direction,
47    /// Variable name for the target node, if any.
48    pub target_variable: Option<String>,
49    /// Label filter for the target node, if any.
50    pub target_label_name: Option<String>,
51    /// Variable name for the edge, if any.
52    pub edge_variable: Option<String>,
53}
54
55/// Physical expression for Cypher Pattern Comprehension:
56/// `[(a)-[:REL]->(b) WHERE pred | expr]`
57#[derive(Debug)]
58pub struct PatternComprehensionExecExpr {
59    /// Shared graph context for CSR lookups and property materialization.
60    graph_ctx: Arc<GraphExecutionContext>,
61    /// Column name for anchor VIDs (e.g., `"a._vid"`).
62    anchor_column: String,
63    /// Steps describing each hop in the pattern.
64    traversal_steps: Vec<TraversalStep>,
65    /// Optional path variable name.
66    path_variable: Option<String>,
67    /// Optional filter predicate compiled against inner schema.
68    predicate: Option<Arc<dyn PhysicalExpr>>,
69    /// Map expression compiled against inner schema.
70    map_expr: Arc<dyn PhysicalExpr>,
71    /// Schema of the outer input batch.
72    input_schema: Arc<Schema>,
73    /// Schema of the inner (expanded) batch — outer + pattern bindings + properties.
74    inner_schema: Arc<Schema>,
75    /// Data type of items in the output list (result of map_expr).
76    output_item_type: DataType,
77    /// Vertex properties needed per variable: variable → [prop_names].
78    needed_vertex_props: HashMap<String, Vec<String>>,
79    /// Edge properties needed per variable: variable → [prop_names].
80    needed_edge_props: HashMap<String, Vec<String>>,
81}
82
83impl Clone for PatternComprehensionExecExpr {
84    fn clone(&self) -> Self {
85        Self {
86            graph_ctx: self.graph_ctx.clone(),
87            anchor_column: self.anchor_column.clone(),
88            traversal_steps: self.traversal_steps.clone(),
89            path_variable: self.path_variable.clone(),
90            predicate: self.predicate.clone(),
91            map_expr: self.map_expr.clone(),
92            input_schema: self.input_schema.clone(),
93            inner_schema: self.inner_schema.clone(),
94            output_item_type: self.output_item_type.clone(),
95            needed_vertex_props: self.needed_vertex_props.clone(),
96            needed_edge_props: self.needed_edge_props.clone(),
97        }
98    }
99}
100
101impl PatternComprehensionExecExpr {
102    #[expect(clippy::too_many_arguments, reason = "Constructor for complex expr")]
103    pub fn new(
104        graph_ctx: Arc<GraphExecutionContext>,
105        anchor_column: String,
106        traversal_steps: Vec<TraversalStep>,
107        path_variable: Option<String>,
108        predicate: Option<Arc<dyn PhysicalExpr>>,
109        map_expr: Arc<dyn PhysicalExpr>,
110        input_schema: Arc<Schema>,
111        inner_schema: Arc<Schema>,
112        output_item_type: DataType,
113        needed_vertex_props: HashMap<String, Vec<String>>,
114        needed_edge_props: HashMap<String, Vec<String>>,
115    ) -> Self {
116        Self {
117            graph_ctx,
118            anchor_column,
119            traversal_steps,
120            path_variable,
121            predicate,
122            map_expr,
123            input_schema,
124            inner_schema,
125            output_item_type,
126            needed_vertex_props,
127            needed_edge_props,
128        }
129    }
130}
131
132impl Display for PatternComprehensionExecExpr {
133    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
134        write!(
135            f,
136            "PatternComprehension(anchor={}, steps={})",
137            self.anchor_column,
138            self.traversal_steps.len()
139        )
140    }
141}
142
143impl PartialEq for PatternComprehensionExecExpr {
144    fn eq(&self, other: &Self) -> bool {
145        self.anchor_column == other.anchor_column
146            && Arc::ptr_eq(&self.graph_ctx, &other.graph_ctx)
147            && Arc::ptr_eq(&self.map_expr, &other.map_expr)
148            && match (&self.predicate, &other.predicate) {
149                (Some(a), Some(b)) => Arc::ptr_eq(a, b),
150                (None, None) => true,
151                _ => false,
152            }
153    }
154}
155
156impl Eq for PatternComprehensionExecExpr {}
157
158impl Hash for PatternComprehensionExecExpr {
159    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
160        self.anchor_column.hash(state);
161        self.output_item_type.hash(state);
162    }
163}
164
165impl PartialEq<dyn Any> for PatternComprehensionExecExpr {
166    fn eq(&self, other: &dyn Any) -> bool {
167        other
168            .downcast_ref::<Self>()
169            .map(|x| self == x)
170            .unwrap_or(false)
171    }
172}
173
174impl PhysicalExpr for PatternComprehensionExecExpr {
175    fn as_any(&self) -> &dyn Any {
176        self
177    }
178
179    fn data_type(&self, _input_schema: &Schema) -> DFResult<DataType> {
180        Ok(DataType::LargeList(Arc::new(Field::new(
181            "item",
182            self.output_item_type.clone(),
183            true,
184        ))))
185    }
186
187    fn nullable(&self, _input_schema: &Schema) -> DFResult<bool> {
188        Ok(true)
189    }
190
191    fn evaluate(&self, batch: &RecordBatch) -> DFResult<ColumnarValue> {
192        let num_rows = batch.num_rows();
193
194        // Step 1: Extract anchor VIDs
195        let anchor_col = if let Some(col) = batch.column_by_name(&self.anchor_column) {
196            col
197        } else if let Some(var_name) = self.anchor_column.strip_suffix("._vid") {
198            batch.column_by_name(var_name).ok_or_else(|| {
199                DataFusionError::Execution(format!(
200                    "Anchor column '{}' not found in batch schema: {:?}",
201                    self.anchor_column,
202                    batch
203                        .schema()
204                        .fields()
205                        .iter()
206                        .map(|f| f.name().as_str())
207                        .collect::<Vec<_>>()
208                ))
209            })?
210        } else {
211            return Err(DataFusionError::Execution(format!(
212                "Anchor column '{}' not found in batch schema: {:?}",
213                self.anchor_column,
214                batch
215                    .schema()
216                    .fields()
217                    .iter()
218                    .map(|f| f.name().as_str())
219                    .collect::<Vec<_>>()
220            )));
221        };
222        let anchor_vid_cow = column_as_vid_array(anchor_col.as_ref())?;
223        let anchor_vids: &UInt64Array = &anchor_vid_cow;
224
225        // Step 2: CSR expansion
226        // Warm CSR for all edge types and directions
227        for step in &self.traversal_steps {
228            log::debug!(
229                "PatternComprehension: warming CSR for edge_type_ids={:?}, direction={:?}",
230                step.edge_type_ids,
231                step.direction
232            );
233            std::thread::scope(|s| {
234                s.spawn(|| {
235                    let rt = tokio::runtime::Builder::new_current_thread()
236                        .enable_all()
237                        .build()
238                        .map_err(|e| {
239                            DataFusionError::Execution(format!("Runtime creation failed: {e}"))
240                        })?;
241                    rt.block_on(
242                        self.graph_ctx
243                            .ensure_adjacency_warmed(&step.edge_type_ids, step.direction),
244                    )
245                    .map_err(|e| DataFusionError::Execution(format!("CSR warming failed: {e}")))
246                })
247                .join()
248                .unwrap_or_else(|_| {
249                    Err(DataFusionError::Execution(
250                        "CSR warming thread panicked".to_string(),
251                    ))
252                })
253            })?;
254        }
255
256        log::debug!(
257            "PatternComprehension: expanding {} anchor VIDs, steps={}",
258            anchor_vids.len(),
259            self.traversal_steps.len()
260        );
261
262        // Expand: for each anchor VID, traverse the pattern and collect results.
263        // For single-hop, this is straightforward. For multi-hop, chain expansions.
264        let expansion = self.expand_pattern(anchor_vids)?;
265
266        log::debug!(
267            "PatternComprehension: expansion produced {} rows",
268            expansion.row_indices.len()
269        );
270
271        // Handle empty expansion: produce LargeList of empty lists
272        if expansion.row_indices.is_empty() {
273            return self.build_empty_list_result(num_rows);
274        }
275
276        // Step 3: Build flat inner batch
277        let indices_array = UInt32Array::from(expansion.row_indices.clone());
278        let mut inner_columns: Vec<ArrayRef> = Vec::new();
279
280        // Replicate outer columns
281        for col in batch.columns() {
282            inner_columns.push(take(col, &indices_array, None)?);
283        }
284
285        // Add target VID columns for each step
286        for (step_idx, step) in self.traversal_steps.iter().enumerate() {
287            if let Some(ref _target_var) = step.target_variable {
288                inner_columns.push(Arc::new(UInt64Array::from(
289                    expansion.step_target_vids[step_idx].clone(),
290                )));
291            }
292            if let Some(ref _edge_var) = step.edge_variable {
293                inner_columns.push(Arc::new(UInt64Array::from(
294                    expansion.step_edge_ids[step_idx].clone(),
295                )));
296            }
297        }
298
299        // Step 3b: Property materialization
300        let query_ctx = self.graph_ctx.query_context();
301        for (step_idx, step) in self.traversal_steps.iter().enumerate() {
302            // Vertex properties
303            if let Some(ref target_var) = step.target_variable
304                && let Some(props) = self.needed_vertex_props.get(target_var)
305            {
306                let vids: Vec<Vid> = expansion.step_target_vids[step_idx]
307                    .iter()
308                    .map(|v| Vid::from(*v))
309                    .collect();
310
311                let prop_refs: Vec<&str> = props.iter().map(|s| s.as_str()).collect();
312
313                let props_map = std::thread::scope(|s| {
314                    s.spawn(|| {
315                        let rt = tokio::runtime::Builder::new_current_thread()
316                            .enable_all()
317                            .build()
318                            .map_err(|e| {
319                                DataFusionError::Execution(format!("Runtime creation failed: {e}"))
320                            })?;
321                        rt.block_on(self.graph_ctx.property_manager().get_batch_vertex_props(
322                            &vids,
323                            &prop_refs,
324                            Some(&query_ctx),
325                        ))
326                        .map_err(|e| {
327                            DataFusionError::Execution(format!("Vertex prop load failed: {e}"))
328                        })
329                    })
330                    .join()
331                    .unwrap_or_else(|_| {
332                        Err(DataFusionError::Execution(
333                            "Vertex prop load thread panicked".to_string(),
334                        ))
335                    })
336                })?;
337
338                for prop in props {
339                    let col = build_property_column_static(
340                        &vids,
341                        &props_map,
342                        prop,
343                        &DataType::LargeBinary,
344                    )?;
345                    inner_columns.push(col);
346                }
347            }
348
349            // Edge properties
350            if let Some(ref edge_var) = step.edge_variable
351                && let Some(props) = self.needed_edge_props.get(edge_var)
352            {
353                let eids: Vec<Eid> = expansion.step_edge_ids[step_idx]
354                    .iter()
355                    .map(|e| Eid::from(*e))
356                    .collect();
357
358                let prop_refs: Vec<&str> = props.iter().map(|s| s.as_str()).collect();
359
360                let props_map = std::thread::scope(|s| {
361                    s.spawn(|| {
362                        let rt = tokio::runtime::Builder::new_current_thread()
363                            .enable_all()
364                            .build()
365                            .map_err(|e| {
366                                DataFusionError::Execution(format!("Runtime creation failed: {e}"))
367                            })?;
368                        rt.block_on(self.graph_ctx.property_manager().get_batch_edge_props(
369                            &eids,
370                            &prop_refs,
371                            Some(&query_ctx),
372                        ))
373                        .map_err(|e| {
374                            DataFusionError::Execution(format!("Edge prop load failed: {e}"))
375                        })
376                    })
377                    .join()
378                    .unwrap_or_else(|_| {
379                        Err(DataFusionError::Execution(
380                            "Edge prop load thread panicked".to_string(),
381                        ))
382                    })
383                })?;
384
385                // Edge props use Eid mapped to Vid keys in the HashMap
386                let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(e.as_u64())).collect();
387                for prop in props {
388                    let col = build_property_column_static(
389                        &vid_keys,
390                        &props_map,
391                        prop,
392                        &DataType::LargeBinary,
393                    )?;
394                    inner_columns.push(col);
395                }
396            }
397        }
398
399        // Step 3c: Build path struct column if path variable is bound
400        if self.path_variable.is_some() {
401            let path_col = self.build_path_column(&expansion, anchor_vids, &query_ctx)?;
402            inner_columns.push(path_col);
403        }
404
405        let inner_batch = RecordBatch::try_new(self.inner_schema.clone(), inner_columns)?;
406
407        // Step 4: Filter
408        let (filtered_batch, filtered_indices) = if let Some(pred) = &self.predicate {
409            let mask = pred
410                .evaluate(&inner_batch)?
411                .into_array(inner_batch.num_rows())?;
412            let mask = cast(&mask, &DataType::Boolean)?;
413            let boolean_mask = mask
414                .as_any()
415                .downcast_ref::<BooleanArray>()
416                .ok_or_else(|| {
417                    DataFusionError::Execution(
418                        "Pattern comprehension predicate did not produce BooleanArray".to_string(),
419                    )
420                })?;
421
422            let filtered_batch = filter_record_batch(&inner_batch, boolean_mask)?;
423            let indices_array_ref: ArrayRef = Arc::new(indices_array.clone());
424            let filtered_idx = filter(&indices_array_ref, boolean_mask)?;
425            let filtered_idx = filtered_idx
426                .as_any()
427                .downcast_ref::<UInt32Array>()
428                .unwrap()
429                .clone();
430
431            (filtered_batch, filtered_idx)
432        } else {
433            (inner_batch, indices_array.clone())
434        };
435
436        // Step 5: Map
437        let mapped_val = self.map_expr.evaluate(&filtered_batch)?;
438        let mapped_array = mapped_val.into_array(filtered_batch.num_rows())?;
439
440        // Step 6: Reconstruct LargeList
441        let new_offsets = {
442            let mut offsets = Vec::with_capacity(num_rows + 1);
443            offsets.push(0i64);
444
445            let indices_slice = filtered_indices.values();
446            let mut pos = 0;
447            let mut current_len: i64 = 0;
448
449            for row_idx in 0..num_rows {
450                while pos < indices_slice.len() && indices_slice[pos] as usize == row_idx {
451                    pos += 1;
452                    current_len += 1;
453                }
454                offsets.push(current_len);
455            }
456            OffsetBuffer::new(ScalarBuffer::from(offsets))
457        };
458
459        let new_field = Arc::new(Field::new("item", mapped_array.data_type().clone(), true));
460        let new_list = datafusion::arrow::array::LargeListArray::new(
461            new_field,
462            new_offsets,
463            mapped_array,
464            None,
465        );
466
467        Ok(ColumnarValue::Array(Arc::new(new_list)))
468    }
469
470    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
471        // map_expr and predicate are compiled against inner schema;
472        // don't expose to DF tree traversal.
473        vec![]
474    }
475
476    fn with_new_children(
477        self: Arc<Self>,
478        children: Vec<Arc<dyn PhysicalExpr>>,
479    ) -> DFResult<Arc<dyn PhysicalExpr>> {
480        if !children.is_empty() {
481            return Err(DataFusionError::Internal(
482                "PatternComprehension has no children".to_string(),
483            ));
484        }
485        Ok(self)
486    }
487
488    fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
489        write!(f, "PatternComprehension({})", self.anchor_column)
490    }
491}
492
/// Flat result of expanding a pattern: one entry per complete match.
///
/// All vectors, including each inner per-step vector, are parallel: index `i`
/// describes the same expanded row everywhere.
struct PatternExpansion {
    /// Outer-batch row each expanded row originates from.
    row_indices: Vec<u32>,
    /// Anchor VID (the path's starting node) of each expanded row.
    anchor_vids: Vec<u64>,
    /// For every step, the VID reached by that hop in each expanded row.
    step_target_vids: Vec<Vec<u64>>,
    /// For every step, the ID of the edge taken in each expanded row.
    step_edge_ids: Vec<Vec<u64>>,
    /// For every step, the type ID of the edge taken in each expanded row.
    step_edge_type_ids: Vec<Vec<u32>>,
}
506
507impl PatternComprehensionExecExpr {
508    /// Expand the pattern starting from anchor VIDs.
509    ///
510    /// For single-hop patterns this is a direct CSR lookup.
511    /// For multi-hop, each hop chains from the previous hop's target VIDs.
512    fn expand_pattern(&self, anchor_vids: &UInt64Array) -> DFResult<PatternExpansion> {
513        // Start: each anchor VID is a "frontier" entry
514        let mut frontier_row_indices: Vec<u32> = Vec::new();
515        let mut frontier_vids: Vec<u64> = Vec::new();
516
517        for (row_idx, vid_opt) in anchor_vids.iter().enumerate() {
518            if let Some(vid_u64) = vid_opt {
519                frontier_row_indices.push(row_idx as u32);
520                frontier_vids.push(vid_u64);
521            }
522        }
523
524        // `result_row_indices[i]` maps expanded row i back to the original outer batch row.
525        let mut result_row_indices: Vec<u32> = frontier_row_indices.clone();
526        // Track anchor VID for each expanded row (the original source node).
527        let mut result_anchor_vids: Vec<u64> = frontier_vids.clone();
528
529        // Track intermediate target/edge/edge-type arrays per step for multi-hop reconstruction.
530        let mut accumulated_target_vids: Vec<Vec<u64>> = Vec::new();
531        let mut accumulated_edge_ids: Vec<Vec<u64>> = Vec::new();
532        let mut accumulated_edge_type_ids: Vec<Vec<u32>> = Vec::new();
533
534        for step in &self.traversal_steps {
535            let is_undirected = step.direction == Direction::Both;
536            let query_ctx = self.graph_ctx.query_context();
537
538            let mut new_row_indices: Vec<u32> = Vec::new();
539            let mut new_anchor_vids: Vec<u64> = Vec::new();
540            let mut new_target_vids: Vec<u64> = Vec::new();
541            let mut new_edge_ids: Vec<u64> = Vec::new();
542            let mut new_edge_type_ids: Vec<u32> = Vec::new();
543            // Carry-forward columns for steps 0..step_idx
544            let num_prev_cols = accumulated_target_vids.len();
545            let mut new_accumulated_targets: Vec<Vec<u64>> = vec![Vec::new(); num_prev_cols];
546            let mut new_accumulated_edges: Vec<Vec<u64>> =
547                vec![Vec::new(); accumulated_edge_ids.len()];
548            let mut new_accumulated_edge_types: Vec<Vec<u32>> =
549                vec![Vec::new(); accumulated_edge_type_ids.len()];
550
551            for (i, &src_vid_u64) in frontier_vids.iter().enumerate() {
552                let vid = Vid::from(src_vid_u64);
553                let outer_row = result_row_indices[i];
554                let anchor_vid = result_anchor_vids[i];
555
556                let mut seen_edges: HashSet<u64> = HashSet::new();
557
558                for &edge_type in &step.edge_type_ids {
559                    let neighbors = self.graph_ctx.get_neighbors(vid, edge_type, step.direction);
560
561                    for (target_vid, eid) in neighbors {
562                        let eid_u64 = eid.as_u64();
563
564                        // Deduplicate edges for undirected patterns
565                        if is_undirected && !seen_edges.insert(eid_u64) {
566                            continue;
567                        }
568
569                        // Label filtering. Resolve from the L0 chain then the
570                        // persisted index so Lance-only vertices (e.g. on a
571                        // fork) are matched correctly.
572                        if let Some(ref label_name) = step.target_label_name
573                            && let Some(vertex_labels) =
574                                self.graph_ctx.resolve_vertex_labels(target_vid, &query_ctx)
575                            && !vertex_labels.contains(label_name)
576                        {
577                            continue;
578                        }
579
580                        new_row_indices.push(outer_row);
581                        new_anchor_vids.push(anchor_vid);
582                        new_target_vids.push(target_vid.as_u64());
583                        new_edge_ids.push(eid_u64);
584                        new_edge_type_ids.push(edge_type);
585
586                        // Carry forward accumulated columns from previous steps
587                        for (col_idx, col) in accumulated_target_vids.iter().enumerate() {
588                            new_accumulated_targets[col_idx].push(col[i]);
589                        }
590                        for (col_idx, col) in accumulated_edge_ids.iter().enumerate() {
591                            new_accumulated_edges[col_idx].push(col[i]);
592                        }
593                        for (col_idx, col) in accumulated_edge_type_ids.iter().enumerate() {
594                            new_accumulated_edge_types[col_idx].push(col[i]);
595                        }
596                    }
597                }
598            }
599
600            // Update frontier for next hop
601            frontier_vids.clone_from(&new_target_vids);
602            result_row_indices = new_row_indices;
603            result_anchor_vids = new_anchor_vids;
604
605            // Append this step's data to accumulated columns
606            new_accumulated_targets.push(new_target_vids);
607            new_accumulated_edges.push(new_edge_ids);
608            new_accumulated_edge_types.push(new_edge_type_ids);
609            accumulated_target_vids = new_accumulated_targets;
610            accumulated_edge_ids = new_accumulated_edges;
611            accumulated_edge_type_ids = new_accumulated_edge_types;
612        }
613
614        Ok(PatternExpansion {
615            row_indices: result_row_indices,
616            anchor_vids: result_anchor_vids,
617            step_target_vids: accumulated_target_vids,
618            step_edge_ids: accumulated_edge_ids,
619            step_edge_type_ids: accumulated_edge_type_ids,
620        })
621    }
622
623    /// Build a LargeList result of empty lists for all rows.
624    fn build_empty_list_result(&self, num_rows: usize) -> DFResult<ColumnarValue> {
625        let offsets: Vec<i64> = vec![0; num_rows + 1];
626        let empty_values: ArrayRef = arrow_array::new_empty_array(&self.output_item_type);
627        let field = Arc::new(Field::new("item", self.output_item_type.clone(), true));
628        let list = datafusion::arrow::array::LargeListArray::new(
629            field,
630            OffsetBuffer::new(ScalarBuffer::from(offsets)),
631            empty_values,
632            None,
633        );
634        Ok(ColumnarValue::Array(Arc::new(list)))
635    }
636
637    /// Build a path struct column for each expanded row.
638    ///
639    /// Each path consists of: nodes = [anchor, step0_target, step1_target, ...]
640    /// and relationships = [step0_edge, step1_edge, ...].
641    /// The path struct follows the schema from `build_path_struct_field()`.
642    fn build_path_column(
643        &self,
644        expansion: &PatternExpansion,
645        _anchor_vids: &UInt64Array,
646        query_ctx: &QueryContext,
647    ) -> DFResult<ArrayRef> {
648        use arrow_array::builder::{
649            LargeBinaryBuilder, ListBuilder, StringBuilder, StructBuilder, UInt64Builder,
650        };
651
652        let num_expanded = expansion.row_indices.len();
653
654        let node_struct_fields: Vec<Arc<Field>> =
655            crate::query::df_graph::common::node_struct_fields()
656                .iter()
657                .cloned()
658                .collect();
659        let edge_struct_fields: Vec<Arc<Field>> =
660            crate::query::df_graph::common::edge_struct_fields()
661                .iter()
662                .cloned()
663                .collect();
664
665        let mut nodes_builder = ListBuilder::new(StructBuilder::new(
666            node_struct_fields,
667            vec![
668                Box::new(UInt64Builder::new()),
669                Box::new(ListBuilder::new(StringBuilder::new())),
670                Box::new(LargeBinaryBuilder::new()),
671            ],
672        ));
673
674        let mut rels_builder = ListBuilder::new(StructBuilder::new(
675            edge_struct_fields,
676            vec![
677                Box::new(UInt64Builder::new()),
678                Box::new(StringBuilder::new()),
679                Box::new(UInt64Builder::new()),
680                Box::new(UInt64Builder::new()),
681                Box::new(LargeBinaryBuilder::new()),
682            ],
683        ));
684
685        let uni_schema = self.graph_ctx.storage().schema_manager().schema();
686        let num_steps = self.traversal_steps.len();
687
688        for row_idx in 0..num_expanded {
689            // Build node list: anchor + each step's target
690            let anchor_vid_u64 = expansion.anchor_vids[row_idx];
691            let anchor_vid = Vid::from(anchor_vid_u64);
692
693            // Append anchor node
694            super::common::append_node_to_struct(nodes_builder.values(), anchor_vid, query_ctx);
695
696            // Append target node for each step
697            for step_idx in 0..num_steps {
698                let target_vid = Vid::from(expansion.step_target_vids[step_idx][row_idx]);
699                super::common::append_node_to_struct(nodes_builder.values(), target_vid, query_ctx);
700            }
701            nodes_builder.append(true);
702
703            // Build relationships list for each step
704            for step_idx in 0..num_steps {
705                let eid = Eid::from(expansion.step_edge_ids[step_idx][row_idx]);
706                let edge_type_id = expansion.step_edge_type_ids[step_idx][row_idx];
707                let edge_type_name = uni_schema
708                    .edge_type_name_by_id_unified(edge_type_id)
709                    .unwrap_or_default();
710
711                // Determine src and dst: for the path struct, src is the node
712                // *before* this edge and dst is the node *after* this edge.
713                let src_vid = if step_idx == 0 {
714                    anchor_vid_u64
715                } else {
716                    expansion.step_target_vids[step_idx - 1][row_idx]
717                };
718                let dst_vid = expansion.step_target_vids[step_idx][row_idx];
719
720                // Report the relationship's STORED direction, not the traversal
721                // order (`src_vid` -> `dst_vid` along the expansion). Resolves
722                // flushed (L1-resident) edges too, where the L0 chain no longer
723                // holds the stored endpoints.
724                let (stored_src, stored_dst) = self.graph_ctx.resolve_stored_edge_endpoints(
725                    eid,
726                    Vid::from(src_vid),
727                    Vid::from(dst_vid),
728                    &[edge_type_id],
729                );
730                super::common::append_edge_to_struct(
731                    rels_builder.values(),
732                    eid,
733                    &edge_type_name,
734                    stored_src,
735                    stored_dst,
736                    query_ctx,
737                );
738            }
739            rels_builder.append(true);
740        }
741
742        let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
743        let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;
744
745        let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
746        let rels_field = Arc::new(Field::new(
747            "relationships",
748            rels_array.data_type().clone(),
749            true,
750        ));
751
752        let path_struct = arrow_array::StructArray::try_new(
753            vec![nodes_field, rels_field].into(),
754            vec![nodes_array, rels_array],
755            None,
756        )
757        .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
758
759        Ok(Arc::new(path_struct))
760    }
761}
762
763// ─── Pattern Analysis Functions ──────────────────────────────────────────────
764
765/// Analyze a pattern to extract the anchor column and traversal steps.
766///
767/// The anchor is the first node in the pattern whose variable has `{var}._vid`
768/// in the input schema. The remaining nodes/edges become traversal steps.
769pub fn analyze_pattern(
770    pattern: &Pattern,
771    input_schema: &Schema,
772    uni_schema: &UniSchema,
773) -> anyhow::Result<(String, Vec<TraversalStep>)> {
774    if pattern.paths.is_empty() {
775        return Err(anyhow::anyhow!(
776            "Pattern comprehension requires at least one path"
777        ));
778    }
779
780    let path = &pattern.paths[0];
781    let elements = &path.elements;
782
783    if elements.is_empty() {
784        return Err(anyhow::anyhow!(
785            "Pattern comprehension path has no elements"
786        ));
787    }
788
789    // Find the anchor node
790    let (anchor_idx, anchor_var) = find_anchor_node(elements, input_schema)?;
791
792    let anchor_column = format!("{}._vid", anchor_var);
793
794    // Build traversal steps from the elements after (or around) the anchor
795    let steps = build_traversal_steps(elements, anchor_idx, uni_schema)?;
796
797    Ok((anchor_column, steps))
798}
799
800/// Find the anchor node in the pattern elements.
801///
802/// The anchor is the first `NodePattern` whose variable has `{var}._vid` in the
803/// input schema. This identifies which node is already bound from the outer scope.
804fn find_anchor_node(
805    elements: &[PatternElement],
806    input_schema: &Schema,
807) -> anyhow::Result<(usize, String)> {
808    for (idx, elem) in elements.iter().enumerate() {
809        if let PatternElement::Node(node) = elem
810            && let Some(ref var) = node.variable
811        {
812            let vid_col = format!("{}._vid", var);
813            if input_schema.column_with_name(&vid_col).is_some() {
814                return Ok((idx, var.clone()));
815            }
816        }
817    }
818
819    Err(anyhow::anyhow!(
820        "No anchor node found in pattern comprehension. \
821         None of the pattern variables have a corresponding `_vid` column in the input schema. \
822         Schema fields: {:?}",
823        input_schema
824            .fields()
825            .iter()
826            .map(|f| f.name().as_str())
827            .collect::<Vec<_>>()
828    ))
829}
830
831/// Build traversal steps from pattern elements starting from the anchor.
832///
833/// The pattern alternates Node, Rel, Node, Rel, Node, ...
834/// The anchor is at `anchor_idx`. We build steps going right from the anchor.
835fn build_traversal_steps(
836    elements: &[PatternElement],
837    anchor_idx: usize,
838    uni_schema: &UniSchema,
839) -> anyhow::Result<Vec<TraversalStep>> {
840    let mut steps = Vec::new();
841
842    // Traverse right from anchor: (anchor)-[r]->(b)-[s]->(c)...
843    let mut i = anchor_idx + 1;
844    while i + 1 < elements.len() {
845        let rel_elem = &elements[i];
846        let target_elem = &elements[i + 1];
847
848        let PatternElement::Relationship(rel) = rel_elem else {
849            return Err(anyhow::anyhow!(
850                "Expected relationship at pattern index {}, got {:?}",
851                i,
852                rel_elem
853            ));
854        };
855
856        let PatternElement::Node(target_node) = target_elem else {
857            return Err(anyhow::anyhow!(
858                "Expected node at pattern index {}, got {:?}",
859                i + 1,
860                target_elem
861            ));
862        };
863
864        let step = build_step_from_rel_and_node(rel, target_node, uni_schema)?;
865        steps.push(step);
866
867        i += 2;
868    }
869
870    if steps.is_empty() {
871        return Err(anyhow::anyhow!(
872            "Pattern comprehension has no traversal steps after anchor"
873        ));
874    }
875
876    Ok(steps)
877}
878
879/// Build a single traversal step from a relationship pattern and target node.
880fn build_step_from_rel_and_node(
881    rel: &RelationshipPattern,
882    target_node: &NodePattern,
883    uni_schema: &UniSchema,
884) -> anyhow::Result<TraversalStep> {
885    // Resolve edge type IDs — check both schema-defined and schemaless registries.
886    let edge_type_ids = if rel.types.is_empty() {
887        // Untyped: traverse all edge types
888        uni_schema.all_edge_type_ids()
889    } else {
890        rel.types
891            .iter()
892            .filter_map(|t| resolve_edge_type_id_unified(uni_schema, t))
893            .collect()
894    };
895
896    if edge_type_ids.is_empty() && !rel.types.is_empty() {
897        // Edge types were specified but none resolved — return empty step
898        // that will produce no results
899        return Ok(TraversalStep {
900            edge_type_ids: vec![],
901            direction: convert_direction(&rel.direction),
902            target_variable: target_node.variable.clone(),
903            target_label_name: target_node.labels.first().cloned(),
904            edge_variable: rel.variable.clone(),
905        });
906    }
907
908    let direction = convert_direction(&rel.direction);
909    let target_label_name = target_node.labels.first().cloned();
910
911    Ok(TraversalStep {
912        edge_type_ids,
913        direction,
914        target_variable: target_node.variable.clone(),
915        target_label_name,
916        edge_variable: rel.variable.clone(),
917    })
918}
919
/// Resolve an edge type name to its ID.
///
/// Delegates to `UniSchema::edge_type_id_unified_case_insensitive`, which is
/// expected to consult both the schema-defined edge types and the schemaless
/// registry while ignoring case (NOTE(review): semantics inferred from the
/// callee's name — confirm against `uni_common`).
///
/// Returns `None` when that lookup yields no ID.
fn resolve_edge_type_id_unified(uni_schema: &UniSchema, type_name: &str) -> Option<u32> {
    uni_schema.edge_type_id_unified_case_insensitive(type_name)
}
925
926/// Convert AST direction to storage direction.
927fn convert_direction(ast_dir: &AstDirection) -> Direction {
928    match ast_dir {
929        AstDirection::Outgoing => Direction::Outgoing,
930        AstDirection::Incoming => Direction::Incoming,
931        AstDirection::Both => Direction::Both,
932    }
933}
934
935/// Collect property references from expressions that refer to inner variables.
936///
937/// Walks expression trees looking for `Expr::Property(Expr::Variable(v), prop)`
938/// where `v` is in `inner_vars`. Separates vertex props from edge props based on
939/// whether the variable is a node or edge variable.
940pub fn collect_inner_properties(
941    where_clause: Option<&Expr>,
942    map_expr: &Expr,
943    steps: &[TraversalStep],
944) -> (HashMap<String, Vec<String>>, HashMap<String, Vec<String>>) {
945    let mut vertex_props: HashMap<String, Vec<String>> = HashMap::new();
946    let mut edge_props: HashMap<String, Vec<String>> = HashMap::new();
947
948    // Build sets of node and edge variable names
949    let node_vars: HashSet<String> = steps
950        .iter()
951        .filter_map(|s| s.target_variable.clone())
952        .collect();
953    let edge_vars: HashSet<String> = steps
954        .iter()
955        .filter_map(|s| s.edge_variable.clone())
956        .collect();
957
958    // Walk expression trees
959    let mut exprs_to_visit: Vec<&Expr> = vec![map_expr];
960    if let Some(w) = where_clause {
961        exprs_to_visit.push(w);
962    }
963
964    while let Some(expr) = exprs_to_visit.pop() {
965        match expr {
966            Expr::Property(base, prop) => {
967                if let Expr::Variable(var) = base.as_ref() {
968                    if node_vars.contains(var) {
969                        vertex_props
970                            .entry(var.clone())
971                            .or_default()
972                            .push(prop.clone());
973                    } else if edge_vars.contains(var) {
974                        edge_props
975                            .entry(var.clone())
976                            .or_default()
977                            .push(prop.clone());
978                    }
979                }
980                // Also walk the base in case it's nested
981                exprs_to_visit.push(base);
982            }
983            Expr::BinaryOp { left, right, .. } => {
984                exprs_to_visit.push(left);
985                exprs_to_visit.push(right);
986            }
987            Expr::UnaryOp { expr: inner, .. } => {
988                exprs_to_visit.push(inner);
989            }
990            Expr::FunctionCall { args, .. } => {
991                for arg in args {
992                    exprs_to_visit.push(arg);
993                }
994            }
995            Expr::Case {
996                when_then,
997                else_expr,
998                ..
999            } => {
1000                for (w, t) in when_then {
1001                    exprs_to_visit.push(w);
1002                    exprs_to_visit.push(t);
1003                }
1004                if let Some(e) = else_expr {
1005                    exprs_to_visit.push(e);
1006                }
1007            }
1008            Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
1009                exprs_to_visit.push(inner);
1010            }
1011            Expr::List(items) => {
1012                for item in items {
1013                    exprs_to_visit.push(item);
1014                }
1015            }
1016            Expr::Map(entries) => {
1017                for (_, v) in entries {
1018                    exprs_to_visit.push(v);
1019                }
1020            }
1021            Expr::In { expr: l, list: r } => {
1022                exprs_to_visit.push(l);
1023                exprs_to_visit.push(r);
1024            }
1025            _ => {}
1026        }
1027    }
1028
1029    // Deduplicate property lists
1030    for props in vertex_props.values_mut() {
1031        props.sort();
1032        props.dedup();
1033    }
1034    for props in edge_props.values_mut() {
1035        props.sort();
1036        props.dedup();
1037    }
1038
1039    (vertex_props, edge_props)
1040}
1041
1042/// Build the inner schema for the expanded batch.
1043///
1044/// Starts with outer fields, then adds pattern binding columns
1045/// (_vid for target nodes, _eid for edges), property columns,
1046/// and a path struct column if `path_variable` is provided.
1047pub fn build_inner_schema(
1048    input_schema: &Schema,
1049    steps: &[TraversalStep],
1050    vertex_props: &HashMap<String, Vec<String>>,
1051    edge_props: &HashMap<String, Vec<String>>,
1052    path_variable: Option<&str>,
1053) -> Schema {
1054    let mut fields: Vec<Arc<Field>> = input_schema.fields().to_vec();
1055
1056    for step in steps {
1057        // Target node VID column
1058        if let Some(ref target_var) = step.target_variable {
1059            fields.push(Arc::new(Field::new(
1060                format!("{}._vid", target_var),
1061                DataType::UInt64,
1062                true,
1063            )));
1064        }
1065
1066        // Edge ID column
1067        if let Some(ref edge_var) = step.edge_variable {
1068            fields.push(Arc::new(Field::new(
1069                format!("{}._eid", edge_var),
1070                DataType::UInt64,
1071                true,
1072            )));
1073        }
1074    }
1075
1076    // Add property columns for vertex variables
1077    for step in steps {
1078        if let Some(ref target_var) = step.target_variable
1079            && let Some(props) = vertex_props.get(target_var)
1080        {
1081            for prop in props {
1082                fields.push(Arc::new(Field::new(
1083                    format!("{}.{}", target_var, prop),
1084                    DataType::LargeBinary,
1085                    true,
1086                )));
1087            }
1088        }
1089    }
1090
1091    // Add property columns for edge variables
1092    for step in steps {
1093        if let Some(ref edge_var) = step.edge_variable
1094            && let Some(props) = edge_props.get(edge_var)
1095        {
1096            for prop in props {
1097                fields.push(Arc::new(Field::new(
1098                    format!("{}.{}", edge_var, prop),
1099                    DataType::LargeBinary,
1100                    true,
1101                )));
1102            }
1103        }
1104    }
1105
1106    // Add path struct column if path variable is bound
1107    if let Some(path_var) = path_variable {
1108        fields.push(Arc::new(build_path_struct_field(path_var)));
1109    }
1110
1111    Schema::new(fields)
1112}