Skip to main content

uni_query/query/df_graph/
traverse.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Graph traversal execution plans for DataFusion.
5//!
6//! This module provides graph traversal operators as DataFusion [`ExecutionPlan`]s:
7//!
8//! - [`GraphTraverseExec`]: Single-hop edge traversal
9//! - [`GraphVariableLengthTraverseExec`]: Multi-hop BFS traversal (min..max hops)
10//!
11//! # Traversal Algorithm
12//!
13//! Traversal uses the CSR adjacency cache for O(1) neighbor lookups:
14//!
15//! ```text
16//! Input Stream (source VIDs)
17//!        │
18//!        ▼
19//! ┌──────────────────┐
20//! │ For each batch:  │
21//! │  1. Extract VIDs │
22//! │  2. get_neighbors│
23//! │  3. Expand rows  │
24//! └──────────────────┘
25//!        │
26//!        ▼
27//! Output Stream (source, edge, target)
28//! ```
29//!
30//! L0 buffers are automatically overlaid for MVCC visibility.
31
32use crate::query::df_graph::GraphExecutionContext;
33use crate::query::df_graph::bitmap::{EidFilter, VidFilter};
34use crate::query::df_graph::common::{
35    append_edge_to_struct, append_node_to_struct, arrow_err, build_edge_list_field,
36    build_path_struct_field, column_as_vid_array, compute_plan_properties, labels_data_type,
37    new_edge_list_builder, new_node_list_builder,
38};
39use crate::query::df_graph::nfa::{NfaStateId, PathNfa, PathSelector, VlpOutputMode};
40use crate::query::df_graph::pred_dag::PredecessorDag;
41use crate::query::df_graph::scan::{
42    build_property_column_static, property_field, resolve_property_type,
43};
44use arrow::compute::take;
45use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
46use arrow_schema::{DataType, Field, Schema, SchemaRef};
47use datafusion::common::Result as DFResult;
48use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
49use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
50use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
51use futures::{Stream, StreamExt};
52use fxhash::FxHashSet;
53use std::any::Any;
54use std::collections::{HashMap, HashSet, VecDeque};
55use std::fmt;
56use std::pin::Pin;
57use std::sync::Arc;
58use std::task::{Context, Poll};
59use uni_common::Value as UniValue;
60use uni_common::core::id::{Eid, Vid};
61use uni_store::runtime::l0_visibility;
62use uni_store::storage::direction::Direction;
63
/// Result of a BFS expansion.
///
/// Tuple layout: `(target_vid, hop_count, node_path, edge_path)`.
type BfsResult = (Vid, usize, Vec<Vid>, Vec<Eid>);

/// Expansion record that ties a BFS result back to the input row it came from.
///
/// Tuple layout: `(original_row_idx, target_vid, hop_count, node_path, edge_path)`.
type ExpansionRecord = (usize, Vid, usize, Vec<Vid>, Vec<Eid>);
69
70/// Prepend nodes and edges from an existing path struct column into builders.
71///
72/// Used when a VLP extends a path that was partially built by a prior `BindFixedPath`.
73/// Reads the nodes and relationships from the existing path at `row_idx` and appends
74/// them to the provided builders. The caller should then skip the first VLP node
75/// (which is the junction point already present in the existing path).
76fn prepend_existing_path(
77    existing_path: &arrow_array::StructArray,
78    row_idx: usize,
79    nodes_builder: &mut arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder>,
80    rels_builder: &mut arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder>,
81    query_ctx: &uni_store::runtime::context::QueryContext,
82) {
83    // Read existing nodes
84    let nodes_list = existing_path
85        .column(0)
86        .as_any()
87        .downcast_ref::<arrow_array::ListArray>()
88        .unwrap();
89    let node_values = nodes_list.value(row_idx);
90    let node_struct = node_values
91        .as_any()
92        .downcast_ref::<arrow_array::StructArray>()
93        .unwrap();
94    let vid_col = node_struct
95        .column(0)
96        .as_any()
97        .downcast_ref::<UInt64Array>()
98        .unwrap();
99    for i in 0..vid_col.len() {
100        append_node_to_struct(
101            nodes_builder.values(),
102            Vid::from(vid_col.value(i)),
103            query_ctx,
104        );
105    }
106
107    // Read existing edges
108    let rels_list = existing_path
109        .column(1)
110        .as_any()
111        .downcast_ref::<arrow_array::ListArray>()
112        .unwrap();
113    let edge_values = rels_list.value(row_idx);
114    let edge_struct = edge_values
115        .as_any()
116        .downcast_ref::<arrow_array::StructArray>()
117        .unwrap();
118    let eid_col = edge_struct
119        .column(0)
120        .as_any()
121        .downcast_ref::<UInt64Array>()
122        .unwrap();
123    let type_col = edge_struct
124        .column(1)
125        .as_any()
126        .downcast_ref::<arrow_array::StringArray>()
127        .unwrap();
128    let src_col = edge_struct
129        .column(2)
130        .as_any()
131        .downcast_ref::<UInt64Array>()
132        .unwrap();
133    let dst_col = edge_struct
134        .column(3)
135        .as_any()
136        .downcast_ref::<UInt64Array>()
137        .unwrap();
138    for i in 0..eid_col.len() {
139        append_edge_to_struct(
140            rels_builder.values(),
141            Eid::from(eid_col.value(i)),
142            type_col.value(i),
143            src_col.value(i),
144            dst_col.value(i),
145            query_ctx,
146        );
147    }
148}
149
150/// Resolve edge property Arrow type, falling back to `LargeBinary` (CypherValue) for
151/// schemaless properties. Unlike vertex properties, schemaless edge properties must
152/// preserve original JSON value types (int, float, etc.) since edge types commonly
153/// lack explicit property definitions.
154fn resolve_edge_property_type(
155    prop: &str,
156    schema_props: Option<
157        &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
158    >,
159) -> DataType {
160    if prop == "overflow_json" {
161        DataType::LargeBinary
162    } else if prop == "_created_at" || prop == "_updated_at" {
163        // System-managed timestamps surfaced via `created_at(r)` /
164        // `updated_at(r)`. Stored on every edge by the L0 buffer and
165        // the on-disk Arrow tables as Timestamp(Nanosecond, UTC).
166        DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, Some("UTC".into()))
167    } else {
168        schema_props
169            .and_then(|props| props.get(prop))
170            .map(|meta| meta.r#type.to_arrow())
171            .unwrap_or(DataType::LargeBinary)
172    }
173}
174
175use crate::query::df_graph::common::merged_edge_schema_props;
176
/// Expansion tuple for variable-length traversal.
///
/// Tuple layout: `(input_row_idx, target_vid, hop_count, node_path, edge_path)`.
/// Structurally identical to [`ExpansionRecord`].
type VarLengthExpansion = (usize, Vid, usize, Vec<Vid>, Vec<Eid>);
179
/// Single-hop graph traversal execution plan.
///
/// Expands each input row by traversing edges to find neighbors.
/// For each (source, edge, target) triple, produces one output row
/// containing the input columns plus target vertex and edge columns.
///
/// # Example
///
/// ```ignore
/// // Input: batch with _vid column
/// // Traverse KNOWS edges outgoing
/// let traverse = GraphTraverseExec::new(
///     input_plan,
///     "_vid",                // source VID column
///     vec![knows_type_id],   // edge types to traverse
///     Direction::Outgoing,
///     "m",                   // target variable
///     Some("r".to_string()), // edge variable
///     vec![],                // edge properties to materialize
///     vec![],                // target properties to materialize
///     None,                  // no target label name
///     None,                  // no target label id
///     graph_ctx,
///     false,                 // not an OPTIONAL MATCH
///     HashSet::new(),        // optional pattern variables
///     None,                  // no bound target column
///     vec![],                // no previously used edge columns
/// );
///
/// // Output: input columns + m._vid + m._labels + r._eid + r._type
/// ```
pub struct GraphTraverseExec {
    /// Input execution plan.
    input: Arc<dyn ExecutionPlan>,

    /// Column name containing source VIDs.
    source_column: String,

    /// Edge type IDs to traverse.
    edge_type_ids: Vec<u32>,

    /// Traversal direction.
    direction: Direction,

    /// Variable name for target vertex columns.
    target_variable: String,

    /// Variable name for edge columns (if edge is bound).
    edge_variable: Option<String>,

    /// Edge properties to materialize (for pushdown hydration).
    edge_properties: Vec<String>,

    /// Target vertex properties to materialize.
    target_properties: Vec<String>,

    /// Target label name, used for property type resolution and for label
    /// filtering during expansion.
    target_label_name: Option<String>,

    /// Optional target label ID.
    ///
    /// NOTE(review): in the code visible here this is only stored and forwarded
    /// by `with_new_children`; the stream created in `execute` filters by
    /// `target_label_name` instead — confirm whether it is still needed.
    target_label_id: Option<u16>,

    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Whether this is an OPTIONAL MATCH (preserve unmatched source rows with NULLs).
    optional: bool,

    /// Variables introduced by the OPTIONAL MATCH pattern.
    /// Used to determine which columns should be null-extended on failure.
    optional_pattern_vars: HashSet<String>,

    /// Column name of an already-bound target VID (for cycle patterns like n-->k<--n).
    /// When set, only traversals that reach this VID are included.
    bound_target_column: Option<String>,

    /// Columns containing edge IDs from previous hops (for relationship uniqueness).
    /// Edges matching any of these IDs are excluded from traversal results.
    used_edge_columns: Vec<String>,

    /// Output schema (input columns followed by the target and edge columns).
    schema: SchemaRef,

    /// Cached plan properties, computed from `schema` at construction time.
    properties: Arc<PlanProperties>,

    /// Execution metrics.
    metrics: ExecutionPlanMetricsSet,
}
262
263impl fmt::Debug for GraphTraverseExec {
264    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
265        f.debug_struct("GraphTraverseExec")
266            .field("source_column", &self.source_column)
267            .field("edge_type_ids", &self.edge_type_ids)
268            .field("direction", &self.direction)
269            .field("target_variable", &self.target_variable)
270            .field("edge_variable", &self.edge_variable)
271            .finish()
272    }
273}
274
275impl GraphTraverseExec {
276    /// Create a new single-hop traversal plan.
277    ///
278    /// # Arguments
279    ///
280    /// * `input` - Input plan providing source vertices
281    /// * `source_column` - Column name containing source VIDs
282    /// * `edge_type_ids` - Edge types to traverse
283    /// * `direction` - Traversal direction
284    /// * `target_variable` - Variable name for target vertices
285    /// * `edge_variable` - Optional variable name for edges
286    /// * `edge_properties` - Edge properties to materialize
287    /// * `target_label_id` - Optional target label filter
288    /// * `graph_ctx` - Graph execution context
289    /// * `bound_target_column` - Column with already-bound target VID (for cycle patterns)
290    /// * `used_edge_columns` - Columns with edge IDs to exclude (relationship uniqueness)
291    #[expect(clippy::too_many_arguments)]
292    pub fn new(
293        input: Arc<dyn ExecutionPlan>,
294        source_column: impl Into<String>,
295        edge_type_ids: Vec<u32>,
296        direction: Direction,
297        target_variable: impl Into<String>,
298        edge_variable: Option<String>,
299        edge_properties: Vec<String>,
300        target_properties: Vec<String>,
301        target_label_name: Option<String>,
302        target_label_id: Option<u16>,
303        graph_ctx: Arc<GraphExecutionContext>,
304        optional: bool,
305        optional_pattern_vars: HashSet<String>,
306        bound_target_column: Option<String>,
307        used_edge_columns: Vec<String>,
308    ) -> Self {
309        let source_column = source_column.into();
310        let target_variable = target_variable.into();
311
312        // Resolve target property Arrow types from the schema
313        let uni_schema = graph_ctx.storage().schema_manager().schema();
314        let label_props = target_label_name
315            .as_deref()
316            .and_then(|ln| uni_schema.properties.get(ln));
317        let merged_edge_props = merged_edge_schema_props(&uni_schema, &edge_type_ids);
318        let edge_props = if merged_edge_props.is_empty() {
319            None
320        } else {
321            Some(&merged_edge_props)
322        };
323
324        // Build output schema: input schema + target VID + target props + optional edge ID + edge properties
325        let schema = Self::build_schema(
326            input.schema(),
327            &target_variable,
328            edge_variable.as_deref(),
329            &edge_properties,
330            &target_properties,
331            label_props,
332            edge_props,
333            optional,
334        );
335
336        let properties = compute_plan_properties(schema.clone());
337
338        Self {
339            input,
340            source_column,
341            edge_type_ids,
342            direction,
343            target_variable,
344            edge_variable,
345            edge_properties,
346            target_properties,
347            target_label_name,
348            target_label_id,
349            graph_ctx,
350            optional,
351            optional_pattern_vars,
352            bound_target_column,
353            used_edge_columns,
354            schema,
355            properties,
356            metrics: ExecutionPlanMetricsSet::new(),
357        }
358    }
359
360    /// Build output schema.
361    #[expect(
362        clippy::too_many_arguments,
363        reason = "Schema construction needs all field metadata"
364    )]
365    fn build_schema(
366        input_schema: SchemaRef,
367        target_variable: &str,
368        edge_variable: Option<&str>,
369        edge_properties: &[String],
370        target_properties: &[String],
371        label_props: Option<
372            &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
373        >,
374        edge_props: Option<
375            &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
376        >,
377        optional: bool,
378    ) -> SchemaRef {
379        let mut fields: Vec<Field> = input_schema
380            .fields()
381            .iter()
382            .map(|f| f.as_ref().clone())
383            .collect();
384
385        // Add target VID column (nullable when optional — unmatched rows get NULL)
386        let target_vid_name = format!("{}._vid", target_variable);
387        fields.push(Field::new(&target_vid_name, DataType::UInt64, optional));
388
389        // Add target ._labels column (List(Utf8)) for labels() and structural projection support
390        fields.push(Field::new(
391            format!("{}._labels", target_variable),
392            labels_data_type(),
393            true,
394        ));
395
396        // Add target vertex property columns
397        for prop_name in target_properties {
398            let col_name = format!("{}.{}", target_variable, prop_name);
399            let arrow_type = resolve_property_type(prop_name, label_props);
400            let uni_type = label_props
401                .and_then(|p| p.get(prop_name))
402                .map(|m| &m.r#type);
403            fields.push(property_field(&col_name, arrow_type, uni_type));
404        }
405
406        // Add edge ID column if edge variable is bound
407        if let Some(edge_var) = edge_variable {
408            let edge_id_name = format!("{}._eid", edge_var);
409            fields.push(Field::new(&edge_id_name, DataType::UInt64, optional));
410
411            // Add edge _type column for type(r) support
412            fields.push(Field::new(
413                format!("{}._type", edge_var),
414                DataType::Utf8,
415                true,
416            ));
417
418            // Add edge property columns with types resolved from schema
419            for prop_name in edge_properties {
420                let prop_col_name = format!("{}.{}", edge_var, prop_name);
421                let arrow_type = resolve_edge_property_type(prop_name, edge_props);
422                let uni_type = edge_props.and_then(|p| p.get(prop_name)).map(|m| &m.r#type);
423                fields.push(property_field(&prop_col_name, arrow_type, uni_type));
424            }
425        } else {
426            // Add internal edge ID column for relationship uniqueness tracking
427            // even when edge variable is not explicitly bound.
428            let internal_eid_name = format!("__eid_to_{}", target_variable);
429            fields.push(Field::new(&internal_eid_name, DataType::UInt64, optional));
430        }
431
432        Arc::new(Schema::new(fields))
433    }
434}
435
436impl DisplayAs for GraphTraverseExec {
437    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
438        write!(
439            f,
440            "GraphTraverseExec: {} --[{:?}]--> {}",
441            self.source_column, self.edge_type_ids, self.target_variable
442        )?;
443        if let Some(ref edge_var) = self.edge_variable {
444            write!(f, " as {}", edge_var)?;
445        }
446        Ok(())
447    }
448}
449
450impl ExecutionPlan for GraphTraverseExec {
451    fn name(&self) -> &str {
452        "GraphTraverseExec"
453    }
454
455    fn as_any(&self) -> &dyn Any {
456        self
457    }
458
459    fn schema(&self) -> SchemaRef {
460        self.schema.clone()
461    }
462
463    fn properties(&self) -> &Arc<PlanProperties> {
464        &self.properties
465    }
466
467    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
468        vec![&self.input]
469    }
470
471    fn with_new_children(
472        self: Arc<Self>,
473        children: Vec<Arc<dyn ExecutionPlan>>,
474    ) -> DFResult<Arc<dyn ExecutionPlan>> {
475        if children.len() != 1 {
476            return Err(datafusion::error::DataFusionError::Plan(
477                "GraphTraverseExec requires exactly one child".to_string(),
478            ));
479        }
480
481        Ok(Arc::new(Self::new(
482            children[0].clone(),
483            self.source_column.clone(),
484            self.edge_type_ids.clone(),
485            self.direction,
486            self.target_variable.clone(),
487            self.edge_variable.clone(),
488            self.edge_properties.clone(),
489            self.target_properties.clone(),
490            self.target_label_name.clone(),
491            self.target_label_id,
492            self.graph_ctx.clone(),
493            self.optional,
494            self.optional_pattern_vars.clone(),
495            self.bound_target_column.clone(),
496            self.used_edge_columns.clone(),
497        )))
498    }
499
500    fn execute(
501        &self,
502        partition: usize,
503        context: Arc<TaskContext>,
504    ) -> DFResult<SendableRecordBatchStream> {
505        let input_stream = self.input.execute(partition, context)?;
506
507        let metrics = BaselineMetrics::new(&self.metrics, partition);
508
509        let warm_fut = self
510            .graph_ctx
511            .warming_future(self.edge_type_ids.clone(), self.direction);
512
513        Ok(Box::pin(GraphTraverseStream {
514            input: input_stream,
515            source_column: self.source_column.clone(),
516            edge_type_ids: self.edge_type_ids.clone(),
517            direction: self.direction,
518            target_variable: self.target_variable.clone(),
519            edge_variable: self.edge_variable.clone(),
520            edge_properties: self.edge_properties.clone(),
521            target_properties: self.target_properties.clone(),
522            target_label_name: self.target_label_name.clone(),
523            graph_ctx: self.graph_ctx.clone(),
524            optional: self.optional,
525            optional_pattern_vars: self.optional_pattern_vars.clone(),
526            bound_target_column: self.bound_target_column.clone(),
527            used_edge_columns: self.used_edge_columns.clone(),
528            schema: self.schema.clone(),
529            state: TraverseStreamState::Warming(warm_fut),
530            metrics,
531        }))
532    }
533
534    fn metrics(&self) -> Option<MetricsSet> {
535        Some(self.metrics.clone_inner())
536    }
537}
538
/// State machine for traverse stream execution.
///
/// A stream created by `GraphTraverseExec::execute` starts in [`Self::Warming`].
enum TraverseStreamState {
    /// Warming adjacency CSRs before first batch.
    /// Holds the in-flight warming future.
    Warming(Pin<Box<dyn std::future::Future<Output = DFResult<()>> + Send>>),
    /// Polling the input stream for batches.
    Reading,
    /// Materializing target vertex properties asynchronously.
    /// Holds the in-flight future that resolves to the finished output batch.
    Materializing(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
    /// Stream is done.
    Done,
}
550
/// Stream that performs single-hop traversal with async property materialization.
///
/// Built by `GraphTraverseExec::execute`; most fields are per-partition copies of
/// the plan's configuration.
struct GraphTraverseStream {
    /// Input stream.
    input: SendableRecordBatchStream,

    /// Column name containing source VIDs.
    source_column: String,

    /// Edge type IDs to traverse.
    edge_type_ids: Vec<u32>,

    /// Traversal direction.
    direction: Direction,

    /// Variable name for target vertex (retained for diagnostics).
    #[expect(dead_code, reason = "Retained for debug logging and diagnostics")]
    target_variable: String,

    /// Variable name for edge (if bound).
    edge_variable: Option<String>,

    /// Edge properties to materialize.
    edge_properties: Vec<String>,

    /// Target vertex properties to materialize.
    target_properties: Vec<String>,

    /// Target label name for property resolution and filtering.
    target_label_name: Option<String>,

    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Whether this is an OPTIONAL MATCH.
    optional: bool,

    /// Variables introduced by the OPTIONAL MATCH pattern.
    optional_pattern_vars: HashSet<String>,

    /// Column name of an already-bound target VID (for cycle patterns like n-->k<--n).
    bound_target_column: Option<String>,

    /// Columns containing edge IDs from previous hops (for relationship uniqueness).
    used_edge_columns: Vec<String>,

    /// Output schema.
    schema: SchemaRef,

    /// Stream state (starts as `Warming`).
    state: TraverseStreamState,

