Skip to main content

uni_query/query/df_graph/
traverse.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Graph traversal execution plans for DataFusion.
5//!
6//! This module provides graph traversal operators as DataFusion [`ExecutionPlan`]s:
7//!
8//! - [`GraphTraverseExec`]: Single-hop edge traversal
9//! - [`GraphVariableLengthTraverseExec`]: Multi-hop BFS traversal (min..max hops)
10//!
11//! # Traversal Algorithm
12//!
13//! Traversal uses the CSR adjacency cache for O(1) neighbor lookups:
14//!
15//! ```text
16//! Input Stream (source VIDs)
17//!        │
18//!        ▼
19//! ┌──────────────────┐
20//! │ For each batch:  │
21//! │  1. Extract VIDs │
22//! │  2. get_neighbors│
23//! │  3. Expand rows  │
24//! └──────────────────┘
25//!        │
26//!        ▼
27//! Output Stream (source, edge, target)
28//! ```
29//!
30//! L0 buffers are automatically overlaid for MVCC visibility.
31
32use crate::query::df_graph::GraphExecutionContext;
33use crate::query::df_graph::bitmap::{EidFilter, VidFilter};
34use crate::query::df_graph::common::{
35    append_edge_to_struct, append_node_to_struct, build_edge_list_field, build_path_struct_field,
36    column_as_vid_array, compute_plan_properties, labels_data_type, new_edge_list_builder,
37    new_node_list_builder,
38};
39use crate::query::df_graph::nfa::{NfaStateId, PathNfa, PathSelector, VlpOutputMode};
40use crate::query::df_graph::pred_dag::PredecessorDag;
41use crate::query::df_graph::scan::{build_property_column_static, resolve_property_type};
42use arrow::compute::take;
43use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
44use arrow_schema::{DataType, Field, Schema, SchemaRef};
45use datafusion::common::Result as DFResult;
46use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
47use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
48use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
49use futures::{Stream, StreamExt};
50use fxhash::FxHashSet;
51use std::any::Any;
52use std::collections::{HashMap, HashSet, VecDeque};
53use std::fmt;
54use std::pin::Pin;
55use std::sync::Arc;
56use std::task::{Context, Poll};
57use uni_common::Value as UniValue;
58use uni_common::core::id::{Eid, Vid};
59use uni_store::runtime::l0_visibility;
60use uni_store::storage::direction::Direction;
61
/// BFS result tuple: (target_vid, hop_count, node_path, edge_path).
type BfsResult = (Vid, usize, Vec<Vid>, Vec<Eid>);

/// Expansion record tuple: (original_row_idx, target_vid, hop_count, node_path, edge_path).
/// NOTE(review): structurally identical to `VarLengthExpansion` defined further down —
/// consider unifying the two aliases.
type ExpansionRecord = (usize, Vid, usize, Vec<Vid>, Vec<Eid>);
67
68/// Prepend nodes and edges from an existing path struct column into builders.
69///
70/// Used when a VLP extends a path that was partially built by a prior `BindFixedPath`.
71/// Reads the nodes and relationships from the existing path at `row_idx` and appends
72/// them to the provided builders. The caller should then skip the first VLP node
73/// (which is the junction point already present in the existing path).
74fn prepend_existing_path(
75    existing_path: &arrow_array::StructArray,
76    row_idx: usize,
77    nodes_builder: &mut arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder>,
78    rels_builder: &mut arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder>,
79    query_ctx: &uni_store::runtime::context::QueryContext,
80) {
81    // Read existing nodes
82    let nodes_list = existing_path
83        .column(0)
84        .as_any()
85        .downcast_ref::<arrow_array::ListArray>()
86        .unwrap();
87    let node_values = nodes_list.value(row_idx);
88    let node_struct = node_values
89        .as_any()
90        .downcast_ref::<arrow_array::StructArray>()
91        .unwrap();
92    let vid_col = node_struct
93        .column(0)
94        .as_any()
95        .downcast_ref::<UInt64Array>()
96        .unwrap();
97    for i in 0..vid_col.len() {
98        append_node_to_struct(
99            nodes_builder.values(),
100            Vid::from(vid_col.value(i)),
101            query_ctx,
102        );
103    }
104
105    // Read existing edges
106    let rels_list = existing_path
107        .column(1)
108        .as_any()
109        .downcast_ref::<arrow_array::ListArray>()
110        .unwrap();
111    let edge_values = rels_list.value(row_idx);
112    let edge_struct = edge_values
113        .as_any()
114        .downcast_ref::<arrow_array::StructArray>()
115        .unwrap();
116    let eid_col = edge_struct
117        .column(0)
118        .as_any()
119        .downcast_ref::<UInt64Array>()
120        .unwrap();
121    let type_col = edge_struct
122        .column(1)
123        .as_any()
124        .downcast_ref::<arrow_array::StringArray>()
125        .unwrap();
126    let src_col = edge_struct
127        .column(2)
128        .as_any()
129        .downcast_ref::<UInt64Array>()
130        .unwrap();
131    let dst_col = edge_struct
132        .column(3)
133        .as_any()
134        .downcast_ref::<UInt64Array>()
135        .unwrap();
136    for i in 0..eid_col.len() {
137        append_edge_to_struct(
138            rels_builder.values(),
139            Eid::from(eid_col.value(i)),
140            type_col.value(i),
141            src_col.value(i),
142            dst_col.value(i),
143            query_ctx,
144        );
145    }
146}
147
148/// Resolve edge property Arrow type, falling back to `LargeBinary` (CypherValue) for
149/// schemaless properties. Unlike vertex properties, schemaless edge properties must
150/// preserve original JSON value types (int, float, etc.) since edge types commonly
151/// lack explicit property definitions.
152fn resolve_edge_property_type(
153    prop: &str,
154    schema_props: Option<
155        &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
156    >,
157) -> DataType {
158    if prop == "overflow_json" {
159        DataType::LargeBinary
160    } else {
161        schema_props
162            .and_then(|props| props.get(prop))
163            .map(|meta| meta.r#type.to_arrow())
164            .unwrap_or(DataType::LargeBinary)
165    }
166}
167
168use crate::query::df_graph::common::merged_edge_schema_props;
169
/// Expansion tuple for variable-length traversal: (input_row_idx, target_vid, hop_count, node_path, edge_path).
/// NOTE(review): structurally identical to `ExpansionRecord` defined near the top of
/// this file — consider unifying the two aliases.
type VarLengthExpansion = (usize, Vid, usize, Vec<Vid>, Vec<Eid>);
172
/// Single-hop graph traversal execution plan.
///
/// Expands each input row by traversing edges to find neighbors.
/// For each (source, edge, target) triple, produces one output row
/// containing the input columns plus target vertex and edge columns.
///
/// The output schema is constructed once in `new` (see `build_schema` for the
/// exact column layout), so `schema()`/`properties()` are cheap accessors.
///
/// # Example
///
/// ```ignore
/// // Input: batch with _vid column
/// // Traverse KNOWS edges outgoing
/// let traverse = GraphTraverseExec::new(
///     input_plan,
///     "_vid",
///     vec![knows_type_id],
///     Direction::Outgoing,
///     "m",           // target variable
///     Some("r"),     // edge variable
///     None,          // no target label filter
///     graph_ctx,
/// );
///
/// // Output: input columns + m._vid + r._eid
/// ```
pub struct GraphTraverseExec {
    /// Input execution plan.
    input: Arc<dyn ExecutionPlan>,

    /// Column name containing source VIDs.
    source_column: String,

    /// Edge type IDs to traverse.
    edge_type_ids: Vec<u32>,

    /// Traversal direction.
    direction: Direction,

    /// Variable name for target vertex columns.
    target_variable: String,

    /// Variable name for edge columns (if edge is bound).
    edge_variable: Option<String>,

    /// Edge properties to materialize (for pushdown hydration).
    edge_properties: Vec<String>,

    /// Target vertex properties to materialize.
    target_properties: Vec<String>,

    /// Target label name for property type resolution.
    target_label_name: Option<String>,

    /// Optional target label filter.
    target_label_id: Option<u16>,

    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Whether this is an OPTIONAL MATCH (preserve unmatched source rows with NULLs).
    optional: bool,

    /// Variables introduced by the OPTIONAL MATCH pattern.
    /// Used to determine which columns should be null-extended on failure.
    optional_pattern_vars: HashSet<String>,

    /// Column name of an already-bound target VID (for cycle patterns like n-->k<--n).
    /// When set, only traversals that reach this VID are included.
    bound_target_column: Option<String>,

    /// Columns containing edge IDs from previous hops (for relationship uniqueness).
    /// Edges matching any of these IDs are excluded from traversal results.
    used_edge_columns: Vec<String>,

    /// Output schema: input columns + target VID/labels/props + edge columns.
    schema: SchemaRef,

    /// Cached plan properties, computed from `schema` once in `new`.
    properties: PlanProperties,

    /// Execution metrics.
    metrics: ExecutionPlanMetricsSet,
}
255
256impl fmt::Debug for GraphTraverseExec {
257    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258        f.debug_struct("GraphTraverseExec")
259            .field("source_column", &self.source_column)
260            .field("edge_type_ids", &self.edge_type_ids)
261            .field("direction", &self.direction)
262            .field("target_variable", &self.target_variable)
263            .field("edge_variable", &self.edge_variable)
264            .finish()
265    }
266}
267
impl GraphTraverseExec {
    /// Create a new single-hop traversal plan.
    ///
    /// Resolves target-label and edge-type property metadata from the schema
    /// manager, builds the output schema up front, and caches the plan
    /// properties, so later `schema()`/`properties()` calls are cheap.
    ///
    /// # Arguments
    ///
    /// * `input` - Input plan providing source vertices
    /// * `source_column` - Column name containing source VIDs
    /// * `edge_type_ids` - Edge types to traverse
    /// * `direction` - Traversal direction
    /// * `target_variable` - Variable name for target vertices
    /// * `edge_variable` - Optional variable name for edges
    /// * `edge_properties` - Edge properties to materialize
    /// * `target_label_id` - Optional target label filter
    /// * `graph_ctx` - Graph execution context
    /// * `bound_target_column` - Column with already-bound target VID (for cycle patterns)
    /// * `used_edge_columns` - Columns with edge IDs to exclude (relationship uniqueness)
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        source_column: impl Into<String>,
        edge_type_ids: Vec<u32>,
        direction: Direction,
        target_variable: impl Into<String>,
        edge_variable: Option<String>,
        edge_properties: Vec<String>,
        target_properties: Vec<String>,
        target_label_name: Option<String>,
        target_label_id: Option<u16>,
        graph_ctx: Arc<GraphExecutionContext>,
        optional: bool,
        optional_pattern_vars: HashSet<String>,
        bound_target_column: Option<String>,
        used_edge_columns: Vec<String>,
    ) -> Self {
        let source_column = source_column.into();
        let target_variable = target_variable.into();

        // Resolve target property Arrow types from the schema
        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let label_props = target_label_name
            .as_deref()
            .and_then(|ln| uni_schema.properties.get(ln));
        // Edge property metadata merged across all traversed edge types;
        // an empty merge means no schema-declared edge properties exist.
        let merged_edge_props = merged_edge_schema_props(&uni_schema, &edge_type_ids);
        let edge_props = if merged_edge_props.is_empty() {
            None
        } else {
            Some(&merged_edge_props)
        };

        // Build output schema: input schema + target VID + target props + optional edge ID + edge properties
        let schema = Self::build_schema(
            input.schema(),
            &target_variable,
            edge_variable.as_deref(),
            &edge_properties,
            &target_properties,
            label_props,
            edge_props,
            optional,
        );

        let properties = compute_plan_properties(schema.clone());

        Self {
            input,
            source_column,
            edge_type_ids,
            direction,
            target_variable,
            edge_variable,
            edge_properties,
            target_properties,
            target_label_name,
            target_label_id,
            graph_ctx,
            optional,
            optional_pattern_vars,
            bound_target_column,
            used_edge_columns,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Build output schema.
    ///
    /// Column order: input columns, then `{target}._vid` and `{target}._labels`,
    /// then target property columns, then edge columns. When `optional` is true,
    /// the VID/EID columns are nullable so unmatched source rows can be
    /// null-extended.
    #[expect(
        clippy::too_many_arguments,
        reason = "Schema construction needs all field metadata"
    )]
    fn build_schema(
        input_schema: SchemaRef,
        target_variable: &str,
        edge_variable: Option<&str>,
        edge_properties: &[String],
        target_properties: &[String],
        label_props: Option<
            &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
        >,
        edge_props: Option<
            &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
        >,
        optional: bool,
    ) -> SchemaRef {
        let mut fields: Vec<Field> = input_schema
            .fields()
            .iter()
            .map(|f| f.as_ref().clone())
            .collect();

        // Add target VID column (nullable when optional — unmatched rows get NULL)
        let target_vid_name = format!("{}._vid", target_variable);
        fields.push(Field::new(&target_vid_name, DataType::UInt64, optional));

        // Add target ._labels column (List(Utf8)) for labels() and structural projection support
        fields.push(Field::new(
            format!("{}._labels", target_variable),
            labels_data_type(),
            true,
        ));

        // Add target vertex property columns
        for prop_name in target_properties {
            let col_name = format!("{}.{}", target_variable, prop_name);
            let arrow_type = resolve_property_type(prop_name, label_props);
            fields.push(Field::new(&col_name, arrow_type, true));
        }

        // Add edge ID column if edge variable is bound
        if let Some(edge_var) = edge_variable {
            let edge_id_name = format!("{}._eid", edge_var);
            fields.push(Field::new(&edge_id_name, DataType::UInt64, optional));

            // Add edge _type column for type(r) support
            fields.push(Field::new(
                format!("{}._type", edge_var),
                DataType::Utf8,
                true,
            ));

            // Add edge property columns with types resolved from schema
            for prop_name in edge_properties {
                let prop_col_name = format!("{}.{}", edge_var, prop_name);
                let arrow_type = resolve_edge_property_type(prop_name, edge_props);
                fields.push(Field::new(&prop_col_name, arrow_type, true));
            }
        } else {
            // Add internal edge ID column for relationship uniqueness tracking
            // even when edge variable is not explicitly bound.
            let internal_eid_name = format!("__eid_to_{}", target_variable);
            fields.push(Field::new(&internal_eid_name, DataType::UInt64, optional));
        }

        Arc::new(Schema::new(fields))
    }
}
424
425impl DisplayAs for GraphTraverseExec {
426    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
427        write!(
428            f,
429            "GraphTraverseExec: {} --[{:?}]--> {}",
430            self.source_column, self.edge_type_ids, self.target_variable
431        )?;
432        if let Some(ref edge_var) = self.edge_variable {
433            write!(f, " as {}", edge_var)?;
434        }
435        Ok(())
436    }
437}
438
impl ExecutionPlan for GraphTraverseExec {
    fn name(&self) -> &str {
        "GraphTraverseExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Return the output schema (precomputed in `new`).
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    /// Rebuild this node with a replacement child (DataFusion optimizer hook).
    ///
    /// # Errors
    ///
    /// Returns a plan error unless exactly one child is supplied.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if children.len() != 1 {
            return Err(datafusion::error::DataFusionError::Plan(
                "GraphTraverseExec requires exactly one child".to_string(),
            ));
        }

