Skip to main content

uni_query/query/df_graph/
traverse.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Graph traversal execution plans for DataFusion.
5//!
6//! This module provides graph traversal operators as DataFusion [`ExecutionPlan`]s:
7//!
8//! - [`GraphTraverseExec`]: Single-hop edge traversal
9//! - [`GraphVariableLengthTraverseExec`]: Multi-hop BFS traversal (min..max hops)
10//!
11//! # Traversal Algorithm
12//!
13//! Traversal uses the CSR adjacency cache for O(1) neighbor lookups:
14//!
15//! ```text
16//! Input Stream (source VIDs)
17//!        │
18//!        ▼
19//! ┌──────────────────┐
20//! │ For each batch:  │
21//! │  1. Extract VIDs │
22//! │  2. get_neighbors│
23//! │  3. Expand rows  │
24//! └──────────────────┘
25//!        │
26//!        ▼
27//! Output Stream (source, edge, target)
28//! ```
29//!
30//! L0 buffers are automatically overlaid for MVCC visibility.
31
32use crate::query::df_graph::GraphExecutionContext;
33use crate::query::df_graph::bitmap::{EidFilter, VidFilter};
34use crate::query::df_graph::common::{
35    append_edge_to_struct, append_node_to_struct, arrow_err, build_edge_list_field,
36    build_path_struct_field, column_as_vid_array, compute_plan_properties, labels_data_type,
37    new_edge_list_builder, new_node_list_builder,
38};
39use crate::query::df_graph::nfa::{NfaStateId, PathNfa, PathSelector, VlpOutputMode};
40use crate::query::df_graph::pred_dag::PredecessorDag;
41use crate::query::df_graph::scan::{build_property_column_static, resolve_property_type};
42use arrow::compute::take;
43use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
44use arrow_schema::{DataType, Field, Schema, SchemaRef};
45use datafusion::common::Result as DFResult;
46use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
47use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
48use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
49use futures::{Stream, StreamExt};
50use fxhash::FxHashSet;
51use std::any::Any;
52use std::collections::{HashMap, HashSet, VecDeque};
53use std::fmt;
54use std::pin::Pin;
55use std::sync::Arc;
56use std::task::{Context, Poll};
57use uni_common::Value as UniValue;
58use uni_common::core::id::{Eid, Vid};
59use uni_store::runtime::l0_visibility;
60use uni_store::storage::direction::Direction;
61
/// BFS result: (target_vid, hop_count, node_path, edge_path)
type BfsResult = (Vid, usize, Vec<Vid>, Vec<Eid>);

/// Expansion record: (original_row_idx, target_vid, hop_count, node_path, edge_path)
/// NOTE(review): structurally identical to `VarLengthExpansion` below — consider
/// consolidating the two aliases into one.
type ExpansionRecord = (usize, Vid, usize, Vec<Vid>, Vec<Eid>);
67
68/// Prepend nodes and edges from an existing path struct column into builders.
69///
70/// Used when a VLP extends a path that was partially built by a prior `BindFixedPath`.
71/// Reads the nodes and relationships from the existing path at `row_idx` and appends
72/// them to the provided builders. The caller should then skip the first VLP node
73/// (which is the junction point already present in the existing path).
74fn prepend_existing_path(
75    existing_path: &arrow_array::StructArray,
76    row_idx: usize,
77    nodes_builder: &mut arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder>,
78    rels_builder: &mut arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder>,
79    query_ctx: &uni_store::runtime::context::QueryContext,
80) {
81    // Read existing nodes
82    let nodes_list = existing_path
83        .column(0)
84        .as_any()
85        .downcast_ref::<arrow_array::ListArray>()
86        .unwrap();
87    let node_values = nodes_list.value(row_idx);
88    let node_struct = node_values
89        .as_any()
90        .downcast_ref::<arrow_array::StructArray>()
91        .unwrap();
92    let vid_col = node_struct
93        .column(0)
94        .as_any()
95        .downcast_ref::<UInt64Array>()
96        .unwrap();
97    for i in 0..vid_col.len() {
98        append_node_to_struct(
99            nodes_builder.values(),
100            Vid::from(vid_col.value(i)),
101            query_ctx,
102        );
103    }
104
105    // Read existing edges
106    let rels_list = existing_path
107        .column(1)
108        .as_any()
109        .downcast_ref::<arrow_array::ListArray>()
110        .unwrap();
111    let edge_values = rels_list.value(row_idx);
112    let edge_struct = edge_values
113        .as_any()
114        .downcast_ref::<arrow_array::StructArray>()
115        .unwrap();
116    let eid_col = edge_struct
117        .column(0)
118        .as_any()
119        .downcast_ref::<UInt64Array>()
120        .unwrap();
121    let type_col = edge_struct
122        .column(1)
123        .as_any()
124        .downcast_ref::<arrow_array::StringArray>()
125        .unwrap();
126    let src_col = edge_struct
127        .column(2)
128        .as_any()
129        .downcast_ref::<UInt64Array>()
130        .unwrap();
131    let dst_col = edge_struct
132        .column(3)
133        .as_any()
134        .downcast_ref::<UInt64Array>()
135        .unwrap();
136    for i in 0..eid_col.len() {
137        append_edge_to_struct(
138            rels_builder.values(),
139            Eid::from(eid_col.value(i)),
140            type_col.value(i),
141            src_col.value(i),
142            dst_col.value(i),
143            query_ctx,
144        );
145    }
146}
147
148/// Resolve edge property Arrow type, falling back to `LargeBinary` (CypherValue) for
149/// schemaless properties. Unlike vertex properties, schemaless edge properties must
150/// preserve original JSON value types (int, float, etc.) since edge types commonly
151/// lack explicit property definitions.
152fn resolve_edge_property_type(
153    prop: &str,
154    schema_props: Option<
155        &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
156    >,
157) -> DataType {
158    if prop == "overflow_json" {
159        DataType::LargeBinary
160    } else {
161        schema_props
162            .and_then(|props| props.get(prop))
163            .map(|meta| meta.r#type.to_arrow())
164            .unwrap_or(DataType::LargeBinary)
165    }
166}
167
168use crate::query::df_graph::common::merged_edge_schema_props;
169
/// Expansion tuple for variable-length traversal: (input_row_idx, target_vid, hop_count, node_path, edge_path)
/// NOTE(review): structurally identical to `ExpansionRecord` above — consider
/// consolidating the two aliases into one.
type VarLengthExpansion = (usize, Vid, usize, Vec<Vid>, Vec<Eid>);
172
/// Single-hop graph traversal execution plan.
///
/// Expands each input row by traversing edges to find neighbors.
/// For each (source, edge, target) triple, produces one output row
/// containing the input columns plus target vertex and edge columns.
///
/// The output schema (and its column order) is fixed at construction time by
/// `Self::build_schema` and cached in `schema`/`properties`.
///
/// # Example
///
/// ```ignore
/// // Input: batch with _vid column
/// // Traverse KNOWS edges outgoing
/// let traverse = GraphTraverseExec::new(
///     input_plan,
///     "_vid",
///     vec![knows_type_id],
///     Direction::Outgoing,
///     "m",           // target variable
///     Some("r"),     // edge variable
///     None,          // no target label filter
///     graph_ctx,
/// );
///
/// // Output: input columns + m._vid + r._eid
/// ```
pub struct GraphTraverseExec {
    /// Input execution plan.
    input: Arc<dyn ExecutionPlan>,

    /// Column name containing source VIDs.
    source_column: String,

    /// Edge type IDs to traverse.
    edge_type_ids: Vec<u32>,

    /// Traversal direction.
    direction: Direction,

    /// Variable name for target vertex columns.
    target_variable: String,

    /// Variable name for edge columns (if edge is bound).
    edge_variable: Option<String>,

    /// Edge properties to materialize (for pushdown hydration).
    edge_properties: Vec<String>,

    /// Target vertex properties to materialize.
    target_properties: Vec<String>,

    /// Target label name for property type resolution.
    target_label_name: Option<String>,

    /// Optional target label filter.
    /// NOTE(review): only cloned in `with_new_children` within this chunk —
    /// presumably consumed elsewhere; confirm before removing.
    target_label_id: Option<u16>,

    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Whether this is an OPTIONAL MATCH (preserve unmatched source rows with NULLs).
    optional: bool,

    /// Variables introduced by the OPTIONAL MATCH pattern.
    /// Used to determine which columns should be null-extended on failure.
    optional_pattern_vars: HashSet<String>,

    /// Column name of an already-bound target VID (for cycle patterns like n-->k<--n).
    /// When set, only traversals that reach this VID are included.
    bound_target_column: Option<String>,

    /// Columns containing edge IDs from previous hops (for relationship uniqueness).
    /// Edges matching any of these IDs are excluded from traversal results.
    used_edge_columns: Vec<String>,

    /// Output schema. Built once in `new` via `build_schema`.
    schema: SchemaRef,

    /// Cached plan properties.
    properties: PlanProperties,

    /// Execution metrics.
    metrics: ExecutionPlanMetricsSet,
}
255
256impl fmt::Debug for GraphTraverseExec {
257    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258        f.debug_struct("GraphTraverseExec")
259            .field("source_column", &self.source_column)
260            .field("edge_type_ids", &self.edge_type_ids)
261            .field("direction", &self.direction)
262            .field("target_variable", &self.target_variable)
263            .field("edge_variable", &self.edge_variable)
264            .finish()
265    }
266}
267
impl GraphTraverseExec {
    /// Create a new single-hop traversal plan.
    ///
    /// The output schema is computed eagerly here (see [`Self::build_schema`])
    /// so that `schema()` and `properties()` are cheap accessors.
    ///
    /// # Arguments
    ///
    /// * `input` - Input plan providing source vertices
    /// * `source_column` - Column name containing source VIDs
    /// * `edge_type_ids` - Edge types to traverse
    /// * `direction` - Traversal direction
    /// * `target_variable` - Variable name for target vertices
    /// * `edge_variable` - Optional variable name for edges
    /// * `edge_properties` - Edge properties to materialize
    /// * `target_properties` - Target vertex properties to materialize
    /// * `target_label_name` - Target label name for property type resolution
    /// * `target_label_id` - Optional target label filter
    /// * `graph_ctx` - Graph execution context
    /// * `optional` - Whether this is an OPTIONAL MATCH (nullable output columns)
    /// * `optional_pattern_vars` - Variables introduced by the OPTIONAL MATCH pattern
    /// * `bound_target_column` - Column with already-bound target VID (for cycle patterns)
    /// * `used_edge_columns` - Columns with edge IDs to exclude (relationship uniqueness)
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        source_column: impl Into<String>,
        edge_type_ids: Vec<u32>,
        direction: Direction,
        target_variable: impl Into<String>,
        edge_variable: Option<String>,
        edge_properties: Vec<String>,
        target_properties: Vec<String>,
        target_label_name: Option<String>,
        target_label_id: Option<u16>,
        graph_ctx: Arc<GraphExecutionContext>,
        optional: bool,
        optional_pattern_vars: HashSet<String>,
        bound_target_column: Option<String>,
        used_edge_columns: Vec<String>,
    ) -> Self {
        let source_column = source_column.into();
        let target_variable = target_variable.into();

        // Resolve target property Arrow types from the schema
        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let label_props = target_label_name
            .as_deref()
            .and_then(|ln| uni_schema.properties.get(ln));
        // Edge property metadata merged across all traversed edge types;
        // empty map is normalized to None so build_schema falls back to CypherValue.
        let merged_edge_props = merged_edge_schema_props(&uni_schema, &edge_type_ids);
        let edge_props = if merged_edge_props.is_empty() {
            None
        } else {
            Some(&merged_edge_props)
        };

        // Build output schema: input schema + target VID + target props + optional edge ID + edge properties
        let schema = Self::build_schema(
            input.schema(),
            &target_variable,
            edge_variable.as_deref(),
            &edge_properties,
            &target_properties,
            label_props,
            edge_props,
            optional,
        );

        let properties = compute_plan_properties(schema.clone());

        Self {
            input,
            source_column,
            edge_type_ids,
            direction,
            target_variable,
            edge_variable,
            edge_properties,
            target_properties,
            target_label_name,
            target_label_id,
            graph_ctx,
            optional,
            optional_pattern_vars,
            bound_target_column,
            used_edge_columns,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Build output schema.
    ///
    /// NOTE: the order of `fields.push` calls below defines the operator's
    /// output column layout — downstream operators depend on it, so keep the
    /// push order stable.
    #[expect(
        clippy::too_many_arguments,
        reason = "Schema construction needs all field metadata"
    )]
    fn build_schema(
        input_schema: SchemaRef,
        target_variable: &str,
        edge_variable: Option<&str>,
        edge_properties: &[String],
        target_properties: &[String],
        label_props: Option<
            &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
        >,
        edge_props: Option<
            &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
        >,
        optional: bool,
    ) -> SchemaRef {
        // Start from the input columns; traversal only appends, never removes.
        let mut fields: Vec<Field> = input_schema
            .fields()
            .iter()
            .map(|f| f.as_ref().clone())
            .collect();

        // Add target VID column (nullable when optional — unmatched rows get NULL)
        let target_vid_name = format!("{}._vid", target_variable);
        fields.push(Field::new(&target_vid_name, DataType::UInt64, optional));

        // Add target ._labels column (List(Utf8)) for labels() and structural projection support
        fields.push(Field::new(
            format!("{}._labels", target_variable),
            labels_data_type(),
            true,
        ));

        // Add target vertex property columns
        for prop_name in target_properties {
            let col_name = format!("{}.{}", target_variable, prop_name);
            let arrow_type = resolve_property_type(prop_name, label_props);
            fields.push(Field::new(&col_name, arrow_type, true));
        }

        // Add edge ID column if edge variable is bound
        if let Some(edge_var) = edge_variable {
            let edge_id_name = format!("{}._eid", edge_var);
            fields.push(Field::new(&edge_id_name, DataType::UInt64, optional));

            // Add edge _type column for type(r) support
            fields.push(Field::new(
                format!("{}._type", edge_var),
                DataType::Utf8,
                true,
            ));

            // Add edge property columns with types resolved from schema
            for prop_name in edge_properties {
                let prop_col_name = format!("{}.{}", edge_var, prop_name);
                let arrow_type = resolve_edge_property_type(prop_name, edge_props);
                fields.push(Field::new(&prop_col_name, arrow_type, true));
            }
        } else {
            // Add internal edge ID column for relationship uniqueness tracking
            // even when edge variable is not explicitly bound.
            let internal_eid_name = format!("__eid_to_{}", target_variable);
            fields.push(Field::new(&internal_eid_name, DataType::UInt64, optional));
        }

        Arc::new(Schema::new(fields))
    }
}
424
425impl DisplayAs for GraphTraverseExec {
426    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
427        write!(
428            f,
429            "GraphTraverseExec: {} --[{:?}]--> {}",
430            self.source_column, self.edge_type_ids, self.target_variable
431        )?;
432        if let Some(ref edge_var) = self.edge_variable {
433            write!(f, " as {}", edge_var)?;
434        }
435        Ok(())
436    }
437}
438
439impl ExecutionPlan for GraphTraverseExec {
440    fn name(&self) -> &str {
441        "GraphTraverseExec"
442    }
443
444    fn as_any(&self) -> &dyn Any {
445        self
446    }
447
448    fn schema(&self) -> SchemaRef {
449        self.schema.clone()
450    }
451
452    fn properties(&self) -> &PlanProperties {
453        &self.properties
454    }
455
456    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
457        vec![&self.input]
458    }
459
460    fn with_new_children(
461        self: Arc<Self>,
462        children: Vec<Arc<dyn ExecutionPlan>>,
463    ) -> DFResult<Arc<dyn ExecutionPlan>> {
464        if children.len() != 1 {
465            return Err(datafusion::error::DataFusionError::Plan(
466                "GraphTraverseExec requires exactly one child".to_string(),
467            ));
468        }
469
470        Ok(Arc::new(Self::new(
471            children[0].clone(),
472            self.source_column.clone(),
473            self.edge_type_ids.clone(),
474            self.direction,
475            self.target_variable.clone(),
476            self.edge_variable.clone(),
477            self.edge_properties.clone(),
478            self.target_properties.clone(),
479            self.target_label_name.clone(),
480            self.target_label_id,
481            self.graph_ctx.clone(),
482            self.optional,
483            self.optional_pattern_vars.clone(),
484            self.bound_target_column.clone(),
485            self.used_edge_columns.clone(),
486        )))
487    }
488
489    fn execute(
490        &self,
491        partition: usize,
492        context: Arc<TaskContext>,
493    ) -> DFResult<SendableRecordBatchStream> {
494        let input_stream = self.input.execute(partition, context)?;
495
496        let metrics = BaselineMetrics::new(&self.metrics, partition);
497
498        let warm_fut = self
499            .graph_ctx
500            .warming_future(self.edge_type_ids.clone(), self.direction);
501
502        Ok(Box::pin(GraphTraverseStream {
503            input: input_stream,
504            source_column: self.source_column.clone(),
505            edge_type_ids: self.edge_type_ids.clone(),
506            direction: self.direction,
507            target_variable: self.target_variable.clone(),
508            edge_variable: self.edge_variable.clone(),
509            edge_properties: self.edge_properties.clone(),
510            target_properties: self.target_properties.clone(),
511            target_label_name: self.target_label_name.clone(),
512            graph_ctx: self.graph_ctx.clone(),
513            optional: self.optional,
514            optional_pattern_vars: self.optional_pattern_vars.clone(),
515            bound_target_column: self.bound_target_column.clone(),
516            used_edge_columns: self.used_edge_columns.clone(),
517            schema: self.schema.clone(),
518            state: TraverseStreamState::Warming(warm_fut),
519            metrics,
520        }))
521    }
522
523    fn metrics(&self) -> Option<MetricsSet> {
524        Some(self.metrics.clone_inner())
525    }
526}
527
/// State machine for traverse stream execution.
///
/// A stream is constructed in `Warming` (see `GraphTraverseExec::execute`) and
/// terminates in `Done`. The exact transitions between `Reading` and
/// `Materializing` are driven by the stream's poll loop (not visible in this
/// chunk).
enum TraverseStreamState {
    /// Warming adjacency CSRs before first batch.
    Warming(Pin<Box<dyn std::future::Future<Output = DFResult<()>> + Send>>),
    /// Polling the input stream for batches.
    Reading,
    /// Materializing target vertex properties asynchronously.
    Materializing(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
    /// Stream is done.
    Done,
}
539
/// Stream that performs single-hop traversal with async property materialization.
///
/// Mirrors the configuration of the owning [`GraphTraverseExec`]; see that type
/// for field semantics. Fields are cloned from the exec node in `execute`.
struct GraphTraverseStream {
    /// Input stream.
    input: SendableRecordBatchStream,

    /// Column name containing source VIDs.
    source_column: String,

    /// Edge type IDs to traverse.
    edge_type_ids: Vec<u32>,

    /// Traversal direction.
    direction: Direction,

    /// Variable name for target vertex (retained for diagnostics).
    #[expect(dead_code, reason = "Retained for debug logging and diagnostics")]
    target_variable: String,

    /// Variable name for edge (if bound).
    edge_variable: Option<String>,

    /// Edge properties to materialize.
    edge_properties: Vec<String>,

    /// Target vertex properties to materialize.
    target_properties: Vec<String>,

    /// Target label name for property resolution and filtering.
    target_label_name: Option<String>,

    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Whether this is an OPTIONAL MATCH.
    optional: bool,

    /// Variables introduced by the OPTIONAL MATCH pattern.
    optional_pattern_vars: HashSet<String>,

    /// Column name of an already-bound target VID (for cycle patterns like n-->k<--n).
    bound_target_column: Option<String>,

    /// Columns containing edge IDs from previous hops (for relationship uniqueness).
    used_edge_columns: Vec<String>,

    /// Output schema.
    schema: SchemaRef,

    /// Stream state; starts as `Warming` (set in `GraphTraverseExec::execute`).
    state: TraverseStreamState,

