Skip to main content

uni_query/query/df_graph/
traverse.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Graph traversal execution plans for DataFusion.
5//!
6//! This module provides graph traversal operators as DataFusion [`ExecutionPlan`]s:
7//!
8//! - [`GraphTraverseExec`]: Single-hop edge traversal
9//! - [`GraphVariableLengthTraverseExec`]: Multi-hop BFS traversal (min..max hops)
10//!
11//! # Traversal Algorithm
12//!
13//! Traversal uses the CSR adjacency cache for O(1) neighbor lookups:
14//!
15//! ```text
16//! Input Stream (source VIDs)
17//!        │
18//!        ▼
19//! ┌──────────────────┐
20//! │ For each batch:  │
21//! │  1. Extract VIDs │
22//! │  2. get_neighbors│
23//! │  3. Expand rows  │
24//! └──────────────────┘
25//!        │
26//!        ▼
27//! Output Stream (source, edge, target)
28//! ```
29//!
30//! L0 buffers are automatically overlaid for MVCC visibility.
31
32use crate::query::df_graph::GraphExecutionContext;
33use crate::query::df_graph::bitmap::{EidFilter, VidFilter};
34use crate::query::df_graph::common::{
35    append_edge_to_struct, append_node_to_struct, arrow_err, build_edge_list_field,
36    build_path_struct_field, column_as_vid_array, compute_plan_properties, labels_data_type,
37    new_edge_list_builder, new_node_list_builder,
38};
39use crate::query::df_graph::nfa::{NfaStateId, PathNfa, PathSelector, VlpOutputMode};
40use crate::query::df_graph::pred_dag::PredecessorDag;
41use crate::query::df_graph::scan::{build_property_column_static, resolve_property_type};
42use arrow::compute::take;
43use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
44use arrow_schema::{DataType, Field, Schema, SchemaRef};
45use datafusion::common::Result as DFResult;
46use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
47use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
48use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
49use futures::{Stream, StreamExt};
50use fxhash::FxHashSet;
51use std::any::Any;
52use std::collections::{HashMap, HashSet, VecDeque};
53use std::fmt;
54use std::pin::Pin;
55use std::sync::Arc;
56use std::task::{Context, Poll};
57use uni_common::Value as UniValue;
58use uni_common::core::id::{Eid, Vid};
59use uni_store::runtime::l0_visibility;
60use uni_store::storage::direction::Direction;
61
/// BFS result: (target_vid, hop_count, node_path, edge_path)
///
/// Tuple fields, in order:
/// 1. `Vid` — the target vertex.
/// 2. `usize` — the hop count.
/// 3. `Vec<Vid>` — the node path.
/// 4. `Vec<Eid>` — the edge path.
type BfsResult = (Vid, usize, Vec<Vid>, Vec<Eid>);

/// Expansion record: (original_row_idx, target_vid, hop_count, node_path, edge_path)
///
/// Same shape as [`BfsResult`] with a leading `usize` holding the index of the
/// original input row.
type ExpansionRecord = (usize, Vid, usize, Vec<Vid>, Vec<Eid>);
67
68/// Prepend nodes and edges from an existing path struct column into builders.
69///
70/// Used when a VLP extends a path that was partially built by a prior `BindFixedPath`.
71/// Reads the nodes and relationships from the existing path at `row_idx` and appends
72/// them to the provided builders. The caller should then skip the first VLP node
73/// (which is the junction point already present in the existing path).
74fn prepend_existing_path(
75    existing_path: &arrow_array::StructArray,
76    row_idx: usize,
77    nodes_builder: &mut arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder>,
78    rels_builder: &mut arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder>,
79    query_ctx: &uni_store::runtime::context::QueryContext,
80) {
81    // Read existing nodes
82    let nodes_list = existing_path
83        .column(0)
84        .as_any()
85        .downcast_ref::<arrow_array::ListArray>()
86        .unwrap();
87    let node_values = nodes_list.value(row_idx);
88    let node_struct = node_values
89        .as_any()
90        .downcast_ref::<arrow_array::StructArray>()
91        .unwrap();
92    let vid_col = node_struct
93        .column(0)
94        .as_any()
95        .downcast_ref::<UInt64Array>()
96        .unwrap();
97    for i in 0..vid_col.len() {
98        append_node_to_struct(
99            nodes_builder.values(),
100            Vid::from(vid_col.value(i)),
101            query_ctx,
102        );
103    }
104
105    // Read existing edges
106    let rels_list = existing_path
107        .column(1)
108        .as_any()
109        .downcast_ref::<arrow_array::ListArray>()
110        .unwrap();
111    let edge_values = rels_list.value(row_idx);
112    let edge_struct = edge_values
113        .as_any()
114        .downcast_ref::<arrow_array::StructArray>()
115        .unwrap();
116    let eid_col = edge_struct
117        .column(0)
118        .as_any()
119        .downcast_ref::<UInt64Array>()
120        .unwrap();
121    let type_col = edge_struct
122        .column(1)
123        .as_any()
124        .downcast_ref::<arrow_array::StringArray>()
125        .unwrap();
126    let src_col = edge_struct
127        .column(2)
128        .as_any()
129        .downcast_ref::<UInt64Array>()
130        .unwrap();
131    let dst_col = edge_struct
132        .column(3)
133        .as_any()
134        .downcast_ref::<UInt64Array>()
135        .unwrap();
136    for i in 0..eid_col.len() {
137        append_edge_to_struct(
138            rels_builder.values(),
139            Eid::from(eid_col.value(i)),
140            type_col.value(i),
141            src_col.value(i),
142            dst_col.value(i),
143            query_ctx,
144        );
145    }
146}
147
148/// Resolve edge property Arrow type, falling back to `LargeBinary` (CypherValue) for
149/// schemaless properties. Unlike vertex properties, schemaless edge properties must
150/// preserve original JSON value types (int, float, etc.) since edge types commonly
151/// lack explicit property definitions.
152fn resolve_edge_property_type(
153    prop: &str,
154    schema_props: Option<
155        &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
156    >,
157) -> DataType {
158    if prop == "overflow_json" {
159        DataType::LargeBinary
160    } else if prop == "_created_at" || prop == "_updated_at" {
161        // System-managed timestamps surfaced via `created_at(r)` /
162        // `updated_at(r)`. Stored on every edge by the L0 buffer and
163        // the on-disk Arrow tables as Timestamp(Nanosecond, UTC).
164        DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, Some("UTC".into()))
165    } else {
166        schema_props
167            .and_then(|props| props.get(prop))
168            .map(|meta| meta.r#type.to_arrow())
169            .unwrap_or(DataType::LargeBinary)
170    }
171}
172
173use crate::query::df_graph::common::merged_edge_schema_props;
174
/// Expansion tuple for variable-length traversal: (input_row_idx, target_vid, hop_count, node_path, edge_path)
///
/// Structurally identical to `ExpansionRecord` declared earlier in this file.
type VarLengthExpansion = (usize, Vid, usize, Vec<Vid>, Vec<Eid>);
177
/// Single-hop graph traversal execution plan.
///
/// Expands each input row by traversing edges to find neighbors.
/// For each (source, edge, target) triple, produces one output row
/// containing the input columns plus target vertex and edge columns.
///
/// # Example
///
/// ```ignore
/// // Input: batch with _vid column
/// // Traverse KNOWS edges outgoing
/// let traverse = GraphTraverseExec::new(
///     input_plan,
///     "_vid",
///     vec![knows_type_id],
///     Direction::Outgoing,
///     "m",                // target variable
///     Some("r".into()),   // edge variable
///     vec![],             // edge properties to materialize
///     vec![],             // target properties to materialize
///     None,               // no target label name
///     None,               // no target label filter
///     graph_ctx,
///     false,              // not an OPTIONAL MATCH
///     HashSet::new(),     // optional pattern variables
///     None,               // no bound target column
///     vec![],             // no used-edge columns
/// );
///
/// // Output: input columns + m._vid + m._labels + r._eid + r._type
/// ```
pub struct GraphTraverseExec {
    /// Input execution plan.
    input: Arc<dyn ExecutionPlan>,

    /// Column name containing source VIDs.
    source_column: String,

    /// Edge type IDs to traverse.
    edge_type_ids: Vec<u32>,

    /// Traversal direction.
    direction: Direction,

    /// Variable name for target vertex columns.
    /// Used as the prefix of the `<var>._vid`, `<var>._labels` and `<var>.<prop>` columns.
    target_variable: String,

    /// Variable name for edge columns (if edge is bound).
    /// When `None`, the schema carries an internal `__eid_to_<target>` column instead.
    edge_variable: Option<String>,

    /// Edge properties to materialize (for pushdown hydration).
    edge_properties: Vec<String>,

    /// Target vertex properties to materialize.
    target_properties: Vec<String>,

    /// Target label name for property type resolution.
    target_label_name: Option<String>,

    /// Optional target label filter.
    // NOTE(review): stored and forwarded by `with_new_children`, but `execute`
    // does not hand it to the stream — confirm whether that is intentional.
    target_label_id: Option<u16>,

    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Whether this is an OPTIONAL MATCH (preserve unmatched source rows with NULLs).
    /// Also makes the target VID and edge ID columns nullable in the output schema.
    optional: bool,

    /// Variables introduced by the OPTIONAL MATCH pattern.
    /// Used to determine which columns should be null-extended on failure.
    optional_pattern_vars: HashSet<String>,

    /// Column name of an already-bound target VID (for cycle patterns like n-->k<--n).
    /// When set, only traversals that reach this VID are included.
    bound_target_column: Option<String>,

    /// Columns containing edge IDs from previous hops (for relationship uniqueness).
    /// Edges matching any of these IDs are excluded from traversal results.
    used_edge_columns: Vec<String>,

    /// Output schema.
    schema: SchemaRef,

    /// Cached plan properties.
    properties: Arc<PlanProperties>,

    /// Execution metrics.
    metrics: ExecutionPlanMetricsSet,
}
260
261impl fmt::Debug for GraphTraverseExec {
262    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
263        f.debug_struct("GraphTraverseExec")
264            .field("source_column", &self.source_column)
265            .field("edge_type_ids", &self.edge_type_ids)
266            .field("direction", &self.direction)
267            .field("target_variable", &self.target_variable)
268            .field("edge_variable", &self.edge_variable)
269            .finish()
270    }
271}
272
273impl GraphTraverseExec {
274    /// Create a new single-hop traversal plan.
275    ///
276    /// # Arguments
277    ///
278    /// * `input` - Input plan providing source vertices
279    /// * `source_column` - Column name containing source VIDs
280    /// * `edge_type_ids` - Edge types to traverse
281    /// * `direction` - Traversal direction
282    /// * `target_variable` - Variable name for target vertices
283    /// * `edge_variable` - Optional variable name for edges
284    /// * `edge_properties` - Edge properties to materialize
285    /// * `target_label_id` - Optional target label filter
286    /// * `graph_ctx` - Graph execution context
287    /// * `bound_target_column` - Column with already-bound target VID (for cycle patterns)
288    /// * `used_edge_columns` - Columns with edge IDs to exclude (relationship uniqueness)
289    #[expect(clippy::too_many_arguments)]
290    pub fn new(
291        input: Arc<dyn ExecutionPlan>,
292        source_column: impl Into<String>,
293        edge_type_ids: Vec<u32>,
294        direction: Direction,
295        target_variable: impl Into<String>,
296        edge_variable: Option<String>,
297        edge_properties: Vec<String>,
298        target_properties: Vec<String>,
299        target_label_name: Option<String>,
300        target_label_id: Option<u16>,
301        graph_ctx: Arc<GraphExecutionContext>,
302        optional: bool,
303        optional_pattern_vars: HashSet<String>,
304        bound_target_column: Option<String>,
305        used_edge_columns: Vec<String>,
306    ) -> Self {
307        let source_column = source_column.into();
308        let target_variable = target_variable.into();
309
310        // Resolve target property Arrow types from the schema
311        let uni_schema = graph_ctx.storage().schema_manager().schema();
312        let label_props = target_label_name
313            .as_deref()
314            .and_then(|ln| uni_schema.properties.get(ln));
315        let merged_edge_props = merged_edge_schema_props(&uni_schema, &edge_type_ids);
316        let edge_props = if merged_edge_props.is_empty() {
317            None
318        } else {
319            Some(&merged_edge_props)
320        };
321
322        // Build output schema: input schema + target VID + target props + optional edge ID + edge properties
323        let schema = Self::build_schema(
324            input.schema(),
325            &target_variable,
326            edge_variable.as_deref(),
327            &edge_properties,
328            &target_properties,
329            label_props,
330            edge_props,
331            optional,
332        );
333
334        let properties = compute_plan_properties(schema.clone());
335
336        Self {
337            input,
338            source_column,
339            edge_type_ids,
340            direction,
341            target_variable,
342            edge_variable,
343            edge_properties,
344            target_properties,
345            target_label_name,
346            target_label_id,
347            graph_ctx,
348            optional,
349            optional_pattern_vars,
350            bound_target_column,
351            used_edge_columns,
352            schema,
353            properties,
354            metrics: ExecutionPlanMetricsSet::new(),
355        }
356    }
357
358    /// Build output schema.
359    #[expect(
360        clippy::too_many_arguments,
361        reason = "Schema construction needs all field metadata"
362    )]
363    fn build_schema(
364        input_schema: SchemaRef,
365        target_variable: &str,
366        edge_variable: Option<&str>,
367        edge_properties: &[String],
368        target_properties: &[String],
369        label_props: Option<
370            &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
371        >,
372        edge_props: Option<
373            &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
374        >,
375        optional: bool,
376    ) -> SchemaRef {
377        let mut fields: Vec<Field> = input_schema
378            .fields()
379            .iter()
380            .map(|f| f.as_ref().clone())
381            .collect();
382
383        // Add target VID column (nullable when optional — unmatched rows get NULL)
384        let target_vid_name = format!("{}._vid", target_variable);
385        fields.push(Field::new(&target_vid_name, DataType::UInt64, optional));
386
387        // Add target ._labels column (List(Utf8)) for labels() and structural projection support
388        fields.push(Field::new(
389            format!("{}._labels", target_variable),
390            labels_data_type(),
391            true,
392        ));
393
394        // Add target vertex property columns
395        for prop_name in target_properties {
396            let col_name = format!("{}.{}", target_variable, prop_name);
397            let arrow_type = resolve_property_type(prop_name, label_props);
398            fields.push(Field::new(&col_name, arrow_type, true));
399        }
400
401        // Add edge ID column if edge variable is bound
402        if let Some(edge_var) = edge_variable {
403            let edge_id_name = format!("{}._eid", edge_var);
404            fields.push(Field::new(&edge_id_name, DataType::UInt64, optional));
405
406            // Add edge _type column for type(r) support
407            fields.push(Field::new(
408                format!("{}._type", edge_var),
409                DataType::Utf8,
410                true,
411            ));
412
413            // Add edge property columns with types resolved from schema
414            for prop_name in edge_properties {
415                let prop_col_name = format!("{}.{}", edge_var, prop_name);
416                let arrow_type = resolve_edge_property_type(prop_name, edge_props);
417                fields.push(Field::new(&prop_col_name, arrow_type, true));
418            }
419        } else {
420            // Add internal edge ID column for relationship uniqueness tracking
421            // even when edge variable is not explicitly bound.
422            let internal_eid_name = format!("__eid_to_{}", target_variable);
423            fields.push(Field::new(&internal_eid_name, DataType::UInt64, optional));
424        }
425
426        Arc::new(Schema::new(fields))
427    }
428}
429
430impl DisplayAs for GraphTraverseExec {
431    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
432        write!(
433            f,
434            "GraphTraverseExec: {} --[{:?}]--> {}",
435            self.source_column, self.edge_type_ids, self.target_variable
436        )?;
437        if let Some(ref edge_var) = self.edge_variable {
438            write!(f, " as {}", edge_var)?;
439        }
440        Ok(())
441    }
442}
443
444impl ExecutionPlan for GraphTraverseExec {
445    fn name(&self) -> &str {
446        "GraphTraverseExec"
447    }
448
449    fn as_any(&self) -> &dyn Any {
450        self
451    }
452
453    fn schema(&self) -> SchemaRef {
454        self.schema.clone()
455    }
456
457    fn properties(&self) -> &Arc<PlanProperties> {
458        &self.properties
459    }
460
461    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
462        vec![&self.input]
463    }
464
465    fn with_new_children(
466        self: Arc<Self>,
467        children: Vec<Arc<dyn ExecutionPlan>>,
468    ) -> DFResult<Arc<dyn ExecutionPlan>> {
469        if children.len() != 1 {
470            return Err(datafusion::error::DataFusionError::Plan(
471                "GraphTraverseExec requires exactly one child".to_string(),
472            ));
473        }
474
475        Ok(Arc::new(Self::new(
476            children[0].clone(),
477            self.source_column.clone(),
478            self.edge_type_ids.clone(),
479            self.direction,
480            self.target_variable.clone(),
481            self.edge_variable.clone(),
482            self.edge_properties.clone(),
483            self.target_properties.clone(),
484            self.target_label_name.clone(),
485            self.target_label_id,
486            self.graph_ctx.clone(),
487            self.optional,
488            self.optional_pattern_vars.clone(),
489            self.bound_target_column.clone(),
490            self.used_edge_columns.clone(),
491        )))
492    }
493
494    fn execute(
495        &self,
496        partition: usize,
497        context: Arc<TaskContext>,
498    ) -> DFResult<SendableRecordBatchStream> {
499        let input_stream = self.input.execute(partition, context)?;
500
501        let metrics = BaselineMetrics::new(&self.metrics, partition);
502
503        let warm_fut = self
504            .graph_ctx
505            .warming_future(self.edge_type_ids.clone(), self.direction);
506
507        Ok(Box::pin(GraphTraverseStream {
508            input: input_stream,
509            source_column: self.source_column.clone(),
510            edge_type_ids: self.edge_type_ids.clone(),
511            direction: self.direction,
512            target_variable: self.target_variable.clone(),
513            edge_variable: self.edge_variable.clone(),
514            edge_properties: self.edge_properties.clone(),
515            target_properties: self.target_properties.clone(),
516            target_label_name: self.target_label_name.clone(),
517            graph_ctx: self.graph_ctx.clone(),
518            optional: self.optional,
519            optional_pattern_vars: self.optional_pattern_vars.clone(),
520            bound_target_column: self.bound_target_column.clone(),
521            used_edge_columns: self.used_edge_columns.clone(),
522            schema: self.schema.clone(),
523            state: TraverseStreamState::Warming(warm_fut),
524            metrics,
525        }))
526    }
527
528    fn metrics(&self) -> Option<MetricsSet> {
529        Some(self.metrics.clone_inner())
530    }
531}
532
/// State machine for traverse stream execution.
///
/// A stream created by `GraphTraverseExec::execute` starts in `Warming`.
// NOTE(review): the transitions between states live in the stream's poll
// implementation, which is defined further down in this file.
enum TraverseStreamState {
    /// Warming adjacency CSRs before first batch.
    /// Holds the boxed warming future, which resolves to `DFResult<()>`.
    Warming(Pin<Box<dyn std::future::Future<Output = DFResult<()>> + Send>>),
    /// Polling the input stream for batches.
    Reading,
    /// Materializing target vertex properties asynchronously.
    /// Holds the boxed future that resolves to the finished output batch.
    Materializing(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
    /// Stream is done.
    Done,
}
544
/// Stream that performs single-hop traversal with async property materialization.
///
/// Built by `GraphTraverseExec::execute`, which copies the operator's
/// configuration into the stream and sets the initial state to
/// `TraverseStreamState::Warming`.
struct GraphTraverseStream {
    /// Input stream.
    input: SendableRecordBatchStream,

    /// Column name containing source VIDs.
    source_column: String,

    /// Edge type IDs to traverse.
    edge_type_ids: Vec<u32>,

    /// Traversal direction.
    direction: Direction,

    /// Variable name for target vertex (retained for diagnostics).
    #[expect(dead_code, reason = "Retained for debug logging and diagnostics")]
    target_variable: String,

    /// Variable name for edge (if bound).
    edge_variable: Option<String>,

    /// Edge properties to materialize.
    edge_properties: Vec<String>,

    /// Target vertex properties to materialize.
    target_properties: Vec<String>,

    /// Target label name for property resolution and filtering.
    /// `expand_neighbors` checks it against the labels of targets found in L0.
    target_label_name: Option<String>,

    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Whether this is an OPTIONAL MATCH.
    optional: bool,

    /// Variables introduced by the OPTIONAL MATCH pattern.
    optional_pattern_vars: HashSet<String>,

    /// Column name of an already-bound target VID (for cycle patterns like n-->k<--n).
    bound_target_column: Option<String>,

    /// Columns containing edge IDs from previous hops (for relationship uniqueness).
    used_edge_columns: Vec<String>,

    /// Output schema.
    schema: SchemaRef,

    /// Stream state.
    state: TraverseStreamState,