    /// Baseline metrics for this partition.
    metrics: BaselineMetrics,
}
605
606impl GraphTraverseStream {
607    /// Expand neighbors synchronously and return expansions.
608    /// Returns (row_idx, target_vid, eid_u64, edge_type_id).
609    fn expand_neighbors(&self, batch: &RecordBatch) -> DFResult<Vec<(usize, Vid, u64, u32)>> {
610        let source_col = batch.column_by_name(&self.source_column).ok_or_else(|| {
611            datafusion::error::DataFusionError::Execution(format!(
612                "Source column '{}' not found",
613                self.source_column
614            ))
615        })?;
616
617        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
618        let source_vids: &UInt64Array = &source_vid_cow;
619
620        // If bound_target_column is set, get the expected target VIDs for each row.
621        // This is used for cycle patterns like n-->k<--n where the target must match.
622        let bound_target_cow = self
623            .bound_target_column
624            .as_ref()
625            .and_then(|col| batch.column_by_name(col))
626            .map(|c| column_as_vid_array(c.as_ref()))
627            .transpose()?;
628        let bound_target_vids: Option<&UInt64Array> = bound_target_cow.as_deref();
629
630        // Collect edge ID arrays from previous hops for relationship uniqueness filtering.
631        let used_edge_arrays: Vec<&UInt64Array> = self
632            .used_edge_columns
633            .iter()
634            .filter_map(|col| {
635                batch
636                    .column_by_name(col)
637                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
638            })
639            .collect();
640
641        let mut expanded_rows: Vec<(usize, Vid, u64, u32)> = Vec::new();
642        let is_undirected = matches!(self.direction, Direction::Both);
643
644        for (row_idx, source_vid) in source_vids.iter().enumerate() {
645            let Some(src) = source_vid else {
646                continue;
647            };
648
649            // Get expected target VID if this is a bound target pattern.
650            // Distinguish between:
651            // - no bound target column (no filtering),
652            // - bound target present but NULL for this row (must produce no expansion),
653            // - bound target present with VID.
654            let expected_target = bound_target_vids.map(|arr| {
655                if arr.is_null(row_idx) {
656                    None
657                } else {
658                    Some(arr.value(row_idx))
659                }
660            });
661
662            // Collect used edge IDs for this row from all previous hops
663            let used_eids: HashSet<u64> = used_edge_arrays
664                .iter()
665                .filter_map(|arr| {
666                    if arr.is_null(row_idx) {
667                        None
668                    } else {
669                        Some(arr.value(row_idx))
670                    }
671                })
672                .collect();
673
674            let vid = Vid::from(src);
675            // For Direction::Both, deduplicate edges by eid within each source.
676            // This prevents the same edge being counted twice (once outgoing, once incoming).
677            let mut seen_edges: HashSet<u64> = HashSet::new();
678
679            for &edge_type in &self.edge_type_ids {
680                let neighbors = self.graph_ctx.get_neighbors(vid, edge_type, self.direction);
681
682                for (target_vid, eid) in neighbors {
683                    let eid_u64 = eid.as_u64();
684
685                    // Skip edges already used in previous hops (relationship uniqueness)
686                    if used_eids.contains(&eid_u64) {
687                        continue;
688                    }
689
690                    // Deduplicate edges for undirected patterns
691                    if is_undirected && !seen_edges.insert(eid_u64) {
692                        continue;
693                    }
694
695                    // Filter by bound target VID if set (for cycle patterns).
696                    // NULL bound targets do not match anything.
697                    if let Some(expected_opt) = expected_target {
698                        let Some(expected) = expected_opt else {
699                            continue;
700                        };
701                        if target_vid.as_u64() != expected {
702                            continue;
703                        }
704                    }
705
706                    // Filter by target label using L0 visibility.
707                    // VIDs no longer embed label information, so we must look up labels.
708                    if let Some(ref label_name) = self.target_label_name {
709                        let query_ctx = self.graph_ctx.query_context();
710                        if let Some(vertex_labels) =
711                            l0_visibility::get_vertex_labels_optional(target_vid, &query_ctx)
712                        {
713                            // Vertex is in L0 — require actual label match
714                            if !vertex_labels.contains(label_name) {
715                                continue;
716                            }
717                        }
718                        // else: vertex not in L0 → trust storage-level filtering
719                    }
720
721                    expanded_rows.push((row_idx, target_vid, eid_u64, edge_type));
722                }
723            }
724        }
725
726        Ok(expanded_rows)
727    }
728}
729
730/// Build target vertex labels column from L0 buffers.
731fn build_target_labels_column(
732    target_vids: &[Vid],
733    target_label_name: &Option<String>,
734    graph_ctx: &GraphExecutionContext,
735) -> ArrayRef {
736    use arrow_array::builder::{ListBuilder, StringBuilder};
737    let mut labels_builder = ListBuilder::new(StringBuilder::new());
738    let query_ctx = graph_ctx.query_context();
739    for vid in target_vids {
740        let row_labels: Vec<String> =
741            match l0_visibility::get_vertex_labels_optional(*vid, &query_ctx) {
742                Some(labels) => labels,
743                None => {
744                    // Vertex not in L0 — trust schema label (storage already filtered)
745                    if let Some(label_name) = target_label_name {
746                        vec![label_name.clone()]
747                    } else {
748                        vec![]
749                    }
750                }
751            };
752        let values = labels_builder.values();
753        for lbl in &row_labels {
754            values.append_value(lbl);
755        }
756        labels_builder.append(true);
757    }
758    Arc::new(labels_builder.finish())
759}
760
761/// Build target vertex property columns from storage and L0.
762async fn build_target_property_columns(
763    target_vids: &[Vid],
764    target_properties: &[String],
765    target_label_name: &Option<String>,
766    graph_ctx: &Arc<GraphExecutionContext>,
767) -> DFResult<Vec<ArrayRef>> {
768    let mut columns = Vec::new();
769
770    if let Some(label_name) = target_label_name {
771        let property_manager = graph_ctx.property_manager();
772        let query_ctx = graph_ctx.query_context();
773
774        let props_map = property_manager
775            .get_batch_vertex_props_for_label(target_vids, label_name, Some(&query_ctx))
776            .await
777            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
778
779        let uni_schema = graph_ctx.storage().schema_manager().schema();
780        let label_props = uni_schema.properties.get(label_name.as_str());
781
782        for prop_name in target_properties {
783            let data_type = resolve_property_type(prop_name, label_props);
784            let column =
785                build_property_column_static(target_vids, &props_map, prop_name, &data_type)?;
786            columns.push(column);
787        }
788    } else {
789        // No label name — use label-agnostic property lookup.
790        let non_internal_props: Vec<&str> = target_properties
791            .iter()
792            .filter(|p| *p != "_all_props")
793            .map(|s| s.as_str())
794            .collect();
795        let property_manager = graph_ctx.property_manager();
796        let query_ctx = graph_ctx.query_context();
797
798        let props_map = if !non_internal_props.is_empty() {
799            property_manager
800                .get_batch_vertex_props(target_vids, &non_internal_props, Some(&query_ctx))
801                .await
802                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
803        } else {
804            std::collections::HashMap::new()
805        };
806
807        for prop_name in target_properties {
808            if prop_name == "_all_props" {
809                columns.push(build_all_props_column(target_vids, &props_map, graph_ctx));
810            } else {
811                let column = build_property_column_static(
812                    target_vids,
813                    &props_map,
814                    prop_name,
815                    &arrow::datatypes::DataType::LargeBinary,
816                )?;
817                columns.push(column);
818            }
819        }
820    }
821
822    Ok(columns)
823}
824
825/// Build a CypherValue blob column from all vertex properties (L0 + storage).
826fn build_all_props_column(
827    target_vids: &[Vid],
828    props_map: &HashMap<Vid, HashMap<String, uni_common::Value>>,
829    graph_ctx: &Arc<GraphExecutionContext>,
830) -> ArrayRef {
831    use arrow_array::builder::LargeBinaryBuilder;
832
833    let mut builder = LargeBinaryBuilder::new();
834    let l0_ctx = graph_ctx.l0_context();
835    for vid in target_vids {
836        // Merge in `Value` space so typed values (temporals) are preserved.
837        let mut merged_props: HashMap<String, uni_common::Value> = HashMap::new();
838        if let Some(vid_props) = props_map.get(vid) {
839            for (k, v) in vid_props.iter() {
840                merged_props.insert(k.clone(), v.clone());
841            }
842        }
843        for l0 in l0_ctx.iter_l0_buffers() {
844            let guard = l0.read();
845            if let Some(l0_props) = guard.vertex_properties.get(vid) {
846                for (k, v) in l0_props.iter() {
847                    merged_props.insert(k.clone(), v.clone());
848                }
849            }
850        }
851        if merged_props.is_empty() {
852            builder.append_null();
853        } else {
854            builder.append_value(uni_common::cypher_value_codec::encode(
855                &uni_common::Value::Map(merged_props),
856            ));
857        }
858    }
859    Arc::new(builder.finish())
860}
861
862/// Build edge ID, type, and property columns for bound edge variables.
863async fn build_edge_columns(
864    expansions: &[(usize, Vid, u64, u32)],
865    edge_properties: &[String],
866    edge_type_ids: &[u32],
867    graph_ctx: &Arc<GraphExecutionContext>,
868) -> DFResult<Vec<ArrayRef>> {
869    let mut columns = Vec::new();
870
871    let eids: Vec<Eid> = expansions
872        .iter()
873        .map(|(_, _, eid, _)| Eid::from(*eid))
874        .collect();
875    let eid_u64s: Vec<u64> = eids.iter().map(|e| e.as_u64()).collect();
876    columns.push(Arc::new(UInt64Array::from(eid_u64s)) as ArrayRef);
877
878    // Edge _type column
879    {
880        let uni_schema = graph_ctx.storage().schema_manager().schema();
881        let mut type_builder = arrow_array::builder::StringBuilder::new();
882        for (_, _, _, edge_type_id) in expansions {
883            if let Some(name) = uni_schema.edge_type_name_by_id_unified(*edge_type_id) {
884                type_builder.append_value(&name);
885            } else {
886                type_builder.append_null();
887            }
888        }
889        columns.push(Arc::new(type_builder.finish()) as ArrayRef);
890    }
891
892    if !edge_properties.is_empty() {
893        let prop_name_refs: Vec<&str> = edge_properties.iter().map(|s| s.as_str()).collect();
894        let property_manager = graph_ctx.property_manager();
895        let query_ctx = graph_ctx.query_context();
896
897        let props_map = property_manager
898            .get_batch_edge_props(&eids, &prop_name_refs, Some(&query_ctx))
899            .await
900            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
901
902        let uni_schema = graph_ctx.storage().schema_manager().schema();
903        let merged_edge_props = merged_edge_schema_props(&uni_schema, edge_type_ids);
904        let edge_type_props = if merged_edge_props.is_empty() {
905            None
906        } else {
907            Some(&merged_edge_props)
908        };
909
910        let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(e.as_u64())).collect();
911
912        for prop_name in edge_properties {
913            // System-managed edge timestamps live outside the property bag.
914            // Read them directly from L0 (earliest creation, latest update).
915            // Disk-only edges will surface as null — acceptable for the
916            // common case where the queried edges are L0-resident in the
917            // same session.
918            if prop_name == "_created_at" || prop_name == "_updated_at" {
919                let mut builder =
920                    arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
921                let l0_ctx = graph_ctx.l0_context();
922                for eid in &eids {
923                    let mut value: Option<i64> = None;
924                    for l0 in l0_ctx.iter_l0_buffers() {
925                        let guard = l0.read();
926                        let opt = if prop_name == "_created_at" {
927                            guard.edge_created_at.get(eid).copied()
928                        } else {
929                            guard.edge_updated_at.get(eid).copied()
930                        };
931                        if let Some(ts) = opt {
932                            value = Some(match value {
933                                None => ts,
934                                Some(cur) if prop_name == "_created_at" => cur.min(ts),
935                                Some(cur) => cur.max(ts),
936                            });
937                        }
938                    }
939                    match value {
940                        Some(ts) => builder.append_value(ts),
941                        None => builder.append_null(),
942                    }
943                }
944                columns.push(Arc::new(builder.finish()) as ArrayRef);
945                continue;
946            }
947            let data_type = resolve_edge_property_type(prop_name, edge_type_props);
948            let column =
949                build_property_column_static(&vid_keys, &props_map, prop_name, &data_type)?;
950            columns.push(column);
951        }
952    }
953
954    Ok(columns)
955}
956
957/// Build the output batch with target vertex properties.
958///
959/// This is a standalone async function so it can be boxed into a `Send` future
960/// without borrowing from `GraphTraverseStream`.
961#[expect(
962    clippy::too_many_arguments,
963    reason = "Standalone async fn needs all context passed explicitly"
964)]
965async fn build_traverse_output_batch(
966    input: RecordBatch,
967    expansions: Vec<(usize, Vid, u64, u32)>,
968    schema: SchemaRef,
969    edge_variable: Option<String>,
970    edge_properties: Vec<String>,
971    edge_type_ids: Vec<u32>,
972    target_properties: Vec<String>,
973    target_label_name: Option<String>,
974    graph_ctx: Arc<GraphExecutionContext>,
975    optional: bool,
976    optional_pattern_vars: HashSet<String>,
977) -> DFResult<RecordBatch> {
978    if expansions.is_empty() {
979        if !optional {
980            return Ok(RecordBatch::new_empty(schema));
981        }
982        let unmatched_reps = collect_unmatched_optional_group_rows(
983            &input,
984            &HashSet::new(),
985            &schema,
986            &optional_pattern_vars,
987        )?;
988        if unmatched_reps.is_empty() {
989            return Ok(RecordBatch::new_empty(schema));
990        }
991        return build_optional_null_batch_for_rows_with_optional_vars(
992            &input,
993            &unmatched_reps,
994            &schema,
995            &optional_pattern_vars,
996        );
997    }
998
999    // Expand input columns via index array
1000    let indices: Vec<u64> = expansions
1001        .iter()
1002        .map(|(idx, _, _, _)| *idx as u64)
1003        .collect();
1004    let indices_array = UInt64Array::from(indices);
1005    let mut columns: Vec<ArrayRef> = input
1006        .columns()
1007        .iter()
1008        .map(|col| take(col.as_ref(), &indices_array, None))
1009        .collect::<Result<_, _>>()?;
1010
1011    // Target VID column
1012    let target_vids: Vec<Vid> = expansions.iter().map(|(_, vid, _, _)| *vid).collect();
1013    let target_vid_u64s: Vec<u64> = target_vids.iter().map(|v| v.as_u64()).collect();
1014    columns.push(Arc::new(UInt64Array::from(target_vid_u64s)));
1015
1016    // Target labels column
1017    columns.push(build_target_labels_column(
1018        &target_vids,
1019        &target_label_name,
1020        &graph_ctx,
1021    ));
1022
1023    // Target vertex property columns
1024    if !target_properties.is_empty() {
1025        let prop_cols = build_target_property_columns(
1026            &target_vids,
1027            &target_properties,
1028            &target_label_name,
1029            &graph_ctx,
1030        )
1031        .await?;
1032        columns.extend(prop_cols);
1033    }
1034
1035    // Edge columns (bound or internal tracking)
1036    if edge_variable.is_some() {
1037        let edge_cols =
1038            build_edge_columns(&expansions, &edge_properties, &edge_type_ids, &graph_ctx).await?;
1039        columns.extend(edge_cols);
1040    } else {
1041        let eid_u64s: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1042        columns.push(Arc::new(UInt64Array::from(eid_u64s)));
1043    }
1044
1045    let expanded_batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
1046
1047    // Append null rows for unmatched optional sources
1048    if optional {
1049        let matched_indices: HashSet<usize> =
1050            expansions.iter().map(|(idx, _, _, _)| *idx).collect();
1051        let unmatched = collect_unmatched_optional_group_rows(
1052            &input,
1053            &matched_indices,
1054            &schema,
1055            &optional_pattern_vars,
1056        )?;
1057
1058        if !unmatched.is_empty() {
1059            let null_batch = build_optional_null_batch_for_rows_with_optional_vars(
1060                &input,
1061                &unmatched,
1062                &schema,
1063                &optional_pattern_vars,
1064            )?;
1065            let combined = arrow::compute::concat_batches(&schema, [&expanded_batch, &null_batch])
1066                .map_err(arrow_err)?;
1067            return Ok(combined);
1068        }
1069    }
1070
1071    Ok(expanded_batch)
1072}
1073
1074/// Build a batch for specific unmatched source rows with NULL target/edge columns.
1075/// Used when OPTIONAL MATCH has some expansions but some source rows had none.
1076fn build_optional_null_batch_for_rows(
1077    input: &RecordBatch,
1078    unmatched_indices: &[usize],
1079    schema: &SchemaRef,
1080) -> DFResult<RecordBatch> {
1081    let num_rows = unmatched_indices.len();
1082    let indices: Vec<u64> = unmatched_indices.iter().map(|&idx| idx as u64).collect();
1083    let indices_array = UInt64Array::from(indices);
1084
1085    // Take the unmatched input rows
1086    let mut columns: Vec<ArrayRef> = Vec::new();
1087    for col in input.columns() {
1088        let taken = take(col.as_ref(), &indices_array, None)?;
1089        columns.push(taken);
1090    }
1091    // Fill remaining columns with nulls
1092    for field in schema.fields().iter().skip(input.num_columns()) {
1093        columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1094    }
1095    RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
1096}
1097
/// Report whether `col_name` belongs to one of the OPTIONAL MATCH variables.
///
/// A column counts as optional when it is exactly a variable name, when it has
/// the form `<var>.<anything>`, or when it starts with `__eid_to_` and ends
/// with a variable name. The last rule is a plain suffix test, so it also
/// accepts a longer name that merely ends with the variable's text.
fn is_optional_column_for_vars(col_name: &str, optional_vars: &HashSet<String>) -> bool {
    if optional_vars.contains(col_name) {
        return true;
    }

    let is_internal_eid = col_name.starts_with("__eid_to_");
    optional_vars.iter().any(|var| {
        let is_member_of_var = col_name
            .strip_prefix(var.as_str())
            .is_some_and(|rest| rest.starts_with('.'));
        is_member_of_var || (is_internal_eid && col_name.ends_with(var.as_str()))
    })
}
1105
1106fn collect_unmatched_optional_group_rows(
1107    input: &RecordBatch,
1108    matched_indices: &HashSet<usize>,
1109    schema: &SchemaRef,
1110    optional_vars: &HashSet<String>,
1111) -> DFResult<Vec<usize>> {
1112    if input.num_rows() == 0 {
1113        return Ok(Vec::new());
1114    }
1115
1116    if optional_vars.is_empty() {
1117        return Ok((0..input.num_rows())
1118            .filter(|idx| !matched_indices.contains(idx))
1119            .collect());
1120    }
1121
1122    let source_vid_indices: Vec<usize> = schema
1123        .fields()
1124        .iter()
1125        .enumerate()
1126        .filter_map(|(idx, field)| {
1127            if idx >= input.num_columns() {
1128                return None;
1129            }
1130            let name = field.name();
1131            if !is_optional_column_for_vars(name, optional_vars) && name.ends_with("._vid") {
1132                Some(idx)
1133            } else {
1134                None
1135            }
1136        })
1137        .collect();
1138
1139    // Group rows by non-optional VID bindings and preserve group order.
1140    let mut groups: HashMap<Vec<u8>, (usize, bool)> = HashMap::new(); // (first_row_idx, any_matched)
1141    let mut group_order: Vec<Vec<u8>> = Vec::new();
1142
1143    for row_idx in 0..input.num_rows() {
1144        let key = compute_optional_group_key(input, row_idx, &source_vid_indices)?;
1145        let entry = groups.entry(key.clone());
1146        if matches!(entry, std::collections::hash_map::Entry::Vacant(_)) {
1147            group_order.push(key.clone());
1148        }
1149        let matched = matched_indices.contains(&row_idx);
1150        entry
1151            .and_modify(|(_, any_matched)| *any_matched |= matched)
1152            .or_insert((row_idx, matched));
1153    }
1154
1155    Ok(group_order
1156        .into_iter()
1157        .filter_map(|key| {
1158            groups
1159                .get(&key)
1160                .and_then(|(first_idx, any_matched)| (!*any_matched).then_some(*first_idx))
1161        })
1162        .collect())
1163}
1164
1165fn compute_optional_group_key(
1166    batch: &RecordBatch,
1167    row_idx: usize,
1168    source_vid_indices: &[usize],
1169) -> DFResult<Vec<u8>> {
1170    let mut key = Vec::with_capacity(source_vid_indices.len() * std::mem::size_of::<u64>());
1171    for &col_idx in source_vid_indices {
1172        let col = batch.column(col_idx);
1173        let vid_cow = column_as_vid_array(col.as_ref())?;
1174        let arr: &UInt64Array = &vid_cow;
1175        if arr.is_null(row_idx) {
1176            key.extend_from_slice(&u64::MAX.to_le_bytes());
1177        } else {
1178            key.extend_from_slice(&arr.value(row_idx).to_le_bytes());
1179        }
1180    }
1181    Ok(key)
1182}
1183
1184fn build_optional_null_batch_for_rows_with_optional_vars(
1185    input: &RecordBatch,
1186    unmatched_indices: &[usize],
1187    schema: &SchemaRef,
1188    optional_vars: &HashSet<String>,
1189) -> DFResult<RecordBatch> {
1190    if optional_vars.is_empty() {
1191        return build_optional_null_batch_for_rows(input, unmatched_indices, schema);
1192    }
1193
1194    let num_rows = unmatched_indices.len();
1195    let indices: Vec<u64> = unmatched_indices.iter().map(|&idx| idx as u64).collect();
1196    let indices_array = UInt64Array::from(indices);
1197
1198    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
1199    for (col_idx, field) in schema.fields().iter().enumerate() {
1200        if col_idx < input.num_columns() {
1201            if is_optional_column_for_vars(field.name(), optional_vars) {
1202                columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1203            } else {
1204                let taken = take(input.column(col_idx).as_ref(), &indices_array, None)?;
1205                columns.push(taken);
1206            }
1207        } else {
1208            columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1209        }
1210    }
1211
1212    RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
1213}
1214
1215impl Stream for GraphTraverseStream {
1216    type Item = DFResult<RecordBatch>;
1217
1218    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1219        let metrics = self.metrics.clone();
1220        let _timer = metrics.elapsed_compute().timer();
1221        loop {
1222            let state = std::mem::replace(&mut self.state, TraverseStreamState::Done);
1223
1224            match state {
1225                TraverseStreamState::Warming(mut fut) => match fut.as_mut().poll(cx) {
1226                    Poll::Ready(Ok(())) => {
1227                        self.state = TraverseStreamState::Reading;
1228                        // Continue loop to start reading
1229                    }
1230                    Poll::Ready(Err(e)) => {
1231                        self.state = TraverseStreamState::Done;
1232                        return Poll::Ready(Some(Err(e)));
1233                    }
1234                    Poll::Pending => {
1235                        self.state = TraverseStreamState::Warming(fut);
1236                        return Poll::Pending;
1237                    }
1238                },
1239                TraverseStreamState::Reading => {
1240                    // Check timeout
1241                    if let Err(e) = self.graph_ctx.check_timeout() {
1242                        return Poll::Ready(Some(Err(
1243                            datafusion::error::DataFusionError::Execution(e.to_string()),
1244                        )));
1245                    }
1246
1247                    match self.input.poll_next_unpin(cx) {
1248                        Poll::Ready(Some(Ok(batch))) => {
1249                            // Expand neighbors synchronously
1250                            let expansions = match self.expand_neighbors(&batch) {
1251                                Ok(exp) => exp,
1252                                Err(e) => {
1253                                    self.state = TraverseStreamState::Reading;
1254                                    return Poll::Ready(Some(Err(e)));
1255                                }
1256                            };
1257
1258                            // Build output synchronously only when no properties need async hydration
1259                            if self.target_properties.is_empty() && self.edge_properties.is_empty()
1260                            {
1261                                let result = build_traverse_output_batch_sync(
1262                                    &batch,
1263                                    &expansions,
1264                                    &self.schema,
1265                                    self.edge_variable.as_ref(),
1266                                    &self.graph_ctx,
1267                                    self.optional,
1268                                    &self.optional_pattern_vars,
1269                                );
1270                                self.state = TraverseStreamState::Reading;
1271                                if let Ok(ref r) = result {
1272                                    self.metrics.record_output(r.num_rows());
1273                                }
1274                                return Poll::Ready(Some(result));
1275                            }
1276
1277                            // Properties needed — create async future for hydration
1278                            let schema = self.schema.clone();
1279                            let edge_variable = self.edge_variable.clone();
1280                            let edge_properties = self.edge_properties.clone();
1281                            let edge_type_ids = self.edge_type_ids.clone();
1282                            let target_properties = self.target_properties.clone();
1283                            let target_label_name = self.target_label_name.clone();
1284                            let graph_ctx = self.graph_ctx.clone();
1285
1286                            let optional = self.optional;
1287                            let optional_pattern_vars = self.optional_pattern_vars.clone();
1288
1289                            let fut = build_traverse_output_batch(
1290                                batch,
1291                                expansions,
1292                                schema,
1293                                edge_variable,
1294                                edge_properties,
1295                                edge_type_ids,
1296                                target_properties,
1297                                target_label_name,
1298                                graph_ctx,
1299                                optional,
1300                                optional_pattern_vars,
1301                            );
1302
1303                            self.state = TraverseStreamState::Materializing(Box::pin(fut));
1304                            // Continue loop to poll the future
1305                        }
1306                        Poll::Ready(Some(Err(e))) => {
1307                            self.state = TraverseStreamState::Done;
1308                            return Poll::Ready(Some(Err(e)));
1309                        }
1310                        Poll::Ready(None) => {
1311                            self.state = TraverseStreamState::Done;
1312                            return Poll::Ready(None);
1313                        }
1314                        Poll::Pending => {
1315                            self.state = TraverseStreamState::Reading;
1316                            return Poll::Pending;
1317                        }
1318                    }
1319                }
1320                TraverseStreamState::Materializing(mut fut) => match fut.as_mut().poll(cx) {
1321                    Poll::Ready(Ok(batch)) => {
1322                        self.state = TraverseStreamState::Reading;
1323                        self.metrics.record_output(batch.num_rows());
1324                        return Poll::Ready(Some(Ok(batch)));
1325                    }
1326                    Poll::Ready(Err(e)) => {
1327                        self.state = TraverseStreamState::Done;
1328                        return Poll::Ready(Some(Err(e)));
1329                    }
1330                    Poll::Pending => {
1331                        self.state = TraverseStreamState::Materializing(fut);
1332                        return Poll::Pending;
1333                    }
1334                },
1335                TraverseStreamState::Done => {
1336                    return Poll::Ready(None);
1337                }
1338            }
1339        }
1340    }
1341}
1342
1343/// Build output batch synchronously when no properties need async hydration.
1344///
1345/// Only called when both `target_properties` and `edge_properties` are empty,
1346/// so no property columns need to be materialized.
1347fn build_traverse_output_batch_sync(
1348    input: &RecordBatch,
1349    expansions: &[(usize, Vid, u64, u32)],
1350    schema: &SchemaRef,
1351    edge_variable: Option<&String>,
1352    graph_ctx: &GraphExecutionContext,
1353    optional: bool,
1354    optional_pattern_vars: &HashSet<String>,
1355) -> DFResult<RecordBatch> {
1356    if expansions.is_empty() {
1357        if !optional {
1358            return Ok(RecordBatch::new_empty(schema.clone()));
1359        }
1360        let unmatched_reps = collect_unmatched_optional_group_rows(
1361            input,
1362            &HashSet::new(),
1363            schema,
1364            optional_pattern_vars,
1365        )?;
1366        if unmatched_reps.is_empty() {
1367            return Ok(RecordBatch::new_empty(schema.clone()));
1368        }
1369        return build_optional_null_batch_for_rows_with_optional_vars(
1370            input,
1371            &unmatched_reps,
1372            schema,
1373            optional_pattern_vars,
1374        );
1375    }
1376
1377    let indices: Vec<u64> = expansions
1378        .iter()
1379        .map(|(idx, _, _, _)| *idx as u64)
1380        .collect();
1381    let indices_array = UInt64Array::from(indices);
1382
1383    let mut columns: Vec<ArrayRef> = Vec::new();
1384    for col in input.columns() {
1385        let expanded = take(col.as_ref(), &indices_array, None)?;
1386        columns.push(expanded);
1387    }
1388
1389    // Add target VID column
1390    let target_vids: Vec<u64> = expansions
1391        .iter()
1392        .map(|(_, vid, _, _)| vid.as_u64())
1393        .collect();
1394    columns.push(Arc::new(UInt64Array::from(target_vids)));
1395
1396    // Add target ._labels column (from L0 buffers)
1397    {
1398        use arrow_array::builder::{ListBuilder, StringBuilder};
1399        let l0_ctx = graph_ctx.l0_context();
1400        let mut labels_builder = ListBuilder::new(StringBuilder::new());
1401        for (_, vid, _, _) in expansions {
1402            let mut row_labels: Vec<String> = Vec::new();
1403            for l0 in l0_ctx.iter_l0_buffers() {
1404                let guard = l0.read();
1405                if let Some(l0_labels) = guard.vertex_labels.get(vid) {
1406                    for lbl in l0_labels {
1407                        if !row_labels.contains(lbl) {
1408                            row_labels.push(lbl.clone());
1409                        }
1410                    }
1411                }
1412            }
1413            let values = labels_builder.values();
1414            for lbl in &row_labels {
1415                values.append_value(lbl);
1416            }
1417            labels_builder.append(true);
1418        }
1419        columns.push(Arc::new(labels_builder.finish()));
1420    }
1421
1422    // Add edge columns if edge is bound (no properties in sync path)
1423    if edge_variable.is_some() {
1424        let edge_ids: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1425        columns.push(Arc::new(UInt64Array::from(edge_ids)));
1426
1427        // Add edge _type column
1428        let uni_schema = graph_ctx.storage().schema_manager().schema();
1429        let mut type_builder = arrow_array::builder::StringBuilder::new();
1430        for (_, _, _, edge_type_id) in expansions {
1431            if let Some(name) = uni_schema.edge_type_name_by_id_unified(*edge_type_id) {
1432                type_builder.append_value(&name);
1433            } else {
1434                type_builder.append_null();
1435            }
1436        }
1437        columns.push(Arc::new(type_builder.finish()));
1438    } else {
1439        // Internal EID column for relationship uniqueness tracking (matches schema)
1440        let edge_ids: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1441        columns.push(Arc::new(UInt64Array::from(edge_ids)));
1442    }
1443
1444    let expanded_batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
1445
1446    if optional {
1447        let matched_indices: HashSet<usize> =
1448            expansions.iter().map(|(idx, _, _, _)| *idx).collect();
1449        let unmatched = collect_unmatched_optional_group_rows(
1450            input,
1451            &matched_indices,
1452            schema,
1453            optional_pattern_vars,
1454        )?;
1455
1456        if !unmatched.is_empty() {
1457            let null_batch = build_optional_null_batch_for_rows_with_optional_vars(
1458                input,
1459                &unmatched,
1460                schema,
1461                optional_pattern_vars,
1462            )?;
1463            let combined = arrow::compute::concat_batches(schema, [&expanded_batch, &null_batch])
1464                .map_err(|e| {
1465                datafusion::error::DataFusionError::ArrowError(Box::new(e), None)
1466            })?;
1467            return Ok(combined);
1468        }
1469    }
1470
1471    Ok(expanded_batch)
1472}
1473
1474impl RecordBatchStream for GraphTraverseStream {
1475    fn schema(&self) -> SchemaRef {
1476        self.schema.clone()
1477    }
1478}
1479
/// Adjacency map type: each source VID maps to its list of
/// `(target_vid, eid, edge_type_name, properties)` entries.
///
/// Type name and properties are `Arc`-shared: an undirected (`Both`) traversal
/// stores every edge under both endpoints, which used to deep-clone the whole
/// property map per edge (review perf #5).
type EdgeAdjacencyMap = HashMap<Vid, Vec<(Vid, Eid, Arc<str>, Arc<uni_common::Properties>)>>;
1486
/// Graph traversal execution plan for schemaless edge types (TraverseMainByType).
///
/// Unlike GraphTraverseExec which uses CSR adjacency for known types, this operator
/// queries the main edges table for schemaless types and builds an in-memory adjacency map.
///
/// # Example
///
/// ```ignore
/// // Traverse schemaless "CUSTOM" edges
/// let traverse = GraphTraverseMainExec::new(
///     input_plan,
///     "_vid",                       // source column
///     vec!["CUSTOM".to_string()],   // edge type names
///     Direction::Outgoing,
///     "m",                          // target variable
///     Some("r".to_string()),        // edge variable
///     vec![],                       // edge properties
///     vec![],                       // target properties
///     graph_ctx,
///     false,                        // not optional
///     HashSet::new(),               // optional pattern variables
///     None,                         // bound target column
///     vec![],                       // used edge columns
/// );
/// ```
pub struct GraphTraverseMainExec {
    /// Input execution plan.
    input: Arc<dyn ExecutionPlan>,