    /// Metrics.
    metrics: BaselineMetrics,
}
594
595impl GraphTraverseStream {
596    /// Expand neighbors synchronously and return expansions.
597    /// Returns (row_idx, target_vid, eid_u64, edge_type_id).
598    fn expand_neighbors(&self, batch: &RecordBatch) -> DFResult<Vec<(usize, Vid, u64, u32)>> {
599        let source_col = batch.column_by_name(&self.source_column).ok_or_else(|| {
600            datafusion::error::DataFusionError::Execution(format!(
601                "Source column '{}' not found",
602                self.source_column
603            ))
604        })?;
605
606        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
607        let source_vids: &UInt64Array = &source_vid_cow;
608
609        // If bound_target_column is set, get the expected target VIDs for each row.
610        // This is used for cycle patterns like n-->k<--n where the target must match.
611        let bound_target_cow = self
612            .bound_target_column
613            .as_ref()
614            .and_then(|col| batch.column_by_name(col))
615            .map(|c| column_as_vid_array(c.as_ref()))
616            .transpose()?;
617        let bound_target_vids: Option<&UInt64Array> = bound_target_cow.as_deref();
618
619        // Collect edge ID arrays from previous hops for relationship uniqueness filtering.
620        let used_edge_arrays: Vec<&UInt64Array> = self
621            .used_edge_columns
622            .iter()
623            .filter_map(|col| {
624                batch
625                    .column_by_name(col)
626                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
627            })
628            .collect();
629
630        let mut expanded_rows: Vec<(usize, Vid, u64, u32)> = Vec::new();
631        let is_undirected = matches!(self.direction, Direction::Both);
632
633        for (row_idx, source_vid) in source_vids.iter().enumerate() {
634            let Some(src) = source_vid else {
635                continue;
636            };
637
638            // Get expected target VID if this is a bound target pattern.
639            // Distinguish between:
640            // - no bound target column (no filtering),
641            // - bound target present but NULL for this row (must produce no expansion),
642            // - bound target present with VID.
643            let expected_target = bound_target_vids.map(|arr| {
644                if arr.is_null(row_idx) {
645                    None
646                } else {
647                    Some(arr.value(row_idx))
648                }
649            });
650
651            // Collect used edge IDs for this row from all previous hops
652            let used_eids: HashSet<u64> = used_edge_arrays
653                .iter()
654                .filter_map(|arr| {
655                    if arr.is_null(row_idx) {
656                        None
657                    } else {
658                        Some(arr.value(row_idx))
659                    }
660                })
661                .collect();
662
663            let vid = Vid::from(src);
664            // For Direction::Both, deduplicate edges by eid within each source.
665            // This prevents the same edge being counted twice (once outgoing, once incoming).
666            let mut seen_edges: HashSet<u64> = HashSet::new();
667
668            for &edge_type in &self.edge_type_ids {
669                let neighbors = self.graph_ctx.get_neighbors(vid, edge_type, self.direction);
670
671                for (target_vid, eid) in neighbors {
672                    let eid_u64 = eid.as_u64();
673
674                    // Skip edges already used in previous hops (relationship uniqueness)
675                    if used_eids.contains(&eid_u64) {
676                        continue;
677                    }
678
679                    // Deduplicate edges for undirected patterns
680                    if is_undirected && !seen_edges.insert(eid_u64) {
681                        continue;
682                    }
683
684                    // Filter by bound target VID if set (for cycle patterns).
685                    // NULL bound targets do not match anything.
686                    if let Some(expected_opt) = expected_target {
687                        let Some(expected) = expected_opt else {
688                            continue;
689                        };
690                        if target_vid.as_u64() != expected {
691                            continue;
692                        }
693                    }
694
695                    // Filter by target label using L0 visibility.
696                    // VIDs no longer embed label information, so we must look up labels.
697                    if let Some(ref label_name) = self.target_label_name {
698                        let query_ctx = self.graph_ctx.query_context();
699                        if let Some(vertex_labels) =
700                            l0_visibility::get_vertex_labels_optional(target_vid, &query_ctx)
701                        {
702                            // Vertex is in L0 — require actual label match
703                            if !vertex_labels.contains(label_name) {
704                                continue;
705                            }
706                        }
707                        // else: vertex not in L0 → trust storage-level filtering
708                    }
709
710                    expanded_rows.push((row_idx, target_vid, eid_u64, edge_type));
711                }
712            }
713        }
714
715        Ok(expanded_rows)
716    }
717}
718
719/// Build target vertex labels column from L0 buffers.
720fn build_target_labels_column(
721    target_vids: &[Vid],
722    target_label_name: &Option<String>,
723    graph_ctx: &GraphExecutionContext,
724) -> ArrayRef {
725    use arrow_array::builder::{ListBuilder, StringBuilder};
726    let mut labels_builder = ListBuilder::new(StringBuilder::new());
727    let query_ctx = graph_ctx.query_context();
728    for vid in target_vids {
729        let row_labels: Vec<String> =
730            match l0_visibility::get_vertex_labels_optional(*vid, &query_ctx) {
731                Some(labels) => labels,
732                None => {
733                    // Vertex not in L0 — trust schema label (storage already filtered)
734                    if let Some(label_name) = target_label_name {
735                        vec![label_name.clone()]
736                    } else {
737                        vec![]
738                    }
739                }
740            };
741        let values = labels_builder.values();
742        for lbl in &row_labels {
743            values.append_value(lbl);
744        }
745        labels_builder.append(true);
746    }
747    Arc::new(labels_builder.finish())
748}
749
750/// Build target vertex property columns from storage and L0.
751async fn build_target_property_columns(
752    target_vids: &[Vid],
753    target_properties: &[String],
754    target_label_name: &Option<String>,
755    graph_ctx: &Arc<GraphExecutionContext>,
756) -> DFResult<Vec<ArrayRef>> {
757    let mut columns = Vec::new();
758
759    if let Some(label_name) = target_label_name {
760        let property_manager = graph_ctx.property_manager();
761        let query_ctx = graph_ctx.query_context();
762
763        let props_map = property_manager
764            .get_batch_vertex_props_for_label(target_vids, label_name, Some(&query_ctx))
765            .await
766            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
767
768        let uni_schema = graph_ctx.storage().schema_manager().schema();
769        let label_props = uni_schema.properties.get(label_name.as_str());
770
771        for prop_name in target_properties {
772            let data_type = resolve_property_type(prop_name, label_props);
773            let column =
774                build_property_column_static(target_vids, &props_map, prop_name, &data_type)?;
775            columns.push(column);
776        }
777    } else {
778        // No label name — use label-agnostic property lookup.
779        let non_internal_props: Vec<&str> = target_properties
780            .iter()
781            .filter(|p| *p != "_all_props")
782            .map(|s| s.as_str())
783            .collect();
784        let property_manager = graph_ctx.property_manager();
785        let query_ctx = graph_ctx.query_context();
786
787        let props_map = if !non_internal_props.is_empty() {
788            property_manager
789                .get_batch_vertex_props(target_vids, &non_internal_props, Some(&query_ctx))
790                .await
791                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
792        } else {
793            std::collections::HashMap::new()
794        };
795
796        for prop_name in target_properties {
797            if prop_name == "_all_props" {
798                columns.push(build_all_props_column(target_vids, &props_map, graph_ctx));
799            } else {
800                let column = build_property_column_static(
801                    target_vids,
802                    &props_map,
803                    prop_name,
804                    &arrow::datatypes::DataType::LargeBinary,
805                )?;
806                columns.push(column);
807            }
808        }
809    }
810
811    Ok(columns)
812}
813
814/// Build a CypherValue blob column from all vertex properties (L0 + storage).
815fn build_all_props_column(
816    target_vids: &[Vid],
817    props_map: &HashMap<Vid, HashMap<String, uni_common::Value>>,
818    graph_ctx: &Arc<GraphExecutionContext>,
819) -> ArrayRef {
820    use crate::query::df_graph::scan::encode_cypher_value;
821    use arrow_array::builder::LargeBinaryBuilder;
822
823    let mut builder = LargeBinaryBuilder::new();
824    let l0_ctx = graph_ctx.l0_context();
825    for vid in target_vids {
826        let mut merged_props = serde_json::Map::new();
827        if let Some(vid_props) = props_map.get(vid) {
828            for (k, v) in vid_props.iter() {
829                let json_val: serde_json::Value = v.clone().into();
830                merged_props.insert(k.to_string(), json_val);
831            }
832        }
833        for l0 in l0_ctx.iter_l0_buffers() {
834            let guard = l0.read();
835            if let Some(l0_props) = guard.vertex_properties.get(vid) {
836                for (k, v) in l0_props.iter() {
837                    let json_val: serde_json::Value = v.clone().into();
838                    merged_props.insert(k.to_string(), json_val);
839                }
840            }
841        }
842        if merged_props.is_empty() {
843            builder.append_null();
844        } else {
845            let json = serde_json::Value::Object(merged_props);
846            match encode_cypher_value(&json) {
847                Ok(bytes) => builder.append_value(bytes),
848                Err(_) => builder.append_null(),
849            }
850        }
851    }
852    Arc::new(builder.finish())
853}
854
855/// Build edge ID, type, and property columns for bound edge variables.
856async fn build_edge_columns(
857    expansions: &[(usize, Vid, u64, u32)],
858    edge_properties: &[String],
859    edge_type_ids: &[u32],
860    graph_ctx: &Arc<GraphExecutionContext>,
861) -> DFResult<Vec<ArrayRef>> {
862    let mut columns = Vec::new();
863
864    let eids: Vec<Eid> = expansions
865        .iter()
866        .map(|(_, _, eid, _)| Eid::from(*eid))
867        .collect();
868    let eid_u64s: Vec<u64> = eids.iter().map(|e| e.as_u64()).collect();
869    columns.push(Arc::new(UInt64Array::from(eid_u64s)) as ArrayRef);
870
871    // Edge _type column
872    {
873        let uni_schema = graph_ctx.storage().schema_manager().schema();
874        let mut type_builder = arrow_array::builder::StringBuilder::new();
875        for (_, _, _, edge_type_id) in expansions {
876            if let Some(name) = uni_schema.edge_type_name_by_id_unified(*edge_type_id) {
877                type_builder.append_value(&name);
878            } else {
879                type_builder.append_null();
880            }
881        }
882        columns.push(Arc::new(type_builder.finish()) as ArrayRef);
883    }
884
885    if !edge_properties.is_empty() {
886        let prop_name_refs: Vec<&str> = edge_properties.iter().map(|s| s.as_str()).collect();
887        let property_manager = graph_ctx.property_manager();
888        let query_ctx = graph_ctx.query_context();
889
890        let props_map = property_manager
891            .get_batch_edge_props(&eids, &prop_name_refs, Some(&query_ctx))
892            .await
893            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
894
895        let uni_schema = graph_ctx.storage().schema_manager().schema();
896        let merged_edge_props = merged_edge_schema_props(&uni_schema, edge_type_ids);
897        let edge_type_props = if merged_edge_props.is_empty() {
898            None
899        } else {
900            Some(&merged_edge_props)
901        };
902
903        let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(e.as_u64())).collect();
904
905        for prop_name in edge_properties {
906            let data_type = resolve_edge_property_type(prop_name, edge_type_props);
907            let column =
908                build_property_column_static(&vid_keys, &props_map, prop_name, &data_type)?;
909            columns.push(column);
910        }
911    }
912
913    Ok(columns)
914}
915
/// Build the output batch with target vertex properties.
///
/// This is a standalone async function so it can be boxed into a `Send` future
/// without borrowing from `GraphTraverseStream`.
///
/// Output column order: expanded input columns, target `_vid`, target
/// `_labels`, target property columns (if any), then edge columns — either
/// the bound edge's `_eid`/`_type`/properties or a single internal
/// `__eid_to_*` column for anonymous edges.
///
/// For OPTIONAL MATCH (`optional == true`), unmatched source-row groups are
/// appended as NULL-padded rows after the expanded rows.
#[expect(
    clippy::too_many_arguments,
    reason = "Standalone async fn needs all context passed explicitly"
)]
async fn build_traverse_output_batch(
    input: RecordBatch,
    expansions: Vec<(usize, Vid, u64, u32)>,
    schema: SchemaRef,
    edge_variable: Option<String>,
    edge_properties: Vec<String>,
    edge_type_ids: Vec<u32>,
    target_properties: Vec<String>,
    target_label_name: Option<String>,
    graph_ctx: Arc<GraphExecutionContext>,
    optional: bool,
    optional_pattern_vars: HashSet<String>,
) -> DFResult<RecordBatch> {
    if expansions.is_empty() {
        if !optional {
            // Inner MATCH with no expansions produces no rows.
            return Ok(RecordBatch::new_empty(schema));
        }
        // OPTIONAL MATCH: emit one NULL-padded row per unmatched source group
        // (empty matched set, so every group is unmatched).
        let unmatched_reps = collect_unmatched_optional_group_rows(
            &input,
            &HashSet::new(),
            &schema,
            &optional_pattern_vars,
        )?;
        if unmatched_reps.is_empty() {
            return Ok(RecordBatch::new_empty(schema));
        }
        return build_optional_null_batch_for_rows_with_optional_vars(
            &input,
            &unmatched_reps,
            &schema,
            &optional_pattern_vars,
        );
    }

    // Expand input columns via index array: one output row per
    // (source row, edge) expansion.
    let indices: Vec<u64> = expansions
        .iter()
        .map(|(idx, _, _, _)| *idx as u64)
        .collect();
    let indices_array = UInt64Array::from(indices);
    let mut columns: Vec<ArrayRef> = input
        .columns()
        .iter()
        .map(|col| take(col.as_ref(), &indices_array, None))
        .collect::<Result<_, _>>()?;

    // Target VID column
    let target_vids: Vec<Vid> = expansions.iter().map(|(_, vid, _, _)| *vid).collect();
    let target_vid_u64s: Vec<u64> = target_vids.iter().map(|v| v.as_u64()).collect();
    columns.push(Arc::new(UInt64Array::from(target_vid_u64s)));

    // Target labels column
    columns.push(build_target_labels_column(
        &target_vids,
        &target_label_name,
        &graph_ctx,
    ));

    // Target vertex property columns (async hydration via PropertyManager)
    if !target_properties.is_empty() {
        let prop_cols = build_target_property_columns(
            &target_vids,
            &target_properties,
            &target_label_name,
            &graph_ctx,
        )
        .await?;
        columns.extend(prop_cols);
    }

    // Edge columns (bound variable) or internal EID tracking column
    if edge_variable.is_some() {
        let edge_cols =
            build_edge_columns(&expansions, &edge_properties, &edge_type_ids, &graph_ctx).await?;
        columns.extend(edge_cols);
    } else {
        let eid_u64s: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
        columns.push(Arc::new(UInt64Array::from(eid_u64s)));
    }

    let expanded_batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;

    // Append null rows for unmatched optional sources
    if optional {
        let matched_indices: HashSet<usize> =
            expansions.iter().map(|(idx, _, _, _)| *idx).collect();
        let unmatched = collect_unmatched_optional_group_rows(
            &input,
            &matched_indices,
            &schema,
            &optional_pattern_vars,
        )?;

        if !unmatched.is_empty() {
            let null_batch = build_optional_null_batch_for_rows_with_optional_vars(
                &input,
                &unmatched,
                &schema,
                &optional_pattern_vars,
            )?;
            let combined = arrow::compute::concat_batches(&schema, [&expanded_batch, &null_batch])
                .map_err(arrow_err)?;
            return Ok(combined);
        }
    }

    Ok(expanded_batch)
}
1032
1033/// Build a batch for specific unmatched source rows with NULL target/edge columns.
1034/// Used when OPTIONAL MATCH has some expansions but some source rows had none.
1035fn build_optional_null_batch_for_rows(
1036    input: &RecordBatch,
1037    unmatched_indices: &[usize],
1038    schema: &SchemaRef,
1039) -> DFResult<RecordBatch> {
1040    let num_rows = unmatched_indices.len();
1041    let indices: Vec<u64> = unmatched_indices.iter().map(|&idx| idx as u64).collect();
1042    let indices_array = UInt64Array::from(indices);
1043
1044    // Take the unmatched input rows
1045    let mut columns: Vec<ArrayRef> = Vec::new();
1046    for col in input.columns() {
1047        let taken = take(col.as_ref(), &indices_array, None)?;
1048        columns.push(taken);
1049    }
1050    // Fill remaining columns with nulls
1051    for field in schema.fields().iter().skip(input.num_columns()) {
1052        columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1053    }
1054    RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
1055}
1056
/// Return `true` when `col_name` belongs to one of the OPTIONAL MATCH
/// variables in `optional_vars`.
///
/// A column is optional when it is:
/// - the variable itself (`m`),
/// - a property/internal column of the variable (`m._vid`, `m.name`), or
/// - the internal edge-id column targeting the variable (`__eid_to_m`).
fn is_optional_column_for_vars(col_name: &str, optional_vars: &HashSet<String>) -> bool {
    optional_vars.contains(col_name)
        || optional_vars.iter().any(|var| {
            // `strip_prefix` replaces the manual `col_name[var.len()..]` slice
            // and, for the internal EID column, requires an exact variable
            // match — the previous `ends_with(var)` check wrongly matched
            // e.g. `__eid_to_mm` for variable `m`.
            col_name
                .strip_prefix(var.as_str())
                .is_some_and(|rest| rest.starts_with('.'))
                || col_name.strip_prefix("__eid_to_") == Some(var.as_str())
        })
}
1064
1065fn collect_unmatched_optional_group_rows(
1066    input: &RecordBatch,
1067    matched_indices: &HashSet<usize>,
1068    schema: &SchemaRef,
1069    optional_vars: &HashSet<String>,
1070) -> DFResult<Vec<usize>> {
1071    if input.num_rows() == 0 {
1072        return Ok(Vec::new());
1073    }
1074
1075    if optional_vars.is_empty() {
1076        return Ok((0..input.num_rows())
1077            .filter(|idx| !matched_indices.contains(idx))
1078            .collect());
1079    }
1080
1081    let source_vid_indices: Vec<usize> = schema
1082        .fields()
1083        .iter()
1084        .enumerate()
1085        .filter_map(|(idx, field)| {
1086            if idx >= input.num_columns() {
1087                return None;
1088            }
1089            let name = field.name();
1090            if !is_optional_column_for_vars(name, optional_vars) && name.ends_with("._vid") {
1091                Some(idx)
1092            } else {
1093                None
1094            }
1095        })
1096        .collect();
1097
1098    // Group rows by non-optional VID bindings and preserve group order.
1099    let mut groups: HashMap<Vec<u8>, (usize, bool)> = HashMap::new(); // (first_row_idx, any_matched)
1100    let mut group_order: Vec<Vec<u8>> = Vec::new();
1101
1102    for row_idx in 0..input.num_rows() {
1103        let key = compute_optional_group_key(input, row_idx, &source_vid_indices)?;
1104        let entry = groups.entry(key.clone());
1105        if matches!(entry, std::collections::hash_map::Entry::Vacant(_)) {
1106            group_order.push(key.clone());
1107        }
1108        let matched = matched_indices.contains(&row_idx);
1109        entry
1110            .and_modify(|(_, any_matched)| *any_matched |= matched)
1111            .or_insert((row_idx, matched));
1112    }
1113
1114    Ok(group_order
1115        .into_iter()
1116        .filter_map(|key| {
1117            groups
1118                .get(&key)
1119                .and_then(|(first_idx, any_matched)| (!*any_matched).then_some(*first_idx))
1120        })
1121        .collect())
1122}
1123
1124fn compute_optional_group_key(
1125    batch: &RecordBatch,
1126    row_idx: usize,
1127    source_vid_indices: &[usize],
1128) -> DFResult<Vec<u8>> {
1129    let mut key = Vec::with_capacity(source_vid_indices.len() * std::mem::size_of::<u64>());
1130    for &col_idx in source_vid_indices {
1131        let col = batch.column(col_idx);
1132        let vid_cow = column_as_vid_array(col.as_ref())?;
1133        let arr: &UInt64Array = &vid_cow;
1134        if arr.is_null(row_idx) {
1135            key.extend_from_slice(&u64::MAX.to_le_bytes());
1136        } else {
1137            key.extend_from_slice(&arr.value(row_idx).to_le_bytes());
1138        }
1139    }
1140    Ok(key)
1141}
1142
1143fn build_optional_null_batch_for_rows_with_optional_vars(
1144    input: &RecordBatch,
1145    unmatched_indices: &[usize],
1146    schema: &SchemaRef,
1147    optional_vars: &HashSet<String>,
1148) -> DFResult<RecordBatch> {
1149    if optional_vars.is_empty() {
1150        return build_optional_null_batch_for_rows(input, unmatched_indices, schema);
1151    }
1152
1153    let num_rows = unmatched_indices.len();
1154    let indices: Vec<u64> = unmatched_indices.iter().map(|&idx| idx as u64).collect();
1155    let indices_array = UInt64Array::from(indices);
1156
1157    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
1158    for (col_idx, field) in schema.fields().iter().enumerate() {
1159        if col_idx < input.num_columns() {
1160            if is_optional_column_for_vars(field.name(), optional_vars) {
1161                columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1162            } else {
1163                let taken = take(input.column(col_idx).as_ref(), &indices_array, None)?;
1164                columns.push(taken);
1165            }
1166        } else {
1167            columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1168        }
1169    }
1170
1171    RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
1172}
1173
impl Stream for GraphTraverseStream {
    type Item = DFResult<RecordBatch>;

    /// Drive the traversal state machine: `Warming` → `Reading` ⇄
    /// `Materializing` → `Done`.
    ///
    /// The state is swapped out (leaving `Done` behind) so the owned futures
    /// inside can be polled without borrowing `self.state`; every arm must
    /// restore the intended state before returning.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            let state = std::mem::replace(&mut self.state, TraverseStreamState::Done);