    /// Metrics.
    metrics: BaselineMetrics,
}
599
600impl GraphTraverseStream {
601    /// Expand neighbors synchronously and return expansions.
602    /// Returns (row_idx, target_vid, eid_u64, edge_type_id).
603    fn expand_neighbors(&self, batch: &RecordBatch) -> DFResult<Vec<(usize, Vid, u64, u32)>> {
604        let source_col = batch.column_by_name(&self.source_column).ok_or_else(|| {
605            datafusion::error::DataFusionError::Execution(format!(
606                "Source column '{}' not found",
607                self.source_column
608            ))
609        })?;
610
611        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
612        let source_vids: &UInt64Array = &source_vid_cow;
613
614        // If bound_target_column is set, get the expected target VIDs for each row.
615        // This is used for cycle patterns like n-->k<--n where the target must match.
616        let bound_target_cow = self
617            .bound_target_column
618            .as_ref()
619            .and_then(|col| batch.column_by_name(col))
620            .map(|c| column_as_vid_array(c.as_ref()))
621            .transpose()?;
622        let bound_target_vids: Option<&UInt64Array> = bound_target_cow.as_deref();
623
624        // Collect edge ID arrays from previous hops for relationship uniqueness filtering.
625        let used_edge_arrays: Vec<&UInt64Array> = self
626            .used_edge_columns
627            .iter()
628            .filter_map(|col| {
629                batch
630                    .column_by_name(col)
631                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
632            })
633            .collect();
634
635        let mut expanded_rows: Vec<(usize, Vid, u64, u32)> = Vec::new();
636        let is_undirected = matches!(self.direction, Direction::Both);
637
638        for (row_idx, source_vid) in source_vids.iter().enumerate() {
639            let Some(src) = source_vid else {
640                continue;
641            };
642
643            // Get expected target VID if this is a bound target pattern.
644            // Distinguish between:
645            // - no bound target column (no filtering),
646            // - bound target present but NULL for this row (must produce no expansion),
647            // - bound target present with VID.
648            let expected_target = bound_target_vids.map(|arr| {
649                if arr.is_null(row_idx) {
650                    None
651                } else {
652                    Some(arr.value(row_idx))
653                }
654            });
655
656            // Collect used edge IDs for this row from all previous hops
657            let used_eids: HashSet<u64> = used_edge_arrays
658                .iter()
659                .filter_map(|arr| {
660                    if arr.is_null(row_idx) {
661                        None
662                    } else {
663                        Some(arr.value(row_idx))
664                    }
665                })
666                .collect();
667
668            let vid = Vid::from(src);
669            // For Direction::Both, deduplicate edges by eid within each source.
670            // This prevents the same edge being counted twice (once outgoing, once incoming).
671            let mut seen_edges: HashSet<u64> = HashSet::new();
672
673            for &edge_type in &self.edge_type_ids {
674                let neighbors = self.graph_ctx.get_neighbors(vid, edge_type, self.direction);
675
676                for (target_vid, eid) in neighbors {
677                    let eid_u64 = eid.as_u64();
678
679                    // Skip edges already used in previous hops (relationship uniqueness)
680                    if used_eids.contains(&eid_u64) {
681                        continue;
682                    }
683
684                    // Deduplicate edges for undirected patterns
685                    if is_undirected && !seen_edges.insert(eid_u64) {
686                        continue;
687                    }
688
689                    // Filter by bound target VID if set (for cycle patterns).
690                    // NULL bound targets do not match anything.
691                    if let Some(expected_opt) = expected_target {
692                        let Some(expected) = expected_opt else {
693                            continue;
694                        };
695                        if target_vid.as_u64() != expected {
696                            continue;
697                        }
698                    }
699
700                    // Filter by target label using L0 visibility.
701                    // VIDs no longer embed label information, so we must look up labels.
702                    if let Some(ref label_name) = self.target_label_name {
703                        let query_ctx = self.graph_ctx.query_context();
704                        if let Some(vertex_labels) =
705                            l0_visibility::get_vertex_labels_optional(target_vid, &query_ctx)
706                        {
707                            // Vertex is in L0 — require actual label match
708                            if !vertex_labels.contains(label_name) {
709                                continue;
710                            }
711                        }
712                        // else: vertex not in L0 → trust storage-level filtering
713                    }
714
715                    expanded_rows.push((row_idx, target_vid, eid_u64, edge_type));
716                }
717            }
718        }
719
720        Ok(expanded_rows)
721    }
722}
723
724/// Build target vertex labels column from L0 buffers.
725fn build_target_labels_column(
726    target_vids: &[Vid],
727    target_label_name: &Option<String>,
728    graph_ctx: &GraphExecutionContext,
729) -> ArrayRef {
730    use arrow_array::builder::{ListBuilder, StringBuilder};
731    let mut labels_builder = ListBuilder::new(StringBuilder::new());
732    let query_ctx = graph_ctx.query_context();
733    for vid in target_vids {
734        let row_labels: Vec<String> =
735            match l0_visibility::get_vertex_labels_optional(*vid, &query_ctx) {
736                Some(labels) => labels,
737                None => {
738                    // Vertex not in L0 — trust schema label (storage already filtered)
739                    if let Some(label_name) = target_label_name {
740                        vec![label_name.clone()]
741                    } else {
742                        vec![]
743                    }
744                }
745            };
746        let values = labels_builder.values();
747        for lbl in &row_labels {
748            values.append_value(lbl);
749        }
750        labels_builder.append(true);
751    }
752    Arc::new(labels_builder.finish())
753}
754
755/// Build target vertex property columns from storage and L0.
756async fn build_target_property_columns(
757    target_vids: &[Vid],
758    target_properties: &[String],
759    target_label_name: &Option<String>,
760    graph_ctx: &Arc<GraphExecutionContext>,
761) -> DFResult<Vec<ArrayRef>> {
762    let mut columns = Vec::new();
763
764    if let Some(label_name) = target_label_name {
765        let property_manager = graph_ctx.property_manager();
766        let query_ctx = graph_ctx.query_context();
767
768        let props_map = property_manager
769            .get_batch_vertex_props_for_label(target_vids, label_name, Some(&query_ctx))
770            .await
771            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
772
773        let uni_schema = graph_ctx.storage().schema_manager().schema();
774        let label_props = uni_schema.properties.get(label_name.as_str());
775
776        for prop_name in target_properties {
777            let data_type = resolve_property_type(prop_name, label_props);
778            let column =
779                build_property_column_static(target_vids, &props_map, prop_name, &data_type)?;
780            columns.push(column);
781        }
782    } else {
783        // No label name — use label-agnostic property lookup.
784        let non_internal_props: Vec<&str> = target_properties
785            .iter()
786            .filter(|p| *p != "_all_props")
787            .map(|s| s.as_str())
788            .collect();
789        let property_manager = graph_ctx.property_manager();
790        let query_ctx = graph_ctx.query_context();
791
792        let props_map = if !non_internal_props.is_empty() {
793            property_manager
794                .get_batch_vertex_props(target_vids, &non_internal_props, Some(&query_ctx))
795                .await
796                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
797        } else {
798            std::collections::HashMap::new()
799        };
800
801        for prop_name in target_properties {
802            if prop_name == "_all_props" {
803                columns.push(build_all_props_column(target_vids, &props_map, graph_ctx));
804            } else {
805                let column = build_property_column_static(
806                    target_vids,
807                    &props_map,
808                    prop_name,
809                    &arrow::datatypes::DataType::LargeBinary,
810                )?;
811                columns.push(column);
812            }
813        }
814    }
815
816    Ok(columns)
817}
818
819/// Build a CypherValue blob column from all vertex properties (L0 + storage).
820fn build_all_props_column(
821    target_vids: &[Vid],
822    props_map: &HashMap<Vid, HashMap<String, uni_common::Value>>,
823    graph_ctx: &Arc<GraphExecutionContext>,
824) -> ArrayRef {
825    use crate::query::df_graph::scan::encode_cypher_value;
826    use arrow_array::builder::LargeBinaryBuilder;
827
828    let mut builder = LargeBinaryBuilder::new();
829    let l0_ctx = graph_ctx.l0_context();
830    for vid in target_vids {
831        let mut merged_props = serde_json::Map::new();
832        if let Some(vid_props) = props_map.get(vid) {
833            for (k, v) in vid_props.iter() {
834                let json_val: serde_json::Value = v.clone().into();
835                merged_props.insert(k.to_string(), json_val);
836            }
837        }
838        for l0 in l0_ctx.iter_l0_buffers() {
839            let guard = l0.read();
840            if let Some(l0_props) = guard.vertex_properties.get(vid) {
841                for (k, v) in l0_props.iter() {
842                    let json_val: serde_json::Value = v.clone().into();
843                    merged_props.insert(k.to_string(), json_val);
844                }
845            }
846        }
847        if merged_props.is_empty() {
848            builder.append_null();
849        } else {
850            let json = serde_json::Value::Object(merged_props);
851            match encode_cypher_value(&json) {
852                Ok(bytes) => builder.append_value(bytes),
853                Err(_) => builder.append_null(),
854            }
855        }
856    }
857    Arc::new(builder.finish())
858}
859
860/// Build edge ID, type, and property columns for bound edge variables.
861async fn build_edge_columns(
862    expansions: &[(usize, Vid, u64, u32)],
863    edge_properties: &[String],
864    edge_type_ids: &[u32],
865    graph_ctx: &Arc<GraphExecutionContext>,
866) -> DFResult<Vec<ArrayRef>> {
867    let mut columns = Vec::new();
868
869    let eids: Vec<Eid> = expansions
870        .iter()
871        .map(|(_, _, eid, _)| Eid::from(*eid))
872        .collect();
873    let eid_u64s: Vec<u64> = eids.iter().map(|e| e.as_u64()).collect();
874    columns.push(Arc::new(UInt64Array::from(eid_u64s)) as ArrayRef);
875
876    // Edge _type column
877    {
878        let uni_schema = graph_ctx.storage().schema_manager().schema();
879        let mut type_builder = arrow_array::builder::StringBuilder::new();
880        for (_, _, _, edge_type_id) in expansions {
881            if let Some(name) = uni_schema.edge_type_name_by_id_unified(*edge_type_id) {
882                type_builder.append_value(&name);
883            } else {
884                type_builder.append_null();
885            }
886        }
887        columns.push(Arc::new(type_builder.finish()) as ArrayRef);
888    }
889
890    if !edge_properties.is_empty() {
891        let prop_name_refs: Vec<&str> = edge_properties.iter().map(|s| s.as_str()).collect();
892        let property_manager = graph_ctx.property_manager();
893        let query_ctx = graph_ctx.query_context();
894
895        let props_map = property_manager
896            .get_batch_edge_props(&eids, &prop_name_refs, Some(&query_ctx))
897            .await
898            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
899
900        let uni_schema = graph_ctx.storage().schema_manager().schema();
901        let merged_edge_props = merged_edge_schema_props(&uni_schema, edge_type_ids);
902        let edge_type_props = if merged_edge_props.is_empty() {
903            None
904        } else {
905            Some(&merged_edge_props)
906        };
907
908        let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(e.as_u64())).collect();
909
910        for prop_name in edge_properties {
911            // System-managed edge timestamps live outside the property bag.
912            // Read them directly from L0 (earliest creation, latest update).
913            // Disk-only edges will surface as null — acceptable for the
914            // common case where the queried edges are L0-resident in the
915            // same session.
916            if prop_name == "_created_at" || prop_name == "_updated_at" {
917                let mut builder =
918                    arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
919                let l0_ctx = graph_ctx.l0_context();
920                for eid in &eids {
921                    let mut value: Option<i64> = None;
922                    for l0 in l0_ctx.iter_l0_buffers() {
923                        let guard = l0.read();
924                        let opt = if prop_name == "_created_at" {
925                            guard.edge_created_at.get(eid).copied()
926                        } else {
927                            guard.edge_updated_at.get(eid).copied()
928                        };
929                        if let Some(ts) = opt {
930                            value = Some(match value {
931                                None => ts,
932                                Some(cur) if prop_name == "_created_at" => cur.min(ts),
933                                Some(cur) => cur.max(ts),
934                            });
935                        }
936                    }
937                    match value {
938                        Some(ts) => builder.append_value(ts),
939                        None => builder.append_null(),
940                    }
941                }
942                columns.push(Arc::new(builder.finish()) as ArrayRef);
943                continue;
944            }
945            let data_type = resolve_edge_property_type(prop_name, edge_type_props);
946            let column =
947                build_property_column_static(&vid_keys, &props_map, prop_name, &data_type)?;
948            columns.push(column);
949        }
950    }
951
952    Ok(columns)
953}
954
955/// Build the output batch with target vertex properties.
956///
957/// This is a standalone async function so it can be boxed into a `Send` future
958/// without borrowing from `GraphTraverseStream`.
959#[expect(
960    clippy::too_many_arguments,
961    reason = "Standalone async fn needs all context passed explicitly"
962)]
963async fn build_traverse_output_batch(
964    input: RecordBatch,
965    expansions: Vec<(usize, Vid, u64, u32)>,
966    schema: SchemaRef,
967    edge_variable: Option<String>,
968    edge_properties: Vec<String>,
969    edge_type_ids: Vec<u32>,
970    target_properties: Vec<String>,
971    target_label_name: Option<String>,
972    graph_ctx: Arc<GraphExecutionContext>,
973    optional: bool,
974    optional_pattern_vars: HashSet<String>,
975) -> DFResult<RecordBatch> {
976    if expansions.is_empty() {
977        if !optional {
978            return Ok(RecordBatch::new_empty(schema));
979        }
980        let unmatched_reps = collect_unmatched_optional_group_rows(
981            &input,
982            &HashSet::new(),
983            &schema,
984            &optional_pattern_vars,
985        )?;
986        if unmatched_reps.is_empty() {
987            return Ok(RecordBatch::new_empty(schema));
988        }
989        return build_optional_null_batch_for_rows_with_optional_vars(
990            &input,
991            &unmatched_reps,
992            &schema,
993            &optional_pattern_vars,
994        );
995    }
996
997    // Expand input columns via index array
998    let indices: Vec<u64> = expansions
999        .iter()
1000        .map(|(idx, _, _, _)| *idx as u64)
1001        .collect();
1002    let indices_array = UInt64Array::from(indices);
1003    let mut columns: Vec<ArrayRef> = input
1004        .columns()
1005        .iter()
1006        .map(|col| take(col.as_ref(), &indices_array, None))
1007        .collect::<Result<_, _>>()?;
1008
1009    // Target VID column
1010    let target_vids: Vec<Vid> = expansions.iter().map(|(_, vid, _, _)| *vid).collect();
1011    let target_vid_u64s: Vec<u64> = target_vids.iter().map(|v| v.as_u64()).collect();
1012    columns.push(Arc::new(UInt64Array::from(target_vid_u64s)));
1013
1014    // Target labels column
1015    columns.push(build_target_labels_column(
1016        &target_vids,
1017        &target_label_name,
1018        &graph_ctx,
1019    ));
1020
1021    // Target vertex property columns
1022    if !target_properties.is_empty() {
1023        let prop_cols = build_target_property_columns(
1024            &target_vids,
1025            &target_properties,
1026            &target_label_name,
1027            &graph_ctx,
1028        )
1029        .await?;
1030        columns.extend(prop_cols);
1031    }
1032
1033    // Edge columns (bound or internal tracking)
1034    if edge_variable.is_some() {
1035        let edge_cols =
1036            build_edge_columns(&expansions, &edge_properties, &edge_type_ids, &graph_ctx).await?;
1037        columns.extend(edge_cols);
1038    } else {
1039        let eid_u64s: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1040        columns.push(Arc::new(UInt64Array::from(eid_u64s)));
1041    }
1042
1043    let expanded_batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
1044
1045    // Append null rows for unmatched optional sources
1046    if optional {
1047        let matched_indices: HashSet<usize> =
1048            expansions.iter().map(|(idx, _, _, _)| *idx).collect();
1049        let unmatched = collect_unmatched_optional_group_rows(
1050            &input,
1051            &matched_indices,
1052            &schema,
1053            &optional_pattern_vars,
1054        )?;
1055
1056        if !unmatched.is_empty() {
1057            let null_batch = build_optional_null_batch_for_rows_with_optional_vars(
1058                &input,
1059                &unmatched,
1060                &schema,
1061                &optional_pattern_vars,
1062            )?;
1063            let combined = arrow::compute::concat_batches(&schema, [&expanded_batch, &null_batch])
1064                .map_err(arrow_err)?;
1065            return Ok(combined);
1066        }
1067    }
1068
1069    Ok(expanded_batch)
1070}
1071
1072/// Build a batch for specific unmatched source rows with NULL target/edge columns.
1073/// Used when OPTIONAL MATCH has some expansions but some source rows had none.
1074fn build_optional_null_batch_for_rows(
1075    input: &RecordBatch,
1076    unmatched_indices: &[usize],
1077    schema: &SchemaRef,
1078) -> DFResult<RecordBatch> {
1079    let num_rows = unmatched_indices.len();
1080    let indices: Vec<u64> = unmatched_indices.iter().map(|&idx| idx as u64).collect();
1081    let indices_array = UInt64Array::from(indices);
1082
1083    // Take the unmatched input rows
1084    let mut columns: Vec<ArrayRef> = Vec::new();
1085    for col in input.columns() {
1086        let taken = take(col.as_ref(), &indices_array, None)?;
1087        columns.push(taken);
1088    }
1089    // Fill remaining columns with nulls
1090    for field in schema.fields().iter().skip(input.num_columns()) {
1091        columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1092    }
1093    RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
1094}
1095
/// Report whether `col_name` belongs to one of the OPTIONAL MATCH variables.
///
/// A column counts as optional when any of the following holds for some
/// variable `var` in `optional_vars`:
/// - the column name equals `var`,
/// - the column name is `var` immediately followed by `.` (e.g. `var._vid`),
/// - the column name starts with `__eid_to_` and ends with `var`.
///
/// NOTE(review): the last rule is a suffix test, so a variable whose name is
/// merely a suffix of another name after `__eid_to_` also matches — confirm
/// this against the internal column naming scheme.
fn is_optional_column_for_vars(col_name: &str, optional_vars: &HashSet<String>) -> bool {
    if optional_vars.contains(col_name) {
        return true;
    }

    let is_internal_eid = col_name.starts_with("__eid_to_");

    optional_vars.iter().any(|var| {
        let is_member_of_var = col_name
            .strip_prefix(var.as_str())
            .is_some_and(|rest| rest.starts_with('.'));

        is_member_of_var || (is_internal_eid && col_name.ends_with(var.as_str()))
    })
}
1103
1104fn collect_unmatched_optional_group_rows(
1105    input: &RecordBatch,
1106    matched_indices: &HashSet<usize>,
1107    schema: &SchemaRef,
1108    optional_vars: &HashSet<String>,
1109) -> DFResult<Vec<usize>> {
1110    if input.num_rows() == 0 {
1111        return Ok(Vec::new());
1112    }
1113
1114    if optional_vars.is_empty() {
1115        return Ok((0..input.num_rows())
1116            .filter(|idx| !matched_indices.contains(idx))
1117            .collect());
1118    }
1119
1120    let source_vid_indices: Vec<usize> = schema
1121        .fields()
1122        .iter()
1123        .enumerate()
1124        .filter_map(|(idx, field)| {
1125            if idx >= input.num_columns() {
1126                return None;
1127            }
1128            let name = field.name();
1129            if !is_optional_column_for_vars(name, optional_vars) && name.ends_with("._vid") {
1130                Some(idx)
1131            } else {
1132                None
1133            }
1134        })
1135        .collect();
1136
1137    // Group rows by non-optional VID bindings and preserve group order.
1138    let mut groups: HashMap<Vec<u8>, (usize, bool)> = HashMap::new(); // (first_row_idx, any_matched)
1139    let mut group_order: Vec<Vec<u8>> = Vec::new();
1140
1141    for row_idx in 0..input.num_rows() {
1142        let key = compute_optional_group_key(input, row_idx, &source_vid_indices)?;
1143        let entry = groups.entry(key.clone());
1144        if matches!(entry, std::collections::hash_map::Entry::Vacant(_)) {
1145            group_order.push(key.clone());
1146        }
1147        let matched = matched_indices.contains(&row_idx);
1148        entry
1149            .and_modify(|(_, any_matched)| *any_matched |= matched)
1150            .or_insert((row_idx, matched));
1151    }
1152
1153    Ok(group_order
1154        .into_iter()
1155        .filter_map(|key| {
1156            groups
1157                .get(&key)
1158                .and_then(|(first_idx, any_matched)| (!*any_matched).then_some(*first_idx))
1159        })
1160        .collect())
1161}
1162
1163fn compute_optional_group_key(
1164    batch: &RecordBatch,
1165    row_idx: usize,
1166    source_vid_indices: &[usize],
1167) -> DFResult<Vec<u8>> {
1168    let mut key = Vec::with_capacity(source_vid_indices.len() * std::mem::size_of::<u64>());
1169    for &col_idx in source_vid_indices {
1170        let col = batch.column(col_idx);
1171        let vid_cow = column_as_vid_array(col.as_ref())?;
1172        let arr: &UInt64Array = &vid_cow;
1173        if arr.is_null(row_idx) {
1174            key.extend_from_slice(&u64::MAX.to_le_bytes());
1175        } else {
1176            key.extend_from_slice(&arr.value(row_idx).to_le_bytes());
1177        }
1178    }
1179    Ok(key)
1180}
1181
1182fn build_optional_null_batch_for_rows_with_optional_vars(
1183    input: &RecordBatch,
1184    unmatched_indices: &[usize],
1185    schema: &SchemaRef,
1186    optional_vars: &HashSet<String>,
1187) -> DFResult<RecordBatch> {
1188    if optional_vars.is_empty() {
1189        return build_optional_null_batch_for_rows(input, unmatched_indices, schema);
1190    }
1191
1192    let num_rows = unmatched_indices.len();
1193    let indices: Vec<u64> = unmatched_indices.iter().map(|&idx| idx as u64).collect();
1194    let indices_array = UInt64Array::from(indices);
1195
1196    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
1197    for (col_idx, field) in schema.fields().iter().enumerate() {
1198        if col_idx < input.num_columns() {
1199            if is_optional_column_for_vars(field.name(), optional_vars) {
1200                columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1201            } else {
1202                let taken = take(input.column(col_idx).as_ref(), &indices_array, None)?;
1203                columns.push(taken);
1204            }
1205        } else {
1206            columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1207        }
1208    }
1209
1210    RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
1211}
1212
1213impl Stream for GraphTraverseStream {
1214    type Item = DFResult<RecordBatch>;
1215
1216    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1217        let metrics = self.metrics.clone();
1218        let _timer = metrics.elapsed_compute().timer();
1219        loop {
1220            let state = std::mem::replace(&mut self.state, TraverseStreamState::Done);
1221
1222            match state {
1223                TraverseStreamState::Warming(mut fut) => match fut.as_mut().poll(cx) {
1224                    Poll::Ready(Ok(())) => {
1225                        self.state = TraverseStreamState::Reading;
1226                        // Continue loop to start reading
1227                    }
1228                    Poll::Ready(Err(e)) => {
1229                        self.state = TraverseStreamState::Done;
1230                        return Poll::Ready(Some(Err(e)));
1231                    }
1232                    Poll::Pending => {
1233                        self.state = TraverseStreamState::Warming(fut);
1234                        return Poll::Pending;
1235                    }
1236                },
1237                TraverseStreamState::Reading => {
1238                    // Check timeout
1239                    if let Err(e) = self.graph_ctx.check_timeout() {
1240                        return Poll::Ready(Some(Err(
1241                            datafusion::error::DataFusionError::Execution(e.to_string()),
1242                        )));
1243                    }
1244
1245                    match self.input.poll_next_unpin(cx) {
1246                        Poll::Ready(Some(Ok(batch))) => {
1247                            // Expand neighbors synchronously
1248                            let expansions = match self.expand_neighbors(&batch) {
1249                                Ok(exp) => exp,
1250                                Err(e) => {
1251                                    self.state = TraverseStreamState::Reading;
1252                                    return Poll::Ready(Some(Err(e)));
1253                                }
1254                            };
1255
1256                            // Build output synchronously only when no properties need async hydration
1257                            if self.target_properties.is_empty() && self.edge_properties.is_empty()
1258                            {
1259                                let result = build_traverse_output_batch_sync(
1260                                    &batch,
1261                                    &expansions,
1262                                    &self.schema,
1263                                    self.edge_variable.as_ref(),
1264                                    &self.graph_ctx,
1265                                    self.optional,
1266                                    &self.optional_pattern_vars,
1267                                );
1268                                self.state = TraverseStreamState::Reading;
1269                                if let Ok(ref r) = result {
1270                                    self.metrics.record_output(r.num_rows());
1271                                }
1272                                return Poll::Ready(Some(result));
1273                            }
1274
1275                            // Properties needed — create async future for hydration
1276                            let schema = self.schema.clone();
1277                            let edge_variable = self.edge_variable.clone();
1278                            let edge_properties = self.edge_properties.clone();
1279                            let edge_type_ids = self.edge_type_ids.clone();
1280                            let target_properties = self.target_properties.clone();
1281                            let target_label_name = self.target_label_name.clone();
1282                            let graph_ctx = self.graph_ctx.clone();
1283
1284                            let optional = self.optional;
1285                            let optional_pattern_vars = self.optional_pattern_vars.clone();
1286
1287                            let fut = build_traverse_output_batch(
1288                                batch,
1289                                expansions,
1290                                schema,
1291                                edge_variable,
1292                                edge_properties,
1293                                edge_type_ids,
1294                                target_properties,
1295                                target_label_name,
1296                                graph_ctx,
1297                                optional,
1298                                optional_pattern_vars,
1299                            );
1300
1301                            self.state = TraverseStreamState::Materializing(Box::pin(fut));
1302                            // Continue loop to poll the future
1303                        }
1304                        Poll::Ready(Some(Err(e))) => {
1305                            self.state = TraverseStreamState::Done;
1306                            return Poll::Ready(Some(Err(e)));
1307                        }
1308                        Poll::Ready(None) => {
1309                            self.state = TraverseStreamState::Done;
1310                            return Poll::Ready(None);
1311                        }
1312                        Poll::Pending => {
1313                            self.state = TraverseStreamState::Reading;
1314                            return Poll::Pending;
1315                        }
1316                    }
1317                }
1318                TraverseStreamState::Materializing(mut fut) => match fut.as_mut().poll(cx) {
1319                    Poll::Ready(Ok(batch)) => {
1320                        self.state = TraverseStreamState::Reading;
1321                        self.metrics.record_output(batch.num_rows());
1322                        return Poll::Ready(Some(Ok(batch)));
1323                    }
1324                    Poll::Ready(Err(e)) => {
1325                        self.state = TraverseStreamState::Done;
1326                        return Poll::Ready(Some(Err(e)));
1327                    }
1328                    Poll::Pending => {
1329                        self.state = TraverseStreamState::Materializing(fut);
1330                        return Poll::Pending;
1331                    }
1332                },
1333                TraverseStreamState::Done => {
1334                    return Poll::Ready(None);
1335                }
1336            }
1337        }
1338    }
1339}
1340
1341/// Build output batch synchronously when no properties need async hydration.
1342///
1343/// Only called when both `target_properties` and `edge_properties` are empty,
1344/// so no property columns need to be materialized.
1345fn build_traverse_output_batch_sync(
1346    input: &RecordBatch,
1347    expansions: &[(usize, Vid, u64, u32)],
1348    schema: &SchemaRef,
1349    edge_variable: Option<&String>,
1350    graph_ctx: &GraphExecutionContext,
1351    optional: bool,
1352    optional_pattern_vars: &HashSet<String>,
1353) -> DFResult<RecordBatch> {
1354    if expansions.is_empty() {
1355        if !optional {
1356            return Ok(RecordBatch::new_empty(schema.clone()));
1357        }
1358        let unmatched_reps = collect_unmatched_optional_group_rows(
1359            input,
1360            &HashSet::new(),
1361            schema,
1362            optional_pattern_vars,
1363        )?;
1364        if unmatched_reps.is_empty() {
1365            return Ok(RecordBatch::new_empty(schema.clone()));
1366        }
1367        return build_optional_null_batch_for_rows_with_optional_vars(
1368            input,
1369            &unmatched_reps,
1370            schema,
1371            optional_pattern_vars,
1372        );
1373    }
1374
1375    let indices: Vec<u64> = expansions
1376        .iter()
1377        .map(|(idx, _, _, _)| *idx as u64)
1378        .collect();
1379    let indices_array = UInt64Array::from(indices);
1380
1381    let mut columns: Vec<ArrayRef> = Vec::new();
1382    for col in input.columns() {
1383        let expanded = take(col.as_ref(), &indices_array, None)?;
1384        columns.push(expanded);
1385    }
1386
1387    // Add target VID column
1388    let target_vids: Vec<u64> = expansions
1389        .iter()
1390        .map(|(_, vid, _, _)| vid.as_u64())
1391        .collect();
1392    columns.push(Arc::new(UInt64Array::from(target_vids)));
1393
1394    // Add target ._labels column (from L0 buffers)
1395    {
1396        use arrow_array::builder::{ListBuilder, StringBuilder};
1397        let l0_ctx = graph_ctx.l0_context();
1398        let mut labels_builder = ListBuilder::new(StringBuilder::new());
1399        for (_, vid, _, _) in expansions {
1400            let mut row_labels: Vec<String> = Vec::new();
1401            for l0 in l0_ctx.iter_l0_buffers() {
1402                let guard = l0.read();
1403                if let Some(l0_labels) = guard.vertex_labels.get(vid) {
1404                    for lbl in l0_labels {
1405                        if !row_labels.contains(lbl) {
1406                            row_labels.push(lbl.clone());
1407                        }
1408                    }
1409                }
1410            }
1411            let values = labels_builder.values();
1412            for lbl in &row_labels {
1413                values.append_value(lbl);
1414            }
1415            labels_builder.append(true);
1416        }
1417        columns.push(Arc::new(labels_builder.finish()));
1418    }
1419
1420    // Add edge columns if edge is bound (no properties in sync path)
1421    if edge_variable.is_some() {
1422        let edge_ids: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1423        columns.push(Arc::new(UInt64Array::from(edge_ids)));
1424
1425        // Add edge _type column
1426        let uni_schema = graph_ctx.storage().schema_manager().schema();
1427        let mut type_builder = arrow_array::builder::StringBuilder::new();
1428        for (_, _, _, edge_type_id) in expansions {
1429            if let Some(name) = uni_schema.edge_type_name_by_id_unified(*edge_type_id) {
1430                type_builder.append_value(&name);
1431            } else {
1432                type_builder.append_null();
1433            }
1434        }
1435        columns.push(Arc::new(type_builder.finish()));
1436    } else {
1437        // Internal EID column for relationship uniqueness tracking (matches schema)
1438        let edge_ids: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1439        columns.push(Arc::new(UInt64Array::from(edge_ids)));
1440    }
1441
1442    let expanded_batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
1443
1444    if optional {
1445        let matched_indices: HashSet<usize> =
1446            expansions.iter().map(|(idx, _, _, _)| *idx).collect();
1447        let unmatched = collect_unmatched_optional_group_rows(
1448            input,
1449            &matched_indices,
1450            schema,
1451            optional_pattern_vars,
1452        )?;
1453
1454        if !unmatched.is_empty() {
1455            let null_batch = build_optional_null_batch_for_rows_with_optional_vars(
1456                input,
1457                &unmatched,
1458                schema,
1459                optional_pattern_vars,
1460            )?;
1461            let combined = arrow::compute::concat_batches(schema, [&expanded_batch, &null_batch])
1462                .map_err(|e| {
1463                datafusion::error::DataFusionError::ArrowError(Box::new(e), None)
1464            })?;
1465            return Ok(combined);
1466        }
1467    }
1468
1469    Ok(expanded_batch)
1470}
1471
1472impl RecordBatchStream for GraphTraverseStream {
1473    fn schema(&self) -> SchemaRef {
1474        self.schema.clone()
1475    }
1476}
1477
1478/// Adjacency map type: maps source VID to list of (target_vid, eid, edge_type_name, properties).
1479type EdgeAdjacencyMap = HashMap<Vid, Vec<(Vid, Eid, String, uni_common::Properties)>>;
1480
1481/// Graph traversal execution plan for schemaless edge types (TraverseMainByType).
1482///
1483/// Unlike GraphTraverseExec which uses CSR adjacency for known types, this operator
1484/// queries the main edges table for schemaless types and builds an in-memory adjacency map.
1485///
1486/// # Example
1487///
1488/// ```ignore
1489/// // Traverse schemaless "CUSTOM" edges
1490/// let traverse = GraphTraverseMainExec::new(
1491///     input_plan,
1492///     "_vid",
1493///     "CUSTOM",
1494///     Direction::Outgoing,
1495///     "m",           // target variable
1496///     Some("r"),     // edge variable
1497///     vec![],        // edge properties
1498///     vec![],        // target properties
1499///     graph_ctx,
1500///     false,         // not optional
1501/// );
1502/// ```
1503pub struct GraphTraverseMainExec {
1504    /// Input execution plan.
1505    input: Arc<dyn ExecutionPlan>,
1506
1507    /// Column name containing source VIDs.
1508    source_column: String,
1509
1510    /// Edge type names (not IDs, since schemaless types may not have IDs).
1511    /// Supports OR relationship types like `[:KNOWS|HATES]`.
1512    type_names: Vec<String>,
1513
1514    /// Traversal direction.
1515    direction: Direction,
1516
1517    /// Variable name for target vertex columns.
1518    target_variable: String,
1519
1520    /// Variable name for edge columns (if edge is bound).
1521    edge_variable: Option<String>,
1522
1523    /// Edge properties to materialize.
1524    edge_properties: Vec<String>,
1525
1526    /// Target vertex properties to materialize.
1527    target_properties: Vec<String>,
1528
1529    /// Graph execution context.
1530    graph_ctx: Arc<GraphExecutionContext>,
1531
1532    /// Whether this is an OPTIONAL MATCH (preserve unmatched source rows with NULLs).
1533    optional: bool,
1534
1535    /// Variables introduced by the OPTIONAL MATCH pattern.
1536    optional_pattern_vars: HashSet<String>,
1537
1538    /// Column name of an already-bound target VID (for patterns where target is in scope).
1539    /// When set, only traversals reaching this exact VID are included.
1540    bound_target_column: Option<String>,
1541
1542    /// Columns containing edge IDs from previous hops (for relationship uniqueness).
1543    /// Edges matching any of these IDs are excluded from traversal results.
1544    used_edge_columns: Vec<String>,
1545
1546    /// Output schema.
1547    schema: SchemaRef,
1548
1549    /// Cached plan properties.
1550    properties: Arc<PlanProperties>,
1551
1552    /// Execution metrics.
1553    metrics: ExecutionPlanMetricsSet,
1554}
1555
1556impl fmt::Debug for GraphTraverseMainExec {
1557    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1558        f.debug_struct("GraphTraverseMainExec")
1559            .field("type_names", &self.type_names)
1560            .field("direction", &self.direction)
1561            .field("target_variable", &self.target_variable)
1562            .field("edge_variable", &self.edge_variable)
1563            .finish()
1564    }
1565}
1566
1567impl GraphTraverseMainExec {
1568    /// Create a new schemaless traversal executor.
1569    #[expect(clippy::too_many_arguments)]
1570    pub fn new(
1571        input: Arc<dyn ExecutionPlan>,
1572        source_column: impl Into<String>,
1573        type_names: Vec<String>,
1574        direction: Direction,
1575        target_variable: impl Into<String>,
1576        edge_variable: Option<String>,
1577        edge_properties: Vec<String>,
1578        target_properties: Vec<String>,
1579        graph_ctx: Arc<GraphExecutionContext>,
1580        optional: bool,
1581        optional_pattern_vars: HashSet<String>,
1582        bound_target_column: Option<String>,
1583        used_edge_columns: Vec<String>,
1584    ) -> Self {
1585        let source_column = source_column.into();
1586        let target_variable = target_variable.into();
1587
1588        // Build output schema
1589        let schema = Self::build_schema(
1590            &input.schema(),
1591            &target_variable,
1592            &edge_variable,
1593            &edge_properties,
1594            &target_properties,
1595            optional,
1596        );
1597
1598        let properties = compute_plan_properties(schema.clone());
1599
1600        Self {
1601            input,
1602            source_column,
1603            type_names,
1604            direction,
1605            target_variable,
1606            edge_variable,
1607            edge_properties,
1608            target_properties,
1609            graph_ctx,
1610            optional,
1611            optional_pattern_vars,
1612            bound_target_column,
1613            used_edge_columns,
1614            schema,
1615            properties,
1616            metrics: ExecutionPlanMetricsSet::new(),
1617        }
1618    }
1619
1620    /// Build output schema for traversal.
1621    fn build_schema(
1622        input_schema: &SchemaRef,
1623        target_variable: &str,
1624        edge_variable: &Option<String>,
1625        edge_properties: &[String],
1626        target_properties: &[String],
1627        optional: bool,
1628    ) -> SchemaRef {
1629        let mut fields: Vec<Field> = input_schema
1630            .fields()
1631            .iter()
1632            .map(|f| f.as_ref().clone())
1633            .collect();
1634
1635        // Add target ._vid column (only if not already in input, nullable for OPTIONAL MATCH)
1636        let target_vid_name = format!("{}._vid", target_variable);
1637        if input_schema.column_with_name(&target_vid_name).is_none() {
1638            fields.push(Field::new(target_vid_name, DataType::UInt64, true));
1639        }
1640
1641        // Add target ._labels column (only if not already in input)
1642        let target_labels_name = format!("{}._labels", target_variable);
1643        if input_schema.column_with_name(&target_labels_name).is_none() {
1644            fields.push(Field::new(target_labels_name, labels_data_type(), true));
1645        }
1646
1647        // Add edge columns if edge variable is bound
1648        if let Some(edge_var) = edge_variable {
1649            fields.push(Field::new(
1650                format!("{}._eid", edge_var),
1651                DataType::UInt64,
1652                optional,
1653            ));
1654
1655            // Add edge ._type column for type(r) support
1656            fields.push(Field::new(
1657                format!("{}._type", edge_var),
1658                DataType::Utf8,
1659                true,
1660            ));
1661
1662            // Edge properties: LargeBinary (cv_encoded) to preserve value types.
1663            // Schemaless edges store properties as CypherValue blobs so that
1664            // Int, Float, etc. round-trip correctly through Arrow.
1665            for prop in edge_properties {
1666                let col_name = format!("{}.{}", edge_var, prop);
1667                let mut metadata = std::collections::HashMap::new();
1668                metadata.insert("cv_encoded".to_string(), "true".to_string());
1669                fields.push(
1670                    Field::new(&col_name, DataType::LargeBinary, true).with_metadata(metadata),
1671                );
1672            }
1673        } else {
1674            // Add internal edge ID for anonymous relationships so BindPath can
1675            // reconstruct named paths (p = (a)-[:T]->(b)).
1676            fields.push(Field::new(
1677                format!("__eid_to_{}", target_variable),
1678                DataType::UInt64,
1679                optional,
1680            ));
1681        }
1682
1683        // Target properties: all as LargeBinary (deferred to PropertyManager)
1684        for prop in target_properties {
1685            fields.push(Field::new(
1686                format!("{}.{}", target_variable, prop),
1687                DataType::LargeBinary,
1688                true,
1689            ));
1690        }
1691
1692        Arc::new(Schema::new(fields))
1693    }
1694}
1695
1696impl DisplayAs for GraphTraverseMainExec {
1697    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1698        write!(
1699            f,
1700            "GraphTraverseMainExec: types={:?}, direction={:?}",
1701            self.type_names, self.direction
1702        )
1703    }
1704}
1705
1706impl ExecutionPlan for GraphTraverseMainExec {
1707    fn name(&self) -> &str {
1708        "GraphTraverseMainExec"
1709    }
1710
1711    fn as_any(&self) -> &dyn Any {
1712        self
1713    }
1714
1715    fn schema(&self) -> SchemaRef {
1716        self.schema.clone()
1717    }
1718
1719    fn properties(&self) -> &Arc<PlanProperties> {
1720        &self.properties
1721    }
1722
1723    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1724        vec![&self.input]
1725    }
1726
1727    fn with_new_children(
1728        self: Arc<Self>,
1729        children: Vec<Arc<dyn ExecutionPlan>>,
1730    ) -> DFResult<Arc<dyn ExecutionPlan>> {
1731        if children.len() != 1 {
1732            return Err(datafusion::error::DataFusionError::Plan(
1733                "GraphTraverseMainExec expects exactly one child".to_string(),
1734            ));
1735        }
1736
1737        Ok(Arc::new(Self {
1738            input: children[0].clone(),
1739            source_column: self.source_column.clone(),
1740            type_names: self.type_names.clone(),
1741            direction: self.direction,
1742            target_variable: self.target_variable.clone(),
1743            edge_variable: self.edge_variable.clone(),
1744            edge_properties: self.edge_properties.clone(),
1745            target_properties: self.target_properties.clone(),
1746            graph_ctx: self.graph_ctx.clone(),
1747            optional: self.optional,
1748            optional_pattern_vars: self.optional_pattern_vars.clone(),
1749            bound_target_column: self.bound_target_column.clone(),
1750            used_edge_columns: self.used_edge_columns.clone(),
1751            schema: self.schema.clone(),
1752            properties: self.properties.clone(),
1753            metrics: self.metrics.clone(),
1754        }))
1755    }
1756
1757    fn execute(
1758        &self,
1759        partition: usize,
1760        context: Arc<TaskContext>,
1761    ) -> DFResult<SendableRecordBatchStream> {
1762        let input_stream = self.input.execute(partition, context)?;
1763        let metrics = BaselineMetrics::new(&self.metrics, partition);
1764
1765        Ok(Box::pin(GraphTraverseMainStream::new(
1766            input_stream,
1767            self.source_column.clone(),
1768            self.type_names.clone(),
1769            self.direction,
1770            self.target_variable.clone(),
1771            self.edge_variable.clone(),
1772            self.edge_properties.clone(),
1773            self.target_properties.clone(),
1774            self.graph_ctx.clone(),
1775            self.optional,
1776            self.optional_pattern_vars.clone(),
1777            self.bound_target_column.clone(),
1778            self.used_edge_columns.clone(),
1779            self.schema.clone(),
1780            metrics,
1781        )))
1782    }
1783
1784    fn metrics(&self) -> Option<MetricsSet> {
1785        Some(self.metrics.clone_inner())
1786    }
1787}
1788
1789/// State machine for GraphTraverseMainStream.
1790enum GraphTraverseMainState {
1791    /// Loading adjacency map from main edges table.
1792    LoadingEdges {
1793        future: Pin<Box<dyn std::future::Future<Output = DFResult<EdgeAdjacencyMap>> + Send>>,
1794        input_stream: SendableRecordBatchStream,
1795    },
1796    /// Processing input stream with loaded adjacency.
1797    Processing {
1798        adjacency: EdgeAdjacencyMap,
1799        input_stream: SendableRecordBatchStream,
1800    },
1801    /// Stream is done.
1802    Done,
1803}
1804
1805/// Stream that executes schemaless edge traversal.
1806struct GraphTraverseMainStream {
1807    /// Source column name.
1808    source_column: String,
1809
1810    /// Target variable name.
1811    target_variable: String,
1812
1813    /// Edge variable name.
1814    edge_variable: Option<String>,
1815
1816    /// Edge properties to materialize.
1817    edge_properties: Vec<String>,
1818
1819    /// Target properties to materialize.
1820    target_properties: Vec<String>,
1821
1822    /// Graph execution context.
1823    graph_ctx: Arc<GraphExecutionContext>,
1824
1825    /// Whether this is optional (preserve unmatched rows).
1826    optional: bool,
1827
1828    /// Variables introduced by OPTIONAL pattern.
1829    optional_pattern_vars: HashSet<String>,
1830
1831    /// Column name of an already-bound target VID (for filtering).
1832    bound_target_column: Option<String>,
1833
1834    /// Columns containing edge IDs from previous hops (for relationship uniqueness).
1835    used_edge_columns: Vec<String>,
1836
1837    /// Output schema.
1838    schema: SchemaRef,
1839
1840    /// Stream state.
1841    state: GraphTraverseMainState,
1842
1843    /// Metrics.
1844    metrics: BaselineMetrics,
1845}
1846
1847impl GraphTraverseMainStream {
1848    /// Create a new traverse main stream.
1849    #[expect(clippy::too_many_arguments)]
1850    fn new(
1851        input_stream: SendableRecordBatchStream,
1852        source_column: String,
1853        type_names: Vec<String>,
1854        direction: Direction,
1855        target_variable: String,
1856        edge_variable: Option<String>,
1857        edge_properties: Vec<String>,
1858        target_properties: Vec<String>,
1859        graph_ctx: Arc<GraphExecutionContext>,
1860        optional: bool,
1861        optional_pattern_vars: HashSet<String>,
1862        bound_target_column: Option<String>,
1863        used_edge_columns: Vec<String>,
1864        schema: SchemaRef,
1865        metrics: BaselineMetrics,
1866    ) -> Self {
1867        // Start by loading the adjacency map from the main edges table
1868        let loading_ctx = graph_ctx.clone();
1869        let loading_types = type_names.clone();
1870        let fut =
1871            async move { build_edge_adjacency_map(&loading_ctx, &loading_types, direction).await };
1872
1873        Self {
1874            source_column,
1875            target_variable,
1876            edge_variable,
1877            edge_properties,
1878            target_properties,
1879            graph_ctx,
1880            optional,
1881            optional_pattern_vars,
1882            bound_target_column,
1883            used_edge_columns,
1884            schema,
1885            state: GraphTraverseMainState::LoadingEdges {
1886                future: Box::pin(fut),
1887                input_stream,
1888            },
1889            metrics,
1890        }
1891    }
1892
1893    /// Expand input batch using adjacency map (synchronous version).
1894    fn expand_batch(
1895        &self,
1896        input: &RecordBatch,
1897        adjacency: &EdgeAdjacencyMap,
1898    ) -> DFResult<RecordBatch> {
1899        // Extract source VIDs from source column
1900        let source_col = input.column_by_name(&self.source_column).ok_or_else(|| {
1901            datafusion::error::DataFusionError::Execution(format!(
1902                "Source column {} not found",
1903                self.source_column
1904            ))
1905        })?;
1906
1907        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
1908        let source_vids: &UInt64Array = &source_vid_cow;
1909
1910        // Read bound target VIDs if column exists
1911        let bound_target_cow = self
1912            .bound_target_column
1913            .as_ref()
1914            .and_then(|col| input.column_by_name(col))
1915            .map(|c| column_as_vid_array(c.as_ref()))
1916            .transpose()?;
1917        let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
1918
1919        // Collect edge ID arrays from previous hops for relationship uniqueness filtering.
1920        let used_edge_arrays: Vec<&UInt64Array> = self
1921            .used_edge_columns
1922            .iter()
1923            .filter_map(|col| {
1924                input
1925                    .column_by_name(col)
1926                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
1927            })
1928            .collect();
1929
1930        // Build expansions: (input_row_idx, target_vid, eid, edge_type, edge_props)
1931        type Expansion = (usize, Vid, Eid, String, uni_common::Properties);
1932        let mut expansions: Vec<Expansion> = Vec::new();
1933
1934        for (row_idx, src_u64) in source_vids.iter().enumerate() {
1935            if let Some(src_u64) = src_u64 {
1936                let src_vid = Vid::from(src_u64);
1937
1938                // Collect used edge IDs for this row from all previous hops
1939                let used_eids: HashSet<u64> = used_edge_arrays
1940                    .iter()
1941                    .filter_map(|arr| {
1942                        if arr.is_null(row_idx) {
1943                            None
1944                        } else {
1945                            Some(arr.value(row_idx))
1946                        }
1947                    })
1948                    .collect();
1949
1950                if let Some(neighbors) = adjacency.get(&src_vid) {
1951                    for (target_vid, eid, edge_type, props) in neighbors {
1952                        // Skip edges already used in previous hops (relationship uniqueness)
1953                        if used_eids.contains(&eid.as_u64()) {
1954                            continue;
1955                        }
1956
1957                        // Filter by bound target VID if set (for patterns where target is in scope).
1958                        // Only include traversals where the target matches the expected VID.
1959                        if let Some(targets) = expected_targets {
1960                            if targets.is_null(row_idx) {
1961                                continue;
1962                            }
1963                            let expected_vid = targets.value(row_idx);
1964                            if target_vid.as_u64() != expected_vid {
1965                                continue;
1966                            }
1967                        }
1968
1969                        expansions.push((
1970                            row_idx,
1971                            *target_vid,
1972                            *eid,
1973                            edge_type.clone(),
1974                            props.clone(),
1975                        ));
1976                    }
1977                }
1978            }
1979        }
1980
1981        // Handle OPTIONAL: preserve unmatched rows
1982        if expansions.is_empty() && self.optional {
1983            // No matches - return input with NULL columns appended
1984            let all_indices: Vec<usize> = (0..input.num_rows()).collect();
1985            return build_optional_null_batch_for_rows(input, &all_indices, &self.schema);
1986        }
1987
1988        if expansions.is_empty() {
1989            // No matches, not optional - return empty batch
1990            return Ok(RecordBatch::new_empty(self.schema.clone()));
1991        }
1992
1993        // Track matched rows for OPTIONAL handling
1994        let matched_rows: HashSet<usize> = if self.optional {
1995            expansions.iter().map(|(idx, _, _, _, _)| *idx).collect()
1996        } else {
1997            HashSet::new()
1998        };
1999
2000        // Expand input columns using Arrow take()
2001        let mut columns: Vec<ArrayRef> = Vec::new();
2002        let indices: Vec<u64> = expansions
2003            .iter()
2004            .map(|(idx, _, _, _, _)| *idx as u64)
2005            .collect();
2006        let indices_array = UInt64Array::from(indices);
2007
2008        for col in input.columns() {
2009            let expanded = take(col.as_ref(), &indices_array, None)?;
2010            columns.push(expanded);
2011        }
2012
2013        // Add target ._vid column (only if not already in input)
2014        let target_vid_name = format!("{}._vid", self.target_variable);
2015        let target_vids: Vec<u64> = expansions
2016            .iter()
2017            .map(|(_, vid, _, _, _)| vid.as_u64())
2018            .collect();
2019        if input.schema().column_with_name(&target_vid_name).is_none() {
2020            columns.push(Arc::new(UInt64Array::from(target_vids)));
2021        }
2022
2023        // Add target ._labels column (only if not already in input)
2024        let target_labels_name = format!("{}._labels", self.target_variable);
2025        if input
2026            .schema()
2027            .column_with_name(&target_labels_name)
2028            .is_none()
2029        {
2030            use arrow_array::builder::{ListBuilder, StringBuilder};
2031            let l0_ctx = self.graph_ctx.l0_context();
2032            let mut labels_builder = ListBuilder::new(StringBuilder::new());
2033            for (_, target_vid, _, _, _) in &expansions {
2034                let mut row_labels: Vec<String> = Vec::new();
2035                for l0 in l0_ctx.iter_l0_buffers() {
2036                    let guard = l0.read();
2037                    if let Some(l0_labels) = guard.vertex_labels.get(target_vid) {
2038                        for lbl in l0_labels {
2039                            if !row_labels.contains(lbl) {
2040                                row_labels.push(lbl.clone());
2041                            }
2042                        }
2043                    }
2044                }
2045                let values = labels_builder.values();
2046                for lbl in &row_labels {
2047                    values.append_value(lbl);
2048                }
2049                labels_builder.append(true);
2050            }
2051            columns.push(Arc::new(labels_builder.finish()));
2052        }
2053
2054        // Add edge columns if edge variable is bound.
2055        // For anonymous relationships, emit internal edge IDs for BindPath.
2056        if self.edge_variable.is_some() {
2057            // Add edge ._eid column
2058            let eids: Vec<u64> = expansions
2059                .iter()
2060                .map(|(_, _, eid, _, _)| eid.as_u64())
2061                .collect();
2062            columns.push(Arc::new(UInt64Array::from(eids)));
2063
2064            // Add edge ._type column
2065            {
2066                let mut type_builder = arrow_array::builder::StringBuilder::new();
2067                for (_, _, _, edge_type, _) in &expansions {
2068                    type_builder.append_value(edge_type);
2069                }
2070                columns.push(Arc::new(type_builder.finish()));
2071            }
2072
2073            // Add edge property columns as cv_encoded LargeBinary to preserve types
2074            for prop_name in &self.edge_properties {
2075                use crate::query::df_graph::scan::encode_cypher_value;
2076                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2077                if prop_name == "_all_props" {
2078                    // Serialize all edge properties to CypherValue blob
2079                    for (_, _, _, _, props) in &expansions {
2080                        if props.is_empty() {
2081                            builder.append_null();
2082                        } else {
2083                            let mut json_map = serde_json::Map::new();
2084                            for (k, v) in props.iter() {
2085                                let json_val: serde_json::Value = v.clone().into();
2086                                json_map.insert(k.clone(), json_val);
2087                            }
2088                            let json = serde_json::Value::Object(json_map);
2089                            match encode_cypher_value(&json) {
2090                                Ok(bytes) => builder.append_value(bytes),
2091                                Err(_) => builder.append_null(),
2092                            }
2093                        }
2094                    }
2095                } else {
2096                    // Named property as cv_encoded CypherValue
2097                    for (_, _, _, _, props) in &expansions {
2098                        match props.get(prop_name) {
2099                            Some(uni_common::Value::Null) | None => builder.append_null(),
2100                            Some(val) => {
2101                                let json_val: serde_json::Value = val.clone().into();
2102                                match encode_cypher_value(&json_val) {
2103                                    Ok(bytes) => builder.append_value(bytes),
2104                                    Err(_) => builder.append_null(),
2105                                }
2106                            }
2107                        }
2108                    }
2109                }
2110                columns.push(Arc::new(builder.finish()));
2111            }
2112        } else {
2113            let eids: Vec<u64> = expansions
2114                .iter()
2115                .map(|(_, _, eid, _, _)| eid.as_u64())
2116                .collect();
2117            columns.push(Arc::new(UInt64Array::from(eids)));
2118        }
2119
2120        // Add target property columns (hydrate from L0 buffers)
2121        {
2122            use crate::query::df_graph::scan::encode_cypher_value;
2123            let l0_ctx = self.graph_ctx.l0_context();
2124
2125            for prop_name in &self.target_properties {
2126                if prop_name == "_all_props" {
2127                    // Build full CypherValue blob from all L0 vertex properties
2128                    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2129                    for (_, target_vid, _, _, _) in &expansions {
2130                        let mut merged_props = serde_json::Map::new();
2131                        for l0 in l0_ctx.iter_l0_buffers() {
2132                            let guard = l0.read();
2133                            if let Some(props) = guard.vertex_properties.get(target_vid) {
2134                                for (k, v) in props.iter() {
2135                                    let json_val: serde_json::Value = v.clone().into();
2136                                    merged_props.insert(k.to_string(), json_val);
2137                                }
2138                            }
2139                        }
2140                        if merged_props.is_empty() {
2141                            builder.append_null();
2142                        } else {
2143                            let json = serde_json::Value::Object(merged_props);
2144                            match encode_cypher_value(&json) {
2145                                Ok(bytes) => builder.append_value(bytes),
2146                                Err(_) => builder.append_null(),
2147                            }
2148                        }
2149                    }
2150                    columns.push(Arc::new(builder.finish()));
2151                } else {
2152                    // Extract individual property from L0 and encode as CypherValue
2153                    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2154                    for (_, target_vid, _, _, _) in &expansions {
2155                        let mut found = false;
2156                        for l0 in l0_ctx.iter_l0_buffers() {
2157                            let guard = l0.read();
2158                            if let Some(props) = guard.vertex_properties.get(target_vid)
2159                                && let Some(val) = props.get(prop_name.as_str())
2160                                && !val.is_null()
2161                            {
2162                                let json_val: serde_json::Value = val.clone().into();
2163                                if let Ok(bytes) = encode_cypher_value(&json_val) {
2164                                    builder.append_value(bytes);
2165                                    found = true;
2166                                    break;
2167                                }
2168                            }
2169                        }
2170                        if !found {
2171                            builder.append_null();
2172                        }
2173                    }
2174                    columns.push(Arc::new(builder.finish()));
2175                }
2176            }
2177        }
2178
2179        let matched_batch =
2180            RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)?;
2181
2182        // Handle OPTIONAL: append unmatched rows with NULLs
2183        if self.optional {
2184            let unmatched = collect_unmatched_optional_group_rows(
2185                input,
2186                &matched_rows,
2187                &self.schema,
2188                &self.optional_pattern_vars,
2189            )?;
2190
2191            if unmatched.is_empty() {
2192                return Ok(matched_batch);
2193            }
2194
2195            let unmatched_batch = build_optional_null_batch_for_rows_with_optional_vars(
2196                input,
2197                &unmatched,
2198                &self.schema,
2199                &self.optional_pattern_vars,
2200            )?;
2201
2202            // Concatenate matched and unmatched batches
2203            use arrow::compute::concat_batches;
2204            concat_batches(&self.schema, &[matched_batch, unmatched_batch]).map_err(arrow_err)
2205        } else {
2206            Ok(matched_batch)
2207        }
2208    }
2209}
2210
2211impl GraphExecutionContext {
2212    /// Records a schemaless traversal's full edge-type scan into the SSI read-set.
2213    ///
2214    /// `build_edge_adjacency_map` scans every edge of the traversed type(s) via
2215    /// `find_edges_by_type_names` up front, so the transaction's true read
2216    /// footprint is that whole adjacency — not just the neighbourhoods later
2217    /// expanded. Recording it here gives schemaless single-hop and
2218    /// variable-length traversal (which bypass the per-vertex `get_neighbors`
2219    /// path) the same item-level antidependency coverage the schema-typed
2220    /// `record_neighbor_reads` path provides. No-op for read-only transactions
2221    /// (`occ_read_set` is `Some` only for read-write transactions).
2222    fn record_edge_adjacency(&self, adjacency: &EdgeAdjacencyMap) {
2223        let Some(tx_l0) = &self.l0_context.transaction_l0 else {
2224            return;
2225        };
2226        let guard = tx_l0.read();
2227        let Some(read_set) = &guard.occ_read_set else {
2228            return;
2229        };
2230        let mut rs = read_set.lock();
2231        for (src, neighbors) in adjacency {
2232            rs.vertices.insert(*src);
2233            for (nbr, eid, _type, _props) in neighbors {
2234                rs.vertices.insert(*nbr);
2235                rs.edges.insert(*eid);
2236            }
2237        }
2238    }
2239}
2240
2241/// Build adjacency map from main edges table for given type names and direction.
2242///
2243/// Supports OR relationship types like `[:KNOWS|HATES]` via multiple type_names.
2244/// Returns a HashMap mapping source VID -> Vec<(target_vid, eid, properties)>
2245/// Direction determines the key: Outgoing uses src_vid, Incoming uses dst_vid, Both adds entries for both.
2246async fn build_edge_adjacency_map(
2247    graph_ctx: &GraphExecutionContext,
2248    type_names: &[String],
2249    direction: Direction,
2250) -> DFResult<EdgeAdjacencyMap> {
2251    let storage = graph_ctx.storage();
2252    let l0_ctx = graph_ctx.l0_context();
2253
2254    // Step 1: Query main edges table for all type names
2255    let type_refs: Vec<&str> = type_names.iter().map(|s| s.as_str()).collect();
2256    let edges_with_type = storage
2257        .find_edges_by_type_names(&type_refs)
2258        .await
2259        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
2260
2261    // Preserve edge type name in the adjacency map for type(r) support
2262    let mut edges: Vec<(
2263        uni_common::Eid,
2264        uni_common::Vid,
2265        uni_common::Vid,
2266        String,
2267        uni_common::Properties,
2268    )> = edges_with_type.into_iter().collect();
2269
2270    // Step 2: Overlay L0 buffers for all type names
2271    for l0 in l0_ctx.iter_l0_buffers() {
2272        let l0_guard = l0.read();
2273
2274        for type_name in type_names {
2275            let l0_eids = l0_guard.eids_for_type(type_name);
2276
2277            // For each L0 edge, extract its information
2278            for &eid in &l0_eids {
2279                if let Some(edge_ref) = l0_guard.graph.edge(eid) {
2280                    let src_vid = edge_ref.src_vid;
2281                    let dst_vid = edge_ref.dst_vid;
2282
2283                    // Get properties for this edge from L0
2284                    let props = l0_guard
2285                        .edge_properties
2286                        .get(&eid)
2287                        .cloned()
2288                        .unwrap_or_default();
2289
2290                    edges.push((eid, src_vid, dst_vid, type_name.clone(), props));
2291                }
2292            }
2293        }
2294    }
2295
2296    // Step 3: Deduplicate by EID (L0 takes precedence)
2297    let mut seen_eids = HashSet::new();
2298    let mut unique_edges = Vec::new();
2299    for edge in edges.into_iter().rev() {
2300        if seen_eids.insert(edge.0) {
2301            unique_edges.push(edge);
2302        }
2303    }
2304    unique_edges.reverse();
2305
2306    // Step 4: Filter out edges tombstoned in any L0 buffer
2307    let mut tombstoned_eids = HashSet::new();
2308    for l0 in l0_ctx.iter_l0_buffers() {
2309        let l0_guard = l0.read();
2310        for eid in l0_guard.tombstones.keys() {
2311            tombstoned_eids.insert(*eid);
2312        }
2313    }
2314    if !tombstoned_eids.is_empty() {
2315        unique_edges.retain(|edge| !tombstoned_eids.contains(&edge.0));
2316    }
2317
2318    // Step 5: Build adjacency map based on direction
2319    let mut adjacency: EdgeAdjacencyMap = HashMap::new();
2320
2321    for (eid, src_vid, dst_vid, edge_type, props) in unique_edges {
2322        match direction {
2323            Direction::Outgoing => {
2324                adjacency
2325                    .entry(src_vid)
2326                    .or_default()
2327                    .push((dst_vid, eid, edge_type, props));
2328            }
2329            Direction::Incoming => {
2330                adjacency
2331                    .entry(dst_vid)
2332                    .or_default()
2333                    .push((src_vid, eid, edge_type, props));
2334            }
2335            Direction::Both => {
2336                adjacency.entry(src_vid).or_default().push((
2337                    dst_vid,
2338                    eid,
2339                    edge_type.clone(),
2340                    props.clone(),
2341                ));
2342                adjacency
2343                    .entry(dst_vid)
2344                    .or_default()
2345                    .push((src_vid, eid, edge_type, props));
2346            }
2347        }
2348    }
2349
2350    // SSI: a schemaless traversal physically scans the whole edge type above, so
2351    // record that read footprint for read-write transactions (no-op otherwise).
2352    graph_ctx.record_edge_adjacency(&adjacency);
2353
2354    Ok(adjacency)
2355}
2356
2357impl Stream for GraphTraverseMainStream {
2358    type Item = DFResult<RecordBatch>;
2359
2360    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2361        let metrics = self.metrics.clone();
2362        let _timer = metrics.elapsed_compute().timer();
2363        loop {
2364            let state = std::mem::replace(&mut self.state, GraphTraverseMainState::Done);
2365
2366            match state {
2367                GraphTraverseMainState::LoadingEdges {
2368                    mut future,
2369                    input_stream,
2370                } => match future.as_mut().poll(cx) {
2371                    Poll::Ready(Ok(adjacency)) => {
2372                        // Move to processing state with loaded adjacency
2373                        self.state = GraphTraverseMainState::Processing {
2374                            adjacency,
2375                            input_stream,
2376                        };
2377                        // Continue loop to start processing
2378                    }
2379                    Poll::Ready(Err(e)) => {
2380                        self.state = GraphTraverseMainState::Done;
2381                        return Poll::Ready(Some(Err(e)));
2382                    }
2383                    Poll::Pending => {
2384                        self.state = GraphTraverseMainState::LoadingEdges {
2385                            future,
2386                            input_stream,
2387                        };
2388                        return Poll::Pending;
2389                    }
2390                },
2391                GraphTraverseMainState::Processing {
2392                    adjacency,
2393                    mut input_stream,
2394                } => {
2395                    // Check timeout
2396                    if let Err(e) = self.graph_ctx.check_timeout() {
2397                        return Poll::Ready(Some(Err(
2398                            datafusion::error::DataFusionError::Execution(e.to_string()),
2399                        )));
2400                    }
2401
2402                    match input_stream.poll_next_unpin(cx) {
2403                        Poll::Ready(Some(Ok(batch))) => {
2404                            // Expand batch using adjacency map
2405                            let result = self.expand_batch(&batch, &adjacency);
2406
2407                            self.state = GraphTraverseMainState::Processing {
2408                                adjacency,
2409                                input_stream,
2410                            };
2411
2412                            if let Ok(ref r) = result {
2413                                self.metrics.record_output(r.num_rows());
2414                            }
2415                            return Poll::Ready(Some(result));
2416                        }
2417                        Poll::Ready(Some(Err(e))) => {
2418                            self.state = GraphTraverseMainState::Done;
2419                            return Poll::Ready(Some(Err(e)));
2420                        }
2421                        Poll::Ready(None) => {
2422                            self.state = GraphTraverseMainState::Done;
2423                            return Poll::Ready(None);
2424                        }
2425                        Poll::Pending => {
2426                            self.state = GraphTraverseMainState::Processing {
2427                                adjacency,
2428                                input_stream,
2429                            };
2430                            return Poll::Pending;
2431                        }
2432                    }
2433                }
2434                GraphTraverseMainState::Done => {
2435                    return Poll::Ready(None);
2436                }
2437            }
2438        }
2439    }
2440}
2441
2442impl RecordBatchStream for GraphTraverseMainStream {
2443    fn schema(&self) -> SchemaRef {
2444        self.schema.clone()
2445    }
2446}
2447
/// Variable-length graph traversal execution plan.
///
/// Performs BFS traversal from source vertices with configurable min/max hops.
/// The BFS is driven by a compiled [`PathNfa`]; each depth level deduplicates
/// `(vertex, NFA state)` pairs, and `path_mode` is handed to path enumeration
/// to decide which paths are admissible.
///
/// # Example
///
/// ```ignore
/// // Find all nodes 1-3 hops away via KNOWS edges
/// let traverse = GraphVariableLengthTraverseExec::new(
///     input_plan,
///     "_vid",                 // source_column
///     vec![knows_type_id],    // edge_type_ids
///     Direction::Outgoing,
///     1,                      // min_hops
///     3,                      // max_hops
///     "b",                    // target_variable
///     None,                   // step_variable
///     Some("p".to_string()),  // path_variable
///     vec![],                 // target_properties
///     None,                   // target_label_name
///     graph_ctx,
///     false,                  // is_optional
///     None,                   // bound_target_column
///     None,                   // edge_lance_filter
///     vec![],                 // edge_property_conditions
///     vec![],                 // used_edge_columns
///     path_mode,
///     output_mode,
///     None,                   // qpp_nfa: compile the NFA from the VLP parameters
/// );
/// ```
pub struct GraphVariableLengthTraverseExec {
    /// Input execution plan.
    input: Arc<dyn ExecutionPlan>,

