// uni_query/query/df_graph/traverse.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Graph traversal execution plans for DataFusion.
5//!
6//! This module provides graph traversal operators as DataFusion [`ExecutionPlan`]s:
7//!
8//! - [`GraphTraverseExec`]: Single-hop edge traversal
9//! - [`GraphVariableLengthTraverseExec`]: Multi-hop BFS traversal (min..max hops)
10//!
11//! # Traversal Algorithm
12//!
13//! Traversal uses the CSR adjacency cache for O(1) neighbor lookups:
14//!
15//! ```text
16//! Input Stream (source VIDs)
17//!        │
18//!        ▼
19//! ┌──────────────────┐
20//! │ For each batch:  │
21//! │  1. Extract VIDs │
22//! │  2. get_neighbors│
23//! │  3. Expand rows  │
24//! └──────────────────┘
25//!        │
26//!        ▼
27//! Output Stream (source, edge, target)
28//! ```
29//!
30//! L0 buffers are automatically overlaid for MVCC visibility.
31
32use crate::query::df_graph::GraphExecutionContext;
33use crate::query::df_graph::bitmap::{EidFilter, VidFilter};
34use crate::query::df_graph::common::{
35    append_edge_to_struct, append_node_to_struct, arrow_err, build_edge_list_field,
36    build_path_struct_field, column_as_vid_array, compute_plan_properties, labels_data_type,
37    new_edge_list_builder, new_node_list_builder,
38};
39use crate::query::df_graph::nfa::{NfaStateId, PathNfa, PathSelector, VlpOutputMode};
40use crate::query::df_graph::pred_dag::PredecessorDag;
41use crate::query::df_graph::scan::{build_property_column_static, resolve_property_type};
42use arrow::compute::take;
43use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
44use arrow_schema::{DataType, Field, Schema, SchemaRef};
45use datafusion::common::Result as DFResult;
46use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
47use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
48use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
49use futures::{Stream, StreamExt};
50use fxhash::FxHashSet;
51use std::any::Any;
52use std::collections::{HashMap, HashSet, VecDeque};
53use std::fmt;
54use std::pin::Pin;
55use std::sync::Arc;
56use std::task::{Context, Poll};
57use uni_common::Value as UniValue;
58use uni_common::core::id::{Eid, Vid};
59use uni_store::runtime::l0_visibility;
60use uni_store::storage::direction::Direction;
61
/// BFS result: `(target_vid, hop_count, node_path, edge_path)`.
type BfsResult = (Vid, usize, Vec<Vid>, Vec<Eid>);

/// Expansion record: `(original_row_idx, target_vid, hop_count, node_path, edge_path)`.
/// Same shape as [`BfsResult`] plus the input row index the expansion came from.
type ExpansionRecord = (usize, Vid, usize, Vec<Vid>, Vec<Eid>);
67
68/// Prepend nodes and edges from an existing path struct column into builders.
69///
70/// Used when a VLP extends a path that was partially built by a prior `BindFixedPath`.
71/// Reads the nodes and relationships from the existing path at `row_idx` and appends
72/// them to the provided builders. The caller should then skip the first VLP node
73/// (which is the junction point already present in the existing path).
74fn prepend_existing_path(
75    existing_path: &arrow_array::StructArray,
76    row_idx: usize,
77    nodes_builder: &mut arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder>,
78    rels_builder: &mut arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder>,
79    query_ctx: &uni_store::runtime::context::QueryContext,
80) {
81    // Read existing nodes
82    let nodes_list = existing_path
83        .column(0)
84        .as_any()
85        .downcast_ref::<arrow_array::ListArray>()
86        .unwrap();
87    let node_values = nodes_list.value(row_idx);
88    let node_struct = node_values
89        .as_any()
90        .downcast_ref::<arrow_array::StructArray>()
91        .unwrap();
92    let vid_col = node_struct
93        .column(0)
94        .as_any()
95        .downcast_ref::<UInt64Array>()
96        .unwrap();
97    for i in 0..vid_col.len() {
98        append_node_to_struct(
99            nodes_builder.values(),
100            Vid::from(vid_col.value(i)),
101            query_ctx,
102        );
103    }
104
105    // Read existing edges
106    let rels_list = existing_path
107        .column(1)
108        .as_any()
109        .downcast_ref::<arrow_array::ListArray>()
110        .unwrap();
111    let edge_values = rels_list.value(row_idx);
112    let edge_struct = edge_values
113        .as_any()
114        .downcast_ref::<arrow_array::StructArray>()
115        .unwrap();
116    let eid_col = edge_struct
117        .column(0)
118        .as_any()
119        .downcast_ref::<UInt64Array>()
120        .unwrap();
121    let type_col = edge_struct
122        .column(1)
123        .as_any()
124        .downcast_ref::<arrow_array::StringArray>()
125        .unwrap();
126    let src_col = edge_struct
127        .column(2)
128        .as_any()
129        .downcast_ref::<UInt64Array>()
130        .unwrap();
131    let dst_col = edge_struct
132        .column(3)
133        .as_any()
134        .downcast_ref::<UInt64Array>()
135        .unwrap();
136    for i in 0..eid_col.len() {
137        append_edge_to_struct(
138            rels_builder.values(),
139            Eid::from(eid_col.value(i)),
140            type_col.value(i),
141            src_col.value(i),
142            dst_col.value(i),
143            query_ctx,
144        );
145    }
146}
147
148/// Resolve edge property Arrow type, falling back to `LargeBinary` (CypherValue) for
149/// schemaless properties. Unlike vertex properties, schemaless edge properties must
150/// preserve original JSON value types (int, float, etc.) since edge types commonly
151/// lack explicit property definitions.
152fn resolve_edge_property_type(
153    prop: &str,
154    schema_props: Option<
155        &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
156    >,
157) -> DataType {
158    if prop == "overflow_json" {
159        DataType::LargeBinary
160    } else {
161        schema_props
162            .and_then(|props| props.get(prop))
163            .map(|meta| meta.r#type.to_arrow())
164            .unwrap_or(DataType::LargeBinary)
165    }
166}
167
168use crate::query::df_graph::common::merged_edge_schema_props;
169
/// Expansion tuple for variable-length traversal: `(input_row_idx, target_vid, hop_count, node_path, edge_path)`.
///
/// NOTE(review): structurally identical to [`ExpansionRecord`] above — if the two
/// really carry the same meaning, consider consolidating them into one alias.
type VarLengthExpansion = (usize, Vid, usize, Vec<Vid>, Vec<Eid>);
172
/// Single-hop graph traversal execution plan.
///
/// Expands each input row by traversing edges to find neighbors.
/// For each (source, edge, target) triple, produces one output row
/// containing the input columns plus target vertex and edge columns.
///
/// The output schema and plan properties are computed once in [`Self::new`]
/// and cached in `schema` / `properties`.
///
/// # Example
///
/// ```ignore
/// // Input: batch with _vid column
/// // Traverse KNOWS edges outgoing
/// let traverse = GraphTraverseExec::new(
///     input_plan,
///     "_vid",
///     vec![knows_type_id],
///     Direction::Outgoing,
///     "m",           // target variable
///     Some("r"),     // edge variable
///     None,          // no target label filter
///     graph_ctx,
/// );
///
/// // Output: input columns + m._vid + r._eid
/// ```
pub struct GraphTraverseExec {
    /// Input execution plan.
    input: Arc<dyn ExecutionPlan>,

    /// Column name containing source VIDs.
    source_column: String,

    /// Edge type IDs to traverse.
    edge_type_ids: Vec<u32>,

    /// Traversal direction.
    direction: Direction,

    /// Variable name for target vertex columns.
    target_variable: String,

    /// Variable name for edge columns (if edge is bound).
    edge_variable: Option<String>,

    /// Edge properties to materialize (for pushdown hydration).
    edge_properties: Vec<String>,

    /// Target vertex properties to materialize.
    target_properties: Vec<String>,

    /// Target label name for property type resolution.
    target_label_name: Option<String>,

    /// Optional target label filter.
    target_label_id: Option<u16>,

    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Whether this is an OPTIONAL MATCH (preserve unmatched source rows with NULLs).
    optional: bool,

    /// Variables introduced by the OPTIONAL MATCH pattern.
    /// Used to determine which columns should be null-extended on failure.
    optional_pattern_vars: HashSet<String>,

    /// Column name of an already-bound target VID (for cycle patterns like n-->k<--n).
    /// When set, only traversals that reach this VID are included.
    bound_target_column: Option<String>,

    /// Columns containing edge IDs from previous hops (for relationship uniqueness).
    /// Edges matching any of these IDs are excluded from traversal results.
    used_edge_columns: Vec<String>,

    /// Output schema (input columns + target/edge columns, built in `build_schema`).
    schema: SchemaRef,

    /// Cached plan properties.
    properties: PlanProperties,

    /// Execution metrics.
    metrics: ExecutionPlanMetricsSet,
}
255
256impl fmt::Debug for GraphTraverseExec {
257    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258        f.debug_struct("GraphTraverseExec")
259            .field("source_column", &self.source_column)
260            .field("edge_type_ids", &self.edge_type_ids)
261            .field("direction", &self.direction)
262            .field("target_variable", &self.target_variable)
263            .field("edge_variable", &self.edge_variable)
264            .finish()
265    }
266}
267
268impl GraphTraverseExec {
269    /// Create a new single-hop traversal plan.
270    ///
271    /// # Arguments
272    ///
273    /// * `input` - Input plan providing source vertices
274    /// * `source_column` - Column name containing source VIDs
275    /// * `edge_type_ids` - Edge types to traverse
276    /// * `direction` - Traversal direction
277    /// * `target_variable` - Variable name for target vertices
278    /// * `edge_variable` - Optional variable name for edges
279    /// * `edge_properties` - Edge properties to materialize
280    /// * `target_label_id` - Optional target label filter
281    /// * `graph_ctx` - Graph execution context
282    /// * `bound_target_column` - Column with already-bound target VID (for cycle patterns)
283    /// * `used_edge_columns` - Columns with edge IDs to exclude (relationship uniqueness)
284    #[expect(clippy::too_many_arguments)]
285    pub fn new(
286        input: Arc<dyn ExecutionPlan>,
287        source_column: impl Into<String>,
288        edge_type_ids: Vec<u32>,
289        direction: Direction,
290        target_variable: impl Into<String>,
291        edge_variable: Option<String>,
292        edge_properties: Vec<String>,
293        target_properties: Vec<String>,
294        target_label_name: Option<String>,
295        target_label_id: Option<u16>,
296        graph_ctx: Arc<GraphExecutionContext>,
297        optional: bool,
298        optional_pattern_vars: HashSet<String>,
299        bound_target_column: Option<String>,
300        used_edge_columns: Vec<String>,
301    ) -> Self {
302        let source_column = source_column.into();
303        let target_variable = target_variable.into();
304
305        // Resolve target property Arrow types from the schema
306        let uni_schema = graph_ctx.storage().schema_manager().schema();
307        let label_props = target_label_name
308            .as_deref()
309            .and_then(|ln| uni_schema.properties.get(ln));
310        let merged_edge_props = merged_edge_schema_props(&uni_schema, &edge_type_ids);
311        let edge_props = if merged_edge_props.is_empty() {
312            None
313        } else {
314            Some(&merged_edge_props)
315        };
316
317        // Build output schema: input schema + target VID + target props + optional edge ID + edge properties
318        let schema = Self::build_schema(
319            input.schema(),
320            &target_variable,
321            edge_variable.as_deref(),
322            &edge_properties,
323            &target_properties,
324            label_props,
325            edge_props,
326            optional,
327        );
328
329        let properties = compute_plan_properties(schema.clone());
330
331        Self {
332            input,
333            source_column,
334            edge_type_ids,
335            direction,
336            target_variable,
337            edge_variable,
338            edge_properties,
339            target_properties,
340            target_label_name,
341            target_label_id,
342            graph_ctx,
343            optional,
344            optional_pattern_vars,
345            bound_target_column,
346            used_edge_columns,
347            schema,
348            properties,
349            metrics: ExecutionPlanMetricsSet::new(),
350        }
351    }
352
353    /// Build output schema.
354    #[expect(
355        clippy::too_many_arguments,
356        reason = "Schema construction needs all field metadata"
357    )]
358    fn build_schema(
359        input_schema: SchemaRef,
360        target_variable: &str,
361        edge_variable: Option<&str>,
362        edge_properties: &[String],
363        target_properties: &[String],
364        label_props: Option<
365            &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
366        >,
367        edge_props: Option<
368            &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
369        >,
370        optional: bool,
371    ) -> SchemaRef {
372        let mut fields: Vec<Field> = input_schema
373            .fields()
374            .iter()
375            .map(|f| f.as_ref().clone())
376            .collect();
377
378        // Add target VID column (nullable when optional — unmatched rows get NULL)
379        let target_vid_name = format!("{}._vid", target_variable);
380        fields.push(Field::new(&target_vid_name, DataType::UInt64, optional));
381
382        // Add target ._labels column (List(Utf8)) for labels() and structural projection support
383        fields.push(Field::new(
384            format!("{}._labels", target_variable),
385            labels_data_type(),
386            true,
387        ));
388
389        // Add target vertex property columns
390        for prop_name in target_properties {
391            let col_name = format!("{}.{}", target_variable, prop_name);
392            let arrow_type = resolve_property_type(prop_name, label_props);
393            fields.push(Field::new(&col_name, arrow_type, true));
394        }
395
396        // Add edge ID column if edge variable is bound
397        if let Some(edge_var) = edge_variable {
398            let edge_id_name = format!("{}._eid", edge_var);
399            fields.push(Field::new(&edge_id_name, DataType::UInt64, optional));
400
401            // Add edge _type column for type(r) support
402            fields.push(Field::new(
403                format!("{}._type", edge_var),
404                DataType::Utf8,
405                true,
406            ));
407
408            // Add edge property columns with types resolved from schema
409            for prop_name in edge_properties {
410                let prop_col_name = format!("{}.{}", edge_var, prop_name);
411                let arrow_type = resolve_edge_property_type(prop_name, edge_props);
412                fields.push(Field::new(&prop_col_name, arrow_type, true));
413            }
414        } else {
415            // Add internal edge ID column for relationship uniqueness tracking
416            // even when edge variable is not explicitly bound.
417            let internal_eid_name = format!("__eid_to_{}", target_variable);
418            fields.push(Field::new(&internal_eid_name, DataType::UInt64, optional));
419        }
420
421        Arc::new(Schema::new(fields))
422    }
423}
424
425impl DisplayAs for GraphTraverseExec {
426    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
427        write!(
428            f,
429            "GraphTraverseExec: {} --[{:?}]--> {}",
430            self.source_column, self.edge_type_ids, self.target_variable
431        )?;
432        if let Some(ref edge_var) = self.edge_variable {
433            write!(f, " as {}", edge_var)?;
434        }
435        Ok(())
436    }
437}
438
439impl ExecutionPlan for GraphTraverseExec {
440    fn name(&self) -> &str {
441        "GraphTraverseExec"
442    }
443
444    fn as_any(&self) -> &dyn Any {
445        self
446    }
447
448    fn schema(&self) -> SchemaRef {
449        self.schema.clone()
450    }
451
452    fn properties(&self) -> &PlanProperties {
453        &self.properties
454    }
455
456    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
457        vec![&self.input]
458    }
459
460    fn with_new_children(
461        self: Arc<Self>,
462        children: Vec<Arc<dyn ExecutionPlan>>,
463    ) -> DFResult<Arc<dyn ExecutionPlan>> {
464        if children.len() != 1 {
465            return Err(datafusion::error::DataFusionError::Plan(
466                "GraphTraverseExec requires exactly one child".to_string(),
467            ));
468        }
469
470        Ok(Arc::new(Self::new(
471            children[0].clone(),
472            self.source_column.clone(),
473            self.edge_type_ids.clone(),
474            self.direction,
475            self.target_variable.clone(),
476            self.edge_variable.clone(),
477            self.edge_properties.clone(),
478            self.target_properties.clone(),
479            self.target_label_name.clone(),
480            self.target_label_id,
481            self.graph_ctx.clone(),
482            self.optional,
483            self.optional_pattern_vars.clone(),
484            self.bound_target_column.clone(),
485            self.used_edge_columns.clone(),
486        )))
487    }
488
489    fn execute(
490        &self,
491        partition: usize,
492        context: Arc<TaskContext>,
493    ) -> DFResult<SendableRecordBatchStream> {
494        let input_stream = self.input.execute(partition, context)?;
495
496        let metrics = BaselineMetrics::new(&self.metrics, partition);
497
498        let warm_fut = self
499            .graph_ctx
500            .warming_future(self.edge_type_ids.clone(), self.direction);
501
502        Ok(Box::pin(GraphTraverseStream {
503            input: input_stream,
504            source_column: self.source_column.clone(),
505            edge_type_ids: self.edge_type_ids.clone(),
506            direction: self.direction,
507            target_variable: self.target_variable.clone(),
508            edge_variable: self.edge_variable.clone(),
509            edge_properties: self.edge_properties.clone(),
510            target_properties: self.target_properties.clone(),
511            target_label_name: self.target_label_name.clone(),
512            graph_ctx: self.graph_ctx.clone(),
513            optional: self.optional,
514            optional_pattern_vars: self.optional_pattern_vars.clone(),
515            bound_target_column: self.bound_target_column.clone(),
516            used_edge_columns: self.used_edge_columns.clone(),
517            schema: self.schema.clone(),
518            state: TraverseStreamState::Warming(warm_fut),
519            metrics,
520        }))
521    }
522
523    fn metrics(&self) -> Option<MetricsSet> {
524        Some(self.metrics.clone_inner())
525    }
526}
527
/// State machine for traverse stream execution.
///
/// NOTE(review): the apparent lifecycle is `Warming` → `Reading` ⇄
/// `Materializing` → `Done`, inferred from the variant docs — confirm against
/// the stream's `poll_next` implementation (not visible in this chunk).
enum TraverseStreamState {
    /// Warming adjacency CSRs before first batch.
    Warming(Pin<Box<dyn std::future::Future<Output = DFResult<()>> + Send>>),
    /// Polling the input stream for batches.
    Reading,
    /// Materializing target vertex properties asynchronously.
    Materializing(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
    /// Stream is done.
    Done,
}
539
/// Stream that performs single-hop traversal with async property materialization.
///
/// Holds a clone of every configuration field from [`GraphTraverseExec`] plus the
/// [`TraverseStreamState`] machine that sequences warming, input polling, and
/// property materialization.
struct GraphTraverseStream {
    /// Input stream.
    input: SendableRecordBatchStream,

    /// Column name containing source VIDs.
    source_column: String,

    /// Edge type IDs to traverse.
    edge_type_ids: Vec<u32>,

    /// Traversal direction.
    direction: Direction,

    /// Variable name for target vertex (retained for diagnostics).
    #[expect(dead_code, reason = "Retained for debug logging and diagnostics")]
    target_variable: String,

    /// Variable name for edge (if bound).
    edge_variable: Option<String>,

    /// Edge properties to materialize.
    edge_properties: Vec<String>,

    /// Target vertex properties to materialize.
    target_properties: Vec<String>,

    /// Target label name for property resolution and filtering.
    target_label_name: Option<String>,

    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Whether this is an OPTIONAL MATCH.
    optional: bool,

    /// Variables introduced by the OPTIONAL MATCH pattern.
    optional_pattern_vars: HashSet<String>,

    /// Column name of an already-bound target VID (for cycle patterns like n-->k<--n).
    bound_target_column: Option<String>,

    /// Columns containing edge IDs from previous hops (for relationship uniqueness).
    used_edge_columns: Vec<String>,

    /// Output schema.
    schema: SchemaRef,

    /// Stream state (starts in `Warming`; see `TraverseStreamState`).
    state: TraverseStreamState,