    /// Column name containing source VIDs.
    source_column: String,

    /// Edge type names (not IDs, since schemaless types may not have IDs).
    /// Supports OR relationship types like `[:KNOWS|HATES]`.
    type_names: Vec<String>,

    /// Traversal direction.
    direction: Direction,

    /// Variable name for target vertex columns.
    target_variable: String,

    /// Variable name for edge columns (if edge is bound).
    edge_variable: Option<String>,

    /// Edge properties to materialize.
    edge_properties: Vec<String>,

    /// Target vertex properties to materialize.
    target_properties: Vec<String>,

    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Whether this is an OPTIONAL MATCH (preserve unmatched source rows with NULLs).
    optional: bool,

    /// Variables introduced by the OPTIONAL MATCH pattern.
    optional_pattern_vars: HashSet<String>,

    /// Column name of an already-bound target VID (for patterns where target is in scope).
    /// When set, only traversals reaching this exact VID are included.
    bound_target_column: Option<String>,

    /// Columns containing edge IDs from previous hops (for relationship uniqueness).
    /// Edges matching any of these IDs are excluded from traversal results.
    used_edge_columns: Vec<String>,

    /// Output schema, built once in `new` from the input schema and the
    /// requested target/edge columns.
    schema: SchemaRef,

    /// Cached plan properties, computed in `new` from the output schema.
    properties: Arc<PlanProperties>,