    /// Column name containing source VIDs.
    source_column: String,

    /// Edge type IDs to traverse.
    edge_type_ids: Vec<u32>,

    /// Traversal direction.
    direction: Direction,

    /// Minimum number of hops.
    min_hops: usize,

    /// Maximum number of hops.
    max_hops: usize,

    /// Variable name for target vertex columns.
    target_variable: String,

    /// Variable name for relationship list (r in `[r*]`) - holds `List<Edge>`.
    step_variable: Option<String>,

    /// Variable name for path (if path is bound).
    path_variable: Option<String>,

    /// Target vertex properties to materialize.
    target_properties: Vec<String>,

    /// Target label name for property type resolution.
    target_label_name: Option<String>,

    /// Whether this is an optional match (LEFT JOIN semantics).
    is_optional: bool,

    /// Column name of an already-bound target VID (for patterns where target is in scope).
    bound_target_column: Option<String>,

    /// Lance SQL filter for edge property predicates (VLP bitmap preselection).
    edge_lance_filter: Option<String>,

    /// Simple property equality conditions for per-edge L0 checking during BFS.
    /// Each entry is (property_name, expected_value).
    edge_property_conditions: Vec<(String, UniValue)>,

    /// Edge ID columns from previous hops for cross-pattern relationship uniqueness.
    used_edge_columns: Vec<String>,