    /// Metrics.
    metrics: BaselineMetrics,
}
594
impl GraphTraverseStream {
    /// Expand neighbors synchronously and return expansions.
    /// Returns (row_idx, target_vid, eid_u64, edge_type_id).
    ///
    /// Per-edge filters are applied in this order (order matters — the
    /// undirected de-dup must see edges *before* the bound-target/label checks
    /// reject them, so a rejected direction still suppresses its mirror):
    /// 1. relationship uniqueness — skip edge IDs present in `used_edge_columns`;
    /// 2. undirected de-duplication — each edge emitted once per source for `Both`;
    /// 3. bound-target matching — for cycle patterns, keep only edges reaching the bound VID;
    /// 4. target label check — via L0 visibility (storage is trusted for non-L0 vertices).
    ///
    /// # Errors
    ///
    /// Returns an execution error if the source column is missing or cannot be
    /// interpreted as a VID array.
    fn expand_neighbors(&self, batch: &RecordBatch) -> DFResult<Vec<(usize, Vid, u64, u32)>> {
        let source_col = batch.column_by_name(&self.source_column).ok_or_else(|| {
            datafusion::error::DataFusionError::Execution(format!(
                "Source column '{}' not found",
                self.source_column
            ))
        })?;

        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
        let source_vids: &UInt64Array = &source_vid_cow;

        // If bound_target_column is set, get the expected target VIDs for each row.
        // This is used for cycle patterns like n-->k<--n where the target must match.
        let bound_target_cow = self
            .bound_target_column
            .as_ref()
            .and_then(|col| batch.column_by_name(col))
            .map(|c| column_as_vid_array(c.as_ref()))
            .transpose()?;
        let bound_target_vids: Option<&UInt64Array> = bound_target_cow.as_deref();

        // Collect edge ID arrays from previous hops for relationship uniqueness filtering.
        // Columns that are missing or not UInt64 are silently skipped (no filtering).
        let used_edge_arrays: Vec<&UInt64Array> = self
            .used_edge_columns
            .iter()
            .filter_map(|col| {
                batch
                    .column_by_name(col)
                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
            })
            .collect();

        let mut expanded_rows: Vec<(usize, Vid, u64, u32)> = Vec::new();
        let is_undirected = matches!(self.direction, Direction::Both);

        for (row_idx, source_vid) in source_vids.iter().enumerate() {
            // NULL source VIDs (e.g. from an upstream OPTIONAL MATCH) expand to nothing.
            let Some(src) = source_vid else {
                continue;
            };

            // Get expected target VID if this is a bound target pattern.
            // Distinguish between:
            // - no bound target column (no filtering),
            // - bound target present but NULL for this row (must produce no expansion),
            // - bound target present with VID.
            let expected_target = bound_target_vids.map(|arr| {
                if arr.is_null(row_idx) {
                    None
                } else {
                    Some(arr.value(row_idx))
                }
            });

            // Collect used edge IDs for this row from all previous hops
            let used_eids: HashSet<u64> = used_edge_arrays
                .iter()
                .filter_map(|arr| {
                    if arr.is_null(row_idx) {
                        None
                    } else {
                        Some(arr.value(row_idx))
                    }
                })
                .collect();

            let vid = Vid::from(src);
            // For Direction::Both, deduplicate edges by eid within each source.
            // This prevents the same edge being counted twice (once outgoing, once incoming).
            let mut seen_edges: HashSet<u64> = HashSet::new();

            for &edge_type in &self.edge_type_ids {
                let neighbors = self.graph_ctx.get_neighbors(vid, edge_type, self.direction);

                for (target_vid, eid) in neighbors {
                    let eid_u64 = eid.as_u64();

                    // Skip edges already used in previous hops (relationship uniqueness)
                    if used_eids.contains(&eid_u64) {
                        continue;
                    }

                    // Deduplicate edges for undirected patterns.
                    // Short-circuit keeps `seen_edges` untouched for directed traversals.
                    if is_undirected && !seen_edges.insert(eid_u64) {
                        continue;
                    }

                    // Filter by bound target VID if set (for cycle patterns).
                    // NULL bound targets do not match anything.
                    if let Some(expected_opt) = expected_target {
                        let Some(expected) = expected_opt else {
                            continue;
                        };
                        if target_vid.as_u64() != expected {
                            continue;
                        }
                    }

                    // Filter by target label using L0 visibility.
                    // VIDs no longer embed label information, so we must look up labels.
                    if let Some(ref label_name) = self.target_label_name {
                        let query_ctx = self.graph_ctx.query_context();
                        if let Some(vertex_labels) =
                            l0_visibility::get_vertex_labels_optional(target_vid, &query_ctx)
                        {
                            // Vertex is in L0 — require actual label match
                            if !vertex_labels.contains(label_name) {
                                continue;
                            }
                        }
                        // else: vertex not in L0 → trust storage-level filtering
                    }

                    expanded_rows.push((row_idx, target_vid, eid_u64, edge_type));
                }
            }
        }

        Ok(expanded_rows)
    }
}
718
719/// Build target vertex labels column from L0 buffers.
720fn build_target_labels_column(
721    target_vids: &[Vid],
722    target_label_name: &Option<String>,
723    graph_ctx: &GraphExecutionContext,
724) -> ArrayRef {
725    use arrow_array::builder::{ListBuilder, StringBuilder};
726    let mut labels_builder = ListBuilder::new(StringBuilder::new());
727    let query_ctx = graph_ctx.query_context();
728    for vid in target_vids {
729        let row_labels: Vec<String> =
730            match l0_visibility::get_vertex_labels_optional(*vid, &query_ctx) {
731                Some(labels) => labels,
732                None => {
733                    // Vertex not in L0 — trust schema label (storage already filtered)
734                    if let Some(label_name) = target_label_name {
735                        vec![label_name.clone()]
736                    } else {
737                        vec![]
738                    }
739                }
740            };
741        let values = labels_builder.values();
742        for lbl in &row_labels {
743            values.append_value(lbl);
744        }
745        labels_builder.append(true);
746    }
747    Arc::new(labels_builder.finish())
748}
749
750/// Build target vertex property columns from storage and L0.
751async fn build_target_property_columns(
752    target_vids: &[Vid],
753    target_properties: &[String],
754    target_label_name: &Option<String>,
755    graph_ctx: &Arc<GraphExecutionContext>,
756) -> DFResult<Vec<ArrayRef>> {
757    let mut columns = Vec::new();
758
759    if let Some(label_name) = target_label_name {
760        let property_manager = graph_ctx.property_manager();
761        let query_ctx = graph_ctx.query_context();
762
763        let props_map = property_manager
764            .get_batch_vertex_props_for_label(target_vids, label_name, Some(&query_ctx))
765            .await
766            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
767
768        let uni_schema = graph_ctx.storage().schema_manager().schema();
769        let label_props = uni_schema.properties.get(label_name.as_str());
770
771        for prop_name in target_properties {
772            let data_type = resolve_property_type(prop_name, label_props);
773            let column =
774                build_property_column_static(target_vids, &props_map, prop_name, &data_type)?;
775            columns.push(column);
776        }
777    } else {
778        // No label name — use label-agnostic property lookup.
779        let non_internal_props: Vec<&str> = target_properties
780            .iter()
781            .filter(|p| *p != "_all_props")
782            .map(|s| s.as_str())
783            .collect();
784        let property_manager = graph_ctx.property_manager();
785        let query_ctx = graph_ctx.query_context();
786
787        let props_map = if !non_internal_props.is_empty() {
788            property_manager
789                .get_batch_vertex_props(target_vids, &non_internal_props, Some(&query_ctx))
790                .await
791                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
792        } else {
793            std::collections::HashMap::new()
794        };
795
796        for prop_name in target_properties {
797            if prop_name == "_all_props" {
798                columns.push(build_all_props_column(target_vids, &props_map, graph_ctx));
799            } else {
800                let column = build_property_column_static(
801                    target_vids,
802                    &props_map,
803                    prop_name,
804                    &arrow::datatypes::DataType::LargeBinary,
805                )?;
806                columns.push(column);
807            }
808        }
809    }
810
811    Ok(columns)
812}
813
814/// Build a CypherValue blob column from all vertex properties (L0 + storage).
815fn build_all_props_column(
816    target_vids: &[Vid],
817    props_map: &HashMap<Vid, HashMap<String, uni_common::Value>>,
818    graph_ctx: &Arc<GraphExecutionContext>,
819) -> ArrayRef {
820    use crate::query::df_graph::scan::encode_cypher_value;
821    use arrow_array::builder::LargeBinaryBuilder;
822
823    let mut builder = LargeBinaryBuilder::new();
824    let l0_ctx = graph_ctx.l0_context();
825    for vid in target_vids {
826        let mut merged_props = serde_json::Map::new();
827        if let Some(vid_props) = props_map.get(vid) {
828            for (k, v) in vid_props.iter() {
829                let json_val: serde_json::Value = v.clone().into();
830                merged_props.insert(k.to_string(), json_val);
831            }
832        }
833        for l0 in l0_ctx.iter_l0_buffers() {
834            let guard = l0.read();
835            if let Some(l0_props) = guard.vertex_properties.get(vid) {
836                for (k, v) in l0_props.iter() {
837                    let json_val: serde_json::Value = v.clone().into();
838                    merged_props.insert(k.to_string(), json_val);
839                }
840            }
841        }
842        if merged_props.is_empty() {
843            builder.append_null();
844        } else {
845            let json = serde_json::Value::Object(merged_props);
846            match encode_cypher_value(&json) {
847                Ok(bytes) => builder.append_value(bytes),
848                Err(_) => builder.append_null(),
849            }
850        }
851    }
852    Arc::new(builder.finish())
853}
854
855/// Build edge ID, type, and property columns for bound edge variables.
856async fn build_edge_columns(
857    expansions: &[(usize, Vid, u64, u32)],
858    edge_properties: &[String],
859    edge_type_ids: &[u32],
860    graph_ctx: &Arc<GraphExecutionContext>,
861) -> DFResult<Vec<ArrayRef>> {
862    let mut columns = Vec::new();
863
864    let eids: Vec<Eid> = expansions
865        .iter()
866        .map(|(_, _, eid, _)| Eid::from(*eid))
867        .collect();
868    let eid_u64s: Vec<u64> = eids.iter().map(|e| e.as_u64()).collect();
869    columns.push(Arc::new(UInt64Array::from(eid_u64s)) as ArrayRef);
870
871    // Edge _type column
872    {
873        let uni_schema = graph_ctx.storage().schema_manager().schema();
874        let mut type_builder = arrow_array::builder::StringBuilder::new();
875        for (_, _, _, edge_type_id) in expansions {
876            if let Some(name) = uni_schema.edge_type_name_by_id_unified(*edge_type_id) {
877                type_builder.append_value(&name);
878            } else {
879                type_builder.append_null();
880            }
881        }
882        columns.push(Arc::new(type_builder.finish()) as ArrayRef);
883    }
884
885    if !edge_properties.is_empty() {
886        let prop_name_refs: Vec<&str> = edge_properties.iter().map(|s| s.as_str()).collect();
887        let property_manager = graph_ctx.property_manager();
888        let query_ctx = graph_ctx.query_context();
889
890        let props_map = property_manager
891            .get_batch_edge_props(&eids, &prop_name_refs, Some(&query_ctx))
892            .await
893            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
894
895        let uni_schema = graph_ctx.storage().schema_manager().schema();
896        let merged_edge_props = merged_edge_schema_props(&uni_schema, edge_type_ids);
897        let edge_type_props = if merged_edge_props.is_empty() {
898            None
899        } else {
900            Some(&merged_edge_props)
901        };
902
903        let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(e.as_u64())).collect();
904
905        for prop_name in edge_properties {
906            let data_type = resolve_edge_property_type(prop_name, edge_type_props);
907            let column =
908                build_property_column_static(&vid_keys, &props_map, prop_name, &data_type)?;
909            columns.push(column);
910        }
911    }
912
913    Ok(columns)
914}
915
916/// Build the output batch with target vertex properties.
917///
918/// This is a standalone async function so it can be boxed into a `Send` future
919/// without borrowing from `GraphTraverseStream`.
920#[expect(
921    clippy::too_many_arguments,
922    reason = "Standalone async fn needs all context passed explicitly"
923)]
924async fn build_traverse_output_batch(
925    input: RecordBatch,
926    expansions: Vec<(usize, Vid, u64, u32)>,
927    schema: SchemaRef,
928    edge_variable: Option<String>,
929    edge_properties: Vec<String>,
930    edge_type_ids: Vec<u32>,
931    target_properties: Vec<String>,
932    target_label_name: Option<String>,
933    graph_ctx: Arc<GraphExecutionContext>,
934    optional: bool,
935    optional_pattern_vars: HashSet<String>,
936) -> DFResult<RecordBatch> {
937    if expansions.is_empty() {
938        if !optional {
939            return Ok(RecordBatch::new_empty(schema));
940        }
941        let unmatched_reps = collect_unmatched_optional_group_rows(
942            &input,
943            &HashSet::new(),
944            &schema,
945            &optional_pattern_vars,
946        )?;
947        if unmatched_reps.is_empty() {
948            return Ok(RecordBatch::new_empty(schema));
949        }
950        return build_optional_null_batch_for_rows_with_optional_vars(
951            &input,
952            &unmatched_reps,
953            &schema,
954            &optional_pattern_vars,
955        );
956    }
957
958    // Expand input columns via index array
959    let indices: Vec<u64> = expansions
960        .iter()
961        .map(|(idx, _, _, _)| *idx as u64)
962        .collect();
963    let indices_array = UInt64Array::from(indices);
964    let mut columns: Vec<ArrayRef> = input
965        .columns()
966        .iter()
967        .map(|col| take(col.as_ref(), &indices_array, None))
968        .collect::<Result<_, _>>()?;
969
970    // Target VID column
971    let target_vids: Vec<Vid> = expansions.iter().map(|(_, vid, _, _)| *vid).collect();
972    let target_vid_u64s: Vec<u64> = target_vids.iter().map(|v| v.as_u64()).collect();
973    columns.push(Arc::new(UInt64Array::from(target_vid_u64s)));
974
975    // Target labels column
976    columns.push(build_target_labels_column(
977        &target_vids,
978        &target_label_name,
979        &graph_ctx,
980    ));
981
982    // Target vertex property columns
983    if !target_properties.is_empty() {
984        let prop_cols = build_target_property_columns(
985            &target_vids,
986            &target_properties,
987            &target_label_name,
988            &graph_ctx,
989        )
990        .await?;
991        columns.extend(prop_cols);
992    }
993
994    // Edge columns (bound or internal tracking)
995    if edge_variable.is_some() {
996        let edge_cols =
997            build_edge_columns(&expansions, &edge_properties, &edge_type_ids, &graph_ctx).await?;
998        columns.extend(edge_cols);
999    } else {
1000        let eid_u64s: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1001        columns.push(Arc::new(UInt64Array::from(eid_u64s)));
1002    }
1003
1004    let expanded_batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
1005
1006    // Append null rows for unmatched optional sources
1007    if optional {
1008        let matched_indices: HashSet<usize> =
1009            expansions.iter().map(|(idx, _, _, _)| *idx).collect();
1010        let unmatched = collect_unmatched_optional_group_rows(
1011            &input,
1012            &matched_indices,
1013            &schema,
1014            &optional_pattern_vars,
1015        )?;
1016
1017        if !unmatched.is_empty() {
1018            let null_batch = build_optional_null_batch_for_rows_with_optional_vars(
1019                &input,
1020                &unmatched,
1021                &schema,
1022                &optional_pattern_vars,
1023            )?;
1024            let combined = arrow::compute::concat_batches(&schema, [&expanded_batch, &null_batch])
1025                .map_err(arrow_err)?;
1026            return Ok(combined);
1027        }
1028    }
1029
1030    Ok(expanded_batch)
1031}
1032
1033/// Build a batch for specific unmatched source rows with NULL target/edge columns.
1034/// Used when OPTIONAL MATCH has some expansions but some source rows had none.
1035fn build_optional_null_batch_for_rows(
1036    input: &RecordBatch,
1037    unmatched_indices: &[usize],
1038    schema: &SchemaRef,
1039) -> DFResult<RecordBatch> {
1040    let num_rows = unmatched_indices.len();
1041    let indices: Vec<u64> = unmatched_indices.iter().map(|&idx| idx as u64).collect();
1042    let indices_array = UInt64Array::from(indices);
1043
1044    // Take the unmatched input rows
1045    let mut columns: Vec<ArrayRef> = Vec::new();
1046    for col in input.columns() {
1047        let taken = take(col.as_ref(), &indices_array, None)?;
1048        columns.push(taken);
1049    }
1050    // Fill remaining columns with nulls
1051    for field in schema.fields().iter().skip(input.num_columns()) {
1052        columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1053    }
1054    RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
1055}
1056
/// Return `true` when `col_name` belongs to one of the OPTIONAL MATCH variables.
///
/// Matches the variable's own column (`var`), its property/meta columns
/// (`var.prop`, `var._vid`, ...), and the internal anonymous-edge column
/// `__eid_to_var`. Boundaries are enforced exactly so that one variable being
/// a prefix or suffix of another cannot cause a false positive.
fn is_optional_column_for_vars(col_name: &str, optional_vars: &HashSet<String>) -> bool {
    optional_vars.contains(col_name)
        || optional_vars.iter().any(|var| {
            // `var.prop` style columns: the '.' must come immediately after the
            // variable name (so var `n` does not match column `node`).
            col_name
                .strip_prefix(var.as_str())
                .is_some_and(|rest| rest.starts_with('.'))
                // Internal anonymous-edge column: must equal `__eid_to_<var>`
                // exactly. The previous `ends_with(var)` check falsely matched
                // when one variable was a suffix of another (e.g. `n` matched
                // `__eid_to_mn`).
                || col_name.strip_prefix("__eid_to_") == Some(var.as_str())
        })
}
1064
1065fn collect_unmatched_optional_group_rows(
1066    input: &RecordBatch,
1067    matched_indices: &HashSet<usize>,
1068    schema: &SchemaRef,
1069    optional_vars: &HashSet<String>,
1070) -> DFResult<Vec<usize>> {
1071    if input.num_rows() == 0 {
1072        return Ok(Vec::new());
1073    }
1074
1075    if optional_vars.is_empty() {
1076        return Ok((0..input.num_rows())
1077            .filter(|idx| !matched_indices.contains(idx))
1078            .collect());
1079    }
1080
1081    let source_vid_indices: Vec<usize> = schema
1082        .fields()
1083        .iter()
1084        .enumerate()
1085        .filter_map(|(idx, field)| {
1086            if idx >= input.num_columns() {
1087                return None;
1088            }
1089            let name = field.name();
1090            if !is_optional_column_for_vars(name, optional_vars) && name.ends_with("._vid") {
1091                Some(idx)
1092            } else {
1093                None
1094            }
1095        })
1096        .collect();
1097
1098    // Group rows by non-optional VID bindings and preserve group order.
1099    let mut groups: HashMap<Vec<u8>, (usize, bool)> = HashMap::new(); // (first_row_idx, any_matched)
1100    let mut group_order: Vec<Vec<u8>> = Vec::new();
1101
1102    for row_idx in 0..input.num_rows() {
1103        let key = compute_optional_group_key(input, row_idx, &source_vid_indices)?;
1104        let entry = groups.entry(key.clone());
1105        if matches!(entry, std::collections::hash_map::Entry::Vacant(_)) {
1106            group_order.push(key.clone());
1107        }
1108        let matched = matched_indices.contains(&row_idx);
1109        entry
1110            .and_modify(|(_, any_matched)| *any_matched |= matched)
1111            .or_insert((row_idx, matched));
1112    }
1113
1114    Ok(group_order
1115        .into_iter()
1116        .filter_map(|key| {
1117            groups
1118                .get(&key)
1119                .and_then(|(first_idx, any_matched)| (!*any_matched).then_some(*first_idx))
1120        })
1121        .collect())
1122}
1123
1124fn compute_optional_group_key(
1125    batch: &RecordBatch,
1126    row_idx: usize,
1127    source_vid_indices: &[usize],
1128) -> DFResult<Vec<u8>> {
1129    let mut key = Vec::with_capacity(source_vid_indices.len() * std::mem::size_of::<u64>());
1130    for &col_idx in source_vid_indices {
1131        let col = batch.column(col_idx);
1132        let vid_cow = column_as_vid_array(col.as_ref())?;
1133        let arr: &UInt64Array = &vid_cow;
1134        if arr.is_null(row_idx) {
1135            key.extend_from_slice(&u64::MAX.to_le_bytes());
1136        } else {
1137            key.extend_from_slice(&arr.value(row_idx).to_le_bytes());
1138        }
1139    }
1140    Ok(key)
1141}
1142
1143fn build_optional_null_batch_for_rows_with_optional_vars(
1144    input: &RecordBatch,
1145    unmatched_indices: &[usize],
1146    schema: &SchemaRef,
1147    optional_vars: &HashSet<String>,
1148) -> DFResult<RecordBatch> {
1149    if optional_vars.is_empty() {
1150        return build_optional_null_batch_for_rows(input, unmatched_indices, schema);
1151    }
1152
1153    let num_rows = unmatched_indices.len();
1154    let indices: Vec<u64> = unmatched_indices.iter().map(|&idx| idx as u64).collect();
1155    let indices_array = UInt64Array::from(indices);
1156
1157    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
1158    for (col_idx, field) in schema.fields().iter().enumerate() {
1159        if col_idx < input.num_columns() {
1160            if is_optional_column_for_vars(field.name(), optional_vars) {
1161                columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1162            } else {
1163                let taken = take(input.column(col_idx).as_ref(), &indices_array, None)?;
1164                columns.push(taken);
1165            }
1166        } else {
1167            columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1168        }
1169    }
1170
1171    RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
1172}
1173
impl Stream for GraphTraverseStream {
    type Item = DFResult<RecordBatch>;

    // State machine:
    //   Warming ──(cache warm-up done)──▶ Reading
    //   Reading ──(batch needs async property hydration)──▶ Materializing
    //   Materializing ──(output batch ready)──▶ Reading
    //   any state ──(error on input / input exhausted)──▶ Done
    //
    // The state is swapped out with `Done` on each loop pass so the owned
    // futures can be polled without double-borrowing `self`; every branch
    // restores the correct state before returning or continuing.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            let state = std::mem::replace(&mut self.state, TraverseStreamState::Done);