        // Reconstruct via `new` so the schema/properties are rebuilt against
        // the (possibly changed) child schema.
        Ok(Arc::new(Self::new(
            children[0].clone(),
            self.source_column.clone(),
            self.edge_type_ids.clone(),
            self.direction,
            self.target_variable.clone(),
            self.edge_variable.clone(),
            self.edge_properties.clone(),
            self.target_properties.clone(),
            self.target_label_name.clone(),
            self.target_label_id,
            self.graph_ctx.clone(),
            self.optional,
            self.optional_pattern_vars.clone(),
            self.bound_target_column.clone(),
            self.used_edge_columns.clone(),
        )))
    }

    /// Start execution for one partition.
    ///
    /// Kicks off CSR adjacency warming for the traversed edge types/direction
    /// first — the returned stream starts in `TraverseStreamState::Warming` —
    /// then expands input batches as they arrive.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let input_stream = self.input.execute(partition, context)?;

        let metrics = BaselineMetrics::new(&self.metrics, partition);

        let warm_fut = self
            .graph_ctx
            .warming_future(self.edge_type_ids.clone(), self.direction);

        Ok(Box::pin(GraphTraverseStream {
            input: input_stream,
            source_column: self.source_column.clone(),
            edge_type_ids: self.edge_type_ids.clone(),
            direction: self.direction,
            target_variable: self.target_variable.clone(),
            edge_variable: self.edge_variable.clone(),
            edge_properties: self.edge_properties.clone(),
            target_properties: self.target_properties.clone(),
            target_label_name: self.target_label_name.clone(),
            graph_ctx: self.graph_ctx.clone(),
            optional: self.optional,
            optional_pattern_vars: self.optional_pattern_vars.clone(),
            bound_target_column: self.bound_target_column.clone(),
            used_edge_columns: self.used_edge_columns.clone(),
            schema: self.schema.clone(),
            state: TraverseStreamState::Warming(warm_fut),
            metrics,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
527
/// State machine for traverse stream execution.
///
/// NOTE(review): the poll loop driving these transitions is not visible in this
/// chunk; the expected order (Warming → Reading → Materializing → … → Done)
/// should be confirmed against the `Stream` impl.
enum TraverseStreamState {
    /// Warming adjacency CSRs before first batch.
    Warming(Pin<Box<dyn std::future::Future<Output = DFResult<()>> + Send>>),
    /// Polling the input stream for batches.
    Reading,
    /// Materializing target vertex properties asynchronously.
    Materializing(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
    /// Stream is done.
    Done,
}
539
/// Stream that performs single-hop traversal with async property materialization.
///
/// Most fields mirror the parent [`GraphTraverseExec`]; see that struct for
/// detailed semantics of each configuration field.
struct GraphTraverseStream {
    /// Input stream.
    input: SendableRecordBatchStream,

    /// Column name containing source VIDs.
    source_column: String,

    /// Edge type IDs to traverse.
    edge_type_ids: Vec<u32>,

    /// Traversal direction.
    direction: Direction,

    /// Variable name for target vertex (retained for diagnostics).
    #[expect(dead_code, reason = "Retained for debug logging and diagnostics")]
    target_variable: String,

    /// Variable name for edge (if bound).
    edge_variable: Option<String>,

    /// Edge properties to materialize.
    edge_properties: Vec<String>,

    /// Target vertex properties to materialize.
    target_properties: Vec<String>,

    /// Target label name for property resolution and filtering.
    target_label_name: Option<String>,

    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Whether this is an OPTIONAL MATCH.
    optional: bool,

    /// Variables introduced by the OPTIONAL MATCH pattern.
    optional_pattern_vars: HashSet<String>,

    /// Column name of an already-bound target VID (for cycle patterns like n-->k<--n).
    bound_target_column: Option<String>,

    /// Columns containing edge IDs from previous hops (for relationship uniqueness).
    used_edge_columns: Vec<String>,

    /// Output schema.
    schema: SchemaRef,

    /// Stream state (starts in `Warming`; see `GraphTraverseExec::execute`).
    state: TraverseStreamState,

    /// Per-partition baseline metrics.
    metrics: BaselineMetrics,
}
594
595impl GraphTraverseStream {
596    /// Expand neighbors synchronously and return expansions.
597    /// Returns (row_idx, target_vid, eid_u64, edge_type_id).
598    fn expand_neighbors(&self, batch: &RecordBatch) -> DFResult<Vec<(usize, Vid, u64, u32)>> {
599        let source_col = batch.column_by_name(&self.source_column).ok_or_else(|| {
600            datafusion::error::DataFusionError::Execution(format!(
601                "Source column '{}' not found",
602                self.source_column
603            ))
604        })?;
605
606        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
607        let source_vids: &UInt64Array = &source_vid_cow;
608
609        // If bound_target_column is set, get the expected target VIDs for each row.
610        // This is used for cycle patterns like n-->k<--n where the target must match.
611        let bound_target_cow = self
612            .bound_target_column
613            .as_ref()
614            .and_then(|col| batch.column_by_name(col))
615            .map(|c| column_as_vid_array(c.as_ref()))
616            .transpose()?;
617        let bound_target_vids: Option<&UInt64Array> = bound_target_cow.as_deref();
618
619        // Collect edge ID arrays from previous hops for relationship uniqueness filtering.
620        let used_edge_arrays: Vec<&UInt64Array> = self
621            .used_edge_columns
622            .iter()
623            .filter_map(|col| {
624                batch
625                    .column_by_name(col)
626                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
627            })
628            .collect();
629
630        let mut expanded_rows: Vec<(usize, Vid, u64, u32)> = Vec::new();
631        let is_undirected = matches!(self.direction, Direction::Both);
632
633        for (row_idx, source_vid) in source_vids.iter().enumerate() {
634            let Some(src) = source_vid else {
635                continue;
636            };
637
638            // Get expected target VID if this is a bound target pattern.
639            // Distinguish between:
640            // - no bound target column (no filtering),
641            // - bound target present but NULL for this row (must produce no expansion),
642            // - bound target present with VID.
643            let expected_target = bound_target_vids.map(|arr| {
644                if arr.is_null(row_idx) {
645                    None
646                } else {
647                    Some(arr.value(row_idx))
648                }
649            });
650
651            // Collect used edge IDs for this row from all previous hops
652            let used_eids: HashSet<u64> = used_edge_arrays
653                .iter()
654                .filter_map(|arr| {
655                    if arr.is_null(row_idx) {
656                        None
657                    } else {
658                        Some(arr.value(row_idx))
659                    }
660                })
661                .collect();
662
663            let vid = Vid::from(src);
664            // For Direction::Both, deduplicate edges by eid within each source.
665            // This prevents the same edge being counted twice (once outgoing, once incoming).
666            let mut seen_edges: HashSet<u64> = HashSet::new();
667
668            for &edge_type in &self.edge_type_ids {
669                let neighbors = self.graph_ctx.get_neighbors(vid, edge_type, self.direction);
670
671                for (target_vid, eid) in neighbors {
672                    let eid_u64 = eid.as_u64();
673
674                    // Skip edges already used in previous hops (relationship uniqueness)
675                    if used_eids.contains(&eid_u64) {
676                        continue;
677                    }
678
679                    // Deduplicate edges for undirected patterns
680                    if is_undirected && !seen_edges.insert(eid_u64) {
681                        continue;
682                    }
683
684                    // Filter by bound target VID if set (for cycle patterns).
685                    // NULL bound targets do not match anything.
686                    if let Some(expected_opt) = expected_target {
687                        let Some(expected) = expected_opt else {
688                            continue;
689                        };
690                        if target_vid.as_u64() != expected {
691                            continue;
692                        }
693                    }
694
695                    // Filter by target label using L0 visibility.
696                    // VIDs no longer embed label information, so we must look up labels.
697                    if let Some(ref label_name) = self.target_label_name {
698                        let query_ctx = self.graph_ctx.query_context();
699                        if let Some(vertex_labels) =
700                            l0_visibility::get_vertex_labels_optional(target_vid, &query_ctx)
701                        {
702                            // Vertex is in L0 — require actual label match
703                            if !vertex_labels.contains(label_name) {
704                                continue;
705                            }
706                        }
707                        // else: vertex not in L0 → trust storage-level filtering
708                    }
709
710                    expanded_rows.push((row_idx, target_vid, eid_u64, edge_type));
711                }
712            }
713        }
714
715        Ok(expanded_rows)
716    }
717}
718
719/// Build target vertex labels column from L0 buffers.
720fn build_target_labels_column(
721    target_vids: &[Vid],
722    target_label_name: &Option<String>,
723    graph_ctx: &GraphExecutionContext,
724) -> ArrayRef {
725    use arrow_array::builder::{ListBuilder, StringBuilder};
726    let mut labels_builder = ListBuilder::new(StringBuilder::new());
727    let query_ctx = graph_ctx.query_context();
728    for vid in target_vids {
729        let row_labels: Vec<String> =
730            match l0_visibility::get_vertex_labels_optional(*vid, &query_ctx) {
731                Some(labels) => labels,
732                None => {
733                    // Vertex not in L0 — trust schema label (storage already filtered)
734                    if let Some(label_name) = target_label_name {
735                        vec![label_name.clone()]
736                    } else {
737                        vec![]
738                    }
739                }
740            };
741        let values = labels_builder.values();
742        for lbl in &row_labels {
743            values.append_value(lbl);
744        }
745        labels_builder.append(true);
746    }
747    Arc::new(labels_builder.finish())
748}
749
750/// Build target vertex property columns from storage and L0.
751async fn build_target_property_columns(
752    target_vids: &[Vid],
753    target_properties: &[String],
754    target_label_name: &Option<String>,
755    graph_ctx: &Arc<GraphExecutionContext>,
756) -> DFResult<Vec<ArrayRef>> {
757    let mut columns = Vec::new();
758
759    if let Some(label_name) = target_label_name {
760        let property_manager = graph_ctx.property_manager();
761        let query_ctx = graph_ctx.query_context();
762
763        let props_map = property_manager
764            .get_batch_vertex_props_for_label(target_vids, label_name, Some(&query_ctx))
765            .await
766            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
767
768        let uni_schema = graph_ctx.storage().schema_manager().schema();
769        let label_props = uni_schema.properties.get(label_name.as_str());
770
771        for prop_name in target_properties {
772            let data_type = resolve_property_type(prop_name, label_props);
773            let column =
774                build_property_column_static(target_vids, &props_map, prop_name, &data_type)?;
775            columns.push(column);
776        }
777    } else {
778        // No label name — use label-agnostic property lookup.
779        let non_internal_props: Vec<&str> = target_properties
780            .iter()
781            .filter(|p| *p != "_all_props")
782            .map(|s| s.as_str())
783            .collect();
784        let property_manager = graph_ctx.property_manager();
785        let query_ctx = graph_ctx.query_context();
786
787        let props_map = if !non_internal_props.is_empty() {
788            property_manager
789                .get_batch_vertex_props(target_vids, &non_internal_props, Some(&query_ctx))
790                .await
791                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
792        } else {
793            std::collections::HashMap::new()
794        };
795
796        for prop_name in target_properties {
797            if prop_name == "_all_props" {
798                columns.push(build_all_props_column(target_vids, &props_map, graph_ctx));
799            } else {
800                let column = build_property_column_static(
801                    target_vids,
802                    &props_map,
803                    prop_name,
804                    &arrow::datatypes::DataType::LargeBinary,
805                )?;
806                columns.push(column);
807            }
808        }
809    }
810
811    Ok(columns)
812}
813
814/// Build a CypherValue blob column from all vertex properties (L0 + storage).
815fn build_all_props_column(
816    target_vids: &[Vid],
817    props_map: &HashMap<Vid, HashMap<String, uni_common::Value>>,
818    graph_ctx: &Arc<GraphExecutionContext>,
819) -> ArrayRef {
820    use crate::query::df_graph::scan::encode_cypher_value;
821    use arrow_array::builder::LargeBinaryBuilder;
822
823    let mut builder = LargeBinaryBuilder::new();
824    let l0_ctx = graph_ctx.l0_context();
825    for vid in target_vids {
826        let mut merged_props = serde_json::Map::new();
827        if let Some(vid_props) = props_map.get(vid) {
828            for (k, v) in vid_props.iter() {
829                let json_val: serde_json::Value = v.clone().into();
830                merged_props.insert(k.to_string(), json_val);
831            }
832        }
833        for l0 in l0_ctx.iter_l0_buffers() {
834            let guard = l0.read();
835            if let Some(l0_props) = guard.vertex_properties.get(vid) {
836                for (k, v) in l0_props.iter() {
837                    let json_val: serde_json::Value = v.clone().into();
838                    merged_props.insert(k.to_string(), json_val);
839                }
840            }
841        }
842        if merged_props.is_empty() {
843            builder.append_null();
844        } else {
845            let json = serde_json::Value::Object(merged_props);
846            match encode_cypher_value(&json) {
847                Ok(bytes) => builder.append_value(bytes),
848                Err(_) => builder.append_null(),
849            }
850        }
851    }
852    Arc::new(builder.finish())
853}
854
855/// Build edge ID, type, and property columns for bound edge variables.
856async fn build_edge_columns(
857    expansions: &[(usize, Vid, u64, u32)],
858    edge_properties: &[String],
859    edge_type_ids: &[u32],
860    graph_ctx: &Arc<GraphExecutionContext>,
861) -> DFResult<Vec<ArrayRef>> {
862    let mut columns = Vec::new();
863
864    let eids: Vec<Eid> = expansions
865        .iter()
866        .map(|(_, _, eid, _)| Eid::from(*eid))
867        .collect();
868    let eid_u64s: Vec<u64> = eids.iter().map(|e| e.as_u64()).collect();
869    columns.push(Arc::new(UInt64Array::from(eid_u64s)) as ArrayRef);
870
871    // Edge _type column
872    {
873        let uni_schema = graph_ctx.storage().schema_manager().schema();
874        let mut type_builder = arrow_array::builder::StringBuilder::new();
875        for (_, _, _, edge_type_id) in expansions {
876            if let Some(name) = uni_schema.edge_type_name_by_id_unified(*edge_type_id) {
877                type_builder.append_value(&name);
878            } else {
879                type_builder.append_null();
880            }
881        }
882        columns.push(Arc::new(type_builder.finish()) as ArrayRef);
883    }
884
885    if !edge_properties.is_empty() {
886        let prop_name_refs: Vec<&str> = edge_properties.iter().map(|s| s.as_str()).collect();
887        let property_manager = graph_ctx.property_manager();
888        let query_ctx = graph_ctx.query_context();
889
890        let props_map = property_manager
891            .get_batch_edge_props(&eids, &prop_name_refs, Some(&query_ctx))
892            .await
893            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
894
895        let uni_schema = graph_ctx.storage().schema_manager().schema();
896        let merged_edge_props = merged_edge_schema_props(&uni_schema, edge_type_ids);
897        let edge_type_props = if merged_edge_props.is_empty() {
898            None
899        } else {
900            Some(&merged_edge_props)
901        };
902
903        let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(e.as_u64())).collect();
904
905        for prop_name in edge_properties {
906            let data_type = resolve_edge_property_type(prop_name, edge_type_props);
907            let column =
908                build_property_column_static(&vid_keys, &props_map, prop_name, &data_type)?;
909            columns.push(column);
910        }
911    }
912
913    Ok(columns)
914}
915
/// Build the output batch with target vertex properties.
///
/// This is a standalone async function so it can be boxed into a `Send` future
/// without borrowing from `GraphTraverseStream`.
///
/// Each `expansions` entry is `(input_row_idx, target_vid, eid, edge_type_id)`:
/// the input row it came from, the vertex reached, and the edge traversed.
/// Columns are appended in a fixed order (input columns, target `_vid`,
/// target labels, target properties, edge columns); this order must line up
/// with `schema` — `RecordBatch::try_new` validates the layout.
///
/// # Errors
///
/// Returns an error if property hydration fails or if the assembled columns
/// do not match `schema`.
#[expect(
    clippy::too_many_arguments,
    reason = "Standalone async fn needs all context passed explicitly"
)]
async fn build_traverse_output_batch(
    input: RecordBatch,
    expansions: Vec<(usize, Vid, u64, u32)>,
    schema: SchemaRef,
    edge_variable: Option<String>,
    edge_properties: Vec<String>,
    edge_type_ids: Vec<u32>,
    target_properties: Vec<String>,
    target_label_name: Option<String>,
    graph_ctx: Arc<GraphExecutionContext>,
    optional: bool,
    optional_pattern_vars: HashSet<String>,
) -> DFResult<RecordBatch> {
    // No expansions: a plain MATCH yields an empty batch; an OPTIONAL MATCH
    // instead emits one NULL-padded row per unmatched source group.
    if expansions.is_empty() {
        if !optional {
            return Ok(RecordBatch::new_empty(schema));
        }
        let unmatched_reps = collect_unmatched_optional_group_rows(
            &input,
            &HashSet::new(),
            &schema,
            &optional_pattern_vars,
        )?;
        if unmatched_reps.is_empty() {
            return Ok(RecordBatch::new_empty(schema));
        }
        return build_optional_null_batch_for_rows_with_optional_vars(
            &input,
            &unmatched_reps,
            &schema,
            &optional_pattern_vars,
        );
    }

    // Expand input columns via index array
    // (each input row is repeated once per expansion it produced).
    let indices: Vec<u64> = expansions
        .iter()
        .map(|(idx, _, _, _)| *idx as u64)
        .collect();
    let indices_array = UInt64Array::from(indices);
    let mut columns: Vec<ArrayRef> = input
        .columns()
        .iter()
        .map(|col| take(col.as_ref(), &indices_array, None))
        .collect::<Result<_, _>>()?;

    // Target VID column
    let target_vids: Vec<Vid> = expansions.iter().map(|(_, vid, _, _)| *vid).collect();
    let target_vid_u64s: Vec<u64> = target_vids.iter().map(|v| v.as_u64()).collect();
    columns.push(Arc::new(UInt64Array::from(target_vid_u64s)));

    // Target labels column
    columns.push(build_target_labels_column(
        &target_vids,
        &target_label_name,
        &graph_ctx,
    ));

    // Target vertex property columns (async hydration via PropertyManager)
    if !target_properties.is_empty() {
        let prop_cols = build_target_property_columns(
            &target_vids,
            &target_properties,
            &target_label_name,
            &graph_ctx,
        )
        .await?;
        columns.extend(prop_cols);
    }

    // Edge columns (bound or internal tracking)
    if edge_variable.is_some() {
        let edge_cols =
            build_edge_columns(&expansions, &edge_properties, &edge_type_ids, &graph_ctx).await?;
        columns.extend(edge_cols);
    } else {
        // Anonymous edge: still emit the internal `__eid_to_*` column so
        // downstream operators can track relationship uniqueness.
        let eid_u64s: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
        columns.push(Arc::new(UInt64Array::from(eid_u64s)));
    }

    let expanded_batch = RecordBatch::try_new(schema.clone(), columns)
        .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))?;

    // Append null rows for unmatched optional sources
    if optional {
        let matched_indices: HashSet<usize> =
            expansions.iter().map(|(idx, _, _, _)| *idx).collect();
        let unmatched = collect_unmatched_optional_group_rows(
            &input,
            &matched_indices,
            &schema,
            &optional_pattern_vars,
        )?;

        if !unmatched.is_empty() {
            let null_batch = build_optional_null_batch_for_rows_with_optional_vars(
                &input,
                &unmatched,
                &schema,
                &optional_pattern_vars,
            )?;
            let combined = arrow::compute::concat_batches(&schema, [&expanded_batch, &null_batch])
                .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))?;
            return Ok(combined);
        }
    }

    Ok(expanded_batch)
}
1033
1034/// Build a batch for specific unmatched source rows with NULL target/edge columns.
1035/// Used when OPTIONAL MATCH has some expansions but some source rows had none.
1036fn build_optional_null_batch_for_rows(
1037    input: &RecordBatch,
1038    unmatched_indices: &[usize],
1039    schema: &SchemaRef,
1040) -> DFResult<RecordBatch> {
1041    let num_rows = unmatched_indices.len();
1042    let indices: Vec<u64> = unmatched_indices.iter().map(|&idx| idx as u64).collect();
1043    let indices_array = UInt64Array::from(indices);
1044
1045    // Take the unmatched input rows
1046    let mut columns: Vec<ArrayRef> = Vec::new();
1047    for col in input.columns() {
1048        let taken = take(col.as_ref(), &indices_array, None)?;
1049        columns.push(taken);
1050    }
1051    // Fill remaining columns with nulls
1052    for field in schema.fields().iter().skip(input.num_columns()) {
1053        columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1054    }
1055    RecordBatch::try_new(schema.clone(), columns)
1056        .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
1057}
1058
/// Returns true when `col_name` belongs to one of the OPTIONAL MATCH pattern
/// variables in `optional_vars`.
///
/// A column is considered optional when it is:
/// - the variable itself (`col_name == var`),
/// - a dotted sub-column of the variable (`var.<something>`), or
/// - the internal edge-id column targeting the variable (`__eid_to_<var>`,
///   as emitted by the traversal schema builders).
fn is_optional_column_for_vars(col_name: &str, optional_vars: &HashSet<String>) -> bool {
    optional_vars.contains(col_name)
        || optional_vars.iter().any(|var| {
            // `var.` prefix check via strip_prefix avoids manual byte slicing.
            col_name
                .strip_prefix(var.as_str())
                .map_or(false, |rest| rest.starts_with('.'))
                // Exact `__eid_to_<var>` match. The previous
                // `starts_with("__eid_to_") && ends_with(var)` check falsely
                // matched e.g. `__eid_to_ab` for variable `b`.
                || col_name.strip_prefix("__eid_to_") == Some(var.as_str())
        })
}
1066
1067fn collect_unmatched_optional_group_rows(
1068    input: &RecordBatch,
1069    matched_indices: &HashSet<usize>,
1070    schema: &SchemaRef,
1071    optional_vars: &HashSet<String>,
1072) -> DFResult<Vec<usize>> {
1073    if input.num_rows() == 0 {
1074        return Ok(Vec::new());
1075    }
1076
1077    if optional_vars.is_empty() {
1078        return Ok((0..input.num_rows())
1079            .filter(|idx| !matched_indices.contains(idx))
1080            .collect());
1081    }
1082
1083    let source_vid_indices: Vec<usize> = schema
1084        .fields()
1085        .iter()
1086        .enumerate()
1087        .filter_map(|(idx, field)| {
1088            if idx >= input.num_columns() {
1089                return None;
1090            }
1091            let name = field.name();
1092            if !is_optional_column_for_vars(name, optional_vars) && name.ends_with("._vid") {
1093                Some(idx)
1094            } else {
1095                None
1096            }
1097        })
1098        .collect();
1099
1100    // Group rows by non-optional VID bindings and preserve group order.
1101    let mut groups: HashMap<Vec<u8>, (usize, bool)> = HashMap::new(); // (first_row_idx, any_matched)
1102    let mut group_order: Vec<Vec<u8>> = Vec::new();
1103
1104    for row_idx in 0..input.num_rows() {
1105        let key = compute_optional_group_key(input, row_idx, &source_vid_indices)?;
1106        let entry = groups.entry(key.clone());
1107        if matches!(entry, std::collections::hash_map::Entry::Vacant(_)) {
1108            group_order.push(key.clone());
1109        }
1110        let matched = matched_indices.contains(&row_idx);
1111        entry
1112            .and_modify(|(_, any_matched)| *any_matched |= matched)
1113            .or_insert((row_idx, matched));
1114    }
1115
1116    Ok(group_order
1117        .into_iter()
1118        .filter_map(|key| {
1119            groups
1120                .get(&key)
1121                .and_then(|(first_idx, any_matched)| (!*any_matched).then_some(*first_idx))
1122        })
1123        .collect())
1124}
1125
1126fn compute_optional_group_key(
1127    batch: &RecordBatch,
1128    row_idx: usize,
1129    source_vid_indices: &[usize],
1130) -> DFResult<Vec<u8>> {
1131    let mut key = Vec::with_capacity(source_vid_indices.len() * std::mem::size_of::<u64>());
1132    for &col_idx in source_vid_indices {
1133        let col = batch.column(col_idx);
1134        let vid_cow = column_as_vid_array(col.as_ref())?;
1135        let arr: &UInt64Array = &vid_cow;
1136        if arr.is_null(row_idx) {
1137            key.extend_from_slice(&u64::MAX.to_le_bytes());
1138        } else {
1139            key.extend_from_slice(&arr.value(row_idx).to_le_bytes());
1140        }
1141    }
1142    Ok(key)
1143}
1144
1145fn build_optional_null_batch_for_rows_with_optional_vars(
1146    input: &RecordBatch,
1147    unmatched_indices: &[usize],
1148    schema: &SchemaRef,
1149    optional_vars: &HashSet<String>,
1150) -> DFResult<RecordBatch> {
1151    if optional_vars.is_empty() {
1152        return build_optional_null_batch_for_rows(input, unmatched_indices, schema);
1153    }
1154
1155    let num_rows = unmatched_indices.len();
1156    let indices: Vec<u64> = unmatched_indices.iter().map(|&idx| idx as u64).collect();
1157    let indices_array = UInt64Array::from(indices);
1158
1159    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
1160    for (col_idx, field) in schema.fields().iter().enumerate() {
1161        if col_idx < input.num_columns() {
1162            if is_optional_column_for_vars(field.name(), optional_vars) {
1163                columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1164            } else {
1165                let taken = take(input.column(col_idx).as_ref(), &indices_array, None)?;
1166                columns.push(taken);
1167            }
1168        } else {
1169            columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1170        }
1171    }
1172
1173    RecordBatch::try_new(schema.clone(), columns)
1174        .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
1175}
1176
impl Stream for GraphTraverseStream {
    type Item = DFResult<RecordBatch>;

    /// Drive the traversal state machine:
    /// `Warming` (cache warm-up future) → `Reading` (pull input batches,
    /// expand neighbors) → `Materializing` (async property hydration) →
    /// back to `Reading`, until the input is exhausted (`Done`).
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Take ownership of the state so futures stored inside it can be
            // polled by value; `Done` is the placeholder. Every arm below must
            // restore the intended state before returning.
            let state = std::mem::replace(&mut self.state, TraverseStreamState::Done);