    /// Path semantics mode (Trail = no repeated edges, default for OpenCypher).
    path_mode: super::nfa::PathMode,

    /// Output mode determining BFS strategy.
    output_mode: super::nfa::VlpOutputMode,

    /// Compiled NFA for path pattern matching.
    ///
    /// Either the pre-compiled QPP NFA passed to `new`, or one compiled there
    /// from `edge_type_ids`, `direction`, `min_hops` and `max_hops`.
    nfa: Arc<PathNfa>,

    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Output schema (input columns plus the columns appended by `build_schema`).
    schema: SchemaRef,

    /// Cached plan properties.
    properties: Arc<PlanProperties>,

    /// Execution metrics.
    metrics: ExecutionPlanMetricsSet,
}
2539
2540impl fmt::Debug for GraphVariableLengthTraverseExec {
2541    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2542        f.debug_struct("GraphVariableLengthTraverseExec")
2543            .field("source_column", &self.source_column)
2544            .field("edge_type_ids", &self.edge_type_ids)
2545            .field("direction", &self.direction)
2546            .field("min_hops", &self.min_hops)
2547            .field("max_hops", &self.max_hops)
2548            .field("target_variable", &self.target_variable)
2549            .finish()
2550    }
2551}
2552
2553impl GraphVariableLengthTraverseExec {
2554    /// Create a new variable-length traversal plan.
2555    ///
2556    /// For QPP (Quantified Path Patterns), pass a pre-compiled NFA via `qpp_nfa`.
2557    /// For simple VLP patterns, pass `None` and the NFA will be compiled from
2558    /// `edge_type_ids`, `direction`, `min_hops`, `max_hops`.
2559    #[expect(clippy::too_many_arguments)]
2560    pub fn new(
2561        input: Arc<dyn ExecutionPlan>,
2562        source_column: impl Into<String>,
2563        edge_type_ids: Vec<u32>,
2564        direction: Direction,
2565        min_hops: usize,
2566        max_hops: usize,
2567        target_variable: impl Into<String>,
2568        step_variable: Option<String>,
2569        path_variable: Option<String>,
2570        target_properties: Vec<String>,
2571        target_label_name: Option<String>,
2572        graph_ctx: Arc<GraphExecutionContext>,
2573        is_optional: bool,
2574        bound_target_column: Option<String>,
2575        edge_lance_filter: Option<String>,
2576        edge_property_conditions: Vec<(String, UniValue)>,
2577        used_edge_columns: Vec<String>,
2578        path_mode: super::nfa::PathMode,
2579        output_mode: super::nfa::VlpOutputMode,
2580        qpp_nfa: Option<PathNfa>,
2581    ) -> Self {
2582        let source_column = source_column.into();
2583        let target_variable = target_variable.into();
2584
2585        // Resolve target property Arrow types from the schema
2586        let uni_schema = graph_ctx.storage().schema_manager().schema();
2587        let label_props = target_label_name
2588            .as_deref()
2589            .and_then(|ln| uni_schema.properties.get(ln));
2590
2591        // Build output schema
2592        let schema = Self::build_schema(
2593            input.schema(),
2594            &target_variable,
2595            step_variable.as_deref(),
2596            path_variable.as_deref(),
2597            &target_properties,
2598            label_props,
2599        );
2600        let properties = compute_plan_properties(schema.clone());
2601
2602        // Use pre-compiled QPP NFA if provided, otherwise compile from VLP parameters
2603        let nfa = Arc::new(qpp_nfa.unwrap_or_else(|| {
2604            PathNfa::from_vlp(edge_type_ids.clone(), direction, min_hops, max_hops)
2605        }));
2606
2607        Self {
2608            input,
2609            source_column,
2610            edge_type_ids,
2611            direction,
2612            min_hops,
2613            max_hops,
2614            target_variable,
2615            step_variable,
2616            path_variable,
2617            target_properties,
2618            target_label_name,
2619            is_optional,
2620            bound_target_column,
2621            edge_lance_filter,
2622            edge_property_conditions,
2623            used_edge_columns,
2624            path_mode,
2625            output_mode,
2626            nfa,
2627            graph_ctx,
2628            schema,
2629            properties,
2630            metrics: ExecutionPlanMetricsSet::new(),
2631        }
2632    }
2633
2634    /// Build output schema.
2635    fn build_schema(
2636        input_schema: SchemaRef,
2637        target_variable: &str,
2638        step_variable: Option<&str>,
2639        path_variable: Option<&str>,
2640        target_properties: &[String],
2641        label_props: Option<
2642            &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
2643        >,
2644    ) -> SchemaRef {
2645        let mut fields: Vec<Field> = input_schema
2646            .fields()
2647            .iter()
2648            .map(|f| f.as_ref().clone())
2649            .collect();
2650
2651        // Add target VID column (only if not already in input)
2652        let target_vid_name = format!("{}._vid", target_variable);
2653        if input_schema.column_with_name(&target_vid_name).is_none() {
2654            fields.push(Field::new(target_vid_name, DataType::UInt64, true));
2655        }
2656
2657        // Add target ._labels column (only if not already in input)
2658        let target_labels_name = format!("{}._labels", target_variable);
2659        if input_schema.column_with_name(&target_labels_name).is_none() {
2660            fields.push(Field::new(target_labels_name, labels_data_type(), true));
2661        }
2662
2663        // Add target vertex property columns (skip if already in input)
2664        for prop_name in target_properties {
2665            let col_name = format!("{}.{}", target_variable, prop_name);
2666            if input_schema.column_with_name(&col_name).is_none() {
2667                let arrow_type = resolve_property_type(prop_name, label_props);
2668                fields.push(Field::new(&col_name, arrow_type, true));
2669            }
2670        }
2671
2672        // Add hop count
2673        fields.push(Field::new("_hop_count", DataType::UInt64, false));
2674
2675        // Add step variable (edge list) if bound
2676        if let Some(step_var) = step_variable {
2677            fields.push(build_edge_list_field(step_var));
2678        }
2679
2680        // Add path struct if bound (only if not already in input from prior BindFixedPath)
2681        if let Some(path_var) = path_variable
2682            && input_schema.column_with_name(path_var).is_none()
2683        {
2684            fields.push(build_path_struct_field(path_var));
2685        }
2686
2687        Arc::new(Schema::new(fields))
2688    }
2689}
2690
2691impl DisplayAs for GraphVariableLengthTraverseExec {
2692    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2693        write!(
2694            f,
2695            "GraphVariableLengthTraverseExec: {} --[{:?}*{}..{}]--> target",
2696            self.source_column, self.edge_type_ids, self.min_hops, self.max_hops
2697        )
2698    }
2699}
2700
2701impl ExecutionPlan for GraphVariableLengthTraverseExec {
2702    fn name(&self) -> &str {
2703        "GraphVariableLengthTraverseExec"
2704    }
2705
2706    fn as_any(&self) -> &dyn Any {
2707        self
2708    }
2709
2710    fn schema(&self) -> SchemaRef {
2711        self.schema.clone()
2712    }
2713
2714    fn properties(&self) -> &Arc<PlanProperties> {
2715        &self.properties
2716    }
2717
2718    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
2719        vec![&self.input]
2720    }
2721
2722    fn with_new_children(
2723        self: Arc<Self>,
2724        children: Vec<Arc<dyn ExecutionPlan>>,
2725    ) -> DFResult<Arc<dyn ExecutionPlan>> {
2726        if children.len() != 1 {
2727            return Err(datafusion::error::DataFusionError::Plan(
2728                "GraphVariableLengthTraverseExec requires exactly one child".to_string(),
2729            ));
2730        }
2731
2732        // Pass the existing NFA to avoid recompilation (important for QPP NFA)
2733        Ok(Arc::new(Self::new(
2734            children[0].clone(),
2735            self.source_column.clone(),
2736            self.edge_type_ids.clone(),
2737            self.direction,
2738            self.min_hops,
2739            self.max_hops,
2740            self.target_variable.clone(),
2741            self.step_variable.clone(),
2742            self.path_variable.clone(),
2743            self.target_properties.clone(),
2744            self.target_label_name.clone(),
2745            self.graph_ctx.clone(),
2746            self.is_optional,
2747            self.bound_target_column.clone(),
2748            self.edge_lance_filter.clone(),
2749            self.edge_property_conditions.clone(),
2750            self.used_edge_columns.clone(),
2751            self.path_mode.clone(),
2752            self.output_mode.clone(),
2753            Some((*self.nfa).clone()),
2754        )))
2755    }
2756
2757    fn execute(
2758        &self,
2759        partition: usize,
2760        context: Arc<TaskContext>,
2761    ) -> DFResult<SendableRecordBatchStream> {
2762        let input_stream = self.input.execute(partition, context)?;
2763
2764        let metrics = BaselineMetrics::new(&self.metrics, partition);
2765
2766        let warm_fut = self
2767            .graph_ctx
2768            .warming_future(self.edge_type_ids.clone(), self.direction);
2769
2770        Ok(Box::pin(GraphVariableLengthTraverseStream {
2771            input: input_stream,
2772            exec: Arc::new(self.clone_for_stream()),
2773            schema: self.schema.clone(),
2774            state: VarLengthStreamState::Warming(warm_fut),
2775            metrics,
2776        }))
2777    }
2778
2779    fn metrics(&self) -> Option<MetricsSet> {
2780        Some(self.metrics.clone_inner())
2781    }
2782}
2783
2784impl GraphVariableLengthTraverseExec {
2785    /// Clone fields needed for stream (avoids cloning the full struct).
2786    fn clone_for_stream(&self) -> GraphVariableLengthTraverseExecData {
2787        GraphVariableLengthTraverseExecData {
2788            source_column: self.source_column.clone(),
2789            edge_type_ids: self.edge_type_ids.clone(),
2790            direction: self.direction,
2791            min_hops: self.min_hops,
2792            max_hops: self.max_hops,
2793            target_variable: self.target_variable.clone(),
2794            step_variable: self.step_variable.clone(),
2795            path_variable: self.path_variable.clone(),
2796            target_properties: self.target_properties.clone(),
2797            target_label_name: self.target_label_name.clone(),
2798            is_optional: self.is_optional,
2799            bound_target_column: self.bound_target_column.clone(),
2800            edge_lance_filter: self.edge_lance_filter.clone(),
2801            edge_property_conditions: self.edge_property_conditions.clone(),
2802            used_edge_columns: self.used_edge_columns.clone(),
2803            path_mode: self.path_mode.clone(),
2804            output_mode: self.output_mode.clone(),
2805            nfa: self.nfa.clone(),
2806            graph_ctx: self.graph_ctx.clone(),
2807        }
2808    }
2809}
2810
/// Data needed by the stream (without ExecutionPlan overhead).
///
/// Populated field-by-field from the plan node by
/// `GraphVariableLengthTraverseExec::clone_for_stream`; it carries neither the
/// input plan nor the schema, plan properties or metrics set.
// NOTE(review): the `reason` below mentions `with_new_children`, but in the
// visible code that method rebuilds the plan node, not this struct — confirm
// the wording is still accurate.
#[expect(
    dead_code,
    reason = "Fields accessed via NFA; kept for with_new_children reconstruction"
)]
struct GraphVariableLengthTraverseExecData {
    /// Column name containing source VIDs.
    source_column: String,
    /// Edge type IDs to traverse.
    edge_type_ids: Vec<u32>,
    /// Plan-level traversal direction.
    direction: Direction,
    /// Minimum number of hops.
    min_hops: usize,
    /// Maximum number of hops; bounds the BFS depth.
    max_hops: usize,
    /// Variable name for target vertex columns.
    target_variable: String,
    /// Variable name for the relationship list, if bound.
    step_variable: Option<String>,
    /// Variable name for the path, if bound.
    path_variable: Option<String>,
    /// Target vertex properties to materialize.
    target_properties: Vec<String>,
    /// Target label; when set, BFS endpoints are checked against it.
    target_label_name: Option<String>,
    /// Whether this is an optional match (LEFT JOIN semantics).
    is_optional: bool,
    /// Column name of an already-bound target VID, if any.
    bound_target_column: Option<String>,
    #[expect(dead_code, reason = "Used in Phase 3 warming")]
    edge_lance_filter: Option<String>,
    /// Simple property equality conditions for per-edge L0 checking during BFS.
    edge_property_conditions: Vec<(String, UniValue)>,
    /// Edge ID columns from previous hops for cross-pattern relationship uniqueness.
    used_edge_columns: Vec<String>,
    /// Path semantics mode handed to path enumeration.
    path_mode: super::nfa::PathMode,
    /// Output mode determining BFS strategy.
    output_mode: super::nfa::VlpOutputMode,
    /// Compiled NFA whose transitions drive neighbor expansion.
    nfa: Arc<PathNfa>,
    /// Graph execution context (neighbor lookups, query context, timeout).
    graph_ctx: Arc<GraphExecutionContext>,
}
2839
/// Safety cap for frontier size to prevent OOM on pathological graphs.
///
/// The BFS routines stop expanding (without raising an error) once the next
/// frontier of a depth level holds more than this many `(vertex, NFA state)`
/// entries; results collected up to that level are still returned.
const MAX_FRONTIER_SIZE: usize = 500_000;
/// Safety cap for predecessor pool size.
///
/// Compared against `PredecessorDag::pool_len()` after each BFS depth level;
/// exceeding it stops the traversal early in the same way.
const MAX_PRED_POOL_SIZE: usize = 2_000_000;
2844
2845impl GraphVariableLengthTraverseExecData {
2846    /// Check if a vertex passes the target label filter.
2847    fn check_target_label(&self, vid: Vid) -> bool {
2848        if let Some(ref label_name) = self.target_label_name {
2849            let query_ctx = self.graph_ctx.query_context();
2850            match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
2851                Some(labels) => labels.contains(label_name),
2852                None => true, // not in L0, trust storage
2853            }
2854        } else {
2855            true
2856        }
2857    }
2858
2859    /// Check if a vertex satisfies an NFA state constraint (QPP intermediate node label).
2860    fn check_state_constraint(&self, vid: Vid, constraint: &super::nfa::VertexConstraint) -> bool {
2861        match constraint {
2862            super::nfa::VertexConstraint::Label(label_name) => {
2863                let query_ctx = self.graph_ctx.query_context();
2864                match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
2865                    Some(labels) => labels.contains(label_name),
2866                    None => true, // not in L0, trust storage
2867                }
2868            }
2869        }
2870    }
2871
2872    /// Expand neighbors from a vertex through all NFA transitions from the given state.
2873    /// Returns (neighbor_vid, neighbor_eid, destination_nfa_state) triples.
2874    fn expand_neighbors(
2875        &self,
2876        vid: Vid,
2877        state: NfaStateId,
2878        eid_filter: &EidFilter,
2879        used_eids: &FxHashSet<u64>,
2880    ) -> Vec<(Vid, Eid, NfaStateId)> {
2881        let is_undirected = matches!(self.direction, Direction::Both);
2882        let mut results = Vec::new();
2883
2884        for transition in self.nfa.transitions_from(state) {
2885            let mut seen_edges: FxHashSet<u64> = FxHashSet::default();
2886
2887            for &etype in &transition.edge_type_ids {
2888                for (neighbor, eid) in
2889                    self.graph_ctx
2890                        .get_neighbors(vid, etype, transition.direction)
2891                {
2892                    // Deduplicate edges for undirected patterns
2893                    if is_undirected && !seen_edges.insert(eid.as_u64()) {
2894                        continue;
2895                    }
2896
2897                    // Check EidFilter (edge property bitmap preselection)
2898                    if !eid_filter.contains(eid) {
2899                        continue;
2900                    }
2901
2902                    // Check edge property conditions (L0 in-memory properties)
2903                    if !self.edge_property_conditions.is_empty() {
2904                        let query_ctx = self.graph_ctx.query_context();
2905                        let passes = if let Some(props) =
2906                            l0_visibility::accumulate_edge_props(eid, Some(&query_ctx))
2907                        {
2908                            self.edge_property_conditions
2909                                .iter()
2910                                .all(|(name, expected)| {
2911                                    props.get(name).is_some_and(|actual| actual == expected)
2912                                })
2913                        } else {
2914                            // Edge not in L0 (CSR/Lance) — relies on EidFilter
2915                            // for correctness. TODO: build EidFilter from Lance
2916                            // during warming for flushed edges.
2917                            true
2918                        };
2919                        if !passes {
2920                            continue;
2921                        }
2922                    }
2923
2924                    // Check cross-pattern relationship uniqueness
2925                    if used_eids.contains(&eid.as_u64()) {
2926                        continue;
2927                    }
2928
2929                    // Check NFA state constraint on the destination state (QPP label filters)
2930                    if let Some(constraint) = self.nfa.state_constraint(transition.to)
2931                        && !self.check_state_constraint(neighbor, constraint)
2932                    {
2933                        continue;
2934                    }
2935
2936                    results.push((neighbor, eid, transition.to));
2937                }
2938            }
2939        }
2940
2941        results
2942    }
2943
2944    /// NFA-driven BFS with predecessor DAG for full path enumeration (Mode B).
2945    ///
2946    /// Returns BFS results in the same format as the old bfs() for compatibility
2947    /// with build_output_batch.
2948    fn bfs_with_dag(
2949        &self,
2950        source: Vid,
2951        eid_filter: &EidFilter,
2952        used_eids: &FxHashSet<u64>,
2953        vid_filter: &VidFilter,
2954    ) -> Vec<BfsResult> {
2955        let nfa = &self.nfa;
2956        let selector = PathSelector::All;
2957        let mut dag = PredecessorDag::new(selector);
2958        let mut accepting: Vec<(Vid, NfaStateId, u32)> = Vec::new();
2959
2960        // Handle zero-length paths (min_hops == 0)
2961        if nfa.is_accepting(nfa.start_state())
2962            && self.check_target_label(source)
2963            && vid_filter.contains(source)
2964        {
2965            accepting.push((source, nfa.start_state(), 0));
2966        }
2967
2968        // Per-depth frontier BFS
2969        let mut frontier: Vec<(Vid, NfaStateId)> = vec![(source, nfa.start_state())];
2970        let mut depth: u32 = 0;
2971
2972        while !frontier.is_empty() && depth < self.max_hops as u32 {
2973            depth += 1;
2974            let mut next_frontier: Vec<(Vid, NfaStateId)> = Vec::new();
2975            let mut seen_at_depth: FxHashSet<(Vid, NfaStateId)> = FxHashSet::default();
2976
2977            for &(vid, state) in &frontier {
2978                for (neighbor, eid, dst_state) in
2979                    self.expand_neighbors(vid, state, eid_filter, used_eids)
2980                {
2981                    // Record in predecessor DAG
2982                    dag.add_predecessor(neighbor, dst_state, vid, state, eid, depth);
2983
2984                    // Add to next frontier (deduplicated per depth)
2985                    if seen_at_depth.insert((neighbor, dst_state)) {
2986                        next_frontier.push((neighbor, dst_state));
2987
2988                        // Check if accepting
2989                        if nfa.is_accepting(dst_state)
2990                            && self.check_target_label(neighbor)
2991                            && vid_filter.contains(neighbor)
2992                        {
2993                            accepting.push((neighbor, dst_state, depth));
2994                        }
2995                    }
2996                }
2997            }
2998
2999            // Safety cap
3000            if next_frontier.len() > MAX_FRONTIER_SIZE || dag.pool_len() > MAX_PRED_POOL_SIZE {
3001                break;
3002            }
3003
3004            frontier = next_frontier;
3005        }
3006
3007        // Enumerate paths from DAG to produce BfsResult tuples
3008        let mut results: Vec<BfsResult> = Vec::new();
3009        for &(target, state, depth) in &accepting {
3010            dag.enumerate_paths(
3011                source,
3012                target,
3013                state,
3014                depth,
3015                depth,
3016                &self.path_mode,
3017                &mut |nodes, edges| {
3018                    results.push((target, depth as usize, nodes.to_vec(), edges.to_vec()));
3019                    std::ops::ControlFlow::Continue(())
3020                },
3021            );
3022        }
3023
3024        results
3025    }
3026
3027    /// NFA-driven BFS returning only endpoints and depths (Mode A).
3028    ///
3029    /// More efficient when no path/step variable is bound — skips full path enumeration.
3030    /// Uses lightweight trail verification via has_trail_valid_path().
3031    fn bfs_endpoints_only(
3032        &self,
3033        source: Vid,
3034        eid_filter: &EidFilter,
3035        used_eids: &FxHashSet<u64>,
3036        vid_filter: &VidFilter,
3037    ) -> Vec<(Vid, u32)> {
3038        let nfa = &self.nfa;
3039        let selector = PathSelector::Any; // Only need existence, not all paths
3040        let mut dag = PredecessorDag::new(selector);
3041        let mut results: Vec<(Vid, u32)> = Vec::new();
3042
3043        // Handle zero-length paths
3044        if nfa.is_accepting(nfa.start_state())
3045            && self.check_target_label(source)
3046            && vid_filter.contains(source)
3047        {
3048            results.push((source, 0));
3049        }
3050
3051        // Per-depth frontier BFS
3052        let mut frontier: Vec<(Vid, NfaStateId)> = vec![(source, nfa.start_state())];
3053        let mut depth: u32 = 0;
3054
3055        while !frontier.is_empty() && depth < self.max_hops as u32 {
3056            depth += 1;
3057            let mut next_frontier: Vec<(Vid, NfaStateId)> = Vec::new();
3058            let mut seen_at_depth: FxHashSet<(Vid, NfaStateId)> = FxHashSet::default();
3059
3060            for &(vid, state) in &frontier {
3061                for (neighbor, eid, dst_state) in
3062                    self.expand_neighbors(vid, state, eid_filter, used_eids)
3063                {
3064                    dag.add_predecessor(neighbor, dst_state, vid, state, eid, depth);
3065
3066                    if seen_at_depth.insert((neighbor, dst_state)) {
3067                        next_frontier.push((neighbor, dst_state));
3068
3069                        // Check if accepting with trail verification
3070                        if nfa.is_accepting(dst_state)
3071                            && self.check_target_label(neighbor)
3072                            && vid_filter.contains(neighbor)
3073                            && dag.has_trail_valid_path(source, neighbor, dst_state, depth, depth)
3074                        {
3075                            results.push((neighbor, depth));
3076                        }
3077                    }
3078                }
3079            }
3080
3081            if next_frontier.len() > MAX_FRONTIER_SIZE || dag.pool_len() > MAX_PRED_POOL_SIZE {
3082                break;
3083            }
3084
3085            frontier = next_frontier;
3086        }
3087
3088        results
3089    }
3090}
3091
/// State machine for variable-length traverse stream.
///
/// A stream starts in `Warming`, moves to `Reading` once the warming future
/// resolves, and enters `Materializing` for a batch only when target
/// properties have to be hydrated.
enum VarLengthStreamState {
    /// Warming adjacency CSRs before first batch.
    /// Holds the future obtained from `graph_ctx.warming_future(..)` in `execute`.
    Warming(Pin<Box<dyn std::future::Future<Output = DFResult<()>> + Send>>),
    /// Processing input batches.
    Reading,
    /// Materializing target vertex properties asynchronously.
    /// The future resolves to the finished output batch.
    Materializing(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
    /// Stream is done.
    Done,
}
3103
/// Stream for variable-length traversal.
///
/// Created by `GraphVariableLengthTraverseExec::execute`, one per partition.
struct GraphVariableLengthTraverseStream {
    /// Upstream batches containing the source VIDs.
    input: SendableRecordBatchStream,
    /// Traversal parameters copied from the plan node.
    exec: Arc<GraphVariableLengthTraverseExecData>,
    /// Output schema of the batches this stream yields.
    schema: SchemaRef,
    /// Current position in the warming / reading / materializing state machine.
    state: VarLengthStreamState,
    /// Baseline metrics for this partition (elapsed compute time is timed in `poll_next`).
    metrics: BaselineMetrics,
}
3112
3113impl Stream for GraphVariableLengthTraverseStream {
3114    type Item = DFResult<RecordBatch>;
3115
3116    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3117        let metrics = self.metrics.clone();
3118        let _timer = metrics.elapsed_compute().timer();
3119        loop {
3120            let state = std::mem::replace(&mut self.state, VarLengthStreamState::Done);
3121
3122            match state {
3123                VarLengthStreamState::Warming(mut fut) => match fut.as_mut().poll(cx) {
3124                    Poll::Ready(Ok(())) => {
3125                        self.state = VarLengthStreamState::Reading;
3126                        // Continue loop to start reading
3127                    }
3128                    Poll::Ready(Err(e)) => {
3129                        self.state = VarLengthStreamState::Done;
3130                        return Poll::Ready(Some(Err(e)));
3131                    }
3132                    Poll::Pending => {
3133                        self.state = VarLengthStreamState::Warming(fut);
3134                        return Poll::Pending;
3135                    }
3136                },
3137                VarLengthStreamState::Reading => {
3138                    // Check timeout
3139                    if let Err(e) = self.exec.graph_ctx.check_timeout() {
3140                        return Poll::Ready(Some(Err(
3141                            datafusion::error::DataFusionError::Execution(e.to_string()),
3142                        )));
3143                    }
3144
3145                    match self.input.poll_next_unpin(cx) {
3146                        Poll::Ready(Some(Ok(batch))) => {
3147                            // Build base batch synchronously (BFS + expand)
3148                            // TODO(Phase 3.5): Build real EidFilter/VidFilter during warming
3149                            let eid_filter = EidFilter::AllAllowed;
3150                            let vid_filter = VidFilter::AllAllowed;
3151                            let base_result =
3152                                self.process_batch_base(batch, &eid_filter, &vid_filter);
3153                            let base_batch = match base_result {
3154                                Ok(b) => b,
3155                                Err(e) => {
3156                                    self.state = VarLengthStreamState::Reading;
3157                                    return Poll::Ready(Some(Err(e)));
3158                                }
3159                            };
3160
3161                            // If no properties need async hydration, return directly
3162                            if self.exec.target_properties.is_empty() {
3163                                self.state = VarLengthStreamState::Reading;
3164                                return Poll::Ready(Some(Ok(base_batch)));
3165                            }
3166
3167                            // Properties needed — create async future for hydration
3168                            let schema = self.schema.clone();
3169                            let target_variable = self.exec.target_variable.clone();
3170                            let target_properties = self.exec.target_properties.clone();
3171                            let target_label_name = self.exec.target_label_name.clone();
3172                            let graph_ctx = self.exec.graph_ctx.clone();
3173
3174                            let fut = hydrate_vlp_target_properties(
3175                                base_batch,
3176                                schema,
3177                                target_variable,
3178                                target_properties,
3179                                target_label_name,
3180                                graph_ctx,
3181                            );
3182
3183                            self.state = VarLengthStreamState::Materializing(Box::pin(fut));
3184                            // Continue loop to poll the future
3185                        }
3186                        Poll::Ready(Some(Err(e))) => {
3187                            self.state = VarLengthStreamState::Done;
3188                            return Poll::Ready(Some(Err(e)));
3189                        }
3190                        Poll::Ready(None) => {
3191                            self.state = VarLengthStreamState::Done;
3192                            return Poll::Ready(None);
3193                        }
3194                        Poll::Pending => {
3195                            self.state = VarLengthStreamState::Reading;
3196                            return Poll::Pending;
3197                        }
3198                    }
3199                }
3200                VarLengthStreamState::Materializing(mut fut) => match fut.as_mut().poll(cx) {
3201                    Poll::Ready(Ok(batch)) => {
3202                        self.state = VarLengthStreamState::Reading;
3203                        self.metrics.record_output(batch.num_rows());
3204                        return Poll::Ready(Some(Ok(batch)));
3205                    }
3206                    Poll::Ready(Err(e)) => {
3207                        self.state = VarLengthStreamState::Done;
3208                        return Poll::Ready(Some(Err(e)));
3209                    }
3210                    Poll::Pending => {
3211                        self.state = VarLengthStreamState::Materializing(fut);
3212                        return Poll::Pending;
3213                    }
3214                },
3215                VarLengthStreamState::Done => {
3216                    return Poll::Ready(None);
3217                }
3218            }
3219        }
3220    }
3221}
3222
3223impl GraphVariableLengthTraverseStream {
3224    fn process_batch_base(
3225        &self,
3226        batch: RecordBatch,
3227        eid_filter: &EidFilter,
3228        vid_filter: &VidFilter,
3229    ) -> DFResult<RecordBatch> {
3230        let source_col = batch
3231            .column_by_name(&self.exec.source_column)
3232            .ok_or_else(|| {
3233                datafusion::error::DataFusionError::Execution(format!(
3234                    "Source column '{}' not found",
3235                    self.exec.source_column
3236                ))
3237            })?;
3238
3239        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
3240        let source_vids: &UInt64Array = &source_vid_cow;
3241
3242        // Read bound target VIDs if column exists
3243        let bound_target_cow = self
3244            .exec
3245            .bound_target_column
3246            .as_ref()
3247            .and_then(|col| batch.column_by_name(col))
3248            .map(|c| column_as_vid_array(c.as_ref()))
3249            .transpose()?;
3250        let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
3251
3252        // Extract used edge columns for cross-pattern relationship uniqueness
3253        let used_edge_arrays: Vec<&UInt64Array> = self
3254            .exec
3255            .used_edge_columns
3256            .iter()
3257            .filter_map(|col| {
3258                batch
3259                    .column_by_name(col)?
3260                    .as_any()
3261                    .downcast_ref::<UInt64Array>()
3262            })
3263            .collect();
3264
3265        // Collect all BFS results
3266        let mut expansions: Vec<VarLengthExpansion> = Vec::new();
3267
3268        for (row_idx, source_vid) in source_vids.iter().enumerate() {
3269            let mut emitted_for_row = false;
3270
3271            if let Some(src) = source_vid {
3272                let vid = Vid::from(src);
3273
3274                // Collect used edge IDs from previous hops for this row
3275                let used_eids: FxHashSet<u64> = used_edge_arrays
3276                    .iter()
3277                    .filter_map(|arr| {
3278                        if arr.is_null(row_idx) {
3279                            None
3280                        } else {
3281                            Some(arr.value(row_idx))
3282                        }
3283                    })
3284                    .collect();
3285
3286                // Dispatch to appropriate BFS mode based on output_mode
3287                match &self.exec.output_mode {
3288                    VlpOutputMode::EndpointsOnly => {
3289                        let endpoints = self
3290                            .exec
3291                            .bfs_endpoints_only(vid, eid_filter, &used_eids, vid_filter);
3292                        for (target, depth) in endpoints {
3293                            // Filter by bound target VID
3294                            if let Some(targets) = expected_targets {
3295                                if targets.is_null(row_idx) {
3296                                    continue;
3297                                }
3298                                if target.as_u64() != targets.value(row_idx) {
3299                                    continue;
3300                                }
3301                            }
3302                            expansions.push((row_idx, target, depth as usize, vec![], vec![]));
3303                            emitted_for_row = true;
3304                        }
3305                    }
3306                    _ => {
3307                        // FullPath, StepVariable, CountOnly, etc.
3308                        let bfs_results = self
3309                            .exec
3310                            .bfs_with_dag(vid, eid_filter, &used_eids, vid_filter);
3311                        for (target, hop_count, node_path, edge_path) in bfs_results {
3312                            // Filter by bound target VID
3313                            if let Some(targets) = expected_targets {
3314                                if targets.is_null(row_idx) {
3315                                    continue;
3316                                }
3317                                if target.as_u64() != targets.value(row_idx) {
3318                                    continue;
3319                                }
3320                            }
3321                            expansions.push((row_idx, target, hop_count, node_path, edge_path));
3322                            emitted_for_row = true;
3323                        }
3324                    }
3325                }
3326            }
3327
3328            if self.exec.is_optional && !emitted_for_row {
3329                // Preserve the source row with NULL optional bindings.
3330                // We use empty node/edge paths to mark unmatched rows.
3331                expansions.push((row_idx, Vid::from(u64::MAX), 0, vec![], vec![]));
3332            }
3333        }
3334
3335        self.build_output_batch(&batch, &expansions)
3336    }
3337
3338    fn build_output_batch(
3339        &self,
3340        input: &RecordBatch,
3341        expansions: &[VarLengthExpansion],
3342    ) -> DFResult<RecordBatch> {
3343        if expansions.is_empty() {
3344            return Ok(RecordBatch::new_empty(self.schema.clone()));
3345        }
3346
3347        let num_rows = expansions.len();
3348
3349        // Build index array
3350        let indices: Vec<u64> = expansions
3351            .iter()
3352            .map(|(idx, _, _, _, _)| *idx as u64)
3353            .collect();
3354        let indices_array = UInt64Array::from(indices);
3355
3356        // Expand input columns
3357        let mut columns: Vec<ArrayRef> = Vec::new();
3358        for col in input.columns() {
3359            let expanded = take(col.as_ref(), &indices_array, None)?;
3360            columns.push(expanded);
3361        }
3362
3363        // Collect target VIDs and unmatched markers for use in multiple places.
3364        // Unmatched OPTIONAL rows use the sentinel VID (u64::MAX) — not empty paths,
3365        // because EndpointsOnly mode legitimately uses empty node/edge path vectors.
3366        let unmatched_rows: Vec<bool> = expansions
3367            .iter()
3368            .map(|(_, vid, _, _, _)| vid.as_u64() == u64::MAX)
3369            .collect();
3370        let target_vids: Vec<Option<u64>> = expansions
3371            .iter()
3372            .zip(unmatched_rows.iter())
3373            .map(
3374                |((_, vid, _, _, _), unmatched)| {
3375                    if *unmatched { None } else { Some(vid.as_u64()) }
3376                },
3377            )
3378            .collect();
3379
3380        // Add target VID column (only if not already in input)
3381        let target_vid_name = format!("{}._vid", self.exec.target_variable);
3382        if input.schema().column_with_name(&target_vid_name).is_none() {
3383            columns.push(Arc::new(UInt64Array::from(target_vids.clone())));
3384        }
3385
3386        // Add target ._labels column (only if not already in input)
3387        let target_labels_name = format!("{}._labels", self.exec.target_variable);
3388        if input
3389            .schema()
3390            .column_with_name(&target_labels_name)
3391            .is_none()
3392        {
3393            use arrow_array::builder::{ListBuilder, StringBuilder};
3394            let query_ctx = self.exec.graph_ctx.query_context();
3395            let mut labels_builder = ListBuilder::new(StringBuilder::new());
3396            for target_vid in &target_vids {
3397                let Some(vid_u64) = target_vid else {
3398                    labels_builder.append(false);
3399                    continue;
3400                };
3401                let vid = Vid::from(*vid_u64);
3402                let row_labels: Vec<String> =
3403                    match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
3404                        Some(labels) => {
3405                            // Vertex is in L0 — use actual labels only
3406                            labels
3407                        }
3408                        None => {
3409                            // Vertex not in L0 — trust schema label (storage already filtered)
3410                            if let Some(ref label_name) = self.exec.target_label_name {
3411                                vec![label_name.clone()]
3412                            } else {
3413                                vec![]
3414                            }
3415                        }
3416                    };
3417                let values = labels_builder.values();
3418                for lbl in &row_labels {
3419                    values.append_value(lbl);
3420                }
3421                labels_builder.append(true);
3422            }
3423            columns.push(Arc::new(labels_builder.finish()));
3424        }
3425
3426        // Add null placeholder columns for target properties (hydrated async if needed, skip if already in input)
3427        for prop_name in &self.exec.target_properties {
3428            let full_prop_name = format!("{}.{}", self.exec.target_variable, prop_name);
3429            if input.schema().column_with_name(&full_prop_name).is_none() {
3430                let col_idx = columns.len();
3431                if col_idx < self.schema.fields().len() {
3432                    let field = self.schema.field(col_idx);
3433                    columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
3434                }
3435            }
3436        }
3437
3438        // Add hop count column
3439        let hop_counts: Vec<u64> = expansions
3440            .iter()
3441            .map(|(_, _, hops, _, _)| *hops as u64)
3442            .collect();
3443        columns.push(Arc::new(UInt64Array::from(hop_counts)));
3444
3445        // Add step variable (edge list) column if bound
3446        if self.exec.step_variable.is_some() {
3447            let mut edges_builder = new_edge_list_builder();
3448            let query_ctx = self.exec.graph_ctx.query_context();
3449
3450            for (_, _, _, node_path, edge_path) in expansions {
3451                if node_path.is_empty() && edge_path.is_empty() {
3452                    // Null row for OPTIONAL MATCH unmatched
3453                    edges_builder.append_null();
3454                } else if edge_path.is_empty() {
3455                    // Zero-hop match: empty list
3456                    edges_builder.append(true);
3457                } else {
3458                    for (i, eid) in edge_path.iter().enumerate() {
3459                        let type_name = l0_visibility::get_edge_type(*eid, &query_ctx)
3460                            .unwrap_or_else(|| "UNKNOWN".to_string());
3461                        append_edge_to_struct(
3462                            edges_builder.values(),
3463                            *eid,
3464                            &type_name,
3465                            node_path[i].as_u64(),
3466                            node_path[i + 1].as_u64(),
3467                            &query_ctx,
3468                        );
3469                    }
3470                    edges_builder.append(true);
3471                }
3472            }
3473
3474            columns.push(Arc::new(edges_builder.finish()));
3475        }
3476
3477        // Add path variable column if bound.
3478        // For named paths, we output a Path struct with nodes and relationships arrays.
3479        // If a path column already exists in input (from a prior BindFixedPath), extend it
3480        // rather than building from scratch.
3481        if let Some(path_var_name) = &self.exec.path_variable {
3482            let existing_path_col_idx = input
3483                .schema()
3484                .column_with_name(path_var_name)
3485                .map(|(idx, _)| idx);
3486            // Clone the Arc so we can read existing path without borrowing `columns`
3487            let existing_path_arc = existing_path_col_idx.map(|idx| columns[idx].clone());
3488            let existing_path = existing_path_arc
3489                .as_ref()
3490                .and_then(|arc| arc.as_any().downcast_ref::<arrow_array::StructArray>());
3491
3492            let mut nodes_builder = new_node_list_builder();
3493            let mut rels_builder = new_edge_list_builder();
3494            let query_ctx = self.exec.graph_ctx.query_context();
3495            let mut path_validity = Vec::with_capacity(expansions.len());
3496
3497            for (row_out_idx, (_, _, _, node_path, edge_path)) in expansions.iter().enumerate() {
3498                if node_path.is_empty() && edge_path.is_empty() {
3499                    nodes_builder.append(false);
3500                    rels_builder.append(false);
3501                    path_validity.push(false);
3502                    continue;
3503                }
3504
3505                // Prepend existing path prefix if extending
3506                let skip_first_vlp_node = if let Some(existing) = existing_path {
3507                    if !existing.is_null(row_out_idx) {
3508                        prepend_existing_path(
3509                            existing,
3510                            row_out_idx,
3511                            &mut nodes_builder,
3512                            &mut rels_builder,
3513                            &query_ctx,
3514                        );
3515                        true
3516                    } else {
3517                        false
3518                    }
3519                } else {
3520                    false
3521                };
3522
3523                // Append VLP nodes (skip first if extending — it's the junction point)
3524                let start_idx = if skip_first_vlp_node { 1 } else { 0 };
3525                for vid in &node_path[start_idx..] {
3526                    append_node_to_struct(nodes_builder.values(), *vid, &query_ctx);
3527                }
3528                nodes_builder.append(true);
3529
3530                for (i, eid) in edge_path.iter().enumerate() {
3531                    let type_name = l0_visibility::get_edge_type(*eid, &query_ctx)
3532                        .unwrap_or_else(|| "UNKNOWN".to_string());
3533                    append_edge_to_struct(
3534                        rels_builder.values(),
3535                        *eid,
3536                        &type_name,
3537                        node_path[i].as_u64(),
3538                        node_path[i + 1].as_u64(),
3539                        &query_ctx,
3540                    );
3541                }
3542                rels_builder.append(true);
3543                path_validity.push(true);
3544            }
3545
3546            // Finish builders and get ListArrays
3547            let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
3548            let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;
3549
3550            // Build the path struct fields
3551            let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
3552            let rels_field = Arc::new(Field::new(
3553                "relationships",
3554                rels_array.data_type().clone(),
3555                true,
3556            ));
3557
3558            // Create the path struct array
3559            let path_struct = arrow_array::StructArray::try_new(
3560                vec![nodes_field, rels_field].into(),
3561                vec![nodes_array, rels_array],
3562                Some(arrow::buffer::NullBuffer::from(path_validity)),
3563            )
3564            .map_err(arrow_err)?;
3565
3566            if let Some(idx) = existing_path_col_idx {
3567                columns[idx] = Arc::new(path_struct);
3568            } else {
3569                columns.push(Arc::new(path_struct));
3570            }
3571        }
3572
3573        self.metrics.record_output(num_rows);
3574
3575        RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)
3576    }
3577}
3578
3579impl RecordBatchStream for GraphVariableLengthTraverseStream {
3580    fn schema(&self) -> SchemaRef {
3581        self.schema.clone()
3582    }
3583}
3584
3585/// Hydrate target vertex properties into a VLP batch.
3586///
3587/// The base batch already has null placeholder columns for target properties.
3588/// This function replaces them with actual property values fetched from storage.
3589async fn hydrate_vlp_target_properties(
3590    base_batch: RecordBatch,
3591    schema: SchemaRef,
3592    target_variable: String,
3593    target_properties: Vec<String>,
3594    target_label_name: Option<String>,
3595    graph_ctx: Arc<GraphExecutionContext>,
3596) -> DFResult<RecordBatch> {
3597    if base_batch.num_rows() == 0 || target_properties.is_empty() {
3598        return Ok(base_batch);
3599    }
3600
3601    // Find the target VID column by exact name.
3602    // Schema layout: [input cols..., target._vid, target.prop1..., _hop_count, path?]
3603    //
3604    // IMPORTANT: When the target variable is already bound in the input (e.g., two MATCH
3605    // clauses referencing the same variable), there may be duplicate column names. We need
3606    // the LAST occurrence of target._vid, which is the one added by the VLP.
3607    let target_vid_col_name = format!("{}._vid", target_variable);
3608    let vid_col_idx = schema
3609        .fields()
3610        .iter()
3611        .enumerate()
3612        .rev()
3613        .find(|(_, f)| f.name() == &target_vid_col_name)
3614        .map(|(i, _)| i);
3615
3616    let Some(vid_col_idx) = vid_col_idx else {
3617        return Ok(base_batch);
3618    };
3619
3620    let vid_col = base_batch.column(vid_col_idx);
3621    let target_vid_cow = column_as_vid_array(vid_col.as_ref())?;
3622    let target_vid_array: &UInt64Array = &target_vid_cow;
3623
3624    let target_vids: Vec<Vid> = target_vid_array
3625        .iter()
3626        // Preserve null rows by mapping them to a sentinel VID that never resolves
3627        // to stored properties. The output property columns remain NULL for these rows.
3628        .map(|v| Vid::from(v.unwrap_or(u64::MAX)))
3629        .collect();
3630
3631    // Fetch properties from storage
3632    let mut property_columns: Vec<ArrayRef> = Vec::new();
3633
3634    if let Some(ref label_name) = target_label_name {
3635        let property_manager = graph_ctx.property_manager();
3636        let query_ctx = graph_ctx.query_context();
3637
3638        let props_map = property_manager
3639            .get_batch_vertex_props_for_label(&target_vids, label_name, Some(&query_ctx))
3640            .await
3641            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
3642
3643        let uni_schema = graph_ctx.storage().schema_manager().schema();
3644        let label_props = uni_schema.properties.get(label_name.as_str());
3645
3646        for prop_name in &target_properties {
3647            let data_type = resolve_property_type(prop_name, label_props);
3648            let column =
3649                build_property_column_static(&target_vids, &props_map, prop_name, &data_type)?;
3650            property_columns.push(column);
3651        }
3652    } else {
3653        // No label name — use label-agnostic property lookup.
3654        // This scans all label datasets, slower but correct for label-less traversals.
3655        let non_internal_props: Vec<&str> = target_properties
3656            .iter()
3657            .filter(|p| *p != "_all_props")
3658            .map(|s| s.as_str())
3659            .collect();
3660        let property_manager = graph_ctx.property_manager();
3661        let query_ctx = graph_ctx.query_context();
3662
3663        let props_map = if !non_internal_props.is_empty() {
3664            property_manager
3665                .get_batch_vertex_props(&target_vids, &non_internal_props, Some(&query_ctx))
3666                .await
3667                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
3668        } else {
3669            std::collections::HashMap::new()
3670        };
3671
3672        for prop_name in &target_properties {
3673            if prop_name == "_all_props" {
3674                // Build CypherValue blob from all vertex properties (L0 + storage)
3675                use crate::query::df_graph::scan::encode_cypher_value;
3676                use arrow_array::builder::LargeBinaryBuilder;
3677
3678                let mut builder = LargeBinaryBuilder::new();
3679                let l0_ctx = graph_ctx.l0_context();
3680                for vid in &target_vids {
3681                    let mut merged_props = serde_json::Map::new();
3682                    // Collect from storage-hydrated props
3683                    if let Some(vid_props) = props_map.get(vid) {
3684                        for (k, v) in vid_props.iter() {
3685                            let json_val: serde_json::Value = v.clone().into();
3686                            merged_props.insert(k.to_string(), json_val);
3687                        }
3688                    }
3689                    // Overlay L0 properties
3690                    for l0 in l0_ctx.iter_l0_buffers() {
3691                        let guard = l0.read();
3692                        if let Some(l0_props) = guard.vertex_properties.get(vid) {
3693                            for (k, v) in l0_props.iter() {
3694                                let json_val: serde_json::Value = v.clone().into();
3695                                merged_props.insert(k.to_string(), json_val);
3696                            }
3697                        }
3698                    }
3699                    if merged_props.is_empty() {
3700                        builder.append_null();
3701                    } else {
3702                        let json = serde_json::Value::Object(merged_props);
3703                        match encode_cypher_value(&json) {
3704                            Ok(bytes) => builder.append_value(bytes),
3705                            Err(_) => builder.append_null(),
3706                        }
3707                    }
3708                }
3709                property_columns.push(Arc::new(builder.finish()));
3710            } else {
3711                let column = build_property_column_static(
3712                    &target_vids,
3713                    &props_map,
3714                    prop_name,
3715                    &arrow::datatypes::DataType::LargeBinary,
3716                )?;
3717                property_columns.push(column);
3718            }
3719        }
3720    }
3721
3722    // Rebuild batch replacing the null placeholder property columns with hydrated ones.
3723    // Find each property column by name — works regardless of column ordering
3724    // (schema-aware puts props before _hop_count; schemaless puts them after).
3725    // Use col_idx > vid_col_idx to only replace this VLP's own property columns,
3726    // not pre-existing input columns with the same name (duplicate variable binding).
3727    let mut new_columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
3728    let mut prop_idx = 0;
3729    for (col_idx, field) in schema.fields().iter().enumerate() {
3730        let is_target_prop = col_idx > vid_col_idx
3731            && target_properties
3732                .iter()
3733                .any(|p| *field.name() == format!("{}.{}", target_variable, p));
3734        if is_target_prop && prop_idx < property_columns.len() {
3735            new_columns.push(property_columns[prop_idx].clone());
3736            prop_idx += 1;
3737        } else {
3738            new_columns.push(base_batch.column(col_idx).clone());
3739        }
3740    }
3741
3742    RecordBatch::try_new(schema, new_columns).map_err(arrow_err)
3743}
3744
3745// ============================================================================
3746// GraphVariableLengthTraverseMainExec - VLP for schemaless edge types
3747// ============================================================================
3748
/// Execution plan for variable-length path traversal on schemaless edge types.
///
/// This is similar to `GraphVariableLengthTraverseExec` but works with edge types
/// that don't have schema-defined IDs. It queries the main edges table by type name.
/// Supports OR relationship types like `[:KNOWS|HATES]` via multiple type_names.
///
/// The output schema is the input schema extended with the target columns; it is
/// computed once in `new` and cached alongside the plan properties.
pub struct GraphVariableLengthTraverseMainExec {
    /// Input execution plan.
    input: Arc<dyn ExecutionPlan>,