            match state {
                // Initial cache warm-up future (e.g. adjacency load) must
                // finish before any input batch is processed.
                TraverseStreamState::Warming(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(())) => {
                        self.state = TraverseStreamState::Reading;
                        // Continue loop to start reading
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = TraverseStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        // Put the in-flight future back so the next poll resumes it.
                        self.state = TraverseStreamState::Warming(fut);
                        return Poll::Pending;
                    }
                },
                TraverseStreamState::Reading => {
                    // Check timeout
                    // (state stays Done from the replace above, terminating the stream)
                    if let Err(e) = self.graph_ctx.check_timeout() {
                        return Poll::Ready(Some(Err(
                            datafusion::error::DataFusionError::Execution(e.to_string()),
                        )));
                    }

                    match self.input.poll_next_unpin(cx) {
                        Poll::Ready(Some(Ok(batch))) => {
                            // Expand neighbors synchronously
                            let expansions = match self.expand_neighbors(&batch) {
                                Ok(exp) => exp,
                                Err(e) => {
                                    // Expansion errors do not terminate the
                                    // stream: stay in Reading for the next batch.
                                    self.state = TraverseStreamState::Reading;
                                    return Poll::Ready(Some(Err(e)));
                                }
                            };

                            // Build output synchronously only when no properties need async hydration
                            if self.target_properties.is_empty() && self.edge_properties.is_empty()
                            {
                                let result = build_traverse_output_batch_sync(
                                    &batch,
                                    &expansions,
                                    &self.schema,
                                    self.edge_variable.as_ref(),
                                    &self.graph_ctx,
                                    self.optional,
                                    &self.optional_pattern_vars,
                                );
                                self.state = TraverseStreamState::Reading;
                                if let Ok(ref r) = result {
                                    self.metrics.record_output(r.num_rows());
                                }
                                return Poll::Ready(Some(result));
                            }

                            // Properties needed — create async future for hydration.
                            // Everything is cloned so the boxed future is 'static + Send
                            // and does not borrow from `self`.
                            let schema = self.schema.clone();
                            let edge_variable = self.edge_variable.clone();
                            let edge_properties = self.edge_properties.clone();
                            let edge_type_ids = self.edge_type_ids.clone();
                            let target_properties = self.target_properties.clone();
                            let target_label_name = self.target_label_name.clone();
                            let graph_ctx = self.graph_ctx.clone();

                            let optional = self.optional;
                            let optional_pattern_vars = self.optional_pattern_vars.clone();

                            let fut = build_traverse_output_batch(
                                batch,
                                expansions,
                                schema,
                                edge_variable,
                                edge_properties,
                                edge_type_ids,
                                target_properties,
                                target_label_name,
                                graph_ctx,
                                optional,
                                optional_pattern_vars,
                            );

                            self.state = TraverseStreamState::Materializing(Box::pin(fut));
                            // Continue loop to poll the future
                        }
                        Poll::Ready(Some(Err(e))) => {
                            self.state = TraverseStreamState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(None) => {
                            // Input exhausted: end of stream.
                            self.state = TraverseStreamState::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Pending => {
                            self.state = TraverseStreamState::Reading;
                            return Poll::Pending;
                        }
                    }
                }
                // A property-hydration future is in flight; forward its result.
                TraverseStreamState::Materializing(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(batch)) => {
                        self.state = TraverseStreamState::Reading;
                        self.metrics.record_output(batch.num_rows());
                        return Poll::Ready(Some(Ok(batch)));
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = TraverseStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        self.state = TraverseStreamState::Materializing(fut);
                        return Poll::Pending;
                    }
                },
                TraverseStreamState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
1299
1300/// Build output batch synchronously when no properties need async hydration.
1301///
1302/// Only called when both `target_properties` and `edge_properties` are empty,
1303/// so no property columns need to be materialized.
1304fn build_traverse_output_batch_sync(
1305    input: &RecordBatch,
1306    expansions: &[(usize, Vid, u64, u32)],
1307    schema: &SchemaRef,
1308    edge_variable: Option<&String>,
1309    graph_ctx: &GraphExecutionContext,
1310    optional: bool,
1311    optional_pattern_vars: &HashSet<String>,
1312) -> DFResult<RecordBatch> {
1313    if expansions.is_empty() {
1314        if !optional {
1315            return Ok(RecordBatch::new_empty(schema.clone()));
1316        }
1317        let unmatched_reps = collect_unmatched_optional_group_rows(
1318            input,
1319            &HashSet::new(),
1320            schema,
1321            optional_pattern_vars,
1322        )?;
1323        if unmatched_reps.is_empty() {
1324            return Ok(RecordBatch::new_empty(schema.clone()));
1325        }
1326        return build_optional_null_batch_for_rows_with_optional_vars(
1327            input,
1328            &unmatched_reps,
1329            schema,
1330            optional_pattern_vars,
1331        );
1332    }
1333
1334    let indices: Vec<u64> = expansions
1335        .iter()
1336        .map(|(idx, _, _, _)| *idx as u64)
1337        .collect();
1338    let indices_array = UInt64Array::from(indices);
1339
1340    let mut columns: Vec<ArrayRef> = Vec::new();
1341    for col in input.columns() {
1342        let expanded = take(col.as_ref(), &indices_array, None)?;
1343        columns.push(expanded);
1344    }
1345
1346    // Add target VID column
1347    let target_vids: Vec<u64> = expansions
1348        .iter()
1349        .map(|(_, vid, _, _)| vid.as_u64())
1350        .collect();
1351    columns.push(Arc::new(UInt64Array::from(target_vids)));
1352
1353    // Add target ._labels column (from L0 buffers)
1354    {
1355        use arrow_array::builder::{ListBuilder, StringBuilder};
1356        let l0_ctx = graph_ctx.l0_context();
1357        let mut labels_builder = ListBuilder::new(StringBuilder::new());
1358        for (_, vid, _, _) in expansions {
1359            let mut row_labels: Vec<String> = Vec::new();
1360            for l0 in l0_ctx.iter_l0_buffers() {
1361                let guard = l0.read();
1362                if let Some(l0_labels) = guard.vertex_labels.get(vid) {
1363                    for lbl in l0_labels {
1364                        if !row_labels.contains(lbl) {
1365                            row_labels.push(lbl.clone());
1366                        }
1367                    }
1368                }
1369            }
1370            let values = labels_builder.values();
1371            for lbl in &row_labels {
1372                values.append_value(lbl);
1373            }
1374            labels_builder.append(true);
1375        }
1376        columns.push(Arc::new(labels_builder.finish()));
1377    }
1378
1379    // Add edge columns if edge is bound (no properties in sync path)
1380    if edge_variable.is_some() {
1381        let edge_ids: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1382        columns.push(Arc::new(UInt64Array::from(edge_ids)));
1383
1384        // Add edge _type column
1385        let uni_schema = graph_ctx.storage().schema_manager().schema();
1386        let mut type_builder = arrow_array::builder::StringBuilder::new();
1387        for (_, _, _, edge_type_id) in expansions {
1388            if let Some(name) = uni_schema.edge_type_name_by_id_unified(*edge_type_id) {
1389                type_builder.append_value(&name);
1390            } else {
1391                type_builder.append_null();
1392            }
1393        }
1394        columns.push(Arc::new(type_builder.finish()));
1395    } else {
1396        // Internal EID column for relationship uniqueness tracking (matches schema)
1397        let edge_ids: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1398        columns.push(Arc::new(UInt64Array::from(edge_ids)));
1399    }
1400
1401    let expanded_batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
1402
1403    if optional {
1404        let matched_indices: HashSet<usize> =
1405            expansions.iter().map(|(idx, _, _, _)| *idx).collect();
1406        let unmatched = collect_unmatched_optional_group_rows(
1407            input,
1408            &matched_indices,
1409            schema,
1410            optional_pattern_vars,
1411        )?;
1412
1413        if !unmatched.is_empty() {
1414            let null_batch = build_optional_null_batch_for_rows_with_optional_vars(
1415                input,
1416                &unmatched,
1417                schema,
1418                optional_pattern_vars,
1419            )?;
1420            let combined = arrow::compute::concat_batches(schema, [&expanded_batch, &null_batch])
1421                .map_err(|e| {
1422                datafusion::error::DataFusionError::ArrowError(Box::new(e), None)
1423            })?;
1424            return Ok(combined);
1425        }
1426    }
1427
1428    Ok(expanded_batch)
1429}
1430
impl RecordBatchStream for GraphTraverseStream {
    /// Output schema of the traversal stream (input columns plus the
    /// traversal-produced target/edge columns).
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}
1436
/// Adjacency map type: maps source VID to list of (target_vid, eid, edge_type_name, properties).
/// Built in memory by the schemaless traversal path, which has no CSR adjacency cache.
type EdgeAdjacencyMap = HashMap<Vid, Vec<(Vid, Eid, String, uni_common::Properties)>>;
1439
/// Graph traversal execution plan for schemaless edge types (TraverseMainByType).
///
/// Unlike GraphTraverseExec which uses CSR adjacency for known types, this operator
/// queries the main edges table for schemaless types and builds an in-memory adjacency map.
///
/// # Example
///
/// ```ignore
/// // Traverse schemaless "CUSTOM" edges
/// let traverse = GraphTraverseMainExec::new(
///     input_plan,
///     "_vid",
///     vec!["CUSTOM".to_string()],    // edge type names
///     Direction::Outgoing,
///     "m",                           // target variable
///     Some("r".to_string()),         // edge variable
///     vec![],                        // edge properties
///     vec![],                        // target properties
///     graph_ctx,
///     false,                         // not optional
///     HashSet::new(),                // optional pattern vars
///     None,                          // bound target column
///     vec![],                        // used edge columns
/// );
/// ```
pub struct GraphTraverseMainExec {
    /// Input execution plan.
    input: Arc<dyn ExecutionPlan>,

    /// Column name containing source VIDs.
    source_column: String,

    /// Edge type names (not IDs, since schemaless types may not have IDs).
    /// Supports OR relationship types like `[:KNOWS|HATES]`.
    type_names: Vec<String>,

    /// Traversal direction.
    direction: Direction,

    /// Variable name for target vertex columns.
    target_variable: String,

    /// Variable name for edge columns (if edge is bound).
    edge_variable: Option<String>,

    /// Edge properties to materialize.
    edge_properties: Vec<String>,

    /// Target vertex properties to materialize.
    target_properties: Vec<String>,

    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Whether this is an OPTIONAL MATCH (preserve unmatched source rows with NULLs).
    optional: bool,

    /// Variables introduced by the OPTIONAL MATCH pattern.
    optional_pattern_vars: HashSet<String>,

    /// Column name of an already-bound target VID (for patterns where target is in scope).
    /// When set, only traversals reaching this exact VID are included.
    bound_target_column: Option<String>,

    /// Columns containing edge IDs from previous hops (for relationship uniqueness).
    /// Edges matching any of these IDs are excluded from traversal results.
    used_edge_columns: Vec<String>,

    /// Output schema (input columns plus traversal-produced columns).
    schema: SchemaRef,

    /// Cached plan properties (computed once from the schema at construction).
    properties: PlanProperties,

    /// Execution metrics.
    metrics: ExecutionPlanMetricsSet,
}
1514
1515impl fmt::Debug for GraphTraverseMainExec {
1516    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1517        f.debug_struct("GraphTraverseMainExec")
1518            .field("type_names", &self.type_names)
1519            .field("direction", &self.direction)
1520            .field("target_variable", &self.target_variable)
1521            .field("edge_variable", &self.edge_variable)
1522            .finish()
1523    }
1524}
1525
1526impl GraphTraverseMainExec {
1527    /// Create a new schemaless traversal executor.
1528    #[expect(clippy::too_many_arguments)]
1529    pub fn new(
1530        input: Arc<dyn ExecutionPlan>,
1531        source_column: impl Into<String>,
1532        type_names: Vec<String>,
1533        direction: Direction,
1534        target_variable: impl Into<String>,
1535        edge_variable: Option<String>,
1536        edge_properties: Vec<String>,
1537        target_properties: Vec<String>,
1538        graph_ctx: Arc<GraphExecutionContext>,
1539        optional: bool,
1540        optional_pattern_vars: HashSet<String>,
1541        bound_target_column: Option<String>,
1542        used_edge_columns: Vec<String>,
1543    ) -> Self {
1544        let source_column = source_column.into();
1545        let target_variable = target_variable.into();
1546
1547        // Build output schema
1548        let schema = Self::build_schema(
1549            &input.schema(),
1550            &target_variable,
1551            &edge_variable,
1552            &edge_properties,
1553            &target_properties,
1554            optional,
1555        );
1556
1557        let properties = compute_plan_properties(schema.clone());
1558
1559        Self {
1560            input,
1561            source_column,
1562            type_names,
1563            direction,
1564            target_variable,
1565            edge_variable,
1566            edge_properties,
1567            target_properties,
1568            graph_ctx,
1569            optional,
1570            optional_pattern_vars,
1571            bound_target_column,
1572            used_edge_columns,
1573            schema,
1574            properties,
1575            metrics: ExecutionPlanMetricsSet::new(),
1576        }
1577    }
1578
1579    /// Build output schema for traversal.
1580    fn build_schema(
1581        input_schema: &SchemaRef,
1582        target_variable: &str,
1583        edge_variable: &Option<String>,
1584        edge_properties: &[String],
1585        target_properties: &[String],
1586        optional: bool,
1587    ) -> SchemaRef {
1588        let mut fields: Vec<Field> = input_schema
1589            .fields()
1590            .iter()
1591            .map(|f| f.as_ref().clone())
1592            .collect();
1593
1594        // Add target ._vid column (only if not already in input, nullable for OPTIONAL MATCH)
1595        let target_vid_name = format!("{}._vid", target_variable);
1596        if input_schema.column_with_name(&target_vid_name).is_none() {
1597            fields.push(Field::new(target_vid_name, DataType::UInt64, true));
1598        }
1599
1600        // Add target ._labels column (only if not already in input)
1601        let target_labels_name = format!("{}._labels", target_variable);
1602        if input_schema.column_with_name(&target_labels_name).is_none() {
1603            fields.push(Field::new(target_labels_name, labels_data_type(), true));
1604        }
1605
1606        // Add edge columns if edge variable is bound
1607        if let Some(edge_var) = edge_variable {
1608            fields.push(Field::new(
1609                format!("{}._eid", edge_var),
1610                DataType::UInt64,
1611                optional,
1612            ));
1613
1614            // Add edge ._type column for type(r) support
1615            fields.push(Field::new(
1616                format!("{}._type", edge_var),
1617                DataType::Utf8,
1618                true,
1619            ));
1620
1621            // Edge properties: LargeBinary (cv_encoded) to preserve value types.
1622            // Schemaless edges store properties as CypherValue blobs so that
1623            // Int, Float, etc. round-trip correctly through Arrow.
1624            for prop in edge_properties {
1625                let col_name = format!("{}.{}", edge_var, prop);
1626                let mut metadata = std::collections::HashMap::new();
1627                metadata.insert("cv_encoded".to_string(), "true".to_string());
1628                fields.push(
1629                    Field::new(&col_name, DataType::LargeBinary, true).with_metadata(metadata),
1630                );
1631            }
1632        } else {
1633            // Add internal edge ID for anonymous relationships so BindPath can
1634            // reconstruct named paths (p = (a)-[:T]->(b)).
1635            fields.push(Field::new(
1636                format!("__eid_to_{}", target_variable),
1637                DataType::UInt64,
1638                optional,
1639            ));
1640        }
1641
1642        // Target properties: all as LargeBinary (deferred to PropertyManager)
1643        for prop in target_properties {
1644            fields.push(Field::new(
1645                format!("{}.{}", target_variable, prop),
1646                DataType::LargeBinary,
1647                true,
1648            ));
1649        }
1650
1651        Arc::new(Schema::new(fields))
1652    }
1653}
1654
1655impl DisplayAs for GraphTraverseMainExec {
1656    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1657        write!(
1658            f,
1659            "GraphTraverseMainExec: types={:?}, direction={:?}",
1660            self.type_names, self.direction
1661        )
1662    }
1663}
1664
1665impl ExecutionPlan for GraphTraverseMainExec {
1666    fn name(&self) -> &str {
1667        "GraphTraverseMainExec"
1668    }
1669
1670    fn as_any(&self) -> &dyn Any {
1671        self
1672    }
1673
1674    fn schema(&self) -> SchemaRef {
1675        self.schema.clone()
1676    }
1677
1678    fn properties(&self) -> &PlanProperties {
1679        &self.properties
1680    }
1681
1682    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1683        vec![&self.input]
1684    }
1685
1686    fn with_new_children(
1687        self: Arc<Self>,
1688        children: Vec<Arc<dyn ExecutionPlan>>,
1689    ) -> DFResult<Arc<dyn ExecutionPlan>> {
1690        if children.len() != 1 {
1691            return Err(datafusion::error::DataFusionError::Plan(
1692                "GraphTraverseMainExec expects exactly one child".to_string(),
1693            ));
1694        }
1695
1696        Ok(Arc::new(Self {
1697            input: children[0].clone(),
1698            source_column: self.source_column.clone(),
1699            type_names: self.type_names.clone(),
1700            direction: self.direction,
1701            target_variable: self.target_variable.clone(),
1702            edge_variable: self.edge_variable.clone(),
1703            edge_properties: self.edge_properties.clone(),
1704            target_properties: self.target_properties.clone(),
1705            graph_ctx: self.graph_ctx.clone(),
1706            optional: self.optional,
1707            optional_pattern_vars: self.optional_pattern_vars.clone(),
1708            bound_target_column: self.bound_target_column.clone(),
1709            used_edge_columns: self.used_edge_columns.clone(),
1710            schema: self.schema.clone(),
1711            properties: self.properties.clone(),
1712            metrics: self.metrics.clone(),
1713        }))
1714    }
1715
1716    fn execute(
1717        &self,
1718        partition: usize,
1719        context: Arc<TaskContext>,
1720    ) -> DFResult<SendableRecordBatchStream> {
1721        let input_stream = self.input.execute(partition, context)?;
1722        let metrics = BaselineMetrics::new(&self.metrics, partition);
1723
1724        Ok(Box::pin(GraphTraverseMainStream::new(
1725            input_stream,
1726            self.source_column.clone(),
1727            self.type_names.clone(),
1728            self.direction,
1729            self.target_variable.clone(),
1730            self.edge_variable.clone(),
1731            self.edge_properties.clone(),
1732            self.target_properties.clone(),
1733            self.graph_ctx.clone(),
1734            self.optional,
1735            self.optional_pattern_vars.clone(),
1736            self.bound_target_column.clone(),
1737            self.used_edge_columns.clone(),
1738            self.schema.clone(),
1739            metrics,
1740        )))
1741    }
1742
1743    fn metrics(&self) -> Option<MetricsSet> {
1744        Some(self.metrics.clone_inner())
1745    }
1746}
1747
/// State machine for GraphTraverseMainStream.
///
/// The stream runs in two phases: an async adjacency-map load, then
/// synchronous batch-by-batch expansion of the input stream.
enum GraphTraverseMainState {
    /// Loading adjacency map from main edges table.
    ///
    /// The input stream is parked here (not polled) until the future resolves.
    LoadingEdges {
        future: Pin<Box<dyn std::future::Future<Output = DFResult<EdgeAdjacencyMap>> + Send>>,
        input_stream: SendableRecordBatchStream,
    },
    /// Processing input stream with loaded adjacency.
    Processing {
        adjacency: EdgeAdjacencyMap,
        input_stream: SendableRecordBatchStream,
    },
    /// Stream is done (exhausted, or abandoned after a fatal error).
    Done,
}
1763
/// Stream that executes schemaless edge traversal.
///
/// Edge type names and direction are not stored here: they are captured by the
/// adjacency-loading future built in [`GraphTraverseMainStream::new`].
struct GraphTraverseMainStream {
    /// Source column name (input column holding the source VIDs).
    source_column: String,

    /// Target variable name (prefix for `._vid` / `._labels` output columns).
    target_variable: String,

    /// Edge variable name; `None` for anonymous relationships.
    edge_variable: Option<String>,

    /// Edge properties to materialize.
    edge_properties: Vec<String>,

    /// Target properties to materialize.
    target_properties: Vec<String>,

    /// Graph execution context (storage, L0 buffers, timeout).
    graph_ctx: Arc<GraphExecutionContext>,

    /// Whether this is optional (preserve unmatched rows with NULLs).
    optional: bool,

    /// Variables introduced by OPTIONAL pattern.
    optional_pattern_vars: HashSet<String>,

    /// Column name of an already-bound target VID (for filtering).
    bound_target_column: Option<String>,

    /// Columns containing edge IDs from previous hops (for relationship uniqueness).
    used_edge_columns: Vec<String>,

    /// Output schema (must match what `expand_batch` produces).
    schema: SchemaRef,

    /// Stream state; see [`GraphTraverseMainState`].
    state: GraphTraverseMainState,

    /// Metrics.
    metrics: BaselineMetrics,
}
1805
impl GraphTraverseMainStream {
    /// Create a new traverse main stream.
    ///
    /// Immediately constructs the async future that loads the adjacency map
    /// for `type_names`/`direction`; the input stream is not polled until that
    /// load completes (see the `LoadingEdges` state).
    #[expect(clippy::too_many_arguments)]
    fn new(
        input_stream: SendableRecordBatchStream,
        source_column: String,
        type_names: Vec<String>,
        direction: Direction,
        target_variable: String,
        edge_variable: Option<String>,
        edge_properties: Vec<String>,
        target_properties: Vec<String>,
        graph_ctx: Arc<GraphExecutionContext>,
        optional: bool,
        optional_pattern_vars: HashSet<String>,
        bound_target_column: Option<String>,
        used_edge_columns: Vec<String>,
        schema: SchemaRef,
        metrics: BaselineMetrics,
    ) -> Self {
        // Start by loading the adjacency map from the main edges table.
        // The future owns clones of the context and type names so it can be
        // `'static + Send` while `self` keeps its own copies.
        let loading_ctx = graph_ctx.clone();
        let loading_types = type_names.clone();
        let fut =
            async move { build_edge_adjacency_map(&loading_ctx, &loading_types, direction).await };

        Self {
            source_column,
            target_variable,
            edge_variable,
            edge_properties,
            target_properties,
            graph_ctx,
            optional,
            optional_pattern_vars,
            bound_target_column,
            used_edge_columns,
            schema,
            state: GraphTraverseMainState::LoadingEdges {
                future: Box::pin(fut),
                input_stream,
            },
            metrics,
        }
    }