            match state {
                TraverseStreamState::Warming(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(())) => {
                        self.state = TraverseStreamState::Reading;
                        // Continue loop to start reading
                    }
                    // Warm-up failure is fatal: the stream ends after the error.
                    Poll::Ready(Err(e)) => {
                        self.state = TraverseStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        self.state = TraverseStreamState::Warming(fut);
                        return Poll::Pending;
                    }
                },
                TraverseStreamState::Reading => {
                    // Check timeout
                    if let Err(e) = self.graph_ctx.check_timeout() {
                        return Poll::Ready(Some(Err(
                            datafusion::error::DataFusionError::Execution(e.to_string()),
                        )));
                    }

                    match self.input.poll_next_unpin(cx) {
                        Poll::Ready(Some(Ok(batch))) => {
                            // Expand neighbors synchronously
                            let expansions = match self.expand_neighbors(&batch) {
                                Ok(exp) => exp,
                                // Expansion errors keep the state at `Reading`,
                                // so the caller may poll past a bad batch.
                                Err(e) => {
                                    self.state = TraverseStreamState::Reading;
                                    return Poll::Ready(Some(Err(e)));
                                }
                            };

                            // Build output synchronously only when no properties need async hydration
                            if self.target_properties.is_empty() && self.edge_properties.is_empty()
                            {
                                let result = build_traverse_output_batch_sync(
                                    &batch,
                                    &expansions,
                                    &self.schema,
                                    self.edge_variable.as_ref(),
                                    &self.graph_ctx,
                                    self.optional,
                                    &self.optional_pattern_vars,
                                );
                                self.state = TraverseStreamState::Reading;
                                // Only successful batches count toward output metrics.
                                if let Ok(ref r) = result {
                                    self.metrics.record_output(r.num_rows());
                                }
                                return Poll::Ready(Some(result));
                            }

                            // Properties needed — create async future for hydration.
                            // Everything the future touches is cloned out of `self`
                            // so the boxed future is `Send` and owns its inputs.
                            let schema = self.schema.clone();
                            let edge_variable = self.edge_variable.clone();
                            let edge_properties = self.edge_properties.clone();
                            let edge_type_ids = self.edge_type_ids.clone();
                            let target_properties = self.target_properties.clone();
                            let target_label_name = self.target_label_name.clone();
                            let graph_ctx = self.graph_ctx.clone();

                            let optional = self.optional;
                            let optional_pattern_vars = self.optional_pattern_vars.clone();

                            let fut = build_traverse_output_batch(
                                batch,
                                expansions,
                                schema,
                                edge_variable,
                                edge_properties,
                                edge_type_ids,
                                target_properties,
                                target_label_name,
                                graph_ctx,
                                optional,
                                optional_pattern_vars,
                            );

                            self.state = TraverseStreamState::Materializing(Box::pin(fut));
                            // Continue loop to poll the future
                        }
                        // Input stream error is fatal: end after surfacing it.
                        Poll::Ready(Some(Err(e))) => {
                            self.state = TraverseStreamState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(None) => {
                            self.state = TraverseStreamState::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Pending => {
                            self.state = TraverseStreamState::Reading;
                            return Poll::Pending;
                        }
                    }
                }
                TraverseStreamState::Materializing(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(batch)) => {
                        self.state = TraverseStreamState::Reading;
                        self.metrics.record_output(batch.num_rows());
                        return Poll::Ready(Some(Ok(batch)));
                    }
                    // Hydration failure is fatal: the stream ends after the error.
                    Poll::Ready(Err(e)) => {
                        self.state = TraverseStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        self.state = TraverseStreamState::Materializing(fut);
                        return Poll::Pending;
                    }
                },
                TraverseStreamState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
1302
/// Build output batch synchronously when no properties need async hydration.
///
/// Only called when both `target_properties` and `edge_properties` are empty,
/// so no property columns need to be materialized.
///
/// Mirrors [`build_traverse_output_batch`]: columns are pushed in the fixed
/// order input columns, target `_vid`, target `_labels`, edge columns — the
/// order must line up with `schema` (`RecordBatch::try_new` validates it).
/// Target labels here come only from L0 buffers.
fn build_traverse_output_batch_sync(
    input: &RecordBatch,
    expansions: &[(usize, Vid, u64, u32)],
    schema: &SchemaRef,
    edge_variable: Option<&String>,
    graph_ctx: &GraphExecutionContext,
    optional: bool,
    optional_pattern_vars: &HashSet<String>,
) -> DFResult<RecordBatch> {
    // No expansions: plain MATCH yields an empty batch; OPTIONAL MATCH emits
    // one NULL-padded row per unmatched source group.
    if expansions.is_empty() {
        if !optional {
            return Ok(RecordBatch::new_empty(schema.clone()));
        }
        let unmatched_reps = collect_unmatched_optional_group_rows(
            input,
            &HashSet::new(),
            schema,
            optional_pattern_vars,
        )?;
        if unmatched_reps.is_empty() {
            return Ok(RecordBatch::new_empty(schema.clone()));
        }
        return build_optional_null_batch_for_rows_with_optional_vars(
            input,
            &unmatched_reps,
            schema,
            optional_pattern_vars,
        );
    }

    // Repeat each input row once per expansion it produced.
    let indices: Vec<u64> = expansions
        .iter()
        .map(|(idx, _, _, _)| *idx as u64)
        .collect();
    let indices_array = UInt64Array::from(indices);

    let mut columns: Vec<ArrayRef> = Vec::new();
    for col in input.columns() {
        let expanded = take(col.as_ref(), &indices_array, None)?;
        columns.push(expanded);
    }

    // Add target VID column
    let target_vids: Vec<u64> = expansions
        .iter()
        .map(|(_, vid, _, _)| vid.as_u64())
        .collect();
    columns.push(Arc::new(UInt64Array::from(target_vids)));

    // Add target ._labels column (from L0 buffers)
    {
        use arrow_array::builder::{ListBuilder, StringBuilder};
        let l0_ctx = graph_ctx.l0_context();
        let mut labels_builder = ListBuilder::new(StringBuilder::new());
        for (_, vid, _, _) in expansions {
            // Union of the vertex's labels across all L0 buffers, deduplicated
            // in first-seen order.
            let mut row_labels: Vec<String> = Vec::new();
            for l0 in l0_ctx.iter_l0_buffers() {
                let guard = l0.read();
                if let Some(l0_labels) = guard.vertex_labels.get(vid) {
                    for lbl in l0_labels {
                        if !row_labels.contains(lbl) {
                            row_labels.push(lbl.clone());
                        }
                    }
                }
            }
            let values = labels_builder.values();
            for lbl in &row_labels {
                values.append_value(lbl);
            }
            labels_builder.append(true);
        }
        columns.push(Arc::new(labels_builder.finish()));
    }

    // Add edge columns if edge is bound (no properties in sync path)
    if edge_variable.is_some() {
        let edge_ids: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
        columns.push(Arc::new(UInt64Array::from(edge_ids)));

        // Add edge _type column
        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let mut type_builder = arrow_array::builder::StringBuilder::new();
        for (_, _, _, edge_type_id) in expansions {
            if let Some(name) = uni_schema.edge_type_name_by_id_unified(*edge_type_id) {
                type_builder.append_value(&name);
            } else {
                type_builder.append_null();
            }
        }
        columns.push(Arc::new(type_builder.finish()));
    } else {
        // Internal EID column for relationship uniqueness tracking (matches schema)
        let edge_ids: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
        columns.push(Arc::new(UInt64Array::from(edge_ids)));
    }

    let expanded_batch = RecordBatch::try_new(schema.clone(), columns)
        .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))?;

    // Append NULL-padded rows for unmatched OPTIONAL MATCH source groups.
    if optional {
        let matched_indices: HashSet<usize> =
            expansions.iter().map(|(idx, _, _, _)| *idx).collect();
        let unmatched = collect_unmatched_optional_group_rows(
            input,
            &matched_indices,
            schema,
            optional_pattern_vars,
        )?;

        if !unmatched.is_empty() {
            let null_batch = build_optional_null_batch_for_rows_with_optional_vars(
                input,
                &unmatched,
                schema,
                optional_pattern_vars,
            )?;
            let combined = arrow::compute::concat_batches(schema, [&expanded_batch, &null_batch])
                .map_err(|e| {
                datafusion::error::DataFusionError::ArrowError(Box::new(e), None)
            })?;
            return Ok(combined);
        }
    }

    Ok(expanded_batch)
}
1434
1435impl RecordBatchStream for GraphTraverseStream {
1436    fn schema(&self) -> SchemaRef {
1437        self.schema.clone()
1438    }
1439}
1440
/// Adjacency map type: maps a source VID to the list of
/// `(target_vid, eid, edge_type_name, properties)` tuples reachable from it.
type EdgeAdjacencyMap = HashMap<Vid, Vec<(Vid, Eid, String, uni_common::Properties)>>;
1443
/// Graph traversal execution plan for schemaless edge types (TraverseMainByType).
///
/// Unlike GraphTraverseExec which uses CSR adjacency for known types, this operator
/// queries the main edges table for schemaless types and builds an in-memory adjacency map.
///
/// # Example
///
/// ```ignore
/// // Traverse schemaless "CUSTOM" edges
/// let traverse = GraphTraverseMainExec::new(
///     input_plan,
///     "_vid",
///     vec!["CUSTOM".to_string()],  // edge type names
///     Direction::Outgoing,
///     "m",                         // target variable
///     Some("r".to_string()),       // edge variable
///     vec![],                      // edge properties
///     vec![],                      // target properties
///     graph_ctx,
///     false,                       // not optional
///     HashSet::new(),              // optional pattern vars
///     None,                        // bound target column
///     vec![],                      // used edge columns
/// );
/// ```
pub struct GraphTraverseMainExec {
    /// Input execution plan.
    input: Arc<dyn ExecutionPlan>,

    /// Column name containing source VIDs.
    source_column: String,

    /// Edge type names (not IDs, since schemaless types may not have IDs).
    /// Supports OR relationship types like `[:KNOWS|HATES]`.
    type_names: Vec<String>,

    /// Traversal direction.
    direction: Direction,

    /// Variable name for target vertex columns.
    target_variable: String,

    /// Variable name for edge columns (if edge is bound).
    edge_variable: Option<String>,

    /// Edge properties to materialize.
    edge_properties: Vec<String>,

    /// Target vertex properties to materialize.
    target_properties: Vec<String>,

    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Whether this is an OPTIONAL MATCH (preserve unmatched source rows with NULLs).
    optional: bool,

    /// Variables introduced by the OPTIONAL MATCH pattern.
    optional_pattern_vars: HashSet<String>,

    /// Column name of an already-bound target VID (for patterns where target is in scope).
    /// When set, only traversals reaching this exact VID are included.
    bound_target_column: Option<String>,

    /// Columns containing edge IDs from previous hops (for relationship uniqueness).
    /// Edges matching any of these IDs are excluded from traversal results.
    used_edge_columns: Vec<String>,

    /// Output schema, fixed at construction time by `Self::build_schema`.
    schema: SchemaRef,

    /// Cached plan properties derived from `schema`.
    properties: PlanProperties,

    /// Execution metrics.
    metrics: ExecutionPlanMetricsSet,
}
1518
1519impl fmt::Debug for GraphTraverseMainExec {
1520    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1521        f.debug_struct("GraphTraverseMainExec")
1522            .field("type_names", &self.type_names)
1523            .field("direction", &self.direction)
1524            .field("target_variable", &self.target_variable)
1525            .field("edge_variable", &self.edge_variable)
1526            .finish()
1527    }
1528}
1529
1530impl GraphTraverseMainExec {
1531    /// Create a new schemaless traversal executor.
1532    #[expect(clippy::too_many_arguments)]
1533    pub fn new(
1534        input: Arc<dyn ExecutionPlan>,
1535        source_column: impl Into<String>,
1536        type_names: Vec<String>,
1537        direction: Direction,
1538        target_variable: impl Into<String>,
1539        edge_variable: Option<String>,
1540        edge_properties: Vec<String>,
1541        target_properties: Vec<String>,
1542        graph_ctx: Arc<GraphExecutionContext>,
1543        optional: bool,
1544        optional_pattern_vars: HashSet<String>,
1545        bound_target_column: Option<String>,
1546        used_edge_columns: Vec<String>,
1547    ) -> Self {
1548        let source_column = source_column.into();
1549        let target_variable = target_variable.into();
1550
1551        // Build output schema
1552        let schema = Self::build_schema(
1553            &input.schema(),
1554            &target_variable,
1555            &edge_variable,
1556            &edge_properties,
1557            &target_properties,
1558            optional,
1559        );
1560
1561        let properties = compute_plan_properties(schema.clone());
1562
1563        Self {
1564            input,
1565            source_column,
1566            type_names,
1567            direction,
1568            target_variable,
1569            edge_variable,
1570            edge_properties,
1571            target_properties,
1572            graph_ctx,
1573            optional,
1574            optional_pattern_vars,
1575            bound_target_column,
1576            used_edge_columns,
1577            schema,
1578            properties,
1579            metrics: ExecutionPlanMetricsSet::new(),
1580        }
1581    }
1582
1583    /// Build output schema for traversal.
1584    fn build_schema(
1585        input_schema: &SchemaRef,
1586        target_variable: &str,
1587        edge_variable: &Option<String>,
1588        edge_properties: &[String],
1589        target_properties: &[String],
1590        optional: bool,
1591    ) -> SchemaRef {
1592        let mut fields: Vec<Field> = input_schema
1593            .fields()
1594            .iter()
1595            .map(|f| f.as_ref().clone())
1596            .collect();
1597
1598        // Add target ._vid column (only if not already in input, nullable for OPTIONAL MATCH)
1599        let target_vid_name = format!("{}._vid", target_variable);
1600        if input_schema.column_with_name(&target_vid_name).is_none() {
1601            fields.push(Field::new(target_vid_name, DataType::UInt64, true));
1602        }
1603
1604        // Add target ._labels column (only if not already in input)
1605        let target_labels_name = format!("{}._labels", target_variable);
1606        if input_schema.column_with_name(&target_labels_name).is_none() {
1607            fields.push(Field::new(target_labels_name, labels_data_type(), true));
1608        }
1609
1610        // Add edge columns if edge variable is bound
1611        if let Some(edge_var) = edge_variable {
1612            fields.push(Field::new(
1613                format!("{}._eid", edge_var),
1614                DataType::UInt64,
1615                optional,
1616            ));
1617
1618            // Add edge ._type column for type(r) support
1619            fields.push(Field::new(
1620                format!("{}._type", edge_var),
1621                DataType::Utf8,
1622                true,
1623            ));
1624
1625            // Edge properties: LargeBinary (cv_encoded) to preserve value types.
1626            // Schemaless edges store properties as CypherValue blobs so that
1627            // Int, Float, etc. round-trip correctly through Arrow.
1628            for prop in edge_properties {
1629                let col_name = format!("{}.{}", edge_var, prop);
1630                let mut metadata = std::collections::HashMap::new();
1631                metadata.insert("cv_encoded".to_string(), "true".to_string());
1632                fields.push(
1633                    Field::new(&col_name, DataType::LargeBinary, true).with_metadata(metadata),
1634                );
1635            }
1636        } else {
1637            // Add internal edge ID for anonymous relationships so BindPath can
1638            // reconstruct named paths (p = (a)-[:T]->(b)).
1639            fields.push(Field::new(
1640                format!("__eid_to_{}", target_variable),
1641                DataType::UInt64,
1642                optional,
1643            ));
1644        }
1645
1646        // Target properties: all as LargeBinary (deferred to PropertyManager)
1647        for prop in target_properties {
1648            fields.push(Field::new(
1649                format!("{}.{}", target_variable, prop),
1650                DataType::LargeBinary,
1651                true,
1652            ));
1653        }
1654
1655        Arc::new(Schema::new(fields))
1656    }
1657}
1658
1659impl DisplayAs for GraphTraverseMainExec {
1660    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1661        write!(
1662            f,
1663            "GraphTraverseMainExec: types={:?}, direction={:?}",
1664            self.type_names, self.direction
1665        )
1666    }
1667}
1668
1669impl ExecutionPlan for GraphTraverseMainExec {
1670    fn name(&self) -> &str {
1671        "GraphTraverseMainExec"
1672    }
1673
1674    fn as_any(&self) -> &dyn Any {
1675        self
1676    }
1677
1678    fn schema(&self) -> SchemaRef {
1679        self.schema.clone()
1680    }
1681
1682    fn properties(&self) -> &PlanProperties {
1683        &self.properties
1684    }
1685
1686    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1687        vec![&self.input]
1688    }
1689
1690    fn with_new_children(
1691        self: Arc<Self>,
1692        children: Vec<Arc<dyn ExecutionPlan>>,
1693    ) -> DFResult<Arc<dyn ExecutionPlan>> {
1694        if children.len() != 1 {
1695            return Err(datafusion::error::DataFusionError::Plan(
1696                "GraphTraverseMainExec expects exactly one child".to_string(),
1697            ));
1698        }
1699
1700        Ok(Arc::new(Self {
1701            input: children[0].clone(),
1702            source_column: self.source_column.clone(),
1703            type_names: self.type_names.clone(),
1704            direction: self.direction,
1705            target_variable: self.target_variable.clone(),
1706            edge_variable: self.edge_variable.clone(),
1707            edge_properties: self.edge_properties.clone(),
1708            target_properties: self.target_properties.clone(),
1709            graph_ctx: self.graph_ctx.clone(),
1710            optional: self.optional,
1711            optional_pattern_vars: self.optional_pattern_vars.clone(),
1712            bound_target_column: self.bound_target_column.clone(),
1713            used_edge_columns: self.used_edge_columns.clone(),
1714            schema: self.schema.clone(),
1715            properties: self.properties.clone(),
1716            metrics: self.metrics.clone(),
1717        }))
1718    }
1719
1720    fn execute(
1721        &self,
1722        partition: usize,
1723        context: Arc<TaskContext>,
1724    ) -> DFResult<SendableRecordBatchStream> {
1725        let input_stream = self.input.execute(partition, context)?;
1726        let metrics = BaselineMetrics::new(&self.metrics, partition);
1727
1728        Ok(Box::pin(GraphTraverseMainStream::new(
1729            input_stream,
1730            self.source_column.clone(),
1731            self.type_names.clone(),
1732            self.direction,
1733            self.target_variable.clone(),
1734            self.edge_variable.clone(),
1735            self.edge_properties.clone(),
1736            self.target_properties.clone(),
1737            self.graph_ctx.clone(),
1738            self.optional,
1739            self.optional_pattern_vars.clone(),
1740            self.bound_target_column.clone(),
1741            self.used_edge_columns.clone(),
1742            self.schema.clone(),
1743            metrics,
1744        )))
1745    }
1746
1747    fn metrics(&self) -> Option<MetricsSet> {
1748        Some(self.metrics.clone_inner())
1749    }
1750}
1751
/// State machine for GraphTraverseMainStream.
///
/// Transitions: `LoadingEdges` → `Processing` → `Done`; errors and stream
/// exhaustion both terminate in `Done`.
enum GraphTraverseMainState {
    /// Loading adjacency map from main edges table.
    LoadingEdges {
        /// Future resolving to the adjacency map (main edges plus L0 overlay).
        future: Pin<Box<dyn std::future::Future<Output = DFResult<EdgeAdjacencyMap>> + Send>>,
        /// Input stream, held (not polled) until loading completes.
        input_stream: SendableRecordBatchStream,
    },
    /// Processing input stream with loaded adjacency.
    Processing {
        adjacency: EdgeAdjacencyMap,
        input_stream: SendableRecordBatchStream,
    },
    /// Stream is done.
    Done,
}
1767
/// Stream that executes schemaless edge traversal.
///
/// Edge type names and direction are not stored here: they are captured by
/// the adjacency-loading future inside [`GraphTraverseMainState::LoadingEdges`].
struct GraphTraverseMainStream {
    /// Source column name.
    source_column: String,

    /// Target variable name.
    target_variable: String,

    /// Edge variable name.
    edge_variable: Option<String>,

    /// Edge properties to materialize.
    edge_properties: Vec<String>,

    /// Target properties to materialize.
    target_properties: Vec<String>,

    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Whether this is optional (preserve unmatched rows).
    optional: bool,

    /// Variables introduced by OPTIONAL pattern.
    optional_pattern_vars: HashSet<String>,

    /// Column name of an already-bound target VID (for filtering).
    bound_target_column: Option<String>,

    /// Columns containing edge IDs from previous hops (for relationship uniqueness).
    used_edge_columns: Vec<String>,

    /// Output schema.
    schema: SchemaRef,

    /// Stream state (owns the input stream and, once loaded, the adjacency map).
    state: GraphTraverseMainState,

    /// Metrics.
    metrics: BaselineMetrics,
}
1809
impl GraphTraverseMainStream {
    /// Create a new traverse main stream.
    ///
    /// `type_names` and `direction` are consumed by the adjacency-loading
    /// future created here rather than stored on the struct.
    #[expect(clippy::too_many_arguments)]
    fn new(
        input_stream: SendableRecordBatchStream,
        source_column: String,
        type_names: Vec<String>,
        direction: Direction,
        target_variable: String,
        edge_variable: Option<String>,
        edge_properties: Vec<String>,
        target_properties: Vec<String>,
        graph_ctx: Arc<GraphExecutionContext>,
        optional: bool,
        optional_pattern_vars: HashSet<String>,
        bound_target_column: Option<String>,
        used_edge_columns: Vec<String>,
        schema: SchemaRef,
        metrics: BaselineMetrics,
    ) -> Self {
        // Start by loading the adjacency map from the main edges table
        let loading_ctx = graph_ctx.clone();
        let loading_types = type_names.clone();
        let fut =
            async move { build_edge_adjacency_map(&loading_ctx, &loading_types, direction).await };

        Self {
            source_column,
            target_variable,
            edge_variable,
            edge_properties,
            target_properties,
            graph_ctx,
            optional,
            optional_pattern_vars,
            bound_target_column,
            used_edge_columns,
            schema,
            state: GraphTraverseMainState::LoadingEdges {
                future: Box::pin(fut),
                input_stream,
            },
            metrics,
        }
    }