    /// Column name containing source VIDs.
    source_column: String,

    /// Edge type names (not IDs, since schemaless types may not have IDs).
    /// More than one entry expresses an OR over relationship types.
    type_names: Vec<String>,

    /// Traversal direction.
    direction: Direction,

    /// Minimum number of hops.
    min_hops: usize,

    /// Maximum number of hops.
    max_hops: usize,

    /// Variable name for target vertex columns; used as the prefix of the
    /// `<var>._vid` and `<var>._labels` output columns.
    target_variable: String,

    /// Variable name for relationship list (r in `[r*]`) - holds `List<Edge>`.
    step_variable: Option<String>,

    /// Variable name for named path (p in `p = ...`) - holds `Path`.
    path_variable: Option<String>,

    /// Target vertex properties to materialize.
    target_properties: Vec<String>,

    /// Whether this is an optional match (LEFT JOIN semantics).
    is_optional: bool,

    /// Column name of an already-bound target VID (for patterns where target is in scope).
    bound_target_column: Option<String>,

    /// Lance SQL filter for edge property predicates (VLP bitmap preselection).
    edge_lance_filter: Option<String>,

    /// Edge property conditions to check during BFS (e.g., `{year: 1988}`).
    /// Each entry is (property_name, expected_value). All must match for an edge to be traversed.
    edge_property_conditions: Vec<(String, UniValue)>,