    /// Expand input batch using adjacency map (synchronous version).
    ///
    /// For every input row, looks up the row's source VID in `adjacency` and
    /// emits one output row per surviving neighbor. Neighbors are dropped when
    /// their EID appears in a `used_edge_columns` value for the row
    /// (relationship uniqueness) or when a bound target column exists and the
    /// neighbor VID does not match it.
    ///
    /// Output columns are appended in the same order `build_schema` declares
    /// them: input columns (expanded via Arrow `take`), target `._vid` and
    /// `._labels` (only if absent from the input), edge `._eid`/`._type`/
    /// properties (or the internal `__eid_to_*` column for anonymous edges),
    /// then target properties.
    ///
    /// Under OPTIONAL semantics, input rows with no surviving expansion are
    /// appended back with NULLs in the pattern columns.
    fn expand_batch(
        &self,
        input: &RecordBatch,
        adjacency: &EdgeAdjacencyMap,
    ) -> DFResult<RecordBatch> {
        // Extract source VIDs from source column
        let source_col = input.column_by_name(&self.source_column).ok_or_else(|| {
            datafusion::error::DataFusionError::Execution(format!(
                "Source column {} not found",
                self.source_column
            ))
        })?;

        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
        let source_vids: &UInt64Array = &source_vid_cow;

        // Read bound target VIDs if column exists
        let bound_target_cow = self
            .bound_target_column
            .as_ref()
            .and_then(|col| input.column_by_name(col))
            .map(|c| column_as_vid_array(c.as_ref()))
            .transpose()?;
        let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();

        // Collect edge ID arrays from previous hops for relationship uniqueness filtering.
        // Columns that are missing or not UInt64 are silently skipped.
        let used_edge_arrays: Vec<&UInt64Array> = self
            .used_edge_columns
            .iter()
            .filter_map(|col| {
                input
                    .column_by_name(col)
                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
            })
            .collect();

        // Build expansions: (input_row_idx, target_vid, eid, edge_type, edge_props)
        type Expansion = (usize, Vid, Eid, String, uni_common::Properties);
        let mut expansions: Vec<Expansion> = Vec::new();

        for (row_idx, src_u64) in source_vids.iter().enumerate() {
            // NULL source VIDs produce no expansions (and, if not OPTIONAL,
            // the row is dropped entirely).
            if let Some(src_u64) = src_u64 {
                let src_vid = Vid::from(src_u64);

                // Collect used edge IDs for this row from all previous hops
                let used_eids: HashSet<u64> = used_edge_arrays
                    .iter()
                    .filter_map(|arr| {
                        if arr.is_null(row_idx) {
                            None
                        } else {
                            Some(arr.value(row_idx))
                        }
                    })
                    .collect();

                if let Some(neighbors) = adjacency.get(&src_vid) {
                    for (target_vid, eid, edge_type, props) in neighbors {
                        // Skip edges already used in previous hops (relationship uniqueness)
                        if used_eids.contains(&eid.as_u64()) {
                            continue;
                        }

                        // Filter by bound target VID if set (for patterns where target is in scope).
                        // Only include traversals where the target matches the expected VID.
                        if let Some(targets) = expected_targets {
                            if targets.is_null(row_idx) {
                                continue;
                            }
                            let expected_vid = targets.value(row_idx);
                            if target_vid.as_u64() != expected_vid {
                                continue;
                            }
                        }

                        expansions.push((
                            row_idx,
                            *target_vid,
                            *eid,
                            edge_type.clone(),
                            props.clone(),
                        ));
                    }
                }
            }
        }

        // Handle OPTIONAL: preserve unmatched rows
        if expansions.is_empty() && self.optional {
            // No matches at all - return the full input with NULL columns appended
            let all_indices: Vec<usize> = (0..input.num_rows()).collect();
            return build_optional_null_batch_for_rows(input, &all_indices, &self.schema);
        }

        if expansions.is_empty() {
            // No matches, not optional - return empty batch
            return Ok(RecordBatch::new_empty(self.schema.clone()));
        }

        // Track matched rows for OPTIONAL handling (unused otherwise, so skip
        // the allocation in the common non-OPTIONAL case)
        let matched_rows: HashSet<usize> = if self.optional {
            expansions.iter().map(|(idx, _, _, _, _)| *idx).collect()
        } else {
            HashSet::new()
        };

        // Expand input columns using Arrow take(): one output row per expansion,
        // repeating the originating input row's values.
        let mut columns: Vec<ArrayRef> = Vec::new();
        let indices: Vec<u64> = expansions
            .iter()
            .map(|(idx, _, _, _, _)| *idx as u64)
            .collect();
        let indices_array = UInt64Array::from(indices);

        for col in input.columns() {
            let expanded = take(col.as_ref(), &indices_array, None)?;
            columns.push(expanded);
        }

        // Add target ._vid column (only if not already in input)
        let target_vid_name = format!("{}._vid", self.target_variable);
        let target_vids: Vec<u64> = expansions
            .iter()
            .map(|(_, vid, _, _, _)| vid.as_u64())
            .collect();
        if input.schema().column_with_name(&target_vid_name).is_none() {
            columns.push(Arc::new(UInt64Array::from(target_vids)));
        }

        // Add target ._labels column (only if not already in input).
        // NOTE(review): labels are hydrated from L0 buffers only here; rows
        // whose target exists solely in persisted storage get an empty label
        // list — presumably resolved elsewhere, confirm against callers.
        let target_labels_name = format!("{}._labels", self.target_variable);
        if input
            .schema()
            .column_with_name(&target_labels_name)
            .is_none()
        {
            use arrow_array::builder::{ListBuilder, StringBuilder};
            let l0_ctx = self.graph_ctx.l0_context();
            let mut labels_builder = ListBuilder::new(StringBuilder::new());
            for (_, target_vid, _, _, _) in &expansions {
                // Merge labels across all L0 buffers, de-duplicating.
                let mut row_labels: Vec<String> = Vec::new();
                for l0 in l0_ctx.iter_l0_buffers() {
                    let guard = l0.read();
                    if let Some(l0_labels) = guard.vertex_labels.get(target_vid) {
                        for lbl in l0_labels {
                            if !row_labels.contains(lbl) {
                                row_labels.push(lbl.clone());
                            }
                        }
                    }
                }
                let values = labels_builder.values();
                for lbl in &row_labels {
                    values.append_value(lbl);
                }
                labels_builder.append(true);
            }
            columns.push(Arc::new(labels_builder.finish()));
        }

        // Add edge columns if edge variable is bound.
        // For anonymous relationships, emit internal edge IDs for BindPath.
        if self.edge_variable.is_some() {
            // Add edge ._eid column
            let eids: Vec<u64> = expansions
                .iter()
                .map(|(_, _, eid, _, _)| eid.as_u64())
                .collect();
            columns.push(Arc::new(UInt64Array::from(eids)));

            // Add edge ._type column
            {
                let mut type_builder = arrow_array::builder::StringBuilder::new();
                for (_, _, _, edge_type, _) in &expansions {
                    type_builder.append_value(edge_type);
                }
                columns.push(Arc::new(type_builder.finish()));
            }

            // Add edge property columns as cv_encoded LargeBinary to preserve types
            for prop_name in &self.edge_properties {
                use crate::query::df_graph::scan::encode_cypher_value;
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                if prop_name == "_all_props" {
                    // Serialize all edge properties to CypherValue blob
                    // (empty property maps become NULL, as do encode failures)
                    for (_, _, _, _, props) in &expansions {
                        if props.is_empty() {
                            builder.append_null();
                        } else {
                            let mut json_map = serde_json::Map::new();
                            for (k, v) in props.iter() {
                                let json_val: serde_json::Value = v.clone().into();
                                json_map.insert(k.clone(), json_val);
                            }
                            let json = serde_json::Value::Object(json_map);
                            match encode_cypher_value(&json) {
                                Ok(bytes) => builder.append_value(bytes),
                                Err(_) => builder.append_null(),
                            }
                        }
                    }
                } else {
                    // Named property as cv_encoded CypherValue
                    for (_, _, _, _, props) in &expansions {
                        match props.get(prop_name) {
                            Some(uni_common::Value::Null) | None => builder.append_null(),
                            Some(val) => {
                                let json_val: serde_json::Value = val.clone().into();
                                match encode_cypher_value(&json_val) {
                                    Ok(bytes) => builder.append_value(bytes),
                                    Err(_) => builder.append_null(),
                                }
                            }
                        }
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
        } else {
            // Anonymous relationship: internal `__eid_to_<target>` column only.
            let eids: Vec<u64> = expansions
                .iter()
                .map(|(_, _, eid, _, _)| eid.as_u64())
                .collect();
            columns.push(Arc::new(UInt64Array::from(eids)));
        }

        // Add target property columns (hydrate from L0 buffers).
        // NOTE(review): like `_labels` above, only L0 buffers are consulted;
        // persisted-only targets yield NULLs here — confirm this is handled
        // downstream.
        {
            use crate::query::df_graph::scan::encode_cypher_value;
            let l0_ctx = self.graph_ctx.l0_context();

            for prop_name in &self.target_properties {
                if prop_name == "_all_props" {
                    // Build full CypherValue blob from all L0 vertex properties;
                    // later buffers overwrite earlier ones per key.
                    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                    for (_, target_vid, _, _, _) in &expansions {
                        let mut merged_props = serde_json::Map::new();
                        for l0 in l0_ctx.iter_l0_buffers() {
                            let guard = l0.read();
                            if let Some(props) = guard.vertex_properties.get(target_vid) {
                                for (k, v) in props.iter() {
                                    let json_val: serde_json::Value = v.clone().into();
                                    merged_props.insert(k.to_string(), json_val);
                                }
                            }
                        }
                        if merged_props.is_empty() {
                            builder.append_null();
                        } else {
                            let json = serde_json::Value::Object(merged_props);
                            match encode_cypher_value(&json) {
                                Ok(bytes) => builder.append_value(bytes),
                                Err(_) => builder.append_null(),
                            }
                        }
                    }
                    columns.push(Arc::new(builder.finish()));
                } else {
                    // Extract individual property from L0 and encode as CypherValue;
                    // the first buffer with a non-null value wins.
                    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                    for (_, target_vid, _, _, _) in &expansions {
                        let mut found = false;
                        for l0 in l0_ctx.iter_l0_buffers() {
                            let guard = l0.read();
                            if let Some(props) = guard.vertex_properties.get(target_vid)
                                && let Some(val) = props.get(prop_name.as_str())
                                && !val.is_null()
                            {
                                let json_val: serde_json::Value = val.clone().into();
                                if let Ok(bytes) = encode_cypher_value(&json_val) {
                                    builder.append_value(bytes);
                                    found = true;
                                    break;
                                }
                            }
                        }
                        if !found {
                            builder.append_null();
                        }
                    }
                    columns.push(Arc::new(builder.finish()));
                }
            }
        }

        let matched_batch =
            RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)?;

        // Handle OPTIONAL: append unmatched rows with NULLs
        if self.optional {
            let unmatched = collect_unmatched_optional_group_rows(
                input,
                &matched_rows,
                &self.schema,
                &self.optional_pattern_vars,
            )?;

            if unmatched.is_empty() {
                return Ok(matched_batch);
            }

            let unmatched_batch = build_optional_null_batch_for_rows_with_optional_vars(
                input,
                &unmatched,
                &self.schema,
                &self.optional_pattern_vars,
            )?;

            // Concatenate matched and unmatched batches
            use arrow::compute::concat_batches;
            concat_batches(&self.schema, &[matched_batch, unmatched_batch]).map_err(arrow_err)
        } else {
            Ok(matched_batch)
        }
    }
}
2169
2170/// Build adjacency map from main edges table for given type names and direction.
2171///
2172/// Supports OR relationship types like `[:KNOWS|HATES]` via multiple type_names.
2173/// Returns a HashMap mapping source VID -> Vec<(target_vid, eid, properties)>
2174/// Direction determines the key: Outgoing uses src_vid, Incoming uses dst_vid, Both adds entries for both.
2175async fn build_edge_adjacency_map(
2176    graph_ctx: &GraphExecutionContext,
2177    type_names: &[String],
2178    direction: Direction,
2179) -> DFResult<EdgeAdjacencyMap> {
2180    let storage = graph_ctx.storage();
2181    let l0_ctx = graph_ctx.l0_context();
2182
2183    // Step 1: Query main edges table for all type names
2184    let type_refs: Vec<&str> = type_names.iter().map(|s| s.as_str()).collect();
2185    let edges_with_type = storage
2186        .find_edges_by_type_names(&type_refs)
2187        .await
2188        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
2189
2190    // Preserve edge type name in the adjacency map for type(r) support
2191    let mut edges: Vec<(
2192        uni_common::Eid,
2193        uni_common::Vid,
2194        uni_common::Vid,
2195        String,
2196        uni_common::Properties,
2197    )> = edges_with_type.into_iter().collect();
2198
2199    // Step 2: Overlay L0 buffers for all type names
2200    for l0 in l0_ctx.iter_l0_buffers() {
2201        let l0_guard = l0.read();
2202
2203        for type_name in type_names {
2204            let l0_eids = l0_guard.eids_for_type(type_name);
2205
2206            // For each L0 edge, extract its information
2207            for &eid in &l0_eids {
2208                if let Some(edge_ref) = l0_guard.graph.edge(eid) {
2209                    let src_vid = edge_ref.src_vid;
2210                    let dst_vid = edge_ref.dst_vid;
2211
2212                    // Get properties for this edge from L0
2213                    let props = l0_guard
2214                        .edge_properties
2215                        .get(&eid)
2216                        .cloned()
2217                        .unwrap_or_default();
2218
2219                    edges.push((eid, src_vid, dst_vid, type_name.clone(), props));
2220                }
2221            }
2222        }
2223    }
2224
2225    // Step 3: Deduplicate by EID (L0 takes precedence)
2226    let mut seen_eids = HashSet::new();
2227    let mut unique_edges = Vec::new();
2228    for edge in edges.into_iter().rev() {
2229        if seen_eids.insert(edge.0) {
2230            unique_edges.push(edge);
2231        }
2232    }
2233    unique_edges.reverse();
2234
2235    // Step 4: Filter out edges tombstoned in any L0 buffer
2236    let mut tombstoned_eids = HashSet::new();
2237    for l0 in l0_ctx.iter_l0_buffers() {
2238        let l0_guard = l0.read();
2239        for eid in l0_guard.tombstones.keys() {
2240            tombstoned_eids.insert(*eid);
2241        }
2242    }
2243    if !tombstoned_eids.is_empty() {
2244        unique_edges.retain(|edge| !tombstoned_eids.contains(&edge.0));
2245    }
2246
2247    // Step 5: Build adjacency map based on direction
2248    let mut adjacency: EdgeAdjacencyMap = HashMap::new();
2249
2250    for (eid, src_vid, dst_vid, edge_type, props) in unique_edges {
2251        match direction {
2252            Direction::Outgoing => {
2253                adjacency
2254                    .entry(src_vid)
2255                    .or_default()
2256                    .push((dst_vid, eid, edge_type, props));
2257            }
2258            Direction::Incoming => {
2259                adjacency
2260                    .entry(dst_vid)
2261                    .or_default()
2262                    .push((src_vid, eid, edge_type, props));
2263            }
2264            Direction::Both => {
2265                adjacency.entry(src_vid).or_default().push((
2266                    dst_vid,
2267                    eid,
2268                    edge_type.clone(),
2269                    props.clone(),
2270                ));
2271                adjacency
2272                    .entry(dst_vid)
2273                    .or_default()
2274                    .push((src_vid, eid, edge_type, props));
2275            }
2276        }
2277    }
2278
2279    Ok(adjacency)
2280}
2281
impl Stream for GraphTraverseMainStream {
    type Item = DFResult<RecordBatch>;

    /// Drive the two-phase state machine: first resolve the adjacency-loading
    /// future, then expand input batches one at a time.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Take ownership of the state so its contents can be moved; every
            // branch must restore it (or deliberately leave `Done`) before
            // returning.
            let state = std::mem::replace(&mut self.state, GraphTraverseMainState::Done);

            match state {
                GraphTraverseMainState::LoadingEdges {
                    mut future,
                    input_stream,
                } => match future.as_mut().poll(cx) {
                    Poll::Ready(Ok(adjacency)) => {
                        // Move to processing state with loaded adjacency
                        self.state = GraphTraverseMainState::Processing {
                            adjacency,
                            input_stream,
                        };
                        // Continue loop to start processing
                    }
                    Poll::Ready(Err(e)) => {
                        // Load failed: terminate the stream with the error.
                        self.state = GraphTraverseMainState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        // Put the in-flight future back; cx is already registered.
                        self.state = GraphTraverseMainState::LoadingEdges {
                            future,
                            input_stream,
                        };
                        return Poll::Pending;
                    }
                },
                GraphTraverseMainState::Processing {
                    adjacency,
                    mut input_stream,
                } => {
                    // Check timeout before each input poll; state stays `Done`
                    // on timeout so the stream stops.
                    if let Err(e) = self.graph_ctx.check_timeout() {
                        return Poll::Ready(Some(Err(
                            datafusion::error::DataFusionError::Execution(e.to_string()),
                        )));
                    }

                    match input_stream.poll_next_unpin(cx) {
                        Poll::Ready(Some(Ok(batch))) => {
                            // Expand batch using adjacency map
                            let result = self.expand_batch(&batch, &adjacency);

                            // Restore state before returning so the next poll
                            // keeps processing (even after an expand error).
                            self.state = GraphTraverseMainState::Processing {
                                adjacency,
                                input_stream,
                            };

                            if let Ok(ref r) = result {
                                self.metrics.record_output(r.num_rows());
                            }
                            return Poll::Ready(Some(result));
                        }
                        Poll::Ready(Some(Err(e))) => {
                            // Upstream error: terminate.
                            self.state = GraphTraverseMainState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(None) => {
                            // Upstream exhausted: terminate normally.
                            self.state = GraphTraverseMainState::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Pending => {
                            self.state = GraphTraverseMainState::Processing {
                                adjacency,
                                input_stream,
                            };
                            return Poll::Pending;
                        }
                    }
                }
                GraphTraverseMainState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
2364
2365impl RecordBatchStream for GraphTraverseMainStream {
2366    fn schema(&self) -> SchemaRef {
2367        self.schema.clone()
2368    }
2369}
2370
/// Variable-length graph traversal execution plan.
///
/// Performs BFS traversal from source vertices with configurable min/max hops.
/// Tracks visited nodes to avoid cycles.
///
/// # Example
///
/// ```ignore
/// // Find all nodes 1-3 hops away via KNOWS edges
/// // (abridged — see `new` for the full argument list)
/// let traverse = GraphVariableLengthTraverseExec::new(
///     input_plan,
///     "_vid",
///     knows_type_id,
///     Direction::Outgoing,
///     1,  // min_hops
///     3,  // max_hops
///     Some("p"), // path variable
///     graph_ctx,
/// );
/// ```
pub struct GraphVariableLengthTraverseExec {
    /// Input execution plan.
    input: Arc<dyn ExecutionPlan>,

    /// Column name containing source VIDs.
    source_column: String,

    /// Edge type IDs to traverse.
    edge_type_ids: Vec<u32>,

    /// Traversal direction.
    direction: Direction,

    /// Minimum number of hops.
    min_hops: usize,

    /// Maximum number of hops.
    max_hops: usize,

    /// Variable name for target vertex columns.
    target_variable: String,

    /// Variable name for relationship list (r in `[r*]`) - holds `List<Edge>`.
    step_variable: Option<String>,

    /// Variable name for path (if path is bound).
    path_variable: Option<String>,

    /// Target vertex properties to materialize.
    target_properties: Vec<String>,

    /// Target label name for property type resolution.
    target_label_name: Option<String>,

    /// Whether this is an optional match (LEFT JOIN semantics).
    is_optional: bool,

    /// Column name of an already-bound target VID (for patterns where target is in scope).
    bound_target_column: Option<String>,

    /// Lance SQL filter for edge property predicates (VLP bitmap preselection).
    edge_lance_filter: Option<String>,

    /// Simple property equality conditions for per-edge L0 checking during BFS.
    /// Each entry is (property_name, expected_value).
    edge_property_conditions: Vec<(String, UniValue)>,

    /// Edge ID columns from previous hops for cross-pattern relationship uniqueness.
    used_edge_columns: Vec<String>,

    /// Path semantics mode (Trail = no repeated edges, default for OpenCypher).
    path_mode: super::nfa::PathMode,

    /// Output mode determining BFS strategy.
    output_mode: super::nfa::VlpOutputMode,

    /// Compiled NFA for path pattern matching.
    nfa: Arc<PathNfa>,

    /// Output schema (derived once in `new` via `build_schema`).
    schema: SchemaRef,

    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Cached plan properties.
    properties: PlanProperties,

    /// Execution metrics.
    metrics: ExecutionPlanMetricsSet,
}
2462
2463impl fmt::Debug for GraphVariableLengthTraverseExec {
2464    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2465        f.debug_struct("GraphVariableLengthTraverseExec")
2466            .field("source_column", &self.source_column)
2467            .field("edge_type_ids", &self.edge_type_ids)
2468            .field("direction", &self.direction)
2469            .field("min_hops", &self.min_hops)
2470            .field("max_hops", &self.max_hops)
2471            .field("target_variable", &self.target_variable)
2472            .finish()
2473    }
2474}
2475
2476impl GraphVariableLengthTraverseExec {
2477    /// Create a new variable-length traversal plan.
2478    ///
2479    /// For QPP (Quantified Path Patterns), pass a pre-compiled NFA via `qpp_nfa`.
2480    /// For simple VLP patterns, pass `None` and the NFA will be compiled from
2481    /// `edge_type_ids`, `direction`, `min_hops`, `max_hops`.
2482    #[expect(clippy::too_many_arguments)]
2483    pub fn new(
2484        input: Arc<dyn ExecutionPlan>,
2485        source_column: impl Into<String>,
2486        edge_type_ids: Vec<u32>,
2487        direction: Direction,
2488        min_hops: usize,
2489        max_hops: usize,
2490        target_variable: impl Into<String>,
2491        step_variable: Option<String>,
2492        path_variable: Option<String>,
2493        target_properties: Vec<String>,
2494        target_label_name: Option<String>,
2495        graph_ctx: Arc<GraphExecutionContext>,
2496        is_optional: bool,
2497        bound_target_column: Option<String>,
2498        edge_lance_filter: Option<String>,
2499        edge_property_conditions: Vec<(String, UniValue)>,
2500        used_edge_columns: Vec<String>,
2501        path_mode: super::nfa::PathMode,
2502        output_mode: super::nfa::VlpOutputMode,
2503        qpp_nfa: Option<PathNfa>,
2504    ) -> Self {
2505        let source_column = source_column.into();
2506        let target_variable = target_variable.into();
2507
2508        // Resolve target property Arrow types from the schema
2509        let uni_schema = graph_ctx.storage().schema_manager().schema();
2510        let label_props = target_label_name
2511            .as_deref()
2512            .and_then(|ln| uni_schema.properties.get(ln));
2513
2514        // Build output schema
2515        let schema = Self::build_schema(
2516            input.schema(),
2517            &target_variable,
2518            step_variable.as_deref(),
2519            path_variable.as_deref(),
2520            &target_properties,
2521            label_props,
2522        );
2523        let properties = compute_plan_properties(schema.clone());
2524
2525        // Use pre-compiled QPP NFA if provided, otherwise compile from VLP parameters
2526        let nfa = Arc::new(qpp_nfa.unwrap_or_else(|| {
2527            PathNfa::from_vlp(edge_type_ids.clone(), direction, min_hops, max_hops)
2528        }));
2529
2530        Self {
2531            input,
2532            source_column,
2533            edge_type_ids,
2534            direction,
2535            min_hops,
2536            max_hops,
2537            target_variable,
2538            step_variable,
2539            path_variable,
2540            target_properties,
2541            target_label_name,
2542            is_optional,
2543            bound_target_column,
2544            edge_lance_filter,
2545            edge_property_conditions,
2546            used_edge_columns,
2547            path_mode,
2548            output_mode,
2549            nfa,
2550            graph_ctx,
2551            schema,
2552            properties,
2553            metrics: ExecutionPlanMetricsSet::new(),
2554        }
2555    }
2556
2557    /// Build output schema.
2558    fn build_schema(
2559        input_schema: SchemaRef,
2560        target_variable: &str,
2561        step_variable: Option<&str>,
2562        path_variable: Option<&str>,
2563        target_properties: &[String],
2564        label_props: Option<
2565            &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
2566        >,
2567    ) -> SchemaRef {
2568        let mut fields: Vec<Field> = input_schema
2569            .fields()
2570            .iter()
2571            .map(|f| f.as_ref().clone())
2572            .collect();
2573
2574        // Add target VID column (only if not already in input)
2575        let target_vid_name = format!("{}._vid", target_variable);
2576        if input_schema.column_with_name(&target_vid_name).is_none() {
2577            fields.push(Field::new(target_vid_name, DataType::UInt64, true));
2578        }
2579
2580        // Add target ._labels column (only if not already in input)
2581        let target_labels_name = format!("{}._labels", target_variable);
2582        if input_schema.column_with_name(&target_labels_name).is_none() {
2583            fields.push(Field::new(target_labels_name, labels_data_type(), true));
2584        }
2585
2586        // Add target vertex property columns (skip if already in input)
2587        for prop_name in target_properties {
2588            let col_name = format!("{}.{}", target_variable, prop_name);
2589            if input_schema.column_with_name(&col_name).is_none() {
2590                let arrow_type = resolve_property_type(prop_name, label_props);
2591                fields.push(Field::new(&col_name, arrow_type, true));
2592            }
2593        }
2594
2595        // Add hop count
2596        fields.push(Field::new("_hop_count", DataType::UInt64, false));
2597
2598        // Add step variable (edge list) if bound
2599        if let Some(step_var) = step_variable {
2600            fields.push(build_edge_list_field(step_var));
2601        }
2602
2603        // Add path struct if bound (only if not already in input from prior BindFixedPath)
2604        if let Some(path_var) = path_variable
2605            && input_schema.column_with_name(path_var).is_none()
2606        {
2607            fields.push(build_path_struct_field(path_var));
2608        }
2609
2610        Arc::new(Schema::new(fields))
2611    }
2612}
2613
2614impl DisplayAs for GraphVariableLengthTraverseExec {
2615    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2616        write!(
2617            f,
2618            "GraphVariableLengthTraverseExec: {} --[{:?}*{}..{}]--> target",
2619            self.source_column, self.edge_type_ids, self.min_hops, self.max_hops
2620        )
2621    }
2622}
2623
2624impl ExecutionPlan for GraphVariableLengthTraverseExec {
2625    fn name(&self) -> &str {
2626        "GraphVariableLengthTraverseExec"
2627    }
2628
2629    fn as_any(&self) -> &dyn Any {
2630        self
2631    }
2632
2633    fn schema(&self) -> SchemaRef {
2634        self.schema.clone()
2635    }
2636
2637    fn properties(&self) -> &PlanProperties {
2638        &self.properties
2639    }
2640
2641    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
2642        vec![&self.input]
2643    }
2644
2645    fn with_new_children(
2646        self: Arc<Self>,
2647        children: Vec<Arc<dyn ExecutionPlan>>,
2648    ) -> DFResult<Arc<dyn ExecutionPlan>> {
2649        if children.len() != 1 {
2650            return Err(datafusion::error::DataFusionError::Plan(
2651                "GraphVariableLengthTraverseExec requires exactly one child".to_string(),
2652            ));
2653        }
2654
2655        // Pass the existing NFA to avoid recompilation (important for QPP NFA)
2656        Ok(Arc::new(Self::new(
2657            children[0].clone(),
2658            self.source_column.clone(),
2659            self.edge_type_ids.clone(),
2660            self.direction,
2661            self.min_hops,
2662            self.max_hops,
2663            self.target_variable.clone(),
2664            self.step_variable.clone(),
2665            self.path_variable.clone(),
2666            self.target_properties.clone(),
2667            self.target_label_name.clone(),
2668            self.graph_ctx.clone(),
2669            self.is_optional,
2670            self.bound_target_column.clone(),
2671            self.edge_lance_filter.clone(),
2672            self.edge_property_conditions.clone(),
2673            self.used_edge_columns.clone(),
2674            self.path_mode.clone(),
2675            self.output_mode.clone(),
2676            Some((*self.nfa).clone()),
2677        )))
2678    }
2679
2680    fn execute(
2681        &self,
2682        partition: usize,
2683        context: Arc<TaskContext>,
2684    ) -> DFResult<SendableRecordBatchStream> {
2685        let input_stream = self.input.execute(partition, context)?;
2686
2687        let metrics = BaselineMetrics::new(&self.metrics, partition);
2688
2689        let warm_fut = self
2690            .graph_ctx
2691            .warming_future(self.edge_type_ids.clone(), self.direction);
2692
2693        Ok(Box::pin(GraphVariableLengthTraverseStream {
2694            input: input_stream,
2695            exec: Arc::new(self.clone_for_stream()),
2696            schema: self.schema.clone(),
2697            state: VarLengthStreamState::Warming(warm_fut),
2698            metrics,
2699        }))
2700    }
2701
2702    fn metrics(&self) -> Option<MetricsSet> {
2703        Some(self.metrics.clone_inner())
2704    }
2705}
2706
2707impl GraphVariableLengthTraverseExec {
2708    /// Clone fields needed for stream (avoids cloning the full struct).
2709    fn clone_for_stream(&self) -> GraphVariableLengthTraverseExecData {
2710        GraphVariableLengthTraverseExecData {
2711            source_column: self.source_column.clone(),
2712            edge_type_ids: self.edge_type_ids.clone(),
2713            direction: self.direction,
2714            min_hops: self.min_hops,
2715            max_hops: self.max_hops,
2716            target_variable: self.target_variable.clone(),
2717            step_variable: self.step_variable.clone(),
2718            path_variable: self.path_variable.clone(),
2719            target_properties: self.target_properties.clone(),
2720            target_label_name: self.target_label_name.clone(),
2721            is_optional: self.is_optional,
2722            bound_target_column: self.bound_target_column.clone(),
2723            edge_lance_filter: self.edge_lance_filter.clone(),
2724            edge_property_conditions: self.edge_property_conditions.clone(),
2725            used_edge_columns: self.used_edge_columns.clone(),
2726            path_mode: self.path_mode.clone(),
2727            output_mode: self.output_mode.clone(),
2728            nfa: self.nfa.clone(),
2729            graph_ctx: self.graph_ctx.clone(),
2730        }
2731    }
2732}
2733
/// Data needed by the stream (without ExecutionPlan overhead).
///
/// A field-for-field snapshot of the traversal parameters of
/// `GraphVariableLengthTraverseExec`; populated by `clone_for_stream`.
#[expect(
    dead_code,
    reason = "Fields accessed via NFA; kept for with_new_children reconstruction"
)]
struct GraphVariableLengthTraverseExecData {
    /// Column name containing source VIDs.
    source_column: String,
    /// Edge type IDs to traverse.
    edge_type_ids: Vec<u32>,
    /// Traversal direction.
    direction: Direction,
    /// Minimum number of hops.
    min_hops: usize,
    /// Maximum number of hops.
    max_hops: usize,
    /// Variable name for target vertex columns.
    target_variable: String,
    /// Variable name for the relationship list (holds `List<Edge>`).
    step_variable: Option<String>,
    /// Variable name for path (if path is bound).
    path_variable: Option<String>,
    /// Target vertex properties to materialize.
    target_properties: Vec<String>,
    /// Target label name for label filtering and property type resolution.
    target_label_name: Option<String>,
    /// Whether this is an optional match (LEFT JOIN semantics).
    is_optional: bool,
    /// Column name of an already-bound target VID.
    bound_target_column: Option<String>,
    #[expect(dead_code, reason = "Used in Phase 3 warming")]
    edge_lance_filter: Option<String>,
    /// Simple property equality conditions for per-edge L0 checking during BFS.
    edge_property_conditions: Vec<(String, UniValue)>,
    /// Edge ID columns from previous hops for cross-pattern relationship uniqueness.
    used_edge_columns: Vec<String>,
    /// Path semantics mode (Trail = no repeated edges).
    path_mode: super::nfa::PathMode,
    /// Output mode determining BFS strategy.
    output_mode: super::nfa::VlpOutputMode,
    /// Compiled NFA for path pattern matching.
    nfa: Arc<PathNfa>,
    /// Graph execution context (storage access, neighbor lookups, L0 overlay).
    graph_ctx: Arc<GraphExecutionContext>,
}
2762
/// Safety cap for frontier size to prevent OOM on pathological graphs.
/// BFS stops expanding deeper (silently truncating results) once a depth's
/// frontier holds more than this many (vertex, NFA-state) pairs.
const MAX_FRONTIER_SIZE: usize = 500_000;
/// Safety cap for predecessor pool size.
/// Bounds the memory held by the predecessor DAG used for path enumeration;
/// exceeding it likewise stops the BFS early.
const MAX_PRED_POOL_SIZE: usize = 2_000_000;
2767
impl GraphVariableLengthTraverseExecData {
    /// Check if a vertex passes the target label filter.
    ///
    /// Returns `true` when no target label is configured. Labels are read from
    /// the L0 overlay when present; a vertex absent from L0 passes the check
    /// (storage-side results are trusted).
    fn check_target_label(&self, vid: Vid) -> bool {
        if let Some(ref label_name) = self.target_label_name {
            let query_ctx = self.graph_ctx.query_context();
            match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
                Some(labels) => labels.contains(label_name),
                None => true, // not in L0, trust storage
            }
        } else {
            true
        }
    }