    /// Expand input batch using adjacency map (synchronous version).
    ///
    /// For each input row, looks up the source VID's neighbors and emits one
    /// output row per (source, edge, target) expansion. Applies relationship
    /// uniqueness (edges already used in previous hops are skipped),
    /// bound-target filtering, and OPTIONAL MATCH NULL-padding of unmatched
    /// rows. Output column order must match the schema from `build_schema`.
    fn expand_batch(
        &self,
        input: &RecordBatch,
        adjacency: &EdgeAdjacencyMap,
    ) -> DFResult<RecordBatch> {
        // Extract source VIDs from source column
        let source_col = input.column_by_name(&self.source_column).ok_or_else(|| {
            datafusion::error::DataFusionError::Execution(format!(
                "Source column {} not found",
                self.source_column
            ))
        })?;

        // Normalize to a UInt64 VID array (Cow: may borrow or cast-copy).
        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
        let source_vids: &UInt64Array = &source_vid_cow;

        // Read bound target VIDs if column exists
        let bound_target_cow = self
            .bound_target_column
            .as_ref()
            .and_then(|col| input.column_by_name(col))
            .map(|c| column_as_vid_array(c.as_ref()))
            .transpose()?;
        let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();

        // Collect edge ID arrays from previous hops for relationship uniqueness filtering.
        // NOTE: columns that are not UInt64 are silently skipped by the downcast.
        let used_edge_arrays: Vec<&UInt64Array> = self
            .used_edge_columns
            .iter()
            .filter_map(|col| {
                input
                    .column_by_name(col)
                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
            })
            .collect();

        // Build expansions: (input_row_idx, target_vid, eid, edge_type, edge_props)
        type Expansion = (usize, Vid, Eid, String, uni_common::Properties);
        let mut expansions: Vec<Expansion> = Vec::new();

        for (row_idx, src_u64) in source_vids.iter().enumerate() {
            if let Some(src_u64) = src_u64 {
                let src_vid = Vid::from(src_u64);

                // Collect used edge IDs for this row from all previous hops
                let used_eids: HashSet<u64> = used_edge_arrays
                    .iter()
                    .filter_map(|arr| {
                        if arr.is_null(row_idx) {
                            None
                        } else {
                            Some(arr.value(row_idx))
                        }
                    })
                    .collect();

                if let Some(neighbors) = adjacency.get(&src_vid) {
                    for (target_vid, eid, edge_type, props) in neighbors {
                        // Skip edges already used in previous hops (relationship uniqueness)
                        if used_eids.contains(&eid.as_u64()) {
                            continue;
                        }

                        // Filter by bound target VID if set (for patterns where target is in scope).
                        // Only include traversals where the target matches the expected VID.
                        if let Some(targets) = expected_targets {
                            if targets.is_null(row_idx) {
                                continue;
                            }
                            let expected_vid = targets.value(row_idx);
                            if target_vid.as_u64() != expected_vid {
                                continue;
                            }
                        }

                        expansions.push((
                            row_idx,
                            *target_vid,
                            *eid,
                            edge_type.clone(),
                            props.clone(),
                        ));
                    }
                }
            }
        }

        // Handle OPTIONAL: preserve unmatched rows
        if expansions.is_empty() && self.optional {
            // No matches - return input with NULL columns appended
            let all_indices: Vec<usize> = (0..input.num_rows()).collect();
            return build_optional_null_batch_for_rows(input, &all_indices, &self.schema);
        }

        if expansions.is_empty() {
            // No matches, not optional - return empty batch
            return Ok(RecordBatch::new_empty(self.schema.clone()));
        }

        // Track matched rows for OPTIONAL handling
        let matched_rows: HashSet<usize> = if self.optional {
            expansions.iter().map(|(idx, _, _, _, _)| *idx).collect()
        } else {
            HashSet::new()
        };

        // Expand input columns using Arrow take()
        let mut columns: Vec<ArrayRef> = Vec::new();
        let indices: Vec<u64> = expansions
            .iter()
            .map(|(idx, _, _, _, _)| *idx as u64)
            .collect();
        let indices_array = UInt64Array::from(indices);

        for col in input.columns() {
            let expanded = take(col.as_ref(), &indices_array, None)?;
            columns.push(expanded);
        }

        // Add target ._vid column (only if not already in input)
        let target_vid_name = format!("{}._vid", self.target_variable);
        let target_vids: Vec<u64> = expansions
            .iter()
            .map(|(_, vid, _, _, _)| vid.as_u64())
            .collect();
        if input.schema().column_with_name(&target_vid_name).is_none() {
            columns.push(Arc::new(UInt64Array::from(target_vids)));
        }

        // Add target ._labels column (only if not already in input)
        // Labels are hydrated from L0 buffers only; rows with no L0 entry get
        // an empty (but non-null) label list.
        let target_labels_name = format!("{}._labels", self.target_variable);
        if input
            .schema()
            .column_with_name(&target_labels_name)
            .is_none()
        {
            use arrow_array::builder::{ListBuilder, StringBuilder};
            let l0_ctx = self.graph_ctx.l0_context();
            let mut labels_builder = ListBuilder::new(StringBuilder::new());
            for (_, target_vid, _, _, _) in &expansions {
                let mut row_labels: Vec<String> = Vec::new();
                for l0 in l0_ctx.iter_l0_buffers() {
                    let guard = l0.read();
                    if let Some(l0_labels) = guard.vertex_labels.get(target_vid) {
                        for lbl in l0_labels {
                            // Deduplicate labels across L0 buffers.
                            if !row_labels.contains(lbl) {
                                row_labels.push(lbl.clone());
                            }
                        }
                    }
                }
                let values = labels_builder.values();
                for lbl in &row_labels {
                    values.append_value(lbl);
                }
                labels_builder.append(true);
            }
            columns.push(Arc::new(labels_builder.finish()));
        }

        // Add edge columns if edge variable is bound.
        // For anonymous relationships, emit internal edge IDs for BindPath.
        if self.edge_variable.is_some() {
            // Add edge ._eid column
            let eids: Vec<u64> = expansions
                .iter()
                .map(|(_, _, eid, _, _)| eid.as_u64())
                .collect();
            columns.push(Arc::new(UInt64Array::from(eids)));

            // Add edge ._type column
            {
                let mut type_builder = arrow_array::builder::StringBuilder::new();
                for (_, _, _, edge_type, _) in &expansions {
                    type_builder.append_value(edge_type);
                }
                columns.push(Arc::new(type_builder.finish()));
            }

            // Add edge property columns as cv_encoded LargeBinary to preserve types
            for prop_name in &self.edge_properties {
                use crate::query::df_graph::scan::encode_cypher_value;
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                if prop_name == "_all_props" {
                    // Serialize all edge properties to CypherValue blob
                    for (_, _, _, _, props) in &expansions {
                        if props.is_empty() {
                            builder.append_null();
                        } else {
                            let mut json_map = serde_json::Map::new();
                            for (k, v) in props.iter() {
                                let json_val: serde_json::Value = v.clone().into();
                                json_map.insert(k.clone(), json_val);
                            }
                            let json = serde_json::Value::Object(json_map);
                            // Encoding failure degrades to NULL rather than erroring.
                            match encode_cypher_value(&json) {
                                Ok(bytes) => builder.append_value(bytes),
                                Err(_) => builder.append_null(),
                            }
                        }
                    }
                } else {
                    // Named property as cv_encoded CypherValue
                    for (_, _, _, _, props) in &expansions {
                        match props.get(prop_name) {
                            Some(uni_common::Value::Null) | None => builder.append_null(),
                            Some(val) => {
                                let json_val: serde_json::Value = val.clone().into();
                                match encode_cypher_value(&json_val) {
                                    Ok(bytes) => builder.append_value(bytes),
                                    Err(_) => builder.append_null(),
                                }
                            }
                        }
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
        } else {
            // Anonymous relationship: internal __eid_to_<target> column.
            let eids: Vec<u64> = expansions
                .iter()
                .map(|(_, _, eid, _, _)| eid.as_u64())
                .collect();
            columns.push(Arc::new(UInt64Array::from(eids)));
        }

        // Add target property columns (hydrate from L0 buffers)
        {
            use crate::query::df_graph::scan::encode_cypher_value;
            let l0_ctx = self.graph_ctx.l0_context();

            for prop_name in &self.target_properties {
                if prop_name == "_all_props" {
                    // Build full CypherValue blob from all L0 vertex properties
                    // (later L0 buffers overwrite earlier ones on key collision).
                    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                    for (_, target_vid, _, _, _) in &expansions {
                        let mut merged_props = serde_json::Map::new();
                        for l0 in l0_ctx.iter_l0_buffers() {
                            let guard = l0.read();
                            if let Some(props) = guard.vertex_properties.get(target_vid) {
                                for (k, v) in props.iter() {
                                    let json_val: serde_json::Value = v.clone().into();
                                    merged_props.insert(k.to_string(), json_val);
                                }
                            }
                        }
                        if merged_props.is_empty() {
                            builder.append_null();
                        } else {
                            let json = serde_json::Value::Object(merged_props);
                            match encode_cypher_value(&json) {
                                Ok(bytes) => builder.append_value(bytes),
                                Err(_) => builder.append_null(),
                            }
                        }
                    }
                    columns.push(Arc::new(builder.finish()));
                } else {
                    // Extract individual property from L0 and encode as CypherValue
                    // (first L0 buffer containing a non-null value wins).
                    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                    for (_, target_vid, _, _, _) in &expansions {
                        let mut found = false;
                        for l0 in l0_ctx.iter_l0_buffers() {
                            let guard = l0.read();
                            if let Some(props) = guard.vertex_properties.get(target_vid)
                                && let Some(val) = props.get(prop_name.as_str())
                                && !val.is_null()
                            {
                                let json_val: serde_json::Value = val.clone().into();
                                if let Ok(bytes) = encode_cypher_value(&json_val) {
                                    builder.append_value(bytes);
                                    found = true;
                                    break;
                                }
                            }
                        }
                        if !found {
                            builder.append_null();
                        }
                    }
                    columns.push(Arc::new(builder.finish()));
                }
            }
        }

        let matched_batch = RecordBatch::try_new(self.schema.clone(), columns)
            .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))?;

        // Handle OPTIONAL: append unmatched rows with NULLs
        if self.optional {
            let unmatched = collect_unmatched_optional_group_rows(
                input,
                &matched_rows,
                &self.schema,
                &self.optional_pattern_vars,
            )?;

            if unmatched.is_empty() {
                return Ok(matched_batch);
            }

            let unmatched_batch = build_optional_null_batch_for_rows_with_optional_vars(
                input,
                &unmatched,
                &self.schema,
                &self.optional_pattern_vars,
            )?;

            // Concatenate matched and unmatched batches
            use arrow::compute::concat_batches;
            concat_batches(&self.schema, &[matched_batch, unmatched_batch])
                .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
        } else {
            Ok(matched_batch)
        }
    }
}
2174
/// Build adjacency map from main edges table for given type names and direction.
///
/// Supports OR relationship types like `[:KNOWS|HATES]` via multiple type_names.
/// Returns a HashMap mapping source VID -> Vec<(target_vid, eid, edge_type, properties)>.
/// Direction determines the key: Outgoing uses src_vid, Incoming uses dst_vid,
/// Both adds entries for both endpoints.
///
/// MVCC handling: L0 buffer edges are overlaid on top of main-table edges
/// (L0 wins on duplicate EIDs), and edges tombstoned in any L0 buffer are
/// dropped.
async fn build_edge_adjacency_map(
    graph_ctx: &GraphExecutionContext,
    type_names: &[String],
    direction: Direction,
) -> DFResult<EdgeAdjacencyMap> {
    use uni_store::storage::main_edge::MainEdgeDataset;

    let storage = graph_ctx.storage();
    let l0_ctx = graph_ctx.l0_context();
    let lancedb_store = storage.lancedb_store();

    // Step 1: Query main edges table for all type names
    let type_refs: Vec<&str> = type_names.iter().map(|s| s.as_str()).collect();
    let edges_with_type = MainEdgeDataset::find_edges_by_type_names(lancedb_store, &type_refs)
        .await
        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;

    // Preserve edge type name in the adjacency map for type(r) support
    let mut edges: Vec<(
        uni_common::Eid,
        uni_common::Vid,
        uni_common::Vid,
        String,
        uni_common::Properties,
    )> = edges_with_type.into_iter().collect();

    // Step 2: Overlay L0 buffers for all type names
    for l0 in l0_ctx.iter_l0_buffers() {
        let l0_guard = l0.read();

        for type_name in type_names {
            let l0_eids = l0_guard.eids_for_type(type_name);

            // For each L0 edge, extract its information
            for &eid in &l0_eids {
                if let Some(edge_ref) = l0_guard.graph.edge(eid) {
                    let src_vid = edge_ref.src_vid;
                    let dst_vid = edge_ref.dst_vid;

                    // Get properties for this edge from L0
                    let props = l0_guard
                        .edge_properties
                        .get(&eid)
                        .cloned()
                        .unwrap_or_default();

                    edges.push((eid, src_vid, dst_vid, type_name.clone(), props));
                }
            }
        }
    }

    // Step 3: Deduplicate by EID (L0 takes precedence)
    // Iterating in reverse means later entries (L0, appended after main) are
    // seen first and kept; the final reverse() restores original order.
    let mut seen_eids = HashSet::new();
    let mut unique_edges = Vec::new();
    for edge in edges.into_iter().rev() {
        if seen_eids.insert(edge.0) {
            unique_edges.push(edge);
        }
    }
    unique_edges.reverse();

    // Step 4: Filter out edges tombstoned in any L0 buffer
    let mut tombstoned_eids = HashSet::new();
    for l0 in l0_ctx.iter_l0_buffers() {
        let l0_guard = l0.read();
        for eid in l0_guard.tombstones.keys() {
            tombstoned_eids.insert(*eid);
        }
    }
    if !tombstoned_eids.is_empty() {
        unique_edges.retain(|edge| !tombstoned_eids.contains(&edge.0));
    }

    // Step 5: Build adjacency map based on direction
    let mut adjacency: EdgeAdjacencyMap = HashMap::new();

    for (eid, src_vid, dst_vid, edge_type, props) in unique_edges {
        match direction {
            Direction::Outgoing => {
                adjacency
                    .entry(src_vid)
                    .or_default()
                    .push((dst_vid, eid, edge_type, props));
            }
            Direction::Incoming => {
                adjacency
                    .entry(dst_vid)
                    .or_default()
                    .push((src_vid, eid, edge_type, props));
            }
            Direction::Both => {
                // Undirected traversal: index the edge under both endpoints.
                adjacency.entry(src_vid).or_default().push((
                    dst_vid,
                    eid,
                    edge_type.clone(),
                    props.clone(),
                ));
                adjacency
                    .entry(dst_vid)
                    .or_default()
                    .push((src_vid, eid, edge_type, props));
            }
        }
    }

    Ok(adjacency)
}
2288
impl Stream for GraphTraverseMainStream {
    type Item = DFResult<RecordBatch>;

    /// Drive the LoadingEdges → Processing → Done state machine.
    ///
    /// The state is moved out with `mem::replace` (leaving `Done` as a
    /// placeholder) so the owned future/stream can be polled, then written
    /// back before returning on Pending/Ready.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            let state = std::mem::replace(&mut self.state, GraphTraverseMainState::Done);