    /// Edge ID columns from previous hops for cross-pattern relationship uniqueness.
    used_edge_columns: Vec<String>,

    /// Path semantics mode (Trail = no repeated edges, default for OpenCypher).
    path_mode: super::nfa::PathMode,

    /// Output mode determining BFS strategy.
    output_mode: super::nfa::VlpOutputMode,

    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Output schema.
    schema: SchemaRef,

    /// Cached plan properties, derived from `schema` at construction time.
    properties: Arc<PlanProperties>,

    /// Execution metrics.
    metrics: ExecutionPlanMetricsSet,
}
3819
3820impl fmt::Debug for GraphVariableLengthTraverseMainExec {
3821    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3822        f.debug_struct("GraphVariableLengthTraverseMainExec")
3823            .field("source_column", &self.source_column)
3824            .field("type_names", &self.type_names)
3825            .field("direction", &self.direction)
3826            .field("min_hops", &self.min_hops)
3827            .field("max_hops", &self.max_hops)
3828            .field("target_variable", &self.target_variable)
3829            .finish()
3830    }
3831}
3832
3833impl GraphVariableLengthTraverseMainExec {
3834    /// Create a new variable-length traversal plan for schemaless edges.
3835    #[expect(clippy::too_many_arguments)]
3836    pub fn new(
3837        input: Arc<dyn ExecutionPlan>,
3838        source_column: impl Into<String>,
3839        type_names: Vec<String>,
3840        direction: Direction,
3841        min_hops: usize,
3842        max_hops: usize,
3843        target_variable: impl Into<String>,
3844        step_variable: Option<String>,
3845        path_variable: Option<String>,
3846        target_properties: Vec<String>,
3847        graph_ctx: Arc<GraphExecutionContext>,
3848        is_optional: bool,
3849        bound_target_column: Option<String>,
3850        edge_lance_filter: Option<String>,
3851        edge_property_conditions: Vec<(String, UniValue)>,
3852        used_edge_columns: Vec<String>,
3853        path_mode: super::nfa::PathMode,
3854        output_mode: super::nfa::VlpOutputMode,
3855    ) -> Self {
3856        let source_column = source_column.into();
3857        let target_variable = target_variable.into();
3858
3859        // Build output schema
3860        let schema = Self::build_schema(
3861            input.schema(),
3862            &target_variable,
3863            step_variable.as_deref(),
3864            path_variable.as_deref(),
3865            &target_properties,
3866        );
3867        let properties = compute_plan_properties(schema.clone());
3868
3869        Self {
3870            input,
3871            source_column,
3872            type_names,
3873            direction,
3874            min_hops,
3875            max_hops,
3876            target_variable,
3877            step_variable,
3878            path_variable,
3879            target_properties,
3880            is_optional,
3881            bound_target_column,
3882            edge_lance_filter,
3883            edge_property_conditions,
3884            used_edge_columns,
3885            path_mode,
3886            output_mode,
3887            graph_ctx,
3888            schema,
3889            properties,
3890            metrics: ExecutionPlanMetricsSet::new(),
3891        }
3892    }
3893
3894    /// Build output schema.
3895    fn build_schema(
3896        input_schema: SchemaRef,
3897        target_variable: &str,
3898        step_variable: Option<&str>,
3899        path_variable: Option<&str>,
3900        target_properties: &[String],
3901    ) -> SchemaRef {
3902        let mut fields: Vec<Field> = input_schema
3903            .fields()
3904            .iter()
3905            .map(|f| f.as_ref().clone())
3906            .collect();
3907
3908        // Add target VID column (only if not already in input)
3909        let target_vid_name = format!("{}._vid", target_variable);
3910        if input_schema.column_with_name(&target_vid_name).is_none() {
3911            fields.push(Field::new(target_vid_name, DataType::UInt64, true));
3912        }
3913
3914        // Add target ._labels column (only if not already in input)
3915        let target_labels_name = format!("{}._labels", target_variable);
3916        if input_schema.column_with_name(&target_labels_name).is_none() {
3917            fields.push(Field::new(target_labels_name, labels_data_type(), true));
3918        }
3919
3920        // Add hop count
3921        fields.push(Field::new("_hop_count", DataType::UInt64, false));
3922
3923        // Add step variable column (list of edge structs) if bound
3924        // This is the relationship variable like `r` in `[r*1..3]`
3925        if let Some(step_var) = step_variable {
3926            fields.push(build_edge_list_field(step_var));
3927        }
3928
3929        // Add path struct if bound (only if not already in input from prior BindFixedPath)
3930        if let Some(path_var) = path_variable
3931            && input_schema.column_with_name(path_var).is_none()
3932        {
3933            fields.push(build_path_struct_field(path_var));
3934        }
3935
3936        // Add target property columns (as LargeBinary for lazy hydration via PropertyManager)
3937        // Skip properties that are already in the input schema
3938        for prop in target_properties {
3939            let prop_name = format!("{}.{}", target_variable, prop);
3940            if input_schema.column_with_name(&prop_name).is_none() {
3941                fields.push(Field::new(prop_name, DataType::LargeBinary, true));
3942            }
3943        }
3944
3945        Arc::new(Schema::new(fields))
3946    }
3947}
3948
3949impl DisplayAs for GraphVariableLengthTraverseMainExec {
3950    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3951        write!(
3952            f,
3953            "GraphVariableLengthTraverseMainExec: {} --[{:?}*{}..{}]--> target",
3954            self.source_column, self.type_names, self.min_hops, self.max_hops
3955        )
3956    }
3957}
3958
3959impl ExecutionPlan for GraphVariableLengthTraverseMainExec {
3960    fn name(&self) -> &str {
3961        "GraphVariableLengthTraverseMainExec"
3962    }
3963
3964    fn as_any(&self) -> &dyn Any {
3965        self
3966    }
3967
3968    fn schema(&self) -> SchemaRef {
3969        self.schema.clone()
3970    }
3971
3972    fn properties(&self) -> &Arc<PlanProperties> {
3973        &self.properties
3974    }
3975
3976    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
3977        vec![&self.input]
3978    }
3979
3980    fn with_new_children(
3981        self: Arc<Self>,
3982        children: Vec<Arc<dyn ExecutionPlan>>,
3983    ) -> DFResult<Arc<dyn ExecutionPlan>> {
3984        if children.len() != 1 {
3985            return Err(datafusion::error::DataFusionError::Plan(
3986                "GraphVariableLengthTraverseMainExec requires exactly one child".to_string(),
3987            ));
3988        }
3989
3990        Ok(Arc::new(Self::new(
3991            children[0].clone(),
3992            self.source_column.clone(),
3993            self.type_names.clone(),
3994            self.direction,
3995            self.min_hops,
3996            self.max_hops,
3997            self.target_variable.clone(),
3998            self.step_variable.clone(),
3999            self.path_variable.clone(),
4000            self.target_properties.clone(),
4001            self.graph_ctx.clone(),
4002            self.is_optional,
4003            self.bound_target_column.clone(),
4004            self.edge_lance_filter.clone(),
4005            self.edge_property_conditions.clone(),
4006            self.used_edge_columns.clone(),
4007            self.path_mode.clone(),
4008            self.output_mode.clone(),
4009        )))
4010    }
4011
4012    fn execute(
4013        &self,
4014        partition: usize,
4015        context: Arc<TaskContext>,
4016    ) -> DFResult<SendableRecordBatchStream> {
4017        let input_stream = self.input.execute(partition, context)?;
4018        let metrics = BaselineMetrics::new(&self.metrics, partition);
4019
4020        // Build adjacency map from main edges table (async)
4021        let graph_ctx = self.graph_ctx.clone();
4022        let type_names = self.type_names.clone();
4023        let direction = self.direction;
4024        let load_fut =
4025            async move { build_edge_adjacency_map(&graph_ctx, &type_names, direction).await };
4026
4027        Ok(Box::pin(GraphVariableLengthTraverseMainStream {
4028            input: input_stream,
4029            source_column: self.source_column.clone(),
4030            type_names: self.type_names.clone(),
4031            direction: self.direction,
4032            min_hops: self.min_hops,
4033            max_hops: self.max_hops,
4034            target_variable: self.target_variable.clone(),
4035            step_variable: self.step_variable.clone(),
4036            path_variable: self.path_variable.clone(),
4037            target_properties: self.target_properties.clone(),
4038            graph_ctx: self.graph_ctx.clone(),
4039            is_optional: self.is_optional,
4040            bound_target_column: self.bound_target_column.clone(),
4041            edge_lance_filter: self.edge_lance_filter.clone(),
4042            edge_property_conditions: self.edge_property_conditions.clone(),
4043            used_edge_columns: self.used_edge_columns.clone(),
4044            path_mode: self.path_mode.clone(),
4045            output_mode: self.output_mode.clone(),
4046            schema: self.schema.clone(),
4047            state: VarLengthMainStreamState::Loading(Box::pin(load_fut)),
4048            metrics,
4049        }))
4050    }
4051
4052    fn metrics(&self) -> Option<MetricsSet> {
4053        Some(self.metrics.clone_inner())
4054    }
4055}
4056
4057/// State machine for VLP schemaless stream.
4058enum VarLengthMainStreamState {
4059    /// Loading adjacency map from main edges table.
4060    Loading(Pin<Box<dyn std::future::Future<Output = DFResult<EdgeAdjacencyMap>> + Send>>),
4061    /// Processing input batches with loaded adjacency.
4062    Processing(EdgeAdjacencyMap),
4063    /// Materializing properties for a batch.
4064    Materializing {
4065        adjacency: EdgeAdjacencyMap,
4066        fut: Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>,
4067    },
4068    /// Stream is done.
4069    Done,
4070}
4071
4072/// Stream for variable-length traversal on schemaless edges.
4073#[expect(dead_code, reason = "VLP fields used in Phase 3")]
4074struct GraphVariableLengthTraverseMainStream {
4075    input: SendableRecordBatchStream,
4076    source_column: String,
4077    type_names: Vec<String>,
4078    direction: Direction,
4079    min_hops: usize,
4080    max_hops: usize,
4081    target_variable: String,
4082    /// Relationship variable like `r` in `[r*1..3]` - gets a List of edge structs.
4083    step_variable: Option<String>,
4084    path_variable: Option<String>,
4085    target_properties: Vec<String>,
4086    graph_ctx: Arc<GraphExecutionContext>,
4087    is_optional: bool,
4088    bound_target_column: Option<String>,
4089    edge_lance_filter: Option<String>,
4090    /// Edge property conditions to check during BFS.
4091    edge_property_conditions: Vec<(String, UniValue)>,
4092    used_edge_columns: Vec<String>,
4093    path_mode: super::nfa::PathMode,
4094    output_mode: super::nfa::VlpOutputMode,
4095    schema: SchemaRef,
4096    state: VarLengthMainStreamState,
4097    metrics: BaselineMetrics,
4098}
4099
4100/// BFS result type: (target_vid, hop_count, node_path, edge_path)
4101type MainBfsResult = (Vid, usize, Vec<Vid>, Vec<Eid>);
4102
4103impl GraphVariableLengthTraverseMainStream {
4104    /// Perform BFS from a source vertex using the adjacency map.
4105    ///
4106    /// `used_eids` contains edge IDs already bound by earlier pattern elements
4107    /// in the same MATCH clause, enforcing cross-pattern relationship uniqueness
4108    /// (Cypher semantics require all relationships in a MATCH to be distinct).
4109    fn bfs(
4110        &self,
4111        source: Vid,
4112        adjacency: &EdgeAdjacencyMap,
4113        used_eids: &FxHashSet<u64>,
4114    ) -> Vec<MainBfsResult> {
4115        let mut results = Vec::new();
4116        let mut queue: VecDeque<MainBfsResult> = VecDeque::new();
4117
4118        queue.push_back((source, 0, vec![source], vec![]));
4119
4120        while let Some((current, depth, node_path, edge_path)) = queue.pop_front() {
4121            // Emit result if within hop range (including zero-length patterns)
4122            if depth >= self.min_hops && depth <= self.max_hops {
4123                results.push((current, depth, node_path.clone(), edge_path.clone()));
4124            }
4125
4126            // Stop if at max depth
4127            if depth >= self.max_hops {
4128                continue;
4129            }
4130
4131            // Get neighbors from adjacency map
4132            if let Some(neighbors) = adjacency.get(&current) {
4133                let is_undirected = matches!(self.direction, Direction::Both);
4134                let mut seen_edges_at_hop: HashSet<u64> = HashSet::new();
4135
4136                for (neighbor, eid, _edge_type, props) in neighbors {
4137                    // Deduplicate edges for undirected patterns
4138                    if is_undirected && !seen_edges_at_hop.insert(eid.as_u64()) {
4139                        continue;
4140                    }
4141
4142                    // Enforce relationship uniqueness per-path (Cypher semantics).
4143                    if edge_path.contains(eid) {
4144                        continue;
4145                    }
4146
4147                    // Enforce cross-pattern relationship uniqueness: skip edges
4148                    // already bound by earlier pattern elements in the same MATCH.
4149                    if used_eids.contains(&eid.as_u64()) {
4150                        continue;
4151                    }
4152
4153                    // Check edge property conditions (e.g., {year: 1988}).
4154                    if !self.edge_property_conditions.is_empty() {
4155                        let passes =
4156                            self.edge_property_conditions
4157                                .iter()
4158                                .all(|(name, expected)| {
4159                                    props.get(name).is_some_and(|actual| actual == expected)
4160                                });
4161                        if !passes {
4162                            continue;
4163                        }
4164                    }
4165
4166                    let mut new_node_path = node_path.clone();
4167                    new_node_path.push(*neighbor);
4168                    let mut new_edge_path = edge_path.clone();
4169                    new_edge_path.push(*eid);
4170                    queue.push_back((*neighbor, depth + 1, new_node_path, new_edge_path));
4171                }
4172            }
4173        }
4174
4175        results
4176    }
4177
4178    /// Process a batch using the adjacency map.
4179    fn process_batch(
4180        &self,
4181        batch: RecordBatch,
4182        adjacency: &EdgeAdjacencyMap,
4183    ) -> DFResult<RecordBatch> {
4184        let source_col = batch.column_by_name(&self.source_column).ok_or_else(|| {
4185            datafusion::error::DataFusionError::Execution(format!(
4186                "Source column '{}' not found in input batch",
4187                self.source_column
4188            ))
4189        })?;
4190
4191        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
4192        let source_vids: &UInt64Array = &source_vid_cow;
4193
4194        // Read bound target VIDs if column exists
4195        let bound_target_cow = self
4196            .bound_target_column
4197            .as_ref()
4198            .and_then(|col| batch.column_by_name(col))
4199            .map(|c| column_as_vid_array(c.as_ref()))
4200            .transpose()?;
4201        let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
4202
4203        // Extract used edge columns for cross-pattern relationship uniqueness
4204        let used_edge_arrays: Vec<&UInt64Array> = self
4205            .used_edge_columns
4206            .iter()
4207            .filter_map(|col| {
4208                batch
4209                    .column_by_name(col)?
4210                    .as_any()
4211                    .downcast_ref::<UInt64Array>()
4212            })
4213            .collect();
4214
4215        // Collect BFS results: (original_row_idx, target_vid, hop_count, node_path, edge_path)
4216        let mut expansions: Vec<ExpansionRecord> = Vec::new();
4217
4218        for (row_idx, source_opt) in source_vids.iter().enumerate() {
4219            let mut emitted_for_row = false;
4220
4221            if let Some(source_u64) = source_opt {
4222                let source = Vid::from(source_u64);
4223
4224                // Collect used edge IDs from previous hops for this row
4225                let used_eids: FxHashSet<u64> = used_edge_arrays
4226                    .iter()
4227                    .filter_map(|arr| {
4228                        if arr.is_null(row_idx) {
4229                            None
4230                        } else {
4231                            Some(arr.value(row_idx))
4232                        }
4233                    })
4234                    .collect();
4235
4236                let bfs_results = self.bfs(source, adjacency, &used_eids);
4237
4238                for (target, hops, node_path, edge_path) in bfs_results {
4239                    // Filter by bound target VID if set (for patterns where target is in scope).
4240                    // NULL bound targets do not match anything.
4241                    if let Some(targets) = expected_targets {
4242                        if targets.is_null(row_idx) {
4243                            continue;
4244                        }
4245                        let expected_vid = targets.value(row_idx);
4246                        if target.as_u64() != expected_vid {
4247                            continue;
4248                        }
4249                    }
4250
4251                    expansions.push((row_idx, target, hops, node_path, edge_path));
4252                    emitted_for_row = true;
4253                }
4254            }
4255
4256            if self.is_optional && !emitted_for_row {
4257                // Preserve source row with NULL optional bindings.
4258                expansions.push((row_idx, Vid::from(u64::MAX), 0, vec![], vec![]));
4259            }
4260        }
4261
4262        if expansions.is_empty() {
4263            if self.is_optional {
4264                let all_indices: Vec<usize> = (0..batch.num_rows()).collect();
4265                return build_optional_null_batch_for_rows(&batch, &all_indices, &self.schema);
4266            }
4267            return Ok(RecordBatch::new_empty(self.schema.clone()));
4268        }
4269
4270        let num_rows = expansions.len();
4271        self.metrics.record_output(num_rows);
4272
4273        // Build output columns
4274        let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.schema.fields().len());
4275
4276        // Expand input columns
4277        for col_idx in 0..batch.num_columns() {
4278            let array = batch.column(col_idx);
4279            let indices: Vec<u64> = expansions
4280                .iter()
4281                .map(|(idx, _, _, _, _)| *idx as u64)
4282                .collect();
4283            let take_indices = UInt64Array::from(indices);
4284            let expanded = arrow::compute::take(array, &take_indices, None)?;
4285            columns.push(expanded);
4286        }
4287
4288        // Add target VID column (only if not already in input)
4289        let target_vid_name = format!("{}._vid", self.target_variable);
4290        if batch.schema().column_with_name(&target_vid_name).is_none() {
4291            let target_vids: Vec<Option<u64>> = expansions
4292                .iter()
4293                .map(|(_, vid, _, node_path, edge_path)| {
4294                    if node_path.is_empty() && edge_path.is_empty() {
4295                        None
4296                    } else {
4297                        Some(vid.as_u64())
4298                    }
4299                })
4300                .collect();
4301            columns.push(Arc::new(UInt64Array::from(target_vids)));
4302        }
4303
4304        // Add target ._labels column (only if not already in input)
4305        let target_labels_name = format!("{}._labels", self.target_variable);
4306        if batch
4307            .schema()
4308            .column_with_name(&target_labels_name)
4309            .is_none()
4310        {
4311            use arrow_array::builder::{ListBuilder, StringBuilder};
4312            let mut labels_builder = ListBuilder::new(StringBuilder::new());
4313            for (_, vid, _, node_path, edge_path) in expansions.iter() {
4314                if node_path.is_empty() && edge_path.is_empty() {
4315                    labels_builder.append(false);
4316                    continue;
4317                }
4318                let mut row_labels: Vec<String> = Vec::new();
4319                let labels =
4320                    l0_visibility::get_vertex_labels(*vid, &self.graph_ctx.query_context());
4321                for lbl in &labels {
4322                    if !row_labels.contains(lbl) {
4323                        row_labels.push(lbl.clone());
4324                    }
4325                }
4326                let values = labels_builder.values();
4327                for lbl in &row_labels {
4328                    values.append_value(lbl);
4329                }
4330                labels_builder.append(true);
4331            }
4332            columns.push(Arc::new(labels_builder.finish()));
4333        }
4334
4335        // Add hop count column
4336        let hop_counts: Vec<u64> = expansions
4337            .iter()
4338            .map(|(_, _, hops, _, _)| *hops as u64)
4339            .collect();
4340        columns.push(Arc::new(UInt64Array::from(hop_counts)));
4341
4342        // Add step variable column if bound (list of edge structs).
4343        if self.step_variable.is_some() {
4344            let mut edges_builder = new_edge_list_builder();
4345            let query_ctx = self.graph_ctx.query_context();
4346            let type_names_str = self.type_names.join("|");
4347
4348            for (_, _, _, node_path, edge_path) in expansions.iter() {
4349                if node_path.is_empty() && edge_path.is_empty() {
4350                    edges_builder.append_null();
4351                } else if edge_path.is_empty() {
4352                    // Zero-hop match: empty list.
4353                    edges_builder.append(true);
4354                } else {
4355                    for (i, eid) in edge_path.iter().enumerate() {
4356                        append_edge_to_struct(
4357                            edges_builder.values(),
4358                            *eid,
4359                            &type_names_str,
4360                            node_path[i].as_u64(),
4361                            node_path[i + 1].as_u64(),
4362                            &query_ctx,
4363                        );
4364                    }
4365                    edges_builder.append(true);
4366                }
4367            }
4368
4369            columns.push(Arc::new(edges_builder.finish()) as ArrayRef);
4370        }
4371
4372        // Add path variable column if bound.
4373        // If a path column already exists in input (from a prior BindFixedPath), extend it
4374        // rather than building from scratch.
4375        if let Some(path_var_name) = &self.path_variable {
4376            let existing_path_col_idx = batch
4377                .schema()
4378                .column_with_name(path_var_name)
4379                .map(|(idx, _)| idx);
4380            let existing_path_arc = existing_path_col_idx.map(|idx| columns[idx].clone());
4381            let existing_path = existing_path_arc
4382                .as_ref()
4383                .and_then(|arc| arc.as_any().downcast_ref::<arrow_array::StructArray>());
4384
4385            let mut nodes_builder = new_node_list_builder();
4386            let mut rels_builder = new_edge_list_builder();
4387            let query_ctx = self.graph_ctx.query_context();
4388            let type_names_str = self.type_names.join("|");
4389            let mut path_validity = Vec::with_capacity(expansions.len());
4390
4391            for (row_out_idx, (_, _, _, node_path, edge_path)) in expansions.iter().enumerate() {
4392                if node_path.is_empty() && edge_path.is_empty() {
4393                    nodes_builder.append(false);
4394                    rels_builder.append(false);
4395                    path_validity.push(false);
4396                    continue;
4397                }
4398
4399                // Prepend existing path prefix if extending
4400                let skip_first_vlp_node = if let Some(existing) = existing_path {
4401                    if !existing.is_null(row_out_idx) {
4402                        prepend_existing_path(
4403                            existing,
4404                            row_out_idx,
4405                            &mut nodes_builder,
4406                            &mut rels_builder,
4407                            &query_ctx,
4408                        );
4409                        true
4410                    } else {
4411                        false
4412                    }
4413                } else {
4414                    false
4415                };
4416
4417                // Append VLP nodes (skip first if extending — it's the junction point)
4418                let start_idx = if skip_first_vlp_node { 1 } else { 0 };
4419                for vid in &node_path[start_idx..] {
4420                    append_node_to_struct(nodes_builder.values(), *vid, &query_ctx);
4421                }
4422                nodes_builder.append(true);
4423
4424                for (i, eid) in edge_path.iter().enumerate() {
4425                    append_edge_to_struct(
4426                        rels_builder.values(),
4427                        *eid,
4428                        &type_names_str,
4429                        node_path[i].as_u64(),
4430                        node_path[i + 1].as_u64(),
4431                        &query_ctx,
4432                    );
4433                }
4434                rels_builder.append(true);
4435                path_validity.push(true);
4436            }
4437
4438            // Finish the builders to get the arrays
4439            let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
4440            let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;
4441
4442            // Build the path struct with nodes and relationships fields
4443            let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
4444            let rels_field = Arc::new(Field::new(
4445                "relationships",
4446                rels_array.data_type().clone(),
4447                true,
4448            ));
4449
4450            // Create the path struct array
4451            let path_struct = arrow_array::StructArray::try_new(
4452                vec![nodes_field, rels_field].into(),
4453                vec![nodes_array, rels_array],
4454                Some(arrow::buffer::NullBuffer::from(path_validity)),
4455            )
4456            .map_err(arrow_err)?;
4457
4458            if let Some(idx) = existing_path_col_idx {
4459                columns[idx] = Arc::new(path_struct);
4460            } else {
4461                columns.push(Arc::new(path_struct));
4462            }
4463        }
4464
4465        // Add target property columns as NULL for now (skip if already in input).
4466        // Property hydration happens via PropertyManager in the query execution pipeline.
4467        for prop_name in &self.target_properties {
4468            let full_prop_name = format!("{}.{}", self.target_variable, prop_name);
4469            if batch.schema().column_with_name(&full_prop_name).is_none() {
4470                columns.push(arrow_array::new_null_array(
4471                    &DataType::LargeBinary,
4472                    num_rows,
4473                ));
4474            }
4475        }
4476
4477        RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)
4478    }
4479}
4480
4481impl Stream for GraphVariableLengthTraverseMainStream {
4482    type Item = DFResult<RecordBatch>;
4483
4484    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
4485        let metrics = self.metrics.clone();
4486        let _timer = metrics.elapsed_compute().timer();
4487        loop {
4488            let state = std::mem::replace(&mut self.state, VarLengthMainStreamState::Done);
4489
4490            match state {
4491                VarLengthMainStreamState::Loading(mut fut) => match fut.as_mut().poll(cx) {
4492                    Poll::Ready(Ok(adjacency)) => {
4493                        self.state = VarLengthMainStreamState::Processing(adjacency);
4494                        // Continue loop to start processing
4495                    }
4496                    Poll::Ready(Err(e)) => {
4497                        self.state = VarLengthMainStreamState::Done;
4498                        return Poll::Ready(Some(Err(e)));
4499                    }
4500                    Poll::Pending => {
4501                        self.state = VarLengthMainStreamState::Loading(fut);
4502                        return Poll::Pending;
4503                    }
4504                },
4505                VarLengthMainStreamState::Processing(adjacency) => {
4506                    match self.input.poll_next_unpin(cx) {
4507                        Poll::Ready(Some(Ok(batch))) => {
4508                            let base_batch = match self.process_batch(batch, &adjacency) {
4509                                Ok(b) => b,
4510                                Err(e) => {
4511                                    self.state = VarLengthMainStreamState::Processing(adjacency);
4512                                    return Poll::Ready(Some(Err(e)));
4513                                }
4514                            };
4515
4516                            // If no properties need async hydration, return directly
4517                            if self.target_properties.is_empty() {
4518                                self.state = VarLengthMainStreamState::Processing(adjacency);
4519                                return Poll::Ready(Some(Ok(base_batch)));
4520                            }
4521
4522                            // Create async hydration future
4523                            let schema = self.schema.clone();
4524                            let target_variable = self.target_variable.clone();
4525                            let target_properties = self.target_properties.clone();
4526                            let graph_ctx = self.graph_ctx.clone();
4527
4528                            let fut = hydrate_vlp_target_properties(
4529                                base_batch,
4530                                schema,
4531                                target_variable,
4532                                target_properties,
4533                                None, // schemaless — no label name
4534                                graph_ctx,
4535                            );
4536
4537                            self.state = VarLengthMainStreamState::Materializing {
4538                                adjacency,
4539                                fut: Box::pin(fut),
4540                            };
4541                            // Continue loop to poll the future
4542                        }
4543                        Poll::Ready(Some(Err(e))) => {
4544                            self.state = VarLengthMainStreamState::Done;
4545                            return Poll::Ready(Some(Err(e)));
4546                        }
4547                        Poll::Ready(None) => {
4548                            self.state = VarLengthMainStreamState::Done;
4549                            return Poll::Ready(None);
4550                        }
4551                        Poll::Pending => {
4552                            self.state = VarLengthMainStreamState::Processing(adjacency);
4553                            return Poll::Pending;
4554                        }
4555                    }
4556                }
4557                VarLengthMainStreamState::Materializing { adjacency, mut fut } => {
4558                    match fut.as_mut().poll(cx) {
4559                        Poll::Ready(Ok(batch)) => {
4560                            self.state = VarLengthMainStreamState::Processing(adjacency);
4561                            return Poll::Ready(Some(Ok(batch)));
4562                        }
4563                        Poll::Ready(Err(e)) => {
4564                            self.state = VarLengthMainStreamState::Done;
4565                            return Poll::Ready(Some(Err(e)));
4566                        }
4567                        Poll::Pending => {
4568                            self.state = VarLengthMainStreamState::Materializing { adjacency, fut };
4569                            return Poll::Pending;
4570                        }
4571                    }
4572                }
4573                VarLengthMainStreamState::Done => {
4574                    return Poll::Ready(None);
4575                }
4576            }
4577        }
4578    }
4579}
4580
4581impl RecordBatchStream for GraphVariableLengthTraverseMainStream {
4582    fn schema(&self) -> SchemaRef {
4583        self.schema.clone()
4584    }
4585}
4586
#[cfg(test)]
mod tests {
    //! Schema-shape tests for the traversal operators' `build_schema`
    //! functions. Every test feeds in the same one-column input schema and
    //! checks the names (and, where asserted, the types) of the output
    //! columns.