    /// Check if a vertex satisfies an NFA state constraint (QPP intermediate node label).
    ///
    /// Same L0-overlay semantics as `check_target_label`: a vertex not present
    /// in L0 passes.
    fn check_state_constraint(&self, vid: Vid, constraint: &super::nfa::VertexConstraint) -> bool {
        match constraint {
            super::nfa::VertexConstraint::Label(label_name) => {
                let query_ctx = self.graph_ctx.query_context();
                match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
                    Some(labels) => labels.contains(label_name),
                    None => true, // not in L0, trust storage
                }
            }
        }
    }

    /// Expand neighbors from a vertex through all NFA transitions from the given state.
    /// Returns (neighbor_vid, neighbor_eid, destination_nfa_state) triples.
    ///
    /// Per candidate edge the filters are applied in order: undirected dedup,
    /// `EidFilter` bitmap, L0 edge-property conditions, cross-pattern edge
    /// uniqueness (`used_eids`), destination-state vertex constraint.
    fn expand_neighbors(
        &self,
        vid: Vid,
        state: NfaStateId,
        eid_filter: &EidFilter,
        used_eids: &FxHashSet<u64>,
    ) -> Vec<(Vid, Eid, NfaStateId)> {
        // NOTE(review): undirectedness is derived from the overall plan
        // direction, not from `transition.direction`; confirm this is intended
        // for QPP NFAs whose transitions can carry their own directions.
        let is_undirected = matches!(self.direction, Direction::Both);
        let mut results = Vec::new();

        for transition in self.nfa.transitions_from(state) {
            // Fresh per-transition set: dedup is scoped to one transition.
            let mut seen_edges: FxHashSet<u64> = FxHashSet::default();

            for &etype in &transition.edge_type_ids {
                for (neighbor, eid) in
                    self.graph_ctx
                        .get_neighbors(vid, etype, transition.direction)
                {
                    // Deduplicate edges for undirected patterns
                    if is_undirected && !seen_edges.insert(eid.as_u64()) {
                        continue;
                    }

                    // Check EidFilter (edge property bitmap preselection)
                    if !eid_filter.contains(eid) {
                        continue;
                    }

                    // Check edge property conditions (L0 in-memory properties)
                    if !self.edge_property_conditions.is_empty() {
                        let query_ctx = self.graph_ctx.query_context();
                        let passes = if let Some(props) =
                            l0_visibility::accumulate_edge_props(eid, Some(&query_ctx))
                        {
                            self.edge_property_conditions
                                .iter()
                                .all(|(name, expected)| {
                                    props.get(name).is_some_and(|actual| actual == expected)
                                })
                        } else {
                            // Edge not in L0 (CSR/Lance) — relies on EidFilter
                            // for correctness. TODO: build EidFilter from Lance
                            // during warming for flushed edges.
                            true
                        };
                        if !passes {
                            continue;
                        }
                    }

                    // Check cross-pattern relationship uniqueness
                    if used_eids.contains(&eid.as_u64()) {
                        continue;
                    }

                    // Check NFA state constraint on the destination state (QPP label filters)
                    if let Some(constraint) = self.nfa.state_constraint(transition.to)
                        && !self.check_state_constraint(neighbor, constraint)
                    {
                        continue;
                    }

                    results.push((neighbor, eid, transition.to));
                }
            }
        }

        results
    }

    /// NFA-driven BFS with predecessor DAG for full path enumeration (Mode B).
    ///
    /// Returns BFS results in the same format as the old bfs() for compatibility
    /// with build_output_batch.
    ///
    /// Results are silently truncated when the per-depth frontier exceeds
    /// `MAX_FRONTIER_SIZE` or the predecessor pool exceeds `MAX_PRED_POOL_SIZE`.
    fn bfs_with_dag(
        &self,
        source: Vid,
        eid_filter: &EidFilter,
        used_eids: &FxHashSet<u64>,
        vid_filter: &VidFilter,
    ) -> Vec<BfsResult> {
        let nfa = &self.nfa;
        // All paths are required for enumeration, not just one witness.
        let selector = PathSelector::All;
        let mut dag = PredecessorDag::new(selector);
        // (target, accepting NFA state, depth) triples discovered during BFS.
        let mut accepting: Vec<(Vid, NfaStateId, u32)> = Vec::new();

        // Handle zero-length paths (min_hops == 0)
        if nfa.is_accepting(nfa.start_state())
            && self.check_target_label(source)
            && vid_filter.contains(source)
        {
            accepting.push((source, nfa.start_state(), 0));
        }

        // Per-depth frontier BFS
        let mut frontier: Vec<(Vid, NfaStateId)> = vec![(source, nfa.start_state())];
        let mut depth: u32 = 0;

        while !frontier.is_empty() && depth < self.max_hops as u32 {
            depth += 1;
            let mut next_frontier: Vec<(Vid, NfaStateId)> = Vec::new();
            let mut seen_at_depth: FxHashSet<(Vid, NfaStateId)> = FxHashSet::default();

            for &(vid, state) in &frontier {
                for (neighbor, eid, dst_state) in
                    self.expand_neighbors(vid, state, eid_filter, used_eids)
                {
                    // Record in predecessor DAG
                    dag.add_predecessor(neighbor, dst_state, vid, state, eid, depth);

                    // Add to next frontier (deduplicated per depth)
                    if seen_at_depth.insert((neighbor, dst_state)) {
                        next_frontier.push((neighbor, dst_state));

                        // Check if accepting
                        if nfa.is_accepting(dst_state)
                            && self.check_target_label(neighbor)
                            && vid_filter.contains(neighbor)
                        {
                            accepting.push((neighbor, dst_state, depth));
                        }
                    }
                }
            }

            // Safety cap: abandon deeper expansion rather than risk OOM.
            if next_frontier.len() > MAX_FRONTIER_SIZE || dag.pool_len() > MAX_PRED_POOL_SIZE {
                break;
            }

            frontier = next_frontier;
        }

        // Enumerate paths from DAG to produce BfsResult tuples
        let mut results: Vec<BfsResult> = Vec::new();
        for &(target, state, depth) in &accepting {
            dag.enumerate_paths(
                source,
                target,
                state,
                depth,
                depth,
                &self.path_mode,
                &mut |nodes, edges| {
                    results.push((target, depth as usize, nodes.to_vec(), edges.to_vec()));
                    std::ops::ControlFlow::Continue(())
                },
            );
        }

        results
    }

    /// NFA-driven BFS returning only endpoints and depths (Mode A).
    ///
    /// More efficient when no path/step variable is bound — skips full path enumeration.
    /// Uses lightweight trail verification via has_trail_valid_path().
    ///
    /// Like `bfs_with_dag`, results are silently truncated once the frontier or
    /// predecessor pool exceeds the safety caps.
    fn bfs_endpoints_only(
        &self,
        source: Vid,
        eid_filter: &EidFilter,
        used_eids: &FxHashSet<u64>,
        vid_filter: &VidFilter,
    ) -> Vec<(Vid, u32)> {
        let nfa = &self.nfa;
        let selector = PathSelector::Any; // Only need existence, not all paths
        let mut dag = PredecessorDag::new(selector);
        let mut results: Vec<(Vid, u32)> = Vec::new();

        // Handle zero-length paths
        if nfa.is_accepting(nfa.start_state())
            && self.check_target_label(source)
            && vid_filter.contains(source)
        {
            results.push((source, 0));
        }

        // Per-depth frontier BFS
        let mut frontier: Vec<(Vid, NfaStateId)> = vec![(source, nfa.start_state())];
        let mut depth: u32 = 0;

        while !frontier.is_empty() && depth < self.max_hops as u32 {
            depth += 1;
            let mut next_frontier: Vec<(Vid, NfaStateId)> = Vec::new();
            let mut seen_at_depth: FxHashSet<(Vid, NfaStateId)> = FxHashSet::default();

            for &(vid, state) in &frontier {
                for (neighbor, eid, dst_state) in
                    self.expand_neighbors(vid, state, eid_filter, used_eids)
                {
                    dag.add_predecessor(neighbor, dst_state, vid, state, eid, depth);

                    if seen_at_depth.insert((neighbor, dst_state)) {
                        next_frontier.push((neighbor, dst_state));

                        // Check if accepting with trail verification
                        if nfa.is_accepting(dst_state)
                            && self.check_target_label(neighbor)
                            && vid_filter.contains(neighbor)
                            && dag.has_trail_valid_path(source, neighbor, dst_state, depth, depth)
                        {
                            results.push((neighbor, depth));
                        }
                    }
                }
            }

            // Safety cap: abandon deeper expansion rather than risk OOM.
            if next_frontier.len() > MAX_FRONTIER_SIZE || dag.pool_len() > MAX_PRED_POOL_SIZE {
                break;
            }

            frontier = next_frontier;
        }

        results
    }
}
3014
/// State machine for variable-length traverse stream.
///
/// Transitions: `Warming` → `Reading` ⇄ `Materializing`, with any state moving
/// to `Done` on error or input exhaustion.
enum VarLengthStreamState {
    /// Warming adjacency CSRs before first batch.
    Warming(Pin<Box<dyn std::future::Future<Output = DFResult<()>> + Send>>),
    /// Processing input batches.
    Reading,
    /// Materializing target vertex properties asynchronously.
    Materializing(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
    /// Stream is done.
    Done,
}
3026
/// Stream for variable-length traversal.
struct GraphVariableLengthTraverseStream {
    /// Upstream stream supplying source-VID batches.
    input: SendableRecordBatchStream,
    /// Immutable per-plan traversal parameters (shared via `Arc`).
    exec: Arc<GraphVariableLengthTraverseExecData>,
    /// Output schema (input fields plus traversal columns).
    schema: SchemaRef,
    /// Current position in the Warming/Reading/Materializing/Done machine.
    state: VarLengthStreamState,
    /// Baseline metrics; records output row counts.
    metrics: BaselineMetrics,
}
3035
3036impl Stream for GraphVariableLengthTraverseStream {
3037    type Item = DFResult<RecordBatch>;
3038
3039    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3040        loop {
3041            let state = std::mem::replace(&mut self.state, VarLengthStreamState::Done);
3042
3043            match state {
3044                VarLengthStreamState::Warming(mut fut) => match fut.as_mut().poll(cx) {
3045                    Poll::Ready(Ok(())) => {
3046                        self.state = VarLengthStreamState::Reading;
3047                        // Continue loop to start reading
3048                    }
3049                    Poll::Ready(Err(e)) => {
3050                        self.state = VarLengthStreamState::Done;
3051                        return Poll::Ready(Some(Err(e)));
3052                    }
3053                    Poll::Pending => {
3054                        self.state = VarLengthStreamState::Warming(fut);
3055                        return Poll::Pending;
3056                    }
3057                },
3058                VarLengthStreamState::Reading => {
3059                    // Check timeout
3060                    if let Err(e) = self.exec.graph_ctx.check_timeout() {
3061                        return Poll::Ready(Some(Err(
3062                            datafusion::error::DataFusionError::Execution(e.to_string()),
3063                        )));
3064                    }
3065
3066                    match self.input.poll_next_unpin(cx) {
3067                        Poll::Ready(Some(Ok(batch))) => {
3068                            // Build base batch synchronously (BFS + expand)
3069                            // TODO(Phase 3.5): Build real EidFilter/VidFilter during warming
3070                            let eid_filter = EidFilter::AllAllowed;
3071                            let vid_filter = VidFilter::AllAllowed;
3072                            let base_result =
3073                                self.process_batch_base(batch, &eid_filter, &vid_filter);
3074                            let base_batch = match base_result {
3075                                Ok(b) => b,
3076                                Err(e) => {
3077                                    self.state = VarLengthStreamState::Reading;
3078                                    return Poll::Ready(Some(Err(e)));
3079                                }
3080                            };
3081
3082                            // If no properties need async hydration, return directly
3083                            if self.exec.target_properties.is_empty() {
3084                                self.state = VarLengthStreamState::Reading;
3085                                return Poll::Ready(Some(Ok(base_batch)));
3086                            }
3087
3088                            // Properties needed — create async future for hydration
3089                            let schema = self.schema.clone();
3090                            let target_variable = self.exec.target_variable.clone();
3091                            let target_properties = self.exec.target_properties.clone();
3092                            let target_label_name = self.exec.target_label_name.clone();
3093                            let graph_ctx = self.exec.graph_ctx.clone();
3094
3095                            let fut = hydrate_vlp_target_properties(
3096                                base_batch,
3097                                schema,
3098                                target_variable,
3099                                target_properties,
3100                                target_label_name,
3101                                graph_ctx,
3102                            );
3103
3104                            self.state = VarLengthStreamState::Materializing(Box::pin(fut));
3105                            // Continue loop to poll the future
3106                        }
3107                        Poll::Ready(Some(Err(e))) => {
3108                            self.state = VarLengthStreamState::Done;
3109                            return Poll::Ready(Some(Err(e)));
3110                        }
3111                        Poll::Ready(None) => {
3112                            self.state = VarLengthStreamState::Done;
3113                            return Poll::Ready(None);
3114                        }
3115                        Poll::Pending => {
3116                            self.state = VarLengthStreamState::Reading;
3117                            return Poll::Pending;
3118                        }
3119                    }
3120                }
3121                VarLengthStreamState::Materializing(mut fut) => match fut.as_mut().poll(cx) {
3122                    Poll::Ready(Ok(batch)) => {
3123                        self.state = VarLengthStreamState::Reading;
3124                        self.metrics.record_output(batch.num_rows());
3125                        return Poll::Ready(Some(Ok(batch)));
3126                    }
3127                    Poll::Ready(Err(e)) => {
3128                        self.state = VarLengthStreamState::Done;
3129                        return Poll::Ready(Some(Err(e)));
3130                    }
3131                    Poll::Pending => {
3132                        self.state = VarLengthStreamState::Materializing(fut);
3133                        return Poll::Pending;
3134                    }
3135                },
3136                VarLengthStreamState::Done => {
3137                    return Poll::Ready(None);
3138                }
3139            }
3140        }
3141    }
3142}
3143
impl GraphVariableLengthTraverseStream {
    /// Expand one input batch into variable-length-path result rows.
    ///
    /// For each non-null source VID in `batch`, runs a BFS over the adjacency
    /// cache — mode selected by `self.exec.output_mode` — and collects
    /// `(input_row, target, hop_count, node_path, edge_path)` expansions.
    /// Bound-target filtering and cross-pattern edge uniqueness are applied
    /// per row. Materialization of the Arrow output is delegated to
    /// [`Self::build_output_batch`].
    ///
    /// # Errors
    /// Returns an execution error if the configured source column is missing
    /// from the batch, or if any VID column cannot be read as `UInt64`.
    fn process_batch_base(
        &self,
        batch: RecordBatch,
        eid_filter: &EidFilter,
        vid_filter: &VidFilter,
    ) -> DFResult<RecordBatch> {
        let source_col = batch
            .column_by_name(&self.exec.source_column)
            .ok_or_else(|| {
                datafusion::error::DataFusionError::Execution(format!(
                    "Source column '{}' not found",
                    self.exec.source_column
                ))
            })?;

        // Cow-style view: borrow the column as UInt64 if it already is one.
        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
        let source_vids: &UInt64Array = &source_vid_cow;

        // Read bound target VIDs if the column exists (target already in scope,
        // e.g. `MATCH (a)-[*]->(b)` with `b` bound by an earlier clause).
        let bound_target_cow = self
            .exec
            .bound_target_column
            .as_ref()
            .and_then(|col| batch.column_by_name(col))
            .map(|c| column_as_vid_array(c.as_ref()))
            .transpose()?;
        let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();

        // Extract used edge columns for cross-pattern relationship uniqueness
        // (Trail semantics: an edge consumed by a previous hop of the same
        // pattern must not be traversed again).
        let used_edge_arrays: Vec<&UInt64Array> = self
            .exec
            .used_edge_columns
            .iter()
            .filter_map(|col| {
                batch
                    .column_by_name(col)?
                    .as_any()
                    .downcast_ref::<UInt64Array>()
            })
            .collect();

        // Collect all BFS results across every input row.
        let mut expansions: Vec<VarLengthExpansion> = Vec::new();

        for (row_idx, source_vid) in source_vids.iter().enumerate() {
            // Tracks whether this input row produced at least one match,
            // so OPTIONAL semantics can emit a NULL row otherwise.
            let mut emitted_for_row = false;

            if let Some(src) = source_vid {
                let vid = Vid::from(src);

                // Collect used edge IDs from previous hops for this row.
                let used_eids: FxHashSet<u64> = used_edge_arrays
                    .iter()
                    .filter_map(|arr| {
                        if arr.is_null(row_idx) {
                            None
                        } else {
                            Some(arr.value(row_idx))
                        }
                    })
                    .collect();

                // Dispatch to the appropriate BFS mode based on output_mode.
                match &self.exec.output_mode {
                    VlpOutputMode::EndpointsOnly => {
                        // Cheapest mode: only (target, depth) pairs, no paths.
                        let endpoints = self
                            .exec
                            .bfs_endpoints_only(vid, eid_filter, &used_eids, vid_filter);
                        for (target, depth) in endpoints {
                            // Filter by bound target VID, if one is in scope.
                            if let Some(targets) = expected_targets {
                                if targets.is_null(row_idx) {
                                    continue;
                                }
                                if target.as_u64() != targets.value(row_idx) {
                                    continue;
                                }
                            }
                            // EndpointsOnly legitimately carries empty paths.
                            expansions.push((row_idx, target, depth as usize, vec![], vec![]));
                            emitted_for_row = true;
                        }
                    }
                    _ => {
                        // FullPath, StepVariable, CountOnly, etc. — paths are
                        // materialized via the predecessor DAG.
                        let bfs_results = self
                            .exec
                            .bfs_with_dag(vid, eid_filter, &used_eids, vid_filter);
                        for (target, hop_count, node_path, edge_path) in bfs_results {
                            // Filter by bound target VID, if one is in scope.
                            if let Some(targets) = expected_targets {
                                if targets.is_null(row_idx) {
                                    continue;
                                }
                                if target.as_u64() != targets.value(row_idx) {
                                    continue;
                                }
                            }
                            expansions.push((row_idx, target, hop_count, node_path, edge_path));
                            emitted_for_row = true;
                        }
                    }
                }
            }

            if self.exec.is_optional && !emitted_for_row {
                // Preserve the source row with NULL optional bindings.
                // The sentinel VID (u64::MAX) marks the row as unmatched in
                // build_output_batch; the empty paths make step/path columns NULL.
                expansions.push((row_idx, Vid::from(u64::MAX), 0, vec![], vec![]));
            }
        }

        self.build_output_batch(&batch, &expansions)
    }