            match state {
                GraphTraverseMainState::LoadingEdges {
                    mut future,
                    input_stream,
                } => match future.as_mut().poll(cx) {
                    Poll::Ready(Ok(adjacency)) => {
                        // Move to processing state with loaded adjacency
                        self.state = GraphTraverseMainState::Processing {
                            adjacency,
                            input_stream,
                        };
                        // Continue loop to start processing
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = GraphTraverseMainState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        // Restore the state so the future is polled again on wake.
                        self.state = GraphTraverseMainState::LoadingEdges {
                            future,
                            input_stream,
                        };
                        return Poll::Pending;
                    }
                },
                GraphTraverseMainState::Processing {
                    adjacency,
                    mut input_stream,
                } => {
                    // Check timeout
                    if let Err(e) = self.graph_ctx.check_timeout() {
                        return Poll::Ready(Some(Err(
                            datafusion::error::DataFusionError::Execution(e.to_string()),
                        )));
                    }

                    match input_stream.poll_next_unpin(cx) {
                        Poll::Ready(Some(Ok(batch))) => {
                            // Expand batch using adjacency map
                            let result = self.expand_batch(&batch, &adjacency);

                            // Restore state before returning so the next poll continues.
                            self.state = GraphTraverseMainState::Processing {
                                adjacency,
                                input_stream,
                            };

                            if let Ok(ref r) = result {
                                self.metrics.record_output(r.num_rows());
                            }
                            return Poll::Ready(Some(result));
                        }
                        Poll::Ready(Some(Err(e))) => {
                            self.state = GraphTraverseMainState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(None) => {
                            self.state = GraphTraverseMainState::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Pending => {
                            self.state = GraphTraverseMainState::Processing {
                                adjacency,
                                input_stream,
                            };
                            return Poll::Pending;
                        }
                    }
                }
                GraphTraverseMainState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
2371
2372impl RecordBatchStream for GraphTraverseMainStream {
2373    fn schema(&self) -> SchemaRef {
2374        self.schema.clone()
2375    }
2376}
2377
/// Variable-length graph traversal execution plan.
///
/// Performs BFS traversal from source vertices with configurable min/max hops.
/// Tracks visited nodes to avoid cycles.
///
/// The traversal itself is driven by a compiled [`PathNfa`]: simple VLP
/// patterns compile one from `edge_type_ids`/`direction`/`min_hops`/`max_hops`,
/// while QPP patterns supply a pre-compiled NFA (see [`Self::new`]).
///
/// # Example
///
/// ```ignore
/// // Find all nodes 1-3 hops away via KNOWS edges
/// let traverse = GraphVariableLengthTraverseExec::new(
///     input_plan,
///     "_vid",
///     knows_type_id,
///     Direction::Outgoing,
///     1,  // min_hops
///     3,  // max_hops
///     Some("p"), // path variable
///     graph_ctx,
/// );
/// ```
pub struct GraphVariableLengthTraverseExec {
    /// Input execution plan.
    input: Arc<dyn ExecutionPlan>,

    /// Column name containing source VIDs.
    source_column: String,

    /// Edge type IDs to traverse.
    edge_type_ids: Vec<u32>,

    /// Traversal direction.
    direction: Direction,

    /// Minimum number of hops.
    min_hops: usize,

    /// Maximum number of hops.
    max_hops: usize,

    /// Variable name for target vertex columns.
    target_variable: String,

    /// Variable name for relationship list (r in `[r*]`) - holds `List<Edge>`.
    step_variable: Option<String>,

    /// Variable name for path (if path is bound).
    path_variable: Option<String>,

    /// Target vertex properties to materialize.
    target_properties: Vec<String>,

    /// Target label name for property type resolution.
    target_label_name: Option<String>,

    /// Whether this is an optional match (LEFT JOIN semantics).
    is_optional: bool,

    /// Column name of an already-bound target VID (for patterns where target is in scope).
    bound_target_column: Option<String>,

    /// Lance SQL filter for edge property predicates (VLP bitmap preselection).
    edge_lance_filter: Option<String>,

    /// Simple property equality conditions for per-edge L0 checking during BFS.
    /// Each entry is (property_name, expected_value).
    edge_property_conditions: Vec<(String, UniValue)>,

    /// Edge ID columns from previous hops for cross-pattern relationship uniqueness.
    used_edge_columns: Vec<String>,

    /// Path semantics mode (Trail = no repeated edges, default for OpenCypher).
    path_mode: super::nfa::PathMode,

    /// Output mode determining BFS strategy.
    output_mode: super::nfa::VlpOutputMode,

    /// Compiled NFA for path pattern matching (shared with execution streams).
    nfa: Arc<PathNfa>,

    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Output schema (computed once in [`Self::new`]).
    schema: SchemaRef,

    /// Cached plan properties derived from the output schema.
    properties: PlanProperties,

    /// Execution metrics.
    metrics: ExecutionPlanMetricsSet,
}
2469
2470impl fmt::Debug for GraphVariableLengthTraverseExec {
2471    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2472        f.debug_struct("GraphVariableLengthTraverseExec")
2473            .field("source_column", &self.source_column)
2474            .field("edge_type_ids", &self.edge_type_ids)
2475            .field("direction", &self.direction)
2476            .field("min_hops", &self.min_hops)
2477            .field("max_hops", &self.max_hops)
2478            .field("target_variable", &self.target_variable)
2479            .finish()
2480    }
2481}
2482
impl GraphVariableLengthTraverseExec {
    /// Create a new variable-length traversal plan.
    ///
    /// For QPP (Quantified Path Patterns), pass a pre-compiled NFA via `qpp_nfa`.
    /// For simple VLP patterns, pass `None` and the NFA will be compiled from
    /// `edge_type_ids`, `direction`, `min_hops`, `max_hops`.
    ///
    /// The output schema and cached [`PlanProperties`] are computed here once,
    /// so reconstruction via `with_new_children` pays the same cost again.
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        source_column: impl Into<String>,
        edge_type_ids: Vec<u32>,
        direction: Direction,
        min_hops: usize,
        max_hops: usize,
        target_variable: impl Into<String>,
        step_variable: Option<String>,
        path_variable: Option<String>,
        target_properties: Vec<String>,
        target_label_name: Option<String>,
        graph_ctx: Arc<GraphExecutionContext>,
        is_optional: bool,
        bound_target_column: Option<String>,
        edge_lance_filter: Option<String>,
        edge_property_conditions: Vec<(String, UniValue)>,
        used_edge_columns: Vec<String>,
        path_mode: super::nfa::PathMode,
        output_mode: super::nfa::VlpOutputMode,
        qpp_nfa: Option<PathNfa>,
    ) -> Self {
        let source_column = source_column.into();
        let target_variable = target_variable.into();

        // Resolve target property Arrow types from the schema
        // (label properties drive per-property type resolution in build_schema).
        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let label_props = target_label_name
            .as_deref()
            .and_then(|ln| uni_schema.properties.get(ln));

        // Build output schema
        let schema = Self::build_schema(
            input.schema(),
            &target_variable,
            step_variable.as_deref(),
            path_variable.as_deref(),
            &target_properties,
            label_props,
        );
        let properties = compute_plan_properties(schema.clone());

        // Use pre-compiled QPP NFA if provided, otherwise compile from VLP parameters
        let nfa = Arc::new(qpp_nfa.unwrap_or_else(|| {
            PathNfa::from_vlp(edge_type_ids.clone(), direction, min_hops, max_hops)
        }));

        Self {
            input,
            source_column,
            edge_type_ids,
            direction,
            min_hops,
            max_hops,
            target_variable,
            step_variable,
            path_variable,
            target_properties,
            target_label_name,
            is_optional,
            bound_target_column,
            edge_lance_filter,
            edge_property_conditions,
            used_edge_columns,
            path_mode,
            output_mode,
            nfa,
            graph_ctx,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Build output schema.
    ///
    /// Starts from the input schema and appends, skipping any column the input
    /// already provides:
    /// - `{target}._vid` (UInt64, nullable) and `{target}._labels`,
    /// - one nullable column per requested target property (type resolved
    ///   from `label_props`),
    /// - a non-nullable `_hop_count` (UInt64),
    /// - the step-variable edge-list field, when bound,
    /// - the path struct field, when bound and not already produced by a
    ///   prior BindFixedPath.
    fn build_schema(
        input_schema: SchemaRef,
        target_variable: &str,
        step_variable: Option<&str>,
        path_variable: Option<&str>,
        target_properties: &[String],
        label_props: Option<
            &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
        >,
    ) -> SchemaRef {
        let mut fields: Vec<Field> = input_schema
            .fields()
            .iter()
            .map(|f| f.as_ref().clone())
            .collect();

        // Add target VID column (only if not already in input)
        let target_vid_name = format!("{}._vid", target_variable);
        if input_schema.column_with_name(&target_vid_name).is_none() {
            fields.push(Field::new(target_vid_name, DataType::UInt64, true));
        }

        // Add target ._labels column (only if not already in input)
        let target_labels_name = format!("{}._labels", target_variable);
        if input_schema.column_with_name(&target_labels_name).is_none() {
            fields.push(Field::new(target_labels_name, labels_data_type(), true));
        }

        // Add target vertex property columns (skip if already in input)
        for prop_name in target_properties {
            let col_name = format!("{}.{}", target_variable, prop_name);
            if input_schema.column_with_name(&col_name).is_none() {
                let arrow_type = resolve_property_type(prop_name, label_props);
                fields.push(Field::new(&col_name, arrow_type, true));
            }
        }

        // Add hop count
        fields.push(Field::new("_hop_count", DataType::UInt64, false));

        // Add step variable (edge list) if bound
        if let Some(step_var) = step_variable {
            fields.push(build_edge_list_field(step_var));
        }

        // Add path struct if bound (only if not already in input from prior BindFixedPath)
        if let Some(path_var) = path_variable
            && input_schema.column_with_name(path_var).is_none()
        {
            fields.push(build_path_struct_field(path_var));
        }

        Arc::new(Schema::new(fields))
    }
}
2620
2621impl DisplayAs for GraphVariableLengthTraverseExec {
2622    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2623        write!(
2624            f,
2625            "GraphVariableLengthTraverseExec: {} --[{:?}*{}..{}]--> target",
2626            self.source_column, self.edge_type_ids, self.min_hops, self.max_hops
2627        )
2628    }
2629}
2630
2631impl ExecutionPlan for GraphVariableLengthTraverseExec {
2632    fn name(&self) -> &str {
2633        "GraphVariableLengthTraverseExec"
2634    }
2635
2636    fn as_any(&self) -> &dyn Any {
2637        self
2638    }
2639
2640    fn schema(&self) -> SchemaRef {
2641        self.schema.clone()
2642    }
2643
2644    fn properties(&self) -> &PlanProperties {
2645        &self.properties
2646    }
2647
2648    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
2649        vec![&self.input]
2650    }
2651
2652    fn with_new_children(
2653        self: Arc<Self>,
2654        children: Vec<Arc<dyn ExecutionPlan>>,
2655    ) -> DFResult<Arc<dyn ExecutionPlan>> {
2656        if children.len() != 1 {
2657            return Err(datafusion::error::DataFusionError::Plan(
2658                "GraphVariableLengthTraverseExec requires exactly one child".to_string(),
2659            ));
2660        }
2661
2662        // Pass the existing NFA to avoid recompilation (important for QPP NFA)
2663        Ok(Arc::new(Self::new(
2664            children[0].clone(),
2665            self.source_column.clone(),
2666            self.edge_type_ids.clone(),
2667            self.direction,
2668            self.min_hops,
2669            self.max_hops,
2670            self.target_variable.clone(),
2671            self.step_variable.clone(),
2672            self.path_variable.clone(),
2673            self.target_properties.clone(),
2674            self.target_label_name.clone(),
2675            self.graph_ctx.clone(),
2676            self.is_optional,
2677            self.bound_target_column.clone(),
2678            self.edge_lance_filter.clone(),
2679            self.edge_property_conditions.clone(),
2680            self.used_edge_columns.clone(),
2681            self.path_mode.clone(),
2682            self.output_mode.clone(),
2683            Some((*self.nfa).clone()),
2684        )))
2685    }
2686
2687    fn execute(
2688        &self,
2689        partition: usize,
2690        context: Arc<TaskContext>,
2691    ) -> DFResult<SendableRecordBatchStream> {
2692        let input_stream = self.input.execute(partition, context)?;
2693
2694        let metrics = BaselineMetrics::new(&self.metrics, partition);
2695
2696        let warm_fut = self
2697            .graph_ctx
2698            .warming_future(self.edge_type_ids.clone(), self.direction);
2699
2700        Ok(Box::pin(GraphVariableLengthTraverseStream {
2701            input: input_stream,
2702            exec: Arc::new(self.clone_for_stream()),
2703            schema: self.schema.clone(),
2704            state: VarLengthStreamState::Warming(warm_fut),
2705            metrics,
2706        }))
2707    }
2708
2709    fn metrics(&self) -> Option<MetricsSet> {
2710        Some(self.metrics.clone_inner())
2711    }
2712}
2713
2714impl GraphVariableLengthTraverseExec {
2715    /// Clone fields needed for stream (avoids cloning the full struct).
2716    fn clone_for_stream(&self) -> GraphVariableLengthTraverseExecData {
2717        GraphVariableLengthTraverseExecData {
2718            source_column: self.source_column.clone(),
2719            edge_type_ids: self.edge_type_ids.clone(),
2720            direction: self.direction,
2721            min_hops: self.min_hops,
2722            max_hops: self.max_hops,
2723            target_variable: self.target_variable.clone(),
2724            step_variable: self.step_variable.clone(),
2725            path_variable: self.path_variable.clone(),
2726            target_properties: self.target_properties.clone(),
2727            target_label_name: self.target_label_name.clone(),
2728            is_optional: self.is_optional,
2729            bound_target_column: self.bound_target_column.clone(),
2730            edge_lance_filter: self.edge_lance_filter.clone(),
2731            edge_property_conditions: self.edge_property_conditions.clone(),
2732            used_edge_columns: self.used_edge_columns.clone(),
2733            path_mode: self.path_mode.clone(),
2734            output_mode: self.output_mode.clone(),
2735            nfa: self.nfa.clone(),
2736            graph_ctx: self.graph_ctx.clone(),
2737        }
2738    }
2739}
2740
/// Data needed by the stream (without ExecutionPlan overhead).
///
/// A plain snapshot of [`GraphVariableLengthTraverseExec`]'s traversal
/// parameters, produced by `clone_for_stream` and shared with the stream
/// behind an `Arc`.
#[allow(dead_code)] // Some fields accessed via NFA; kept for with_new_children reconstruction
struct GraphVariableLengthTraverseExecData {
    /// Column name containing source VIDs.
    source_column: String,
    /// Edge type IDs to traverse.
    edge_type_ids: Vec<u32>,
    /// Traversal direction (`Both` triggers undirected edge dedup in BFS).
    direction: Direction,
    /// Minimum number of hops.
    min_hops: usize,
    /// Maximum number of hops (BFS depth bound).
    max_hops: usize,
    /// Variable name for target vertex columns.
    target_variable: String,
    /// Variable name for relationship list (r in `[r*]`).
    step_variable: Option<String>,
    /// Variable name for path (if path is bound).
    path_variable: Option<String>,
    /// Target vertex properties to materialize (hydrated asynchronously).
    target_properties: Vec<String>,
    /// Target label name; used to filter accepted BFS endpoints.
    target_label_name: Option<String>,
    /// Whether this is an optional match (LEFT JOIN semantics).
    is_optional: bool,
    /// Column name of an already-bound target VID, when the target is in scope.
    bound_target_column: Option<String>,
    #[allow(dead_code)] // Used in Phase 3 warming
    edge_lance_filter: Option<String>,
    /// Simple property equality conditions for per-edge L0 checking during BFS.
    edge_property_conditions: Vec<(String, UniValue)>,
    /// Edge ID columns from prior hops (cross-pattern relationship uniqueness).
    used_edge_columns: Vec<String>,
    /// Path semantics mode (e.g. Trail = no repeated edges).
    path_mode: super::nfa::PathMode,
    /// Output mode determining BFS strategy (endpoints-only vs full paths).
    output_mode: super::nfa::VlpOutputMode,
    /// Compiled NFA driving transitions during BFS.
    nfa: Arc<PathNfa>,
    /// Graph execution context (adjacency, L0 overlay, timeout).
    graph_ctx: Arc<GraphExecutionContext>,
}
2766
/// Safety cap for frontier size to prevent OOM on pathological graphs.
/// When a BFS frontier exceeds this, expansion stops early; results found
/// so far are still emitted.
const MAX_FRONTIER_SIZE: usize = 500_000;
/// Safety cap for predecessor pool size.
/// Checked alongside the frontier cap to bound predecessor-DAG memory.
const MAX_PRED_POOL_SIZE: usize = 2_000_000;
2771
impl GraphVariableLengthTraverseExecData {
    /// Check if a vertex passes the target label filter.
    ///
    /// Consults only the in-memory L0 overlay: a vertex with no L0 label
    /// entry passes (storage-resident labels are trusted). With no target
    /// label configured, everything passes.
    fn check_target_label(&self, vid: Vid) -> bool {
        if let Some(ref label_name) = self.target_label_name {
            let query_ctx = self.graph_ctx.query_context();
            match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
                Some(labels) => labels.contains(label_name),
                None => true, // not in L0, trust storage
            }
        } else {
            true
        }
    }

    /// Check if a vertex satisfies an NFA state constraint (QPP intermediate node label).
    ///
    /// Same L0-overlay semantics as [`Self::check_target_label`]: vertices
    /// absent from L0 pass.
    fn check_state_constraint(&self, vid: Vid, constraint: &super::nfa::VertexConstraint) -> bool {
        match constraint {
            super::nfa::VertexConstraint::Label(label_name) => {
                let query_ctx = self.graph_ctx.query_context();
                match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
                    Some(labels) => labels.contains(label_name),
                    None => true, // not in L0, trust storage
                }
            }
        }
    }

    /// Expand neighbors from a vertex through all NFA transitions from the given state.
    /// Returns (neighbor_vid, neighbor_eid, destination_nfa_state) triples.
    ///
    /// Per edge, filters are applied in order: undirected dedup, `EidFilter`
    /// bitmap, L0 edge-property equality conditions, cross-pattern used-edge
    /// exclusion, destination-state vertex constraint.
    fn expand_neighbors(
        &self,
        vid: Vid,
        state: NfaStateId,
        eid_filter: &EidFilter,
        used_eids: &FxHashSet<u64>,
    ) -> Vec<(Vid, Eid, NfaStateId)> {
        // `Direction::Both` means each physical edge can surface twice
        // (once per direction), so undirected patterns dedup by EID below.
        let is_undirected = matches!(self.direction, Direction::Both);
        let mut results = Vec::new();

        for transition in self.nfa.transitions_from(state) {
            // Dedup set is per-transition: a fresh set for each transition's edges.
            let mut seen_edges: FxHashSet<u64> = FxHashSet::default();

            for &etype in &transition.edge_type_ids {
                for (neighbor, eid) in
                    self.graph_ctx
                        .get_neighbors(vid, etype, transition.direction)
                {
                    // Deduplicate edges for undirected patterns
                    if is_undirected && !seen_edges.insert(eid.as_u64()) {
                        continue;
                    }

                    // Check EidFilter (edge property bitmap preselection)
                    if !eid_filter.contains(eid) {
                        continue;
                    }

                    // Check edge property conditions (L0 in-memory properties)
                    if !self.edge_property_conditions.is_empty() {
                        let query_ctx = self.graph_ctx.query_context();
                        let passes = if let Some(props) =
                            l0_visibility::accumulate_edge_props(eid, Some(&query_ctx))
                        {
                            // All equality conditions must match the L0 properties.
                            self.edge_property_conditions
                                .iter()
                                .all(|(name, expected)| {
                                    props.get(name).is_some_and(|actual| actual == expected)
                                })
                        } else {
                            // Edge not in L0 (CSR/Lance) — relies on EidFilter
                            // for correctness. TODO: build EidFilter from Lance
                            // during warming for flushed edges.
                            true
                        };
                        if !passes {
                            continue;
                        }
                    }

                    // Check cross-pattern relationship uniqueness
                    if used_eids.contains(&eid.as_u64()) {
                        continue;
                    }

                    // Check NFA state constraint on the destination state (QPP label filters)
                    if let Some(constraint) = self.nfa.state_constraint(transition.to)
                        && !self.check_state_constraint(neighbor, constraint)
                    {
                        continue;
                    }

                    results.push((neighbor, eid, transition.to));
                }
            }
        }

        results
    }

    /// NFA-driven BFS with predecessor DAG for full path enumeration (Mode B).
    ///
    /// Returns BFS results in the same format as the old bfs() for compatibility
    /// with build_output_batch.
    #[allow(clippy::too_many_arguments)]
    fn bfs_with_dag(
        &self,
        source: Vid,
        eid_filter: &EidFilter,
        used_eids: &FxHashSet<u64>,
        vid_filter: &VidFilter,
    ) -> Vec<BfsResult> {
        let nfa = &self.nfa;
        // PathSelector::All: enumerate every path to each accepting target.
        let selector = PathSelector::All;
        let mut dag = PredecessorDag::new(selector);
        // (target, NFA state, depth) triples accepted during BFS.
        let mut accepting: Vec<(Vid, NfaStateId, u32)> = Vec::new();

        // Handle zero-length paths (min_hops == 0)
        if nfa.is_accepting(nfa.start_state())
            && self.check_target_label(source)
            && vid_filter.contains(source)
        {
            accepting.push((source, nfa.start_state(), 0));
        }

        // Per-depth frontier BFS
        let mut frontier: Vec<(Vid, NfaStateId)> = vec![(source, nfa.start_state())];
        let mut depth: u32 = 0;

        while !frontier.is_empty() && depth < self.max_hops as u32 {
            depth += 1;
            let mut next_frontier: Vec<(Vid, NfaStateId)> = Vec::new();
            let mut seen_at_depth: FxHashSet<(Vid, NfaStateId)> = FxHashSet::default();

            for &(vid, state) in &frontier {
                for (neighbor, eid, dst_state) in
                    self.expand_neighbors(vid, state, eid_filter, used_eids)
                {
                    // Record in predecessor DAG
                    // (recorded even for already-seen (vid, state) pairs, so
                    // all predecessors survive for path enumeration below)
                    dag.add_predecessor(neighbor, dst_state, vid, state, eid, depth);

                    // Add to next frontier (deduplicated per depth)
                    if seen_at_depth.insert((neighbor, dst_state)) {
                        next_frontier.push((neighbor, dst_state));

                        // Check if accepting
                        if nfa.is_accepting(dst_state)
                            && self.check_target_label(neighbor)
                            && vid_filter.contains(neighbor)
                        {
                            accepting.push((neighbor, dst_state, depth));
                        }
                    }
                }
            }

            // Safety cap: stop expanding rather than erroring; targets accepted
            // so far are still enumerated below.
            if next_frontier.len() > MAX_FRONTIER_SIZE || dag.pool_len() > MAX_PRED_POOL_SIZE {
                break;
            }

            frontier = next_frontier;
        }

        // Enumerate paths from DAG to produce BfsResult tuples
        let mut results: Vec<BfsResult> = Vec::new();
        for &(target, state, depth) in &accepting {
            dag.enumerate_paths(
                source,
                target,
                state,
                depth,
                depth,
                &self.path_mode,
                &mut |nodes, edges| {
                    results.push((target, depth as usize, nodes.to_vec(), edges.to_vec()));
                    std::ops::ControlFlow::Continue(())
                },
            );
        }

        results
    }