    use super::*;

    /// Builds the input schema shared by all tests: a single non-nullable
    /// `UInt64` column named `a._vid`.
    fn source_vid_schema() -> SchemaRef {
        let vid_column = Field::new("a._vid", DataType::UInt64, false);
        Arc::new(Schema::new(vec![vid_column]))
    }

    /// Asserts that `schema` has exactly the columns listed in `expected`,
    /// in the same order.
    ///
    /// The column count is checked first so that a mismatch in width is
    /// reported before any per-column lookup.
    fn assert_column_names(schema: &Schema, expected: &[&str]) {
        assert_eq!(schema.fields().len(), expected.len());
        for (position, &wanted) in expected.iter().enumerate() {
            assert_eq!(schema.field(position).name(), wanted);
        }
    }

    /// Asserts that every column index in `positions` is typed
    /// `DataType::LargeBinary`.
    fn assert_large_binary(schema: &Schema, positions: &[usize]) {
        for &position in positions {
            assert_eq!(schema.field(position).data_type(), &DataType::LargeBinary);
        }
    }

    #[test]
    fn test_traverse_schema_without_edge() {
        let output_schema = GraphTraverseExec::build_schema(
            source_vid_schema(),
            "m",
            None,
            &[],
            &[],
            None,
            None,
            false,
        );

        // Schema: input + target VID + target _labels + internal edge ID
        assert_column_names(
            &output_schema,
            &["a._vid", "m._vid", "m._labels", "__eid_to_m"],
        );
    }