            match state {
                // One-time warm-up future; errors here terminate the stream.
                TraverseStreamState::Warming(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(())) => {
                        self.state = TraverseStreamState::Reading;
                        // Continue loop to start reading
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = TraverseStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        self.state = TraverseStreamState::Warming(fut);
                        return Poll::Pending;
                    }
                },
                TraverseStreamState::Reading => {
                    // Check timeout before pulling the next input batch.
                    if let Err(e) = self.graph_ctx.check_timeout() {
                        return Poll::Ready(Some(Err(
                            datafusion::error::DataFusionError::Execution(e.to_string()),
                        )));
                    }

                    match self.input.poll_next_unpin(cx) {
                        Poll::Ready(Some(Ok(batch))) => {
                            // Expand neighbors synchronously (CSR lookups).
                            let expansions = match self.expand_neighbors(&batch) {
                                Ok(exp) => exp,
                                Err(e) => {
                                    // Surface the error for this batch but keep
                                    // the stream readable for subsequent batches.
                                    self.state = TraverseStreamState::Reading;
                                    return Poll::Ready(Some(Err(e)));
                                }
                            };

                            // Build output synchronously only when no properties need async hydration
                            if self.target_properties.is_empty() && self.edge_properties.is_empty()
                            {
                                let result = build_traverse_output_batch_sync(
                                    &batch,
                                    &expansions,
                                    &self.schema,
                                    self.edge_variable.as_ref(),
                                    &self.graph_ctx,
                                    self.optional,
                                    &self.optional_pattern_vars,
                                );
                                self.state = TraverseStreamState::Reading;
                                if let Ok(ref r) = result {
                                    self.metrics.record_output(r.num_rows());
                                }
                                return Poll::Ready(Some(result));
                            }

                            // Properties needed — create async future for hydration.
                            // Everything is cloned so the boxed future owns its
                            // inputs and can be `Send` without borrowing `self`.
                            let schema = self.schema.clone();
                            let edge_variable = self.edge_variable.clone();
                            let edge_properties = self.edge_properties.clone();
                            let edge_type_ids = self.edge_type_ids.clone();
                            let target_properties = self.target_properties.clone();
                            let target_label_name = self.target_label_name.clone();
                            let graph_ctx = self.graph_ctx.clone();

                            let optional = self.optional;
                            let optional_pattern_vars = self.optional_pattern_vars.clone();

                            let fut = build_traverse_output_batch(
                                batch,
                                expansions,
                                schema,
                                edge_variable,
                                edge_properties,
                                edge_type_ids,
                                target_properties,
                                target_label_name,
                                graph_ctx,
                                optional,
                                optional_pattern_vars,
                            );

                            self.state = TraverseStreamState::Materializing(Box::pin(fut));
                            // Continue loop to poll the future
                        }
                        Poll::Ready(Some(Err(e))) => {
                            self.state = TraverseStreamState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(None) => {
                            // Input exhausted: the stream is finished.
                            self.state = TraverseStreamState::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Pending => {
                            self.state = TraverseStreamState::Reading;
                            return Poll::Pending;
                        }
                    }
                }
                // Async property hydration in flight for one batch.
                TraverseStreamState::Materializing(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(batch)) => {
                        self.state = TraverseStreamState::Reading;
                        self.metrics.record_output(batch.num_rows());
                        return Poll::Ready(Some(Ok(batch)));
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = TraverseStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        self.state = TraverseStreamState::Materializing(fut);
                        return Poll::Pending;
                    }
                },
                TraverseStreamState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
1299
1300/// Build output batch synchronously when no properties need async hydration.
1301///
1302/// Only called when both `target_properties` and `edge_properties` are empty,
1303/// so no property columns need to be materialized.
1304fn build_traverse_output_batch_sync(
1305    input: &RecordBatch,
1306    expansions: &[(usize, Vid, u64, u32)],
1307    schema: &SchemaRef,
1308    edge_variable: Option<&String>,
1309    graph_ctx: &GraphExecutionContext,
1310    optional: bool,
1311    optional_pattern_vars: &HashSet<String>,
1312) -> DFResult<RecordBatch> {
1313    if expansions.is_empty() {
1314        if !optional {
1315            return Ok(RecordBatch::new_empty(schema.clone()));
1316        }
1317        let unmatched_reps = collect_unmatched_optional_group_rows(
1318            input,
1319            &HashSet::new(),
1320            schema,
1321            optional_pattern_vars,
1322        )?;
1323        if unmatched_reps.is_empty() {
1324            return Ok(RecordBatch::new_empty(schema.clone()));
1325        }
1326        return build_optional_null_batch_for_rows_with_optional_vars(
1327            input,
1328            &unmatched_reps,
1329            schema,
1330            optional_pattern_vars,
1331        );
1332    }
1333
1334    let indices: Vec<u64> = expansions
1335        .iter()
1336        .map(|(idx, _, _, _)| *idx as u64)
1337        .collect();
1338    let indices_array = UInt64Array::from(indices);
1339
1340    let mut columns: Vec<ArrayRef> = Vec::new();
1341    for col in input.columns() {
1342        let expanded = take(col.as_ref(), &indices_array, None)?;
1343        columns.push(expanded);
1344    }
1345
1346    // Add target VID column
1347    let target_vids: Vec<u64> = expansions
1348        .iter()
1349        .map(|(_, vid, _, _)| vid.as_u64())
1350        .collect();
1351    columns.push(Arc::new(UInt64Array::from(target_vids)));
1352
1353    // Add target ._labels column (from L0 buffers)
1354    {
1355        use arrow_array::builder::{ListBuilder, StringBuilder};
1356        let l0_ctx = graph_ctx.l0_context();
1357        let mut labels_builder = ListBuilder::new(StringBuilder::new());
1358        for (_, vid, _, _) in expansions {
1359            let mut row_labels: Vec<String> = Vec::new();
1360            for l0 in l0_ctx.iter_l0_buffers() {
1361                let guard = l0.read();
1362                if let Some(l0_labels) = guard.vertex_labels.get(vid) {
1363                    for lbl in l0_labels {
1364                        if !row_labels.contains(lbl) {
1365                            row_labels.push(lbl.clone());
1366                        }
1367                    }
1368                }
1369            }
1370            let values = labels_builder.values();
1371            for lbl in &row_labels {
1372                values.append_value(lbl);
1373            }
1374            labels_builder.append(true);
1375        }
1376        columns.push(Arc::new(labels_builder.finish()));
1377    }
1378
1379    // Add edge columns if edge is bound (no properties in sync path)
1380    if edge_variable.is_some() {
1381        let edge_ids: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1382        columns.push(Arc::new(UInt64Array::from(edge_ids)));
1383
1384        // Add edge _type column
1385        let uni_schema = graph_ctx.storage().schema_manager().schema();
1386        let mut type_builder = arrow_array::builder::StringBuilder::new();
1387        for (_, _, _, edge_type_id) in expansions {
1388            if let Some(name) = uni_schema.edge_type_name_by_id_unified(*edge_type_id) {
1389                type_builder.append_value(&name);
1390            } else {
1391                type_builder.append_null();
1392            }
1393        }
1394        columns.push(Arc::new(type_builder.finish()));
1395    } else {
1396        // Internal EID column for relationship uniqueness tracking (matches schema)
1397        let edge_ids: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1398        columns.push(Arc::new(UInt64Array::from(edge_ids)));
1399    }
1400
1401    let expanded_batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
1402
1403    if optional {
1404        let matched_indices: HashSet<usize> =
1405            expansions.iter().map(|(idx, _, _, _)| *idx).collect();
1406        let unmatched = collect_unmatched_optional_group_rows(
1407            input,
1408            &matched_indices,
1409            schema,
1410            optional_pattern_vars,
1411        )?;
1412
1413        if !unmatched.is_empty() {
1414            let null_batch = build_optional_null_batch_for_rows_with_optional_vars(
1415                input,
1416                &unmatched,
1417                schema,
1418                optional_pattern_vars,
1419            )?;
1420            let combined = arrow::compute::concat_batches(schema, [&expanded_batch, &null_batch])
1421                .map_err(|e| {
1422                datafusion::error::DataFusionError::ArrowError(Box::new(e), None)
1423            })?;
1424            return Ok(combined);
1425        }
1426    }
1427
1428    Ok(expanded_batch)
1429}
1430
1431impl RecordBatchStream for GraphTraverseStream {
1432    fn schema(&self) -> SchemaRef {
1433        self.schema.clone()
1434    }
1435}
1436
/// Adjacency map type: maps source VID to list of (target_vid, eid, edge_type_name, properties).
///
/// Used by the schemaless traversal path, which builds adjacency in memory
/// from the main edges table instead of using the CSR cache.
type EdgeAdjacencyMap = HashMap<Vid, Vec<(Vid, Eid, String, uni_common::Properties)>>;
1439
/// Graph traversal execution plan for schemaless edge types (TraverseMainByType).
///
/// Unlike GraphTraverseExec which uses CSR adjacency for known types, this operator
/// queries the main edges table for schemaless types and builds an in-memory adjacency map.
///
/// # Example
///
/// ```ignore
/// // Traverse schemaless "CUSTOM" edges
/// let traverse = GraphTraverseMainExec::new(
///     input_plan,
///     "_vid",
///     vec!["CUSTOM".to_string()],
///     Direction::Outgoing,
///     "m",                 // target variable
///     Some("r".into()),    // edge variable
///     vec![],              // edge properties
///     vec![],              // target properties
///     graph_ctx,
///     false,               // not optional
///     HashSet::new(),      // optional pattern vars
///     None,                // no pre-bound target column
///     vec![],              // no used-edge columns
/// );
/// ```
pub struct GraphTraverseMainExec {
    /// Input execution plan.
    input: Arc<dyn ExecutionPlan>,

    /// Column name containing source VIDs.
    source_column: String,

    /// Edge type names (not IDs, since schemaless types may not have IDs).
    /// Supports OR relationship types like `[:KNOWS|HATES]`.
    type_names: Vec<String>,

    /// Traversal direction.
    direction: Direction,

    /// Variable name for target vertex columns.
    target_variable: String,

    /// Variable name for edge columns (if edge is bound; `None` for
    /// anonymous relationships).
    edge_variable: Option<String>,

    /// Edge properties to materialize.
    edge_properties: Vec<String>,

    /// Target vertex properties to materialize.
    target_properties: Vec<String>,

    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Whether this is an OPTIONAL MATCH (preserve unmatched source rows with NULLs).
    optional: bool,

    /// Variables introduced by the OPTIONAL MATCH pattern.
    optional_pattern_vars: HashSet<String>,

    /// Column name of an already-bound target VID (for patterns where target is in scope).
    /// When set, only traversals reaching this exact VID are included.
    bound_target_column: Option<String>,

    /// Columns containing edge IDs from previous hops (for relationship uniqueness).
    /// Edges matching any of these IDs are excluded from traversal results.
    used_edge_columns: Vec<String>,

    /// Output schema (derived once in `new` via `build_schema`).
    schema: SchemaRef,

    /// Cached plan properties.
    properties: PlanProperties,

    /// Execution metrics.
    metrics: ExecutionPlanMetricsSet,
}
1514
impl fmt::Debug for GraphTraverseMainExec {
    /// Compact debug representation; the remaining fields (schema, context,
    /// metrics, …) are omitted to keep plan dumps readable.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("GraphTraverseMainExec")
            .field("type_names", &self.type_names)
            .field("direction", &self.direction)
            .field("target_variable", &self.target_variable)
            .field("edge_variable", &self.edge_variable)
            .finish()
    }
}
1525
impl GraphTraverseMainExec {
    /// Create a new schemaless traversal executor.
    ///
    /// The output schema is derived eagerly from the input schema plus the
    /// target/edge columns (see [`Self::build_schema`]), and plan properties
    /// are cached alongside it.
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        source_column: impl Into<String>,
        type_names: Vec<String>,
        direction: Direction,
        target_variable: impl Into<String>,
        edge_variable: Option<String>,
        edge_properties: Vec<String>,
        target_properties: Vec<String>,
        graph_ctx: Arc<GraphExecutionContext>,
        optional: bool,
        optional_pattern_vars: HashSet<String>,
        bound_target_column: Option<String>,
        used_edge_columns: Vec<String>,
    ) -> Self {
        let source_column = source_column.into();
        let target_variable = target_variable.into();

        // Build output schema
        let schema = Self::build_schema(
            &input.schema(),
            &target_variable,
            &edge_variable,
            &edge_properties,
            &target_properties,
            optional,
        );

        let properties = compute_plan_properties(schema.clone());

        Self {
            input,
            source_column,
            type_names,
            direction,
            target_variable,
            edge_variable,
            edge_properties,
            target_properties,
            graph_ctx,
            optional,
            optional_pattern_vars,
            bound_target_column,
            used_edge_columns,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Build output schema for traversal.
    ///
    /// Field order here defines the output column order: input fields first,
    /// then target `_vid`/`_labels`, edge columns (or the internal
    /// `__eid_to_*` column), and finally target property columns.
    fn build_schema(
        input_schema: &SchemaRef,
        target_variable: &str,
        edge_variable: &Option<String>,
        edge_properties: &[String],
        target_properties: &[String],
        optional: bool,
    ) -> SchemaRef {
        let mut fields: Vec<Field> = input_schema
            .fields()
            .iter()
            .map(|f| f.as_ref().clone())
            .collect();

        // Add target ._vid column (only if not already in input, nullable for OPTIONAL MATCH)
        let target_vid_name = format!("{}._vid", target_variable);
        if input_schema.column_with_name(&target_vid_name).is_none() {
            fields.push(Field::new(target_vid_name, DataType::UInt64, true));
        }

        // Add target ._labels column (only if not already in input)
        let target_labels_name = format!("{}._labels", target_variable);
        if input_schema.column_with_name(&target_labels_name).is_none() {
            fields.push(Field::new(target_labels_name, labels_data_type(), true));
        }

        // Add edge columns if edge variable is bound
        if let Some(edge_var) = edge_variable {
            // `_eid` is nullable only under OPTIONAL MATCH.
            fields.push(Field::new(
                format!("{}._eid", edge_var),
                DataType::UInt64,
                optional,
            ));

            // Add edge ._type column for type(r) support
            fields.push(Field::new(
                format!("{}._type", edge_var),
                DataType::Utf8,
                true,
            ));

            // Edge properties: LargeBinary (cv_encoded) to preserve value types.
            // Schemaless edges store properties as CypherValue blobs so that
            // Int, Float, etc. round-trip correctly through Arrow.
            for prop in edge_properties {
                let col_name = format!("{}.{}", edge_var, prop);
                let mut metadata = std::collections::HashMap::new();
                metadata.insert("cv_encoded".to_string(), "true".to_string());
                fields.push(
                    Field::new(&col_name, DataType::LargeBinary, true).with_metadata(metadata),
                );
            }
        } else {
            // Add internal edge ID for anonymous relationships so BindPath can
            // reconstruct named paths (p = (a)-[:T]->(b)).
            fields.push(Field::new(
                format!("__eid_to_{}", target_variable),
                DataType::UInt64,
                optional,
            ));
        }

        // Target properties: all as LargeBinary (deferred to PropertyManager)
        for prop in target_properties {
            fields.push(Field::new(
                format!("{}.{}", target_variable, prop),
                DataType::LargeBinary,
                true,
            ));
        }

        Arc::new(Schema::new(fields))
    }
}
1654
1655impl DisplayAs for GraphTraverseMainExec {
1656    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1657        write!(
1658            f,
1659            "GraphTraverseMainExec: types={:?}, direction={:?}",
1660            self.type_names, self.direction
1661        )
1662    }
1663}
1664
1665impl ExecutionPlan for GraphTraverseMainExec {
1666    fn name(&self) -> &str {
1667        "GraphTraverseMainExec"
1668    }
1669
1670    fn as_any(&self) -> &dyn Any {
1671        self
1672    }
1673
1674    fn schema(&self) -> SchemaRef {
1675        self.schema.clone()
1676    }
1677
1678    fn properties(&self) -> &PlanProperties {
1679        &self.properties
1680    }
1681
1682    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1683        vec![&self.input]
1684    }
1685
1686    fn with_new_children(
1687        self: Arc<Self>,
1688        children: Vec<Arc<dyn ExecutionPlan>>,
1689    ) -> DFResult<Arc<dyn ExecutionPlan>> {
1690        if children.len() != 1 {
1691            return Err(datafusion::error::DataFusionError::Plan(
1692                "GraphTraverseMainExec expects exactly one child".to_string(),
1693            ));
1694        }
1695
1696        Ok(Arc::new(Self {
1697            input: children[0].clone(),
1698            source_column: self.source_column.clone(),
1699            type_names: self.type_names.clone(),
1700            direction: self.direction,
1701            target_variable: self.target_variable.clone(),
1702            edge_variable: self.edge_variable.clone(),
1703            edge_properties: self.edge_properties.clone(),
1704            target_properties: self.target_properties.clone(),
1705            graph_ctx: self.graph_ctx.clone(),
1706            optional: self.optional,
1707            optional_pattern_vars: self.optional_pattern_vars.clone(),
1708            bound_target_column: self.bound_target_column.clone(),
1709            used_edge_columns: self.used_edge_columns.clone(),
1710            schema: self.schema.clone(),
1711            properties: self.properties.clone(),
1712            metrics: self.metrics.clone(),
1713        }))
1714    }
1715
1716    fn execute(
1717        &self,
1718        partition: usize,
1719        context: Arc<TaskContext>,
1720    ) -> DFResult<SendableRecordBatchStream> {
1721        let input_stream = self.input.execute(partition, context)?;
1722        let metrics = BaselineMetrics::new(&self.metrics, partition);
1723
1724        Ok(Box::pin(GraphTraverseMainStream::new(
1725            input_stream,
1726            self.source_column.clone(),
1727            self.type_names.clone(),
1728            self.direction,
1729            self.target_variable.clone(),
1730            self.edge_variable.clone(),
1731            self.edge_properties.clone(),
1732            self.target_properties.clone(),
1733            self.graph_ctx.clone(),
1734            self.optional,
1735            self.optional_pattern_vars.clone(),
1736            self.bound_target_column.clone(),
1737            self.used_edge_columns.clone(),
1738            self.schema.clone(),
1739            metrics,
1740        )))
1741    }
1742
1743    fn metrics(&self) -> Option<MetricsSet> {
1744        Some(self.metrics.clone_inner())
1745    }
1746}
1747
/// State machine for GraphTraverseMainStream.
///
/// The adjacency map must be fully loaded before any input batch can be
/// expanded, so the stream starts in `LoadingEdges` and only moves to
/// `Processing` once the load future resolves.
enum GraphTraverseMainState {
    /// Loading adjacency map from main edges table.
    /// The input stream is held here, unpolled, until the future completes.
    LoadingEdges {
        future: Pin<Box<dyn std::future::Future<Output = DFResult<EdgeAdjacencyMap>> + Send>>,
        input_stream: SendableRecordBatchStream,
    },
    /// Processing input stream with loaded adjacency.
    Processing {
        adjacency: EdgeAdjacencyMap,
        input_stream: SendableRecordBatchStream,
    },
    /// Stream is done (input exhausted or terminated on error).
    Done,
}
1763
/// Stream that executes schemaless edge traversal.
///
/// Drives the two-phase state machine in `GraphTraverseMainState`: the edge
/// adjacency map is loaded asynchronously first, then each input batch is
/// expanded against it. The relationship types and direction are captured by
/// the loading future created in `new`, so they are not stored as fields.
struct GraphTraverseMainStream {
    /// Source column name (holds the VIDs to expand from).
    source_column: String,

    /// Target variable name (prefix for the `._vid` / `._labels` output columns).
    target_variable: String,

    /// Edge variable name; `None` for anonymous relationships.
    edge_variable: Option<String>,

    /// Edge properties to materialize.
    edge_properties: Vec<String>,

    /// Target properties to materialize.
    target_properties: Vec<String>,

    /// Graph execution context (storage access, L0 buffers, timeout checks).
    graph_ctx: Arc<GraphExecutionContext>,

    /// Whether this is optional (preserve unmatched rows).
    optional: bool,

    /// Variables introduced by OPTIONAL pattern.
    optional_pattern_vars: HashSet<String>,

    /// Column name of an already-bound target VID (for filtering).
    bound_target_column: Option<String>,

    /// Columns containing edge IDs from previous hops (for relationship uniqueness).
    used_edge_columns: Vec<String>,

    /// Output schema.
    schema: SchemaRef,

    /// Stream state.
    state: GraphTraverseMainState,

    /// Metrics.
    metrics: BaselineMetrics,
}
1805
impl GraphTraverseMainStream {
    /// Create a new traverse main stream.
    ///
    /// The adjacency-map load is created here as a boxed future (capturing the
    /// relationship types and direction) and parked in the `LoadingEdges`
    /// state; it is actually polled lazily from `poll_next`.
    #[expect(clippy::too_many_arguments)]
    fn new(
        input_stream: SendableRecordBatchStream,
        source_column: String,
        type_names: Vec<String>,
        direction: Direction,
        target_variable: String,
        edge_variable: Option<String>,
        edge_properties: Vec<String>,
        target_properties: Vec<String>,
        graph_ctx: Arc<GraphExecutionContext>,
        optional: bool,
        optional_pattern_vars: HashSet<String>,
        bound_target_column: Option<String>,
        used_edge_columns: Vec<String>,
        schema: SchemaRef,
        metrics: BaselineMetrics,
    ) -> Self {
        // Start by loading the adjacency map from the main edges table.
        // Clones are needed because the future must own its captures.
        let loading_ctx = graph_ctx.clone();
        let loading_types = type_names.clone();
        let fut =
            async move { build_edge_adjacency_map(&loading_ctx, &loading_types, direction).await };

        Self {
            source_column,
            target_variable,
            edge_variable,
            edge_properties,
            target_properties,
            graph_ctx,
            optional,
            optional_pattern_vars,
            bound_target_column,
            used_edge_columns,
            schema,
            state: GraphTraverseMainState::LoadingEdges {
                future: Box::pin(fut),
                input_stream,
            },
            metrics,
        }
    }