    /// NFA-driven BFS returning only endpoints and depths (Mode A).
    ///
    /// More efficient when no path/step variable is bound — skips full path enumeration.
    /// Uses lightweight trail verification via has_trail_valid_path().
    #[allow(clippy::too_many_arguments)]
    fn bfs_endpoints_only(
        &self,
        source: Vid,
        eid_filter: &EidFilter,
        used_eids: &FxHashSet<u64>,
        vid_filter: &VidFilter,
    ) -> Vec<(Vid, u32)> {
        let nfa = &self.nfa;
        let selector = PathSelector::Any; // Only need existence, not all paths
        let mut dag = PredecessorDag::new(selector);
        let mut results: Vec<(Vid, u32)> = Vec::new();

        // Handle zero-length paths
        if nfa.is_accepting(nfa.start_state())
            && self.check_target_label(source)
            && vid_filter.contains(source)
        {
            results.push((source, 0));
        }

        // Per-depth frontier BFS (same shape as bfs_with_dag, minus enumeration)
        let mut frontier: Vec<(Vid, NfaStateId)> = vec![(source, nfa.start_state())];
        let mut depth: u32 = 0;

        while !frontier.is_empty() && depth < self.max_hops as u32 {
            depth += 1;
            let mut next_frontier: Vec<(Vid, NfaStateId)> = Vec::new();
            let mut seen_at_depth: FxHashSet<(Vid, NfaStateId)> = FxHashSet::default();

            for &(vid, state) in &frontier {
                for (neighbor, eid, dst_state) in
                    self.expand_neighbors(vid, state, eid_filter, used_eids)
                {
                    dag.add_predecessor(neighbor, dst_state, vid, state, eid, depth);

                    if seen_at_depth.insert((neighbor, dst_state)) {
                        next_frontier.push((neighbor, dst_state));

                        // Check if accepting with trail verification
                        if nfa.is_accepting(dst_state)
                            && self.check_target_label(neighbor)
                            && vid_filter.contains(neighbor)
                            && dag.has_trail_valid_path(source, neighbor, dst_state, depth, depth)
                        {
                            results.push((neighbor, depth));
                        }
                    }
                }
            }

            // Safety cap mirrors bfs_with_dag.
            if next_frontier.len() > MAX_FRONTIER_SIZE || dag.pool_len() > MAX_PRED_POOL_SIZE {
                break;
            }

            frontier = next_frontier;
        }

        results
    }
}
3020
/// State machine for variable-length traverse stream.
///
/// Transitions (driven by `poll_next`):
/// `Warming` → `Reading` → (`Materializing` → `Reading`)* → `Done`.
enum VarLengthStreamState {
    /// Warming adjacency CSRs before first batch.
    Warming(Pin<Box<dyn std::future::Future<Output = DFResult<()>> + Send>>),
    /// Processing input batches.
    Reading,
    /// Materializing target vertex properties asynchronously.
    Materializing(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
    /// Stream is done.
    Done,
}
3032
/// Stream for variable-length traversal.
struct GraphVariableLengthTraverseStream {
    /// Upstream stream producing batches with source-VID columns.
    input: SendableRecordBatchStream,
    /// Shared traversal parameters snapshotted from the plan node.
    exec: Arc<GraphVariableLengthTraverseExecData>,
    /// Output schema for every emitted batch.
    schema: SchemaRef,
    /// Current position in the Warming/Reading/Materializing state machine.
    state: VarLengthStreamState,
    /// Baseline metrics; output row counts are recorded here.
    metrics: BaselineMetrics,
}
3041
3042impl Stream for GraphVariableLengthTraverseStream {
3043    type Item = DFResult<RecordBatch>;
3044
3045    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3046        loop {
3047            let state = std::mem::replace(&mut self.state, VarLengthStreamState::Done);
3048
3049            match state {
3050                VarLengthStreamState::Warming(mut fut) => match fut.as_mut().poll(cx) {
3051                    Poll::Ready(Ok(())) => {
3052                        self.state = VarLengthStreamState::Reading;
3053                        // Continue loop to start reading
3054                    }
3055                    Poll::Ready(Err(e)) => {
3056                        self.state = VarLengthStreamState::Done;
3057                        return Poll::Ready(Some(Err(e)));
3058                    }
3059                    Poll::Pending => {
3060                        self.state = VarLengthStreamState::Warming(fut);
3061                        return Poll::Pending;
3062                    }
3063                },
3064                VarLengthStreamState::Reading => {
3065                    // Check timeout
3066                    if let Err(e) = self.exec.graph_ctx.check_timeout() {
3067                        return Poll::Ready(Some(Err(
3068                            datafusion::error::DataFusionError::Execution(e.to_string()),
3069                        )));
3070                    }
3071
3072                    match self.input.poll_next_unpin(cx) {
3073                        Poll::Ready(Some(Ok(batch))) => {
3074                            // Build base batch synchronously (BFS + expand)
3075                            // TODO(Phase 3.5): Build real EidFilter/VidFilter during warming
3076                            let eid_filter = EidFilter::AllAllowed;
3077                            let vid_filter = VidFilter::AllAllowed;
3078                            let base_result =
3079                                self.process_batch_base(batch, &eid_filter, &vid_filter);
3080                            let base_batch = match base_result {
3081                                Ok(b) => b,
3082                                Err(e) => {
3083                                    self.state = VarLengthStreamState::Reading;
3084                                    return Poll::Ready(Some(Err(e)));
3085                                }
3086                            };
3087
3088                            // If no properties need async hydration, return directly
3089                            if self.exec.target_properties.is_empty() {
3090                                self.state = VarLengthStreamState::Reading;
3091                                return Poll::Ready(Some(Ok(base_batch)));
3092                            }
3093
3094                            // Properties needed — create async future for hydration
3095                            let schema = self.schema.clone();
3096                            let target_variable = self.exec.target_variable.clone();
3097                            let target_properties = self.exec.target_properties.clone();
3098                            let target_label_name = self.exec.target_label_name.clone();
3099                            let graph_ctx = self.exec.graph_ctx.clone();
3100
3101                            let fut = hydrate_vlp_target_properties(
3102                                base_batch,
3103                                schema,
3104                                target_variable,
3105                                target_properties,
3106                                target_label_name,
3107                                graph_ctx,
3108                            );
3109
3110                            self.state = VarLengthStreamState::Materializing(Box::pin(fut));
3111                            // Continue loop to poll the future
3112                        }
3113                        Poll::Ready(Some(Err(e))) => {
3114                            self.state = VarLengthStreamState::Done;
3115                            return Poll::Ready(Some(Err(e)));
3116                        }
3117                        Poll::Ready(None) => {
3118                            self.state = VarLengthStreamState::Done;
3119                            return Poll::Ready(None);
3120                        }
3121                        Poll::Pending => {
3122                            self.state = VarLengthStreamState::Reading;
3123                            return Poll::Pending;
3124                        }
3125                    }
3126                }
3127                VarLengthStreamState::Materializing(mut fut) => match fut.as_mut().poll(cx) {
3128                    Poll::Ready(Ok(batch)) => {
3129                        self.state = VarLengthStreamState::Reading;
3130                        self.metrics.record_output(batch.num_rows());
3131                        return Poll::Ready(Some(Ok(batch)));
3132                    }
3133                    Poll::Ready(Err(e)) => {
3134                        self.state = VarLengthStreamState::Done;
3135                        return Poll::Ready(Some(Err(e)));
3136                    }
3137                    Poll::Pending => {
3138                        self.state = VarLengthStreamState::Materializing(fut);
3139                        return Poll::Pending;
3140                    }
3141                },
3142                VarLengthStreamState::Done => {
3143                    return Poll::Ready(None);
3144                }
3145            }
3146        }
3147    }
3148}
3149
impl GraphVariableLengthTraverseStream {
    /// Expand a single input batch by running a BFS from every source VID.
    ///
    /// Per input row this:
    /// 1. Reads the source VID from `exec.source_column` (error if the column
    ///    is missing).
    /// 2. Collects edge IDs already consumed by previous hops (from
    ///    `exec.used_edge_columns`) for cross-pattern relationship uniqueness.
    /// 3. Dispatches to the BFS variant selected by `exec.output_mode`
    ///    (`EndpointsOnly` skips path reconstruction entirely).
    /// 4. Drops results that do not match an already-bound target VID, when
    ///    `exec.bound_target_column` is present in the batch.
    ///
    /// For OPTIONAL MATCH (`exec.is_optional`), a row with no BFS results is
    /// preserved with a sentinel target VID of `u64::MAX` and empty paths so
    /// `build_output_batch` can emit NULL bindings for it.
    fn process_batch_base(
        &self,
        batch: RecordBatch,
        eid_filter: &EidFilter,
        vid_filter: &VidFilter,
    ) -> DFResult<RecordBatch> {
        let source_col = batch
            .column_by_name(&self.exec.source_column)
            .ok_or_else(|| {
                datafusion::error::DataFusionError::Execution(format!(
                    "Source column '{}' not found",
                    self.exec.source_column
                ))
            })?;

        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
        let source_vids: &UInt64Array = &source_vid_cow;

        // Read bound target VIDs if column exists
        let bound_target_cow = self
            .exec
            .bound_target_column
            .as_ref()
            .and_then(|col| batch.column_by_name(col))
            .map(|c| column_as_vid_array(c.as_ref()))
            .transpose()?;
        let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();

        // Extract used edge columns for cross-pattern relationship uniqueness
        // (columns that are absent or of an unexpected type are silently skipped).
        let used_edge_arrays: Vec<&UInt64Array> = self
            .exec
            .used_edge_columns
            .iter()
            .filter_map(|col| {
                batch
                    .column_by_name(col)?
                    .as_any()
                    .downcast_ref::<UInt64Array>()
            })
            .collect();

        // Collect all BFS results
        let mut expansions: Vec<VarLengthExpansion> = Vec::new();

        for (row_idx, source_vid) in source_vids.iter().enumerate() {
            let mut emitted_for_row = false;

            // NULL source VIDs produce no expansions (only the OPTIONAL
            // fallback below, when enabled).
            if let Some(src) = source_vid {
                let vid = Vid::from(src);

                // Collect used edge IDs from previous hops for this row
                let used_eids: FxHashSet<u64> = used_edge_arrays
                    .iter()
                    .filter_map(|arr| {
                        if arr.is_null(row_idx) {
                            None
                        } else {
                            Some(arr.value(row_idx))
                        }
                    })
                    .collect();

                // Dispatch to appropriate BFS mode based on output_mode
                match &self.exec.output_mode {
                    VlpOutputMode::EndpointsOnly => {
                        let endpoints = self
                            .exec
                            .bfs_endpoints_only(vid, eid_filter, &used_eids, vid_filter);
                        for (target, depth) in endpoints {
                            // Filter by bound target VID
                            if let Some(targets) = expected_targets {
                                if targets.is_null(row_idx) {
                                    continue;
                                }
                                if target.as_u64() != targets.value(row_idx) {
                                    continue;
                                }
                            }
                            // EndpointsOnly carries no paths — empty vecs are legitimate here.
                            expansions.push((row_idx, target, depth as usize, vec![], vec![]));
                            emitted_for_row = true;
                        }
                    }
                    _ => {
                        // FullPath, StepVariable, CountOnly, etc.
                        let bfs_results = self
                            .exec
                            .bfs_with_dag(vid, eid_filter, &used_eids, vid_filter);
                        for (target, hop_count, node_path, edge_path) in bfs_results {
                            // Filter by bound target VID
                            if let Some(targets) = expected_targets {
                                if targets.is_null(row_idx) {
                                    continue;
                                }
                                if target.as_u64() != targets.value(row_idx) {
                                    continue;
                                }
                            }
                            expansions.push((row_idx, target, hop_count, node_path, edge_path));
                            emitted_for_row = true;
                        }
                    }
                }
            }

            if self.exec.is_optional && !emitted_for_row {
                // Preserve the source row with NULL optional bindings.
                // We use empty node/edge paths to mark unmatched rows.
                expansions.push((row_idx, Vid::from(u64::MAX), 0, vec![], vec![]));
            }
        }

        self.build_output_batch(&batch, &expansions)
    }

    /// Materialize the output batch from the collected BFS expansions.
    ///
    /// Input columns are replicated per expansion via Arrow `take`, then this
    /// operator's own columns are appended (or, for a pre-existing path
    /// column, replaced in place): target VID, target labels, null
    /// placeholders for target properties (hydrated asynchronously later),
    /// hop count, the step-variable edge list, and the named-path struct.
    ///
    /// Unmatched OPTIONAL rows are identified by the sentinel target VID
    /// (`u64::MAX`) and get NULL target/path/edge-list values.
    fn build_output_batch(
        &self,
        input: &RecordBatch,
        expansions: &[VarLengthExpansion],
    ) -> DFResult<RecordBatch> {
        if expansions.is_empty() {
            return Ok(RecordBatch::new_empty(self.schema.clone()));
        }

        let num_rows = expansions.len();

        // Build index array
        let indices: Vec<u64> = expansions
            .iter()
            .map(|(idx, _, _, _, _)| *idx as u64)
            .collect();
        let indices_array = UInt64Array::from(indices);

        // Expand input columns
        let mut columns: Vec<ArrayRef> = Vec::new();
        for col in input.columns() {
            let expanded = take(col.as_ref(), &indices_array, None)?;
            columns.push(expanded);
        }

        // Collect target VIDs and unmatched markers for use in multiple places.
        // Unmatched OPTIONAL rows use the sentinel VID (u64::MAX) — not empty paths,
        // because EndpointsOnly mode legitimately uses empty node/edge path vectors.
        let unmatched_rows: Vec<bool> = expansions
            .iter()
            .map(|(_, vid, _, _, _)| vid.as_u64() == u64::MAX)
            .collect();
        let target_vids: Vec<Option<u64>> = expansions
            .iter()
            .zip(unmatched_rows.iter())
            .map(
                |((_, vid, _, _, _), unmatched)| {
                    if *unmatched { None } else { Some(vid.as_u64()) }
                },
            )
            .collect();

        // Add target VID column (only if not already in input)
        let target_vid_name = format!("{}._vid", self.exec.target_variable);
        if input.schema().column_with_name(&target_vid_name).is_none() {
            columns.push(Arc::new(UInt64Array::from(target_vids.clone())));
        }

        // Add target ._labels column (only if not already in input)
        let target_labels_name = format!("{}._labels", self.exec.target_variable);
        if input
            .schema()
            .column_with_name(&target_labels_name)
            .is_none()
        {
            use arrow_array::builder::{ListBuilder, StringBuilder};
            let query_ctx = self.exec.graph_ctx.query_context();
            let mut labels_builder = ListBuilder::new(StringBuilder::new());
            for target_vid in &target_vids {
                // Unmatched OPTIONAL rows get a NULL label list.
                let Some(vid_u64) = target_vid else {
                    labels_builder.append(false);
                    continue;
                };
                let vid = Vid::from(*vid_u64);
                let row_labels: Vec<String> =
                    match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
                        Some(labels) => {
                            // Vertex is in L0 — use actual labels only
                            labels
                        }
                        None => {
                            // Vertex not in L0 — trust schema label (storage already filtered)
                            if let Some(ref label_name) = self.exec.target_label_name {
                                vec![label_name.clone()]
                            } else {
                                vec![]
                            }
                        }
                    };
                let values = labels_builder.values();
                for lbl in &row_labels {
                    values.append_value(lbl);
                }
                labels_builder.append(true);
            }
            columns.push(Arc::new(labels_builder.finish()));
        }

        // Add null placeholder columns for target properties (hydrated async if needed, skip if already in input)
        for prop_name in &self.exec.target_properties {
            let full_prop_name = format!("{}.{}", self.exec.target_variable, prop_name);
            if input.schema().column_with_name(&full_prop_name).is_none() {
                // The placeholder's type is read from the output schema at the
                // position this column will occupy.
                let col_idx = columns.len();
                if col_idx < self.schema.fields().len() {
                    let field = self.schema.field(col_idx);
                    columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
                }
            }
        }

        // Add hop count column (non-nullable; unmatched OPTIONAL rows carry 0)
        let hop_counts: Vec<u64> = expansions
            .iter()
            .map(|(_, _, hops, _, _)| *hops as u64)
            .collect();
        columns.push(Arc::new(UInt64Array::from(hop_counts)));

        // Add step variable (edge list) column if bound
        if self.exec.step_variable.is_some() {
            let mut edges_builder = new_edge_list_builder();
            let query_ctx = self.exec.graph_ctx.query_context();

            for (_, _, _, node_path, edge_path) in expansions {
                if node_path.is_empty() && edge_path.is_empty() {
                    // Null row for OPTIONAL MATCH unmatched
                    edges_builder.append_null();
                } else if edge_path.is_empty() {
                    // Zero-hop match: empty list
                    edges_builder.append(true);
                } else {
                    for (i, eid) in edge_path.iter().enumerate() {
                        let type_name = l0_visibility::get_edge_type(*eid, &query_ctx)
                            .unwrap_or_else(|| "UNKNOWN".to_string());
                        // node_path has one more entry than edge_path: edge i
                        // connects node_path[i] -> node_path[i + 1].
                        append_edge_to_struct(
                            edges_builder.values(),
                            *eid,
                            &type_name,
                            node_path[i].as_u64(),
                            node_path[i + 1].as_u64(),
                            &query_ctx,
                        );
                    }
                    edges_builder.append(true);
                }
            }

            columns.push(Arc::new(edges_builder.finish()));
        }

        // Add path variable column if bound.
        // For named paths, we output a Path struct with nodes and relationships arrays.
        // If a path column already exists in input (from a prior BindFixedPath), extend it
        // rather than building from scratch.
        if let Some(path_var_name) = &self.exec.path_variable {
            let existing_path_col_idx = input
                .schema()
                .column_with_name(path_var_name)
                .map(|(idx, _)| idx);
            // Clone the Arc so we can read existing path without borrowing `columns`
            let existing_path_arc = existing_path_col_idx.map(|idx| columns[idx].clone());
            let existing_path = existing_path_arc
                .as_ref()
                .and_then(|arc| arc.as_any().downcast_ref::<arrow_array::StructArray>());

            let mut nodes_builder = new_node_list_builder();
            let mut rels_builder = new_edge_list_builder();
            let query_ctx = self.exec.graph_ctx.query_context();
            let mut path_validity = Vec::with_capacity(expansions.len());

            for (row_out_idx, (_, _, _, node_path, edge_path)) in expansions.iter().enumerate() {
                // Unmatched OPTIONAL row: NULL path.
                if node_path.is_empty() && edge_path.is_empty() {
                    nodes_builder.append(false);
                    rels_builder.append(false);
                    path_validity.push(false);
                    continue;
                }

                // Prepend existing path prefix if extending
                let skip_first_vlp_node = if let Some(existing) = existing_path {
                    if !existing.is_null(row_out_idx) {
                        prepend_existing_path(
                            existing,
                            row_out_idx,
                            &mut nodes_builder,
                            &mut rels_builder,
                            &query_ctx,
                        );
                        true
                    } else {
                        false
                    }
                } else {
                    false
                };

                // Append VLP nodes (skip first if extending — it's the junction point)
                let start_idx = if skip_first_vlp_node { 1 } else { 0 };
                for vid in &node_path[start_idx..] {
                    append_node_to_struct(nodes_builder.values(), *vid, &query_ctx);
                }
                nodes_builder.append(true);

                for (i, eid) in edge_path.iter().enumerate() {
                    let type_name = l0_visibility::get_edge_type(*eid, &query_ctx)
                        .unwrap_or_else(|| "UNKNOWN".to_string());
                    append_edge_to_struct(
                        rels_builder.values(),
                        *eid,
                        &type_name,
                        node_path[i].as_u64(),
                        node_path[i + 1].as_u64(),
                        &query_ctx,
                    );
                }
                rels_builder.append(true);
                path_validity.push(true);
            }

            // Finish builders and get ListArrays
            let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
            let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;

            // Build the path struct fields
            let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
            let rels_field = Arc::new(Field::new(
                "relationships",
                rels_array.data_type().clone(),
                true,
            ));

            // Create the path struct array
            let path_struct = arrow_array::StructArray::try_new(
                vec![nodes_field, rels_field].into(),
                vec![nodes_array, rels_array],
                Some(arrow::buffer::NullBuffer::from(path_validity)),
            )
            .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))?;

            if let Some(idx) = existing_path_col_idx {
                columns[idx] = Arc::new(path_struct);
            } else {
                columns.push(Arc::new(path_struct));
            }
        }

        // NOTE(review): rows are also recorded when the materialize future
        // completes in the poll loop — confirm output metrics aren't counted
        // twice for batches that flow through both paths.
        self.metrics.record_output(num_rows);

        RecordBatch::try_new(self.schema.clone(), columns)
            .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
    }
}
3506
3507impl RecordBatchStream for GraphVariableLengthTraverseStream {
3508    fn schema(&self) -> SchemaRef {
3509        self.schema.clone()
3510    }
3511}
3512
3513/// Hydrate target vertex properties into a VLP batch.
3514///
3515/// The base batch already has null placeholder columns for target properties.
3516/// This function replaces them with actual property values fetched from storage.
3517async fn hydrate_vlp_target_properties(
3518    base_batch: RecordBatch,
3519    schema: SchemaRef,
3520    target_variable: String,
3521    target_properties: Vec<String>,
3522    target_label_name: Option<String>,
3523    graph_ctx: Arc<GraphExecutionContext>,
3524) -> DFResult<RecordBatch> {
3525    if base_batch.num_rows() == 0 || target_properties.is_empty() {
3526        return Ok(base_batch);
3527    }
3528
3529    // Find the target VID column by exact name.
3530    // Schema layout: [input cols..., target._vid, target.prop1..., _hop_count, path?]
3531    //
3532    // IMPORTANT: When the target variable is already bound in the input (e.g., two MATCH
3533    // clauses referencing the same variable), there may be duplicate column names. We need
3534    // the LAST occurrence of target._vid, which is the one added by the VLP.
3535    let target_vid_col_name = format!("{}._vid", target_variable);
3536    let vid_col_idx = schema
3537        .fields()
3538        .iter()
3539        .enumerate()
3540        .rev()
3541        .find(|(_, f)| f.name() == &target_vid_col_name)
3542        .map(|(i, _)| i);
3543
3544    let Some(vid_col_idx) = vid_col_idx else {
3545        return Ok(base_batch);
3546    };
3547
3548    let vid_col = base_batch.column(vid_col_idx);
3549    let target_vid_cow = column_as_vid_array(vid_col.as_ref())?;
3550    let target_vid_array: &UInt64Array = &target_vid_cow;
3551
3552    let target_vids: Vec<Vid> = target_vid_array
3553        .iter()
3554        // Preserve null rows by mapping them to a sentinel VID that never resolves
3555        // to stored properties. The output property columns remain NULL for these rows.
3556        .map(|v| Vid::from(v.unwrap_or(u64::MAX)))
3557        .collect();
3558
3559    // Fetch properties from storage
3560    let mut property_columns: Vec<ArrayRef> = Vec::new();
3561
3562    if let Some(ref label_name) = target_label_name {
3563        let property_manager = graph_ctx.property_manager();
3564        let query_ctx = graph_ctx.query_context();
3565
3566        let props_map = property_manager
3567            .get_batch_vertex_props_for_label(&target_vids, label_name, Some(&query_ctx))
3568            .await
3569            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
3570
3571        let uni_schema = graph_ctx.storage().schema_manager().schema();
3572        let label_props = uni_schema.properties.get(label_name.as_str());
3573
3574        for prop_name in &target_properties {
3575            let data_type = resolve_property_type(prop_name, label_props);
3576            let column =
3577                build_property_column_static(&target_vids, &props_map, prop_name, &data_type)?;
3578            property_columns.push(column);
3579        }
3580    } else {
3581        // No label name — use label-agnostic property lookup.
3582        // This scans all label datasets, slower but correct for label-less traversals.
3583        let non_internal_props: Vec<&str> = target_properties
3584            .iter()
3585            .filter(|p| *p != "_all_props")
3586            .map(|s| s.as_str())
3587            .collect();
3588        let property_manager = graph_ctx.property_manager();
3589        let query_ctx = graph_ctx.query_context();
3590
3591        let props_map = if !non_internal_props.is_empty() {
3592            property_manager
3593                .get_batch_vertex_props(&target_vids, &non_internal_props, Some(&query_ctx))
3594                .await
3595                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
3596        } else {
3597            std::collections::HashMap::new()
3598        };
3599
3600        for prop_name in &target_properties {
3601            if prop_name == "_all_props" {
3602                // Build CypherValue blob from all vertex properties (L0 + storage)
3603                use crate::query::df_graph::scan::encode_cypher_value;
3604                use arrow_array::builder::LargeBinaryBuilder;
3605
3606                let mut builder = LargeBinaryBuilder::new();
3607                let l0_ctx = graph_ctx.l0_context();
3608                for vid in &target_vids {
3609                    let mut merged_props = serde_json::Map::new();
3610                    // Collect from storage-hydrated props
3611                    if let Some(vid_props) = props_map.get(vid) {
3612                        for (k, v) in vid_props.iter() {
3613                            let json_val: serde_json::Value = v.clone().into();
3614                            merged_props.insert(k.to_string(), json_val);
3615                        }
3616                    }
3617                    // Overlay L0 properties
3618                    for l0 in l0_ctx.iter_l0_buffers() {
3619                        let guard = l0.read();
3620                        if let Some(l0_props) = guard.vertex_properties.get(vid) {
3621                            for (k, v) in l0_props.iter() {
3622                                let json_val: serde_json::Value = v.clone().into();
3623                                merged_props.insert(k.to_string(), json_val);
3624                            }
3625                        }
3626                    }
3627                    if merged_props.is_empty() {
3628                        builder.append_null();
3629                    } else {
3630                        let json = serde_json::Value::Object(merged_props);
3631                        match encode_cypher_value(&json) {
3632                            Ok(bytes) => builder.append_value(bytes),
3633                            Err(_) => builder.append_null(),
3634                        }
3635                    }
3636                }
3637                property_columns.push(Arc::new(builder.finish()));
3638            } else {
3639                let column = build_property_column_static(
3640                    &target_vids,
3641                    &props_map,
3642                    prop_name,
3643                    &arrow::datatypes::DataType::LargeBinary,
3644                )?;
3645                property_columns.push(column);
3646            }
3647        }
3648    }
3649
3650    // Rebuild batch replacing the null placeholder property columns with hydrated ones.
3651    // Find each property column by name — works regardless of column ordering
3652    // (schema-aware puts props before _hop_count; schemaless puts them after).
3653    // Use col_idx > vid_col_idx to only replace this VLP's own property columns,
3654    // not pre-existing input columns with the same name (duplicate variable binding).
3655    let mut new_columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
3656    let mut prop_idx = 0;
3657    for (col_idx, field) in schema.fields().iter().enumerate() {
3658        let is_target_prop = col_idx > vid_col_idx
3659            && target_properties
3660                .iter()
3661                .any(|p| *field.name() == format!("{}.{}", target_variable, p));
3662        if is_target_prop && prop_idx < property_columns.len() {
3663            new_columns.push(property_columns[prop_idx].clone());
3664            prop_idx += 1;
3665        } else {
3666            new_columns.push(base_batch.column(col_idx).clone());
3667        }
3668    }
3669
3670    RecordBatch::try_new(schema, new_columns)
3671        .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
3672}
3673
3674// ============================================================================
3675// GraphVariableLengthTraverseMainExec - VLP for schemaless edge types
3676// ============================================================================
3677
/// Execution plan for variable-length path traversal on schemaless edge types.
///
/// This is similar to `GraphVariableLengthTraverseExec` but works with edge types
/// that don't have schema-defined IDs. It queries the main edges table by type name.
/// Supports OR relationship types like `[:KNOWS|HATES]` via multiple type_names.
pub struct GraphVariableLengthTraverseMainExec {
    /// Input execution plan.
    input: Arc<dyn ExecutionPlan>,