    #[test]
    fn test_traverse_schema_with_edge() {
        let output_schema = GraphTraverseExec::build_schema(
            source_vid_schema(),
            "m",
            Some("r"),
            &[],
            &[],
            None,
            None,
            false,
        );

        // Schema: input + target VID + target _labels + edge EID + edge _type
        assert_column_names(
            &output_schema,
            &["a._vid", "m._vid", "m._labels", "r._eid", "r._type"],
        );
    }

    #[test]
    fn test_traverse_schema_with_target_properties() {
        let target_props = vec!["name".to_string(), "age".to_string()];

        let output_schema = GraphTraverseExec::build_schema(
            source_vid_schema(),
            "m",
            Some("r"),
            &[],
            &target_props,
            None,
            None,
            false,
        );

        // Target properties come right after the target's own columns and
        // before the edge columns.
        assert_column_names(
            &output_schema,
            &[
                "a._vid",
                "m._vid",
                "m._labels",
                "m.name",
                "m.age",
                "r._eid",
                "r._type",
            ],
        );
    }

    #[test]
    fn test_variable_length_schema() {
        let output_schema = GraphVariableLengthTraverseExec::build_schema(
            source_vid_schema(),
            "b",
            None,
            Some("p"),
            &[],
            None,
        );

        // a._vid, b._vid, b._labels, _hop_count, p
        assert_column_names(
            &output_schema,
            &["a._vid", "b._vid", "b._labels", "_hop_count", "p"],
        );
    }

    #[test]
    fn test_traverse_main_schema_without_edge() {
        let input_schema = source_vid_schema();

        let output_schema =
            GraphTraverseMainExec::build_schema(&input_schema, "m", &None, &[], &[], false);

        // a._vid, m._vid, m._labels, __eid_to_m
        assert_column_names(
            &output_schema,
            &["a._vid", "m._vid", "m._labels", "__eid_to_m"],
        );
    }

    #[test]
    fn test_traverse_main_schema_with_edge() {
        let input_schema = source_vid_schema();

        let output_schema = GraphTraverseMainExec::build_schema(
            &input_schema,
            "m",
            &Some("r".to_string()),
            &[],
            &[],
            false,
        );

        // a._vid, m._vid, m._labels, r._eid, r._type
        assert_column_names(
            &output_schema,
            &["a._vid", "m._vid", "m._labels", "r._eid", "r._type"],
        );
    }

    #[test]
    fn test_traverse_main_schema_with_edge_properties() {
        let input_schema = source_vid_schema();
        let edge_props = vec!["weight".to_string(), "since".to_string()];

        let output_schema = GraphTraverseMainExec::build_schema(
            &input_schema,
            "m",
            &Some("r".to_string()),
            &edge_props,
            &[],
            false,
        );

        // Edge properties follow the edge's own columns.
        assert_column_names(
            &output_schema,
            &[
                "a._vid",
                "m._vid",
                "m._labels",
                "r._eid",
                "r._type",
                "r.weight",
                "r.since",
            ],
        );
        // Both edge property columns are expected to be LargeBinary.
        assert_large_binary(&output_schema, &[5, 6]);
    }

    #[test]
    fn test_traverse_main_schema_with_target_properties() {
        let input_schema = source_vid_schema();
        let target_props = vec!["name".to_string(), "age".to_string()];

        let output_schema = GraphTraverseMainExec::build_schema(
            &input_schema,
            "m",
            &Some("r".to_string()),
            &[],
            &target_props,
            false,
        );

        // Here the target properties come last, after the edge columns.
        assert_column_names(
            &output_schema,
            &[
                "a._vid",
                "m._vid",
                "m._labels",
                "r._eid",
                "r._type",
                "m.name",
                "m.age",
            ],
        );
        // Both target property columns are expected to be LargeBinary.
        assert_large_binary(&output_schema, &[5, 6]);
    }
}