    /// Expand input batch using adjacency map (synchronous version).
    ///
    /// For each non-null source VID, emits one output row per neighbor that
    /// survives the relationship-uniqueness and bound-target filters.
    ///
    /// NOTE: the order in which columns are pushed below must stay in sync
    /// with `build_schema`, since the final `RecordBatch::try_new` pairs
    /// columns with schema fields positionally.
    ///
    /// # Errors
    /// Returns an error if the source column is missing or an Arrow kernel
    /// fails while assembling the output batch.
    fn expand_batch(
        &self,
        input: &RecordBatch,
        adjacency: &EdgeAdjacencyMap,
    ) -> DFResult<RecordBatch> {
        // Extract source VIDs from source column
        let source_col = input.column_by_name(&self.source_column).ok_or_else(|| {
            datafusion::error::DataFusionError::Execution(format!(
                "Source column {} not found",
                self.source_column
            ))
        })?;

        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
        let source_vids: &UInt64Array = &source_vid_cow;

        // Read bound target VIDs if column exists (pattern target already in scope)
        let bound_target_cow = self
            .bound_target_column
            .as_ref()
            .and_then(|col| input.column_by_name(col))
            .map(|c| column_as_vid_array(c.as_ref()))
            .transpose()?;
        let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();

        // Collect edge ID arrays from previous hops for relationship uniqueness filtering.
        // Columns that are absent or not UInt64 are silently skipped.
        let used_edge_arrays: Vec<&UInt64Array> = self
            .used_edge_columns
            .iter()
            .filter_map(|col| {
                input
                    .column_by_name(col)
                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
            })
            .collect();

        // Build expansions: (input_row_idx, target_vid, eid, edge_type, edge_props)
        type Expansion = (usize, Vid, Eid, String, uni_common::Properties);
        let mut expansions: Vec<Expansion> = Vec::new();

        for (row_idx, src_u64) in source_vids.iter().enumerate() {
            // NULL source VIDs produce no expansions (row dropped unless OPTIONAL).
            if let Some(src_u64) = src_u64 {
                let src_vid = Vid::from(src_u64);

                // Collect used edge IDs for this row from all previous hops
                let used_eids: HashSet<u64> = used_edge_arrays
                    .iter()
                    .filter_map(|arr| {
                        if arr.is_null(row_idx) {
                            None
                        } else {
                            Some(arr.value(row_idx))
                        }
                    })
                    .collect();

                if let Some(neighbors) = adjacency.get(&src_vid) {
                    for (target_vid, eid, edge_type, props) in neighbors {
                        // Skip edges already used in previous hops (relationship uniqueness)
                        if used_eids.contains(&eid.as_u64()) {
                            continue;
                        }

                        // Filter by bound target VID if set (for patterns where target is in scope).
                        // Only include traversals where the target matches the expected VID.
                        if let Some(targets) = expected_targets {
                            if targets.is_null(row_idx) {
                                continue;
                            }
                            let expected_vid = targets.value(row_idx);
                            if target_vid.as_u64() != expected_vid {
                                continue;
                            }
                        }

                        expansions.push((
                            row_idx,
                            *target_vid,
                            *eid,
                            edge_type.clone(),
                            props.clone(),
                        ));
                    }
                }
            }
        }

        // Handle OPTIONAL: preserve unmatched rows
        if expansions.is_empty() && self.optional {
            // No matches - return input with NULL columns appended
            let all_indices: Vec<usize> = (0..input.num_rows()).collect();
            return build_optional_null_batch_for_rows(input, &all_indices, &self.schema);
        }

        if expansions.is_empty() {
            // No matches, not optional - return empty batch
            return Ok(RecordBatch::new_empty(self.schema.clone()));
        }

        // Track matched rows for OPTIONAL handling (only needed when optional)
        let matched_rows: HashSet<usize> = if self.optional {
            expansions.iter().map(|(idx, _, _, _, _)| *idx).collect()
        } else {
            HashSet::new()
        };

        // Expand input columns using Arrow take(): each expansion repeats its
        // originating input row.
        let mut columns: Vec<ArrayRef> = Vec::new();
        let indices: Vec<u64> = expansions
            .iter()
            .map(|(idx, _, _, _, _)| *idx as u64)
            .collect();
        let indices_array = UInt64Array::from(indices);

        for col in input.columns() {
            let expanded = take(col.as_ref(), &indices_array, None)?;
            columns.push(expanded);
        }

        // Add target ._vid column (only if not already in input)
        let target_vid_name = format!("{}._vid", self.target_variable);
        let target_vids: Vec<u64> = expansions
            .iter()
            .map(|(_, vid, _, _, _)| vid.as_u64())
            .collect();
        if input.schema().column_with_name(&target_vid_name).is_none() {
            columns.push(Arc::new(UInt64Array::from(target_vids)));
        }

        // Add target ._labels column (only if not already in input).
        // NOTE(review): only L0 buffer labels are overlaid here — labels for
        // vertices living solely in the main store come out as empty lists;
        // presumably resolved elsewhere in the pipeline — confirm.
        let target_labels_name = format!("{}._labels", self.target_variable);
        if input
            .schema()
            .column_with_name(&target_labels_name)
            .is_none()
        {
            use arrow_array::builder::{ListBuilder, StringBuilder};
            let l0_ctx = self.graph_ctx.l0_context();
            let mut labels_builder = ListBuilder::new(StringBuilder::new());
            for (_, target_vid, _, _, _) in &expansions {
                let mut row_labels: Vec<String> = Vec::new();
                for l0 in l0_ctx.iter_l0_buffers() {
                    let guard = l0.read();
                    if let Some(l0_labels) = guard.vertex_labels.get(target_vid) {
                        for lbl in l0_labels {
                            // Deduplicate across L0 buffers, preserving order.
                            if !row_labels.contains(lbl) {
                                row_labels.push(lbl.clone());
                            }
                        }
                    }
                }
                let values = labels_builder.values();
                for lbl in &row_labels {
                    values.append_value(lbl);
                }
                labels_builder.append(true);
            }
            columns.push(Arc::new(labels_builder.finish()));
        }

        // Add edge columns if edge variable is bound.
        // For anonymous relationships, emit internal edge IDs for BindPath.
        if self.edge_variable.is_some() {
            // Add edge ._eid column
            let eids: Vec<u64> = expansions
                .iter()
                .map(|(_, _, eid, _, _)| eid.as_u64())
                .collect();
            columns.push(Arc::new(UInt64Array::from(eids)));

            // Add edge ._type column (always non-null here; schema allows null
            // for OPTIONAL null batches built elsewhere)
            {
                let mut type_builder = arrow_array::builder::StringBuilder::new();
                for (_, _, _, edge_type, _) in &expansions {
                    type_builder.append_value(edge_type);
                }
                columns.push(Arc::new(type_builder.finish()));
            }

            // Add edge property columns as cv_encoded LargeBinary to preserve types
            for prop_name in &self.edge_properties {
                use crate::query::df_graph::scan::encode_cypher_value;
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                if prop_name == "_all_props" {
                    // Serialize all edge properties to CypherValue blob
                    for (_, _, _, _, props) in &expansions {
                        if props.is_empty() {
                            builder.append_null();
                        } else {
                            let mut json_map = serde_json::Map::new();
                            for (k, v) in props.iter() {
                                let json_val: serde_json::Value = v.clone().into();
                                json_map.insert(k.clone(), json_val);
                            }
                            let json = serde_json::Value::Object(json_map);
                            // Encoding failures degrade to NULL rather than erroring.
                            match encode_cypher_value(&json) {
                                Ok(bytes) => builder.append_value(bytes),
                                Err(_) => builder.append_null(),
                            }
                        }
                    }
                } else {
                    // Named property as cv_encoded CypherValue
                    for (_, _, _, _, props) in &expansions {
                        match props.get(prop_name) {
                            Some(uni_common::Value::Null) | None => builder.append_null(),
                            Some(val) => {
                                let json_val: serde_json::Value = val.clone().into();
                                match encode_cypher_value(&json_val) {
                                    Ok(bytes) => builder.append_value(bytes),
                                    Err(_) => builder.append_null(),
                                }
                            }
                        }
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
        } else {
            // Anonymous relationship: internal __eid_to_<target> column only.
            let eids: Vec<u64> = expansions
                .iter()
                .map(|(_, _, eid, _, _)| eid.as_u64())
                .collect();
            columns.push(Arc::new(UInt64Array::from(eids)));
        }

        // Add target property columns (hydrate from L0 buffers)
        {
            use crate::query::df_graph::scan::encode_cypher_value;
            let l0_ctx = self.graph_ctx.l0_context();

            for prop_name in &self.target_properties {
                if prop_name == "_all_props" {
                    // Build full CypherValue blob from all L0 vertex properties.
                    // Later L0 buffers overwrite earlier ones on key collision.
                    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                    for (_, target_vid, _, _, _) in &expansions {
                        let mut merged_props = serde_json::Map::new();
                        for l0 in l0_ctx.iter_l0_buffers() {
                            let guard = l0.read();
                            if let Some(props) = guard.vertex_properties.get(target_vid) {
                                for (k, v) in props.iter() {
                                    let json_val: serde_json::Value = v.clone().into();
                                    merged_props.insert(k.to_string(), json_val);
                                }
                            }
                        }
                        if merged_props.is_empty() {
                            builder.append_null();
                        } else {
                            let json = serde_json::Value::Object(merged_props);
                            match encode_cypher_value(&json) {
                                Ok(bytes) => builder.append_value(bytes),
                                Err(_) => builder.append_null(),
                            }
                        }
                    }
                    columns.push(Arc::new(builder.finish()));
                } else {
                    // Extract individual property from L0 and encode as CypherValue.
                    // First L0 buffer with a non-null value wins.
                    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                    for (_, target_vid, _, _, _) in &expansions {
                        let mut found = false;
                        for l0 in l0_ctx.iter_l0_buffers() {
                            let guard = l0.read();
                            if let Some(props) = guard.vertex_properties.get(target_vid)
                                && let Some(val) = props.get(prop_name.as_str())
                                && !val.is_null()
                            {
                                let json_val: serde_json::Value = val.clone().into();
                                if let Ok(bytes) = encode_cypher_value(&json_val) {
                                    builder.append_value(bytes);
                                    found = true;
                                    break;
                                }
                            }
                        }
                        if !found {
                            builder.append_null();
                        }
                    }
                    columns.push(Arc::new(builder.finish()));
                }
            }
        }

        let matched_batch =
            RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)?;

        // Handle OPTIONAL: append unmatched rows with NULLs
        if self.optional {
            let unmatched = collect_unmatched_optional_group_rows(
                input,
                &matched_rows,
                &self.schema,
                &self.optional_pattern_vars,
            )?;

            if unmatched.is_empty() {
                return Ok(matched_batch);
            }

            let unmatched_batch = build_optional_null_batch_for_rows_with_optional_vars(
                input,
                &unmatched,
                &self.schema,
                &self.optional_pattern_vars,
            )?;

            // Concatenate matched and unmatched batches
            use arrow::compute::concat_batches;
            concat_batches(&self.schema, &[matched_batch, unmatched_batch]).map_err(arrow_err)
        } else {
            Ok(matched_batch)
        }
    }
}
2169
2170/// Build adjacency map from main edges table for given type names and direction.
2171///
2172/// Supports OR relationship types like `[:KNOWS|HATES]` via multiple type_names.
2173/// Returns a HashMap mapping source VID -> Vec<(target_vid, eid, properties)>
2174/// Direction determines the key: Outgoing uses src_vid, Incoming uses dst_vid, Both adds entries for both.
2175async fn build_edge_adjacency_map(
2176    graph_ctx: &GraphExecutionContext,
2177    type_names: &[String],
2178    direction: Direction,
2179) -> DFResult<EdgeAdjacencyMap> {
2180    use uni_store::storage::main_edge::MainEdgeDataset;
2181
2182    let storage = graph_ctx.storage();
2183    let l0_ctx = graph_ctx.l0_context();
2184    let lancedb_store = storage.lancedb_store();
2185
2186    // Step 1: Query main edges table for all type names
2187    let type_refs: Vec<&str> = type_names.iter().map(|s| s.as_str()).collect();
2188    let edges_with_type = MainEdgeDataset::find_edges_by_type_names(lancedb_store, &type_refs)
2189        .await
2190        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
2191
2192    // Preserve edge type name in the adjacency map for type(r) support
2193    let mut edges: Vec<(
2194        uni_common::Eid,
2195        uni_common::Vid,
2196        uni_common::Vid,
2197        String,
2198        uni_common::Properties,
2199    )> = edges_with_type.into_iter().collect();
2200
2201    // Step 2: Overlay L0 buffers for all type names
2202    for l0 in l0_ctx.iter_l0_buffers() {
2203        let l0_guard = l0.read();
2204
2205        for type_name in type_names {
2206            let l0_eids = l0_guard.eids_for_type(type_name);
2207
2208            // For each L0 edge, extract its information
2209            for &eid in &l0_eids {
2210                if let Some(edge_ref) = l0_guard.graph.edge(eid) {
2211                    let src_vid = edge_ref.src_vid;
2212                    let dst_vid = edge_ref.dst_vid;
2213
2214                    // Get properties for this edge from L0
2215                    let props = l0_guard
2216                        .edge_properties
2217                        .get(&eid)
2218                        .cloned()
2219                        .unwrap_or_default();
2220
2221                    edges.push((eid, src_vid, dst_vid, type_name.clone(), props));
2222                }
2223            }
2224        }
2225    }
2226
2227    // Step 3: Deduplicate by EID (L0 takes precedence)
2228    let mut seen_eids = HashSet::new();
2229    let mut unique_edges = Vec::new();
2230    for edge in edges.into_iter().rev() {
2231        if seen_eids.insert(edge.0) {
2232            unique_edges.push(edge);
2233        }
2234    }
2235    unique_edges.reverse();
2236
2237    // Step 4: Filter out edges tombstoned in any L0 buffer
2238    let mut tombstoned_eids = HashSet::new();
2239    for l0 in l0_ctx.iter_l0_buffers() {
2240        let l0_guard = l0.read();
2241        for eid in l0_guard.tombstones.keys() {
2242            tombstoned_eids.insert(*eid);
2243        }
2244    }
2245    if !tombstoned_eids.is_empty() {
2246        unique_edges.retain(|edge| !tombstoned_eids.contains(&edge.0));
2247    }
2248
2249    // Step 5: Build adjacency map based on direction
2250    let mut adjacency: EdgeAdjacencyMap = HashMap::new();
2251
2252    for (eid, src_vid, dst_vid, edge_type, props) in unique_edges {
2253        match direction {
2254            Direction::Outgoing => {
2255                adjacency
2256                    .entry(src_vid)
2257                    .or_default()
2258                    .push((dst_vid, eid, edge_type, props));
2259            }
2260            Direction::Incoming => {
2261                adjacency
2262                    .entry(dst_vid)
2263                    .or_default()
2264                    .push((src_vid, eid, edge_type, props));
2265            }
2266            Direction::Both => {
2267                adjacency.entry(src_vid).or_default().push((
2268                    dst_vid,
2269                    eid,
2270                    edge_type.clone(),
2271                    props.clone(),
2272                ));
2273                adjacency
2274                    .entry(dst_vid)
2275                    .or_default()
2276                    .push((src_vid, eid, edge_type, props));
2277            }
2278        }
2279    }
2280
2281    Ok(adjacency)
2282}
2283
impl Stream for GraphTraverseMainStream {
    type Item = DFResult<RecordBatch>;

    /// Drive the two-phase state machine: first poll the adjacency-loading
    /// future, then pull batches from the input stream and expand them.
    ///
    /// Ownership of the state is taken each iteration via `mem::replace`
    /// (leaving `Done` behind), so any early-return error path that does not
    /// restore the state terminates the stream.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Take the state out so its contents (future / stream) can be
            // moved; `Done` is the placeholder until it is put back.
            let state = std::mem::replace(&mut self.state, GraphTraverseMainState::Done);

            match state {
                GraphTraverseMainState::LoadingEdges {
                    mut future,
                    input_stream,
                } => match future.as_mut().poll(cx) {
                    Poll::Ready(Ok(adjacency)) => {
                        // Move to processing state with loaded adjacency
                        self.state = GraphTraverseMainState::Processing {
                            adjacency,
                            input_stream,
                        };
                        // Continue loop to start processing
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = GraphTraverseMainState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        // Restore the state before yielding.
                        self.state = GraphTraverseMainState::LoadingEdges {
                            future,
                            input_stream,
                        };
                        return Poll::Pending;
                    }
                },
                GraphTraverseMainState::Processing {
                    adjacency,
                    mut input_stream,
                } => {
                    // Check timeout; on expiry the state stays `Done` (set by
                    // the replace above), so the stream terminates after the error.
                    if let Err(e) = self.graph_ctx.check_timeout() {
                        return Poll::Ready(Some(Err(
                            datafusion::error::DataFusionError::Execution(e.to_string()),
                        )));
                    }

                    match input_stream.poll_next_unpin(cx) {
                        Poll::Ready(Some(Ok(batch))) => {
                            // Expand batch using adjacency map
                            let result = self.expand_batch(&batch, &adjacency);

                            // Restore state first so the next poll continues
                            // processing even after an expansion error is yielded.
                            self.state = GraphTraverseMainState::Processing {
                                adjacency,
                                input_stream,
                            };

                            if let Ok(ref r) = result {
                                self.metrics.record_output(r.num_rows());
                            }
                            return Poll::Ready(Some(result));
                        }
                        Poll::Ready(Some(Err(e))) => {
                            self.state = GraphTraverseMainState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(None) => {
                            // Input exhausted: finish cleanly.
                            self.state = GraphTraverseMainState::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Pending => {
                            self.state = GraphTraverseMainState::Processing {
                                adjacency,
                                input_stream,
                            };
                            return Poll::Pending;
                        }
                    }
                }
                GraphTraverseMainState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
2366
2367impl RecordBatchStream for GraphTraverseMainStream {
2368    fn schema(&self) -> SchemaRef {
2369        self.schema.clone()
2370    }
2371}
2372
/// Variable-length graph traversal execution plan.
///
/// Performs BFS traversal from source vertices with configurable min/max hops.
/// Tracks visited nodes to avoid cycles.
///
/// Internally compiles the hop pattern into a [`PathNfa`]; QPP (Quantified
/// Path Pattern) callers may supply a pre-compiled NFA instead.
///
/// # Example
///
/// ```ignore
/// // Find all nodes 1-3 hops away via KNOWS edges
/// let traverse = GraphVariableLengthTraverseExec::new(
///     input_plan,
///     "_vid",
///     knows_type_id,
///     Direction::Outgoing,
///     1,  // min_hops
///     3,  // max_hops
///     Some("p"), // path variable
///     graph_ctx,
/// );
/// ```
pub struct GraphVariableLengthTraverseExec {
    /// Input execution plan.
    input: Arc<dyn ExecutionPlan>,

    /// Column name containing source VIDs.
    source_column: String,

    /// Edge type IDs to traverse.
    edge_type_ids: Vec<u32>,

    /// Traversal direction.
    direction: Direction,

    /// Minimum number of hops.
    min_hops: usize,

    /// Maximum number of hops (BFS depth cap; frontier growth is further
    /// bounded by `MAX_FRONTIER_SIZE` / `MAX_PRED_POOL_SIZE`).
    max_hops: usize,

    /// Variable name for target vertex columns.
    target_variable: String,

    /// Variable name for relationship list (r in `[r*]`) - holds `List<Edge>`.
    step_variable: Option<String>,

    /// Variable name for path (if path is bound).
    path_variable: Option<String>,

    /// Target vertex properties to materialize (hydrated asynchronously).
    target_properties: Vec<String>,

    /// Target label name for property type resolution.
    target_label_name: Option<String>,

    /// Whether this is an optional match (LEFT JOIN semantics).
    is_optional: bool,

    /// Column name of an already-bound target VID (for patterns where target is in scope).
    bound_target_column: Option<String>,

    /// Lance SQL filter for edge property predicates (VLP bitmap preselection).
    edge_lance_filter: Option<String>,

    /// Simple property equality conditions for per-edge L0 checking during BFS.
    /// Each entry is (property_name, expected_value).
    edge_property_conditions: Vec<(String, UniValue)>,

    /// Edge ID columns from previous hops for cross-pattern relationship uniqueness.
    used_edge_columns: Vec<String>,

    /// Path semantics mode (Trail = no repeated edges, default for OpenCypher).
    path_mode: super::nfa::PathMode,

    /// Output mode determining BFS strategy (endpoints-only vs. full path DAG).
    output_mode: super::nfa::VlpOutputMode,

    /// Compiled NFA for path pattern matching.
    nfa: Arc<PathNfa>,

    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Output schema (input columns + target/hop/step/path columns).
    schema: SchemaRef,

    /// Cached plan properties.
    properties: PlanProperties,

    /// Execution metrics.
    metrics: ExecutionPlanMetricsSet,
}
2464
2465impl fmt::Debug for GraphVariableLengthTraverseExec {
2466    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2467        f.debug_struct("GraphVariableLengthTraverseExec")
2468            .field("source_column", &self.source_column)
2469            .field("edge_type_ids", &self.edge_type_ids)
2470            .field("direction", &self.direction)
2471            .field("min_hops", &self.min_hops)
2472            .field("max_hops", &self.max_hops)
2473            .field("target_variable", &self.target_variable)
2474            .finish()
2475    }
2476}
2477
impl GraphVariableLengthTraverseExec {
    /// Create a new variable-length traversal plan.
    ///
    /// For QPP (Quantified Path Patterns), pass a pre-compiled NFA via `qpp_nfa`.
    /// For simple VLP patterns, pass `None` and the NFA will be compiled from
    /// `edge_type_ids`, `direction`, `min_hops`, `max_hops`.
    ///
    /// The output schema is derived eagerly here (see [`Self::build_schema`])
    /// so that `schema()`/`properties()` are cheap accessor calls.
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        source_column: impl Into<String>,
        edge_type_ids: Vec<u32>,
        direction: Direction,
        min_hops: usize,
        max_hops: usize,
        target_variable: impl Into<String>,
        step_variable: Option<String>,
        path_variable: Option<String>,
        target_properties: Vec<String>,
        target_label_name: Option<String>,
        graph_ctx: Arc<GraphExecutionContext>,
        is_optional: bool,
        bound_target_column: Option<String>,
        edge_lance_filter: Option<String>,
        edge_property_conditions: Vec<(String, UniValue)>,
        used_edge_columns: Vec<String>,
        path_mode: super::nfa::PathMode,
        output_mode: super::nfa::VlpOutputMode,
        qpp_nfa: Option<PathNfa>,
    ) -> Self {
        let source_column = source_column.into();
        let target_variable = target_variable.into();

        // Resolve target property Arrow types from the schema
        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let label_props = target_label_name
            .as_deref()
            .and_then(|ln| uni_schema.properties.get(ln));

        // Build output schema
        let schema = Self::build_schema(
            input.schema(),
            &target_variable,
            step_variable.as_deref(),
            path_variable.as_deref(),
            &target_properties,
            label_props,
        );
        let properties = compute_plan_properties(schema.clone());

        // Use pre-compiled QPP NFA if provided, otherwise compile from VLP parameters
        let nfa = Arc::new(qpp_nfa.unwrap_or_else(|| {
            PathNfa::from_vlp(edge_type_ids.clone(), direction, min_hops, max_hops)
        }));

        Self {
            input,
            source_column,
            edge_type_ids,
            direction,
            min_hops,
            max_hops,
            target_variable,
            step_variable,
            path_variable,
            target_properties,
            target_label_name,
            is_optional,
            bound_target_column,
            edge_lance_filter,
            edge_property_conditions,
            used_edge_columns,
            path_mode,
            output_mode,
            nfa,
            graph_ctx,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Build output schema.
    ///
    /// Output = input columns, followed by (each added only when not already
    /// present in the input): `<target>._vid`, `<target>._labels`, one column
    /// per requested target property, then `_hop_count`, the optional step
    /// edge-list column, and the optional path struct column.
    fn build_schema(
        input_schema: SchemaRef,
        target_variable: &str,
        step_variable: Option<&str>,
        path_variable: Option<&str>,
        target_properties: &[String],
        label_props: Option<
            &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
        >,
    ) -> SchemaRef {
        let mut fields: Vec<Field> = input_schema
            .fields()
            .iter()
            .map(|f| f.as_ref().clone())
            .collect();

        // Add target VID column (only if not already in input)
        let target_vid_name = format!("{}._vid", target_variable);
        if input_schema.column_with_name(&target_vid_name).is_none() {
            fields.push(Field::new(target_vid_name, DataType::UInt64, true));
        }

        // Add target ._labels column (only if not already in input)
        let target_labels_name = format!("{}._labels", target_variable);
        if input_schema.column_with_name(&target_labels_name).is_none() {
            fields.push(Field::new(target_labels_name, labels_data_type(), true));
        }

        // Add target vertex property columns (skip if already in input)
        for prop_name in target_properties {
            let col_name = format!("{}.{}", target_variable, prop_name);
            if input_schema.column_with_name(&col_name).is_none() {
                let arrow_type = resolve_property_type(prop_name, label_props);
                fields.push(Field::new(&col_name, arrow_type, true));
            }
        }

        // Add hop count
        fields.push(Field::new("_hop_count", DataType::UInt64, false));

        // Add step variable (edge list) if bound
        if let Some(step_var) = step_variable {
            fields.push(build_edge_list_field(step_var));
        }

        // Add path struct if bound (only if not already in input from prior BindFixedPath)
        if let Some(path_var) = path_variable
            && input_schema.column_with_name(path_var).is_none()
        {
            fields.push(build_path_struct_field(path_var));
        }

        Arc::new(Schema::new(fields))
    }
}
2615
2616impl DisplayAs for GraphVariableLengthTraverseExec {
2617    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2618        write!(
2619            f,
2620            "GraphVariableLengthTraverseExec: {} --[{:?}*{}..{}]--> target",
2621            self.source_column, self.edge_type_ids, self.min_hops, self.max_hops
2622        )
2623    }
2624}
2625
impl ExecutionPlan for GraphVariableLengthTraverseExec {
    fn name(&self) -> &str {
        "GraphVariableLengthTraverseExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    /// Rebuild this node with a new child, preserving all traversal settings.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if children.len() != 1 {
            return Err(datafusion::error::DataFusionError::Plan(
                "GraphVariableLengthTraverseExec requires exactly one child".to_string(),
            ));
        }

        // Pass the existing NFA to avoid recompilation (important for QPP NFA)
        Ok(Arc::new(Self::new(
            children[0].clone(),
            self.source_column.clone(),
            self.edge_type_ids.clone(),
            self.direction,
            self.min_hops,
            self.max_hops,
            self.target_variable.clone(),
            self.step_variable.clone(),
            self.path_variable.clone(),
            self.target_properties.clone(),
            self.target_label_name.clone(),
            self.graph_ctx.clone(),
            self.is_optional,
            self.bound_target_column.clone(),
            self.edge_lance_filter.clone(),
            self.edge_property_conditions.clone(),
            self.used_edge_columns.clone(),
            self.path_mode.clone(),
            self.output_mode.clone(),
            Some((*self.nfa).clone()),
        )))
    }

    /// Start execution: the returned stream first awaits CSR adjacency warming
    /// for the traversed edge types, then BFS-expands each input batch.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let input_stream = self.input.execute(partition, context)?;

        let metrics = BaselineMetrics::new(&self.metrics, partition);

        // Kick off adjacency warming; the stream polls it before reading input.
        let warm_fut = self
            .graph_ctx
            .warming_future(self.edge_type_ids.clone(), self.direction);

        Ok(Box::pin(GraphVariableLengthTraverseStream {
            input: input_stream,
            exec: Arc::new(self.clone_for_stream()),
            schema: self.schema.clone(),
            state: VarLengthStreamState::Warming(warm_fut),
            metrics,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
2708
2709impl GraphVariableLengthTraverseExec {
2710    /// Clone fields needed for stream (avoids cloning the full struct).
2711    fn clone_for_stream(&self) -> GraphVariableLengthTraverseExecData {
2712        GraphVariableLengthTraverseExecData {
2713            source_column: self.source_column.clone(),
2714            edge_type_ids: self.edge_type_ids.clone(),
2715            direction: self.direction,
2716            min_hops: self.min_hops,
2717            max_hops: self.max_hops,
2718            target_variable: self.target_variable.clone(),
2719            step_variable: self.step_variable.clone(),
2720            path_variable: self.path_variable.clone(),
2721            target_properties: self.target_properties.clone(),
2722            target_label_name: self.target_label_name.clone(),
2723            is_optional: self.is_optional,
2724            bound_target_column: self.bound_target_column.clone(),
2725            edge_lance_filter: self.edge_lance_filter.clone(),
2726            edge_property_conditions: self.edge_property_conditions.clone(),
2727            used_edge_columns: self.used_edge_columns.clone(),
2728            path_mode: self.path_mode.clone(),
2729            output_mode: self.output_mode.clone(),
2730            nfa: self.nfa.clone(),
2731            graph_ctx: self.graph_ctx.clone(),
2732        }
2733    }
2734}
2735
/// Data needed by the stream (without ExecutionPlan overhead).
///
/// A flat copy of the traversal parameters from
/// `GraphVariableLengthTraverseExec` (see `clone_for_stream`).
#[expect(
    dead_code,
    reason = "Fields accessed via NFA; kept for with_new_children reconstruction"
)]
struct GraphVariableLengthTraverseExecData {
    /// Column name containing source VIDs.
    source_column: String,
    /// Edge type IDs to traverse.
    edge_type_ids: Vec<u32>,
    /// Traversal direction.
    direction: Direction,
    /// Minimum number of hops.
    min_hops: usize,
    /// Maximum number of hops (BFS depth cap).
    max_hops: usize,
    /// Variable name for target vertex columns.
    target_variable: String,
    /// Variable name for relationship list (r in `[r*]`).
    step_variable: Option<String>,
    /// Variable name for path (if path is bound).
    path_variable: Option<String>,
    /// Target vertex properties to materialize.
    target_properties: Vec<String>,
    /// Target label name for label filtering / property type resolution.
    target_label_name: Option<String>,
    /// Whether this is an optional match (LEFT JOIN semantics).
    is_optional: bool,
    /// Column name of an already-bound target VID.
    bound_target_column: Option<String>,
    #[expect(dead_code, reason = "Used in Phase 3 warming")]
    edge_lance_filter: Option<String>,
    /// Simple property equality conditions for per-edge L0 checking during BFS.
    edge_property_conditions: Vec<(String, UniValue)>,
    /// Edge ID columns from previous hops for cross-pattern relationship uniqueness.
    used_edge_columns: Vec<String>,
    /// Path semantics mode (Trail = no repeated edges).
    path_mode: super::nfa::PathMode,
    /// Output mode determining BFS strategy.
    output_mode: super::nfa::VlpOutputMode,
    /// Compiled NFA for path pattern matching.
    nfa: Arc<PathNfa>,
    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,
}
2764
/// Safety cap for frontier size to prevent OOM on pathological graphs.
/// When exceeded, BFS stops expanding early (results are truncated, not errored).
const MAX_FRONTIER_SIZE: usize = 500_000;
/// Safety cap for predecessor pool size.
/// Bounds memory held by the per-source `PredecessorDag` during BFS.
const MAX_PRED_POOL_SIZE: usize = 2_000_000;
2769
2770impl GraphVariableLengthTraverseExecData {
2771    /// Check if a vertex passes the target label filter.
2772    fn check_target_label(&self, vid: Vid) -> bool {
2773        if let Some(ref label_name) = self.target_label_name {
2774            let query_ctx = self.graph_ctx.query_context();
2775            match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
2776                Some(labels) => labels.contains(label_name),
2777                None => true, // not in L0, trust storage
2778            }
2779        } else {
2780            true
2781        }
2782    }
2783
2784    /// Check if a vertex satisfies an NFA state constraint (QPP intermediate node label).
2785    fn check_state_constraint(&self, vid: Vid, constraint: &super::nfa::VertexConstraint) -> bool {
2786        match constraint {
2787            super::nfa::VertexConstraint::Label(label_name) => {
2788                let query_ctx = self.graph_ctx.query_context();
2789                match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
2790                    Some(labels) => labels.contains(label_name),
2791                    None => true, // not in L0, trust storage
2792                }
2793            }
2794        }
2795    }
2796
2797    /// Expand neighbors from a vertex through all NFA transitions from the given state.
2798    /// Returns (neighbor_vid, neighbor_eid, destination_nfa_state) triples.
2799    fn expand_neighbors(
2800        &self,
2801        vid: Vid,
2802        state: NfaStateId,
2803        eid_filter: &EidFilter,
2804        used_eids: &FxHashSet<u64>,
2805    ) -> Vec<(Vid, Eid, NfaStateId)> {
2806        let is_undirected = matches!(self.direction, Direction::Both);
2807        let mut results = Vec::new();
2808
2809        for transition in self.nfa.transitions_from(state) {
2810            let mut seen_edges: FxHashSet<u64> = FxHashSet::default();
2811
2812            for &etype in &transition.edge_type_ids {
2813                for (neighbor, eid) in
2814                    self.graph_ctx
2815                        .get_neighbors(vid, etype, transition.direction)
2816                {
2817                    // Deduplicate edges for undirected patterns
2818                    if is_undirected && !seen_edges.insert(eid.as_u64()) {
2819                        continue;
2820                    }
2821
2822                    // Check EidFilter (edge property bitmap preselection)
2823                    if !eid_filter.contains(eid) {
2824                        continue;
2825                    }
2826
2827                    // Check edge property conditions (L0 in-memory properties)
2828                    if !self.edge_property_conditions.is_empty() {
2829                        let query_ctx = self.graph_ctx.query_context();
2830                        let passes = if let Some(props) =
2831                            l0_visibility::accumulate_edge_props(eid, Some(&query_ctx))
2832                        {
2833                            self.edge_property_conditions
2834                                .iter()
2835                                .all(|(name, expected)| {
2836                                    props.get(name).is_some_and(|actual| actual == expected)
2837                                })
2838                        } else {
2839                            // Edge not in L0 (CSR/Lance) — relies on EidFilter
2840                            // for correctness. TODO: build EidFilter from Lance
2841                            // during warming for flushed edges.
2842                            true
2843                        };
2844                        if !passes {
2845                            continue;
2846                        }
2847                    }
2848
2849                    // Check cross-pattern relationship uniqueness
2850                    if used_eids.contains(&eid.as_u64()) {
2851                        continue;
2852                    }
2853
2854                    // Check NFA state constraint on the destination state (QPP label filters)
2855                    if let Some(constraint) = self.nfa.state_constraint(transition.to)
2856                        && !self.check_state_constraint(neighbor, constraint)
2857                    {
2858                        continue;
2859                    }
2860
2861                    results.push((neighbor, eid, transition.to));
2862                }
2863            }
2864        }
2865
2866        results
2867    }
2868
2869    /// NFA-driven BFS with predecessor DAG for full path enumeration (Mode B).
2870    ///
2871    /// Returns BFS results in the same format as the old bfs() for compatibility
2872    /// with build_output_batch.
2873    fn bfs_with_dag(
2874        &self,
2875        source: Vid,
2876        eid_filter: &EidFilter,
2877        used_eids: &FxHashSet<u64>,
2878        vid_filter: &VidFilter,
2879    ) -> Vec<BfsResult> {
2880        let nfa = &self.nfa;
2881        let selector = PathSelector::All;
2882        let mut dag = PredecessorDag::new(selector);
2883        let mut accepting: Vec<(Vid, NfaStateId, u32)> = Vec::new();
2884
2885        // Handle zero-length paths (min_hops == 0)
2886        if nfa.is_accepting(nfa.start_state())
2887            && self.check_target_label(source)
2888            && vid_filter.contains(source)
2889        {
2890            accepting.push((source, nfa.start_state(), 0));
2891        }
2892
2893        // Per-depth frontier BFS
2894        let mut frontier: Vec<(Vid, NfaStateId)> = vec![(source, nfa.start_state())];
2895        let mut depth: u32 = 0;
2896
2897        while !frontier.is_empty() && depth < self.max_hops as u32 {
2898            depth += 1;
2899            let mut next_frontier: Vec<(Vid, NfaStateId)> = Vec::new();
2900            let mut seen_at_depth: FxHashSet<(Vid, NfaStateId)> = FxHashSet::default();
2901
2902            for &(vid, state) in &frontier {
2903                for (neighbor, eid, dst_state) in
2904                    self.expand_neighbors(vid, state, eid_filter, used_eids)
2905                {
2906                    // Record in predecessor DAG
2907                    dag.add_predecessor(neighbor, dst_state, vid, state, eid, depth);
2908
2909                    // Add to next frontier (deduplicated per depth)
2910                    if seen_at_depth.insert((neighbor, dst_state)) {
2911                        next_frontier.push((neighbor, dst_state));
2912
2913                        // Check if accepting
2914                        if nfa.is_accepting(dst_state)
2915                            && self.check_target_label(neighbor)
2916                            && vid_filter.contains(neighbor)
2917                        {
2918                            accepting.push((neighbor, dst_state, depth));
2919                        }
2920                    }
2921                }
2922            }
2923
2924            // Safety cap
2925            if next_frontier.len() > MAX_FRONTIER_SIZE || dag.pool_len() > MAX_PRED_POOL_SIZE {
2926                break;
2927            }
2928
2929            frontier = next_frontier;
2930        }
2931
2932        // Enumerate paths from DAG to produce BfsResult tuples
2933        let mut results: Vec<BfsResult> = Vec::new();
2934        for &(target, state, depth) in &accepting {
2935            dag.enumerate_paths(
2936                source,
2937                target,
2938                state,
2939                depth,
2940                depth,
2941                &self.path_mode,
2942                &mut |nodes, edges| {
2943                    results.push((target, depth as usize, nodes.to_vec(), edges.to_vec()));
2944                    std::ops::ControlFlow::Continue(())
2945                },
2946            );
2947        }
2948
2949        results
2950    }
2951
2952    /// NFA-driven BFS returning only endpoints and depths (Mode A).
2953    ///
2954    /// More efficient when no path/step variable is bound — skips full path enumeration.
2955    /// Uses lightweight trail verification via has_trail_valid_path().
2956    fn bfs_endpoints_only(
2957        &self,
2958        source: Vid,
2959        eid_filter: &EidFilter,
2960        used_eids: &FxHashSet<u64>,
2961        vid_filter: &VidFilter,
2962    ) -> Vec<(Vid, u32)> {
2963        let nfa = &self.nfa;
2964        let selector = PathSelector::Any; // Only need existence, not all paths
2965        let mut dag = PredecessorDag::new(selector);
2966        let mut results: Vec<(Vid, u32)> = Vec::new();
2967
2968        // Handle zero-length paths
2969        if nfa.is_accepting(nfa.start_state())
2970            && self.check_target_label(source)
2971            && vid_filter.contains(source)
2972        {
2973            results.push((source, 0));
2974        }
2975
2976        // Per-depth frontier BFS
2977        let mut frontier: Vec<(Vid, NfaStateId)> = vec![(source, nfa.start_state())];
2978        let mut depth: u32 = 0;
2979
2980        while !frontier.is_empty() && depth < self.max_hops as u32 {
2981            depth += 1;
2982            let mut next_frontier: Vec<(Vid, NfaStateId)> = Vec::new();
2983            let mut seen_at_depth: FxHashSet<(Vid, NfaStateId)> = FxHashSet::default();
2984
2985            for &(vid, state) in &frontier {
2986                for (neighbor, eid, dst_state) in
2987                    self.expand_neighbors(vid, state, eid_filter, used_eids)
2988                {
2989                    dag.add_predecessor(neighbor, dst_state, vid, state, eid, depth);
2990
2991                    if seen_at_depth.insert((neighbor, dst_state)) {
2992                        next_frontier.push((neighbor, dst_state));
2993
2994                        // Check if accepting with trail verification
2995                        if nfa.is_accepting(dst_state)
2996                            && self.check_target_label(neighbor)
2997                            && vid_filter.contains(neighbor)
2998                            && dag.has_trail_valid_path(source, neighbor, dst_state, depth, depth)
2999                        {
3000                            results.push((neighbor, depth));
3001                        }
3002                    }
3003                }
3004            }
3005
3006            if next_frontier.len() > MAX_FRONTIER_SIZE || dag.pool_len() > MAX_PRED_POOL_SIZE {
3007                break;
3008            }
3009
3010            frontier = next_frontier;
3011        }
3012
3013        results
3014    }
3015}
3016
/// State machine for variable-length traverse stream.
///
/// Transitions: `Warming` → `Reading` ⇄ `Materializing` → `Done`
/// (errors and end-of-input both move to `Done`).
enum VarLengthStreamState {
    /// Warming adjacency CSRs before first batch.
    Warming(Pin<Box<dyn std::future::Future<Output = DFResult<()>> + Send>>),
    /// Processing input batches.
    Reading,
    /// Materializing target vertex properties asynchronously.
    Materializing(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
    /// Stream is done.
    Done,
}
3028
/// Stream for variable-length traversal.
struct GraphVariableLengthTraverseStream {
    /// Upstream source of batches containing source VIDs.
    input: SendableRecordBatchStream,
    /// Immutable traversal parameters snapshotted from the exec node.
    exec: Arc<GraphVariableLengthTraverseExecData>,
    /// Output schema of this stream.
    schema: SchemaRef,
    /// Current position in the Warming/Reading/Materializing/Done machine.
    state: VarLengthStreamState,
    /// Baseline execution metrics (output row counts).
    metrics: BaselineMetrics,
}
3037
3038impl Stream for GraphVariableLengthTraverseStream {
3039    type Item = DFResult<RecordBatch>;
3040
3041    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3042        loop {
3043            let state = std::mem::replace(&mut self.state, VarLengthStreamState::Done);
3044
3045            match state {
3046                VarLengthStreamState::Warming(mut fut) => match fut.as_mut().poll(cx) {
3047                    Poll::Ready(Ok(())) => {
3048                        self.state = VarLengthStreamState::Reading;
3049                        // Continue loop to start reading
3050                    }
3051                    Poll::Ready(Err(e)) => {
3052                        self.state = VarLengthStreamState::Done;
3053                        return Poll::Ready(Some(Err(e)));
3054                    }
3055                    Poll::Pending => {
3056                        self.state = VarLengthStreamState::Warming(fut);
3057                        return Poll::Pending;
3058                    }
3059                },
3060                VarLengthStreamState::Reading => {
3061                    // Check timeout
3062                    if let Err(e) = self.exec.graph_ctx.check_timeout() {
3063                        return Poll::Ready(Some(Err(
3064                            datafusion::error::DataFusionError::Execution(e.to_string()),
3065                        )));
3066                    }
3067
3068                    match self.input.poll_next_unpin(cx) {
3069                        Poll::Ready(Some(Ok(batch))) => {
3070                            // Build base batch synchronously (BFS + expand)
3071                            // TODO(Phase 3.5): Build real EidFilter/VidFilter during warming
3072                            let eid_filter = EidFilter::AllAllowed;
3073                            let vid_filter = VidFilter::AllAllowed;
3074                            let base_result =
3075                                self.process_batch_base(batch, &eid_filter, &vid_filter);
3076                            let base_batch = match base_result {
3077                                Ok(b) => b,
3078                                Err(e) => {
3079                                    self.state = VarLengthStreamState::Reading;
3080                                    return Poll::Ready(Some(Err(e)));
3081                                }
3082                            };
3083
3084                            // If no properties need async hydration, return directly
3085                            if self.exec.target_properties.is_empty() {
3086                                self.state = VarLengthStreamState::Reading;
3087                                return Poll::Ready(Some(Ok(base_batch)));
3088                            }
3089
3090                            // Properties needed — create async future for hydration
3091                            let schema = self.schema.clone();
3092                            let target_variable = self.exec.target_variable.clone();
3093                            let target_properties = self.exec.target_properties.clone();
3094                            let target_label_name = self.exec.target_label_name.clone();
3095                            let graph_ctx = self.exec.graph_ctx.clone();
3096
3097                            let fut = hydrate_vlp_target_properties(
3098                                base_batch,
3099                                schema,
3100                                target_variable,
3101                                target_properties,
3102                                target_label_name,
3103                                graph_ctx,
3104                            );
3105
3106                            self.state = VarLengthStreamState::Materializing(Box::pin(fut));
3107                            // Continue loop to poll the future
3108                        }
3109                        Poll::Ready(Some(Err(e))) => {
3110                            self.state = VarLengthStreamState::Done;
3111                            return Poll::Ready(Some(Err(e)));
3112                        }
3113                        Poll::Ready(None) => {
3114                            self.state = VarLengthStreamState::Done;
3115                            return Poll::Ready(None);
3116                        }
3117                        Poll::Pending => {
3118                            self.state = VarLengthStreamState::Reading;
3119                            return Poll::Pending;
3120                        }
3121                    }
3122                }
3123                VarLengthStreamState::Materializing(mut fut) => match fut.as_mut().poll(cx) {
3124                    Poll::Ready(Ok(batch)) => {
3125                        self.state = VarLengthStreamState::Reading;
3126                        self.metrics.record_output(batch.num_rows());
3127                        return Poll::Ready(Some(Ok(batch)));
3128                    }
3129                    Poll::Ready(Err(e)) => {
3130                        self.state = VarLengthStreamState::Done;
3131                        return Poll::Ready(Some(Err(e)));
3132                    }
3133                    Poll::Pending => {
3134                        self.state = VarLengthStreamState::Materializing(fut);
3135                        return Poll::Pending;
3136                    }
3137                },
3138                VarLengthStreamState::Done => {
3139                    return Poll::Ready(None);
3140                }
3141            }
3142        }
3143    }
3144}
3145
impl GraphVariableLengthTraverseStream {
    /// Expand one input batch through the variable-length BFS traversal.
    ///
    /// For each non-null source VID in `batch`, runs the BFS configured on
    /// `self.exec` (endpoints-only or path-collecting, chosen by
    /// `output_mode`), filters results against an already-bound target column
    /// when one is present, and records each match as an expansion tuple
    /// `(input_row_idx, target_vid, hop_count, node_path, edge_path)`.
    ///
    /// OPTIONAL MATCH rows that produced no match are preserved with the
    /// sentinel target VID `u64::MAX` so `build_output_batch` can emit NULL
    /// bindings for them.
    ///
    /// # Errors
    /// Returns an execution error when the source column is missing from the
    /// batch or a column cannot be interpreted as a VID array.
    fn process_batch_base(
        &self,
        batch: RecordBatch,
        eid_filter: &EidFilter,
        vid_filter: &VidFilter,
    ) -> DFResult<RecordBatch> {
        let source_col = batch
            .column_by_name(&self.exec.source_column)
            .ok_or_else(|| {
                datafusion::error::DataFusionError::Execution(format!(
                    "Source column '{}' not found",
                    self.exec.source_column
                ))
            })?;

        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
        let source_vids: &UInt64Array = &source_vid_cow;

        // Read bound target VIDs if column exists
        let bound_target_cow = self
            .exec
            .bound_target_column
            .as_ref()
            .and_then(|col| batch.column_by_name(col))
            .map(|c| column_as_vid_array(c.as_ref()))
            .transpose()?;
        let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();

        // Extract used edge columns for cross-pattern relationship uniqueness
        // (columns that are missing or not UInt64 are silently skipped).
        let used_edge_arrays: Vec<&UInt64Array> = self
            .exec
            .used_edge_columns
            .iter()
            .filter_map(|col| {
                batch
                    .column_by_name(col)?
                    .as_any()
                    .downcast_ref::<UInt64Array>()
            })
            .collect();

        // Collect all BFS results
        let mut expansions: Vec<VarLengthExpansion> = Vec::new();

        for (row_idx, source_vid) in source_vids.iter().enumerate() {
            let mut emitted_for_row = false;

            if let Some(src) = source_vid {
                let vid = Vid::from(src);

                // Collect used edge IDs from previous hops for this row
                let used_eids: FxHashSet<u64> = used_edge_arrays
                    .iter()
                    .filter_map(|arr| {
                        if arr.is_null(row_idx) {
                            None
                        } else {
                            Some(arr.value(row_idx))
                        }
                    })
                    .collect();

                // Dispatch to appropriate BFS mode based on output_mode
                match &self.exec.output_mode {
                    VlpOutputMode::EndpointsOnly => {
                        // Endpoints-only BFS returns (target, depth) pairs with
                        // no path materialization; paths stay empty below.
                        let endpoints = self
                            .exec
                            .bfs_endpoints_only(vid, eid_filter, &used_eids, vid_filter);
                        for (target, depth) in endpoints {
                            // Filter by bound target VID
                            if let Some(targets) = expected_targets {
                                if targets.is_null(row_idx) {
                                    continue;
                                }
                                if target.as_u64() != targets.value(row_idx) {
                                    continue;
                                }
                            }
                            expansions.push((row_idx, target, depth as usize, vec![], vec![]));
                            emitted_for_row = true;
                        }
                    }
                    _ => {
                        // FullPath, StepVariable, CountOnly, etc.
                        let bfs_results = self
                            .exec
                            .bfs_with_dag(vid, eid_filter, &used_eids, vid_filter);
                        for (target, hop_count, node_path, edge_path) in bfs_results {
                            // Filter by bound target VID
                            if let Some(targets) = expected_targets {
                                if targets.is_null(row_idx) {
                                    continue;
                                }
                                if target.as_u64() != targets.value(row_idx) {
                                    continue;
                                }
                            }
                            expansions.push((row_idx, target, hop_count, node_path, edge_path));
                            emitted_for_row = true;
                        }
                    }
                }
            }

            if self.exec.is_optional && !emitted_for_row {
                // Preserve the source row with NULL optional bindings.
                // We use empty node/edge paths to mark unmatched rows.
                expansions.push((row_idx, Vid::from(u64::MAX), 0, vec![], vec![]));
            }
        }

        self.build_output_batch(&batch, &expansions)
    }

    /// Materialize collected expansions into an output [`RecordBatch`].
    ///
    /// Input columns are replicated per expansion via `take`, then traversal
    /// columns are appended in schema order: `target._vid`, `target._labels`,
    /// null property placeholders (hydrated asynchronously later),
    /// `_hop_count`, the step-variable edge list (if bound), and the
    /// named-path struct (if bound). Unmatched OPTIONAL rows (sentinel VID
    /// `u64::MAX`) get NULLs for all target bindings.
    ///
    /// # Errors
    /// Propagates Arrow errors from `take` and batch/struct construction.
    fn build_output_batch(
        &self,
        input: &RecordBatch,
        expansions: &[VarLengthExpansion],
    ) -> DFResult<RecordBatch> {
        if expansions.is_empty() {
            return Ok(RecordBatch::new_empty(self.schema.clone()));
        }

        let num_rows = expansions.len();

        // Build index array
        let indices: Vec<u64> = expansions
            .iter()
            .map(|(idx, _, _, _, _)| *idx as u64)
            .collect();
        let indices_array = UInt64Array::from(indices);

        // Expand input columns
        let mut columns: Vec<ArrayRef> = Vec::new();
        for col in input.columns() {
            let expanded = take(col.as_ref(), &indices_array, None)?;
            columns.push(expanded);
        }

        // Collect target VIDs and unmatched markers for use in multiple places.
        // Unmatched OPTIONAL rows use the sentinel VID (u64::MAX) — not empty paths,
        // because EndpointsOnly mode legitimately uses empty node/edge path vectors.
        let unmatched_rows: Vec<bool> = expansions
            .iter()
            .map(|(_, vid, _, _, _)| vid.as_u64() == u64::MAX)
            .collect();
        let target_vids: Vec<Option<u64>> = expansions
            .iter()
            .zip(unmatched_rows.iter())
            .map(
                |((_, vid, _, _, _), unmatched)| {
                    if *unmatched { None } else { Some(vid.as_u64()) }
                },
            )
            .collect();

        // Add target VID column (only if not already in input)
        let target_vid_name = format!("{}._vid", self.exec.target_variable);
        if input.schema().column_with_name(&target_vid_name).is_none() {
            columns.push(Arc::new(UInt64Array::from(target_vids.clone())));
        }

        // Add target ._labels column (only if not already in input)
        let target_labels_name = format!("{}._labels", self.exec.target_variable);
        if input
            .schema()
            .column_with_name(&target_labels_name)
            .is_none()
        {
            use arrow_array::builder::{ListBuilder, StringBuilder};
            let query_ctx = self.exec.graph_ctx.query_context();
            let mut labels_builder = ListBuilder::new(StringBuilder::new());
            for target_vid in &target_vids {
                // Unmatched rows get a NULL labels list.
                let Some(vid_u64) = target_vid else {
                    labels_builder.append(false);
                    continue;
                };
                let vid = Vid::from(*vid_u64);
                let row_labels: Vec<String> =
                    match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
                        Some(labels) => {
                            // Vertex is in L0 — use actual labels only
                            labels
                        }
                        None => {
                            // Vertex not in L0 — trust schema label (storage already filtered)
                            if let Some(ref label_name) = self.exec.target_label_name {
                                vec![label_name.clone()]
                            } else {
                                vec![]
                            }
                        }
                    };
                let values = labels_builder.values();
                for lbl in &row_labels {
                    values.append_value(lbl);
                }
                labels_builder.append(true);
            }
            columns.push(Arc::new(labels_builder.finish()));
        }

        // Add null placeholder columns for target properties (hydrated async if needed, skip if already in input)
        for prop_name in &self.exec.target_properties {
            let full_prop_name = format!("{}.{}", self.exec.target_variable, prop_name);
            if input.schema().column_with_name(&full_prop_name).is_none() {
                let col_idx = columns.len();
                if col_idx < self.schema.fields().len() {
                    let field = self.schema.field(col_idx);
                    columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
                }
            }
        }

        // Add hop count column
        let hop_counts: Vec<u64> = expansions
            .iter()
            .map(|(_, _, hops, _, _)| *hops as u64)
            .collect();
        columns.push(Arc::new(UInt64Array::from(hop_counts)));

        // Add step variable (edge list) column if bound
        // NOTE(review): EndpointsOnly matches also carry empty node/edge paths
        // (see process_batch_base), so with a bound step variable they would
        // render as NULL here — presumably the planner never combines
        // EndpointsOnly with a step variable; confirm.
        if self.exec.step_variable.is_some() {
            let mut edges_builder = new_edge_list_builder();
            let query_ctx = self.exec.graph_ctx.query_context();

            for (_, _, _, node_path, edge_path) in expansions {
                if node_path.is_empty() && edge_path.is_empty() {
                    // Null row for OPTIONAL MATCH unmatched
                    edges_builder.append_null();
                } else if edge_path.is_empty() {
                    // Zero-hop match: empty list
                    edges_builder.append(true);
                } else {
                    for (i, eid) in edge_path.iter().enumerate() {
                        let type_name = l0_visibility::get_edge_type(*eid, &query_ctx)
                            .unwrap_or_else(|| "UNKNOWN".to_string());
                        // node_path has one more entry than edge_path:
                        // edge i connects node_path[i] -> node_path[i + 1].
                        append_edge_to_struct(
                            edges_builder.values(),
                            *eid,
                            &type_name,
                            node_path[i].as_u64(),
                            node_path[i + 1].as_u64(),
                            &query_ctx,
                        );
                    }
                    edges_builder.append(true);
                }
            }

            columns.push(Arc::new(edges_builder.finish()));
        }

        // Add path variable column if bound.
        // For named paths, we output a Path struct with nodes and relationships arrays.
        // If a path column already exists in input (from a prior BindFixedPath), extend it
        // rather than building from scratch.
        if let Some(path_var_name) = &self.exec.path_variable {
            let existing_path_col_idx = input
                .schema()
                .column_with_name(path_var_name)
                .map(|(idx, _)| idx);
            // Clone the Arc so we can read existing path without borrowing `columns`
            let existing_path_arc = existing_path_col_idx.map(|idx| columns[idx].clone());
            let existing_path = existing_path_arc
                .as_ref()
                .and_then(|arc| arc.as_any().downcast_ref::<arrow_array::StructArray>());

            let mut nodes_builder = new_node_list_builder();
            let mut rels_builder = new_edge_list_builder();
            let query_ctx = self.exec.graph_ctx.query_context();
            let mut path_validity = Vec::with_capacity(expansions.len());

            for (row_out_idx, (_, _, _, node_path, edge_path)) in expansions.iter().enumerate() {
                if node_path.is_empty() && edge_path.is_empty() {
                    // Unmatched OPTIONAL row: NULL path.
                    nodes_builder.append(false);
                    rels_builder.append(false);
                    path_validity.push(false);
                    continue;
                }

                // Prepend existing path prefix if extending
                let skip_first_vlp_node = if let Some(existing) = existing_path {
                    if !existing.is_null(row_out_idx) {
                        prepend_existing_path(
                            existing,
                            row_out_idx,
                            &mut nodes_builder,
                            &mut rels_builder,
                            &query_ctx,
                        );
                        true
                    } else {
                        false
                    }
                } else {
                    false
                };

                // Append VLP nodes (skip first if extending — it's the junction point)
                let start_idx = if skip_first_vlp_node { 1 } else { 0 };
                for vid in &node_path[start_idx..] {
                    append_node_to_struct(nodes_builder.values(), *vid, &query_ctx);
                }
                nodes_builder.append(true);

                for (i, eid) in edge_path.iter().enumerate() {
                    let type_name = l0_visibility::get_edge_type(*eid, &query_ctx)
                        .unwrap_or_else(|| "UNKNOWN".to_string());
                    append_edge_to_struct(
                        rels_builder.values(),
                        *eid,
                        &type_name,
                        node_path[i].as_u64(),
                        node_path[i + 1].as_u64(),
                        &query_ctx,
                    );
                }
                rels_builder.append(true);
                path_validity.push(true);
            }

            // Finish builders and get ListArrays
            let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
            let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;

            // Build the path struct fields
            let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
            let rels_field = Arc::new(Field::new(
                "relationships",
                rels_array.data_type().clone(),
                true,
            ));

            // Create the path struct array
            let path_struct = arrow_array::StructArray::try_new(
                vec![nodes_field, rels_field].into(),
                vec![nodes_array, rels_array],
                Some(arrow::buffer::NullBuffer::from(path_validity)),
            )
            .map_err(arrow_err)?;

            if let Some(idx) = existing_path_col_idx {
                columns[idx] = Arc::new(path_struct);
            } else {
                columns.push(Arc::new(path_struct));
            }
        }

        // NOTE(review): the Materializing poll arm also calls record_output on
        // the hydrated batch — confirm output rows are not double-counted for
        // batches that go through async property hydration.
        self.metrics.record_output(num_rows);

        RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)
    }
}
3501
3502impl RecordBatchStream for GraphVariableLengthTraverseStream {
3503    fn schema(&self) -> SchemaRef {
3504        self.schema.clone()
3505    }
3506}
3507
/// Hydrate target vertex properties into a VLP batch.
///
/// The base batch already has null placeholder columns for target properties.
/// This function replaces them with actual property values fetched from storage.
///
/// Rows whose target VID is NULL (unmatched OPTIONAL rows) keep NULL property
/// values: they are mapped to the sentinel VID `u64::MAX`, which never
/// resolves to stored properties.
///
/// # Errors
/// Returns an execution error when the VID column cannot be read, when the
/// property manager lookup fails, or when the final batch cannot be built.
async fn hydrate_vlp_target_properties(
    base_batch: RecordBatch,
    schema: SchemaRef,
    target_variable: String,
    target_properties: Vec<String>,
    target_label_name: Option<String>,
    graph_ctx: Arc<GraphExecutionContext>,
) -> DFResult<RecordBatch> {
    // Nothing to hydrate: empty batch or no requested properties.
    if base_batch.num_rows() == 0 || target_properties.is_empty() {
        return Ok(base_batch);
    }

    // Find the target VID column by exact name.
    // Schema layout: [input cols..., target._vid, target.prop1..., _hop_count, path?]
    //
    // IMPORTANT: When the target variable is already bound in the input (e.g., two MATCH
    // clauses referencing the same variable), there may be duplicate column names. We need
    // the LAST occurrence of target._vid, which is the one added by the VLP.
    let target_vid_col_name = format!("{}._vid", target_variable);
    let vid_col_idx = schema
        .fields()
        .iter()
        .enumerate()
        .rev()
        .find(|(_, f)| f.name() == &target_vid_col_name)
        .map(|(i, _)| i);

    // No VID column at all — nothing we can hydrate against.
    let Some(vid_col_idx) = vid_col_idx else {
        return Ok(base_batch);
    };

    let vid_col = base_batch.column(vid_col_idx);
    let target_vid_cow = column_as_vid_array(vid_col.as_ref())?;
    let target_vid_array: &UInt64Array = &target_vid_cow;

    let target_vids: Vec<Vid> = target_vid_array
        .iter()
        // Preserve null rows by mapping them to a sentinel VID that never resolves
        // to stored properties. The output property columns remain NULL for these rows.
        .map(|v| Vid::from(v.unwrap_or(u64::MAX)))
        .collect();

    // Fetch properties from storage.
    // property_columns is built in the iteration order of `target_properties`;
    // the replacement loop at the bottom pairs them positionally.
    let mut property_columns: Vec<ArrayRef> = Vec::new();

    if let Some(ref label_name) = target_label_name {
        // Label known — typed, single-label property fetch.
        let property_manager = graph_ctx.property_manager();
        let query_ctx = graph_ctx.query_context();

        let props_map = property_manager
            .get_batch_vertex_props_for_label(&target_vids, label_name, Some(&query_ctx))
            .await
            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;

        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let label_props = uni_schema.properties.get(label_name.as_str());

        for prop_name in &target_properties {
            let data_type = resolve_property_type(prop_name, label_props);
            let column =
                build_property_column_static(&target_vids, &props_map, prop_name, &data_type)?;
            property_columns.push(column);
        }
    } else {
        // No label name — use label-agnostic property lookup.
        // This scans all label datasets, slower but correct for label-less traversals.
        let non_internal_props: Vec<&str> = target_properties
            .iter()
            .filter(|p| *p != "_all_props")
            .map(|s| s.as_str())
            .collect();
        let property_manager = graph_ctx.property_manager();
        let query_ctx = graph_ctx.query_context();

        let props_map = if !non_internal_props.is_empty() {
            property_manager
                .get_batch_vertex_props(&target_vids, &non_internal_props, Some(&query_ctx))
                .await
                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
        } else {
            std::collections::HashMap::new()
        };

        for prop_name in &target_properties {
            if prop_name == "_all_props" {
                // Build CypherValue blob from all vertex properties (L0 + storage)
                use crate::query::df_graph::scan::encode_cypher_value;
                use arrow_array::builder::LargeBinaryBuilder;

                let mut builder = LargeBinaryBuilder::new();
                let l0_ctx = graph_ctx.l0_context();
                for vid in &target_vids {
                    let mut merged_props = serde_json::Map::new();
                    // Collect from storage-hydrated props
                    if let Some(vid_props) = props_map.get(vid) {
                        for (k, v) in vid_props.iter() {
                            let json_val: serde_json::Value = v.clone().into();
                            merged_props.insert(k.to_string(), json_val);
                        }
                    }
                    // Overlay L0 properties
                    // (L0 wins: inserted after storage values, overwriting same keys)
                    for l0 in l0_ctx.iter_l0_buffers() {
                        let guard = l0.read();
                        if let Some(l0_props) = guard.vertex_properties.get(vid) {
                            for (k, v) in l0_props.iter() {
                                let json_val: serde_json::Value = v.clone().into();
                                merged_props.insert(k.to_string(), json_val);
                            }
                        }
                    }
                    if merged_props.is_empty() {
                        builder.append_null();
                    } else {
                        let json = serde_json::Value::Object(merged_props);
                        match encode_cypher_value(&json) {
                            Ok(bytes) => builder.append_value(bytes),
                            // Encoding failure degrades to NULL rather than erroring the batch.
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                property_columns.push(Arc::new(builder.finish()));
            } else {
                let column = build_property_column_static(
                    &target_vids,
                    &props_map,
                    prop_name,
                    &arrow::datatypes::DataType::LargeBinary,
                )?;
                property_columns.push(column);
            }
        }
    }

    // Rebuild batch replacing the null placeholder property columns with hydrated ones.
    // Find each property column by name — works regardless of column ordering
    // (schema-aware puts props before _hop_count; schemaless puts them after).
    // Use col_idx > vid_col_idx to only replace this VLP's own property columns,
    // not pre-existing input columns with the same name (duplicate variable binding).
    //
    // NOTE(review): `prop_idx` pairs hydrated columns with schema fields
    // positionally — this assumes the schema lists the target property columns
    // in the same relative order as `target_properties`. Confirm against the
    // schema builders if either side changes.
    let mut new_columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
    let mut prop_idx = 0;
    for (col_idx, field) in schema.fields().iter().enumerate() {
        let is_target_prop = col_idx > vid_col_idx
            && target_properties
                .iter()
                .any(|p| *field.name() == format!("{}.{}", target_variable, p));
        if is_target_prop && prop_idx < property_columns.len() {
            new_columns.push(property_columns[prop_idx].clone());
            prop_idx += 1;
        } else {
            new_columns.push(base_batch.column(col_idx).clone());
        }
    }

    RecordBatch::try_new(schema, new_columns).map_err(arrow_err)
}
3667
3668// ============================================================================
3669// GraphVariableLengthTraverseMainExec - VLP for schemaless edge types
3670// ============================================================================
3671
/// Execution plan for variable-length path traversal on schemaless edge types.
///
/// This is similar to `GraphVariableLengthTraverseExec` but works with edge types
/// that don't have schema-defined IDs. It queries the main edges table by type name.
/// Supports OR relationship types like `[:KNOWS|HATES]` via multiple type_names.
pub struct GraphVariableLengthTraverseMainExec {
    /// Input execution plan.
    input: Arc<dyn ExecutionPlan>,

    /// Column name containing source VIDs.
    source_column: String,

    /// Edge type names (not IDs, since schemaless types may not have IDs).
    /// Multiple entries represent an OR over relationship types.
    type_names: Vec<String>,

    /// Traversal direction.
    direction: Direction,

    /// Minimum number of hops.
    min_hops: usize,

    /// Maximum number of hops.
    max_hops: usize,

    /// Variable name for target vertex columns.
    target_variable: String,

    /// Variable name for relationship list (r in `[r*]`) - holds `List<Edge>`.
    step_variable: Option<String>,

    /// Variable name for named path (p in `p = ...`) - holds `Path`.
    path_variable: Option<String>,

    /// Target vertex properties to materialize.
    target_properties: Vec<String>,

    /// Whether this is an optional match (LEFT JOIN semantics).
    is_optional: bool,

    /// Column name of an already-bound target VID (for patterns where target is in scope).
    bound_target_column: Option<String>,

    /// Lance SQL filter for edge property predicates (VLP bitmap preselection).
    edge_lance_filter: Option<String>,

    /// Edge property conditions to check during BFS (e.g., `{year: 1988}`).
    /// Each entry is (property_name, expected_value). All must match for an edge to be traversed.
    edge_property_conditions: Vec<(String, UniValue)>,

    /// Edge ID columns from previous hops for cross-pattern relationship uniqueness.
    used_edge_columns: Vec<String>,

    /// Path semantics mode (Trail = no repeated edges, default for OpenCypher).
    path_mode: super::nfa::PathMode,

    /// Output mode determining BFS strategy.
    output_mode: super::nfa::VlpOutputMode,

    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Output schema. Precomputed in [`Self::new`] via `build_schema`.
    schema: SchemaRef,

    /// Cached plan properties. Precomputed in [`Self::new`] from the schema.
    properties: PlanProperties,

    /// Execution metrics.
    metrics: ExecutionPlanMetricsSet,
}
3742
3743impl fmt::Debug for GraphVariableLengthTraverseMainExec {
3744    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3745        f.debug_struct("GraphVariableLengthTraverseMainExec")
3746            .field("source_column", &self.source_column)
3747            .field("type_names", &self.type_names)
3748            .field("direction", &self.direction)
3749            .field("min_hops", &self.min_hops)
3750            .field("max_hops", &self.max_hops)
3751            .field("target_variable", &self.target_variable)
3752            .finish()
3753    }
3754}
3755
impl GraphVariableLengthTraverseMainExec {
    /// Create a new variable-length traversal plan for schemaless edges.
    ///
    /// The output schema and plan properties are derived up front from the
    /// input schema and the variable/property bindings, then cached on the
    /// struct; `execute` and `schema()` only hand out the cached values.
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        source_column: impl Into<String>,
        type_names: Vec<String>,
        direction: Direction,
        min_hops: usize,
        max_hops: usize,
        target_variable: impl Into<String>,
        step_variable: Option<String>,
        path_variable: Option<String>,
        target_properties: Vec<String>,
        graph_ctx: Arc<GraphExecutionContext>,
        is_optional: bool,
        bound_target_column: Option<String>,
        edge_lance_filter: Option<String>,
        edge_property_conditions: Vec<(String, UniValue)>,
        used_edge_columns: Vec<String>,
        path_mode: super::nfa::PathMode,
        output_mode: super::nfa::VlpOutputMode,
    ) -> Self {
        // Convert `impl Into<String>` args once so they can be borrowed for
        // schema construction and then moved into the struct.
        let source_column = source_column.into();
        let target_variable = target_variable.into();

        // Build output schema
        let schema = Self::build_schema(
            input.schema(),
            &target_variable,
            step_variable.as_deref(),
            path_variable.as_deref(),
            &target_properties,
        );
        let properties = compute_plan_properties(schema.clone());

        Self {
            input,
            source_column,
            type_names,
            direction,
            min_hops,
            max_hops,
            target_variable,
            step_variable,
            path_variable,
            target_properties,
            is_optional,
            bound_target_column,
            edge_lance_filter,
            edge_property_conditions,
            used_edge_columns,
            path_mode,
            output_mode,
            graph_ctx,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Build output schema.
    ///
    /// Column order: input columns, `target._vid`, `target._labels`,
    /// `_hop_count`, step-variable edge list, path struct, then target
    /// property columns (the schemaless layout puts properties after
    /// `_hop_count`). This order must stay in sync with the order the stream
    /// pushes columns when materializing batches — verify both when editing.
    fn build_schema(
        input_schema: SchemaRef,
        target_variable: &str,
        step_variable: Option<&str>,
        path_variable: Option<&str>,
        target_properties: &[String],
    ) -> SchemaRef {
        let mut fields: Vec<Field> = input_schema
            .fields()
            .iter()
            .map(|f| f.as_ref().clone())
            .collect();

        // Add target VID column (only if not already in input)
        let target_vid_name = format!("{}._vid", target_variable);
        if input_schema.column_with_name(&target_vid_name).is_none() {
            fields.push(Field::new(target_vid_name, DataType::UInt64, true));
        }

        // Add target ._labels column (only if not already in input)
        let target_labels_name = format!("{}._labels", target_variable);
        if input_schema.column_with_name(&target_labels_name).is_none() {
            fields.push(Field::new(target_labels_name, labels_data_type(), true));
        }

        // Add hop count
        fields.push(Field::new("_hop_count", DataType::UInt64, false));

        // Add step variable column (list of edge structs) if bound
        // This is the relationship variable like `r` in `[r*1..3]`
        // NOTE(review): unlike the vid/labels/path/property columns, this one
        // is appended without a duplicate check — presumably a step variable
        // is never pre-bound in the input; confirm.
        if let Some(step_var) = step_variable {
            fields.push(build_edge_list_field(step_var));
        }

        // Add path struct if bound (only if not already in input from prior BindFixedPath)
        if let Some(path_var) = path_variable
            && input_schema.column_with_name(path_var).is_none()
        {
            fields.push(build_path_struct_field(path_var));
        }

        // Add target property columns (as LargeBinary for lazy hydration via PropertyManager)
        // Skip properties that are already in the input schema
        for prop in target_properties {
            let prop_name = format!("{}.{}", target_variable, prop);
            if input_schema.column_with_name(&prop_name).is_none() {
                fields.push(Field::new(prop_name, DataType::LargeBinary, true));
            }
        }

        Arc::new(Schema::new(fields))
    }
}
3871
3872impl DisplayAs for GraphVariableLengthTraverseMainExec {
3873    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3874        write!(
3875            f,
3876            "GraphVariableLengthTraverseMainExec: {} --[{:?}*{}..{}]--> target",
3877            self.source_column, self.type_names, self.min_hops, self.max_hops
3878        )
3879    }
3880}
3881
impl ExecutionPlan for GraphVariableLengthTraverseMainExec {
    /// Operator name shown in EXPLAIN output and metrics.
    fn name(&self) -> &str {
        "GraphVariableLengthTraverseMainExec"
    }

    /// Downcast support required by DataFusion's optimizer rules.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Output schema, fixed at plan-construction time.
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    /// Exactly one child: the stream producing source-VID rows.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    /// Rebuild this node around a replacement child (used by optimizer rewrites).
    ///
    /// # Errors
    /// Returns a `Plan` error unless exactly one child is supplied.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if children.len() != 1 {
            return Err(datafusion::error::DataFusionError::Plan(
                "GraphVariableLengthTraverseMainExec requires exactly one child".to_string(),
            ));
        }

        // Only the input child changes; all traversal configuration is
        // cloned from `self` in the order `Self::new` expects.
        Ok(Arc::new(Self::new(
            children[0].clone(),
            self.source_column.clone(),
            self.type_names.clone(),
            self.direction,
            self.min_hops,
            self.max_hops,
            self.target_variable.clone(),
            self.step_variable.clone(),
            self.path_variable.clone(),
            self.target_properties.clone(),
            self.graph_ctx.clone(),
            self.is_optional,
            self.bound_target_column.clone(),
            self.edge_lance_filter.clone(),
            self.edge_property_conditions.clone(),
            self.used_edge_columns.clone(),
            self.path_mode.clone(),
            self.output_mode.clone(),
        )))
    }

    /// Start execution: kick off the async adjacency-map load and wrap the
    /// input in a [`GraphVariableLengthTraverseMainStream`] that begins in
    /// the `Loading` state (batches are processed only once the map is ready).
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let input_stream = self.input.execute(partition, context)?;
        let metrics = BaselineMetrics::new(&self.metrics, partition);

        // Build adjacency map from main edges table (async)
        let graph_ctx = self.graph_ctx.clone();
        let type_names = self.type_names.clone();
        let direction = self.direction;
        let load_fut =
            async move { build_edge_adjacency_map(&graph_ctx, &type_names, direction).await };

        Ok(Box::pin(GraphVariableLengthTraverseMainStream {
            input: input_stream,
            source_column: self.source_column.clone(),
            type_names: self.type_names.clone(),
            direction: self.direction,
            min_hops: self.min_hops,
            max_hops: self.max_hops,
            target_variable: self.target_variable.clone(),
            step_variable: self.step_variable.clone(),
            path_variable: self.path_variable.clone(),
            target_properties: self.target_properties.clone(),
            graph_ctx: self.graph_ctx.clone(),
            is_optional: self.is_optional,
            bound_target_column: self.bound_target_column.clone(),
            edge_lance_filter: self.edge_lance_filter.clone(),
            edge_property_conditions: self.edge_property_conditions.clone(),
            used_edge_columns: self.used_edge_columns.clone(),
            path_mode: self.path_mode.clone(),
            output_mode: self.output_mode.clone(),
            schema: self.schema.clone(),
            state: VarLengthMainStreamState::Loading(Box::pin(load_fut)),
            metrics,
        }))
    }

    /// Snapshot of this operator's execution metrics.
    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
3979
/// State machine for VLP schemaless stream.
///
/// Transitions: `Loading` → `Processing` → (`Materializing` → `Processing`)*
/// → `Done`. Errors during loading or input polling also move to `Done`.
enum VarLengthMainStreamState {
    /// Loading adjacency map from main edges table.
    Loading(Pin<Box<dyn std::future::Future<Output = DFResult<EdgeAdjacencyMap>> + Send>>),
    /// Processing input batches with loaded adjacency.
    Processing(EdgeAdjacencyMap),
    /// Materializing properties for a batch.
    ///
    /// Keeps the adjacency map alongside the hydration future so processing
    /// can resume once the batch is materialized.
    Materializing {
        adjacency: EdgeAdjacencyMap,
        fut: Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>,
    },
    /// Stream is done.
    Done,
}
3994
/// Stream for variable-length traversal on schemaless edges.
///
/// Lazily loads an `EdgeAdjacencyMap` from the main edges table, then expands
/// each input batch via BFS (see `process_batch`), optionally followed by an
/// async target-property hydration step.
#[expect(dead_code, reason = "VLP fields used in Phase 3")]
struct GraphVariableLengthTraverseMainStream {
    /// Upstream stream supplying the source rows.
    input: SendableRecordBatchStream,
    /// Name of the input column holding source VIDs.
    source_column: String,
    /// Edge type names being traversed (joined with `|` when tagging edge structs).
    type_names: Vec<String>,
    /// Traversal direction; `Both` triggers per-hop edge deduplication in BFS.
    direction: Direction,
    /// Minimum hop count; 0 permits zero-length (source == target) matches.
    min_hops: usize,
    /// Maximum hop count; BFS stops expanding at this depth.
    max_hops: usize,
    /// Variable name of the target node; prefixes output columns like `b._vid`.
    target_variable: String,
    /// Relationship variable like `r` in `[r*1..3]` - gets a List of edge structs.
    step_variable: Option<String>,
    /// Path variable (e.g. `p`); emitted as a struct of nodes + relationships.
    path_variable: Option<String>,
    /// Target properties to emit (LargeBinary columns, hydrated asynchronously).
    target_properties: Vec<String>,
    /// Shared execution context (query context, label lookups).
    graph_ctx: Arc<GraphExecutionContext>,
    /// OPTIONAL MATCH: rows with no expansion are preserved with NULL bindings.
    is_optional: bool,
    /// Column holding an already-bound target VID; BFS results are filtered to it.
    bound_target_column: Option<String>,
    /// Lance filter expression for edges — not used in this stream's visible
    /// code paths (covered by the `dead_code` expectation above).
    edge_lance_filter: Option<String>,
    /// Edge property conditions to check during BFS.
    edge_property_conditions: Vec<(String, UniValue)>,
    /// Columns carrying EIDs bound by earlier pattern elements in the same
    /// MATCH (cross-pattern relationship uniqueness).
    used_edge_columns: Vec<String>,
    // NOTE(review): path_mode/output_mode are not read in the visible code —
    // presumably consumed in Phase 3 (see dead_code expectation).
    path_mode: super::nfa::PathMode,
    output_mode: super::nfa::VlpOutputMode,
    /// Output schema, fixed at plan time.
    schema: SchemaRef,
    /// Loading → Processing → Materializing → Done state machine.
    state: VarLengthMainStreamState,
    /// Baseline metrics (output row counts) for this partition.
    metrics: BaselineMetrics,
}
4022
/// BFS result type: `(target_vid, hop_count, node_path, edge_path)`.
/// `node_path` always holds `hop_count + 1` entries (source included);
/// `edge_path` holds `hop_count` entries.
type MainBfsResult = (Vid, usize, Vec<Vid>, Vec<Eid>);
4025
4026impl GraphVariableLengthTraverseMainStream {
4027    /// Perform BFS from a source vertex using the adjacency map.
4028    ///
4029    /// `used_eids` contains edge IDs already bound by earlier pattern elements
4030    /// in the same MATCH clause, enforcing cross-pattern relationship uniqueness
4031    /// (Cypher semantics require all relationships in a MATCH to be distinct).
4032    fn bfs(
4033        &self,
4034        source: Vid,
4035        adjacency: &EdgeAdjacencyMap,
4036        used_eids: &FxHashSet<u64>,
4037    ) -> Vec<MainBfsResult> {
4038        let mut results = Vec::new();
4039        let mut queue: VecDeque<MainBfsResult> = VecDeque::new();
4040
4041        queue.push_back((source, 0, vec![source], vec![]));
4042
4043        while let Some((current, depth, node_path, edge_path)) = queue.pop_front() {
4044            // Emit result if within hop range (including zero-length patterns)
4045            if depth >= self.min_hops && depth <= self.max_hops {
4046                results.push((current, depth, node_path.clone(), edge_path.clone()));
4047            }
4048
4049            // Stop if at max depth
4050            if depth >= self.max_hops {
4051                continue;
4052            }
4053
4054            // Get neighbors from adjacency map
4055            if let Some(neighbors) = adjacency.get(&current) {
4056                let is_undirected = matches!(self.direction, Direction::Both);
4057                let mut seen_edges_at_hop: HashSet<u64> = HashSet::new();
4058
4059                for (neighbor, eid, _edge_type, props) in neighbors {
4060                    // Deduplicate edges for undirected patterns
4061                    if is_undirected && !seen_edges_at_hop.insert(eid.as_u64()) {
4062                        continue;
4063                    }
4064
4065                    // Enforce relationship uniqueness per-path (Cypher semantics).
4066                    if edge_path.contains(eid) {
4067                        continue;
4068                    }
4069
4070                    // Enforce cross-pattern relationship uniqueness: skip edges
4071                    // already bound by earlier pattern elements in the same MATCH.
4072                    if used_eids.contains(&eid.as_u64()) {
4073                        continue;
4074                    }
4075
4076                    // Check edge property conditions (e.g., {year: 1988}).
4077                    if !self.edge_property_conditions.is_empty() {
4078                        let passes =
4079                            self.edge_property_conditions
4080                                .iter()
4081                                .all(|(name, expected)| {
4082                                    props.get(name).is_some_and(|actual| actual == expected)
4083                                });
4084                        if !passes {
4085                            continue;
4086                        }
4087                    }
4088
4089                    let mut new_node_path = node_path.clone();
4090                    new_node_path.push(*neighbor);
4091                    let mut new_edge_path = edge_path.clone();
4092                    new_edge_path.push(*eid);
4093                    queue.push_back((*neighbor, depth + 1, new_node_path, new_edge_path));
4094                }
4095            }
4096        }
4097
4098        results
4099    }
4100
4101    /// Process a batch using the adjacency map.
4102    fn process_batch(
4103        &self,
4104        batch: RecordBatch,
4105        adjacency: &EdgeAdjacencyMap,
4106    ) -> DFResult<RecordBatch> {
4107        let source_col = batch.column_by_name(&self.source_column).ok_or_else(|| {
4108            datafusion::error::DataFusionError::Execution(format!(
4109                "Source column '{}' not found in input batch",
4110                self.source_column
4111            ))
4112        })?;
4113
4114        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
4115        let source_vids: &UInt64Array = &source_vid_cow;
4116
4117        // Read bound target VIDs if column exists
4118        let bound_target_cow = self
4119            .bound_target_column
4120            .as_ref()
4121            .and_then(|col| batch.column_by_name(col))
4122            .map(|c| column_as_vid_array(c.as_ref()))
4123            .transpose()?;
4124        let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
4125
4126        // Extract used edge columns for cross-pattern relationship uniqueness
4127        let used_edge_arrays: Vec<&UInt64Array> = self
4128            .used_edge_columns
4129            .iter()
4130            .filter_map(|col| {
4131                batch
4132                    .column_by_name(col)?
4133                    .as_any()
4134                    .downcast_ref::<UInt64Array>()
4135            })
4136            .collect();
4137
4138        // Collect BFS results: (original_row_idx, target_vid, hop_count, node_path, edge_path)
4139        let mut expansions: Vec<ExpansionRecord> = Vec::new();
4140
4141        for (row_idx, source_opt) in source_vids.iter().enumerate() {
4142            let mut emitted_for_row = false;
4143
4144            if let Some(source_u64) = source_opt {
4145                let source = Vid::from(source_u64);
4146
4147                // Collect used edge IDs from previous hops for this row
4148                let used_eids: FxHashSet<u64> = used_edge_arrays
4149                    .iter()
4150                    .filter_map(|arr| {
4151                        if arr.is_null(row_idx) {
4152                            None
4153                        } else {
4154                            Some(arr.value(row_idx))
4155                        }
4156                    })
4157                    .collect();
4158
4159                let bfs_results = self.bfs(source, adjacency, &used_eids);
4160
4161                for (target, hops, node_path, edge_path) in bfs_results {
4162                    // Filter by bound target VID if set (for patterns where target is in scope).
4163                    // NULL bound targets do not match anything.
4164                    if let Some(targets) = expected_targets {
4165                        if targets.is_null(row_idx) {
4166                            continue;
4167                        }
4168                        let expected_vid = targets.value(row_idx);
4169                        if target.as_u64() != expected_vid {
4170                            continue;
4171                        }
4172                    }
4173
4174                    expansions.push((row_idx, target, hops, node_path, edge_path));
4175                    emitted_for_row = true;
4176                }
4177            }
4178
4179            if self.is_optional && !emitted_for_row {
4180                // Preserve source row with NULL optional bindings.
4181                expansions.push((row_idx, Vid::from(u64::MAX), 0, vec![], vec![]));
4182            }
4183        }
4184
4185        if expansions.is_empty() {
4186            if self.is_optional {
4187                let all_indices: Vec<usize> = (0..batch.num_rows()).collect();
4188                return build_optional_null_batch_for_rows(&batch, &all_indices, &self.schema);
4189            }
4190            return Ok(RecordBatch::new_empty(self.schema.clone()));
4191        }
4192
4193        let num_rows = expansions.len();
4194        self.metrics.record_output(num_rows);
4195
4196        // Build output columns
4197        let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.schema.fields().len());
4198
4199        // Expand input columns
4200        for col_idx in 0..batch.num_columns() {
4201            let array = batch.column(col_idx);
4202            let indices: Vec<u64> = expansions
4203                .iter()
4204                .map(|(idx, _, _, _, _)| *idx as u64)
4205                .collect();
4206            let take_indices = UInt64Array::from(indices);
4207            let expanded = arrow::compute::take(array, &take_indices, None)?;
4208            columns.push(expanded);
4209        }
4210
4211        // Add target VID column (only if not already in input)
4212        let target_vid_name = format!("{}._vid", self.target_variable);
4213        if batch.schema().column_with_name(&target_vid_name).is_none() {
4214            let target_vids: Vec<Option<u64>> = expansions
4215                .iter()
4216                .map(|(_, vid, _, node_path, edge_path)| {
4217                    if node_path.is_empty() && edge_path.is_empty() {
4218                        None
4219                    } else {
4220                        Some(vid.as_u64())
4221                    }
4222                })
4223                .collect();
4224            columns.push(Arc::new(UInt64Array::from(target_vids)));
4225        }
4226
4227        // Add target ._labels column (only if not already in input)
4228        let target_labels_name = format!("{}._labels", self.target_variable);
4229        if batch
4230            .schema()
4231            .column_with_name(&target_labels_name)
4232            .is_none()
4233        {
4234            use arrow_array::builder::{ListBuilder, StringBuilder};
4235            let mut labels_builder = ListBuilder::new(StringBuilder::new());
4236            for (_, vid, _, node_path, edge_path) in expansions.iter() {
4237                if node_path.is_empty() && edge_path.is_empty() {
4238                    labels_builder.append(false);
4239                    continue;
4240                }
4241                let mut row_labels: Vec<String> = Vec::new();
4242                let labels =
4243                    l0_visibility::get_vertex_labels(*vid, &self.graph_ctx.query_context());
4244                for lbl in &labels {
4245                    if !row_labels.contains(lbl) {
4246                        row_labels.push(lbl.clone());
4247                    }
4248                }
4249                let values = labels_builder.values();
4250                for lbl in &row_labels {
4251                    values.append_value(lbl);
4252                }
4253                labels_builder.append(true);
4254            }
4255            columns.push(Arc::new(labels_builder.finish()));
4256        }
4257
4258        // Add hop count column
4259        let hop_counts: Vec<u64> = expansions
4260            .iter()
4261            .map(|(_, _, hops, _, _)| *hops as u64)
4262            .collect();
4263        columns.push(Arc::new(UInt64Array::from(hop_counts)));
4264
4265        // Add step variable column if bound (list of edge structs).
4266        if self.step_variable.is_some() {
4267            let mut edges_builder = new_edge_list_builder();
4268            let query_ctx = self.graph_ctx.query_context();
4269            let type_names_str = self.type_names.join("|");
4270
4271            for (_, _, _, node_path, edge_path) in expansions.iter() {
4272                if node_path.is_empty() && edge_path.is_empty() {
4273                    edges_builder.append_null();
4274                } else if edge_path.is_empty() {
4275                    // Zero-hop match: empty list.
4276                    edges_builder.append(true);
4277                } else {
4278                    for (i, eid) in edge_path.iter().enumerate() {
4279                        append_edge_to_struct(
4280                            edges_builder.values(),
4281                            *eid,
4282                            &type_names_str,
4283                            node_path[i].as_u64(),
4284                            node_path[i + 1].as_u64(),
4285                            &query_ctx,
4286                        );
4287                    }
4288                    edges_builder.append(true);
4289                }
4290            }
4291
4292            columns.push(Arc::new(edges_builder.finish()) as ArrayRef);
4293        }
4294
4295        // Add path variable column if bound.
4296        // If a path column already exists in input (from a prior BindFixedPath), extend it
4297        // rather than building from scratch.
4298        if let Some(path_var_name) = &self.path_variable {
4299            let existing_path_col_idx = batch
4300                .schema()
4301                .column_with_name(path_var_name)
4302                .map(|(idx, _)| idx);
4303            let existing_path_arc = existing_path_col_idx.map(|idx| columns[idx].clone());
4304            let existing_path = existing_path_arc
4305                .as_ref()
4306                .and_then(|arc| arc.as_any().downcast_ref::<arrow_array::StructArray>());
4307
4308            let mut nodes_builder = new_node_list_builder();
4309            let mut rels_builder = new_edge_list_builder();
4310            let query_ctx = self.graph_ctx.query_context();
4311            let type_names_str = self.type_names.join("|");
4312            let mut path_validity = Vec::with_capacity(expansions.len());
4313
4314            for (row_out_idx, (_, _, _, node_path, edge_path)) in expansions.iter().enumerate() {
4315                if node_path.is_empty() && edge_path.is_empty() {
4316                    nodes_builder.append(false);
4317                    rels_builder.append(false);
4318                    path_validity.push(false);
4319                    continue;
4320                }
4321
4322                // Prepend existing path prefix if extending
4323                let skip_first_vlp_node = if let Some(existing) = existing_path {
4324                    if !existing.is_null(row_out_idx) {
4325                        prepend_existing_path(
4326                            existing,
4327                            row_out_idx,
4328                            &mut nodes_builder,
4329                            &mut rels_builder,
4330                            &query_ctx,
4331                        );
4332                        true
4333                    } else {
4334                        false
4335                    }
4336                } else {
4337                    false
4338                };
4339
4340                // Append VLP nodes (skip first if extending — it's the junction point)
4341                let start_idx = if skip_first_vlp_node { 1 } else { 0 };
4342                for vid in &node_path[start_idx..] {
4343                    append_node_to_struct(nodes_builder.values(), *vid, &query_ctx);
4344                }
4345                nodes_builder.append(true);
4346
4347                for (i, eid) in edge_path.iter().enumerate() {
4348                    append_edge_to_struct(
4349                        rels_builder.values(),
4350                        *eid,
4351                        &type_names_str,
4352                        node_path[i].as_u64(),
4353                        node_path[i + 1].as_u64(),
4354                        &query_ctx,
4355                    );
4356                }
4357                rels_builder.append(true);
4358                path_validity.push(true);
4359            }
4360
4361            // Finish the builders to get the arrays
4362            let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
4363            let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;
4364
4365            // Build the path struct with nodes and relationships fields
4366            let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
4367            let rels_field = Arc::new(Field::new(
4368                "relationships",
4369                rels_array.data_type().clone(),
4370                true,
4371            ));
4372
4373            // Create the path struct array
4374            let path_struct = arrow_array::StructArray::try_new(
4375                vec![nodes_field, rels_field].into(),
4376                vec![nodes_array, rels_array],
4377                Some(arrow::buffer::NullBuffer::from(path_validity)),
4378            )
4379            .map_err(arrow_err)?;
4380
4381            if let Some(idx) = existing_path_col_idx {
4382                columns[idx] = Arc::new(path_struct);
4383            } else {
4384                columns.push(Arc::new(path_struct));
4385            }
4386        }
4387
4388        // Add target property columns as NULL for now (skip if already in input).
4389        // Property hydration happens via PropertyManager in the query execution pipeline.
4390        for prop_name in &self.target_properties {
4391            let full_prop_name = format!("{}.{}", self.target_variable, prop_name);
4392            if batch.schema().column_with_name(&full_prop_name).is_none() {
4393                columns.push(arrow_array::new_null_array(
4394                    &DataType::LargeBinary,
4395                    num_rows,
4396                ));
4397            }
4398        }
4399
4400        RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)
4401    }
4402}
4403
impl Stream for GraphVariableLengthTraverseMainStream {
    type Item = DFResult<RecordBatch>;