    /// Column name containing source VIDs.
    source_column: String,

    /// Edge type names (not IDs, since schemaless types may not have IDs).
    /// Multiple entries represent an OR pattern (`[:A|B]`).
    type_names: Vec<String>,

    /// Traversal direction.
    direction: Direction,

    /// Minimum number of hops.
    min_hops: usize,

    /// Maximum number of hops.
    max_hops: usize,

    /// Variable name for target vertex columns.
    target_variable: String,

    /// Variable name for relationship list (r in `[r*]`) - holds `List<Edge>`.
    step_variable: Option<String>,

    /// Variable name for named path (p in `p = ...`) - holds `Path`.
    path_variable: Option<String>,

    /// Target vertex properties to materialize.
    target_properties: Vec<String>,

    /// Whether this is an optional match (LEFT JOIN semantics).
    is_optional: bool,

    /// Column name of an already-bound target VID (for patterns where target is in scope).
    bound_target_column: Option<String>,

    /// Lance SQL filter for edge property predicates (VLP bitmap preselection).
    edge_lance_filter: Option<String>,

    /// Edge property conditions to check during BFS (e.g., `{year: 1988}`).
    /// Each entry is (property_name, expected_value). All must match for an edge to be traversed.
    edge_property_conditions: Vec<(String, UniValue)>,

    /// Edge ID columns from previous hops for cross-pattern relationship uniqueness.
    used_edge_columns: Vec<String>,

    /// Path semantics mode (Trail = no repeated edges, default for OpenCypher).
    path_mode: super::nfa::PathMode,

    /// Output mode determining BFS strategy.
    output_mode: super::nfa::VlpOutputMode,

    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Output schema (input columns plus the columns this operator appends).
    schema: SchemaRef,

    /// Cached plan properties, computed once from `schema` at construction.
    properties: PlanProperties,

    /// Execution metrics.
    metrics: ExecutionPlanMetricsSet,
}
3748
3749impl fmt::Debug for GraphVariableLengthTraverseMainExec {
3750    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3751        f.debug_struct("GraphVariableLengthTraverseMainExec")
3752            .field("source_column", &self.source_column)
3753            .field("type_names", &self.type_names)
3754            .field("direction", &self.direction)
3755            .field("min_hops", &self.min_hops)
3756            .field("max_hops", &self.max_hops)
3757            .field("target_variable", &self.target_variable)
3758            .finish()
3759    }
3760}
3761
3762impl GraphVariableLengthTraverseMainExec {
3763    /// Create a new variable-length traversal plan for schemaless edges.
3764    #[expect(clippy::too_many_arguments)]
3765    pub fn new(
3766        input: Arc<dyn ExecutionPlan>,
3767        source_column: impl Into<String>,
3768        type_names: Vec<String>,
3769        direction: Direction,
3770        min_hops: usize,
3771        max_hops: usize,
3772        target_variable: impl Into<String>,
3773        step_variable: Option<String>,
3774        path_variable: Option<String>,
3775        target_properties: Vec<String>,
3776        graph_ctx: Arc<GraphExecutionContext>,
3777        is_optional: bool,
3778        bound_target_column: Option<String>,
3779        edge_lance_filter: Option<String>,
3780        edge_property_conditions: Vec<(String, UniValue)>,
3781        used_edge_columns: Vec<String>,
3782        path_mode: super::nfa::PathMode,
3783        output_mode: super::nfa::VlpOutputMode,
3784    ) -> Self {
3785        let source_column = source_column.into();
3786        let target_variable = target_variable.into();
3787
3788        // Build output schema
3789        let schema = Self::build_schema(
3790            input.schema(),
3791            &target_variable,
3792            step_variable.as_deref(),
3793            path_variable.as_deref(),
3794            &target_properties,
3795        );
3796        let properties = compute_plan_properties(schema.clone());
3797
3798        Self {
3799            input,
3800            source_column,
3801            type_names,
3802            direction,
3803            min_hops,
3804            max_hops,
3805            target_variable,
3806            step_variable,
3807            path_variable,
3808            target_properties,
3809            is_optional,
3810            bound_target_column,
3811            edge_lance_filter,
3812            edge_property_conditions,
3813            used_edge_columns,
3814            path_mode,
3815            output_mode,
3816            graph_ctx,
3817            schema,
3818            properties,
3819            metrics: ExecutionPlanMetricsSet::new(),
3820        }
3821    }
3822
3823    /// Build output schema.
3824    fn build_schema(
3825        input_schema: SchemaRef,
3826        target_variable: &str,
3827        step_variable: Option<&str>,
3828        path_variable: Option<&str>,
3829        target_properties: &[String],
3830    ) -> SchemaRef {
3831        let mut fields: Vec<Field> = input_schema
3832            .fields()
3833            .iter()
3834            .map(|f| f.as_ref().clone())
3835            .collect();
3836
3837        // Add target VID column (only if not already in input)
3838        let target_vid_name = format!("{}._vid", target_variable);
3839        if input_schema.column_with_name(&target_vid_name).is_none() {
3840            fields.push(Field::new(target_vid_name, DataType::UInt64, true));
3841        }
3842
3843        // Add target ._labels column (only if not already in input)
3844        let target_labels_name = format!("{}._labels", target_variable);
3845        if input_schema.column_with_name(&target_labels_name).is_none() {
3846            fields.push(Field::new(target_labels_name, labels_data_type(), true));
3847        }
3848
3849        // Add hop count
3850        fields.push(Field::new("_hop_count", DataType::UInt64, false));
3851
3852        // Add step variable column (list of edge structs) if bound
3853        // This is the relationship variable like `r` in `[r*1..3]`
3854        if let Some(step_var) = step_variable {
3855            fields.push(build_edge_list_field(step_var));
3856        }
3857
3858        // Add path struct if bound (only if not already in input from prior BindFixedPath)
3859        if let Some(path_var) = path_variable
3860            && input_schema.column_with_name(path_var).is_none()
3861        {
3862            fields.push(build_path_struct_field(path_var));
3863        }
3864
3865        // Add target property columns (as LargeBinary for lazy hydration via PropertyManager)
3866        // Skip properties that are already in the input schema
3867        for prop in target_properties {
3868            let prop_name = format!("{}.{}", target_variable, prop);
3869            if input_schema.column_with_name(&prop_name).is_none() {
3870                fields.push(Field::new(prop_name, DataType::LargeBinary, true));
3871            }
3872        }
3873
3874        Arc::new(Schema::new(fields))
3875    }
3876}
3877
3878impl DisplayAs for GraphVariableLengthTraverseMainExec {
3879    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3880        write!(
3881            f,
3882            "GraphVariableLengthTraverseMainExec: {} --[{:?}*{}..{}]--> target",
3883            self.source_column, self.type_names, self.min_hops, self.max_hops
3884        )
3885    }
3886}
3887
impl ExecutionPlan for GraphVariableLengthTraverseMainExec {
    fn name(&self) -> &str {
        "GraphVariableLengthTraverseMainExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        // Single-child operator: the upstream plan producing source VIDs.
        vec![&self.input]
    }

    // Rebuilds this node with a replaced child. NOTE: the positional argument
    // list passed to `Self::new` must stay in sync with its signature — a
    // reordering there silently corrupts this reconstruction.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if children.len() != 1 {
            return Err(datafusion::error::DataFusionError::Plan(
                "GraphVariableLengthTraverseMainExec requires exactly one child".to_string(),
            ));
        }

        Ok(Arc::new(Self::new(
            children[0].clone(),
            self.source_column.clone(),
            self.type_names.clone(),
            self.direction,
            self.min_hops,
            self.max_hops,
            self.target_variable.clone(),
            self.step_variable.clone(),
            self.path_variable.clone(),
            self.target_properties.clone(),
            self.graph_ctx.clone(),
            self.is_optional,
            self.bound_target_column.clone(),
            self.edge_lance_filter.clone(),
            self.edge_property_conditions.clone(),
            self.used_edge_columns.clone(),
            self.path_mode.clone(),
            self.output_mode.clone(),
        )))
    }

    // Starts execution for one partition. No async work happens here: the
    // adjacency-map load is packaged into a future that the returned stream
    // polls first (see `VarLengthMainStreamState::Loading`).
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let input_stream = self.input.execute(partition, context)?;
        let metrics = BaselineMetrics::new(&self.metrics, partition);

        // Build adjacency map from main edges table (async)
        let graph_ctx = self.graph_ctx.clone();
        let type_names = self.type_names.clone();
        let direction = self.direction;
        let load_fut =
            async move { build_edge_adjacency_map(&graph_ctx, &type_names, direction).await };

        Ok(Box::pin(GraphVariableLengthTraverseMainStream {
            input: input_stream,
            source_column: self.source_column.clone(),
            type_names: self.type_names.clone(),
            direction: self.direction,
            min_hops: self.min_hops,
            max_hops: self.max_hops,
            target_variable: self.target_variable.clone(),
            step_variable: self.step_variable.clone(),
            path_variable: self.path_variable.clone(),
            target_properties: self.target_properties.clone(),
            graph_ctx: self.graph_ctx.clone(),
            is_optional: self.is_optional,
            bound_target_column: self.bound_target_column.clone(),
            edge_lance_filter: self.edge_lance_filter.clone(),
            edge_property_conditions: self.edge_property_conditions.clone(),
            used_edge_columns: self.used_edge_columns.clone(),
            path_mode: self.path_mode.clone(),
            output_mode: self.output_mode.clone(),
            schema: self.schema.clone(),
            state: VarLengthMainStreamState::Loading(Box::pin(load_fut)),
            metrics,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
3985
/// State machine for VLP schemaless stream.
///
/// Transitions: `Loading` → `Processing` → (`Materializing` → `Processing`)*
/// → `Done`. Errors from any state move directly to `Done`.
enum VarLengthMainStreamState {
    /// Loading adjacency map from main edges table.
    Loading(Pin<Box<dyn std::future::Future<Output = DFResult<EdgeAdjacencyMap>> + Send>>),
    /// Processing input batches with loaded adjacency.
    Processing(EdgeAdjacencyMap),
    /// Materializing properties for a batch.
    ///
    /// The adjacency map is carried along so batch processing can resume
    /// without reloading once the hydration future resolves.
    Materializing {
        adjacency: EdgeAdjacencyMap,
        fut: Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>,
    },
    /// Stream is done.
    Done,
}
4000
/// Stream for variable-length traversal on schemaless edges.
///
/// Emits one output row per BFS expansion of each input row, driven by the
/// [`VarLengthMainStreamState`] machine in its `Stream` impl.
#[allow(dead_code)] // VLP fields used in Phase 3
struct GraphVariableLengthTraverseMainStream {
    /// Upstream stream producing batches that contain the source VID column.
    input: SendableRecordBatchStream,
    /// Name of the input column holding source VIDs.
    source_column: String,
    /// Edge type names to traverse (joined with `|` when written into edge structs).
    type_names: Vec<String>,
    /// Traversal direction; `Direction::Both` triggers per-hop edge dedup in BFS.
    direction: Direction,
    /// Minimum hop count; 0 permits zero-length matches (source == target).
    min_hops: usize,
    /// Maximum hop count (inclusive); BFS stops expanding at this depth.
    max_hops: usize,
    /// Variable name bound to the target node; prefixes output columns
    /// like `{target}._vid` and `{target}._labels`.
    target_variable: String,
    /// Relationship variable like `r` in `[r*1..3]` - gets a List of edge structs.
    step_variable: Option<String>,
    /// Path variable bound to a struct of `nodes` + `relationships`, if any.
    path_variable: Option<String>,
    /// Target node properties emitted as LargeBinary and hydrated asynchronously.
    target_properties: Vec<String>,
    /// Shared graph execution context (query context, label/property lookup).
    graph_ctx: Arc<GraphExecutionContext>,
    /// OPTIONAL MATCH semantics: keep unmatched rows with NULL bindings.
    is_optional: bool,
    /// Column holding an already-bound target VID to filter BFS results against.
    bound_target_column: Option<String>,
    /// Edge filter expression; not read by this stream's visible code
    /// (presumably consumed elsewhere — see the `dead_code` allow).
    edge_lance_filter: Option<String>,
    /// Edge property conditions to check during BFS.
    edge_property_conditions: Vec<(String, UniValue)>,
    /// Columns holding edge IDs bound by earlier pattern elements in the same
    /// MATCH (cross-pattern relationship uniqueness).
    used_edge_columns: Vec<String>,
    /// Path mode (see [`super::nfa::PathMode`]); not read in this chunk.
    path_mode: super::nfa::PathMode,
    /// Output shaping mode (see [`super::nfa::VlpOutputMode`]); not read in this chunk.
    output_mode: super::nfa::VlpOutputMode,
    /// Output schema; column construction in `process_batch` must match it.
    schema: SchemaRef,
    /// Current state of the load/process/materialize machine.
    state: VarLengthMainStreamState,
    /// Baseline metrics (records output row counts).
    metrics: BaselineMetrics,
}
4028
/// BFS result type: (target_vid, hop_count, node_path, edge_path).
///
/// Invariant (established in `bfs`): `node_path` has `hop_count + 1` entries
/// (the source is included), and `edge_path` has exactly `hop_count` entries.
type MainBfsResult = (Vid, usize, Vec<Vid>, Vec<Eid>);
4031
4032impl GraphVariableLengthTraverseMainStream {
4033    /// Perform BFS from a source vertex using the adjacency map.
4034    ///
4035    /// `used_eids` contains edge IDs already bound by earlier pattern elements
4036    /// in the same MATCH clause, enforcing cross-pattern relationship uniqueness
4037    /// (Cypher semantics require all relationships in a MATCH to be distinct).
4038    fn bfs(
4039        &self,
4040        source: Vid,
4041        adjacency: &EdgeAdjacencyMap,
4042        used_eids: &FxHashSet<u64>,
4043    ) -> Vec<MainBfsResult> {
4044        let mut results = Vec::new();
4045        let mut queue: VecDeque<MainBfsResult> = VecDeque::new();
4046
4047        queue.push_back((source, 0, vec![source], vec![]));
4048
4049        while let Some((current, depth, node_path, edge_path)) = queue.pop_front() {
4050            // Emit result if within hop range (including zero-length patterns)
4051            if depth >= self.min_hops && depth <= self.max_hops {
4052                results.push((current, depth, node_path.clone(), edge_path.clone()));
4053            }
4054
4055            // Stop if at max depth
4056            if depth >= self.max_hops {
4057                continue;
4058            }
4059
4060            // Get neighbors from adjacency map
4061            if let Some(neighbors) = adjacency.get(&current) {
4062                let is_undirected = matches!(self.direction, Direction::Both);
4063                let mut seen_edges_at_hop: HashSet<u64> = HashSet::new();
4064
4065                for (neighbor, eid, _edge_type, props) in neighbors {
4066                    // Deduplicate edges for undirected patterns
4067                    if is_undirected && !seen_edges_at_hop.insert(eid.as_u64()) {
4068                        continue;
4069                    }
4070
4071                    // Enforce relationship uniqueness per-path (Cypher semantics).
4072                    if edge_path.contains(eid) {
4073                        continue;
4074                    }
4075
4076                    // Enforce cross-pattern relationship uniqueness: skip edges
4077                    // already bound by earlier pattern elements in the same MATCH.
4078                    if used_eids.contains(&eid.as_u64()) {
4079                        continue;
4080                    }
4081
4082                    // Check edge property conditions (e.g., {year: 1988}).
4083                    if !self.edge_property_conditions.is_empty() {
4084                        let passes =
4085                            self.edge_property_conditions
4086                                .iter()
4087                                .all(|(name, expected)| {
4088                                    props.get(name).is_some_and(|actual| actual == expected)
4089                                });
4090                        if !passes {
4091                            continue;
4092                        }
4093                    }
4094
4095                    let mut new_node_path = node_path.clone();
4096                    new_node_path.push(*neighbor);
4097                    let mut new_edge_path = edge_path.clone();
4098                    new_edge_path.push(*eid);
4099                    queue.push_back((*neighbor, depth + 1, new_node_path, new_edge_path));
4100                }
4101            }
4102        }
4103
4104        results
4105    }
4106
4107    /// Process a batch using the adjacency map.
4108    fn process_batch(
4109        &self,
4110        batch: RecordBatch,
4111        adjacency: &EdgeAdjacencyMap,
4112    ) -> DFResult<RecordBatch> {
4113        let source_col = batch.column_by_name(&self.source_column).ok_or_else(|| {
4114            datafusion::error::DataFusionError::Execution(format!(
4115                "Source column '{}' not found in input batch",
4116                self.source_column
4117            ))
4118        })?;
4119
4120        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
4121        let source_vids: &UInt64Array = &source_vid_cow;
4122
4123        // Read bound target VIDs if column exists
4124        let bound_target_cow = self
4125            .bound_target_column
4126            .as_ref()
4127            .and_then(|col| batch.column_by_name(col))
4128            .map(|c| column_as_vid_array(c.as_ref()))
4129            .transpose()?;
4130        let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
4131
4132        // Extract used edge columns for cross-pattern relationship uniqueness
4133        let used_edge_arrays: Vec<&UInt64Array> = self
4134            .used_edge_columns
4135            .iter()
4136            .filter_map(|col| {
4137                batch
4138                    .column_by_name(col)?
4139                    .as_any()
4140                    .downcast_ref::<UInt64Array>()
4141            })
4142            .collect();
4143
4144        // Collect BFS results: (original_row_idx, target_vid, hop_count, node_path, edge_path)
4145        let mut expansions: Vec<ExpansionRecord> = Vec::new();
4146
4147        for (row_idx, source_opt) in source_vids.iter().enumerate() {
4148            let mut emitted_for_row = false;
4149
4150            if let Some(source_u64) = source_opt {
4151                let source = Vid::from(source_u64);
4152
4153                // Collect used edge IDs from previous hops for this row
4154                let used_eids: FxHashSet<u64> = used_edge_arrays
4155                    .iter()
4156                    .filter_map(|arr| {
4157                        if arr.is_null(row_idx) {
4158                            None
4159                        } else {
4160                            Some(arr.value(row_idx))
4161                        }
4162                    })
4163                    .collect();
4164
4165                let bfs_results = self.bfs(source, adjacency, &used_eids);
4166
4167                for (target, hops, node_path, edge_path) in bfs_results {
4168                    // Filter by bound target VID if set (for patterns where target is in scope).
4169                    // NULL bound targets do not match anything.
4170                    if let Some(targets) = expected_targets {
4171                        if targets.is_null(row_idx) {
4172                            continue;
4173                        }
4174                        let expected_vid = targets.value(row_idx);
4175                        if target.as_u64() != expected_vid {
4176                            continue;
4177                        }
4178                    }
4179
4180                    expansions.push((row_idx, target, hops, node_path, edge_path));
4181                    emitted_for_row = true;
4182                }
4183            }
4184
4185            if self.is_optional && !emitted_for_row {
4186                // Preserve source row with NULL optional bindings.
4187                expansions.push((row_idx, Vid::from(u64::MAX), 0, vec![], vec![]));
4188            }
4189        }
4190
4191        if expansions.is_empty() {
4192            if self.is_optional {
4193                let all_indices: Vec<usize> = (0..batch.num_rows()).collect();
4194                return build_optional_null_batch_for_rows(&batch, &all_indices, &self.schema);
4195            }
4196            return Ok(RecordBatch::new_empty(self.schema.clone()));
4197        }
4198
4199        let num_rows = expansions.len();
4200        self.metrics.record_output(num_rows);
4201
4202        // Build output columns
4203        let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.schema.fields().len());
4204
4205        // Expand input columns
4206        for col_idx in 0..batch.num_columns() {
4207            let array = batch.column(col_idx);
4208            let indices: Vec<u64> = expansions
4209                .iter()
4210                .map(|(idx, _, _, _, _)| *idx as u64)
4211                .collect();
4212            let take_indices = UInt64Array::from(indices);
4213            let expanded = arrow::compute::take(array, &take_indices, None)?;
4214            columns.push(expanded);
4215        }
4216
4217        // Add target VID column (only if not already in input)
4218        let target_vid_name = format!("{}._vid", self.target_variable);
4219        if batch.schema().column_with_name(&target_vid_name).is_none() {
4220            let target_vids: Vec<Option<u64>> = expansions
4221                .iter()
4222                .map(|(_, vid, _, node_path, edge_path)| {
4223                    if node_path.is_empty() && edge_path.is_empty() {
4224                        None
4225                    } else {
4226                        Some(vid.as_u64())
4227                    }
4228                })
4229                .collect();
4230            columns.push(Arc::new(UInt64Array::from(target_vids)));
4231        }
4232
4233        // Add target ._labels column (only if not already in input)
4234        let target_labels_name = format!("{}._labels", self.target_variable);
4235        if batch
4236            .schema()
4237            .column_with_name(&target_labels_name)
4238            .is_none()
4239        {
4240            use arrow_array::builder::{ListBuilder, StringBuilder};
4241            let mut labels_builder = ListBuilder::new(StringBuilder::new());
4242            for (_, vid, _, node_path, edge_path) in expansions.iter() {
4243                if node_path.is_empty() && edge_path.is_empty() {
4244                    labels_builder.append(false);
4245                    continue;
4246                }
4247                let mut row_labels: Vec<String> = Vec::new();
4248                let labels =
4249                    l0_visibility::get_vertex_labels(*vid, &self.graph_ctx.query_context());
4250                for lbl in &labels {
4251                    if !row_labels.contains(lbl) {
4252                        row_labels.push(lbl.clone());
4253                    }
4254                }
4255                let values = labels_builder.values();
4256                for lbl in &row_labels {
4257                    values.append_value(lbl);
4258                }
4259                labels_builder.append(true);
4260            }
4261            columns.push(Arc::new(labels_builder.finish()));
4262        }
4263
4264        // Add hop count column
4265        let hop_counts: Vec<u64> = expansions
4266            .iter()
4267            .map(|(_, _, hops, _, _)| *hops as u64)
4268            .collect();
4269        columns.push(Arc::new(UInt64Array::from(hop_counts)));
4270
4271        // Add step variable column if bound (list of edge structs).
4272        if self.step_variable.is_some() {
4273            let mut edges_builder = new_edge_list_builder();
4274            let query_ctx = self.graph_ctx.query_context();
4275            let type_names_str = self.type_names.join("|");
4276
4277            for (_, _, _, node_path, edge_path) in expansions.iter() {
4278                if node_path.is_empty() && edge_path.is_empty() {
4279                    edges_builder.append_null();
4280                } else if edge_path.is_empty() {
4281                    // Zero-hop match: empty list.
4282                    edges_builder.append(true);
4283                } else {
4284                    for (i, eid) in edge_path.iter().enumerate() {
4285                        append_edge_to_struct(
4286                            edges_builder.values(),
4287                            *eid,
4288                            &type_names_str,
4289                            node_path[i].as_u64(),
4290                            node_path[i + 1].as_u64(),
4291                            &query_ctx,
4292                        );
4293                    }
4294                    edges_builder.append(true);
4295                }
4296            }
4297
4298            columns.push(Arc::new(edges_builder.finish()) as ArrayRef);
4299        }
4300
4301        // Add path variable column if bound.
4302        // If a path column already exists in input (from a prior BindFixedPath), extend it
4303        // rather than building from scratch.
4304        if let Some(path_var_name) = &self.path_variable {
4305            let existing_path_col_idx = batch
4306                .schema()
4307                .column_with_name(path_var_name)
4308                .map(|(idx, _)| idx);
4309            let existing_path_arc = existing_path_col_idx.map(|idx| columns[idx].clone());
4310            let existing_path = existing_path_arc
4311                .as_ref()
4312                .and_then(|arc| arc.as_any().downcast_ref::<arrow_array::StructArray>());
4313
4314            let mut nodes_builder = new_node_list_builder();
4315            let mut rels_builder = new_edge_list_builder();
4316            let query_ctx = self.graph_ctx.query_context();
4317            let type_names_str = self.type_names.join("|");
4318            let mut path_validity = Vec::with_capacity(expansions.len());
4319
4320            for (row_out_idx, (_, _, _, node_path, edge_path)) in expansions.iter().enumerate() {
4321                if node_path.is_empty() && edge_path.is_empty() {
4322                    nodes_builder.append(false);
4323                    rels_builder.append(false);
4324                    path_validity.push(false);
4325                    continue;
4326                }
4327
4328                // Prepend existing path prefix if extending
4329                let skip_first_vlp_node = if let Some(existing) = existing_path {
4330                    if !existing.is_null(row_out_idx) {
4331                        prepend_existing_path(
4332                            existing,
4333                            row_out_idx,
4334                            &mut nodes_builder,
4335                            &mut rels_builder,
4336                            &query_ctx,
4337                        );
4338                        true
4339                    } else {
4340                        false
4341                    }
4342                } else {
4343                    false
4344                };
4345
4346                // Append VLP nodes (skip first if extending — it's the junction point)
4347                let start_idx = if skip_first_vlp_node { 1 } else { 0 };
4348                for vid in &node_path[start_idx..] {
4349                    append_node_to_struct(nodes_builder.values(), *vid, &query_ctx);
4350                }
4351                nodes_builder.append(true);
4352
4353                for (i, eid) in edge_path.iter().enumerate() {
4354                    append_edge_to_struct(
4355                        rels_builder.values(),
4356                        *eid,
4357                        &type_names_str,
4358                        node_path[i].as_u64(),
4359                        node_path[i + 1].as_u64(),
4360                        &query_ctx,
4361                    );
4362                }
4363                rels_builder.append(true);
4364                path_validity.push(true);
4365            }
4366
4367            // Finish the builders to get the arrays
4368            let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
4369            let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;
4370
4371            // Build the path struct with nodes and relationships fields
4372            let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
4373            let rels_field = Arc::new(Field::new(
4374                "relationships",
4375                rels_array.data_type().clone(),
4376                true,
4377            ));
4378
4379            // Create the path struct array
4380            let path_struct = arrow_array::StructArray::try_new(
4381                vec![nodes_field, rels_field].into(),
4382                vec![nodes_array, rels_array],
4383                Some(arrow::buffer::NullBuffer::from(path_validity)),
4384            )
4385            .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))?;
4386
4387            if let Some(idx) = existing_path_col_idx {
4388                columns[idx] = Arc::new(path_struct);
4389            } else {
4390                columns.push(Arc::new(path_struct));
4391            }
4392        }
4393
4394        // Add target property columns as NULL for now (skip if already in input).
4395        // Property hydration happens via PropertyManager in the query execution pipeline.
4396        for prop_name in &self.target_properties {
4397            let full_prop_name = format!("{}.{}", self.target_variable, prop_name);
4398            if batch.schema().column_with_name(&full_prop_name).is_none() {
4399                columns.push(arrow_array::new_null_array(
4400                    &DataType::LargeBinary,
4401                    num_rows,
4402                ));
4403            }
4404        }
4405
4406        RecordBatch::try_new(self.schema.clone(), columns)
4407            .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
4408    }
4409}
4410
impl Stream for GraphVariableLengthTraverseMainStream {
    type Item = DFResult<RecordBatch>;

