Skip to main content

uni_query/query/df_graph/
pattern_comprehension.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Vectorized pattern comprehension for Cypher `[(a)-[:REL]->(b) WHERE pred | expr]`.
5//!
6//! Implements a `PhysicalExpr` that:
7//! 1. Extracts anchor VIDs from the input batch
8//! 2. Expands neighbors via CSR adjacency (synchronous)
//! 3. Materializes needed properties by blocking on async loads in a scoped helper thread
10//! 4. Applies optional predicate filter
11//! 5. Evaluates the map expression
12//! 6. Reconstructs a `LargeList` column grouped by parent row
13
14use std::any::Any;
15use std::collections::{HashMap, HashSet};
16use std::fmt::{self, Display, Formatter};
17use std::hash::Hash;
18use std::sync::Arc;
19
20use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch, UInt32Array, UInt64Array};
21use arrow_schema::{DataType, Field, Schema};
22use datafusion::arrow::buffer::{OffsetBuffer, ScalarBuffer};
23use datafusion::arrow::compute::{cast, filter, filter_record_batch, take};
24use datafusion::common::Result as DFResult;
25use datafusion::error::DataFusionError;
26use datafusion::logical_expr::ColumnarValue;
27use datafusion::physical_plan::PhysicalExpr;
28use uni_common::core::id::{Eid, Vid};
29use uni_common::core::schema::Schema as UniSchema;
30use uni_cypher::ast::{
31    Direction as AstDirection, Expr, NodePattern, Pattern, PatternElement, RelationshipPattern,
32};
33use uni_store::QueryContext;
34use uni_store::runtime::l0_visibility;
35use uni_store::storage::direction::Direction;
36
37use super::GraphExecutionContext;
38use crate::query::df_graph::common::{build_path_struct_field, column_as_vid_array};
39use crate::query::df_graph::scan::build_property_column_static;
40
41/// A single hop derived from the pattern's elements.
42#[derive(Debug, Clone)]
43pub struct TraversalStep {
44    /// Resolved edge type IDs for this hop.
45    pub edge_type_ids: Vec<u32>,
46    /// Direction of the hop.
47    pub direction: Direction,
48    /// Variable name for the target node, if any.
49    pub target_variable: Option<String>,
50    /// Label filter for the target node, if any.
51    pub target_label_name: Option<String>,
52    /// Variable name for the edge, if any.
53    pub edge_variable: Option<String>,
54}
55
56/// Physical expression for Cypher Pattern Comprehension:
57/// `[(a)-[:REL]->(b) WHERE pred | expr]`
58#[derive(Debug)]
59pub struct PatternComprehensionExecExpr {
60    /// Shared graph context for CSR lookups and property materialization.
61    graph_ctx: Arc<GraphExecutionContext>,
62    /// Column name for anchor VIDs (e.g., `"a._vid"`).
63    anchor_column: String,
64    /// Steps describing each hop in the pattern.
65    traversal_steps: Vec<TraversalStep>,
66    /// Optional path variable name.
67    path_variable: Option<String>,
68    /// Optional filter predicate compiled against inner schema.
69    predicate: Option<Arc<dyn PhysicalExpr>>,
70    /// Map expression compiled against inner schema.
71    map_expr: Arc<dyn PhysicalExpr>,
72    /// Schema of the outer input batch.
73    input_schema: Arc<Schema>,
74    /// Schema of the inner (expanded) batch — outer + pattern bindings + properties.
75    inner_schema: Arc<Schema>,
76    /// Data type of items in the output list (result of map_expr).
77    output_item_type: DataType,
78    /// Vertex properties needed per variable: variable → [prop_names].
79    needed_vertex_props: HashMap<String, Vec<String>>,
80    /// Edge properties needed per variable: variable → [prop_names].
81    needed_edge_props: HashMap<String, Vec<String>>,
82}
83
84impl Clone for PatternComprehensionExecExpr {
85    fn clone(&self) -> Self {
86        Self {
87            graph_ctx: self.graph_ctx.clone(),
88            anchor_column: self.anchor_column.clone(),
89            traversal_steps: self.traversal_steps.clone(),
90            path_variable: self.path_variable.clone(),
91            predicate: self.predicate.clone(),
92            map_expr: self.map_expr.clone(),
93            input_schema: self.input_schema.clone(),
94            inner_schema: self.inner_schema.clone(),
95            output_item_type: self.output_item_type.clone(),
96            needed_vertex_props: self.needed_vertex_props.clone(),
97            needed_edge_props: self.needed_edge_props.clone(),
98        }
99    }
100}
101
102impl PatternComprehensionExecExpr {
103    #[expect(clippy::too_many_arguments, reason = "Constructor for complex expr")]
104    pub fn new(
105        graph_ctx: Arc<GraphExecutionContext>,
106        anchor_column: String,
107        traversal_steps: Vec<TraversalStep>,
108        path_variable: Option<String>,
109        predicate: Option<Arc<dyn PhysicalExpr>>,
110        map_expr: Arc<dyn PhysicalExpr>,
111        input_schema: Arc<Schema>,
112        inner_schema: Arc<Schema>,
113        output_item_type: DataType,
114        needed_vertex_props: HashMap<String, Vec<String>>,
115        needed_edge_props: HashMap<String, Vec<String>>,
116    ) -> Self {
117        Self {
118            graph_ctx,
119            anchor_column,
120            traversal_steps,
121            path_variable,
122            predicate,
123            map_expr,
124            input_schema,
125            inner_schema,
126            output_item_type,
127            needed_vertex_props,
128            needed_edge_props,
129        }
130    }
131}
132
133impl Display for PatternComprehensionExecExpr {
134    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
135        write!(
136            f,
137            "PatternComprehension(anchor={}, steps={})",
138            self.anchor_column,
139            self.traversal_steps.len()
140        )
141    }
142}
143
144impl PartialEq for PatternComprehensionExecExpr {
145    fn eq(&self, other: &Self) -> bool {
146        self.anchor_column == other.anchor_column
147            && Arc::ptr_eq(&self.graph_ctx, &other.graph_ctx)
148            && Arc::ptr_eq(&self.map_expr, &other.map_expr)
149            && match (&self.predicate, &other.predicate) {
150                (Some(a), Some(b)) => Arc::ptr_eq(a, b),
151                (None, None) => true,
152                _ => false,
153            }
154    }
155}
156
157impl Eq for PatternComprehensionExecExpr {}
158
159impl Hash for PatternComprehensionExecExpr {
160    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
161        self.anchor_column.hash(state);
162        self.output_item_type.hash(state);
163    }
164}
165
166impl PartialEq<dyn Any> for PatternComprehensionExecExpr {
167    fn eq(&self, other: &dyn Any) -> bool {
168        other
169            .downcast_ref::<Self>()
170            .map(|x| self == x)
171            .unwrap_or(false)
172    }
173}
174
175impl PhysicalExpr for PatternComprehensionExecExpr {
176    fn as_any(&self) -> &dyn Any {
177        self
178    }
179
180    fn data_type(&self, _input_schema: &Schema) -> DFResult<DataType> {
181        Ok(DataType::LargeList(Arc::new(Field::new(
182            "item",
183            self.output_item_type.clone(),
184            true,
185        ))))
186    }
187
188    fn nullable(&self, _input_schema: &Schema) -> DFResult<bool> {
189        Ok(true)
190    }
191
192    fn evaluate(&self, batch: &RecordBatch) -> DFResult<ColumnarValue> {
193        let num_rows = batch.num_rows();
194
195        // Step 1: Extract anchor VIDs
196        let anchor_col = if let Some(col) = batch.column_by_name(&self.anchor_column) {
197            col
198        } else if let Some(var_name) = self.anchor_column.strip_suffix("._vid") {
199            batch.column_by_name(var_name).ok_or_else(|| {
200                DataFusionError::Execution(format!(
201                    "Anchor column '{}' not found in batch schema: {:?}",
202                    self.anchor_column,
203                    batch
204                        .schema()
205                        .fields()
206                        .iter()
207                        .map(|f| f.name().as_str())
208                        .collect::<Vec<_>>()
209                ))
210            })?
211        } else {
212            return Err(DataFusionError::Execution(format!(
213                "Anchor column '{}' not found in batch schema: {:?}",
214                self.anchor_column,
215                batch
216                    .schema()
217                    .fields()
218                    .iter()
219                    .map(|f| f.name().as_str())
220                    .collect::<Vec<_>>()
221            )));
222        };
223        let anchor_vid_cow = column_as_vid_array(anchor_col.as_ref())?;
224        let anchor_vids: &UInt64Array = &anchor_vid_cow;
225
226        // Step 2: CSR expansion
227        // Warm CSR for all edge types and directions
228        for step in &self.traversal_steps {
229            log::debug!(
230                "PatternComprehension: warming CSR for edge_type_ids={:?}, direction={:?}",
231                step.edge_type_ids,
232                step.direction
233            );
234            std::thread::scope(|s| {
235                s.spawn(|| {
236                    let rt = tokio::runtime::Builder::new_current_thread()
237                        .enable_all()
238                        .build()
239                        .map_err(|e| {
240                            DataFusionError::Execution(format!("Runtime creation failed: {e}"))
241                        })?;
242                    rt.block_on(
243                        self.graph_ctx
244                            .ensure_adjacency_warmed(&step.edge_type_ids, step.direction),
245                    )
246                    .map_err(|e| DataFusionError::Execution(format!("CSR warming failed: {e}")))
247                })
248                .join()
249                .unwrap_or_else(|_| {
250                    Err(DataFusionError::Execution(
251                        "CSR warming thread panicked".to_string(),
252                    ))
253                })
254            })?;
255        }
256
257        log::debug!(
258            "PatternComprehension: expanding {} anchor VIDs, steps={}",
259            anchor_vids.len(),
260            self.traversal_steps.len()
261        );
262
263        // Expand: for each anchor VID, traverse the pattern and collect results.
264        // For single-hop, this is straightforward. For multi-hop, chain expansions.
265        let expansion = self.expand_pattern(anchor_vids)?;
266
267        log::debug!(
268            "PatternComprehension: expansion produced {} rows",
269            expansion.row_indices.len()
270        );
271
272        // Handle empty expansion: produce LargeList of empty lists
273        if expansion.row_indices.is_empty() {
274            return self.build_empty_list_result(num_rows);
275        }
276
277        // Step 3: Build flat inner batch
278        let indices_array = UInt32Array::from(expansion.row_indices.clone());
279        let mut inner_columns: Vec<ArrayRef> = Vec::new();
280
281        // Replicate outer columns
282        for col in batch.columns() {
283            inner_columns.push(take(col, &indices_array, None)?);
284        }
285
286        // Add target VID columns for each step
287        for (step_idx, step) in self.traversal_steps.iter().enumerate() {
288            if let Some(ref _target_var) = step.target_variable {
289                inner_columns.push(Arc::new(UInt64Array::from(
290                    expansion.step_target_vids[step_idx].clone(),
291                )));
292            }
293            if let Some(ref _edge_var) = step.edge_variable {
294                inner_columns.push(Arc::new(UInt64Array::from(
295                    expansion.step_edge_ids[step_idx].clone(),
296                )));
297            }
298        }
299
300        // Step 3b: Property materialization
301        let query_ctx = self.graph_ctx.query_context();
302        for (step_idx, step) in self.traversal_steps.iter().enumerate() {
303            // Vertex properties
304            if let Some(ref target_var) = step.target_variable
305                && let Some(props) = self.needed_vertex_props.get(target_var)
306            {
307                let vids: Vec<Vid> = expansion.step_target_vids[step_idx]
308                    .iter()
309                    .map(|v| Vid::from(*v))
310                    .collect();
311
312                let prop_refs: Vec<&str> = props.iter().map(|s| s.as_str()).collect();
313
314                let props_map = std::thread::scope(|s| {
315                    s.spawn(|| {
316                        let rt = tokio::runtime::Builder::new_current_thread()
317                            .enable_all()
318                            .build()
319                            .map_err(|e| {
320                                DataFusionError::Execution(format!("Runtime creation failed: {e}"))
321                            })?;
322                        rt.block_on(self.graph_ctx.property_manager().get_batch_vertex_props(
323                            &vids,
324                            &prop_refs,
325                            Some(&query_ctx),
326                        ))
327                        .map_err(|e| {
328                            DataFusionError::Execution(format!("Vertex prop load failed: {e}"))
329                        })
330                    })
331                    .join()
332                    .unwrap_or_else(|_| {
333                        Err(DataFusionError::Execution(
334                            "Vertex prop load thread panicked".to_string(),
335                        ))
336                    })
337                })?;
338
339                for prop in props {
340                    let col = build_property_column_static(
341                        &vids,
342                        &props_map,
343                        prop,
344                        &DataType::LargeBinary,
345                    )?;
346                    inner_columns.push(col);
347                }
348            }
349
350            // Edge properties
351            if let Some(ref edge_var) = step.edge_variable
352                && let Some(props) = self.needed_edge_props.get(edge_var)
353            {
354                let eids: Vec<Eid> = expansion.step_edge_ids[step_idx]
355                    .iter()
356                    .map(|e| Eid::from(*e))
357                    .collect();
358
359                let prop_refs: Vec<&str> = props.iter().map(|s| s.as_str()).collect();
360
361                let props_map = std::thread::scope(|s| {
362                    s.spawn(|| {
363                        let rt = tokio::runtime::Builder::new_current_thread()
364                            .enable_all()
365                            .build()
366                            .map_err(|e| {
367                                DataFusionError::Execution(format!("Runtime creation failed: {e}"))
368                            })?;
369                        rt.block_on(self.graph_ctx.property_manager().get_batch_edge_props(
370                            &eids,
371                            &prop_refs,
372                            Some(&query_ctx),
373                        ))
374                        .map_err(|e| {
375                            DataFusionError::Execution(format!("Edge prop load failed: {e}"))
376                        })
377                    })
378                    .join()
379                    .unwrap_or_else(|_| {
380                        Err(DataFusionError::Execution(
381                            "Edge prop load thread panicked".to_string(),
382                        ))
383                    })
384                })?;
385
386                // Edge props use Eid mapped to Vid keys in the HashMap
387                let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(e.as_u64())).collect();
388                for prop in props {
389                    let col = build_property_column_static(
390                        &vid_keys,
391                        &props_map,
392                        prop,
393                        &DataType::LargeBinary,
394                    )?;
395                    inner_columns.push(col);
396                }
397            }
398        }
399
400        // Step 3c: Build path struct column if path variable is bound
401        if self.path_variable.is_some() {
402            let path_col = self.build_path_column(&expansion, anchor_vids, &query_ctx)?;
403            inner_columns.push(path_col);
404        }
405
406        let inner_batch = RecordBatch::try_new(self.inner_schema.clone(), inner_columns)?;
407
408        // Step 4: Filter
409        let (filtered_batch, filtered_indices) = if let Some(pred) = &self.predicate {
410            let mask = pred
411                .evaluate(&inner_batch)?
412                .into_array(inner_batch.num_rows())?;
413            let mask = cast(&mask, &DataType::Boolean)?;
414            let boolean_mask = mask
415                .as_any()
416                .downcast_ref::<BooleanArray>()
417                .ok_or_else(|| {
418                    DataFusionError::Execution(
419                        "Pattern comprehension predicate did not produce BooleanArray".to_string(),
420                    )
421                })?;
422
423            let filtered_batch = filter_record_batch(&inner_batch, boolean_mask)?;
424            let indices_array_ref: ArrayRef = Arc::new(indices_array.clone());
425            let filtered_idx = filter(&indices_array_ref, boolean_mask)?;
426            let filtered_idx = filtered_idx
427                .as_any()
428                .downcast_ref::<UInt32Array>()
429                .unwrap()
430                .clone();
431
432            (filtered_batch, filtered_idx)
433        } else {
434            (inner_batch, indices_array.clone())
435        };
436
437        // Step 5: Map
438        let mapped_val = self.map_expr.evaluate(&filtered_batch)?;
439        let mapped_array = mapped_val.into_array(filtered_batch.num_rows())?;
440
441        // Step 6: Reconstruct LargeList
442        let new_offsets = {
443            let mut offsets = Vec::with_capacity(num_rows + 1);
444            offsets.push(0i64);
445
446            let indices_slice = filtered_indices.values();
447            let mut pos = 0;
448            let mut current_len: i64 = 0;
449
450            for row_idx in 0..num_rows {
451                while pos < indices_slice.len() && indices_slice[pos] as usize == row_idx {
452                    pos += 1;
453                    current_len += 1;
454                }
455                offsets.push(current_len);
456            }
457            OffsetBuffer::new(ScalarBuffer::from(offsets))
458        };
459
460        let new_field = Arc::new(Field::new("item", mapped_array.data_type().clone(), true));
461        let new_list = datafusion::arrow::array::LargeListArray::new(
462            new_field,
463            new_offsets,
464            mapped_array,
465            None,
466        );
467
468        Ok(ColumnarValue::Array(Arc::new(new_list)))
469    }
470
471    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
472        // map_expr and predicate are compiled against inner schema;
473        // don't expose to DF tree traversal.
474        vec![]
475    }
476
477    fn with_new_children(
478        self: Arc<Self>,
479        children: Vec<Arc<dyn PhysicalExpr>>,
480    ) -> DFResult<Arc<dyn PhysicalExpr>> {
481        if !children.is_empty() {
482            return Err(DataFusionError::Internal(
483                "PatternComprehension has no children".to_string(),
484            ));
485        }
486        Ok(self)
487    }
488
489    fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
490        write!(f, "PatternComprehension({})", self.anchor_column)
491    }
492}
493
/// Flattened result of expanding a pattern from its anchors.
///
/// All vectors are indexed by expanded row. Entry `i` of every vector,
/// including each per-step inner vector, describes the same match.
struct PatternExpansion {
    /// Outer-batch row that produced each expanded row.
    row_indices: Vec<u32>,
    /// VID of the anchor (the path's start node) for each expanded row.
    anchor_vids: Vec<u64>,
    /// `step_target_vids[s][i]` is the target VID reached by hop `s` for expanded row `i`.
    step_target_vids: Vec<Vec<u64>>,
    /// `step_edge_ids[s][i]` is the edge ID traversed by hop `s` for expanded row `i`.
    step_edge_ids: Vec<Vec<u64>>,
    /// `step_edge_type_ids[s][i]` is the edge type ID of that same edge.
    step_edge_type_ids: Vec<Vec<u32>>,
}
507
508impl PatternComprehensionExecExpr {
509    /// Expand the pattern starting from anchor VIDs.
510    ///
511    /// For single-hop patterns this is a direct CSR lookup.
512    /// For multi-hop, each hop chains from the previous hop's target VIDs.
513    fn expand_pattern(&self, anchor_vids: &UInt64Array) -> DFResult<PatternExpansion> {
514        // Start: each anchor VID is a "frontier" entry
515        let mut frontier_row_indices: Vec<u32> = Vec::new();
516        let mut frontier_vids: Vec<u64> = Vec::new();
517
518        for (row_idx, vid_opt) in anchor_vids.iter().enumerate() {
519            if let Some(vid_u64) = vid_opt {
520                frontier_row_indices.push(row_idx as u32);
521                frontier_vids.push(vid_u64);
522            }
523        }
524
525        // `result_row_indices[i]` maps expanded row i back to the original outer batch row.
526        let mut result_row_indices: Vec<u32> = frontier_row_indices.clone();
527        // Track anchor VID for each expanded row (the original source node).
528        let mut result_anchor_vids: Vec<u64> = frontier_vids.clone();
529
530        // Track intermediate target/edge/edge-type arrays per step for multi-hop reconstruction.
531        let mut accumulated_target_vids: Vec<Vec<u64>> = Vec::new();
532        let mut accumulated_edge_ids: Vec<Vec<u64>> = Vec::new();
533        let mut accumulated_edge_type_ids: Vec<Vec<u32>> = Vec::new();
534
535        for step in &self.traversal_steps {
536            let is_undirected = step.direction == Direction::Both;
537            let query_ctx = self.graph_ctx.query_context();
538
539            let mut new_row_indices: Vec<u32> = Vec::new();
540            let mut new_anchor_vids: Vec<u64> = Vec::new();
541            let mut new_target_vids: Vec<u64> = Vec::new();
542            let mut new_edge_ids: Vec<u64> = Vec::new();
543            let mut new_edge_type_ids: Vec<u32> = Vec::new();
544            // Carry-forward columns for steps 0..step_idx
545            let num_prev_cols = accumulated_target_vids.len();
546            let mut new_accumulated_targets: Vec<Vec<u64>> = vec![Vec::new(); num_prev_cols];
547            let mut new_accumulated_edges: Vec<Vec<u64>> =
548                vec![Vec::new(); accumulated_edge_ids.len()];
549            let mut new_accumulated_edge_types: Vec<Vec<u32>> =
550                vec![Vec::new(); accumulated_edge_type_ids.len()];
551
552            for (i, &src_vid_u64) in frontier_vids.iter().enumerate() {
553                let vid = Vid::from(src_vid_u64);
554                let outer_row = result_row_indices[i];
555                let anchor_vid = result_anchor_vids[i];
556
557                let mut seen_edges: HashSet<u64> = HashSet::new();
558
559                for &edge_type in &step.edge_type_ids {
560                    let neighbors = self.graph_ctx.get_neighbors(vid, edge_type, step.direction);
561
562                    for (target_vid, eid) in neighbors {
563                        let eid_u64 = eid.as_u64();
564
565                        // Deduplicate edges for undirected patterns
566                        if is_undirected && !seen_edges.insert(eid_u64) {
567                            continue;
568                        }
569
570                        // Label filtering
571                        if let Some(ref label_name) = step.target_label_name
572                            && let Some(vertex_labels) =
573                                l0_visibility::get_vertex_labels_optional(target_vid, &query_ctx)
574                            && !vertex_labels.contains(label_name)
575                        {
576                            continue;
577                        }
578
579                        new_row_indices.push(outer_row);
580                        new_anchor_vids.push(anchor_vid);
581                        new_target_vids.push(target_vid.as_u64());
582                        new_edge_ids.push(eid_u64);
583                        new_edge_type_ids.push(edge_type);
584
585                        // Carry forward accumulated columns from previous steps
586                        for (col_idx, col) in accumulated_target_vids.iter().enumerate() {
587                            new_accumulated_targets[col_idx].push(col[i]);
588                        }
589                        for (col_idx, col) in accumulated_edge_ids.iter().enumerate() {
590                            new_accumulated_edges[col_idx].push(col[i]);
591                        }
592                        for (col_idx, col) in accumulated_edge_type_ids.iter().enumerate() {
593                            new_accumulated_edge_types[col_idx].push(col[i]);
594                        }
595                    }
596                }
597            }
598
599            // Update frontier for next hop
600            frontier_vids.clone_from(&new_target_vids);
601            result_row_indices = new_row_indices;
602            result_anchor_vids = new_anchor_vids;
603
604            // Append this step's data to accumulated columns
605            new_accumulated_targets.push(new_target_vids);
606            new_accumulated_edges.push(new_edge_ids);
607            new_accumulated_edge_types.push(new_edge_type_ids);
608            accumulated_target_vids = new_accumulated_targets;
609            accumulated_edge_ids = new_accumulated_edges;
610            accumulated_edge_type_ids = new_accumulated_edge_types;
611        }
612
613        Ok(PatternExpansion {
614            row_indices: result_row_indices,
615            anchor_vids: result_anchor_vids,
616            step_target_vids: accumulated_target_vids,
617            step_edge_ids: accumulated_edge_ids,
618            step_edge_type_ids: accumulated_edge_type_ids,
619        })
620    }
621
622    /// Build a LargeList result of empty lists for all rows.
623    fn build_empty_list_result(&self, num_rows: usize) -> DFResult<ColumnarValue> {
624        let offsets: Vec<i64> = vec![0; num_rows + 1];
625        let empty_values: ArrayRef = arrow_array::new_empty_array(&self.output_item_type);
626        let field = Arc::new(Field::new("item", self.output_item_type.clone(), true));
627        let list = datafusion::arrow::array::LargeListArray::new(
628            field,
629            OffsetBuffer::new(ScalarBuffer::from(offsets)),
630            empty_values,
631            None,
632        );
633        Ok(ColumnarValue::Array(Arc::new(list)))
634    }
635
636    /// Build a path struct column for each expanded row.
637    ///
638    /// Each path consists of: nodes = [anchor, step0_target, step1_target, ...]
639    /// and relationships = [step0_edge, step1_edge, ...].
640    /// The path struct follows the schema from `build_path_struct_field()`.
641    fn build_path_column(
642        &self,
643        expansion: &PatternExpansion,
644        _anchor_vids: &UInt64Array,
645        query_ctx: &QueryContext,
646    ) -> DFResult<ArrayRef> {
647        use arrow_array::builder::{
648            LargeBinaryBuilder, ListBuilder, StringBuilder, StructBuilder, UInt64Builder,
649        };
650
651        let num_expanded = expansion.row_indices.len();
652
653        let node_struct_fields: Vec<Arc<Field>> =
654            crate::query::df_graph::common::node_struct_fields()
655                .iter()
656                .cloned()
657                .collect();
658        let edge_struct_fields: Vec<Arc<Field>> =
659            crate::query::df_graph::common::edge_struct_fields()
660                .iter()
661                .cloned()
662                .collect();
663
664        let mut nodes_builder = ListBuilder::new(StructBuilder::new(
665            node_struct_fields,
666            vec![
667                Box::new(UInt64Builder::new()),
668                Box::new(ListBuilder::new(StringBuilder::new())),
669                Box::new(LargeBinaryBuilder::new()),
670            ],
671        ));
672
673        let mut rels_builder = ListBuilder::new(StructBuilder::new(
674            edge_struct_fields,
675            vec![
676                Box::new(UInt64Builder::new()),
677                Box::new(StringBuilder::new()),
678                Box::new(UInt64Builder::new()),
679                Box::new(UInt64Builder::new()),
680                Box::new(LargeBinaryBuilder::new()),
681            ],
682        ));
683
684        let uni_schema = self.graph_ctx.storage().schema_manager().schema();
685        let num_steps = self.traversal_steps.len();
686
687        for row_idx in 0..num_expanded {
688            // Build node list: anchor + each step's target
689            let anchor_vid_u64 = expansion.anchor_vids[row_idx];
690            let anchor_vid = Vid::from(anchor_vid_u64);
691
692            // Append anchor node
693            super::common::append_node_to_struct(nodes_builder.values(), anchor_vid, query_ctx);
694
695            // Append target node for each step
696            for step_idx in 0..num_steps {
697                let target_vid = Vid::from(expansion.step_target_vids[step_idx][row_idx]);
698                super::common::append_node_to_struct(nodes_builder.values(), target_vid, query_ctx);
699            }
700            nodes_builder.append(true);
701
702            // Build relationships list for each step
703            for step_idx in 0..num_steps {
704                let eid = Eid::from(expansion.step_edge_ids[step_idx][row_idx]);
705                let edge_type_id = expansion.step_edge_type_ids[step_idx][row_idx];
706                let edge_type_name = uni_schema
707                    .edge_type_name_by_id_unified(edge_type_id)
708                    .unwrap_or_default();
709
710                // Determine src and dst: for the path struct, src is the node
711                // *before* this edge and dst is the node *after* this edge.
712                let src_vid = if step_idx == 0 {
713                    anchor_vid_u64
714                } else {
715                    expansion.step_target_vids[step_idx - 1][row_idx]
716                };
717                let dst_vid = expansion.step_target_vids[step_idx][row_idx];
718
719                super::common::append_edge_to_struct(
720                    rels_builder.values(),
721                    eid,
722                    &edge_type_name,
723                    src_vid,
724                    dst_vid,
725                    query_ctx,
726                );
727            }
728            rels_builder.append(true);
729        }
730
731        let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
732        let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;
733
734        let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
735        let rels_field = Arc::new(Field::new(
736            "relationships",
737            rels_array.data_type().clone(),
738            true,
739        ));
740
741        let path_struct = arrow_array::StructArray::try_new(
742            vec![nodes_field, rels_field].into(),
743            vec![nodes_array, rels_array],
744            None,
745        )
746        .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
747
748        Ok(Arc::new(path_struct))
749    }
750}
751
752// ─── Pattern Analysis Functions ──────────────────────────────────────────────
753
754/// Analyze a pattern to extract the anchor column and traversal steps.
755///
756/// The anchor is the first node in the pattern whose variable has `{var}._vid`
757/// in the input schema. The remaining nodes/edges become traversal steps.
758pub fn analyze_pattern(
759    pattern: &Pattern,
760    input_schema: &Schema,
761    uni_schema: &UniSchema,
762) -> anyhow::Result<(String, Vec<TraversalStep>)> {
763    if pattern.paths.is_empty() {
764        return Err(anyhow::anyhow!(
765            "Pattern comprehension requires at least one path"
766        ));
767    }
768
769    let path = &pattern.paths[0];
770    let elements = &path.elements;
771
772    if elements.is_empty() {
773        return Err(anyhow::anyhow!(
774            "Pattern comprehension path has no elements"
775        ));
776    }
777
778    // Find the anchor node
779    let (anchor_idx, anchor_var) = find_anchor_node(elements, input_schema)?;
780
781    let anchor_column = format!("{}._vid", anchor_var);
782
783    // Build traversal steps from the elements after (or around) the anchor
784    let steps = build_traversal_steps(elements, anchor_idx, uni_schema)?;
785
786    Ok((anchor_column, steps))
787}
788
789/// Find the anchor node in the pattern elements.
790///
791/// The anchor is the first `NodePattern` whose variable has `{var}._vid` in the
792/// input schema. This identifies which node is already bound from the outer scope.
793fn find_anchor_node(
794    elements: &[PatternElement],
795    input_schema: &Schema,
796) -> anyhow::Result<(usize, String)> {
797    for (idx, elem) in elements.iter().enumerate() {
798        if let PatternElement::Node(node) = elem
799            && let Some(ref var) = node.variable
800        {
801            let vid_col = format!("{}._vid", var);
802            if input_schema.column_with_name(&vid_col).is_some() {
803                return Ok((idx, var.clone()));
804            }
805        }
806    }
807
808    Err(anyhow::anyhow!(
809        "No anchor node found in pattern comprehension. \
810         None of the pattern variables have a corresponding `_vid` column in the input schema. \
811         Schema fields: {:?}",
812        input_schema
813            .fields()
814            .iter()
815            .map(|f| f.name().as_str())
816            .collect::<Vec<_>>()
817    ))
818}
819
820/// Build traversal steps from pattern elements starting from the anchor.
821///
822/// The pattern alternates Node, Rel, Node, Rel, Node, ...
823/// The anchor is at `anchor_idx`. We build steps going right from the anchor.
824fn build_traversal_steps(
825    elements: &[PatternElement],
826    anchor_idx: usize,
827    uni_schema: &UniSchema,
828) -> anyhow::Result<Vec<TraversalStep>> {
829    let mut steps = Vec::new();
830
831    // Traverse right from anchor: (anchor)-[r]->(b)-[s]->(c)...
832    let mut i = anchor_idx + 1;
833    while i + 1 < elements.len() {
834        let rel_elem = &elements[i];
835        let target_elem = &elements[i + 1];
836
837        let PatternElement::Relationship(rel) = rel_elem else {
838            return Err(anyhow::anyhow!(
839                "Expected relationship at pattern index {}, got {:?}",
840                i,
841                rel_elem
842            ));
843        };
844
845        let PatternElement::Node(target_node) = target_elem else {
846            return Err(anyhow::anyhow!(
847                "Expected node at pattern index {}, got {:?}",
848                i + 1,
849                target_elem
850            ));
851        };
852
853        let step = build_step_from_rel_and_node(rel, target_node, uni_schema)?;
854        steps.push(step);
855
856        i += 2;
857    }
858
859    if steps.is_empty() {
860        return Err(anyhow::anyhow!(
861            "Pattern comprehension has no traversal steps after anchor"
862        ));
863    }
864
865    Ok(steps)
866}
867
868/// Build a single traversal step from a relationship pattern and target node.
869fn build_step_from_rel_and_node(
870    rel: &RelationshipPattern,
871    target_node: &NodePattern,
872    uni_schema: &UniSchema,
873) -> anyhow::Result<TraversalStep> {
874    // Resolve edge type IDs — check both schema-defined and schemaless registries.
875    let edge_type_ids = if rel.types.is_empty() {
876        // Untyped: traverse all edge types
877        uni_schema.all_edge_type_ids()
878    } else {
879        rel.types
880            .iter()
881            .filter_map(|t| resolve_edge_type_id_unified(uni_schema, t))
882            .collect()
883    };
884
885    if edge_type_ids.is_empty() && !rel.types.is_empty() {
886        // Edge types were specified but none resolved — return empty step
887        // that will produce no results
888        return Ok(TraversalStep {
889            edge_type_ids: vec![],
890            direction: convert_direction(&rel.direction),
891            target_variable: target_node.variable.clone(),
892            target_label_name: target_node.labels.first().cloned(),
893            edge_variable: rel.variable.clone(),
894        });
895    }
896
897    let direction = convert_direction(&rel.direction);
898    let target_label_name = target_node.labels.first().cloned();
899
900    Ok(TraversalStep {
901        edge_type_ids,
902        direction,
903        target_variable: target_node.variable.clone(),
904        target_label_name,
905        edge_variable: rel.variable.clone(),
906    })
907}
908
909/// Resolve an edge type name to its ID, checking both the schema-defined
910/// edge types and the schemaless registry (case-insensitive).
911fn resolve_edge_type_id_unified(uni_schema: &UniSchema, type_name: &str) -> Option<u32> {
912    uni_schema.edge_type_id_unified_case_insensitive(type_name)
913}
914
915/// Convert AST direction to storage direction.
916fn convert_direction(ast_dir: &AstDirection) -> Direction {
917    match ast_dir {
918        AstDirection::Outgoing => Direction::Outgoing,
919        AstDirection::Incoming => Direction::Incoming,
920        AstDirection::Both => Direction::Both,
921    }
922}
923
924/// Collect property references from expressions that refer to inner variables.
925///
926/// Walks expression trees looking for `Expr::Property(Expr::Variable(v), prop)`
927/// where `v` is in `inner_vars`. Separates vertex props from edge props based on
928/// whether the variable is a node or edge variable.
929pub fn collect_inner_properties(
930    where_clause: Option<&Expr>,
931    map_expr: &Expr,
932    steps: &[TraversalStep],
933) -> (HashMap<String, Vec<String>>, HashMap<String, Vec<String>>) {
934    let mut vertex_props: HashMap<String, Vec<String>> = HashMap::new();
935    let mut edge_props: HashMap<String, Vec<String>> = HashMap::new();
936
937    // Build sets of node and edge variable names
938    let node_vars: HashSet<String> = steps
939        .iter()
940        .filter_map(|s| s.target_variable.clone())
941        .collect();
942    let edge_vars: HashSet<String> = steps
943        .iter()
944        .filter_map(|s| s.edge_variable.clone())
945        .collect();
946
947    // Walk expression trees
948    let mut exprs_to_visit: Vec<&Expr> = vec![map_expr];
949    if let Some(w) = where_clause {
950        exprs_to_visit.push(w);
951    }
952
953    while let Some(expr) = exprs_to_visit.pop() {
954        match expr {
955            Expr::Property(base, prop) => {
956                if let Expr::Variable(var) = base.as_ref() {
957                    if node_vars.contains(var) {
958                        vertex_props
959                            .entry(var.clone())
960                            .or_default()
961                            .push(prop.clone());
962                    } else if edge_vars.contains(var) {
963                        edge_props
964                            .entry(var.clone())
965                            .or_default()
966                            .push(prop.clone());
967                    }
968                }
969                // Also walk the base in case it's nested
970                exprs_to_visit.push(base);
971            }
972            Expr::BinaryOp { left, right, .. } => {
973                exprs_to_visit.push(left);
974                exprs_to_visit.push(right);
975            }
976            Expr::UnaryOp { expr: inner, .. } => {
977                exprs_to_visit.push(inner);
978            }
979            Expr::FunctionCall { args, .. } => {
980                for arg in args {
981                    exprs_to_visit.push(arg);
982                }
983            }
984            Expr::Case {
985                when_then,
986                else_expr,
987                ..
988            } => {
989                for (w, t) in when_then {
990                    exprs_to_visit.push(w);
991                    exprs_to_visit.push(t);
992                }
993                if let Some(e) = else_expr {
994                    exprs_to_visit.push(e);
995                }
996            }
997            Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
998                exprs_to_visit.push(inner);
999            }
1000            Expr::List(items) => {
1001                for item in items {
1002                    exprs_to_visit.push(item);
1003                }
1004            }
1005            Expr::Map(entries) => {
1006                for (_, v) in entries {
1007                    exprs_to_visit.push(v);
1008                }
1009            }
1010            Expr::In { expr: l, list: r } => {
1011                exprs_to_visit.push(l);
1012                exprs_to_visit.push(r);
1013            }
1014            _ => {}
1015        }
1016    }
1017
1018    // Deduplicate property lists
1019    for props in vertex_props.values_mut() {
1020        props.sort();
1021        props.dedup();
1022    }
1023    for props in edge_props.values_mut() {
1024        props.sort();
1025        props.dedup();
1026    }
1027
1028    (vertex_props, edge_props)
1029}
1030
1031/// Build the inner schema for the expanded batch.
1032///
1033/// Starts with outer fields, then adds pattern binding columns
1034/// (_vid for target nodes, _eid for edges), property columns,
1035/// and a path struct column if `path_variable` is provided.
1036pub fn build_inner_schema(
1037    input_schema: &Schema,
1038    steps: &[TraversalStep],
1039    vertex_props: &HashMap<String, Vec<String>>,
1040    edge_props: &HashMap<String, Vec<String>>,
1041    path_variable: Option<&str>,
1042) -> Schema {
1043    let mut fields: Vec<Arc<Field>> = input_schema.fields().to_vec();
1044
1045    for step in steps {
1046        // Target node VID column
1047        if let Some(ref target_var) = step.target_variable {
1048            fields.push(Arc::new(Field::new(
1049                format!("{}._vid", target_var),
1050                DataType::UInt64,
1051                true,
1052            )));
1053        }
1054
1055        // Edge ID column
1056        if let Some(ref edge_var) = step.edge_variable {
1057            fields.push(Arc::new(Field::new(
1058                format!("{}._eid", edge_var),
1059                DataType::UInt64,
1060                true,
1061            )));
1062        }
1063    }
1064
1065    // Add property columns for vertex variables
1066    for step in steps {
1067        if let Some(ref target_var) = step.target_variable
1068            && let Some(props) = vertex_props.get(target_var)
1069        {
1070            for prop in props {
1071                fields.push(Arc::new(Field::new(
1072                    format!("{}.{}", target_var, prop),
1073                    DataType::LargeBinary,
1074                    true,
1075                )));
1076            }
1077        }
1078    }
1079
1080    // Add property columns for edge variables
1081    for step in steps {
1082        if let Some(ref edge_var) = step.edge_variable
1083            && let Some(props) = edge_props.get(edge_var)
1084        {
1085            for prop in props {
1086                fields.push(Arc::new(Field::new(
1087                    format!("{}.{}", edge_var, prop),
1088                    DataType::LargeBinary,
1089                    true,
1090                )));
1091            }
1092        }
1093    }
1094
1095    // Add path struct column if path variable is bound
1096    if let Some(path_var) = path_variable {
1097        fields.push(Arc::new(build_path_struct_field(path_var)));
1098    }
1099
1100    Schema::new(fields)
1101}