    /// Drive the `Loading` → `Processing` → `Materializing` state machine.
    ///
    /// The current state is taken by value via `mem::replace` (leaving `Done`
    /// as a placeholder), so every branch must store a state back before
    /// returning — otherwise the stream terminates.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Take ownership of the state; `Done` is the temporary placeholder.
            let state = std::mem::replace(&mut self.state, VarLengthMainStreamState::Done);

            match state {
                // Phase 1: wait for the async adjacency-map load to finish.
                VarLengthMainStreamState::Loading(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(adjacency)) => {
                        self.state = VarLengthMainStreamState::Processing(adjacency);
                        // Continue loop to start processing
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = VarLengthMainStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        // Put the future back so it is re-polled next time.
                        self.state = VarLengthMainStreamState::Loading(fut);
                        return Poll::Pending;
                    }
                },
                // Phase 2: pull input batches and expand them with BFS.
                VarLengthMainStreamState::Processing(adjacency) => {
                    match self.input.poll_next_unpin(cx) {
                        Poll::Ready(Some(Ok(batch))) => {
                            let base_batch = match self.process_batch(batch, &adjacency) {
                                Ok(b) => b,
                                Err(e) => {
                                    // NOTE(review): a process_batch error keeps the
                                    // stream alive (state stays Processing), while an
                                    // input error below terminates it (Done) —
                                    // confirm this asymmetry is intentional.
                                    self.state = VarLengthMainStreamState::Processing(adjacency);
                                    return Poll::Ready(Some(Err(e)));
                                }
                            };

                            // If no properties need async hydration, return directly
                            if self.target_properties.is_empty() {
                                self.state = VarLengthMainStreamState::Processing(adjacency);
                                return Poll::Ready(Some(Ok(base_batch)));
                            }

                            // Create async hydration future
                            let schema = self.schema.clone();
                            let target_variable = self.target_variable.clone();
                            let target_properties = self.target_properties.clone();
                            let graph_ctx = self.graph_ctx.clone();

                            let fut = hydrate_vlp_target_properties(
                                base_batch,
                                schema,
                                target_variable,
                                target_properties,
                                None, // schemaless — no label name
                                graph_ctx,
                            );

                            self.state = VarLengthMainStreamState::Materializing {
                                adjacency,
                                fut: Box::pin(fut),
                            };
                            // Continue loop to poll the future
                        }
                        Poll::Ready(Some(Err(e))) => {
                            self.state = VarLengthMainStreamState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(None) => {
                            // Input exhausted: end of stream.
                            self.state = VarLengthMainStreamState::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Pending => {
                            self.state = VarLengthMainStreamState::Processing(adjacency);
                            return Poll::Pending;
                        }
                    }
                }
                // Phase 3: await property hydration, then resume processing.
                VarLengthMainStreamState::Materializing { adjacency, mut fut } => {
                    match fut.as_mut().poll(cx) {
                        Poll::Ready(Ok(batch)) => {
                            self.state = VarLengthMainStreamState::Processing(adjacency);
                            return Poll::Ready(Some(Ok(batch)));
                        }
                        Poll::Ready(Err(e)) => {
                            self.state = VarLengthMainStreamState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Pending => {
                            self.state = VarLengthMainStreamState::Materializing { adjacency, fut };
                            return Poll::Pending;
                        }
                    }
                }
                VarLengthMainStreamState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
4501
4502impl RecordBatchStream for GraphVariableLengthTraverseMainStream {
4503    fn schema(&self) -> SchemaRef {
4504        self.schema.clone()
4505    }
4506}
4507
4508#[cfg(test)]
4509mod tests {
4510    use super::*;
4511
4512    #[test]
4513    fn test_traverse_schema_without_edge() {
4514        let input_schema = Arc::new(Schema::new(vec![Field::new(
4515            "a._vid",
4516            DataType::UInt64,
4517            false,
4518        )]));
4519
4520        let output_schema =
4521            GraphTraverseExec::build_schema(input_schema, "m", None, &[], &[], None, None, false);
4522
4523        // Schema: input + target VID + target _labels + internal edge ID
4524        assert_eq!(output_schema.fields().len(), 4);
4525        assert_eq!(output_schema.field(0).name(), "a._vid");
4526        assert_eq!(output_schema.field(1).name(), "m._vid");
4527        assert_eq!(output_schema.field(2).name(), "m._labels");
4528        assert_eq!(output_schema.field(3).name(), "__eid_to_m");
4529    }
4530
4531    #[test]
4532    fn test_traverse_schema_with_edge() {
4533        let input_schema = Arc::new(Schema::new(vec![Field::new(
4534            "a._vid",
4535            DataType::UInt64,
4536            false,
4537        )]));
4538
4539        let output_schema = GraphTraverseExec::build_schema(
4540            input_schema,
4541            "m",
4542            Some("r"),
4543            &[],
4544            &[],
4545            None,
4546            None,
4547            false,
4548        );
4549
4550        // Schema: input + target VID + target _labels + edge EID + edge _type
4551        assert_eq!(output_schema.fields().len(), 5);
4552        assert_eq!(output_schema.field(0).name(), "a._vid");
4553        assert_eq!(output_schema.field(1).name(), "m._vid");
4554        assert_eq!(output_schema.field(2).name(), "m._labels");
4555        assert_eq!(output_schema.field(3).name(), "r._eid");
4556        assert_eq!(output_schema.field(4).name(), "r._type");
4557    }
4558
4559    #[test]
4560    fn test_traverse_schema_with_target_properties() {
4561        let input_schema = Arc::new(Schema::new(vec![Field::new(
4562            "a._vid",
4563            DataType::UInt64,
4564            false,
4565        )]));
4566
4567        let target_props = vec!["name".to_string(), "age".to_string()];
4568        let output_schema = GraphTraverseExec::build_schema(
4569            input_schema,
4570            "m",
4571            Some("r"),
4572            &[],
4573            &target_props,
4574            None,
4575            None,
4576            false,
4577        );
4578
4579        // a._vid, m._vid, m._labels, m.name, m.age, r._eid, r._type
4580        assert_eq!(output_schema.fields().len(), 7);
4581        assert_eq!(output_schema.field(0).name(), "a._vid");
4582        assert_eq!(output_schema.field(1).name(), "m._vid");
4583        assert_eq!(output_schema.field(2).name(), "m._labels");
4584        assert_eq!(output_schema.field(3).name(), "m.name");
4585        assert_eq!(output_schema.field(4).name(), "m.age");
4586        assert_eq!(output_schema.field(5).name(), "r._eid");
4587        assert_eq!(output_schema.field(6).name(), "r._type");
4588    }
4589
4590    #[test]
4591    fn test_variable_length_schema() {
4592        let input_schema = Arc::new(Schema::new(vec![Field::new(
4593            "a._vid",
4594            DataType::UInt64,
4595            false,
4596        )]));
4597
4598        let output_schema = GraphVariableLengthTraverseExec::build_schema(
4599            input_schema,
4600            "b",
4601            None,
4602            Some("p"),
4603            &[],
4604            None,
4605        );
4606
4607        assert_eq!(output_schema.fields().len(), 5);
4608        assert_eq!(output_schema.field(0).name(), "a._vid");
4609        assert_eq!(output_schema.field(1).name(), "b._vid");
4610        assert_eq!(output_schema.field(2).name(), "b._labels");
4611        assert_eq!(output_schema.field(3).name(), "_hop_count");
4612        assert_eq!(output_schema.field(4).name(), "p");
4613    }
4614
4615    #[test]
4616    fn test_traverse_main_schema_without_edge() {
4617        let input_schema = Arc::new(Schema::new(vec![Field::new(
4618            "a._vid",
4619            DataType::UInt64,
4620            false,
4621        )]));
4622
4623        let output_schema =
4624            GraphTraverseMainExec::build_schema(&input_schema, "m", &None, &[], &[], false);
4625
4626        // a._vid, m._vid, m._labels, __eid_to_m
4627        assert_eq!(output_schema.fields().len(), 4);
4628        assert_eq!(output_schema.field(0).name(), "a._vid");
4629        assert_eq!(output_schema.field(1).name(), "m._vid");
4630        assert_eq!(output_schema.field(2).name(), "m._labels");
4631        assert_eq!(output_schema.field(3).name(), "__eid_to_m");
4632    }
4633
4634    #[test]
4635    fn test_traverse_main_schema_with_edge() {
4636        let input_schema = Arc::new(Schema::new(vec![Field::new(
4637            "a._vid",
4638            DataType::UInt64,
4639            false,
4640        )]));
4641
4642        let output_schema = GraphTraverseMainExec::build_schema(
4643            &input_schema,
4644            "m",
4645            &Some("r".to_string()),
4646            &[],
4647            &[],
4648            false,
4649        );
4650
4651        // a._vid, m._vid, m._labels, r._eid, r._type
4652        assert_eq!(output_schema.fields().len(), 5);
4653        assert_eq!(output_schema.field(0).name(), "a._vid");
4654        assert_eq!(output_schema.field(1).name(), "m._vid");
4655        assert_eq!(output_schema.field(2).name(), "m._labels");
4656        assert_eq!(output_schema.field(3).name(), "r._eid");
4657        assert_eq!(output_schema.field(4).name(), "r._type");
4658    }
4659
4660    #[test]
4661    fn test_traverse_main_schema_with_edge_properties() {
4662        let input_schema = Arc::new(Schema::new(vec![Field::new(
4663            "a._vid",
4664            DataType::UInt64,
4665            false,
4666        )]));
4667
4668        let edge_props = vec!["weight".to_string(), "since".to_string()];
4669        let output_schema = GraphTraverseMainExec::build_schema(
4670            &input_schema,
4671            "m",
4672            &Some("r".to_string()),
4673            &edge_props,
4674            &[],
4675            false,
4676        );
4677
4678        // a._vid, m._vid, m._labels, r._eid, r._type, r.weight, r.since
4679        assert_eq!(output_schema.fields().len(), 7);
4680        assert_eq!(output_schema.field(0).name(), "a._vid");
4681        assert_eq!(output_schema.field(1).name(), "m._vid");
4682        assert_eq!(output_schema.field(2).name(), "m._labels");
4683        assert_eq!(output_schema.field(3).name(), "r._eid");
4684        assert_eq!(output_schema.field(4).name(), "r._type");
4685        assert_eq!(output_schema.field(5).name(), "r.weight");
4686        assert_eq!(output_schema.field(5).data_type(), &DataType::LargeBinary);
4687        assert_eq!(output_schema.field(6).name(), "r.since");
4688        assert_eq!(output_schema.field(6).data_type(), &DataType::LargeBinary);
4689    }
4690
4691    #[test]
4692    fn test_traverse_main_schema_with_target_properties() {
4693        let input_schema = Arc::new(Schema::new(vec![Field::new(
4694            "a._vid",
4695            DataType::UInt64,
4696            false,
4697        )]));
4698
4699        let target_props = vec!["name".to_string(), "age".to_string()];
4700        let output_schema = GraphTraverseMainExec::build_schema(
4701            &input_schema,
4702            "m",
4703            &Some("r".to_string()),
4704            &[],
4705            &target_props,
4706            false,
4707        );
4708
4709        // a._vid, m._vid, m._labels, r._eid, r._type, m.name, m.age
4710        assert_eq!(output_schema.fields().len(), 7);
4711        assert_eq!(output_schema.field(0).name(), "a._vid");
4712        assert_eq!(output_schema.field(1).name(), "m._vid");
4713        assert_eq!(output_schema.field(2).name(), "m._labels");
4714        assert_eq!(output_schema.field(3).name(), "r._eid");
4715        assert_eq!(output_schema.field(4).name(), "r._type");
4716        assert_eq!(output_schema.field(5).name(), "m.name");
4717        assert_eq!(output_schema.field(5).data_type(), &DataType::LargeBinary);
4718        assert_eq!(output_schema.field(6).name(), "m.age");
4719        assert_eq!(output_schema.field(6).data_type(), &DataType::LargeBinary);
4720    }
4721}