    /// Execution metrics.
    metrics: ExecutionPlanMetricsSet,
}
1561
1562impl fmt::Debug for GraphTraverseMainExec {
1563    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1564        f.debug_struct("GraphTraverseMainExec")
1565            .field("type_names", &self.type_names)
1566            .field("direction", &self.direction)
1567            .field("target_variable", &self.target_variable)
1568            .field("edge_variable", &self.edge_variable)
1569            .finish()
1570    }
1571}
1572
1573impl GraphTraverseMainExec {
1574    /// Create a new schemaless traversal executor.
1575    #[expect(clippy::too_many_arguments)]
1576    pub fn new(
1577        input: Arc<dyn ExecutionPlan>,
1578        source_column: impl Into<String>,
1579        type_names: Vec<String>,
1580        direction: Direction,
1581        target_variable: impl Into<String>,
1582        edge_variable: Option<String>,
1583        edge_properties: Vec<String>,
1584        target_properties: Vec<String>,
1585        graph_ctx: Arc<GraphExecutionContext>,
1586        optional: bool,
1587        optional_pattern_vars: HashSet<String>,
1588        bound_target_column: Option<String>,
1589        used_edge_columns: Vec<String>,
1590    ) -> Self {
1591        let source_column = source_column.into();
1592        let target_variable = target_variable.into();
1593
1594        // Build output schema
1595        let schema = Self::build_schema(
1596            &input.schema(),
1597            &target_variable,
1598            &edge_variable,
1599            &edge_properties,
1600            &target_properties,
1601            optional,
1602        );
1603
1604        let properties = compute_plan_properties(schema.clone());
1605
1606        Self {
1607            input,
1608            source_column,
1609            type_names,
1610            direction,
1611            target_variable,
1612            edge_variable,
1613            edge_properties,
1614            target_properties,
1615            graph_ctx,
1616            optional,
1617            optional_pattern_vars,
1618            bound_target_column,
1619            used_edge_columns,
1620            schema,
1621            properties,
1622            metrics: ExecutionPlanMetricsSet::new(),
1623        }
1624    }
1625
1626    /// Build output schema for traversal.
1627    fn build_schema(
1628        input_schema: &SchemaRef,
1629        target_variable: &str,
1630        edge_variable: &Option<String>,
1631        edge_properties: &[String],
1632        target_properties: &[String],
1633        optional: bool,
1634    ) -> SchemaRef {
1635        let mut fields: Vec<Field> = input_schema
1636            .fields()
1637            .iter()
1638            .map(|f| f.as_ref().clone())
1639            .collect();
1640
1641        // Add target ._vid column (only if not already in input, nullable for OPTIONAL MATCH)
1642        let target_vid_name = format!("{}._vid", target_variable);
1643        if input_schema.column_with_name(&target_vid_name).is_none() {
1644            fields.push(Field::new(target_vid_name, DataType::UInt64, true));
1645        }
1646
1647        // Add target ._labels column (only if not already in input)
1648        let target_labels_name = format!("{}._labels", target_variable);
1649        if input_schema.column_with_name(&target_labels_name).is_none() {
1650            fields.push(Field::new(target_labels_name, labels_data_type(), true));
1651        }
1652
1653        // Add edge columns if edge variable is bound
1654        if let Some(edge_var) = edge_variable {
1655            fields.push(Field::new(
1656                format!("{}._eid", edge_var),
1657                DataType::UInt64,
1658                optional,
1659            ));
1660
1661            // Add edge ._type column for type(r) support
1662            fields.push(Field::new(
1663                format!("{}._type", edge_var),
1664                DataType::Utf8,
1665                true,
1666            ));
1667
1668            // Edge properties: LargeBinary (cv_encoded) to preserve value types.
1669            // Schemaless edges store properties as CypherValue blobs so that
1670            // Int, Float, etc. round-trip correctly through Arrow.
1671            for prop in edge_properties {
1672                let col_name = format!("{}.{}", edge_var, prop);
1673                let mut metadata = std::collections::HashMap::new();
1674                metadata.insert("cv_encoded".to_string(), "true".to_string());
1675                fields.push(
1676                    Field::new(&col_name, DataType::LargeBinary, true).with_metadata(metadata),
1677                );
1678            }
1679        } else {
1680            // Add internal edge ID for anonymous relationships so BindPath can
1681            // reconstruct named paths (p = (a)-[:T]->(b)).
1682            fields.push(Field::new(
1683                format!("__eid_to_{}", target_variable),
1684                DataType::UInt64,
1685                optional,
1686            ));
1687        }
1688
1689        // Target properties: all as LargeBinary (deferred to PropertyManager)
1690        for prop in target_properties {
1691            fields.push(Field::new(
1692                format!("{}.{}", target_variable, prop),
1693                DataType::LargeBinary,
1694                true,
1695            ));
1696        }
1697
1698        Arc::new(Schema::new(fields))
1699    }
1700}
1701
1702impl DisplayAs for GraphTraverseMainExec {
1703    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1704        write!(
1705            f,
1706            "GraphTraverseMainExec: types={:?}, direction={:?}",
1707            self.type_names, self.direction
1708        )
1709    }
1710}
1711
1712impl ExecutionPlan for GraphTraverseMainExec {
1713    fn name(&self) -> &str {
1714        "GraphTraverseMainExec"
1715    }
1716
1717    fn as_any(&self) -> &dyn Any {
1718        self
1719    }
1720
1721    fn schema(&self) -> SchemaRef {
1722        self.schema.clone()
1723    }
1724
1725    fn properties(&self) -> &Arc<PlanProperties> {
1726        &self.properties
1727    }
1728
1729    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1730        vec![&self.input]
1731    }
1732
1733    fn with_new_children(
1734        self: Arc<Self>,
1735        children: Vec<Arc<dyn ExecutionPlan>>,
1736    ) -> DFResult<Arc<dyn ExecutionPlan>> {
1737        if children.len() != 1 {
1738            return Err(datafusion::error::DataFusionError::Plan(
1739                "GraphTraverseMainExec expects exactly one child".to_string(),
1740            ));
1741        }
1742
1743        Ok(Arc::new(Self {
1744            input: children[0].clone(),
1745            source_column: self.source_column.clone(),
1746            type_names: self.type_names.clone(),
1747            direction: self.direction,
1748            target_variable: self.target_variable.clone(),
1749            edge_variable: self.edge_variable.clone(),
1750            edge_properties: self.edge_properties.clone(),
1751            target_properties: self.target_properties.clone(),
1752            graph_ctx: self.graph_ctx.clone(),
1753            optional: self.optional,
1754            optional_pattern_vars: self.optional_pattern_vars.clone(),
1755            bound_target_column: self.bound_target_column.clone(),
1756            used_edge_columns: self.used_edge_columns.clone(),
1757            schema: self.schema.clone(),
1758            properties: self.properties.clone(),
1759            metrics: self.metrics.clone(),
1760        }))
1761    }
1762
1763    fn execute(
1764        &self,
1765        partition: usize,
1766        context: Arc<TaskContext>,
1767    ) -> DFResult<SendableRecordBatchStream> {
1768        let input_stream = self.input.execute(partition, context)?;
1769        let metrics = BaselineMetrics::new(&self.metrics, partition);
1770
1771        Ok(Box::pin(GraphTraverseMainStream::new(
1772            input_stream,
1773            self.source_column.clone(),
1774            self.type_names.clone(),
1775            self.direction,
1776            self.target_variable.clone(),
1777            self.edge_variable.clone(),
1778            self.edge_properties.clone(),
1779            self.target_properties.clone(),
1780            self.graph_ctx.clone(),
1781            self.optional,
1782            self.optional_pattern_vars.clone(),
1783            self.bound_target_column.clone(),
1784            self.used_edge_columns.clone(),
1785            self.schema.clone(),
1786            metrics,
1787        )))
1788    }
1789
1790    fn metrics(&self) -> Option<MetricsSet> {
1791        Some(self.metrics.clone_inner())
1792    }
1793}
1794
/// Upper bound on the number of distinct source vids that are pushed into the
/// edge scan.
///
/// Past this size a whole-type scan beats a giant `IN` filter. Measured in the
/// `schemaless_traversal` bench: a 10k-vid `IN` over a 100k-edge table is
/// ~2.6x slower than the full scan, whereas small source sets are ~4.5x
/// faster.
const TRAVERSE_PUSHDOWN_MAX_SOURCES: usize = 1_024;
1801
1802/// State machine for GraphTraverseMainStream.
1803///
1804/// The input is drained FIRST so the distinct source vids can be pushed into
1805/// the edge scan (review perf #5) — without this, a 1-source traversal
1806/// materialized every edge of the type. Memory note: buffering the input does
1807/// not change the operator's memory class; it already materialized the entire
1808/// edge type (and with pushdown materializes strictly less).
1809enum GraphTraverseMainState {
1810    /// Draining the input stream to learn the source-vid set.
1811    CollectingInput {
1812        input_stream: SendableRecordBatchStream,
1813        buffered: Vec<RecordBatch>,
1814    },
1815    /// Loading adjacency map from main edges table.
1816    LoadingEdges {
1817        future: Pin<Box<dyn std::future::Future<Output = DFResult<EdgeAdjacencyMap>> + Send>>,
1818        buffered: Vec<RecordBatch>,
1819    },
1820    /// Replaying the buffered input batches against the loaded adjacency.
1821    Processing {
1822        adjacency: EdgeAdjacencyMap,
1823        buffered: std::vec::IntoIter<RecordBatch>,
1824    },
1825    /// Stream is done.
1826    Done,
1827}
1828
1829/// Stream that executes schemaless edge traversal.
1830struct GraphTraverseMainStream {
1831    /// Source column name.
1832    source_column: String,
1833
1834    /// Target variable name.
1835    target_variable: String,
1836
1837    /// Edge variable name.
1838    edge_variable: Option<String>,
1839
1840    /// Edge properties to materialize.
1841    edge_properties: Vec<String>,
1842
1843    /// Target properties to materialize.
1844    target_properties: Vec<String>,
1845
1846    /// Graph execution context.
1847    graph_ctx: Arc<GraphExecutionContext>,
1848
1849    /// Whether this is optional (preserve unmatched rows).
1850    optional: bool,
1851
1852    /// Variables introduced by OPTIONAL pattern.
1853    optional_pattern_vars: HashSet<String>,
1854
1855    /// Column name of an already-bound target VID (for filtering).
1856    bound_target_column: Option<String>,
1857
1858    /// Columns containing edge IDs from previous hops (for relationship uniqueness).
1859    used_edge_columns: Vec<String>,
1860
1861    /// Output schema.
1862    schema: SchemaRef,
1863
1864    /// Edge type names to traverse (retained for the deferred edge load).
1865    type_names: Vec<String>,
1866
1867    /// Traversal direction (retained for the deferred edge load).
1868    direction: Direction,
1869
1870    /// Stream state.
1871    state: GraphTraverseMainState,
1872
1873    /// Metrics.
1874    metrics: BaselineMetrics,
1875}
1876
1877impl GraphTraverseMainStream {
1878    /// Create a new traverse main stream.
1879    #[expect(clippy::too_many_arguments)]
1880    fn new(
1881        input_stream: SendableRecordBatchStream,
1882        source_column: String,
1883        type_names: Vec<String>,
1884        direction: Direction,
1885        target_variable: String,
1886        edge_variable: Option<String>,
1887        edge_properties: Vec<String>,
1888        target_properties: Vec<String>,
1889        graph_ctx: Arc<GraphExecutionContext>,
1890        optional: bool,
1891        optional_pattern_vars: HashSet<String>,
1892        bound_target_column: Option<String>,
1893        used_edge_columns: Vec<String>,
1894        schema: SchemaRef,
1895        metrics: BaselineMetrics,
1896    ) -> Self {
1897        // The input is drained first (CollectingInput) so the edge load can
1898        // push the bounded source-vid set into the scan.
1899        Self {
1900            source_column,
1901            target_variable,
1902            edge_variable,
1903            edge_properties,
1904            target_properties,
1905            graph_ctx,
1906            optional,
1907            optional_pattern_vars,
1908            bound_target_column,
1909            used_edge_columns,
1910            schema,
1911            type_names,
1912            direction,
1913            state: GraphTraverseMainState::CollectingInput {
1914                input_stream,
1915                buffered: Vec::new(),
1916            },
1917            metrics,
1918        }
1919    }
1920
1921    /// Distinct source vids across the buffered input, or `None` when the set
1922    /// exceeds [`TRAVERSE_PUSHDOWN_MAX_SOURCES`] (whole-type scan is cheaper)
1923    /// or the source column cannot be read (the edge load then behaves exactly
1924    /// as before pushdown existed).
1925    fn collect_source_vids(&self, buffered: &[RecordBatch]) -> Option<HashSet<Vid>> {
1926        let mut vids: HashSet<Vid> = HashSet::new();
1927        for batch in buffered {
1928            let col = batch.column_by_name(&self.source_column)?;
1929            let arr = column_as_vid_array(col.as_ref()).ok()?;
1930            for i in 0..arr.len() {
1931                if !arr.is_null(i) {
1932                    vids.insert(Vid::from(arr.value(i)));
1933                    if vids.len() > TRAVERSE_PUSHDOWN_MAX_SOURCES {
1934                        return None;
1935                    }
1936                }
1937            }
1938        }
1939        Some(vids)
1940    }
1941
1942    /// Expand input batch using adjacency map (synchronous version).
1943    fn expand_batch(
1944        &self,
1945        input: &RecordBatch,
1946        adjacency: &EdgeAdjacencyMap,
1947    ) -> DFResult<RecordBatch> {
1948        // Extract source VIDs from source column
1949        let source_col = input.column_by_name(&self.source_column).ok_or_else(|| {
1950            datafusion::error::DataFusionError::Execution(format!(
1951                "Source column {} not found",
1952                self.source_column
1953            ))
1954        })?;
1955
1956        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
1957        let source_vids: &UInt64Array = &source_vid_cow;
1958
1959        // Read bound target VIDs if column exists
1960        let bound_target_cow = self
1961            .bound_target_column
1962            .as_ref()
1963            .and_then(|col| input.column_by_name(col))
1964            .map(|c| column_as_vid_array(c.as_ref()))
1965            .transpose()?;
1966        let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
1967
1968        // Collect edge ID arrays from previous hops for relationship uniqueness filtering.
1969        let used_edge_arrays: Vec<&UInt64Array> = self
1970            .used_edge_columns
1971            .iter()
1972            .filter_map(|col| {
1973                input
1974                    .column_by_name(col)
1975                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
1976            })
1977            .collect();
1978
1979        // Build expansions: (input_row_idx, target_vid, eid, edge_type, edge_props).
1980        // Type/props are Arc-shared with the adjacency map (pointer clones).
1981        type Expansion = (usize, Vid, Eid, Arc<str>, Arc<uni_common::Properties>);
1982        let mut expansions: Vec<Expansion> = Vec::new();
1983
1984        for (row_idx, src_u64) in source_vids.iter().enumerate() {
1985            if let Some(src_u64) = src_u64 {
1986                let src_vid = Vid::from(src_u64);
1987
1988                // Collect used edge IDs for this row from all previous hops
1989                let used_eids: HashSet<u64> = used_edge_arrays
1990                    .iter()
1991                    .filter_map(|arr| {
1992                        if arr.is_null(row_idx) {
1993                            None
1994                        } else {
1995                            Some(arr.value(row_idx))
1996                        }
1997                    })
1998                    .collect();
1999
2000                if let Some(neighbors) = adjacency.get(&src_vid) {
2001                    for (target_vid, eid, edge_type, props) in neighbors {
2002                        // Skip edges already used in previous hops (relationship uniqueness)
2003                        if used_eids.contains(&eid.as_u64()) {
2004                            continue;
2005                        }
2006
2007                        // Filter by bound target VID if set (for patterns where target is in scope).
2008                        // Only include traversals where the target matches the expected VID.
2009                        if let Some(targets) = expected_targets {
2010                            if targets.is_null(row_idx) {
2011                                continue;
2012                            }
2013                            let expected_vid = targets.value(row_idx);
2014                            if target_vid.as_u64() != expected_vid {
2015                                continue;
2016                            }
2017                        }
2018
2019                        expansions.push((
2020                            row_idx,
2021                            *target_vid,
2022                            *eid,
2023                            Arc::clone(edge_type),
2024                            Arc::clone(props),
2025                        ));
2026                    }
2027                }
2028            }
2029        }
2030
2031        // Handle OPTIONAL: preserve unmatched rows
2032        if expansions.is_empty() && self.optional {
2033            // No matches - return input with NULL columns appended
2034            let all_indices: Vec<usize> = (0..input.num_rows()).collect();
2035            return build_optional_null_batch_for_rows(input, &all_indices, &self.schema);
2036        }
2037
2038        if expansions.is_empty() {
2039            // No matches, not optional - return empty batch
2040            return Ok(RecordBatch::new_empty(self.schema.clone()));
2041        }
2042
2043        // Track matched rows for OPTIONAL handling
2044        let matched_rows: HashSet<usize> = if self.optional {
2045            expansions.iter().map(|(idx, _, _, _, _)| *idx).collect()
2046        } else {
2047            HashSet::new()
2048        };
2049
2050        // Expand input columns using Arrow take()
2051        let mut columns: Vec<ArrayRef> = Vec::new();
2052        let indices: Vec<u64> = expansions
2053            .iter()
2054            .map(|(idx, _, _, _, _)| *idx as u64)
2055            .collect();
2056        let indices_array = UInt64Array::from(indices);
2057
2058        for col in input.columns() {
2059            let expanded = take(col.as_ref(), &indices_array, None)?;
2060            columns.push(expanded);
2061        }
2062
2063        // Add target ._vid column (only if not already in input)
2064        let target_vid_name = format!("{}._vid", self.target_variable);
2065        let target_vids: Vec<u64> = expansions
2066            .iter()
2067            .map(|(_, vid, _, _, _)| vid.as_u64())
2068            .collect();
2069        if input.schema().column_with_name(&target_vid_name).is_none() {
2070            columns.push(Arc::new(UInt64Array::from(target_vids)));
2071        }
2072
2073        // Add target ._labels column (only if not already in input)
2074        let target_labels_name = format!("{}._labels", self.target_variable);
2075        if input
2076            .schema()
2077            .column_with_name(&target_labels_name)
2078            .is_none()
2079        {
2080            use arrow_array::builder::{ListBuilder, StringBuilder};
2081            let l0_ctx = self.graph_ctx.l0_context();
2082            let mut labels_builder = ListBuilder::new(StringBuilder::new());
2083            for (_, target_vid, _, _, _) in &expansions {
2084                let mut row_labels: Vec<String> = Vec::new();
2085                for l0 in l0_ctx.iter_l0_buffers() {
2086                    let guard = l0.read();
2087                    if let Some(l0_labels) = guard.vertex_labels.get(target_vid) {
2088                        for lbl in l0_labels {
2089                            if !row_labels.contains(lbl) {
2090                                row_labels.push(lbl.clone());
2091                            }
2092                        }
2093                    }
2094                }
2095                let values = labels_builder.values();
2096                for lbl in &row_labels {
2097                    values.append_value(lbl);
2098                }
2099                labels_builder.append(true);
2100            }
2101            columns.push(Arc::new(labels_builder.finish()));
2102        }
2103
2104        // Add edge columns if edge variable is bound.
2105        // For anonymous relationships, emit internal edge IDs for BindPath.
2106        if self.edge_variable.is_some() {
2107            // Add edge ._eid column
2108            let eids: Vec<u64> = expansions
2109                .iter()
2110                .map(|(_, _, eid, _, _)| eid.as_u64())
2111                .collect();
2112            columns.push(Arc::new(UInt64Array::from(eids)));
2113
2114            // Add edge ._type column
2115            {
2116                let mut type_builder = arrow_array::builder::StringBuilder::new();
2117                for (_, _, _, edge_type, _) in &expansions {
2118                    type_builder.append_value(edge_type);
2119                }
2120                columns.push(Arc::new(type_builder.finish()));
2121            }
2122
2123            // Add edge property columns as cv_encoded LargeBinary to preserve types
2124            for prop_name in &self.edge_properties {
2125                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2126                if prop_name == "_all_props" {
2127                    // Serialize all edge properties to a CypherValue blob directly
2128                    // from `Value` so typed values (temporals) are preserved.
2129                    for (_, _, _, _, props) in &expansions {
2130                        if props.is_empty() {
2131                            builder.append_null();
2132                        } else {
2133                            let map: HashMap<String, uni_common::Value> =
2134                                props.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
2135                            builder.append_value(uni_common::cypher_value_codec::encode(
2136                                &uni_common::Value::Map(map),
2137                            ));
2138                        }
2139                    }
2140                } else {
2141                    // Named property as cv_encoded CypherValue
2142                    for (_, _, _, _, props) in &expansions {
2143                        match props.get(prop_name) {
2144                            Some(uni_common::Value::Null) | None => builder.append_null(),
2145                            Some(val) => {
2146                                builder.append_value(uni_common::cypher_value_codec::encode(val));
2147                            }
2148                        }
2149                    }
2150                }
2151                columns.push(Arc::new(builder.finish()));
2152            }
2153        } else {
2154            let eids: Vec<u64> = expansions
2155                .iter()
2156                .map(|(_, _, eid, _, _)| eid.as_u64())
2157                .collect();
2158            columns.push(Arc::new(UInt64Array::from(eids)));
2159        }
2160
2161        // Add target property columns (hydrate from L0 buffers)
2162        {
2163            let l0_ctx = self.graph_ctx.l0_context();
2164
2165            for prop_name in &self.target_properties {
2166                if prop_name == "_all_props" {
2167                    // Build full CypherValue blob from all L0 vertex properties
2168                    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2169                    for (_, target_vid, _, _, _) in &expansions {
2170                        // Merge in `Value` space so typed values (temporals) survive.
2171                        let mut merged_props: HashMap<String, uni_common::Value> = HashMap::new();
2172                        for l0 in l0_ctx.iter_l0_buffers() {
2173                            let guard = l0.read();
2174                            if let Some(props) = guard.vertex_properties.get(target_vid) {
2175                                for (k, v) in props.iter() {
2176                                    merged_props.insert(k.clone(), v.clone());
2177                                }
2178                            }
2179                        }
2180                        if merged_props.is_empty() {
2181                            builder.append_null();
2182                        } else {
2183                            builder.append_value(uni_common::cypher_value_codec::encode(
2184                                &uni_common::Value::Map(merged_props),
2185                            ));
2186                        }
2187                    }
2188                    columns.push(Arc::new(builder.finish()));
2189                } else {
2190                    // Extract individual property from L0 and encode as CypherValue
2191                    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2192                    for (_, target_vid, _, _, _) in &expansions {
2193                        let mut found = false;
2194                        for l0 in l0_ctx.iter_l0_buffers() {
2195                            let guard = l0.read();
2196                            if let Some(props) = guard.vertex_properties.get(target_vid)
2197                                && let Some(val) = props.get(prop_name.as_str())
2198                                && !val.is_null()
2199                            {
2200                                builder.append_value(uni_common::cypher_value_codec::encode(val));
2201                                found = true;
2202                                break;
2203                            }
2204                        }
2205                        if !found {
2206                            builder.append_null();
2207                        }
2208                    }
2209                    columns.push(Arc::new(builder.finish()));
2210                }
2211            }
2212        }
2213
2214        let matched_batch =
2215            RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)?;
2216
2217        // Handle OPTIONAL: append unmatched rows with NULLs
2218        if self.optional {
2219            let unmatched = collect_unmatched_optional_group_rows(
2220                input,
2221                &matched_rows,
2222                &self.schema,
2223                &self.optional_pattern_vars,
2224            )?;
2225
2226            if unmatched.is_empty() {
2227                return Ok(matched_batch);
2228            }
2229
2230            let unmatched_batch = build_optional_null_batch_for_rows_with_optional_vars(
2231                input,
2232                &unmatched,
2233                &self.schema,
2234                &self.optional_pattern_vars,
2235            )?;
2236
2237            // Concatenate matched and unmatched batches
2238            use arrow::compute::concat_batches;
2239            concat_batches(&self.schema, &[matched_batch, unmatched_batch]).map_err(arrow_err)
2240        } else {
2241            Ok(matched_batch)
2242        }
2243    }
2244}
2245
2246impl GraphExecutionContext {
2247    /// Records a schemaless traversal's full edge-type scan into the SSI read-set.
2248    ///
2249    /// `build_edge_adjacency_map` scans every edge of the traversed type(s) via
2250    /// `find_edges_by_type_names` up front, so the transaction's true read
2251    /// footprint is that whole adjacency — not just the neighbourhoods later
2252    /// expanded. Recording it here gives schemaless single-hop and
2253    /// variable-length traversal (which bypass the per-vertex `get_neighbors`
2254    /// path) the same item-level antidependency coverage the schema-typed
2255    /// `record_neighbor_reads` path provides. No-op for read-only transactions
2256    /// (`occ_read_set` is `Some` only for read-write transactions).
2257    fn record_edge_adjacency(&self, adjacency: &EdgeAdjacencyMap) {
2258        let Some(tx_l0) = &self.l0_context.transaction_l0 else {
2259            return;
2260        };
2261        let guard = tx_l0.read();
2262        let Some(read_set) = &guard.occ_read_set else {
2263            return;
2264        };
2265        let mut rs = read_set.lock();
2266        for (src, neighbors) in adjacency {
2267            rs.vertices.insert(*src);
2268            for (nbr, eid, _type, _props) in neighbors {
2269                rs.vertices.insert(*nbr);
2270                rs.edges.insert(*eid);
2271            }
2272        }
2273    }
2274}
2275
/// Build adjacency map from main edges table for given type names and direction.
///
/// Supports OR relationship types like `[:KNOWS|HATES]` via multiple type_names.
/// Returns a HashMap mapping a key VID -> Vec<(neighbor_vid, eid, edge_type, properties)>.
/// Direction determines the key: Outgoing uses src_vid, Incoming uses dst_vid, Both adds entries for both.
///
/// `source_vids`, when supplied, is pushed into the persisted scan so only the
/// sources' incident edges are materialized instead of the whole edge type
/// (review perf #5). The same endpoint filter is applied to the L0 overlay, so
/// the resulting map (and the SSI read-set recorded from it) has one
/// consistent footprint across both storage tiers.
2286async fn build_edge_adjacency_map(
2287    graph_ctx: &GraphExecutionContext,
2288    type_names: &[String],
2289    direction: Direction,
2290    source_vids: Option<HashSet<Vid>>,
2291) -> DFResult<EdgeAdjacencyMap> {
2292    let storage = graph_ctx.storage();
2293    let l0_ctx = graph_ctx.l0_context();
2294
2295    // Step 1: Query main edges table for all type names, pushing the bounded
2296    // source set into the scan when present.
2297    let type_refs: Vec<&str> = type_names.iter().map(|s| s.as_str()).collect();
2298    let endpoint_vids: Option<Vec<Vid>> = source_vids.as_ref().map(|s| s.iter().copied().collect());
2299    let endpoint_filter = endpoint_vids.as_deref().map(|vids| {
2300        let side = match direction {
2301            Direction::Outgoing => uni_store::storage::EndpointSide::Src,
2302            Direction::Incoming => uni_store::storage::EndpointSide::Dst,
2303            Direction::Both => uni_store::storage::EndpointSide::Either,
2304        };
2305        (side, vids)
2306    });
2307    let edges_with_type = storage
2308        .find_edges_by_type_names(&type_refs, endpoint_filter)
2309        .await
2310        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
2311
2312    // Edge filter matching the pushed-down scan, applied to the L0 overlay
2313    // below so the map (and therefore the SSI read-set recorded from it) has
2314    // ONE consistent footprint across both storage tiers.
2315    let edge_in_scope = |src: Vid, dst: Vid| -> bool {
2316        match &source_vids {
2317            None => true,
2318            Some(set) => match direction {
2319                Direction::Outgoing => set.contains(&src),
2320                Direction::Incoming => set.contains(&dst),
2321                Direction::Both => set.contains(&src) || set.contains(&dst),
2322            },
2323        }
2324    };
2325
2326    // Preserve edge type name in the adjacency map for type(r) support
2327    let mut edges: Vec<(
2328        uni_common::Eid,
2329        uni_common::Vid,
2330        uni_common::Vid,
2331        String,
2332        uni_common::Properties,
2333    )> = edges_with_type.into_iter().collect();
2334
2335    // Step 2: Overlay L0 buffers for all type names
2336    for l0 in l0_ctx.iter_l0_buffers() {
2337        let l0_guard = l0.read();
2338
2339        for type_name in type_names {
2340            let l0_eids = l0_guard.eids_for_type(type_name);
2341
2342            // For each L0 edge, extract its information
2343            for &eid in &l0_eids {
2344                if let Some(edge_ref) = l0_guard.graph.edge(eid) {
2345                    let src_vid = edge_ref.src_vid;
2346                    let dst_vid = edge_ref.dst_vid;
2347                    if !edge_in_scope(src_vid, dst_vid) {
2348                        continue;
2349                    }
2350
2351                    // Get properties for this edge from L0
2352                    let props = l0_guard
2353                        .edge_properties
2354                        .get(&eid)
2355                        .cloned()
2356                        .unwrap_or_default();
2357
2358                    edges.push((eid, src_vid, dst_vid, type_name.clone(), props));
2359                }
2360            }
2361        }
2362    }
2363
2364    // Step 3: Deduplicate by EID (L0 takes precedence)
2365    let mut seen_eids = HashSet::new();
2366    let mut unique_edges = Vec::new();
2367    for edge in edges.into_iter().rev() {
2368        if seen_eids.insert(edge.0) {
2369            unique_edges.push(edge);
2370        }
2371    }
2372    unique_edges.reverse();
2373
2374    // Step 4: Filter out edges tombstoned in any L0 buffer
2375    let mut tombstoned_eids = HashSet::new();
2376    for l0 in l0_ctx.iter_l0_buffers() {
2377        let l0_guard = l0.read();
2378        for eid in l0_guard.tombstones.keys() {
2379            tombstoned_eids.insert(*eid);
2380        }
2381    }
2382    if !tombstoned_eids.is_empty() {
2383        unique_edges.retain(|edge| !tombstoned_eids.contains(&edge.0));
2384    }
2385
2386    // Step 5: Build adjacency map based on direction. Type name and props are
2387    // Arc-shared so the `Both` arm clones two pointers, not the property map.
2388    let mut adjacency: EdgeAdjacencyMap = HashMap::new();
2389
2390    for (eid, src_vid, dst_vid, edge_type, props) in unique_edges {
2391        let edge_type: Arc<str> = edge_type.into();
2392        let props = Arc::new(props);
2393        match direction {
2394            Direction::Outgoing => {
2395                adjacency
2396                    .entry(src_vid)
2397                    .or_default()
2398                    .push((dst_vid, eid, edge_type, props));
2399            }
2400            Direction::Incoming => {
2401                adjacency
2402                    .entry(dst_vid)
2403                    .or_default()
2404                    .push((src_vid, eid, edge_type, props));
2405            }
2406            Direction::Both => {
2407                adjacency.entry(src_vid).or_default().push((
2408                    dst_vid,
2409                    eid,
2410                    Arc::clone(&edge_type),
2411                    Arc::clone(&props),
2412                ));
2413                adjacency
2414                    .entry(dst_vid)
2415                    .or_default()
2416                    .push((src_vid, eid, edge_type, props));
2417            }
2418        }
2419    }
2420
2421    // SSI: a schemaless traversal physically scans the edge type above —
2422    // whole-type, or with source pushdown every edge incident to every actual
2423    // source — so record that read footprint for read-write transactions
2424    // (no-op otherwise). With pushdown this records exactly what the
2425    // transaction logically read; the whole-type scan was incidentally
2426    // (not load-bearing) more conservative under the item-level,
2427    // phantoms-out-of-scope read-set contract.
2428    graph_ctx.record_edge_adjacency(&adjacency);
2429
2430    Ok(adjacency)
2431}
2432
2433/// Build the edge-property `EidFilter` for a typed variable-length traversal.
2434///
2435/// For a pattern like `[r:KNOWS*1..3 {year: 1988}]` the inline edge-property
2436/// conditions must filter *flushed* (CSR/Lance) edges too — not only L0
2437/// in-memory edges as the original code did. We pre-scan every edge of the
2438/// traversed type(s) — flushed edges via `find_edges_by_type_names`, L0 edges
2439/// via the overlay, both handled (with tombstone/dedup semantics) by
2440/// [`build_edge_adjacency_map`] — evaluate the conditions against each edge's
2441/// materialized properties, and collect the passing Eids into an allow-set.
2442///
2443/// Returns [`EidFilter::AllAllowed`] when there are no conditions (nothing to
2444/// filter) or when no edge type id resolves to a name (cannot pre-scan —
2445/// degrade to the prior, less-restrictive behaviour rather than drop every
2446/// edge).
2447async fn build_edge_property_filter(
2448    graph_ctx: &Arc<GraphExecutionContext>,
2449    edge_type_ids: &[u32],
2450    direction: Direction,
2451    conditions: &[(String, UniValue)],
2452) -> DFResult<EidFilter> {
2453    if conditions.is_empty() {
2454        return Ok(EidFilter::AllAllowed);
2455    }
2456
2457    let uni_schema = graph_ctx.storage().schema_manager().schema();
2458    let type_names: Vec<String> = edge_type_ids
2459        .iter()
2460        .filter_map(|id| uni_schema.edge_type_name_by_id_unified(*id))
2461        .collect();
2462    if type_names.is_empty() {
2463        return Ok(EidFilter::AllAllowed);
2464    }
2465
2466    // Enumerate every edge of these types (flushed + L0, tombstone-aware) with
2467    // properties materialized. `None` source-vids => whole-type scan, matching
2468    // the schemaless VLP path.
2469    let adjacency = build_edge_adjacency_map(graph_ctx, &type_names, direction, None).await?;
2470
2471    let mut passing: Vec<u64> = Vec::new();
2472    let mut max_eid: u64 = 0;
2473    let mut seen: FxHashSet<u64> = FxHashSet::default();
2474    for edges in adjacency.values() {
2475        for (_neighbor, eid, _etype, props) in edges {
2476            let raw = eid.as_u64();
2477            // `Direction::Both` lists each edge under both endpoints — dedup so
2478            // the density heuristic in `from_eids` sees a true count.
2479            if !seen.insert(raw) {
2480                continue;
2481            }
2482            max_eid = max_eid.max(raw);
2483            let passes = conditions
2484                .iter()
2485                .all(|(name, expected)| props.get(name).is_some_and(|actual| actual == expected));
2486            if passes {
2487                passing.push(raw);
2488            }
2489        }
2490    }
2491
2492    Ok(EidFilter::from_eids(passing, max_eid as usize + 1))
2493}
2494
2495impl Stream for GraphTraverseMainStream {
2496    type Item = DFResult<RecordBatch>;
2497
2498    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2499        let metrics = self.metrics.clone();
2500        let _timer = metrics.elapsed_compute().timer();
2501        loop {
2502            let state = std::mem::replace(&mut self.state, GraphTraverseMainState::Done);
2503
2504            match state {
2505                GraphTraverseMainState::CollectingInput {
2506                    mut input_stream,
2507                    mut buffered,
2508                } => match input_stream.poll_next_unpin(cx) {
2509                    Poll::Ready(Some(Ok(batch))) => {
2510                        buffered.push(batch);
2511                        self.state = GraphTraverseMainState::CollectingInput {
2512                            input_stream,
2513                            buffered,
2514                        };
2515                        // Continue loop to keep draining.
2516                    }
2517                    Poll::Ready(Some(Err(e))) => {
2518                        self.state = GraphTraverseMainState::Done;
2519                        return Poll::Ready(Some(Err(e)));
2520                    }
2521                    Poll::Ready(None) => {
2522                        // Input fully drained: kick off the edge load with the
2523                        // bounded source set pushed into the scan (when small).
2524                        let source_vids = self.collect_source_vids(&buffered);
2525                        let loading_ctx = self.graph_ctx.clone();
2526                        let loading_types = self.type_names.clone();
2527                        let direction = self.direction;
2528                        let fut = async move {
2529                            build_edge_adjacency_map(
2530                                &loading_ctx,
2531                                &loading_types,
2532                                direction,
2533                                source_vids,
2534                            )
2535                            .await
2536                        };
2537                        self.state = GraphTraverseMainState::LoadingEdges {
2538                            future: Box::pin(fut),
2539                            buffered,
2540                        };
2541                        // Continue loop to poll the load future.
2542                    }
2543                    Poll::Pending => {
2544                        self.state = GraphTraverseMainState::CollectingInput {
2545                            input_stream,
2546                            buffered,
2547                        };
2548                        return Poll::Pending;
2549                    }
2550                },
2551                GraphTraverseMainState::LoadingEdges {
2552                    mut future,
2553                    buffered,
2554                } => match future.as_mut().poll(cx) {
2555                    Poll::Ready(Ok(adjacency)) => {
2556                        // Move to processing state with loaded adjacency
2557                        self.state = GraphTraverseMainState::Processing {
2558                            adjacency,
2559                            buffered: buffered.into_iter(),
2560                        };
2561                        // Continue loop to start processing
2562                    }
2563                    Poll::Ready(Err(e)) => {
2564                        self.state = GraphTraverseMainState::Done;
2565                        return Poll::Ready(Some(Err(e)));
2566                    }
2567                    Poll::Pending => {
2568                        self.state = GraphTraverseMainState::LoadingEdges { future, buffered };
2569                        return Poll::Pending;
2570                    }
2571                },
2572                GraphTraverseMainState::Processing {
2573                    adjacency,
2574                    mut buffered,
2575                } => {
2576                    // Check timeout
2577                    if let Err(e) = self.graph_ctx.check_timeout() {
2578                        return Poll::Ready(Some(Err(
2579                            datafusion::error::DataFusionError::Execution(e.to_string()),
2580                        )));
2581                    }
2582
2583                    match buffered.next() {
2584                        Some(batch) => {
2585                            // Expand batch using adjacency map
2586                            let result = self.expand_batch(&batch, &adjacency);
2587
2588                            self.state = GraphTraverseMainState::Processing {
2589                                adjacency,
2590                                buffered,
2591                            };
2592
2593                            if let Ok(ref r) = result {
2594                                self.metrics.record_output(r.num_rows());
2595                            }
2596                            return Poll::Ready(Some(result));
2597                        }
2598                        None => {
2599                            self.state = GraphTraverseMainState::Done;
2600                            return Poll::Ready(None);
2601                        }
2602                    }
2603                }
2604                GraphTraverseMainState::Done => {
2605                    return Poll::Ready(None);
2606                }
2607            }
2608        }
2609    }
2610}
2611
2612impl RecordBatchStream for GraphTraverseMainStream {
2613    fn schema(&self) -> SchemaRef {
2614        self.schema.clone()
2615    }
2616}
2617
/// Variable-length graph traversal execution plan.
///
/// Performs BFS traversal from source vertices with configurable min/max hops.
/// Path semantics (e.g. Trail = no repeated edges) are selected by `path_mode`.
///
/// # Example
///
/// ```ignore
/// // Find all nodes 1-3 hops away via KNOWS edges
/// let traverse = GraphVariableLengthTraverseExec::new(
///     input_plan,
///     "_vid",                 // source_column
///     vec![knows_type_id],    // edge_type_ids
///     Direction::Outgoing,    // direction
///     1,                      // min_hops
///     3,                      // max_hops
///     "b",                    // target_variable
///     None,                   // step_variable
///     Some("p".to_string()),  // path_variable
///     vec![],                 // target_properties
///     None,                   // target_label_name
///     graph_ctx,
///     false,                  // is_optional
///     None,                   // bound_target_column
///     None,                   // edge_lance_filter
///     vec![],                 // edge_property_conditions
///     vec![],                 // used_edge_columns
///     path_mode,
///     output_mode,
///     None,                   // qpp_nfa: compile the NFA from the VLP parameters
/// );
/// ```
pub struct GraphVariableLengthTraverseExec {
    /// Input execution plan.
    input: Arc<dyn ExecutionPlan>,