    /// Materialize the expansion list into an output [`RecordBatch`].
    ///
    /// Input columns are fanned out with Arrow `take` by input-row index; then
    /// target `_vid`/`_labels`, null property placeholders (hydrated later by
    /// an async step), `_hop_count`, and the optional step/path columns are
    /// appended. Columns already present in the input (duplicate variable
    /// bindings) are not re-added, matching the schema built at plan time.
    ///
    /// # Errors
    /// Returns an error if Arrow `take`, struct construction, or final batch
    /// assembly fails (schema/column mismatch).
    fn build_output_batch(
        &self,
        input: &RecordBatch,
        expansions: &[VarLengthExpansion],
    ) -> DFResult<RecordBatch> {
        if expansions.is_empty() {
            return Ok(RecordBatch::new_empty(self.schema.clone()));
        }

        let num_rows = expansions.len();

        // Build index array mapping each output row to its input row.
        let indices: Vec<u64> = expansions
            .iter()
            .map(|(idx, _, _, _, _)| *idx as u64)
            .collect();
        let indices_array = UInt64Array::from(indices);

        // Expand input columns (one output row per expansion).
        let mut columns: Vec<ArrayRef> = Vec::new();
        for col in input.columns() {
            let expanded = take(col.as_ref(), &indices_array, None)?;
            columns.push(expanded);
        }

        // Collect target VIDs and unmatched markers for use in multiple places.
        // Unmatched OPTIONAL rows use the sentinel VID (u64::MAX) — not empty paths,
        // because EndpointsOnly mode legitimately uses empty node/edge path vectors.
        let unmatched_rows: Vec<bool> = expansions
            .iter()
            .map(|(_, vid, _, _, _)| vid.as_u64() == u64::MAX)
            .collect();
        let target_vids: Vec<Option<u64>> = expansions
            .iter()
            .zip(unmatched_rows.iter())
            .map(
                |((_, vid, _, _, _), unmatched)| {
                    if *unmatched { None } else { Some(vid.as_u64()) }
                },
            )
            .collect();

        // Add target VID column (only if not already in input).
        let target_vid_name = format!("{}._vid", self.exec.target_variable);
        if input.schema().column_with_name(&target_vid_name).is_none() {
            columns.push(Arc::new(UInt64Array::from(target_vids.clone())));
        }

        // Add target ._labels column (only if not already in input).
        let target_labels_name = format!("{}._labels", self.exec.target_variable);
        if input
            .schema()
            .column_with_name(&target_labels_name)
            .is_none()
        {
            use arrow_array::builder::{ListBuilder, StringBuilder};
            let query_ctx = self.exec.graph_ctx.query_context();
            let mut labels_builder = ListBuilder::new(StringBuilder::new());
            for target_vid in &target_vids {
                // Unmatched OPTIONAL rows get a NULL label list.
                let Some(vid_u64) = target_vid else {
                    labels_builder.append(false);
                    continue;
                };
                let vid = Vid::from(*vid_u64);
                let row_labels: Vec<String> =
                    match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
                        Some(labels) => {
                            // Vertex is in L0 — use actual labels only
                            labels
                        }
                        None => {
                            // Vertex not in L0 — trust schema label (storage already filtered)
                            if let Some(ref label_name) = self.exec.target_label_name {
                                vec![label_name.clone()]
                            } else {
                                vec![]
                            }
                        }
                    };
                let values = labels_builder.values();
                for lbl in &row_labels {
                    values.append_value(lbl);
                }
                labels_builder.append(true);
            }
            columns.push(Arc::new(labels_builder.finish()));
        }

        // Add null placeholder columns for target properties (hydrated async if needed, skip if already in input).
        for prop_name in &self.exec.target_properties {
            let full_prop_name = format!("{}.{}", self.exec.target_variable, prop_name);
            if input.schema().column_with_name(&full_prop_name).is_none() {
                let col_idx = columns.len();
                // Guard against running past the planned schema width.
                if col_idx < self.schema.fields().len() {
                    let field = self.schema.field(col_idx);
                    columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
                }
            }
        }

        // Add hop count column (always present, non-null).
        let hop_counts: Vec<u64> = expansions
            .iter()
            .map(|(_, _, hops, _, _)| *hops as u64)
            .collect();
        columns.push(Arc::new(UInt64Array::from(hop_counts)));

        // Add step variable (edge list) column if bound — `r` in `[r*1..3]`.
        if self.exec.step_variable.is_some() {
            let mut edges_builder = new_edge_list_builder();
            let query_ctx = self.exec.graph_ctx.query_context();

            for (_, _, _, node_path, edge_path) in expansions {
                if node_path.is_empty() && edge_path.is_empty() {
                    // Null row for OPTIONAL MATCH unmatched
                    edges_builder.append_null();
                } else if edge_path.is_empty() {
                    // Zero-hop match: empty list
                    edges_builder.append(true);
                } else {
                    // node_path has one more entry than edge_path: edge i
                    // connects node_path[i] -> node_path[i + 1].
                    for (i, eid) in edge_path.iter().enumerate() {
                        let type_name = l0_visibility::get_edge_type(*eid, &query_ctx)
                            .unwrap_or_else(|| "UNKNOWN".to_string());
                        append_edge_to_struct(
                            edges_builder.values(),
                            *eid,
                            &type_name,
                            node_path[i].as_u64(),
                            node_path[i + 1].as_u64(),
                            &query_ctx,
                        );
                    }
                    edges_builder.append(true);
                }
            }

            columns.push(Arc::new(edges_builder.finish()));
        }

        // Add path variable column if bound.
        // For named paths, we output a Path struct with nodes and relationships arrays.
        // If a path column already exists in input (from a prior BindFixedPath), extend it
        // rather than building from scratch.
        if let Some(path_var_name) = &self.exec.path_variable {
            let existing_path_col_idx = input
                .schema()
                .column_with_name(path_var_name)
                .map(|(idx, _)| idx);
            // Clone the Arc so we can read existing path without borrowing `columns`
            let existing_path_arc = existing_path_col_idx.map(|idx| columns[idx].clone());
            let existing_path = existing_path_arc
                .as_ref()
                .and_then(|arc| arc.as_any().downcast_ref::<arrow_array::StructArray>());

            let mut nodes_builder = new_node_list_builder();
            let mut rels_builder = new_edge_list_builder();
            let query_ctx = self.exec.graph_ctx.query_context();
            // Per-row validity for the outer path struct (false = NULL path).
            let mut path_validity = Vec::with_capacity(expansions.len());

            for (row_out_idx, (_, _, _, node_path, edge_path)) in expansions.iter().enumerate() {
                if node_path.is_empty() && edge_path.is_empty() {
                    // Unmatched OPTIONAL row: NULL nodes, relationships, and path.
                    nodes_builder.append(false);
                    rels_builder.append(false);
                    path_validity.push(false);
                    continue;
                }

                // Prepend existing path prefix if extending a prior fixed path.
                let skip_first_vlp_node = if let Some(existing) = existing_path {
                    if !existing.is_null(row_out_idx) {
                        prepend_existing_path(
                            existing,
                            row_out_idx,
                            &mut nodes_builder,
                            &mut rels_builder,
                            &query_ctx,
                        );
                        true
                    } else {
                        false
                    }
                } else {
                    false
                };

                // Append VLP nodes (skip first if extending — it's the junction point)
                let start_idx = if skip_first_vlp_node { 1 } else { 0 };
                for vid in &node_path[start_idx..] {
                    append_node_to_struct(nodes_builder.values(), *vid, &query_ctx);
                }
                nodes_builder.append(true);

                for (i, eid) in edge_path.iter().enumerate() {
                    let type_name = l0_visibility::get_edge_type(*eid, &query_ctx)
                        .unwrap_or_else(|| "UNKNOWN".to_string());
                    append_edge_to_struct(
                        rels_builder.values(),
                        *eid,
                        &type_name,
                        node_path[i].as_u64(),
                        node_path[i + 1].as_u64(),
                        &query_ctx,
                    );
                }
                rels_builder.append(true);
                path_validity.push(true);
            }

            // Finish builders and get ListArrays
            let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
            let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;

            // Build the path struct fields
            let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
            let rels_field = Arc::new(Field::new(
                "relationships",
                rels_array.data_type().clone(),
                true,
            ));

            // Create the path struct array
            let path_struct = arrow_array::StructArray::try_new(
                vec![nodes_field, rels_field].into(),
                vec![nodes_array, rels_array],
                Some(arrow::buffer::NullBuffer::from(path_validity)),
            )
            .map_err(arrow_err)?;

            // Replace the existing path column in place when extending,
            // otherwise append it as a new column.
            if let Some(idx) = existing_path_col_idx {
                columns[idx] = Arc::new(path_struct);
            } else {
                columns.push(Arc::new(path_struct));
            }
        }

        self.metrics.record_output(num_rows);

        RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)
    }
}
3499
3500impl RecordBatchStream for GraphVariableLengthTraverseStream {
3501    fn schema(&self) -> SchemaRef {
3502        self.schema.clone()
3503    }
3504}
3505
/// Hydrate target vertex properties into a VLP batch.
///
/// The base batch already has null placeholder columns for target properties.
/// This function replaces them with actual property values fetched from storage.
///
/// Two fetch strategies are used:
/// - With a known target label: schema-aware lookup via
///   `get_batch_vertex_props_for_label`, resolving per-property Arrow types.
/// - Without a label: label-agnostic lookup (slower), with `_all_props`
///   handled specially by merging storage props with L0 overlays into an
///   encoded CypherValue blob.
///
/// # Errors
/// Returns an execution error if VID columns cannot be read, property fetches
/// fail, or the rebuilt batch does not match `schema`.
async fn hydrate_vlp_target_properties(
    base_batch: RecordBatch,
    schema: SchemaRef,
    target_variable: String,
    target_properties: Vec<String>,
    target_label_name: Option<String>,
    graph_ctx: Arc<GraphExecutionContext>,
) -> DFResult<RecordBatch> {
    // Nothing to hydrate for empty batches or when no properties were requested.
    if base_batch.num_rows() == 0 || target_properties.is_empty() {
        return Ok(base_batch);
    }

    // Find the target VID column by exact name.
    // Schema layout: [input cols..., target._vid, target.prop1..., _hop_count, path?]
    //
    // IMPORTANT: When the target variable is already bound in the input (e.g., two MATCH
    // clauses referencing the same variable), there may be duplicate column names. We need
    // the LAST occurrence of target._vid, which is the one added by the VLP.
    let target_vid_col_name = format!("{}._vid", target_variable);
    let vid_col_idx = schema
        .fields()
        .iter()
        .enumerate()
        .rev()
        .find(|(_, f)| f.name() == &target_vid_col_name)
        .map(|(i, _)| i);

    // No VID column means there is nothing to key the property fetch on.
    let Some(vid_col_idx) = vid_col_idx else {
        return Ok(base_batch);
    };

    let vid_col = base_batch.column(vid_col_idx);
    let target_vid_cow = column_as_vid_array(vid_col.as_ref())?;
    let target_vid_array: &UInt64Array = &target_vid_cow;

    let target_vids: Vec<Vid> = target_vid_array
        .iter()
        // Preserve null rows by mapping them to a sentinel VID that never resolves
        // to stored properties. The output property columns remain NULL for these rows.
        .map(|v| Vid::from(v.unwrap_or(u64::MAX)))
        .collect();

    // Fetch properties from storage; one column per requested property,
    // in `target_properties` order.
    let mut property_columns: Vec<ArrayRef> = Vec::new();

    if let Some(ref label_name) = target_label_name {
        let property_manager = graph_ctx.property_manager();
        let query_ctx = graph_ctx.query_context();

        let props_map = property_manager
            .get_batch_vertex_props_for_label(&target_vids, label_name, Some(&query_ctx))
            .await
            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;

        // Resolve each property's declared type from the label's schema entry.
        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let label_props = uni_schema.properties.get(label_name.as_str());

        for prop_name in &target_properties {
            let data_type = resolve_property_type(prop_name, label_props);
            let column =
                build_property_column_static(&target_vids, &props_map, prop_name, &data_type)?;
            property_columns.push(column);
        }
    } else {
        // No label name — use label-agnostic property lookup.
        // This scans all label datasets, slower but correct for label-less traversals.
        let non_internal_props: Vec<&str> = target_properties
            .iter()
            .filter(|p| *p != "_all_props")
            .map(|s| s.as_str())
            .collect();
        let property_manager = graph_ctx.property_manager();
        let query_ctx = graph_ctx.query_context();

        // Skip the storage round-trip entirely if only `_all_props` was asked for.
        let props_map = if !non_internal_props.is_empty() {
            property_manager
                .get_batch_vertex_props(&target_vids, &non_internal_props, Some(&query_ctx))
                .await
                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
        } else {
            std::collections::HashMap::new()
        };

        for prop_name in &target_properties {
            if prop_name == "_all_props" {
                // Build CypherValue blob from all vertex properties (L0 + storage)
                use crate::query::df_graph::scan::encode_cypher_value;
                use arrow_array::builder::LargeBinaryBuilder;

                let mut builder = LargeBinaryBuilder::new();
                let l0_ctx = graph_ctx.l0_context();
                for vid in &target_vids {
                    let mut merged_props = serde_json::Map::new();
                    // Collect from storage-hydrated props
                    if let Some(vid_props) = props_map.get(vid) {
                        for (k, v) in vid_props.iter() {
                            let json_val: serde_json::Value = v.clone().into();
                            merged_props.insert(k.to_string(), json_val);
                        }
                    }
                    // Overlay L0 properties (uncommitted writes win over storage)
                    for l0 in l0_ctx.iter_l0_buffers() {
                        let guard = l0.read();
                        if let Some(l0_props) = guard.vertex_properties.get(vid) {
                            for (k, v) in l0_props.iter() {
                                let json_val: serde_json::Value = v.clone().into();
                                merged_props.insert(k.to_string(), json_val);
                            }
                        }
                    }
                    if merged_props.is_empty() {
                        builder.append_null();
                    } else {
                        let json = serde_json::Value::Object(merged_props);
                        // Encoding failures degrade to NULL rather than erroring the batch.
                        match encode_cypher_value(&json) {
                            Ok(bytes) => builder.append_value(bytes),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                property_columns.push(Arc::new(builder.finish()));
            } else {
                // Label-agnostic props are emitted as LargeBinary for lazy decoding.
                let column = build_property_column_static(
                    &target_vids,
                    &props_map,
                    prop_name,
                    &arrow::datatypes::DataType::LargeBinary,
                )?;
                property_columns.push(column);
            }
        }
    }

    // Rebuild batch replacing the null placeholder property columns with hydrated ones.
    // Find each property column by name — works regardless of column ordering
    // (schema-aware puts props before _hop_count; schemaless puts them after).
    // Use col_idx > vid_col_idx to only replace this VLP's own property columns,
    // not pre-existing input columns with the same name (duplicate variable binding).
    // NOTE(review): matching consumes `property_columns` sequentially, which
    // assumes the schema lists target property columns in `target_properties`
    // order — appears to hold for both layouts; confirm against build_schema.
    let mut new_columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
    let mut prop_idx = 0;
    for (col_idx, field) in schema.fields().iter().enumerate() {
        let is_target_prop = col_idx > vid_col_idx
            && target_properties
                .iter()
                .any(|p| *field.name() == format!("{}.{}", target_variable, p));
        if is_target_prop && prop_idx < property_columns.len() {
            new_columns.push(property_columns[prop_idx].clone());
            prop_idx += 1;
        } else {
            new_columns.push(base_batch.column(col_idx).clone());
        }
    }

    RecordBatch::try_new(schema, new_columns).map_err(arrow_err)
}
3665
3666// ============================================================================
3667// GraphVariableLengthTraverseMainExec - VLP for schemaless edge types
3668// ============================================================================
3669
/// Execution plan for variable-length path traversal on schemaless edge types.
///
/// This is similar to `GraphVariableLengthTraverseExec` but works with edge types
/// that don't have schema-defined IDs. It queries the main edges table by type name.
/// Supports OR relationship types like `[:KNOWS|HATES]` via multiple type_names.
///
/// Output columns follow [`Self::build_schema`]: input columns, then target
/// `_vid`/`_labels` (if not already bound), `_hop_count`, optional step/path
/// columns, and lazily-hydrated target property columns.
pub struct GraphVariableLengthTraverseMainExec {
    /// Input execution plan.
    input: Arc<dyn ExecutionPlan>,

    /// Column name containing source VIDs.
    source_column: String,

    /// Edge type names (not IDs, since schemaless types may not have IDs).
    type_names: Vec<String>,

    /// Traversal direction.
    direction: Direction,

    /// Minimum number of hops.
    min_hops: usize,

    /// Maximum number of hops.
    max_hops: usize,

    /// Variable name for target vertex columns.
    target_variable: String,

    /// Variable name for relationship list (r in `[r*]`) - holds `List<Edge>`.
    step_variable: Option<String>,

    /// Variable name for named path (p in `p = ...`) - holds `Path`.
    path_variable: Option<String>,

    /// Target vertex properties to materialize.
    target_properties: Vec<String>,

    /// Whether this is an optional match (LEFT JOIN semantics).
    is_optional: bool,

    /// Column name of an already-bound target VID (for patterns where target is in scope).
    bound_target_column: Option<String>,

    /// Lance SQL filter for edge property predicates (VLP bitmap preselection).
    edge_lance_filter: Option<String>,

    /// Edge property conditions to check during BFS (e.g., `{year: 1988}`).
    /// Each entry is (property_name, expected_value). All must match for an edge to be traversed.
    edge_property_conditions: Vec<(String, UniValue)>,

    /// Edge ID columns from previous hops for cross-pattern relationship uniqueness.
    used_edge_columns: Vec<String>,

    /// Path semantics mode (Trail = no repeated edges, default for OpenCypher).
    path_mode: super::nfa::PathMode,

    /// Output mode determining BFS strategy.
    output_mode: super::nfa::VlpOutputMode,

    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Output schema (computed once in `new` via `build_schema`).
    schema: SchemaRef,

    /// Cached plan properties (derived from `schema`).
    properties: PlanProperties,

    /// Execution metrics.
    metrics: ExecutionPlanMetricsSet,
}
3740
// Manual Debug: only the traversal-defining fields are shown; the execution
// context, schema, plan properties, and metrics are intentionally omitted
// (they are large and not useful in debug output).
impl fmt::Debug for GraphVariableLengthTraverseMainExec {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("GraphVariableLengthTraverseMainExec")
            .field("source_column", &self.source_column)
            .field("type_names", &self.type_names)
            .field("direction", &self.direction)
            .field("min_hops", &self.min_hops)
            .field("max_hops", &self.max_hops)
            .field("target_variable", &self.target_variable)
            .finish()
    }
}
3753
3754impl GraphVariableLengthTraverseMainExec {
3755    /// Create a new variable-length traversal plan for schemaless edges.
3756    #[expect(clippy::too_many_arguments)]
3757    pub fn new(
3758        input: Arc<dyn ExecutionPlan>,
3759        source_column: impl Into<String>,
3760        type_names: Vec<String>,
3761        direction: Direction,
3762        min_hops: usize,
3763        max_hops: usize,
3764        target_variable: impl Into<String>,
3765        step_variable: Option<String>,
3766        path_variable: Option<String>,
3767        target_properties: Vec<String>,
3768        graph_ctx: Arc<GraphExecutionContext>,
3769        is_optional: bool,
3770        bound_target_column: Option<String>,
3771        edge_lance_filter: Option<String>,
3772        edge_property_conditions: Vec<(String, UniValue)>,
3773        used_edge_columns: Vec<String>,
3774        path_mode: super::nfa::PathMode,
3775        output_mode: super::nfa::VlpOutputMode,
3776    ) -> Self {
3777        let source_column = source_column.into();
3778        let target_variable = target_variable.into();
3779
3780        // Build output schema
3781        let schema = Self::build_schema(
3782            input.schema(),
3783            &target_variable,
3784            step_variable.as_deref(),
3785            path_variable.as_deref(),
3786            &target_properties,
3787        );
3788        let properties = compute_plan_properties(schema.clone());
3789
3790        Self {
3791            input,
3792            source_column,
3793            type_names,
3794            direction,
3795            min_hops,
3796            max_hops,
3797            target_variable,
3798            step_variable,
3799            path_variable,
3800            target_properties,
3801            is_optional,
3802            bound_target_column,
3803            edge_lance_filter,
3804            edge_property_conditions,
3805            used_edge_columns,
3806            path_mode,
3807            output_mode,
3808            graph_ctx,
3809            schema,
3810            properties,
3811            metrics: ExecutionPlanMetricsSet::new(),
3812        }
3813    }
3814
3815    /// Build output schema.
3816    fn build_schema(
3817        input_schema: SchemaRef,
3818        target_variable: &str,
3819        step_variable: Option<&str>,
3820        path_variable: Option<&str>,
3821        target_properties: &[String],
3822    ) -> SchemaRef {
3823        let mut fields: Vec<Field> = input_schema
3824            .fields()
3825            .iter()
3826            .map(|f| f.as_ref().clone())
3827            .collect();
3828
3829        // Add target VID column (only if not already in input)
3830        let target_vid_name = format!("{}._vid", target_variable);
3831        if input_schema.column_with_name(&target_vid_name).is_none() {
3832            fields.push(Field::new(target_vid_name, DataType::UInt64, true));
3833        }
3834
3835        // Add target ._labels column (only if not already in input)
3836        let target_labels_name = format!("{}._labels", target_variable);
3837        if input_schema.column_with_name(&target_labels_name).is_none() {
3838            fields.push(Field::new(target_labels_name, labels_data_type(), true));
3839        }
3840
3841        // Add hop count
3842        fields.push(Field::new("_hop_count", DataType::UInt64, false));
3843
3844        // Add step variable column (list of edge structs) if bound
3845        // This is the relationship variable like `r` in `[r*1..3]`
3846        if let Some(step_var) = step_variable {
3847            fields.push(build_edge_list_field(step_var));
3848        }
3849
3850        // Add path struct if bound (only if not already in input from prior BindFixedPath)
3851        if let Some(path_var) = path_variable
3852            && input_schema.column_with_name(path_var).is_none()
3853        {
3854            fields.push(build_path_struct_field(path_var));
3855        }
3856
3857        // Add target property columns (as LargeBinary for lazy hydration via PropertyManager)
3858        // Skip properties that are already in the input schema
3859        for prop in target_properties {
3860            let prop_name = format!("{}.{}", target_variable, prop);
3861            if input_schema.column_with_name(&prop_name).is_none() {
3862                fields.push(Field::new(prop_name, DataType::LargeBinary, true));
3863            }
3864        }
3865
3866        Arc::new(Schema::new(fields))
3867    }
3868}
3869
3870impl DisplayAs for GraphVariableLengthTraverseMainExec {
3871    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3872        write!(
3873            f,
3874            "GraphVariableLengthTraverseMainExec: {} --[{:?}*{}..{}]--> target",
3875            self.source_column, self.type_names, self.min_hops, self.max_hops
3876        )
3877    }
3878}
3879
3880impl ExecutionPlan for GraphVariableLengthTraverseMainExec {
    /// Static operator name used in plan displays and metrics.
    fn name(&self) -> &str {
        "GraphVariableLengthTraverseMainExec"
    }
3884
    /// Enable downcasting from `dyn ExecutionPlan` to this concrete type.
    fn as_any(&self) -> &dyn Any {
        self
    }
3888
    /// Output schema, precomputed in `new` via `build_schema`.
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
3892
    /// Cached plan properties (partitioning/ordering), derived from the schema.
    fn properties(&self) -> &PlanProperties {
        &self.properties
    }
3896
    /// Unary operator: the single input plan is the only child.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }
3900
3901    fn with_new_children(
3902        self: Arc<Self>,
3903        children: Vec<Arc<dyn ExecutionPlan>>,
3904    ) -> DFResult<Arc<dyn ExecutionPlan>> {
3905        if children.len() != 1 {
3906            return Err(datafusion::error::DataFusionError::Plan(
3907                "GraphVariableLengthTraverseMainExec requires exactly one child".to_string(),
3908            ));
3909        }
3910
3911        Ok(Arc::new(Self::new(
3912            children[0].clone(),
3913            self.source_column.clone(),
3914            self.type_names.clone(),
3915            self.direction,
3916            self.min_hops,
3917            self.max_hops,
3918            self.target_variable.clone(),
3919            self.step_variable.clone(),
3920            self.path_variable.clone(),
3921            self.target_properties.clone(),
3922            self.graph_ctx.clone(),
3923            self.is_optional,
3924            self.bound_target_column.clone(),
3925            self.edge_lance_filter.clone(),
3926            self.edge_property_conditions.clone(),
3927            self.used_edge_columns.clone(),
3928            self.path_mode.clone(),
3929            self.output_mode.clone(),
3930        )))
3931    }
3932
3933    fn execute(
3934        &self,
3935        partition: usize,
3936        context: Arc<TaskContext>,
3937    ) -> DFResult<SendableRecordBatchStream> {
3938        let input_stream = self.input.execute(partition, context)?;
3939        let metrics = BaselineMetrics::new(&self.metrics, partition);
3940
3941        // Build adjacency map from main edges table (async)
3942        let graph_ctx = self.graph_ctx.clone();
3943        let type_names = self.type_names.clone();
3944        let direction = self.direction;
3945        let load_fut =
3946            async move { build_edge_adjacency_map(&graph_ctx, &type_names, direction).await };
3947
3948        Ok(Box::pin(GraphVariableLengthTraverseMainStream {
3949            input: input_stream,
3950            source_column: self.source_column.clone(),
3951            type_names: self.type_names.clone(),
3952            direction: self.direction,
3953            min_hops: self.min_hops,
3954            max_hops: self.max_hops,
3955            target_variable: self.target_variable.clone(),
3956            step_variable: self.step_variable.clone(),
3957            path_variable: self.path_variable.clone(),
3958            target_properties: self.target_properties.clone(),
3959            graph_ctx: self.graph_ctx.clone(),
3960            is_optional: self.is_optional,
3961            bound_target_column: self.bound_target_column.clone(),
3962            edge_lance_filter: self.edge_lance_filter.clone(),
3963            edge_property_conditions: self.edge_property_conditions.clone(),
3964            used_edge_columns: self.used_edge_columns.clone(),
3965            path_mode: self.path_mode.clone(),
3966            output_mode: self.output_mode.clone(),
3967            schema: self.schema.clone(),
3968            state: VarLengthMainStreamState::Loading(Box::pin(load_fut)),
3969            metrics,
3970        }))
3971    }
3972
3973    fn metrics(&self) -> Option<MetricsSet> {
3974        Some(self.metrics.clone_inner())
3975    }
3976}
3977
/// State machine for VLP schemaless stream.
///
/// Lifecycle: `Loading` → `Processing`; `Processing` → `Materializing` when a
/// batch needs async property hydration, then back to `Processing` for the
/// next batch. Errors and end-of-input transition any state to `Done`.
enum VarLengthMainStreamState {
    /// Loading adjacency map from main edges table.
    Loading(Pin<Box<dyn std::future::Future<Output = DFResult<EdgeAdjacencyMap>> + Send>>),
    /// Processing input batches with loaded adjacency.
    Processing(EdgeAdjacencyMap),
    /// Materializing properties for a batch; the adjacency map is carried
    /// along so it can be reused once hydration completes.
    Materializing {
        adjacency: EdgeAdjacencyMap,
        fut: Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>,
    },
    /// Stream is done.
    Done,
}
3992
/// Stream for variable-length traversal on schemaless edges.
#[expect(dead_code, reason = "VLP fields used in Phase 3")]
struct GraphVariableLengthTraverseMainStream {
    /// Upstream stream supplying batches containing the source VID column.
    input: SendableRecordBatchStream,
    /// Name of the input column holding source vertex IDs.
    source_column: String,
    /// Edge type names to traverse (joined with `|` when labeling edge structs).
    type_names: Vec<String>,
    direction: Direction,
    /// Inclusive lower bound on hop count (0 permits zero-length matches).
    min_hops: usize,
    /// Inclusive upper bound on hop count.
    max_hops: usize,
    /// Variable name of the target node; output columns are `<var>._vid` etc.
    target_variable: String,
    /// Relationship variable like `r` in `[r*1..3]` - gets a List of edge structs.
    step_variable: Option<String>,
    /// Path variable bound to a struct of `nodes`/`relationships` lists.
    path_variable: Option<String>,
    /// Target property names, emitted as LargeBinary columns for lazy hydration.
    target_properties: Vec<String>,
    graph_ctx: Arc<GraphExecutionContext>,
    /// OPTIONAL MATCH semantics: preserve unmatched source rows with NULL bindings.
    is_optional: bool,
    /// Input column with already-bound target VIDs to filter BFS results against.
    bound_target_column: Option<String>,
    /// Pushdown filter string. NOTE(review): unused in this stream; presumably
    /// consumed when scanning edges (Phase 3) — confirm.
    edge_lance_filter: Option<String>,
    /// Edge property conditions to check during BFS.
    edge_property_conditions: Vec<(String, UniValue)>,
    /// Input columns holding edge IDs bound by earlier pattern elements
    /// (cross-pattern relationship uniqueness).
    used_edge_columns: Vec<String>,
    path_mode: super::nfa::PathMode,
    output_mode: super::nfa::VlpOutputMode,
    /// Output schema, fixed at plan-construction time.
    schema: SchemaRef,
    /// Current position in the Loading/Processing/Materializing state machine.
    state: VarLengthMainStreamState,
    metrics: BaselineMetrics,
}
4020
/// BFS result type: (target_vid, hop_count, node_path, edge_path).
///
/// `node_path` includes the source vertex, so it holds `hop_count + 1`
/// entries while `edge_path` holds `hop_count` entries.
type MainBfsResult = (Vid, usize, Vec<Vid>, Vec<Eid>);
4023
4024impl GraphVariableLengthTraverseMainStream {
4025    /// Perform BFS from a source vertex using the adjacency map.
4026    ///
4027    /// `used_eids` contains edge IDs already bound by earlier pattern elements
4028    /// in the same MATCH clause, enforcing cross-pattern relationship uniqueness
4029    /// (Cypher semantics require all relationships in a MATCH to be distinct).
4030    fn bfs(
4031        &self,
4032        source: Vid,
4033        adjacency: &EdgeAdjacencyMap,
4034        used_eids: &FxHashSet<u64>,
4035    ) -> Vec<MainBfsResult> {
4036        let mut results = Vec::new();
4037        let mut queue: VecDeque<MainBfsResult> = VecDeque::new();
4038
4039        queue.push_back((source, 0, vec![source], vec![]));
4040
4041        while let Some((current, depth, node_path, edge_path)) = queue.pop_front() {
4042            // Emit result if within hop range (including zero-length patterns)
4043            if depth >= self.min_hops && depth <= self.max_hops {
4044                results.push((current, depth, node_path.clone(), edge_path.clone()));
4045            }
4046
4047            // Stop if at max depth
4048            if depth >= self.max_hops {
4049                continue;
4050            }
4051
4052            // Get neighbors from adjacency map
4053            if let Some(neighbors) = adjacency.get(&current) {
4054                let is_undirected = matches!(self.direction, Direction::Both);
4055                let mut seen_edges_at_hop: HashSet<u64> = HashSet::new();
4056
4057                for (neighbor, eid, _edge_type, props) in neighbors {
4058                    // Deduplicate edges for undirected patterns
4059                    if is_undirected && !seen_edges_at_hop.insert(eid.as_u64()) {
4060                        continue;
4061                    }
4062
4063                    // Enforce relationship uniqueness per-path (Cypher semantics).
4064                    if edge_path.contains(eid) {
4065                        continue;
4066                    }
4067
4068                    // Enforce cross-pattern relationship uniqueness: skip edges
4069                    // already bound by earlier pattern elements in the same MATCH.
4070                    if used_eids.contains(&eid.as_u64()) {
4071                        continue;
4072                    }
4073
4074                    // Check edge property conditions (e.g., {year: 1988}).
4075                    if !self.edge_property_conditions.is_empty() {
4076                        let passes =
4077                            self.edge_property_conditions
4078                                .iter()
4079                                .all(|(name, expected)| {
4080                                    props.get(name).is_some_and(|actual| actual == expected)
4081                                });
4082                        if !passes {
4083                            continue;
4084                        }
4085                    }
4086
4087                    let mut new_node_path = node_path.clone();
4088                    new_node_path.push(*neighbor);
4089                    let mut new_edge_path = edge_path.clone();
4090                    new_edge_path.push(*eid);
4091                    queue.push_back((*neighbor, depth + 1, new_node_path, new_edge_path));
4092                }
4093            }
4094        }
4095
4096        results
4097    }
4098
4099    /// Process a batch using the adjacency map.
4100    fn process_batch(
4101        &self,
4102        batch: RecordBatch,
4103        adjacency: &EdgeAdjacencyMap,
4104    ) -> DFResult<RecordBatch> {
4105        let source_col = batch.column_by_name(&self.source_column).ok_or_else(|| {
4106            datafusion::error::DataFusionError::Execution(format!(
4107                "Source column '{}' not found in input batch",
4108                self.source_column
4109            ))
4110        })?;
4111
4112        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
4113        let source_vids: &UInt64Array = &source_vid_cow;
4114
4115        // Read bound target VIDs if column exists
4116        let bound_target_cow = self
4117            .bound_target_column
4118            .as_ref()
4119            .and_then(|col| batch.column_by_name(col))
4120            .map(|c| column_as_vid_array(c.as_ref()))
4121            .transpose()?;
4122        let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
4123
4124        // Extract used edge columns for cross-pattern relationship uniqueness
4125        let used_edge_arrays: Vec<&UInt64Array> = self
4126            .used_edge_columns
4127            .iter()
4128            .filter_map(|col| {
4129                batch
4130                    .column_by_name(col)?
4131                    .as_any()
4132                    .downcast_ref::<UInt64Array>()
4133            })
4134            .collect();
4135
4136        // Collect BFS results: (original_row_idx, target_vid, hop_count, node_path, edge_path)
4137        let mut expansions: Vec<ExpansionRecord> = Vec::new();
4138
4139        for (row_idx, source_opt) in source_vids.iter().enumerate() {
4140            let mut emitted_for_row = false;
4141
4142            if let Some(source_u64) = source_opt {
4143                let source = Vid::from(source_u64);
4144
4145                // Collect used edge IDs from previous hops for this row
4146                let used_eids: FxHashSet<u64> = used_edge_arrays
4147                    .iter()
4148                    .filter_map(|arr| {
4149                        if arr.is_null(row_idx) {
4150                            None
4151                        } else {
4152                            Some(arr.value(row_idx))
4153                        }
4154                    })
4155                    .collect();
4156
4157                let bfs_results = self.bfs(source, adjacency, &used_eids);
4158
4159                for (target, hops, node_path, edge_path) in bfs_results {
4160                    // Filter by bound target VID if set (for patterns where target is in scope).
4161                    // NULL bound targets do not match anything.
4162                    if let Some(targets) = expected_targets {
4163                        if targets.is_null(row_idx) {
4164                            continue;
4165                        }
4166                        let expected_vid = targets.value(row_idx);
4167                        if target.as_u64() != expected_vid {
4168                            continue;
4169                        }
4170                    }
4171
4172                    expansions.push((row_idx, target, hops, node_path, edge_path));
4173                    emitted_for_row = true;
4174                }
4175            }
4176
4177            if self.is_optional && !emitted_for_row {
4178                // Preserve source row with NULL optional bindings.
4179                expansions.push((row_idx, Vid::from(u64::MAX), 0, vec![], vec![]));
4180            }
4181        }
4182
4183        if expansions.is_empty() {
4184            if self.is_optional {
4185                let all_indices: Vec<usize> = (0..batch.num_rows()).collect();
4186                return build_optional_null_batch_for_rows(&batch, &all_indices, &self.schema);
4187            }
4188            return Ok(RecordBatch::new_empty(self.schema.clone()));
4189        }
4190
4191        let num_rows = expansions.len();
4192        self.metrics.record_output(num_rows);
4193
4194        // Build output columns
4195        let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.schema.fields().len());
4196
4197        // Expand input columns
4198        for col_idx in 0..batch.num_columns() {
4199            let array = batch.column(col_idx);
4200            let indices: Vec<u64> = expansions
4201                .iter()
4202                .map(|(idx, _, _, _, _)| *idx as u64)
4203                .collect();
4204            let take_indices = UInt64Array::from(indices);
4205            let expanded = arrow::compute::take(array, &take_indices, None)?;
4206            columns.push(expanded);
4207        }
4208
4209        // Add target VID column (only if not already in input)
4210        let target_vid_name = format!("{}._vid", self.target_variable);
4211        if batch.schema().column_with_name(&target_vid_name).is_none() {
4212            let target_vids: Vec<Option<u64>> = expansions
4213                .iter()
4214                .map(|(_, vid, _, node_path, edge_path)| {
4215                    if node_path.is_empty() && edge_path.is_empty() {
4216                        None
4217                    } else {
4218                        Some(vid.as_u64())
4219                    }
4220                })
4221                .collect();
4222            columns.push(Arc::new(UInt64Array::from(target_vids)));
4223        }
4224
4225        // Add target ._labels column (only if not already in input)
4226        let target_labels_name = format!("{}._labels", self.target_variable);
4227        if batch
4228            .schema()
4229            .column_with_name(&target_labels_name)
4230            .is_none()
4231        {
4232            use arrow_array::builder::{ListBuilder, StringBuilder};
4233            let mut labels_builder = ListBuilder::new(StringBuilder::new());
4234            for (_, vid, _, node_path, edge_path) in expansions.iter() {
4235                if node_path.is_empty() && edge_path.is_empty() {
4236                    labels_builder.append(false);
4237                    continue;
4238                }
4239                let mut row_labels: Vec<String> = Vec::new();
4240                let labels =
4241                    l0_visibility::get_vertex_labels(*vid, &self.graph_ctx.query_context());
4242                for lbl in &labels {
4243                    if !row_labels.contains(lbl) {
4244                        row_labels.push(lbl.clone());
4245                    }
4246                }
4247                let values = labels_builder.values();
4248                for lbl in &row_labels {
4249                    values.append_value(lbl);
4250                }
4251                labels_builder.append(true);
4252            }
4253            columns.push(Arc::new(labels_builder.finish()));
4254        }
4255
4256        // Add hop count column
4257        let hop_counts: Vec<u64> = expansions
4258            .iter()
4259            .map(|(_, _, hops, _, _)| *hops as u64)
4260            .collect();
4261        columns.push(Arc::new(UInt64Array::from(hop_counts)));
4262
4263        // Add step variable column if bound (list of edge structs).
4264        if self.step_variable.is_some() {
4265            let mut edges_builder = new_edge_list_builder();
4266            let query_ctx = self.graph_ctx.query_context();
4267            let type_names_str = self.type_names.join("|");
4268
4269            for (_, _, _, node_path, edge_path) in expansions.iter() {
4270                if node_path.is_empty() && edge_path.is_empty() {
4271                    edges_builder.append_null();
4272                } else if edge_path.is_empty() {
4273                    // Zero-hop match: empty list.
4274                    edges_builder.append(true);
4275                } else {
4276                    for (i, eid) in edge_path.iter().enumerate() {
4277                        append_edge_to_struct(
4278                            edges_builder.values(),
4279                            *eid,
4280                            &type_names_str,
4281                            node_path[i].as_u64(),
4282                            node_path[i + 1].as_u64(),
4283                            &query_ctx,
4284                        );
4285                    }
4286                    edges_builder.append(true);
4287                }
4288            }
4289
4290            columns.push(Arc::new(edges_builder.finish()) as ArrayRef);
4291        }
4292
4293        // Add path variable column if bound.
4294        // If a path column already exists in input (from a prior BindFixedPath), extend it
4295        // rather than building from scratch.
4296        if let Some(path_var_name) = &self.path_variable {
4297            let existing_path_col_idx = batch
4298                .schema()
4299                .column_with_name(path_var_name)
4300                .map(|(idx, _)| idx);
4301            let existing_path_arc = existing_path_col_idx.map(|idx| columns[idx].clone());
4302            let existing_path = existing_path_arc
4303                .as_ref()
4304                .and_then(|arc| arc.as_any().downcast_ref::<arrow_array::StructArray>());
4305
4306            let mut nodes_builder = new_node_list_builder();
4307            let mut rels_builder = new_edge_list_builder();
4308            let query_ctx = self.graph_ctx.query_context();
4309            let type_names_str = self.type_names.join("|");
4310            let mut path_validity = Vec::with_capacity(expansions.len());
4311
4312            for (row_out_idx, (_, _, _, node_path, edge_path)) in expansions.iter().enumerate() {
4313                if node_path.is_empty() && edge_path.is_empty() {
4314                    nodes_builder.append(false);
4315                    rels_builder.append(false);
4316                    path_validity.push(false);
4317                    continue;
4318                }
4319
4320                // Prepend existing path prefix if extending
4321                let skip_first_vlp_node = if let Some(existing) = existing_path {
4322                    if !existing.is_null(row_out_idx) {
4323                        prepend_existing_path(
4324                            existing,
4325                            row_out_idx,
4326                            &mut nodes_builder,
4327                            &mut rels_builder,
4328                            &query_ctx,
4329                        );
4330                        true
4331                    } else {
4332                        false
4333                    }
4334                } else {
4335                    false
4336                };
4337
4338                // Append VLP nodes (skip first if extending — it's the junction point)
4339                let start_idx = if skip_first_vlp_node { 1 } else { 0 };
4340                for vid in &node_path[start_idx..] {
4341                    append_node_to_struct(nodes_builder.values(), *vid, &query_ctx);
4342                }
4343                nodes_builder.append(true);
4344
4345                for (i, eid) in edge_path.iter().enumerate() {
4346                    append_edge_to_struct(
4347                        rels_builder.values(),
4348                        *eid,
4349                        &type_names_str,
4350                        node_path[i].as_u64(),
4351                        node_path[i + 1].as_u64(),
4352                        &query_ctx,
4353                    );
4354                }
4355                rels_builder.append(true);
4356                path_validity.push(true);
4357            }
4358
4359            // Finish the builders to get the arrays
4360            let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
4361            let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;
4362
4363            // Build the path struct with nodes and relationships fields
4364            let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
4365            let rels_field = Arc::new(Field::new(
4366                "relationships",
4367                rels_array.data_type().clone(),
4368                true,
4369            ));
4370
4371            // Create the path struct array
4372            let path_struct = arrow_array::StructArray::try_new(
4373                vec![nodes_field, rels_field].into(),
4374                vec![nodes_array, rels_array],
4375                Some(arrow::buffer::NullBuffer::from(path_validity)),
4376            )
4377            .map_err(arrow_err)?;
4378
4379            if let Some(idx) = existing_path_col_idx {
4380                columns[idx] = Arc::new(path_struct);
4381            } else {
4382                columns.push(Arc::new(path_struct));
4383            }
4384        }
4385
4386        // Add target property columns as NULL for now (skip if already in input).
4387        // Property hydration happens via PropertyManager in the query execution pipeline.
4388        for prop_name in &self.target_properties {
4389            let full_prop_name = format!("{}.{}", self.target_variable, prop_name);
4390            if batch.schema().column_with_name(&full_prop_name).is_none() {
4391                columns.push(arrow_array::new_null_array(
4392                    &DataType::LargeBinary,
4393                    num_rows,
4394                ));
4395            }
4396        }
4397
4398        RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)
4399    }
4400}
4401
impl Stream for GraphVariableLengthTraverseMainStream {
    type Item = DFResult<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Move the state out by value so its contents (futures, adjacency
            // map) can be consumed; `Done` stands in until a successor state
            // is written back. Every arm below restores a state before
            // returning, except terminal error/end-of-input paths.
            let state = std::mem::replace(&mut self.state, VarLengthMainStreamState::Done);

            match state {
                // Phase 1: await the async adjacency-map load.
                VarLengthMainStreamState::Loading(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(adjacency)) => {
                        self.state = VarLengthMainStreamState::Processing(adjacency);
                        // Continue loop to start processing
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = VarLengthMainStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        self.state = VarLengthMainStreamState::Loading(fut);
                        return Poll::Pending;
                    }
                },
                // Phase 2: pull input batches and expand them via BFS.
                VarLengthMainStreamState::Processing(adjacency) => {
                    match self.input.poll_next_unpin(cx) {
                        Poll::Ready(Some(Ok(batch))) => {
                            let base_batch = match self.process_batch(batch, &adjacency) {
                                Ok(b) => b,
                                Err(e) => {
                                    // Keep the adjacency map so subsequent
                                    // batches can still be processed after the
                                    // error is reported downstream.
                                    self.state = VarLengthMainStreamState::Processing(adjacency);
                                    return Poll::Ready(Some(Err(e)));
                                }
                            };

                            // If no properties need async hydration, return directly
                            if self.target_properties.is_empty() {
                                self.state = VarLengthMainStreamState::Processing(adjacency);
                                return Poll::Ready(Some(Ok(base_batch)));
                            }

                            // Create async hydration future
                            let schema = self.schema.clone();
                            let target_variable = self.target_variable.clone();
                            let target_properties = self.target_properties.clone();
                            let graph_ctx = self.graph_ctx.clone();

                            let fut = hydrate_vlp_target_properties(
                                base_batch,
                                schema,
                                target_variable,
                                target_properties,
                                None, // schemaless — no label name
                                graph_ctx,
                            );

                            self.state = VarLengthMainStreamState::Materializing {
                                adjacency,
                                fut: Box::pin(fut),
                            };
                            // Continue loop to poll the future
                        }
                        Poll::Ready(Some(Err(e))) => {
                            self.state = VarLengthMainStreamState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(None) => {
                            self.state = VarLengthMainStreamState::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Pending => {
                            self.state = VarLengthMainStreamState::Processing(adjacency);
                            return Poll::Pending;
                        }
                    }
                }
                // Phase 3: await property hydration, then resume processing.
                VarLengthMainStreamState::Materializing { adjacency, mut fut } => {
                    match fut.as_mut().poll(cx) {
                        Poll::Ready(Ok(batch)) => {
                            self.state = VarLengthMainStreamState::Processing(adjacency);
                            return Poll::Ready(Some(Ok(batch)));
                        }
                        Poll::Ready(Err(e)) => {
                            self.state = VarLengthMainStreamState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Pending => {
                            self.state = VarLengthMainStreamState::Materializing { adjacency, fut };
                            return Poll::Pending;
                        }
                    }
                }
                VarLengthMainStreamState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
4499
4500impl RecordBatchStream for GraphVariableLengthTraverseMainStream {
4501    fn schema(&self) -> SchemaRef {
4502        self.schema.clone()
4503    }
4504}
4505
4506#[cfg(test)]
4507mod tests {
4508    use super::*;
4509
4510    #[test]
4511    fn test_traverse_schema_without_edge() {
4512        let input_schema = Arc::new(Schema::new(vec![Field::new(
4513            "a._vid",
4514            DataType::UInt64,
4515            false,
4516        )]));
4517
4518        let output_schema =
4519            GraphTraverseExec::build_schema(input_schema, "m", None, &[], &[], None, None, false);
4520
4521        // Schema: input + target VID + target _labels + internal edge ID
4522        assert_eq!(output_schema.fields().len(), 4);
4523        assert_eq!(output_schema.field(0).name(), "a._vid");
4524        assert_eq!(output_schema.field(1).name(), "m._vid");
4525        assert_eq!(output_schema.field(2).name(), "m._labels");
4526        assert_eq!(output_schema.field(3).name(), "__eid_to_m");
4527    }
4528
4529    #[test]
4530    fn test_traverse_schema_with_edge() {
4531        let input_schema = Arc::new(Schema::new(vec![Field::new(
4532            "a._vid",
4533            DataType::UInt64,
4534            false,
4535        )]));
4536
4537        let output_schema = GraphTraverseExec::build_schema(
4538            input_schema,
4539            "m",
4540            Some("r"),
4541            &[],
4542            &[],
4543            None,
4544            None,
4545            false,
4546        );
4547
4548        // Schema: input + target VID + target _labels + edge EID + edge _type
4549        assert_eq!(output_schema.fields().len(), 5);
4550        assert_eq!(output_schema.field(0).name(), "a._vid");
4551        assert_eq!(output_schema.field(1).name(), "m._vid");
4552        assert_eq!(output_schema.field(2).name(), "m._labels");
4553        assert_eq!(output_schema.field(3).name(), "r._eid");
4554        assert_eq!(output_schema.field(4).name(), "r._type");
4555    }
4556
4557    #[test]
4558    fn test_traverse_schema_with_target_properties() {
4559        let input_schema = Arc::new(Schema::new(vec![Field::new(
4560            "a._vid",
4561            DataType::UInt64,
4562            false,
4563        )]));
4564
4565        let target_props = vec!["name".to_string(), "age".to_string()];
4566        let output_schema = GraphTraverseExec::build_schema(
4567            input_schema,
4568            "m",
4569            Some("r"),
4570            &[],
4571            &target_props,
4572            None,
4573            None,
4574            false,
4575        );
4576
4577        // a._vid, m._vid, m._labels, m.name, m.age, r._eid, r._type
4578        assert_eq!(output_schema.fields().len(), 7);
4579        assert_eq!(output_schema.field(0).name(), "a._vid");
4580        assert_eq!(output_schema.field(1).name(), "m._vid");
4581        assert_eq!(output_schema.field(2).name(), "m._labels");
4582        assert_eq!(output_schema.field(3).name(), "m.name");
4583        assert_eq!(output_schema.field(4).name(), "m.age");
4584        assert_eq!(output_schema.field(5).name(), "r._eid");
4585        assert_eq!(output_schema.field(6).name(), "r._type");
4586    }
4587
4588    #[test]
4589    fn test_variable_length_schema() {
4590        let input_schema = Arc::new(Schema::new(vec![Field::new(
4591            "a._vid",
4592            DataType::UInt64,
4593            false,
4594        )]));
4595
4596        let output_schema = GraphVariableLengthTraverseExec::build_schema(
4597            input_schema,
4598            "b",
4599            None,
4600            Some("p"),
4601            &[],
4602            None,
4603        );
4604
4605        assert_eq!(output_schema.fields().len(), 5);
4606        assert_eq!(output_schema.field(0).name(), "a._vid");
4607        assert_eq!(output_schema.field(1).name(), "b._vid");
4608        assert_eq!(output_schema.field(2).name(), "b._labels");
4609        assert_eq!(output_schema.field(3).name(), "_hop_count");
4610        assert_eq!(output_schema.field(4).name(), "p");
4611    }
4612
4613    #[test]
4614    fn test_traverse_main_schema_without_edge() {
4615        let input_schema = Arc::new(Schema::new(vec![Field::new(
4616            "a._vid",
4617            DataType::UInt64,
4618            false,
4619        )]));
4620
4621        let output_schema =
4622            GraphTraverseMainExec::build_schema(&input_schema, "m", &None, &[], &[], false);
4623
4624        // a._vid, m._vid, m._labels, __eid_to_m
4625        assert_eq!(output_schema.fields().len(), 4);
4626        assert_eq!(output_schema.field(0).name(), "a._vid");
4627        assert_eq!(output_schema.field(1).name(), "m._vid");
4628        assert_eq!(output_schema.field(2).name(), "m._labels");
4629        assert_eq!(output_schema.field(3).name(), "__eid_to_m");
4630    }
4631
4632    #[test]
4633    fn test_traverse_main_schema_with_edge() {
4634        let input_schema = Arc::new(Schema::new(vec![Field::new(
4635            "a._vid",
4636            DataType::UInt64,
4637            false,
4638        )]));
4639
4640        let output_schema = GraphTraverseMainExec::build_schema(
4641            &input_schema,
4642            "m",
4643            &Some("r".to_string()),
4644            &[],
4645            &[],
4646            false,
4647        );
4648
4649        // a._vid, m._vid, m._labels, r._eid, r._type
4650        assert_eq!(output_schema.fields().len(), 5);
4651        assert_eq!(output_schema.field(0).name(), "a._vid");
4652        assert_eq!(output_schema.field(1).name(), "m._vid");
4653        assert_eq!(output_schema.field(2).name(), "m._labels");
4654        assert_eq!(output_schema.field(3).name(), "r._eid");
4655        assert_eq!(output_schema.field(4).name(), "r._type");
4656    }
4657
4658    #[test]
4659    fn test_traverse_main_schema_with_edge_properties() {
4660        let input_schema = Arc::new(Schema::new(vec![Field::new(
4661            "a._vid",
4662            DataType::UInt64,
4663            false,
4664        )]));
4665
4666        let edge_props = vec!["weight".to_string(), "since".to_string()];
4667        let output_schema = GraphTraverseMainExec::build_schema(
4668            &input_schema,
4669            "m",
4670            &Some("r".to_string()),
4671            &edge_props,
4672            &[],
4673            false,
4674        );
4675
4676        // a._vid, m._vid, m._labels, r._eid, r._type, r.weight, r.since
4677        assert_eq!(output_schema.fields().len(), 7);
4678        assert_eq!(output_schema.field(0).name(), "a._vid");
4679        assert_eq!(output_schema.field(1).name(), "m._vid");
4680        assert_eq!(output_schema.field(2).name(), "m._labels");
4681        assert_eq!(output_schema.field(3).name(), "r._eid");
4682        assert_eq!(output_schema.field(4).name(), "r._type");
4683        assert_eq!(output_schema.field(5).name(), "r.weight");
4684        assert_eq!(output_schema.field(5).data_type(), &DataType::LargeBinary);
4685        assert_eq!(output_schema.field(6).name(), "r.since");
4686        assert_eq!(output_schema.field(6).data_type(), &DataType::LargeBinary);
4687    }
4688
4689    #[test]
4690    fn test_traverse_main_schema_with_target_properties() {
4691        let input_schema = Arc::new(Schema::new(vec![Field::new(
4692            "a._vid",
4693            DataType::UInt64,
4694            false,
4695        )]));
4696
4697        let target_props = vec!["name".to_string(), "age".to_string()];
4698        let output_schema = GraphTraverseMainExec::build_schema(
4699            &input_schema,
4700            "m",
4701            &Some("r".to_string()),
4702            &[],
4703            &target_props,
4704            false,
4705        );
4706
4707        // a._vid, m._vid, m._labels, r._eid, r._type, m.name, m.age
4708        assert_eq!(output_schema.fields().len(), 7);
4709        assert_eq!(output_schema.field(0).name(), "a._vid");
4710        assert_eq!(output_schema.field(1).name(), "m._vid");
4711        assert_eq!(output_schema.field(2).name(), "m._labels");
4712        assert_eq!(output_schema.field(3).name(), "r._eid");
4713        assert_eq!(output_schema.field(4).name(), "r._type");
4714        assert_eq!(output_schema.field(5).name(), "m.name");
4715        assert_eq!(output_schema.field(5).data_type(), &DataType::LargeBinary);
4716        assert_eq!(output_schema.field(6).name(), "m.age");
4717        assert_eq!(output_schema.field(6).data_type(), &DataType::LargeBinary);
4718    }
4719}