    // Drives the state machine: Loading -> Processing -> (Materializing ->
    // Processing)* -> Done. The loop spins only on internal transitions; it
    // returns on Pending, an emitted batch, an error, or end of stream.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Take ownership of the current state, leaving `Done` as a
            // placeholder. Every arm must store the correct next state before
            // returning, otherwise the stream terminates.
            let state = std::mem::replace(&mut self.state, VarLengthMainStreamState::Done);

            match state {
                VarLengthMainStreamState::Loading(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(adjacency)) => {
                        self.state = VarLengthMainStreamState::Processing(adjacency);
                        // Continue loop to start processing
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = VarLengthMainStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        // Put the future back so the next poll resumes it.
                        self.state = VarLengthMainStreamState::Loading(fut);
                        return Poll::Pending;
                    }
                },
                VarLengthMainStreamState::Processing(adjacency) => {
                    match self.input.poll_next_unpin(cx) {
                        Poll::Ready(Some(Ok(batch))) => {
                            let base_batch = match self.process_batch(batch, &adjacency) {
                                Ok(b) => b,
                                Err(e) => {
                                    // Batch-level error: surface it but keep
                                    // the adjacency so later batches can run.
                                    self.state = VarLengthMainStreamState::Processing(adjacency);
                                    return Poll::Ready(Some(Err(e)));
                                }
                            };

                            // If no properties need async hydration, return directly
                            if self.target_properties.is_empty() {
                                self.state = VarLengthMainStreamState::Processing(adjacency);
                                return Poll::Ready(Some(Ok(base_batch)));
                            }

                            // Create async hydration future
                            let schema = self.schema.clone();
                            let target_variable = self.target_variable.clone();
                            let target_properties = self.target_properties.clone();
                            let graph_ctx = self.graph_ctx.clone();

                            let fut = hydrate_vlp_target_properties(
                                base_batch,
                                schema,
                                target_variable,
                                target_properties,
                                None, // schemaless — no label name
                                graph_ctx,
                            );

                            self.state = VarLengthMainStreamState::Materializing {
                                adjacency,
                                fut: Box::pin(fut),
                            };
                            // Continue loop to poll the future
                        }
                        Poll::Ready(Some(Err(e))) => {
                            self.state = VarLengthMainStreamState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(None) => {
                            // Input exhausted: terminate.
                            self.state = VarLengthMainStreamState::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Pending => {
                            self.state = VarLengthMainStreamState::Processing(adjacency);
                            return Poll::Pending;
                        }
                    }
                }
                VarLengthMainStreamState::Materializing { adjacency, mut fut } => {
                    match fut.as_mut().poll(cx) {
                        Poll::Ready(Ok(batch)) => {
                            // Hydration finished: emit and go back to processing.
                            self.state = VarLengthMainStreamState::Processing(adjacency);
                            return Poll::Ready(Some(Ok(batch)));
                        }
                        Poll::Ready(Err(e)) => {
                            self.state = VarLengthMainStreamState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Pending => {
                            self.state = VarLengthMainStreamState::Materializing { adjacency, fut };
                            return Poll::Pending;
                        }
                    }
                }
                VarLengthMainStreamState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
4508
4509impl RecordBatchStream for GraphVariableLengthTraverseMainStream {
4510    fn schema(&self) -> SchemaRef {
4511        self.schema.clone()
4512    }
4513}
4514
4515#[cfg(test)]
4516mod tests {
4517    use super::*;
4518
4519    #[test]
4520    fn test_traverse_schema_without_edge() {
4521        let input_schema = Arc::new(Schema::new(vec![Field::new(
4522            "a._vid",
4523            DataType::UInt64,
4524            false,
4525        )]));
4526
4527        let output_schema =
4528            GraphTraverseExec::build_schema(input_schema, "m", None, &[], &[], None, None, false);
4529
4530        // Schema: input + target VID + target _labels + internal edge ID
4531        assert_eq!(output_schema.fields().len(), 4);
4532        assert_eq!(output_schema.field(0).name(), "a._vid");
4533        assert_eq!(output_schema.field(1).name(), "m._vid");
4534        assert_eq!(output_schema.field(2).name(), "m._labels");
4535        assert_eq!(output_schema.field(3).name(), "__eid_to_m");
4536    }
4537
4538    #[test]
4539    fn test_traverse_schema_with_edge() {
4540        let input_schema = Arc::new(Schema::new(vec![Field::new(
4541            "a._vid",
4542            DataType::UInt64,
4543            false,
4544        )]));
4545
4546        let output_schema = GraphTraverseExec::build_schema(
4547            input_schema,
4548            "m",
4549            Some("r"),
4550            &[],
4551            &[],
4552            None,
4553            None,
4554            false,
4555        );
4556
4557        // Schema: input + target VID + target _labels + edge EID + edge _type
4558        assert_eq!(output_schema.fields().len(), 5);
4559        assert_eq!(output_schema.field(0).name(), "a._vid");
4560        assert_eq!(output_schema.field(1).name(), "m._vid");
4561        assert_eq!(output_schema.field(2).name(), "m._labels");
4562        assert_eq!(output_schema.field(3).name(), "r._eid");
4563        assert_eq!(output_schema.field(4).name(), "r._type");
4564    }
4565
4566    #[test]
4567    fn test_traverse_schema_with_target_properties() {
4568        let input_schema = Arc::new(Schema::new(vec![Field::new(
4569            "a._vid",
4570            DataType::UInt64,
4571            false,
4572        )]));
4573
4574        let target_props = vec!["name".to_string(), "age".to_string()];
4575        let output_schema = GraphTraverseExec::build_schema(
4576            input_schema,
4577            "m",
4578            Some("r"),
4579            &[],
4580            &target_props,
4581            None,
4582            None,
4583            false,
4584        );
4585
4586        // a._vid, m._vid, m._labels, m.name, m.age, r._eid, r._type
4587        assert_eq!(output_schema.fields().len(), 7);
4588        assert_eq!(output_schema.field(0).name(), "a._vid");
4589        assert_eq!(output_schema.field(1).name(), "m._vid");
4590        assert_eq!(output_schema.field(2).name(), "m._labels");
4591        assert_eq!(output_schema.field(3).name(), "m.name");
4592        assert_eq!(output_schema.field(4).name(), "m.age");
4593        assert_eq!(output_schema.field(5).name(), "r._eid");
4594        assert_eq!(output_schema.field(6).name(), "r._type");
4595    }
4596
4597    #[test]
4598    fn test_variable_length_schema() {
4599        let input_schema = Arc::new(Schema::new(vec![Field::new(
4600            "a._vid",
4601            DataType::UInt64,
4602            false,
4603        )]));
4604
4605        let output_schema = GraphVariableLengthTraverseExec::build_schema(
4606            input_schema,
4607            "b",
4608            None,
4609            Some("p"),
4610            &[],
4611            None,
4612        );
4613
4614        assert_eq!(output_schema.fields().len(), 5);
4615        assert_eq!(output_schema.field(0).name(), "a._vid");
4616        assert_eq!(output_schema.field(1).name(), "b._vid");
4617        assert_eq!(output_schema.field(2).name(), "b._labels");
4618        assert_eq!(output_schema.field(3).name(), "_hop_count");
4619        assert_eq!(output_schema.field(4).name(), "p");
4620    }
4621
4622    #[test]
4623    fn test_traverse_main_schema_without_edge() {
4624        let input_schema = Arc::new(Schema::new(vec![Field::new(
4625            "a._vid",
4626            DataType::UInt64,
4627            false,
4628        )]));
4629
4630        let output_schema =
4631            GraphTraverseMainExec::build_schema(&input_schema, "m", &None, &[], &[], false);
4632
4633        // a._vid, m._vid, m._labels, __eid_to_m
4634        assert_eq!(output_schema.fields().len(), 4);
4635        assert_eq!(output_schema.field(0).name(), "a._vid");
4636        assert_eq!(output_schema.field(1).name(), "m._vid");
4637        assert_eq!(output_schema.field(2).name(), "m._labels");
4638        assert_eq!(output_schema.field(3).name(), "__eid_to_m");
4639    }
4640
4641    #[test]
4642    fn test_traverse_main_schema_with_edge() {
4643        let input_schema = Arc::new(Schema::new(vec![Field::new(
4644            "a._vid",
4645            DataType::UInt64,
4646            false,
4647        )]));
4648
4649        let output_schema = GraphTraverseMainExec::build_schema(
4650            &input_schema,
4651            "m",
4652            &Some("r".to_string()),
4653            &[],
4654            &[],
4655            false,
4656        );
4657
4658        // a._vid, m._vid, m._labels, r._eid, r._type
4659        assert_eq!(output_schema.fields().len(), 5);
4660        assert_eq!(output_schema.field(0).name(), "a._vid");
4661        assert_eq!(output_schema.field(1).name(), "m._vid");
4662        assert_eq!(output_schema.field(2).name(), "m._labels");
4663        assert_eq!(output_schema.field(3).name(), "r._eid");
4664        assert_eq!(output_schema.field(4).name(), "r._type");
4665    }
4666
4667    #[test]
4668    fn test_traverse_main_schema_with_edge_properties() {
4669        let input_schema = Arc::new(Schema::new(vec![Field::new(
4670            "a._vid",
4671            DataType::UInt64,
4672            false,
4673        )]));
4674
4675        let edge_props = vec!["weight".to_string(), "since".to_string()];
4676        let output_schema = GraphTraverseMainExec::build_schema(
4677            &input_schema,
4678            "m",
4679            &Some("r".to_string()),
4680            &edge_props,
4681            &[],
4682            false,
4683        );
4684
4685        // a._vid, m._vid, m._labels, r._eid, r._type, r.weight, r.since
4686        assert_eq!(output_schema.fields().len(), 7);
4687        assert_eq!(output_schema.field(0).name(), "a._vid");
4688        assert_eq!(output_schema.field(1).name(), "m._vid");
4689        assert_eq!(output_schema.field(2).name(), "m._labels");
4690        assert_eq!(output_schema.field(3).name(), "r._eid");
4691        assert_eq!(output_schema.field(4).name(), "r._type");
4692        assert_eq!(output_schema.field(5).name(), "r.weight");
4693        assert_eq!(output_schema.field(5).data_type(), &DataType::LargeBinary);
4694        assert_eq!(output_schema.field(6).name(), "r.since");
4695        assert_eq!(output_schema.field(6).data_type(), &DataType::LargeBinary);
4696    }
4697
4698    #[test]
4699    fn test_traverse_main_schema_with_target_properties() {
4700        let input_schema = Arc::new(Schema::new(vec![Field::new(
4701            "a._vid",
4702            DataType::UInt64,
4703            false,
4704        )]));
4705
4706        let target_props = vec!["name".to_string(), "age".to_string()];
4707        let output_schema = GraphTraverseMainExec::build_schema(
4708            &input_schema,
4709            "m",
4710            &Some("r".to_string()),
4711            &[],
4712            &target_props,
4713            false,
4714        );
4715
4716        // a._vid, m._vid, m._labels, r._eid, r._type, m.name, m.age
4717        assert_eq!(output_schema.fields().len(), 7);
4718        assert_eq!(output_schema.field(0).name(), "a._vid");
4719        assert_eq!(output_schema.field(1).name(), "m._vid");
4720        assert_eq!(output_schema.field(2).name(), "m._labels");
4721        assert_eq!(output_schema.field(3).name(), "r._eid");
4722        assert_eq!(output_schema.field(4).name(), "r._type");
4723        assert_eq!(output_schema.field(5).name(), "m.name");
4724        assert_eq!(output_schema.field(5).data_type(), &DataType::LargeBinary);
4725        assert_eq!(output_schema.field(6).name(), "m.age");
4726        assert_eq!(output_schema.field(6).data_type(), &DataType::LargeBinary);
4727    }
4728}