    /// Column name containing source VIDs.
    source_column: String,

    /// Edge type IDs to traverse.
    edge_type_ids: Vec<u32>,

    /// Traversal direction.
    direction: Direction,

    /// Minimum number of hops.
    min_hops: usize,

    /// Maximum number of hops.
    max_hops: usize,

    /// Variable name for target vertex columns.
    target_variable: String,

    /// Variable name for relationship list (r in `[r*]`) - holds `List<Edge>`.
    step_variable: Option<String>,

    /// Variable name for path (if path is bound).
    path_variable: Option<String>,

    /// Target vertex properties to materialize.
    target_properties: Vec<String>,

    /// Target label name for property type resolution.
    target_label_name: Option<String>,

    /// Whether this is an optional match (LEFT JOIN semantics).
    is_optional: bool,

    /// Column name of an already-bound target VID (for patterns where target is in scope).
    bound_target_column: Option<String>,

    /// Lance SQL filter for edge property predicates (VLP bitmap preselection).
    edge_lance_filter: Option<String>,

    /// Simple property equality conditions on traversed edges.
    /// Each entry is (property_name, expected_value). `execute` hands them to
    /// `build_edge_property_filter`, which turns them into the `EidFilter`
    /// produced by the warming future.
    edge_property_conditions: Vec<(String, UniValue)>,

    /// Edge ID columns from previous hops for cross-pattern relationship uniqueness.
    used_edge_columns: Vec<String>,

    /// Path semantics mode (Trail = no repeated edges, default for OpenCypher).
    path_mode: super::nfa::PathMode,

    /// Output mode determining BFS strategy.
    output_mode: super::nfa::VlpOutputMode,

    /// Compiled NFA for path pattern matching.
    nfa: Arc<PathNfa>,

    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Output schema.
    schema: SchemaRef,

    /// Cached plan properties.
    properties: Arc<PlanProperties>,

    /// Execution metrics.
    metrics: ExecutionPlanMetricsSet,
}
2709
2710impl fmt::Debug for GraphVariableLengthTraverseExec {
2711    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2712        f.debug_struct("GraphVariableLengthTraverseExec")
2713            .field("source_column", &self.source_column)
2714            .field("edge_type_ids", &self.edge_type_ids)
2715            .field("direction", &self.direction)
2716            .field("min_hops", &self.min_hops)
2717            .field("max_hops", &self.max_hops)
2718            .field("target_variable", &self.target_variable)
2719            .finish()
2720    }
2721}
2722
2723impl GraphVariableLengthTraverseExec {
2724    /// Create a new variable-length traversal plan.
2725    ///
2726    /// For QPP (Quantified Path Patterns), pass a pre-compiled NFA via `qpp_nfa`.
2727    /// For simple VLP patterns, pass `None` and the NFA will be compiled from
2728    /// `edge_type_ids`, `direction`, `min_hops`, `max_hops`.
2729    #[expect(clippy::too_many_arguments)]
2730    pub fn new(
2731        input: Arc<dyn ExecutionPlan>,
2732        source_column: impl Into<String>,
2733        edge_type_ids: Vec<u32>,
2734        direction: Direction,
2735        min_hops: usize,
2736        max_hops: usize,
2737        target_variable: impl Into<String>,
2738        step_variable: Option<String>,
2739        path_variable: Option<String>,
2740        target_properties: Vec<String>,
2741        target_label_name: Option<String>,
2742        graph_ctx: Arc<GraphExecutionContext>,
2743        is_optional: bool,
2744        bound_target_column: Option<String>,
2745        edge_lance_filter: Option<String>,
2746        edge_property_conditions: Vec<(String, UniValue)>,
2747        used_edge_columns: Vec<String>,
2748        path_mode: super::nfa::PathMode,
2749        output_mode: super::nfa::VlpOutputMode,
2750        qpp_nfa: Option<PathNfa>,
2751    ) -> Self {
2752        let source_column = source_column.into();
2753        let target_variable = target_variable.into();
2754
2755        // Resolve target property Arrow types from the schema
2756        let uni_schema = graph_ctx.storage().schema_manager().schema();
2757        let label_props = target_label_name
2758            .as_deref()
2759            .and_then(|ln| uni_schema.properties.get(ln));
2760
2761        // Build output schema
2762        let schema = Self::build_schema(
2763            input.schema(),
2764            &target_variable,
2765            step_variable.as_deref(),
2766            path_variable.as_deref(),
2767            &target_properties,
2768            label_props,
2769        );
2770        let properties = compute_plan_properties(schema.clone());
2771
2772        // Use pre-compiled QPP NFA if provided, otherwise compile from VLP parameters
2773        let nfa = Arc::new(qpp_nfa.unwrap_or_else(|| {
2774            PathNfa::from_vlp(edge_type_ids.clone(), direction, min_hops, max_hops)
2775        }));
2776
2777        Self {
2778            input,
2779            source_column,
2780            edge_type_ids,
2781            direction,
2782            min_hops,
2783            max_hops,
2784            target_variable,
2785            step_variable,
2786            path_variable,
2787            target_properties,
2788            target_label_name,
2789            is_optional,
2790            bound_target_column,
2791            edge_lance_filter,
2792            edge_property_conditions,
2793            used_edge_columns,
2794            path_mode,
2795            output_mode,
2796            nfa,
2797            graph_ctx,
2798            schema,
2799            properties,
2800            metrics: ExecutionPlanMetricsSet::new(),
2801        }
2802    }
2803
2804    /// Build output schema.
2805    fn build_schema(
2806        input_schema: SchemaRef,
2807        target_variable: &str,
2808        step_variable: Option<&str>,
2809        path_variable: Option<&str>,
2810        target_properties: &[String],
2811        label_props: Option<
2812            &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
2813        >,
2814    ) -> SchemaRef {
2815        let mut fields: Vec<Field> = input_schema
2816            .fields()
2817            .iter()
2818            .map(|f| f.as_ref().clone())
2819            .collect();
2820
2821        // Add target VID column (only if not already in input)
2822        let target_vid_name = format!("{}._vid", target_variable);
2823        if input_schema.column_with_name(&target_vid_name).is_none() {
2824            fields.push(Field::new(target_vid_name, DataType::UInt64, true));
2825        }
2826
2827        // Add target ._labels column (only if not already in input)
2828        let target_labels_name = format!("{}._labels", target_variable);
2829        if input_schema.column_with_name(&target_labels_name).is_none() {
2830            fields.push(Field::new(target_labels_name, labels_data_type(), true));
2831        }
2832
2833        // Add target vertex property columns (skip if already in input)
2834        for prop_name in target_properties {
2835            let col_name = format!("{}.{}", target_variable, prop_name);
2836            if input_schema.column_with_name(&col_name).is_none() {
2837                let arrow_type = resolve_property_type(prop_name, label_props);
2838                let uni_type = label_props
2839                    .and_then(|p| p.get(prop_name))
2840                    .map(|m| &m.r#type);
2841                fields.push(property_field(&col_name, arrow_type, uni_type));
2842            }
2843        }
2844
2845        // Add hop count
2846        fields.push(Field::new("_hop_count", DataType::UInt64, false));
2847
2848        // Add step variable (edge list) if bound
2849        if let Some(step_var) = step_variable {
2850            fields.push(build_edge_list_field(step_var));
2851        }
2852
2853        // Add path struct if bound (only if not already in input from prior BindFixedPath)
2854        if let Some(path_var) = path_variable
2855            && input_schema.column_with_name(path_var).is_none()
2856        {
2857            fields.push(build_path_struct_field(path_var));
2858        }
2859
2860        Arc::new(Schema::new(fields))
2861    }
2862}
2863
2864impl DisplayAs for GraphVariableLengthTraverseExec {
2865    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2866        write!(
2867            f,
2868            "GraphVariableLengthTraverseExec: {} --[{:?}*{}..{}]--> target",
2869            self.source_column, self.edge_type_ids, self.min_hops, self.max_hops
2870        )
2871    }
2872}
2873
2874impl ExecutionPlan for GraphVariableLengthTraverseExec {
2875    fn name(&self) -> &str {
2876        "GraphVariableLengthTraverseExec"
2877    }
2878
2879    fn as_any(&self) -> &dyn Any {
2880        self
2881    }
2882
2883    fn schema(&self) -> SchemaRef {
2884        self.schema.clone()
2885    }
2886
2887    fn properties(&self) -> &Arc<PlanProperties> {
2888        &self.properties
2889    }
2890
2891    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
2892        vec![&self.input]
2893    }
2894
2895    fn with_new_children(
2896        self: Arc<Self>,
2897        children: Vec<Arc<dyn ExecutionPlan>>,
2898    ) -> DFResult<Arc<dyn ExecutionPlan>> {
2899        if children.len() != 1 {
2900            return Err(datafusion::error::DataFusionError::Plan(
2901                "GraphVariableLengthTraverseExec requires exactly one child".to_string(),
2902            ));
2903        }
2904
2905        // Pass the existing NFA to avoid recompilation (important for QPP NFA)
2906        Ok(Arc::new(Self::new(
2907            children[0].clone(),
2908            self.source_column.clone(),
2909            self.edge_type_ids.clone(),
2910            self.direction,
2911            self.min_hops,
2912            self.max_hops,
2913            self.target_variable.clone(),
2914            self.step_variable.clone(),
2915            self.path_variable.clone(),
2916            self.target_properties.clone(),
2917            self.target_label_name.clone(),
2918            self.graph_ctx.clone(),
2919            self.is_optional,
2920            self.bound_target_column.clone(),
2921            self.edge_lance_filter.clone(),
2922            self.edge_property_conditions.clone(),
2923            self.used_edge_columns.clone(),
2924            self.path_mode.clone(),
2925            self.output_mode.clone(),
2926            Some((*self.nfa).clone()),
2927        )))
2928    }
2929
2930    fn execute(
2931        &self,
2932        partition: usize,
2933        context: Arc<TaskContext>,
2934    ) -> DFResult<SendableRecordBatchStream> {
2935        let input_stream = self.input.execute(partition, context)?;
2936
2937        let metrics = BaselineMetrics::new(&self.metrics, partition);
2938
2939        // Warm the adjacency CSRs, then (when the pattern carries inline
2940        // edge-property conditions) build the real `EidFilter` from a property
2941        // pre-scan so that flushed CSR/Lance edges are filtered too — not just
2942        // L0 in-memory edges. See `build_edge_property_filter`.
2943        let graph_ctx = self.graph_ctx.clone();
2944        let edge_type_ids = self.edge_type_ids.clone();
2945        let direction = self.direction;
2946        let edge_property_conditions = self.edge_property_conditions.clone();
2947        let warm_fut: Pin<Box<dyn std::future::Future<Output = DFResult<EidFilter>> + Send>> =
2948            Box::pin(async move {
2949                graph_ctx
2950                    .ensure_adjacency_warmed(&edge_type_ids, direction)
2951                    .await
2952                    .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
2953                build_edge_property_filter(
2954                    &graph_ctx,
2955                    &edge_type_ids,
2956                    direction,
2957                    &edge_property_conditions,
2958                )
2959                .await
2960            });
2961
2962        Ok(Box::pin(GraphVariableLengthTraverseStream {
2963            input: input_stream,
2964            exec: Arc::new(self.clone_for_stream()),
2965            schema: self.schema.clone(),
2966            state: VarLengthStreamState::Warming(warm_fut),
2967            edge_property_filter: EidFilter::AllAllowed,
2968            metrics,
2969        }))
2970    }
2971
2972    fn metrics(&self) -> Option<MetricsSet> {
2973        Some(self.metrics.clone_inner())
2974    }
2975}
2976
2977impl GraphVariableLengthTraverseExec {
2978    /// Clone fields needed for stream (avoids cloning the full struct).
2979    fn clone_for_stream(&self) -> GraphVariableLengthTraverseExecData {
2980        GraphVariableLengthTraverseExecData {
2981            source_column: self.source_column.clone(),
2982            edge_type_ids: self.edge_type_ids.clone(),
2983            direction: self.direction,
2984            min_hops: self.min_hops,
2985            max_hops: self.max_hops,
2986            target_variable: self.target_variable.clone(),
2987            step_variable: self.step_variable.clone(),
2988            path_variable: self.path_variable.clone(),
2989            target_properties: self.target_properties.clone(),
2990            target_label_name: self.target_label_name.clone(),
2991            is_optional: self.is_optional,
2992            bound_target_column: self.bound_target_column.clone(),
2993            edge_lance_filter: self.edge_lance_filter.clone(),
2994            edge_property_conditions: self.edge_property_conditions.clone(),
2995            used_edge_columns: self.used_edge_columns.clone(),
2996            path_mode: self.path_mode.clone(),
2997            output_mode: self.output_mode.clone(),
2998            nfa: self.nfa.clone(),
2999            graph_ctx: self.graph_ctx.clone(),
3000        }
3001    }
3002}
3003
/// Data needed by the stream (without ExecutionPlan overhead).
///
/// Populated field-by-field from [`GraphVariableLengthTraverseExec`] by its
/// `clone_for_stream` method; the field meanings mirror that struct.
#[expect(
    dead_code,
    reason = "Fields accessed via NFA; kept for with_new_children reconstruction"
)]
struct GraphVariableLengthTraverseExecData {
    /// Column name containing source VIDs.
    source_column: String,
    /// Edge type IDs to traverse.
    edge_type_ids: Vec<u32>,
    /// Traversal direction.
    direction: Direction,
    /// Minimum number of hops.
    min_hops: usize,
    /// Maximum number of hops.
    max_hops: usize,
    /// Variable name for target vertex columns.
    target_variable: String,
    /// Variable name for the relationship list, if bound.
    step_variable: Option<String>,
    /// Variable name for the path, if bound.
    path_variable: Option<String>,
    /// Target vertex properties to materialize.
    target_properties: Vec<String>,
    /// Target label name; when set, accepting vertices are checked against it.
    target_label_name: Option<String>,
    /// Whether this is an optional match (LEFT JOIN semantics).
    is_optional: bool,
    /// Column name of an already-bound target VID, if any.
    bound_target_column: Option<String>,
    /// Lance SQL filter for edge property predicates.
    #[expect(dead_code, reason = "Used in Phase 3 warming")]
    edge_lance_filter: Option<String>,
    /// Simple property equality conditions as (property_name, expected_value)
    /// pairs. The `EidFilter` consulted by `expand_neighbors` is built from the
    /// same conditions during warming (see `build_edge_property_filter`).
    edge_property_conditions: Vec<(String, UniValue)>,
    /// Edge ID columns from previous hops for cross-pattern relationship uniqueness.
    used_edge_columns: Vec<String>,
    /// Path semantics mode.
    path_mode: super::nfa::PathMode,
    /// Output mode determining BFS strategy.
    output_mode: super::nfa::VlpOutputMode,
    /// Compiled NFA for path pattern matching.
    nfa: Arc<PathNfa>,
    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,
}
3032
3033/// Safety cap for frontier size to prevent OOM on pathological graphs.
3034const MAX_FRONTIER_SIZE: usize = 500_000;
3035/// Safety cap for predecessor pool size.
3036const MAX_PRED_POOL_SIZE: usize = 2_000_000;
3037
3038impl GraphVariableLengthTraverseExecData {
3039    /// Check if a vertex passes the target label filter.
3040    fn check_target_label(&self, vid: Vid) -> bool {
3041        if let Some(ref label_name) = self.target_label_name {
3042            let query_ctx = self.graph_ctx.query_context();
3043            match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
3044                Some(labels) => labels.contains(label_name),
3045                None => true, // not in L0, trust storage
3046            }
3047        } else {
3048            true
3049        }
3050    }
3051
3052    /// Check if a vertex satisfies an NFA state constraint (QPP intermediate node label).
3053    fn check_state_constraint(&self, vid: Vid, constraint: &super::nfa::VertexConstraint) -> bool {
3054        match constraint {
3055            super::nfa::VertexConstraint::Label(label_name) => {
3056                let query_ctx = self.graph_ctx.query_context();
3057                match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
3058                    Some(labels) => labels.contains(label_name),
3059                    None => true, // not in L0, trust storage
3060                }
3061            }
3062        }
3063    }
3064
3065    /// Expand neighbors from a vertex through all NFA transitions from the given state.
3066    /// Returns (neighbor_vid, neighbor_eid, destination_nfa_state) triples.
3067    fn expand_neighbors(
3068        &self,
3069        vid: Vid,
3070        state: NfaStateId,
3071        eid_filter: &EidFilter,
3072        used_eids: &FxHashSet<u64>,
3073    ) -> Vec<(Vid, Eid, NfaStateId)> {
3074        let is_undirected = matches!(self.direction, Direction::Both);
3075        let mut results = Vec::new();
3076
3077        for transition in self.nfa.transitions_from(state) {
3078            let mut seen_edges: FxHashSet<u64> = FxHashSet::default();
3079
3080            for &etype in &transition.edge_type_ids {
3081                for (neighbor, eid) in
3082                    self.graph_ctx
3083                        .get_neighbors(vid, etype, transition.direction)
3084                {
3085                    // Deduplicate edges for undirected patterns
3086                    if is_undirected && !seen_edges.insert(eid.as_u64()) {
3087                        continue;
3088                    }
3089
3090                    // Edge-property predicate gate. When the pattern carries
3091                    // inline `{prop: value}` conditions, `eid_filter` is the
3092                    // allow-set of edges (flushed *and* L0) that satisfy them,
3093                    // built during warming by `build_edge_property_filter`. When
3094                    // there are no conditions it is `AllAllowed`. This replaces
3095                    // the old bifurcated check that filtered only L0 edges and
3096                    // let every flushed CSR/Lance edge through (H4).
3097                    if !eid_filter.contains(eid) {
3098                        continue;
3099                    }
3100
3101                    // Check cross-pattern relationship uniqueness
3102                    if used_eids.contains(&eid.as_u64()) {
3103                        continue;
3104                    }
3105
3106                    // Check NFA state constraint on the destination state (QPP label filters)
3107                    if let Some(constraint) = self.nfa.state_constraint(transition.to)
3108                        && !self.check_state_constraint(neighbor, constraint)
3109                    {
3110                        continue;
3111                    }
3112
3113                    results.push((neighbor, eid, transition.to));
3114                }
3115            }
3116        }
3117
3118        results
3119    }
3120
3121    /// NFA-driven BFS with predecessor DAG for full path enumeration (Mode B).
3122    ///
3123    /// Returns BFS results in the same format as the old bfs() for compatibility
3124    /// with build_output_batch.
3125    fn bfs_with_dag(
3126        &self,
3127        source: Vid,
3128        eid_filter: &EidFilter,
3129        used_eids: &FxHashSet<u64>,
3130        vid_filter: &VidFilter,
3131    ) -> Vec<BfsResult> {
3132        let nfa = &self.nfa;
3133        let selector = PathSelector::All;
3134        let mut dag = PredecessorDag::new(selector);
3135        let mut accepting: Vec<(Vid, NfaStateId, u32)> = Vec::new();
3136
3137        // Handle zero-length paths (min_hops == 0)
3138        if nfa.is_accepting(nfa.start_state())
3139            && self.check_target_label(source)
3140            && vid_filter.contains(source)
3141        {
3142            accepting.push((source, nfa.start_state(), 0));
3143        }
3144
3145        // Per-depth frontier BFS
3146        let mut frontier: Vec<(Vid, NfaStateId)> = vec![(source, nfa.start_state())];
3147        let mut depth: u32 = 0;
3148
3149        while !frontier.is_empty() && depth < self.max_hops as u32 {
3150            depth += 1;
3151            let mut next_frontier: Vec<(Vid, NfaStateId)> = Vec::new();
3152            let mut seen_at_depth: FxHashSet<(Vid, NfaStateId)> = FxHashSet::default();
3153
3154            for &(vid, state) in &frontier {
3155                for (neighbor, eid, dst_state) in
3156                    self.expand_neighbors(vid, state, eid_filter, used_eids)
3157                {
3158                    // Record in predecessor DAG
3159                    dag.add_predecessor(neighbor, dst_state, vid, state, eid, depth);
3160
3161                    // Add to next frontier (deduplicated per depth)
3162                    if seen_at_depth.insert((neighbor, dst_state)) {
3163                        next_frontier.push((neighbor, dst_state));
3164
3165                        // Check if accepting
3166                        if nfa.is_accepting(dst_state)
3167                            && self.check_target_label(neighbor)
3168                            && vid_filter.contains(neighbor)
3169                        {
3170                            accepting.push((neighbor, dst_state, depth));
3171                        }
3172                    }
3173                }
3174            }
3175
3176            // Safety cap
3177            if next_frontier.len() > MAX_FRONTIER_SIZE || dag.pool_len() > MAX_PRED_POOL_SIZE {
3178                break;
3179            }
3180
3181            frontier = next_frontier;
3182        }
3183
3184        // Enumerate paths from DAG to produce BfsResult tuples
3185        let mut results: Vec<BfsResult> = Vec::new();
3186        for &(target, state, depth) in &accepting {
3187            dag.enumerate_paths(
3188                source,
3189                target,
3190                state,
3191                depth,
3192                depth,
3193                &self.path_mode,
3194                &mut |nodes, edges| {
3195                    results.push((target, depth as usize, nodes.to_vec(), edges.to_vec()));
3196                    std::ops::ControlFlow::Continue(())
3197                },
3198            );
3199        }
3200
3201        results
3202    }
3203
3204    /// NFA-driven BFS returning only endpoints and depths (Mode A).
3205    ///
3206    /// More efficient when no path/step variable is bound — skips full path enumeration.
3207    /// Uses lightweight trail verification via has_trail_valid_path().
3208    fn bfs_endpoints_only(
3209        &self,
3210        source: Vid,
3211        eid_filter: &EidFilter,
3212        used_eids: &FxHashSet<u64>,
3213        vid_filter: &VidFilter,
3214    ) -> Vec<(Vid, u32)> {
3215        let nfa = &self.nfa;
3216        let selector = PathSelector::Any; // Only need existence, not all paths
3217        let mut dag = PredecessorDag::new(selector);
3218        let mut results: Vec<(Vid, u32)> = Vec::new();
3219
3220        // Handle zero-length paths
3221        if nfa.is_accepting(nfa.start_state())
3222            && self.check_target_label(source)
3223            && vid_filter.contains(source)
3224        {
3225            results.push((source, 0));
3226        }
3227
3228        // Per-depth frontier BFS
3229        let mut frontier: Vec<(Vid, NfaStateId)> = vec![(source, nfa.start_state())];
3230        let mut depth: u32 = 0;
3231
3232        while !frontier.is_empty() && depth < self.max_hops as u32 {
3233            depth += 1;
3234            let mut next_frontier: Vec<(Vid, NfaStateId)> = Vec::new();
3235            let mut seen_at_depth: FxHashSet<(Vid, NfaStateId)> = FxHashSet::default();
3236
3237            for &(vid, state) in &frontier {
3238                for (neighbor, eid, dst_state) in
3239                    self.expand_neighbors(vid, state, eid_filter, used_eids)
3240                {
3241                    dag.add_predecessor(neighbor, dst_state, vid, state, eid, depth);
3242
3243                    if seen_at_depth.insert((neighbor, dst_state)) {
3244                        next_frontier.push((neighbor, dst_state));
3245
3246                        // Check if accepting with trail verification
3247                        if nfa.is_accepting(dst_state)
3248                            && self.check_target_label(neighbor)
3249                            && vid_filter.contains(neighbor)
3250                            && dag.has_trail_valid_path(source, neighbor, dst_state, depth, depth)
3251                        {
3252                            results.push((neighbor, depth));
3253                        }
3254                    }
3255                }
3256            }
3257
3258            if next_frontier.len() > MAX_FRONTIER_SIZE || dag.pool_len() > MAX_PRED_POOL_SIZE {
3259                break;
3260            }
3261
3262            frontier = next_frontier;
3263        }
3264
3265        results
3266    }
3267}
3268
/// State machine for variable-length traverse stream.
///
/// `poll_next` takes the current state out of the stream on every loop
/// iteration (leaving `Done` behind as a placeholder) and writes the
/// follow-up state back before returning or looping again.
enum VarLengthStreamState {
    /// Warming adjacency CSRs before first batch. Resolves to the prebuilt
    /// edge-property `EidFilter` (the allow-set of edges — flushed *and* L0 —
    /// that satisfy the inline `{prop: value}` conditions), or
    /// `EidFilter::AllAllowed` when there are no conditions.
    ///
    /// On success the stream stores the filter and moves to `Reading`; on
    /// failure it yields the error and moves to `Done`.
    Warming(Pin<Box<dyn std::future::Future<Output = DFResult<EidFilter>> + Send>>),
    /// Processing input batches: each input batch is expanded synchronously
    /// and either returned directly or handed to `Materializing` when target
    /// properties must be fetched.
    Reading,
    /// Materializing target vertex properties asynchronously. Holds the
    /// in-flight hydration future for the batch being produced; the stream
    /// returns to `Reading` once it resolves successfully.
    Materializing(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
    /// Stream is done. Every further poll yields `None`.
    Done,
}
3283
/// Stream for variable-length traversal.
///
/// Pulls batches from `input`, expands every source row through the
/// variable-length BFS, and emits batches that conform to `schema`.
struct GraphVariableLengthTraverseStream {
    /// Upstream batches providing the source vertices (polled while `Reading`).
    input: SendableRecordBatchStream,
    /// Shared operator data: the BFS entry points, the graph context and the
    /// output configuration (target/step/path variables, optional flag, ...).
    exec: Arc<GraphVariableLengthTraverseExecData>,
    /// Output schema; every emitted batch is built against it.
    schema: SchemaRef,
    /// Current position in the [`VarLengthStreamState`] state machine.
    state: VarLengthStreamState,
    /// Edge-property allow-set built during warming (see [`VarLengthStreamState::Warming`]).
    /// `AllAllowed` until warming completes (or when there are no edge-property
    /// conditions).
    edge_property_filter: EidFilter,
    /// Baseline metrics: compute time per poll and number of output rows.
    metrics: BaselineMetrics,
}
3296
3297impl Stream for GraphVariableLengthTraverseStream {
3298    type Item = DFResult<RecordBatch>;
3299
3300    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3301        let metrics = self.metrics.clone();
3302        let _timer = metrics.elapsed_compute().timer();
3303        loop {
3304            let state = std::mem::replace(&mut self.state, VarLengthStreamState::Done);
3305
3306            match state {
3307                VarLengthStreamState::Warming(mut fut) => match fut.as_mut().poll(cx) {
3308                    Poll::Ready(Ok(filter)) => {
3309                        self.edge_property_filter = filter;
3310                        self.state = VarLengthStreamState::Reading;
3311                        // Continue loop to start reading
3312                    }
3313                    Poll::Ready(Err(e)) => {
3314                        self.state = VarLengthStreamState::Done;
3315                        return Poll::Ready(Some(Err(e)));
3316                    }
3317                    Poll::Pending => {
3318                        self.state = VarLengthStreamState::Warming(fut);
3319                        return Poll::Pending;
3320                    }
3321                },
3322                VarLengthStreamState::Reading => {
3323                    // Check timeout
3324                    if let Err(e) = self.exec.graph_ctx.check_timeout() {
3325                        return Poll::Ready(Some(Err(
3326                            datafusion::error::DataFusionError::Execution(e.to_string()),
3327                        )));
3328                    }
3329
3330                    match self.input.poll_next_unpin(cx) {
3331                        Poll::Ready(Some(Ok(batch))) => {
3332                            // Build base batch synchronously (BFS + expand).
3333                            // `edge_property_filter` was built during warming and
3334                            // gates flushed edges by their properties; VidFilter
3335                            // is still unconstrained (TODO: source pre-scan).
3336                            let vid_filter = VidFilter::AllAllowed;
3337                            let base_result = self.process_batch_base(
3338                                batch,
3339                                &self.edge_property_filter,
3340                                &vid_filter,
3341                            );
3342                            let base_batch = match base_result {
3343                                Ok(b) => b,
3344                                Err(e) => {
3345                                    self.state = VarLengthStreamState::Reading;
3346                                    return Poll::Ready(Some(Err(e)));
3347                                }
3348                            };
3349
3350                            // If no properties need async hydration, return directly
3351                            if self.exec.target_properties.is_empty() {
3352                                self.state = VarLengthStreamState::Reading;
3353                                return Poll::Ready(Some(Ok(base_batch)));
3354                            }
3355
3356                            // Properties needed — create async future for hydration
3357                            let schema = self.schema.clone();
3358                            let target_variable = self.exec.target_variable.clone();
3359                            let target_properties = self.exec.target_properties.clone();
3360                            let target_label_name = self.exec.target_label_name.clone();
3361                            let graph_ctx = self.exec.graph_ctx.clone();
3362
3363                            let fut = hydrate_vlp_target_properties(
3364                                base_batch,
3365                                schema,
3366                                target_variable,
3367                                target_properties,
3368                                target_label_name,
3369                                graph_ctx,
3370                            );
3371
3372                            self.state = VarLengthStreamState::Materializing(Box::pin(fut));
3373                            // Continue loop to poll the future
3374                        }
3375                        Poll::Ready(Some(Err(e))) => {
3376                            self.state = VarLengthStreamState::Done;
3377                            return Poll::Ready(Some(Err(e)));
3378                        }
3379                        Poll::Ready(None) => {
3380                            self.state = VarLengthStreamState::Done;
3381                            return Poll::Ready(None);
3382                        }
3383                        Poll::Pending => {
3384                            self.state = VarLengthStreamState::Reading;
3385                            return Poll::Pending;
3386                        }
3387                    }
3388                }
3389                VarLengthStreamState::Materializing(mut fut) => match fut.as_mut().poll(cx) {
3390                    Poll::Ready(Ok(batch)) => {
3391                        self.state = VarLengthStreamState::Reading;
3392                        self.metrics.record_output(batch.num_rows());
3393                        return Poll::Ready(Some(Ok(batch)));
3394                    }
3395                    Poll::Ready(Err(e)) => {
3396                        self.state = VarLengthStreamState::Done;
3397                        return Poll::Ready(Some(Err(e)));
3398                    }
3399                    Poll::Pending => {
3400                        self.state = VarLengthStreamState::Materializing(fut);
3401                        return Poll::Pending;
3402                    }
3403                },
3404                VarLengthStreamState::Done => {
3405                    return Poll::Ready(None);
3406                }
3407            }
3408        }
3409    }
3410}
3411
3412impl GraphVariableLengthTraverseStream {
3413    fn process_batch_base(
3414        &self,
3415        batch: RecordBatch,
3416        eid_filter: &EidFilter,
3417        vid_filter: &VidFilter,
3418    ) -> DFResult<RecordBatch> {
3419        let source_col = batch
3420            .column_by_name(&self.exec.source_column)
3421            .ok_or_else(|| {
3422                datafusion::error::DataFusionError::Execution(format!(
3423                    "Source column '{}' not found",
3424                    self.exec.source_column
3425                ))
3426            })?;
3427
3428        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
3429        let source_vids: &UInt64Array = &source_vid_cow;
3430
3431        // Read bound target VIDs if column exists
3432        let bound_target_cow = self
3433            .exec
3434            .bound_target_column
3435            .as_ref()
3436            .and_then(|col| batch.column_by_name(col))
3437            .map(|c| column_as_vid_array(c.as_ref()))
3438            .transpose()?;
3439        let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
3440
3441        // Extract used edge columns for cross-pattern relationship uniqueness
3442        let used_edge_arrays: Vec<&UInt64Array> = self
3443            .exec
3444            .used_edge_columns
3445            .iter()
3446            .filter_map(|col| {
3447                batch
3448                    .column_by_name(col)?
3449                    .as_any()
3450                    .downcast_ref::<UInt64Array>()
3451            })
3452            .collect();
3453
3454        // Collect all BFS results
3455        let mut expansions: Vec<VarLengthExpansion> = Vec::new();
3456
3457        for (row_idx, source_vid) in source_vids.iter().enumerate() {
3458            let mut emitted_for_row = false;
3459
3460            if let Some(src) = source_vid {
3461                let vid = Vid::from(src);
3462
3463                // Collect used edge IDs from previous hops for this row
3464                let used_eids: FxHashSet<u64> = used_edge_arrays
3465                    .iter()
3466                    .filter_map(|arr| {
3467                        if arr.is_null(row_idx) {
3468                            None
3469                        } else {
3470                            Some(arr.value(row_idx))
3471                        }
3472                    })
3473                    .collect();
3474
3475                // Dispatch to appropriate BFS mode based on output_mode
3476                match &self.exec.output_mode {
3477                    VlpOutputMode::EndpointsOnly => {
3478                        let endpoints = self
3479                            .exec
3480                            .bfs_endpoints_only(vid, eid_filter, &used_eids, vid_filter);
3481                        for (target, depth) in endpoints {
3482                            // Filter by bound target VID
3483                            if let Some(targets) = expected_targets {
3484                                if targets.is_null(row_idx) {
3485                                    continue;
3486                                }
3487                                if target.as_u64() != targets.value(row_idx) {
3488                                    continue;
3489                                }
3490                            }
3491                            expansions.push((row_idx, target, depth as usize, vec![], vec![]));
3492                            emitted_for_row = true;
3493                        }
3494                    }
3495                    _ => {
3496                        // FullPath, StepVariable, CountOnly, etc.
3497                        let bfs_results = self
3498                            .exec
3499                            .bfs_with_dag(vid, eid_filter, &used_eids, vid_filter);
3500                        for (target, hop_count, node_path, edge_path) in bfs_results {
3501                            // Filter by bound target VID
3502                            if let Some(targets) = expected_targets {
3503                                if targets.is_null(row_idx) {
3504                                    continue;
3505                                }
3506                                if target.as_u64() != targets.value(row_idx) {
3507                                    continue;
3508                                }
3509                            }
3510                            expansions.push((row_idx, target, hop_count, node_path, edge_path));
3511                            emitted_for_row = true;
3512                        }
3513                    }
3514                }
3515            }
3516
3517            if self.exec.is_optional && !emitted_for_row {
3518                // Preserve the source row with NULL optional bindings.
3519                // We use empty node/edge paths to mark unmatched rows.
3520                expansions.push((row_idx, Vid::from(u64::MAX), 0, vec![], vec![]));
3521            }
3522        }
3523
3524        self.build_output_batch(&batch, &expansions)
3525    }
3526
    /// Assembles the output batch for a set of traversal expansions.
    ///
    /// Each expansion is `(input row index, target VID, hop count, node path,
    /// edge path)`. Columns are appended in this order, which has to line up
    /// with `self.schema`:
    ///
    /// 1. every input column, repeated per expansion via `take`;
    /// 2. `<target>._vid` and `<target>._labels` (each only if not already an
    ///    input column);
    /// 3. one NULL placeholder per target property not already in the input;
    /// 4. the hop count;
    /// 5. the step-variable edge list, if a step variable is bound;
    /// 6. the path struct, if a path variable is bound — it replaces an
    ///    existing input column of that name in place, otherwise it is appended.
    ///
    /// Returns an empty batch with the output schema when `expansions` is
    /// empty. Otherwise the number of expansions is recorded as output rows in
    /// the stream metrics.
    ///
    /// # Errors
    ///
    /// Propagates failures from `take`, `StructArray::try_new` and
    /// `RecordBatch::try_new` (the latter two mapped through `arrow_err`).
    ///
    /// # Panics
    ///
    /// Indexes `node_path[i + 1]` for every edge, so it presumably relies on
    /// each non-empty path holding one more node than edges — TODO confirm
    /// against the path enumeration.
    fn build_output_batch(
        &self,
        input: &RecordBatch,
        expansions: &[VarLengthExpansion],
    ) -> DFResult<RecordBatch> {
        if expansions.is_empty() {
            return Ok(RecordBatch::new_empty(self.schema.clone()));
        }

        let num_rows = expansions.len();

        // Build index array: for each output row, the input row it came from.
        let indices: Vec<u64> = expansions
            .iter()
            .map(|(idx, _, _, _, _)| *idx as u64)
            .collect();
        let indices_array = UInt64Array::from(indices);

        // Expand input columns
        let mut columns: Vec<ArrayRef> = Vec::new();
        for col in input.columns() {
            let expanded = take(col.as_ref(), &indices_array, None)?;
            columns.push(expanded);
        }

        // Collect target VIDs and unmatched markers for use in multiple places.
        // Unmatched OPTIONAL rows use the sentinel VID (u64::MAX) — not empty paths,
        // because EndpointsOnly mode legitimately uses empty node/edge path vectors.
        let unmatched_rows: Vec<bool> = expansions
            .iter()
            .map(|(_, vid, _, _, _)| vid.as_u64() == u64::MAX)
            .collect();
        let target_vids: Vec<Option<u64>> = expansions
            .iter()
            .zip(unmatched_rows.iter())
            .map(
                |((_, vid, _, _, _), unmatched)| {
                    if *unmatched { None } else { Some(vid.as_u64()) }
                },
            )
            .collect();

        // Add target VID column (only if not already in input)
        let target_vid_name = format!("{}._vid", self.exec.target_variable);
        if input.schema().column_with_name(&target_vid_name).is_none() {
            columns.push(Arc::new(UInt64Array::from(target_vids.clone())));
        }

        // Add target ._labels column (only if not already in input)
        let target_labels_name = format!("{}._labels", self.exec.target_variable);
        if input
            .schema()
            .column_with_name(&target_labels_name)
            .is_none()
        {
            use arrow_array::builder::{ListBuilder, StringBuilder};
            let query_ctx = self.exec.graph_ctx.query_context();
            let mut labels_builder = ListBuilder::new(StringBuilder::new());
            for target_vid in &target_vids {
                // Unmatched OPTIONAL row: the label list itself is NULL.
                let Some(vid_u64) = target_vid else {
                    labels_builder.append(false);
                    continue;
                };
                let vid = Vid::from(*vid_u64);
                let row_labels: Vec<String> =
                    match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
                        Some(labels) => {
                            // Vertex is in L0 — use actual labels only
                            labels
                        }
                        None => {
                            // Vertex not in L0 — trust schema label (storage already filtered)
                            if let Some(ref label_name) = self.exec.target_label_name {
                                vec![label_name.clone()]
                            } else {
                                vec![]
                            }
                        }
                    };
                let values = labels_builder.values();
                for lbl in &row_labels {
                    values.append_value(lbl);
                }
                labels_builder.append(true);
            }
            columns.push(Arc::new(labels_builder.finish()));
        }

        // Add null placeholder columns for target properties (hydrated async if needed, skip if already in input)
        for prop_name in &self.exec.target_properties {
            let full_prop_name = format!("{}.{}", self.exec.target_variable, prop_name);
            if input.schema().column_with_name(&full_prop_name).is_none() {
                // The placeholder takes the type of the schema field at the
                // position it is about to occupy; nothing is pushed if that
                // position lies beyond the schema.
                let col_idx = columns.len();
                if col_idx < self.schema.fields().len() {
                    let field = self.schema.field(col_idx);
                    columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
                }
            }
        }

        // Add hop count column
        let hop_counts: Vec<u64> = expansions
            .iter()
            .map(|(_, _, hops, _, _)| *hops as u64)
            .collect();
        columns.push(Arc::new(UInt64Array::from(hop_counts)));

        // Add step variable (edge list) column if bound
        if self.exec.step_variable.is_some() {
            let mut edges_builder = new_edge_list_builder();
            let query_ctx = self.exec.graph_ctx.query_context();

            // NOTE(review): unmatched rows are detected here by "both paths
            // empty", while the VID/label columns above use the u64::MAX
            // sentinel. This looks equivalent as long as matched rows always
            // carry a non-empty node path when a step variable is bound —
            // confirm against the traversal modes.
            for (_, _, _, node_path, edge_path) in expansions {
                if node_path.is_empty() && edge_path.is_empty() {
                    // Null row for OPTIONAL MATCH unmatched
                    edges_builder.append_null();
                } else if edge_path.is_empty() {
                    // Zero-hop match: empty list
                    edges_builder.append(true);
                } else {
                    for (i, eid) in edge_path.iter().enumerate() {
                        let type_name = l0_visibility::get_edge_type(*eid, &query_ctx)
                            .unwrap_or_else(|| "UNKNOWN".to_string());
                        // Report the relationship's STORED direction, not the
                        // traversal order (`node_path[i]` -> `node_path[i + 1]`).
                        // Resolves flushed (L1-resident) edges too, where the L0
                        // chain no longer holds the stored endpoints.
                        let (src, dst) = self.exec.graph_ctx.resolve_stored_edge_endpoints(
                            *eid,
                            node_path[i],
                            node_path[i + 1],
                            &self.exec.edge_type_ids,
                        );
                        append_edge_to_struct(
                            edges_builder.values(),
                            *eid,
                            &type_name,
                            src,
                            dst,
                            &query_ctx,
                        );
                    }
                    edges_builder.append(true);
                }
            }

            columns.push(Arc::new(edges_builder.finish()));
        }

        // Add path variable column if bound.
        // For named paths, we output a Path struct with nodes and relationships arrays.
        // If a path column already exists in input (from a prior BindFixedPath), extend it
        // rather than building from scratch.
        if let Some(path_var_name) = &self.exec.path_variable {
            let existing_path_col_idx = input
                .schema()
                .column_with_name(path_var_name)
                .map(|(idx, _)| idx);
            // Clone the Arc so we can read existing path without borrowing `columns`.
            // `columns[idx]` is the already-expanded column, so it is indexed by
            // output row below.
            let existing_path_arc = existing_path_col_idx.map(|idx| columns[idx].clone());
            let existing_path = existing_path_arc
                .as_ref()
                .and_then(|arc| arc.as_any().downcast_ref::<arrow_array::StructArray>());

            let mut nodes_builder = new_node_list_builder();
            let mut rels_builder = new_edge_list_builder();
            let query_ctx = self.exec.graph_ctx.query_context();
            // Row-level validity of the path struct (false = NULL path).
            let mut path_validity = Vec::with_capacity(expansions.len());

            for (row_out_idx, (_, _, _, node_path, edge_path)) in expansions.iter().enumerate() {
                // Both paths empty: emit a NULL path for this row.
                if node_path.is_empty() && edge_path.is_empty() {
                    nodes_builder.append(false);
                    rels_builder.append(false);
                    path_validity.push(false);
                    continue;
                }

                // Prepend existing path prefix if extending
                let skip_first_vlp_node = if let Some(existing) = existing_path {
                    if !existing.is_null(row_out_idx) {
                        prepend_existing_path(
                            existing,
                            row_out_idx,
                            &mut nodes_builder,
                            &mut rels_builder,
                            &query_ctx,
                        );
                        true
                    } else {
                        false
                    }
                } else {
                    false
                };

                // Append VLP nodes (skip first if extending — it's the junction point)
                let start_idx = if skip_first_vlp_node { 1 } else { 0 };
                for vid in &node_path[start_idx..] {
                    append_node_to_struct(nodes_builder.values(), *vid, &query_ctx);
                }
                nodes_builder.append(true);

                for (i, eid) in edge_path.iter().enumerate() {
                    let type_name = l0_visibility::get_edge_type(*eid, &query_ctx)
                        .unwrap_or_else(|| "UNKNOWN".to_string());
                    // Report the relationship's STORED direction, not the traversal
                    // order (`node_path[i]` -> `node_path[i + 1]`). Resolves flushed
                    // (L1-resident) edges too, where the L0 chain no longer holds
                    // the stored endpoints.
                    let (src, dst) = self.exec.graph_ctx.resolve_stored_edge_endpoints(
                        *eid,
                        node_path[i],
                        node_path[i + 1],
                        &self.exec.edge_type_ids,
                    );
                    append_edge_to_struct(
                        rels_builder.values(),
                        *eid,
                        &type_name,
                        src,
                        dst,
                        &query_ctx,
                    );
                }
                rels_builder.append(true);
                path_validity.push(true);
            }

            // Finish builders and get ListArrays
            let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
            let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;

            // Build the path struct fields
            let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
            let rels_field = Arc::new(Field::new(
                "relationships",
                rels_array.data_type().clone(),
                true,
            ));

            // Create the path struct array
            let path_struct = arrow_array::StructArray::try_new(
                vec![nodes_field, rels_field].into(),
                vec![nodes_array, rels_array],
                Some(arrow::buffer::NullBuffer::from(path_validity)),
            )
            .map_err(arrow_err)?;

            // Replace the extended input column in place, or append a new one.
            if let Some(idx) = existing_path_col_idx {
                columns[idx] = Arc::new(path_struct);
            } else {
                columns.push(Arc::new(path_struct));
            }
        }

        // Count the rows of this batch as operator output.
        self.metrics.record_output(num_rows);

        RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)
    }
3786}
3787
3788impl RecordBatchStream for GraphVariableLengthTraverseStream {
3789    fn schema(&self) -> SchemaRef {
3790        self.schema.clone()
3791    }
3792}
3793
3794/// Hydrate target vertex properties into a VLP batch.
3795///
3796/// The base batch already has null placeholder columns for target properties.
3797/// This function replaces them with actual property values fetched from storage.
3798async fn hydrate_vlp_target_properties(
3799    base_batch: RecordBatch,
3800    schema: SchemaRef,
3801    target_variable: String,
3802    target_properties: Vec<String>,
3803    target_label_name: Option<String>,
3804    graph_ctx: Arc<GraphExecutionContext>,
3805) -> DFResult<RecordBatch> {
3806    if base_batch.num_rows() == 0 || target_properties.is_empty() {
3807        return Ok(base_batch);
3808    }
3809
3810    // Find the target VID column by exact name.
3811    // Schema layout: [input cols..., target._vid, target.prop1..., _hop_count, path?]
3812    //
3813    // IMPORTANT: When the target variable is already bound in the input (e.g., two MATCH
3814    // clauses referencing the same variable), there may be duplicate column names. We need
3815    // the LAST occurrence of target._vid, which is the one added by the VLP.
3816    let target_vid_col_name = format!("{}._vid", target_variable);
3817    let vid_col_idx = schema
3818        .fields()
3819        .iter()
3820        .enumerate()
3821        .rev()
3822        .find(|(_, f)| f.name() == &target_vid_col_name)
3823        .map(|(i, _)| i);
3824
3825    let Some(vid_col_idx) = vid_col_idx else {
3826        return Ok(base_batch);
3827    };
3828
3829    let vid_col = base_batch.column(vid_col_idx);
3830    let target_vid_cow = column_as_vid_array(vid_col.as_ref())?;
3831    let target_vid_array: &UInt64Array = &target_vid_cow;
3832
3833    let target_vids: Vec<Vid> = target_vid_array
3834        .iter()
3835        // Preserve null rows by mapping them to a sentinel VID that never resolves
3836        // to stored properties. The output property columns remain NULL for these rows.
3837        .map(|v| Vid::from(v.unwrap_or(u64::MAX)))
3838        .collect();
3839
3840    // Fetch properties from storage
3841    let mut property_columns: Vec<ArrayRef> = Vec::new();
3842
3843    if let Some(ref label_name) = target_label_name {
3844        let property_manager = graph_ctx.property_manager();
3845        let query_ctx = graph_ctx.query_context();
3846
3847        let props_map = property_manager
3848            .get_batch_vertex_props_for_label(&target_vids, label_name, Some(&query_ctx))
3849            .await
3850            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
3851
3852        let uni_schema = graph_ctx.storage().schema_manager().schema();
3853        let label_props = uni_schema.properties.get(label_name.as_str());
3854
3855        for prop_name in &target_properties {
3856            let data_type = resolve_property_type(prop_name, label_props);
3857            let column =
3858                build_property_column_static(&target_vids, &props_map, prop_name, &data_type)?;
3859            property_columns.push(column);
3860        }
3861    } else {
3862        // No label name — use label-agnostic property lookup.
3863        // This scans all label datasets, slower but correct for label-less traversals.
3864        let non_internal_props: Vec<&str> = target_properties
3865            .iter()
3866            .filter(|p| *p != "_all_props")
3867            .map(|s| s.as_str())
3868            .collect();
3869        let property_manager = graph_ctx.property_manager();
3870        let query_ctx = graph_ctx.query_context();
3871
3872        let props_map = if !non_internal_props.is_empty() {
3873            property_manager
3874                .get_batch_vertex_props(&target_vids, &non_internal_props, Some(&query_ctx))
3875                .await
3876                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
3877        } else {
3878            std::collections::HashMap::new()
3879        };
3880
3881        for prop_name in &target_properties {
3882            if prop_name == "_all_props" {
3883                // Build CypherValue blob from all vertex properties (L0 + storage),
3884                // staying in `Value` space so typed values (temporals) are preserved.
3885                use arrow_array::builder::LargeBinaryBuilder;
3886
3887                let mut builder = LargeBinaryBuilder::new();
3888                let l0_ctx = graph_ctx.l0_context();
3889                for vid in &target_vids {
3890                    let mut merged_props: HashMap<String, uni_common::Value> = HashMap::new();
3891                    // Collect from storage-hydrated props
3892                    if let Some(vid_props) = props_map.get(vid) {
3893                        for (k, v) in vid_props.iter() {
3894                            merged_props.insert(k.clone(), v.clone());
3895                        }
3896                    }
3897                    // Overlay L0 properties
3898                    for l0 in l0_ctx.iter_l0_buffers() {
3899                        let guard = l0.read();
3900                        if let Some(l0_props) = guard.vertex_properties.get(vid) {
3901                            for (k, v) in l0_props.iter() {
3902                                merged_props.insert(k.clone(), v.clone());
3903                            }
3904                        }
3905                    }
3906                    if merged_props.is_empty() {
3907                        builder.append_null();
3908                    } else {
3909                        builder.append_value(uni_common::cypher_value_codec::encode(
3910                            &uni_common::Value::Map(merged_props),
3911                        ));
3912                    }
3913                }
3914                property_columns.push(Arc::new(builder.finish()));
3915            } else {
3916                let column = build_property_column_static(
3917                    &target_vids,
3918                    &props_map,
3919                    prop_name,
3920                    &arrow::datatypes::DataType::LargeBinary,
3921                )?;
3922                property_columns.push(column);
3923            }
3924        }
3925    }
3926
3927    // Rebuild batch replacing the null placeholder property columns with hydrated ones.
3928    // Find each property column by name — works regardless of column ordering
3929    // (schema-aware puts props before _hop_count; schemaless puts them after).
3930    // Use col_idx > vid_col_idx to only replace this VLP's own property columns,
3931    // not pre-existing input columns with the same name (duplicate variable binding).
3932    let mut new_columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
3933    let mut prop_idx = 0;
3934    for (col_idx, field) in schema.fields().iter().enumerate() {
3935        let is_target_prop = col_idx > vid_col_idx
3936            && target_properties
3937                .iter()
3938                .any(|p| *field.name() == format!("{}.{}", target_variable, p));
3939        if is_target_prop && prop_idx < property_columns.len() {
3940            new_columns.push(property_columns[prop_idx].clone());
3941            prop_idx += 1;
3942        } else {
3943            new_columns.push(base_batch.column(col_idx).clone());
3944        }
3945    }
3946
3947    RecordBatch::try_new(schema, new_columns).map_err(arrow_err)
3948}
3949
3950// ============================================================================
3951// GraphVariableLengthTraverseMainExec - VLP for schemaless edge types
3952// ============================================================================
3953
3954/// Execution plan for variable-length path traversal on schemaless edge types.
3955///
3956/// This is similar to `GraphVariableLengthTraverseExec` but works with edge types
3957/// that don't have schema-defined IDs. It queries the main edges table by type name.
3958/// Supports OR relationship types like `[:KNOWS|HATES]` via multiple type_names.
3959pub struct GraphVariableLengthTraverseMainExec {
3960    /// Input execution plan.
3961    input: Arc<dyn ExecutionPlan>,
3962
3963    /// Column name containing source VIDs.
3964    source_column: String,
3965
3966    /// Edge type names (not IDs, since schemaless types may not have IDs).
3967    type_names: Vec<String>,
3968
3969    /// Traversal direction.
3970    direction: Direction,
3971
3972    /// Minimum number of hops.
3973    min_hops: usize,
3974
3975    /// Maximum number of hops.
3976    max_hops: usize,
3977
3978    /// Variable name for target vertex columns.
3979    target_variable: String,
3980
3981    /// Variable name for relationship list (r in `[r*]`) - holds `List<Edge>`.
3982    step_variable: Option<String>,
3983
3984    /// Variable name for named path (p in `p = ...`) - holds `Path`.
3985    path_variable: Option<String>,
3986
3987    /// Target vertex properties to materialize.
3988    target_properties: Vec<String>,
3989
3990    /// Whether this is an optional match (LEFT JOIN semantics).
3991    is_optional: bool,
3992
3993    /// Column name of an already-bound target VID (for patterns where target is in scope).
3994    bound_target_column: Option<String>,
3995
3996    /// Lance SQL filter for edge property predicates (VLP bitmap preselection).
3997    edge_lance_filter: Option<String>,
3998
3999    /// Edge property conditions to check during BFS (e.g., `{year: 1988}`).
4000    /// Each entry is (property_name, expected_value). All must match for an edge to be traversed.
4001    edge_property_conditions: Vec<(String, UniValue)>,
4002
4003    /// Edge ID columns from previous hops for cross-pattern relationship uniqueness.
4004    used_edge_columns: Vec<String>,
4005
4006    /// Path semantics mode (Trail = no repeated edges, default for OpenCypher).
4007    path_mode: super::nfa::PathMode,
4008
4009    /// Output mode determining BFS strategy.
4010    output_mode: super::nfa::VlpOutputMode,
4011
4012    /// Graph execution context.
4013    graph_ctx: Arc<GraphExecutionContext>,
4014
4015    /// Output schema.
4016    schema: SchemaRef,
4017
4018    /// Cached plan properties.
4019    properties: Arc<PlanProperties>,
4020
4021    /// Execution metrics.
4022    metrics: ExecutionPlanMetricsSet,
4023}
4024
4025impl fmt::Debug for GraphVariableLengthTraverseMainExec {
4026    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4027        f.debug_struct("GraphVariableLengthTraverseMainExec")
4028            .field("source_column", &self.source_column)
4029            .field("type_names", &self.type_names)
4030            .field("direction", &self.direction)
4031            .field("min_hops", &self.min_hops)
4032            .field("max_hops", &self.max_hops)
4033            .field("target_variable", &self.target_variable)
4034            .finish()
4035    }
4036}
4037
4038impl GraphVariableLengthTraverseMainExec {
4039    /// Create a new variable-length traversal plan for schemaless edges.
4040    #[expect(clippy::too_many_arguments)]
4041    pub fn new(
4042        input: Arc<dyn ExecutionPlan>,
4043        source_column: impl Into<String>,
4044        type_names: Vec<String>,
4045        direction: Direction,
4046        min_hops: usize,
4047        max_hops: usize,
4048        target_variable: impl Into<String>,
4049        step_variable: Option<String>,
4050        path_variable: Option<String>,
4051        target_properties: Vec<String>,
4052        graph_ctx: Arc<GraphExecutionContext>,
4053        is_optional: bool,
4054        bound_target_column: Option<String>,
4055        edge_lance_filter: Option<String>,
4056        edge_property_conditions: Vec<(String, UniValue)>,
4057        used_edge_columns: Vec<String>,
4058        path_mode: super::nfa::PathMode,
4059        output_mode: super::nfa::VlpOutputMode,
4060    ) -> Self {
4061        let source_column = source_column.into();
4062        let target_variable = target_variable.into();
4063
4064        // Build output schema
4065        let schema = Self::build_schema(
4066            input.schema(),
4067            &target_variable,
4068            step_variable.as_deref(),
4069            path_variable.as_deref(),
4070            &target_properties,
4071        );
4072        let properties = compute_plan_properties(schema.clone());
4073
4074        Self {
4075            input,
4076            source_column,
4077            type_names,
4078            direction,
4079            min_hops,
4080            max_hops,
4081            target_variable,
4082            step_variable,
4083            path_variable,
4084            target_properties,
4085            is_optional,
4086            bound_target_column,
4087            edge_lance_filter,
4088            edge_property_conditions,
4089            used_edge_columns,
4090            path_mode,
4091            output_mode,
4092            graph_ctx,
4093            schema,
4094            properties,
4095            metrics: ExecutionPlanMetricsSet::new(),
4096        }
4097    }
4098
4099    /// Build output schema.
4100    fn build_schema(
4101        input_schema: SchemaRef,
4102        target_variable: &str,
4103        step_variable: Option<&str>,
4104        path_variable: Option<&str>,
4105        target_properties: &[String],
4106    ) -> SchemaRef {
4107        let mut fields: Vec<Field> = input_schema
4108            .fields()
4109            .iter()
4110            .map(|f| f.as_ref().clone())
4111            .collect();
4112
4113        // Add target VID column (only if not already in input)
4114        let target_vid_name = format!("{}._vid", target_variable);
4115        if input_schema.column_with_name(&target_vid_name).is_none() {
4116            fields.push(Field::new(target_vid_name, DataType::UInt64, true));
4117        }
4118
4119        // Add target ._labels column (only if not already in input)
4120        let target_labels_name = format!("{}._labels", target_variable);
4121        if input_schema.column_with_name(&target_labels_name).is_none() {
4122            fields.push(Field::new(target_labels_name, labels_data_type(), true));
4123        }
4124
4125        // Add hop count
4126        fields.push(Field::new("_hop_count", DataType::UInt64, false));
4127
4128        // Add step variable column (list of edge structs) if bound
4129        // This is the relationship variable like `r` in `[r*1..3]`
4130        if let Some(step_var) = step_variable {
4131            fields.push(build_edge_list_field(step_var));
4132        }
4133
4134        // Add path struct if bound (only if not already in input from prior BindFixedPath)
4135        if let Some(path_var) = path_variable
4136            && input_schema.column_with_name(path_var).is_none()
4137        {
4138            fields.push(build_path_struct_field(path_var));
4139        }
4140
4141        // Add target property columns (as LargeBinary for lazy hydration via PropertyManager)
4142        // Skip properties that are already in the input schema
4143        for prop in target_properties {
4144            let prop_name = format!("{}.{}", target_variable, prop);
4145            if input_schema.column_with_name(&prop_name).is_none() {
4146                fields.push(Field::new(prop_name, DataType::LargeBinary, true));
4147            }
4148        }
4149
4150        Arc::new(Schema::new(fields))
4151    }
4152}
4153
4154impl DisplayAs for GraphVariableLengthTraverseMainExec {
4155    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4156        write!(
4157            f,
4158            "GraphVariableLengthTraverseMainExec: {} --[{:?}*{}..{}]--> target",
4159            self.source_column, self.type_names, self.min_hops, self.max_hops
4160        )
4161    }
4162}
4163
4164impl ExecutionPlan for GraphVariableLengthTraverseMainExec {
4165    fn name(&self) -> &str {
4166        "GraphVariableLengthTraverseMainExec"
4167    }
4168
4169    fn as_any(&self) -> &dyn Any {
4170        self
4171    }
4172
4173    fn schema(&self) -> SchemaRef {
4174        self.schema.clone()
4175    }
4176
4177    fn properties(&self) -> &Arc<PlanProperties> {
4178        &self.properties
4179    }
4180
4181    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
4182        vec![&self.input]
4183    }
4184
4185    fn with_new_children(
4186        self: Arc<Self>,
4187        children: Vec<Arc<dyn ExecutionPlan>>,
4188    ) -> DFResult<Arc<dyn ExecutionPlan>> {
4189        if children.len() != 1 {
4190            return Err(datafusion::error::DataFusionError::Plan(
4191                "GraphVariableLengthTraverseMainExec requires exactly one child".to_string(),
4192            ));
4193        }
4194
4195        Ok(Arc::new(Self::new(
4196            children[0].clone(),
4197            self.source_column.clone(),
4198            self.type_names.clone(),
4199            self.direction,
4200            self.min_hops,
4201            self.max_hops,
4202            self.target_variable.clone(),
4203            self.step_variable.clone(),
4204            self.path_variable.clone(),
4205            self.target_properties.clone(),
4206            self.graph_ctx.clone(),
4207            self.is_optional,
4208            self.bound_target_column.clone(),
4209            self.edge_lance_filter.clone(),
4210            self.edge_property_conditions.clone(),
4211            self.used_edge_columns.clone(),
4212            self.path_mode.clone(),
4213            self.output_mode.clone(),
4214        )))
4215    }
4216
4217    fn execute(
4218        &self,
4219        partition: usize,
4220        context: Arc<TaskContext>,
4221    ) -> DFResult<SendableRecordBatchStream> {
4222        let input_stream = self.input.execute(partition, context)?;
4223        let metrics = BaselineMetrics::new(&self.metrics, partition);
4224
4225        // Build adjacency map from main edges table (async)
4226        let graph_ctx = self.graph_ctx.clone();
4227        let type_names = self.type_names.clone();
4228        let direction = self.direction;
4229        let load_fut = async move {
4230            // VLP frontier is unknown ahead of time: no source pushdown (whole-type).
4231            build_edge_adjacency_map(&graph_ctx, &type_names, direction, None).await
4232        };
4233
4234        Ok(Box::pin(GraphVariableLengthTraverseMainStream {
4235            input: input_stream,
4236            source_column: self.source_column.clone(),
4237            type_names: self.type_names.clone(),
4238            direction: self.direction,
4239            min_hops: self.min_hops,
4240            max_hops: self.max_hops,
4241            target_variable: self.target_variable.clone(),
4242            step_variable: self.step_variable.clone(),
4243            path_variable: self.path_variable.clone(),
4244            target_properties: self.target_properties.clone(),
4245            graph_ctx: self.graph_ctx.clone(),
4246            is_optional: self.is_optional,
4247            bound_target_column: self.bound_target_column.clone(),
4248            edge_lance_filter: self.edge_lance_filter.clone(),
4249            edge_property_conditions: self.edge_property_conditions.clone(),
4250            used_edge_columns: self.used_edge_columns.clone(),
4251            path_mode: self.path_mode.clone(),
4252            output_mode: self.output_mode.clone(),
4253            schema: self.schema.clone(),
4254            state: VarLengthMainStreamState::Loading(Box::pin(load_fut)),
4255            metrics,
4256        }))
4257    }
4258
4259    fn metrics(&self) -> Option<MetricsSet> {
4260        Some(self.metrics.clone_inner())
4261    }
4262}
4263
4264/// State machine for VLP schemaless stream.
4265enum VarLengthMainStreamState {
4266    /// Loading adjacency map from main edges table.
4267    Loading(Pin<Box<dyn std::future::Future<Output = DFResult<EdgeAdjacencyMap>> + Send>>),
4268    /// Processing input batches with loaded adjacency.
4269    Processing(EdgeAdjacencyMap),
4270    /// Materializing properties for a batch.
4271    Materializing {
4272        adjacency: EdgeAdjacencyMap,
4273        fut: Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>,
4274    },
4275    /// Stream is done.
4276    Done,
4277}
4278
4279/// Stream for variable-length traversal on schemaless edges.
4280#[expect(dead_code, reason = "VLP fields used in Phase 3")]
4281struct GraphVariableLengthTraverseMainStream {
4282    input: SendableRecordBatchStream,
4283    source_column: String,
4284    type_names: Vec<String>,
4285    direction: Direction,
4286    min_hops: usize,
4287    max_hops: usize,
4288    target_variable: String,
4289    /// Relationship variable like `r` in `[r*1..3]` - gets a List of edge structs.
4290    step_variable: Option<String>,
4291    path_variable: Option<String>,
4292    target_properties: Vec<String>,
4293    graph_ctx: Arc<GraphExecutionContext>,
4294    is_optional: bool,
4295    bound_target_column: Option<String>,
4296    edge_lance_filter: Option<String>,
4297    /// Edge property conditions to check during BFS.
4298    edge_property_conditions: Vec<(String, UniValue)>,
4299    used_edge_columns: Vec<String>,
4300    path_mode: super::nfa::PathMode,
4301    output_mode: super::nfa::VlpOutputMode,
4302    schema: SchemaRef,
4303    state: VarLengthMainStreamState,
4304    metrics: BaselineMetrics,
4305}
4306
4307/// BFS result type: (target_vid, hop_count, node_path, edge_path)
4308type MainBfsResult = (Vid, usize, Vec<Vid>, Vec<Eid>);
4309
4310impl GraphVariableLengthTraverseMainStream {
4311    /// Perform BFS from a source vertex using the adjacency map.
4312    ///
4313    /// `used_eids` contains edge IDs already bound by earlier pattern elements
4314    /// in the same MATCH clause, enforcing cross-pattern relationship uniqueness
4315    /// (Cypher semantics require all relationships in a MATCH to be distinct).
4316    fn bfs(
4317        &self,
4318        source: Vid,
4319        adjacency: &EdgeAdjacencyMap,
4320        used_eids: &FxHashSet<u64>,
4321    ) -> Vec<MainBfsResult> {
4322        let mut results = Vec::new();
4323        let mut queue: VecDeque<MainBfsResult> = VecDeque::new();
4324
4325        queue.push_back((source, 0, vec![source], vec![]));
4326
4327        while let Some((current, depth, node_path, edge_path)) = queue.pop_front() {
4328            // Emit result if within hop range (including zero-length patterns)
4329            if depth >= self.min_hops && depth <= self.max_hops {
4330                results.push((current, depth, node_path.clone(), edge_path.clone()));
4331            }
4332
4333            // Stop if at max depth
4334            if depth >= self.max_hops {
4335                continue;
4336            }
4337
4338            // Get neighbors from adjacency map
4339            if let Some(neighbors) = adjacency.get(&current) {
4340                let is_undirected = matches!(self.direction, Direction::Both);
4341                let mut seen_edges_at_hop: HashSet<u64> = HashSet::new();
4342
4343                for (neighbor, eid, _edge_type, props) in neighbors {
4344                    // Deduplicate edges for undirected patterns
4345                    if is_undirected && !seen_edges_at_hop.insert(eid.as_u64()) {
4346                        continue;
4347                    }
4348
4349                    // Enforce relationship uniqueness per-path (Cypher semantics).
4350                    if edge_path.contains(eid) {
4351                        continue;
4352                    }
4353
4354                    // Enforce cross-pattern relationship uniqueness: skip edges
4355                    // already bound by earlier pattern elements in the same MATCH.
4356                    if used_eids.contains(&eid.as_u64()) {
4357                        continue;
4358                    }
4359
4360                    // Check edge property conditions (e.g., {year: 1988}).
4361                    if !self.edge_property_conditions.is_empty() {
4362                        let passes =
4363                            self.edge_property_conditions
4364                                .iter()
4365                                .all(|(name, expected)| {
4366                                    props.get(name).is_some_and(|actual| actual == expected)
4367                                });
4368                        if !passes {
4369                            continue;
4370                        }
4371                    }
4372
4373                    let mut new_node_path = node_path.clone();
4374                    new_node_path.push(*neighbor);
4375                    let mut new_edge_path = edge_path.clone();
4376                    new_edge_path.push(*eid);
4377                    queue.push_back((*neighbor, depth + 1, new_node_path, new_edge_path));
4378                }
4379            }
4380        }
4381
4382        results
4383    }
4384
4385    /// Process a batch using the adjacency map.
4386    fn process_batch(
4387        &self,
4388        batch: RecordBatch,
4389        adjacency: &EdgeAdjacencyMap,
4390    ) -> DFResult<RecordBatch> {
4391        let source_col = batch.column_by_name(&self.source_column).ok_or_else(|| {
4392            datafusion::error::DataFusionError::Execution(format!(
4393                "Source column '{}' not found in input batch",
4394                self.source_column
4395            ))
4396        })?;
4397
4398        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
4399        let source_vids: &UInt64Array = &source_vid_cow;
4400
4401        // Read bound target VIDs if column exists
4402        let bound_target_cow = self
4403            .bound_target_column
4404            .as_ref()
4405            .and_then(|col| batch.column_by_name(col))
4406            .map(|c| column_as_vid_array(c.as_ref()))
4407            .transpose()?;
4408        let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
4409
4410        // Extract used edge columns for cross-pattern relationship uniqueness
4411        let used_edge_arrays: Vec<&UInt64Array> = self
4412            .used_edge_columns
4413            .iter()
4414            .filter_map(|col| {
4415                batch
4416                    .column_by_name(col)?
4417                    .as_any()
4418                    .downcast_ref::<UInt64Array>()
4419            })
4420            .collect();
4421
4422        // Collect BFS results: (original_row_idx, target_vid, hop_count, node_path, edge_path)
4423        let mut expansions: Vec<ExpansionRecord> = Vec::new();
4424
4425        for (row_idx, source_opt) in source_vids.iter().enumerate() {
4426            let mut emitted_for_row = false;
4427
4428            if let Some(source_u64) = source_opt {
4429                let source = Vid::from(source_u64);
4430
4431                // Collect used edge IDs from previous hops for this row
4432                let used_eids: FxHashSet<u64> = used_edge_arrays
4433                    .iter()
4434                    .filter_map(|arr| {
4435                        if arr.is_null(row_idx) {
4436                            None
4437                        } else {
4438                            Some(arr.value(row_idx))
4439                        }
4440                    })
4441                    .collect();
4442
4443                let bfs_results = self.bfs(source, adjacency, &used_eids);
4444
4445                for (target, hops, node_path, edge_path) in bfs_results {
4446                    // Filter by bound target VID if set (for patterns where target is in scope).
4447                    // NULL bound targets do not match anything.
4448                    if let Some(targets) = expected_targets {
4449                        if targets.is_null(row_idx) {
4450                            continue;
4451                        }
4452                        let expected_vid = targets.value(row_idx);
4453                        if target.as_u64() != expected_vid {
4454                            continue;
4455                        }
4456                    }
4457
4458                    expansions.push((row_idx, target, hops, node_path, edge_path));
4459                    emitted_for_row = true;
4460                }
4461            }
4462
4463            if self.is_optional && !emitted_for_row {
4464                // Preserve source row with NULL optional bindings.
4465                expansions.push((row_idx, Vid::from(u64::MAX), 0, vec![], vec![]));
4466            }
4467        }
4468
4469        if expansions.is_empty() {
4470            if self.is_optional {
4471                let all_indices: Vec<usize> = (0..batch.num_rows()).collect();
4472                return build_optional_null_batch_for_rows(&batch, &all_indices, &self.schema);
4473            }
4474            return Ok(RecordBatch::new_empty(self.schema.clone()));
4475        }
4476
4477        let num_rows = expansions.len();
4478        self.metrics.record_output(num_rows);
4479
4480        // Build output columns
4481        let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.schema.fields().len());
4482
4483        // Resolve this operator's edge type names to numeric ids once, so path
4484        // builders can recover the STORED orientation of flushed (L1-resident)
4485        // relationships via a directed-outgoing adjacency probe.
4486        let edge_type_ids: Vec<u32> = {
4487            let uni_schema = self.graph_ctx.storage().schema_manager().schema();
4488            self.type_names
4489                .iter()
4490                .filter_map(|name| uni_schema.edge_type_id_by_name(name))
4491                .collect()
4492        };
4493
4494        // Expand input columns
4495        for col_idx in 0..batch.num_columns() {
4496            let array = batch.column(col_idx);
4497            let indices: Vec<u64> = expansions
4498                .iter()
4499                .map(|(idx, _, _, _, _)| *idx as u64)
4500                .collect();
4501            let take_indices = UInt64Array::from(indices);
4502            let expanded = arrow::compute::take(array, &take_indices, None)?;
4503            columns.push(expanded);
4504        }
4505
4506        // Add target VID column (only if not already in input)
4507        let target_vid_name = format!("{}._vid", self.target_variable);
4508        if batch.schema().column_with_name(&target_vid_name).is_none() {
4509            let target_vids: Vec<Option<u64>> = expansions
4510                .iter()
4511                .map(|(_, vid, _, node_path, edge_path)| {
4512                    if node_path.is_empty() && edge_path.is_empty() {
4513                        None
4514                    } else {
4515                        Some(vid.as_u64())
4516                    }
4517                })
4518                .collect();
4519            columns.push(Arc::new(UInt64Array::from(target_vids)));
4520        }
4521
4522        // Add target ._labels column (only if not already in input)
4523        let target_labels_name = format!("{}._labels", self.target_variable);
4524        if batch
4525            .schema()
4526            .column_with_name(&target_labels_name)
4527            .is_none()
4528        {
4529            use arrow_array::builder::{ListBuilder, StringBuilder};
4530            let mut labels_builder = ListBuilder::new(StringBuilder::new());
4531            for (_, vid, _, node_path, edge_path) in expansions.iter() {
4532                if node_path.is_empty() && edge_path.is_empty() {
4533                    labels_builder.append(false);
4534                    continue;
4535                }
4536                let mut row_labels: Vec<String> = Vec::new();
4537                let labels =
4538                    l0_visibility::get_vertex_labels(*vid, &self.graph_ctx.query_context());
4539                for lbl in &labels {
4540                    if !row_labels.contains(lbl) {
4541                        row_labels.push(lbl.clone());
4542                    }
4543                }
4544                let values = labels_builder.values();
4545                for lbl in &row_labels {
4546                    values.append_value(lbl);
4547                }
4548                labels_builder.append(true);
4549            }
4550            columns.push(Arc::new(labels_builder.finish()));
4551        }
4552
4553        // Add hop count column
4554        let hop_counts: Vec<u64> = expansions
4555            .iter()
4556            .map(|(_, _, hops, _, _)| *hops as u64)
4557            .collect();
4558        columns.push(Arc::new(UInt64Array::from(hop_counts)));
4559
4560        // Add step variable column if bound (list of edge structs).
4561        if self.step_variable.is_some() {
4562            let mut edges_builder = new_edge_list_builder();
4563            let query_ctx = self.graph_ctx.query_context();
4564            let type_names_str = self.type_names.join("|");
4565
4566            for (_, _, _, node_path, edge_path) in expansions.iter() {
4567                if node_path.is_empty() && edge_path.is_empty() {
4568                    edges_builder.append_null();
4569                } else if edge_path.is_empty() {
4570                    // Zero-hop match: empty list.
4571                    edges_builder.append(true);
4572                } else {
4573                    for (i, eid) in edge_path.iter().enumerate() {
4574                        // Report the relationship's STORED direction, not the
4575                        // traversal order (`node_path[i]` -> `node_path[i + 1]`).
4576                        // Resolves flushed (L1-resident) edges too, where the L0
4577                        // chain no longer holds the stored endpoints.
4578                        let (src, dst) = self.graph_ctx.resolve_stored_edge_endpoints(
4579                            *eid,
4580                            node_path[i],
4581                            node_path[i + 1],
4582                            &edge_type_ids,
4583                        );
4584                        append_edge_to_struct(
4585                            edges_builder.values(),
4586                            *eid,
4587                            &type_names_str,
4588                            src,
4589                            dst,
4590                            &query_ctx,
4591                        );
4592                    }
4593                    edges_builder.append(true);
4594                }
4595            }
4596
4597            columns.push(Arc::new(edges_builder.finish()) as ArrayRef);
4598        }
4599
4600        // Add path variable column if bound.
4601        // If a path column already exists in input (from a prior BindFixedPath), extend it
4602        // rather than building from scratch.
4603        if let Some(path_var_name) = &self.path_variable {
4604            let existing_path_col_idx = batch
4605                .schema()
4606                .column_with_name(path_var_name)
4607                .map(|(idx, _)| idx);
4608            let existing_path_arc = existing_path_col_idx.map(|idx| columns[idx].clone());
4609            let existing_path = existing_path_arc
4610                .as_ref()
4611                .and_then(|arc| arc.as_any().downcast_ref::<arrow_array::StructArray>());
4612
4613            let mut nodes_builder = new_node_list_builder();
4614            let mut rels_builder = new_edge_list_builder();
4615            let query_ctx = self.graph_ctx.query_context();
4616            let type_names_str = self.type_names.join("|");
4617            let mut path_validity = Vec::with_capacity(expansions.len());
4618
4619            for (row_out_idx, (_, _, _, node_path, edge_path)) in expansions.iter().enumerate() {
4620                if node_path.is_empty() && edge_path.is_empty() {
4621                    nodes_builder.append(false);
4622                    rels_builder.append(false);
4623                    path_validity.push(false);
4624                    continue;
4625                }
4626
4627                // Prepend existing path prefix if extending
4628                let skip_first_vlp_node = if let Some(existing) = existing_path {
4629                    if !existing.is_null(row_out_idx) {
4630                        prepend_existing_path(
4631                            existing,
4632                            row_out_idx,
4633                            &mut nodes_builder,
4634                            &mut rels_builder,
4635                            &query_ctx,
4636                        );
4637                        true
4638                    } else {
4639                        false
4640                    }
4641                } else {
4642                    false
4643                };
4644
4645                // Append VLP nodes (skip first if extending — it's the junction point)
4646                let start_idx = if skip_first_vlp_node { 1 } else { 0 };
4647                for vid in &node_path[start_idx..] {
4648                    append_node_to_struct(nodes_builder.values(), *vid, &query_ctx);
4649                }
4650                nodes_builder.append(true);
4651
4652                for (i, eid) in edge_path.iter().enumerate() {
4653                    // Report the relationship's STORED direction, not the traversal
4654                    // order (`node_path[i]` -> `node_path[i + 1]`). Resolves flushed
4655                    // (L1-resident) edges too, where the L0 chain no longer holds
4656                    // the stored endpoints.
4657                    let (src, dst) = self.graph_ctx.resolve_stored_edge_endpoints(
4658                        *eid,
4659                        node_path[i],
4660                        node_path[i + 1],
4661                        &edge_type_ids,
4662                    );
4663                    append_edge_to_struct(
4664                        rels_builder.values(),
4665                        *eid,
4666                        &type_names_str,
4667                        src,
4668                        dst,
4669                        &query_ctx,
4670                    );
4671                }
4672                rels_builder.append(true);
4673                path_validity.push(true);
4674            }
4675
4676            // Finish the builders to get the arrays
4677            let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
4678            let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;
4679
4680            // Build the path struct with nodes and relationships fields
4681            let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
4682            let rels_field = Arc::new(Field::new(
4683                "relationships",
4684                rels_array.data_type().clone(),
4685                true,
4686            ));
4687
4688            // Create the path struct array
4689            let path_struct = arrow_array::StructArray::try_new(
4690                vec![nodes_field, rels_field].into(),
4691                vec![nodes_array, rels_array],
4692                Some(arrow::buffer::NullBuffer::from(path_validity)),
4693            )
4694            .map_err(arrow_err)?;
4695
4696            if let Some(idx) = existing_path_col_idx {
4697                columns[idx] = Arc::new(path_struct);
4698            } else {
4699                columns.push(Arc::new(path_struct));
4700            }
4701        }
4702
4703        // Add target property columns as NULL for now (skip if already in input).
4704        // Property hydration happens via PropertyManager in the query execution pipeline.
4705        for prop_name in &self.target_properties {
4706            let full_prop_name = format!("{}.{}", self.target_variable, prop_name);
4707            if batch.schema().column_with_name(&full_prop_name).is_none() {
4708                columns.push(arrow_array::new_null_array(
4709                    &DataType::LargeBinary,
4710                    num_rows,
4711                ));
4712            }
4713        }
4714
4715        RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)
4716    }
4717}
4718
4719impl Stream for GraphVariableLengthTraverseMainStream {
4720    type Item = DFResult<RecordBatch>;
4721
4722    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
4723        let metrics = self.metrics.clone();
4724        let _timer = metrics.elapsed_compute().timer();
4725        loop {
4726            let state = std::mem::replace(&mut self.state, VarLengthMainStreamState::Done);
4727
4728            match state {
4729                VarLengthMainStreamState::Loading(mut fut) => match fut.as_mut().poll(cx) {
4730                    Poll::Ready(Ok(adjacency)) => {
4731                        self.state = VarLengthMainStreamState::Processing(adjacency);
4732                        // Continue loop to start processing
4733                    }
4734                    Poll::Ready(Err(e)) => {
4735                        self.state = VarLengthMainStreamState::Done;
4736                        return Poll::Ready(Some(Err(e)));
4737                    }
4738                    Poll::Pending => {
4739                        self.state = VarLengthMainStreamState::Loading(fut);
4740                        return Poll::Pending;
4741                    }
4742                },
4743                VarLengthMainStreamState::Processing(adjacency) => {
4744                    match self.input.poll_next_unpin(cx) {
4745                        Poll::Ready(Some(Ok(batch))) => {
4746                            let base_batch = match self.process_batch(batch, &adjacency) {
4747                                Ok(b) => b,
4748                                Err(e) => {
4749                                    self.state = VarLengthMainStreamState::Processing(adjacency);
4750                                    return Poll::Ready(Some(Err(e)));
4751                                }
4752                            };
4753
4754                            // If no properties need async hydration, return directly
4755                            if self.target_properties.is_empty() {
4756                                self.state = VarLengthMainStreamState::Processing(adjacency);
4757                                return Poll::Ready(Some(Ok(base_batch)));
4758                            }
4759
4760                            // Create async hydration future
4761                            let schema = self.schema.clone();
4762                            let target_variable = self.target_variable.clone();
4763                            let target_properties = self.target_properties.clone();
4764                            let graph_ctx = self.graph_ctx.clone();
4765
4766                            let fut = hydrate_vlp_target_properties(
4767                                base_batch,
4768                                schema,
4769                                target_variable,
4770                                target_properties,
4771                                None, // schemaless — no label name
4772                                graph_ctx,
4773                            );
4774
4775                            self.state = VarLengthMainStreamState::Materializing {
4776                                adjacency,
4777                                fut: Box::pin(fut),
4778                            };
4779                            // Continue loop to poll the future
4780                        }
4781                        Poll::Ready(Some(Err(e))) => {
4782                            self.state = VarLengthMainStreamState::Done;
4783                            return Poll::Ready(Some(Err(e)));
4784                        }
4785                        Poll::Ready(None) => {
4786                            self.state = VarLengthMainStreamState::Done;
4787                            return Poll::Ready(None);
4788                        }
4789                        Poll::Pending => {
4790                            self.state = VarLengthMainStreamState::Processing(adjacency);
4791                            return Poll::Pending;
4792                        }
4793                    }
4794                }
4795                VarLengthMainStreamState::Materializing { adjacency, mut fut } => {
4796                    match fut.as_mut().poll(cx) {
4797                        Poll::Ready(Ok(batch)) => {
4798                            self.state = VarLengthMainStreamState::Processing(adjacency);
4799                            return Poll::Ready(Some(Ok(batch)));
4800                        }
4801                        Poll::Ready(Err(e)) => {
4802                            self.state = VarLengthMainStreamState::Done;
4803                            return Poll::Ready(Some(Err(e)));
4804                        }
4805                        Poll::Pending => {
4806                            self.state = VarLengthMainStreamState::Materializing { adjacency, fut };
4807                            return Poll::Pending;
4808                        }
4809                    }
4810                }
4811                VarLengthMainStreamState::Done => {
4812                    return Poll::Ready(None);
4813                }
4814            }
4815        }
4816    }
4817}
4818
4819impl RecordBatchStream for GraphVariableLengthTraverseMainStream {
4820    fn schema(&self) -> SchemaRef {
4821        self.schema.clone()
4822    }
4823}
4824
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the one-column input schema (`a._vid`: non-nullable `UInt64`) shared by all tests.
    fn source_vid_schema() -> SchemaRef {
        let vid_field = Field::new("a._vid", DataType::UInt64, false);
        Arc::new(Schema::new(vec![vid_field]))
    }

    /// Asserts that `schema` contains exactly the `expected` field names, in order.
    fn assert_field_names(schema: &Schema, expected: &[&str]) {
        let actual: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_traverse_schema_without_edge() {
        let output_schema = GraphTraverseExec::build_schema(
            source_vid_schema(),
            "m",
            None,
            &[],
            &[],
            None,
            None,
            false,
        );

        // Schema: input + target VID + target _labels + internal edge ID
        assert_field_names(
            &output_schema,
            &["a._vid", "m._vid", "m._labels", "__eid_to_m"],
        );
    }

    #[test]
    fn test_traverse_schema_with_edge() {
        let output_schema = GraphTraverseExec::build_schema(
            source_vid_schema(),
            "m",
            Some("r"),
            &[],
            &[],
            None,
            None,
            false,
        );

        // Schema: input + target VID + target _labels + edge EID + edge _type
        assert_field_names(
            &output_schema,
            &["a._vid", "m._vid", "m._labels", "r._eid", "r._type"],
        );
    }

    #[test]
    fn test_traverse_schema_with_target_properties() {
        let target_props = vec!["name".to_string(), "age".to_string()];
        let output_schema = GraphTraverseExec::build_schema(
            source_vid_schema(),
            "m",
            Some("r"),
            &[],
            &target_props,
            None,
            None,
            false,
        );

        // Target properties sit between the target labels and the edge columns.
        assert_field_names(
            &output_schema,
            &[
                "a._vid",
                "m._vid",
                "m._labels",
                "m.name",
                "m.age",
                "r._eid",
                "r._type",
            ],
        );
    }

    #[test]
    fn test_variable_length_schema() {
        let output_schema = GraphVariableLengthTraverseExec::build_schema(
            source_vid_schema(),
            "b",
            None,
            Some("p"),
            &[],
            None,
        );

        // Schema: input + target VID + target _labels + hop count + path variable
        assert_field_names(
            &output_schema,
            &["a._vid", "b._vid", "b._labels", "_hop_count", "p"],
        );
    }

    #[test]
    fn test_traverse_main_schema_without_edge() {
        let input_schema = source_vid_schema();
        let output_schema =
            GraphTraverseMainExec::build_schema(&input_schema, "m", &None, &[], &[], false);

        // Schema: input + target VID + target _labels + internal edge ID
        assert_field_names(
            &output_schema,
            &["a._vid", "m._vid", "m._labels", "__eid_to_m"],
        );
    }

    #[test]
    fn test_traverse_main_schema_with_edge() {
        let input_schema = source_vid_schema();
        let output_schema = GraphTraverseMainExec::build_schema(
            &input_schema,
            "m",
            &Some("r".to_string()),
            &[],
            &[],
            false,
        );

        // Schema: input + target VID + target _labels + edge EID + edge _type
        assert_field_names(
            &output_schema,
            &["a._vid", "m._vid", "m._labels", "r._eid", "r._type"],
        );
    }

    #[test]
    fn test_traverse_main_schema_with_edge_properties() {
        let input_schema = source_vid_schema();
        let edge_props = vec!["weight".to_string(), "since".to_string()];
        let output_schema = GraphTraverseMainExec::build_schema(
            &input_schema,
            "m",
            &Some("r".to_string()),
            &edge_props,
            &[],
            false,
        );

        // Edge properties follow the edge EID / type columns.
        assert_field_names(
            &output_schema,
            &[
                "a._vid",
                "m._vid",
                "m._labels",
                "r._eid",
                "r._type",
                "r.weight",
                "r.since",
            ],
        );
        // Both edge property columns are typed as LargeBinary.
        for idx in [5, 6] {
            assert_eq!(output_schema.field(idx).data_type(), &DataType::LargeBinary);
        }
    }

    #[test]
    fn test_traverse_main_schema_with_target_properties() {
        let input_schema = source_vid_schema();
        let target_props = vec!["name".to_string(), "age".to_string()];
        let output_schema = GraphTraverseMainExec::build_schema(
            &input_schema,
            "m",
            &Some("r".to_string()),
            &[],
            &target_props,
            false,
        );

        // In the main-exec schema, target properties come after the edge columns.
        assert_field_names(
            &output_schema,
            &[
                "a._vid",
                "m._vid",
                "m._labels",
                "r._eid",
                "r._type",
                "m.name",
                "m.age",
            ],
        );
        // Both target property columns are typed as LargeBinary.
        for idx in [5, 6] {
            assert_eq!(output_schema.field(idx).data_type(), &DataType::LargeBinary);
        }
    }
}