Skip to main content

uni_query/query/df_graph/
traverse.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Graph traversal execution plans for DataFusion.
5//!
6//! This module provides graph traversal operators as DataFusion [`ExecutionPlan`]s:
7//!
8//! - [`GraphTraverseExec`]: Single-hop edge traversal
9//! - [`GraphVariableLengthTraverseExec`]: Multi-hop BFS traversal (min..max hops)
10//!
11//! # Traversal Algorithm
12//!
13//! Traversal uses the CSR adjacency cache for O(1) neighbor lookups:
14//!
15//! ```text
16//! Input Stream (source VIDs)
17//!        │
18//!        ▼
19//! ┌──────────────────┐
20//! │ For each batch:  │
21//! │  1. Extract VIDs │
22//! │  2. get_neighbors│
23//! │  3. Expand rows  │
24//! └──────────────────┘
25//!        │
26//!        ▼
27//! Output Stream (source, edge, target)
28//! ```
29//!
30//! L0 buffers are automatically overlaid for MVCC visibility.
31
32use crate::query::df_graph::GraphExecutionContext;
33use crate::query::df_graph::bitmap::{EidFilter, VidFilter};
34use crate::query::df_graph::common::{
35    append_edge_to_struct, append_node_to_struct, arrow_err, build_edge_list_field,
36    build_path_struct_field, column_as_vid_array, compute_plan_properties, labels_data_type,
37    new_edge_list_builder, new_node_list_builder,
38};
39use crate::query::df_graph::nfa::{NfaStateId, PathNfa, PathSelector, VlpOutputMode};
40use crate::query::df_graph::pred_dag::PredecessorDag;
41use crate::query::df_graph::scan::{build_property_column_static, resolve_property_type};
42use arrow::compute::take;
43use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
44use arrow_schema::{DataType, Field, Schema, SchemaRef};
45use datafusion::common::Result as DFResult;
46use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
47use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
48use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
49use futures::{Stream, StreamExt};
50use fxhash::FxHashSet;
51use std::any::Any;
52use std::collections::{HashMap, HashSet, VecDeque};
53use std::fmt;
54use std::pin::Pin;
55use std::sync::Arc;
56use std::task::{Context, Poll};
57use uni_common::Value as UniValue;
58use uni_common::core::id::{Eid, Vid};
59use uni_store::runtime::l0_visibility;
60use uni_store::storage::direction::Direction;
61
/// BFS result tuple: `(target_vid, hop_count, node_path, edge_path)`.
type BfsResult = (Vid, usize, Vec<Vid>, Vec<Eid>);

/// Expansion record tuple:
/// `(original_row_idx, target_vid, hop_count, node_path, edge_path)`.
///
/// Same element layout as [`BfsResult`], with the index of the originating
/// input row prepended.
type ExpansionRecord = (usize, Vid, usize, Vec<Vid>, Vec<Eid>);
67
68/// Prepend nodes and edges from an existing path struct column into builders.
69///
70/// Used when a VLP extends a path that was partially built by a prior `BindFixedPath`.
71/// Reads the nodes and relationships from the existing path at `row_idx` and appends
72/// them to the provided builders. The caller should then skip the first VLP node
73/// (which is the junction point already present in the existing path).
74fn prepend_existing_path(
75    existing_path: &arrow_array::StructArray,
76    row_idx: usize,
77    nodes_builder: &mut arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder>,
78    rels_builder: &mut arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder>,
79    query_ctx: &uni_store::runtime::context::QueryContext,
80) {
81    // Read existing nodes
82    let nodes_list = existing_path
83        .column(0)
84        .as_any()
85        .downcast_ref::<arrow_array::ListArray>()
86        .unwrap();
87    let node_values = nodes_list.value(row_idx);
88    let node_struct = node_values
89        .as_any()
90        .downcast_ref::<arrow_array::StructArray>()
91        .unwrap();
92    let vid_col = node_struct
93        .column(0)
94        .as_any()
95        .downcast_ref::<UInt64Array>()
96        .unwrap();
97    for i in 0..vid_col.len() {
98        append_node_to_struct(
99            nodes_builder.values(),
100            Vid::from(vid_col.value(i)),
101            query_ctx,
102        );
103    }
104
105    // Read existing edges
106    let rels_list = existing_path
107        .column(1)
108        .as_any()
109        .downcast_ref::<arrow_array::ListArray>()
110        .unwrap();
111    let edge_values = rels_list.value(row_idx);
112    let edge_struct = edge_values
113        .as_any()
114        .downcast_ref::<arrow_array::StructArray>()
115        .unwrap();
116    let eid_col = edge_struct
117        .column(0)
118        .as_any()
119        .downcast_ref::<UInt64Array>()
120        .unwrap();
121    let type_col = edge_struct
122        .column(1)
123        .as_any()
124        .downcast_ref::<arrow_array::StringArray>()
125        .unwrap();
126    let src_col = edge_struct
127        .column(2)
128        .as_any()
129        .downcast_ref::<UInt64Array>()
130        .unwrap();
131    let dst_col = edge_struct
132        .column(3)
133        .as_any()
134        .downcast_ref::<UInt64Array>()
135        .unwrap();
136    for i in 0..eid_col.len() {
137        append_edge_to_struct(
138            rels_builder.values(),
139            Eid::from(eid_col.value(i)),
140            type_col.value(i),
141            src_col.value(i),
142            dst_col.value(i),
143            query_ctx,
144        );
145    }
146}
147
148/// Resolve edge property Arrow type, falling back to `LargeBinary` (CypherValue) for
149/// schemaless properties. Unlike vertex properties, schemaless edge properties must
150/// preserve original JSON value types (int, float, etc.) since edge types commonly
151/// lack explicit property definitions.
152fn resolve_edge_property_type(
153    prop: &str,
154    schema_props: Option<
155        &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
156    >,
157) -> DataType {
158    if prop == "overflow_json" {
159        DataType::LargeBinary
160    } else if prop == "_created_at" || prop == "_updated_at" {
161        // System-managed timestamps surfaced via `created_at(r)` /
162        // `updated_at(r)`. Stored on every edge by the L0 buffer and
163        // the on-disk Arrow tables as Timestamp(Nanosecond, UTC).
164        DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, Some("UTC".into()))
165    } else {
166        schema_props
167            .and_then(|props| props.get(prop))
168            .map(|meta| meta.r#type.to_arrow())
169            .unwrap_or(DataType::LargeBinary)
170    }
171}
172
173use crate::query::df_graph::common::merged_edge_schema_props;
174
175/// Expansion tuple for variable-length traversal: (input_row_idx, target_vid, hop_count, node_path, edge_path)
176type VarLengthExpansion = (usize, Vid, usize, Vec<Vid>, Vec<Eid>);
177
178/// Single-hop graph traversal execution plan.
179///
180/// Expands each input row by traversing edges to find neighbors.
181/// For each (source, edge, target) triple, produces one output row
182/// containing the input columns plus target vertex and edge columns.
183///
184/// # Example
185///
186/// ```ignore
187/// // Input: batch with _vid column
188/// // Traverse KNOWS edges outgoing
189/// let traverse = GraphTraverseExec::new(
190///     input_plan,
191///     "_vid",
192///     vec![knows_type_id],
193///     Direction::Outgoing,
194///     "m",           // target variable
195///     Some("r"),     // edge variable
196///     None,          // no target label filter
197///     graph_ctx,
198/// );
199///
200/// // Output: input columns + m._vid + r._eid
201/// ```
202pub struct GraphTraverseExec {
203    /// Input execution plan.
204    input: Arc<dyn ExecutionPlan>,
205
206    /// Column name containing source VIDs.
207    source_column: String,
208
209    /// Edge type IDs to traverse.
210    edge_type_ids: Vec<u32>,
211
212    /// Traversal direction.
213    direction: Direction,
214
215    /// Variable name for target vertex columns.
216    target_variable: String,
217
218    /// Variable name for edge columns (if edge is bound).
219    edge_variable: Option<String>,
220
221    /// Edge properties to materialize (for pushdown hydration).
222    edge_properties: Vec<String>,
223
224    /// Target vertex properties to materialize.
225    target_properties: Vec<String>,
226
227    /// Target label name for property type resolution.
228    target_label_name: Option<String>,
229
230    /// Optional target label filter.
231    target_label_id: Option<u16>,
232
233    /// Graph execution context.
234    graph_ctx: Arc<GraphExecutionContext>,
235
236    /// Whether this is an OPTIONAL MATCH (preserve unmatched source rows with NULLs).
237    optional: bool,
238
239    /// Variables introduced by the OPTIONAL MATCH pattern.
240    /// Used to determine which columns should be null-extended on failure.
241    optional_pattern_vars: HashSet<String>,
242
243    /// Column name of an already-bound target VID (for cycle patterns like n-->k<--n).
244    /// When set, only traversals that reach this VID are included.
245    bound_target_column: Option<String>,
246
247    /// Columns containing edge IDs from previous hops (for relationship uniqueness).
248    /// Edges matching any of these IDs are excluded from traversal results.
249    used_edge_columns: Vec<String>,
250
251    /// Output schema.
252    schema: SchemaRef,
253
254    /// Cached plan properties.
255    properties: Arc<PlanProperties>,
256
257    /// Execution metrics.
258    metrics: ExecutionPlanMetricsSet,
259}
260
261impl fmt::Debug for GraphTraverseExec {
262    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
263        f.debug_struct("GraphTraverseExec")
264            .field("source_column", &self.source_column)
265            .field("edge_type_ids", &self.edge_type_ids)
266            .field("direction", &self.direction)
267            .field("target_variable", &self.target_variable)
268            .field("edge_variable", &self.edge_variable)
269            .finish()
270    }
271}
272
273impl GraphTraverseExec {
274    /// Create a new single-hop traversal plan.
275    ///
276    /// # Arguments
277    ///
278    /// * `input` - Input plan providing source vertices
279    /// * `source_column` - Column name containing source VIDs
280    /// * `edge_type_ids` - Edge types to traverse
281    /// * `direction` - Traversal direction
282    /// * `target_variable` - Variable name for target vertices
283    /// * `edge_variable` - Optional variable name for edges
284    /// * `edge_properties` - Edge properties to materialize
285    /// * `target_label_id` - Optional target label filter
286    /// * `graph_ctx` - Graph execution context
287    /// * `bound_target_column` - Column with already-bound target VID (for cycle patterns)
288    /// * `used_edge_columns` - Columns with edge IDs to exclude (relationship uniqueness)
289    #[expect(clippy::too_many_arguments)]
290    pub fn new(
291        input: Arc<dyn ExecutionPlan>,
292        source_column: impl Into<String>,
293        edge_type_ids: Vec<u32>,
294        direction: Direction,
295        target_variable: impl Into<String>,
296        edge_variable: Option<String>,
297        edge_properties: Vec<String>,
298        target_properties: Vec<String>,
299        target_label_name: Option<String>,
300        target_label_id: Option<u16>,
301        graph_ctx: Arc<GraphExecutionContext>,
302        optional: bool,
303        optional_pattern_vars: HashSet<String>,
304        bound_target_column: Option<String>,
305        used_edge_columns: Vec<String>,
306    ) -> Self {
307        let source_column = source_column.into();
308        let target_variable = target_variable.into();
309
310        // Resolve target property Arrow types from the schema
311        let uni_schema = graph_ctx.storage().schema_manager().schema();
312        let label_props = target_label_name
313            .as_deref()
314            .and_then(|ln| uni_schema.properties.get(ln));
315        let merged_edge_props = merged_edge_schema_props(&uni_schema, &edge_type_ids);
316        let edge_props = if merged_edge_props.is_empty() {
317            None
318        } else {
319            Some(&merged_edge_props)
320        };
321
322        // Build output schema: input schema + target VID + target props + optional edge ID + edge properties
323        let schema = Self::build_schema(
324            input.schema(),
325            &target_variable,
326            edge_variable.as_deref(),
327            &edge_properties,
328            &target_properties,
329            label_props,
330            edge_props,
331            optional,
332        );
333
334        let properties = compute_plan_properties(schema.clone());
335
336        Self {
337            input,
338            source_column,
339            edge_type_ids,
340            direction,
341            target_variable,
342            edge_variable,
343            edge_properties,
344            target_properties,
345            target_label_name,
346            target_label_id,
347            graph_ctx,
348            optional,
349            optional_pattern_vars,
350            bound_target_column,
351            used_edge_columns,
352            schema,
353            properties,
354            metrics: ExecutionPlanMetricsSet::new(),
355        }
356    }
357
358    /// Build output schema.
359    #[expect(
360        clippy::too_many_arguments,
361        reason = "Schema construction needs all field metadata"
362    )]
363    fn build_schema(
364        input_schema: SchemaRef,
365        target_variable: &str,
366        edge_variable: Option<&str>,
367        edge_properties: &[String],
368        target_properties: &[String],
369        label_props: Option<
370            &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
371        >,
372        edge_props: Option<
373            &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
374        >,
375        optional: bool,
376    ) -> SchemaRef {
377        let mut fields: Vec<Field> = input_schema
378            .fields()
379            .iter()
380            .map(|f| f.as_ref().clone())
381            .collect();
382
383        // Add target VID column (nullable when optional — unmatched rows get NULL)
384        let target_vid_name = format!("{}._vid", target_variable);
385        fields.push(Field::new(&target_vid_name, DataType::UInt64, optional));
386
387        // Add target ._labels column (List(Utf8)) for labels() and structural projection support
388        fields.push(Field::new(
389            format!("{}._labels", target_variable),
390            labels_data_type(),
391            true,
392        ));
393
394        // Add target vertex property columns
395        for prop_name in target_properties {
396            let col_name = format!("{}.{}", target_variable, prop_name);
397            let arrow_type = resolve_property_type(prop_name, label_props);
398            fields.push(Field::new(&col_name, arrow_type, true));
399        }
400
401        // Add edge ID column if edge variable is bound
402        if let Some(edge_var) = edge_variable {
403            let edge_id_name = format!("{}._eid", edge_var);
404            fields.push(Field::new(&edge_id_name, DataType::UInt64, optional));
405
406            // Add edge _type column for type(r) support
407            fields.push(Field::new(
408                format!("{}._type", edge_var),
409                DataType::Utf8,
410                true,
411            ));
412
413            // Add edge property columns with types resolved from schema
414            for prop_name in edge_properties {
415                let prop_col_name = format!("{}.{}", edge_var, prop_name);
416                let arrow_type = resolve_edge_property_type(prop_name, edge_props);
417                fields.push(Field::new(&prop_col_name, arrow_type, true));
418            }
419        } else {
420            // Add internal edge ID column for relationship uniqueness tracking
421            // even when edge variable is not explicitly bound.
422            let internal_eid_name = format!("__eid_to_{}", target_variable);
423            fields.push(Field::new(&internal_eid_name, DataType::UInt64, optional));
424        }
425
426        Arc::new(Schema::new(fields))
427    }
428}
429
430impl DisplayAs for GraphTraverseExec {
431    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
432        write!(
433            f,
434            "GraphTraverseExec: {} --[{:?}]--> {}",
435            self.source_column, self.edge_type_ids, self.target_variable
436        )?;
437        if let Some(ref edge_var) = self.edge_variable {
438            write!(f, " as {}", edge_var)?;
439        }
440        Ok(())
441    }
442}
443
444impl ExecutionPlan for GraphTraverseExec {
445    fn name(&self) -> &str {
446        "GraphTraverseExec"
447    }
448
449    fn as_any(&self) -> &dyn Any {
450        self
451    }
452
453    fn schema(&self) -> SchemaRef {
454        self.schema.clone()
455    }
456
457    fn properties(&self) -> &Arc<PlanProperties> {
458        &self.properties
459    }
460
461    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
462        vec![&self.input]
463    }
464
465    fn with_new_children(
466        self: Arc<Self>,
467        children: Vec<Arc<dyn ExecutionPlan>>,
468    ) -> DFResult<Arc<dyn ExecutionPlan>> {
469        if children.len() != 1 {
470            return Err(datafusion::error::DataFusionError::Plan(
471                "GraphTraverseExec requires exactly one child".to_string(),
472            ));
473        }
474
475        Ok(Arc::new(Self::new(
476            children[0].clone(),
477            self.source_column.clone(),
478            self.edge_type_ids.clone(),
479            self.direction,
480            self.target_variable.clone(),
481            self.edge_variable.clone(),
482            self.edge_properties.clone(),
483            self.target_properties.clone(),
484            self.target_label_name.clone(),
485            self.target_label_id,
486            self.graph_ctx.clone(),
487            self.optional,
488            self.optional_pattern_vars.clone(),
489            self.bound_target_column.clone(),
490            self.used_edge_columns.clone(),
491        )))
492    }
493
494    fn execute(
495        &self,
496        partition: usize,
497        context: Arc<TaskContext>,
498    ) -> DFResult<SendableRecordBatchStream> {
499        let input_stream = self.input.execute(partition, context)?;
500
501        let metrics = BaselineMetrics::new(&self.metrics, partition);
502
503        let warm_fut = self
504            .graph_ctx
505            .warming_future(self.edge_type_ids.clone(), self.direction);
506
507        Ok(Box::pin(GraphTraverseStream {
508            input: input_stream,
509            source_column: self.source_column.clone(),
510            edge_type_ids: self.edge_type_ids.clone(),
511            direction: self.direction,
512            target_variable: self.target_variable.clone(),
513            edge_variable: self.edge_variable.clone(),
514            edge_properties: self.edge_properties.clone(),
515            target_properties: self.target_properties.clone(),
516            target_label_name: self.target_label_name.clone(),
517            graph_ctx: self.graph_ctx.clone(),
518            optional: self.optional,
519            optional_pattern_vars: self.optional_pattern_vars.clone(),
520            bound_target_column: self.bound_target_column.clone(),
521            used_edge_columns: self.used_edge_columns.clone(),
522            schema: self.schema.clone(),
523            state: TraverseStreamState::Warming(warm_fut),
524            metrics,
525        }))
526    }
527
528    fn metrics(&self) -> Option<MetricsSet> {
529        Some(self.metrics.clone_inner())
530    }
531}
532
/// State machine for traverse stream execution.
///
/// `GraphTraverseExec::execute` starts every stream in `Warming`.
/// NOTE(review): the remaining transitions are presumably
/// `Warming -> Reading -> (Materializing -> Reading)* -> Done`; confirm
/// against the `Stream` impl for `GraphTraverseStream`.
enum TraverseStreamState {
    /// Warming adjacency CSRs before first batch. Holds the future returned by
    /// `GraphExecutionContext::warming_future` for the traversed edge types and
    /// direction.
    Warming(Pin<Box<dyn std::future::Future<Output = DFResult<()>> + Send>>),
    /// Polling the input stream for batches.
    Reading,
    /// Materializing target vertex properties asynchronously. The future yields
    /// a `RecordBatch` (presumably the expanded output batch — confirm).
    Materializing(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
    /// Stream is done.
    Done,
}
544
/// Stream that performs single-hop traversal with async property materialization.
///
/// Built by `GraphTraverseExec::execute`, which copies the plan's configuration
/// into these fields and starts the stream in `TraverseStreamState::Warming`.
struct GraphTraverseStream {
    /// Input stream (the child plan's output for this partition).
    input: SendableRecordBatchStream,

    /// Column name containing source VIDs.
    source_column: String,

    /// Edge type IDs to traverse.
    edge_type_ids: Vec<u32>,

    /// Traversal direction. `Direction::Both` makes `expand_neighbors`
    /// deduplicate edges by ID per source row.
    direction: Direction,

    /// Variable name for target vertex (retained for diagnostics).
    #[expect(dead_code, reason = "Retained for debug logging and diagnostics")]
    target_variable: String,

    /// Variable name for edge (if bound).
    edge_variable: Option<String>,

    /// Edge properties to materialize.
    edge_properties: Vec<String>,

    /// Target vertex properties to materialize.
    target_properties: Vec<String>,

    /// Target label name for property resolution and filtering.
    target_label_name: Option<String>,

    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Whether this is an OPTIONAL MATCH.
    optional: bool,

    /// Variables introduced by the OPTIONAL MATCH pattern.
    optional_pattern_vars: HashSet<String>,

    /// Column name of an already-bound target VID (for cycle patterns like n-->k<--n).
    bound_target_column: Option<String>,

    /// Columns containing edge IDs from previous hops (for relationship uniqueness).
    used_edge_columns: Vec<String>,

    /// Output schema (the same schema as the owning `GraphTraverseExec`).
    schema: SchemaRef,

    /// Stream state.
    state: TraverseStreamState,

    /// Metrics.
    metrics: BaselineMetrics,
}
599
600impl GraphTraverseStream {
601    /// Expand neighbors synchronously and return expansions.
602    /// Returns (row_idx, target_vid, eid_u64, edge_type_id).
603    fn expand_neighbors(&self, batch: &RecordBatch) -> DFResult<Vec<(usize, Vid, u64, u32)>> {
604        let source_col = batch.column_by_name(&self.source_column).ok_or_else(|| {
605            datafusion::error::DataFusionError::Execution(format!(
606                "Source column '{}' not found",
607                self.source_column
608            ))
609        })?;
610
611        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
612        let source_vids: &UInt64Array = &source_vid_cow;
613
614        // If bound_target_column is set, get the expected target VIDs for each row.
615        // This is used for cycle patterns like n-->k<--n where the target must match.
616        let bound_target_cow = self
617            .bound_target_column
618            .as_ref()
619            .and_then(|col| batch.column_by_name(col))
620            .map(|c| column_as_vid_array(c.as_ref()))
621            .transpose()?;
622        let bound_target_vids: Option<&UInt64Array> = bound_target_cow.as_deref();
623
624        // Collect edge ID arrays from previous hops for relationship uniqueness filtering.
625        let used_edge_arrays: Vec<&UInt64Array> = self
626            .used_edge_columns
627            .iter()
628            .filter_map(|col| {
629                batch
630                    .column_by_name(col)
631                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
632            })
633            .collect();
634
635        let mut expanded_rows: Vec<(usize, Vid, u64, u32)> = Vec::new();
636        let is_undirected = matches!(self.direction, Direction::Both);
637
638        for (row_idx, source_vid) in source_vids.iter().enumerate() {
639            let Some(src) = source_vid else {
640                continue;
641            };
642
643            // Get expected target VID if this is a bound target pattern.
644            // Distinguish between:
645            // - no bound target column (no filtering),
646            // - bound target present but NULL for this row (must produce no expansion),
647            // - bound target present with VID.
648            let expected_target = bound_target_vids.map(|arr| {
649                if arr.is_null(row_idx) {
650                    None
651                } else {
652                    Some(arr.value(row_idx))
653                }
654            });
655
656            // Collect used edge IDs for this row from all previous hops
657            let used_eids: HashSet<u64> = used_edge_arrays
658                .iter()
659                .filter_map(|arr| {
660                    if arr.is_null(row_idx) {
661                        None
662                    } else {
663                        Some(arr.value(row_idx))
664                    }
665                })
666                .collect();
667
668            let vid = Vid::from(src);
669            // For Direction::Both, deduplicate edges by eid within each source.
670            // This prevents the same edge being counted twice (once outgoing, once incoming).
671            let mut seen_edges: HashSet<u64> = HashSet::new();
672
673            for &edge_type in &self.edge_type_ids {
674                let neighbors = self.graph_ctx.get_neighbors(vid, edge_type, self.direction);
675
676                for (target_vid, eid) in neighbors {
677                    let eid_u64 = eid.as_u64();
678
679                    // Skip edges already used in previous hops (relationship uniqueness)
680                    if used_eids.contains(&eid_u64) {
681                        continue;
682                    }
683
684                    // Deduplicate edges for undirected patterns
685                    if is_undirected && !seen_edges.insert(eid_u64) {
686                        continue;
687                    }
688
689                    // Filter by bound target VID if set (for cycle patterns).
690                    // NULL bound targets do not match anything.
691                    if let Some(expected_opt) = expected_target {
692                        let Some(expected) = expected_opt else {
693                            continue;
694                        };
695                        if target_vid.as_u64() != expected {
696                            continue;
697                        }
698                    }
699
700                    // Filter by target label using L0 visibility.
701                    // VIDs no longer embed label information, so we must look up labels.
702                    if let Some(ref label_name) = self.target_label_name {
703                        let query_ctx = self.graph_ctx.query_context();
704                        if let Some(vertex_labels) =
705                            l0_visibility::get_vertex_labels_optional(target_vid, &query_ctx)
706                        {
707                            // Vertex is in L0 — require actual label match
708                            if !vertex_labels.contains(label_name) {
709                                continue;
710                            }
711                        }
712                        // else: vertex not in L0 → trust storage-level filtering
713                    }
714
715                    expanded_rows.push((row_idx, target_vid, eid_u64, edge_type));
716                }
717            }
718        }
719
720        Ok(expanded_rows)
721    }
722}
723
724/// Build target vertex labels column from L0 buffers.
725fn build_target_labels_column(
726    target_vids: &[Vid],
727    target_label_name: &Option<String>,
728    graph_ctx: &GraphExecutionContext,
729) -> ArrayRef {
730    use arrow_array::builder::{ListBuilder, StringBuilder};
731    let mut labels_builder = ListBuilder::new(StringBuilder::new());
732    let query_ctx = graph_ctx.query_context();
733    for vid in target_vids {
734        let row_labels: Vec<String> =
735            match l0_visibility::get_vertex_labels_optional(*vid, &query_ctx) {
736                Some(labels) => labels,
737                None => {
738                    // Vertex not in L0 — trust schema label (storage already filtered)
739                    if let Some(label_name) = target_label_name {
740                        vec![label_name.clone()]
741                    } else {
742                        vec![]
743                    }
744                }
745            };
746        let values = labels_builder.values();
747        for lbl in &row_labels {
748            values.append_value(lbl);
749        }
750        labels_builder.append(true);
751    }
752    Arc::new(labels_builder.finish())
753}
754
755/// Build target vertex property columns from storage and L0.
756async fn build_target_property_columns(
757    target_vids: &[Vid],
758    target_properties: &[String],
759    target_label_name: &Option<String>,
760    graph_ctx: &Arc<GraphExecutionContext>,
761) -> DFResult<Vec<ArrayRef>> {
762    let mut columns = Vec::new();
763
764    if let Some(label_name) = target_label_name {
765        let property_manager = graph_ctx.property_manager();
766        let query_ctx = graph_ctx.query_context();
767
768        let props_map = property_manager
769            .get_batch_vertex_props_for_label(target_vids, label_name, Some(&query_ctx))
770            .await
771            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
772
773        let uni_schema = graph_ctx.storage().schema_manager().schema();
774        let label_props = uni_schema.properties.get(label_name.as_str());
775
776        for prop_name in target_properties {
777            let data_type = resolve_property_type(prop_name, label_props);
778            let column =
779                build_property_column_static(target_vids, &props_map, prop_name, &data_type)?;
780            columns.push(column);
781        }
782    } else {
783        // No label name — use label-agnostic property lookup.
784        let non_internal_props: Vec<&str> = target_properties
785            .iter()
786            .filter(|p| *p != "_all_props")
787            .map(|s| s.as_str())
788            .collect();
789        let property_manager = graph_ctx.property_manager();
790        let query_ctx = graph_ctx.query_context();
791
792        let props_map = if !non_internal_props.is_empty() {
793            property_manager
794                .get_batch_vertex_props(target_vids, &non_internal_props, Some(&query_ctx))
795                .await
796                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
797        } else {
798            std::collections::HashMap::new()
799        };
800
801        for prop_name in target_properties {
802            if prop_name == "_all_props" {
803                columns.push(build_all_props_column(target_vids, &props_map, graph_ctx));
804            } else {
805                let column = build_property_column_static(
806                    target_vids,
807                    &props_map,
808                    prop_name,
809                    &arrow::datatypes::DataType::LargeBinary,
810                )?;
811                columns.push(column);
812            }
813        }
814    }
815
816    Ok(columns)
817}
818
819/// Build a CypherValue blob column from all vertex properties (L0 + storage).
820fn build_all_props_column(
821    target_vids: &[Vid],
822    props_map: &HashMap<Vid, HashMap<String, uni_common::Value>>,
823    graph_ctx: &Arc<GraphExecutionContext>,
824) -> ArrayRef {
825    use arrow_array::builder::LargeBinaryBuilder;
826
827    let mut builder = LargeBinaryBuilder::new();
828    let l0_ctx = graph_ctx.l0_context();
829    for vid in target_vids {
830        // Merge in `Value` space so typed values (temporals) are preserved.
831        let mut merged_props: HashMap<String, uni_common::Value> = HashMap::new();
832        if let Some(vid_props) = props_map.get(vid) {
833            for (k, v) in vid_props.iter() {
834                merged_props.insert(k.clone(), v.clone());
835            }
836        }
837        for l0 in l0_ctx.iter_l0_buffers() {
838            let guard = l0.read();
839            if let Some(l0_props) = guard.vertex_properties.get(vid) {
840                for (k, v) in l0_props.iter() {
841                    merged_props.insert(k.clone(), v.clone());
842                }
843            }
844        }
845        if merged_props.is_empty() {
846            builder.append_null();
847        } else {
848            builder.append_value(uni_common::cypher_value_codec::encode(
849                &uni_common::Value::Map(merged_props),
850            ));
851        }
852    }
853    Arc::new(builder.finish())
854}
855
856/// Build edge ID, type, and property columns for bound edge variables.
857async fn build_edge_columns(
858    expansions: &[(usize, Vid, u64, u32)],
859    edge_properties: &[String],
860    edge_type_ids: &[u32],
861    graph_ctx: &Arc<GraphExecutionContext>,
862) -> DFResult<Vec<ArrayRef>> {
863    let mut columns = Vec::new();
864
865    let eids: Vec<Eid> = expansions
866        .iter()
867        .map(|(_, _, eid, _)| Eid::from(*eid))
868        .collect();
869    let eid_u64s: Vec<u64> = eids.iter().map(|e| e.as_u64()).collect();
870    columns.push(Arc::new(UInt64Array::from(eid_u64s)) as ArrayRef);
871
872    // Edge _type column
873    {
874        let uni_schema = graph_ctx.storage().schema_manager().schema();
875        let mut type_builder = arrow_array::builder::StringBuilder::new();
876        for (_, _, _, edge_type_id) in expansions {
877            if let Some(name) = uni_schema.edge_type_name_by_id_unified(*edge_type_id) {
878                type_builder.append_value(&name);
879            } else {
880                type_builder.append_null();
881            }
882        }
883        columns.push(Arc::new(type_builder.finish()) as ArrayRef);
884    }
885
886    if !edge_properties.is_empty() {
887        let prop_name_refs: Vec<&str> = edge_properties.iter().map(|s| s.as_str()).collect();
888        let property_manager = graph_ctx.property_manager();
889        let query_ctx = graph_ctx.query_context();
890
891        let props_map = property_manager
892            .get_batch_edge_props(&eids, &prop_name_refs, Some(&query_ctx))
893            .await
894            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
895
896        let uni_schema = graph_ctx.storage().schema_manager().schema();
897        let merged_edge_props = merged_edge_schema_props(&uni_schema, edge_type_ids);
898        let edge_type_props = if merged_edge_props.is_empty() {
899            None
900        } else {
901            Some(&merged_edge_props)
902        };
903
904        let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(e.as_u64())).collect();
905
906        for prop_name in edge_properties {
907            // System-managed edge timestamps live outside the property bag.
908            // Read them directly from L0 (earliest creation, latest update).
909            // Disk-only edges will surface as null — acceptable for the
910            // common case where the queried edges are L0-resident in the
911            // same session.
912            if prop_name == "_created_at" || prop_name == "_updated_at" {
913                let mut builder =
914                    arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
915                let l0_ctx = graph_ctx.l0_context();
916                for eid in &eids {
917                    let mut value: Option<i64> = None;
918                    for l0 in l0_ctx.iter_l0_buffers() {
919                        let guard = l0.read();
920                        let opt = if prop_name == "_created_at" {
921                            guard.edge_created_at.get(eid).copied()
922                        } else {
923                            guard.edge_updated_at.get(eid).copied()
924                        };
925                        if let Some(ts) = opt {
926                            value = Some(match value {
927                                None => ts,
928                                Some(cur) if prop_name == "_created_at" => cur.min(ts),
929                                Some(cur) => cur.max(ts),
930                            });
931                        }
932                    }
933                    match value {
934                        Some(ts) => builder.append_value(ts),
935                        None => builder.append_null(),
936                    }
937                }
938                columns.push(Arc::new(builder.finish()) as ArrayRef);
939                continue;
940            }
941            let data_type = resolve_edge_property_type(prop_name, edge_type_props);
942            let column =
943                build_property_column_static(&vid_keys, &props_map, prop_name, &data_type)?;
944            columns.push(column);
945        }
946    }
947
948    Ok(columns)
949}
950
951/// Build the output batch with target vertex properties.
952///
953/// This is a standalone async function so it can be boxed into a `Send` future
954/// without borrowing from `GraphTraverseStream`.
955#[expect(
956    clippy::too_many_arguments,
957    reason = "Standalone async fn needs all context passed explicitly"
958)]
959async fn build_traverse_output_batch(
960    input: RecordBatch,
961    expansions: Vec<(usize, Vid, u64, u32)>,
962    schema: SchemaRef,
963    edge_variable: Option<String>,
964    edge_properties: Vec<String>,
965    edge_type_ids: Vec<u32>,
966    target_properties: Vec<String>,
967    target_label_name: Option<String>,
968    graph_ctx: Arc<GraphExecutionContext>,
969    optional: bool,
970    optional_pattern_vars: HashSet<String>,
971) -> DFResult<RecordBatch> {
972    if expansions.is_empty() {
973        if !optional {
974            return Ok(RecordBatch::new_empty(schema));
975        }
976        let unmatched_reps = collect_unmatched_optional_group_rows(
977            &input,
978            &HashSet::new(),
979            &schema,
980            &optional_pattern_vars,
981        )?;
982        if unmatched_reps.is_empty() {
983            return Ok(RecordBatch::new_empty(schema));
984        }
985        return build_optional_null_batch_for_rows_with_optional_vars(
986            &input,
987            &unmatched_reps,
988            &schema,
989            &optional_pattern_vars,
990        );
991    }
992
993    // Expand input columns via index array
994    let indices: Vec<u64> = expansions
995        .iter()
996        .map(|(idx, _, _, _)| *idx as u64)
997        .collect();
998    let indices_array = UInt64Array::from(indices);
999    let mut columns: Vec<ArrayRef> = input
1000        .columns()
1001        .iter()
1002        .map(|col| take(col.as_ref(), &indices_array, None))
1003        .collect::<Result<_, _>>()?;
1004
1005    // Target VID column
1006    let target_vids: Vec<Vid> = expansions.iter().map(|(_, vid, _, _)| *vid).collect();
1007    let target_vid_u64s: Vec<u64> = target_vids.iter().map(|v| v.as_u64()).collect();
1008    columns.push(Arc::new(UInt64Array::from(target_vid_u64s)));
1009
1010    // Target labels column
1011    columns.push(build_target_labels_column(
1012        &target_vids,
1013        &target_label_name,
1014        &graph_ctx,
1015    ));
1016
1017    // Target vertex property columns
1018    if !target_properties.is_empty() {
1019        let prop_cols = build_target_property_columns(
1020            &target_vids,
1021            &target_properties,
1022            &target_label_name,
1023            &graph_ctx,
1024        )
1025        .await?;
1026        columns.extend(prop_cols);
1027    }
1028
1029    // Edge columns (bound or internal tracking)
1030    if edge_variable.is_some() {
1031        let edge_cols =
1032            build_edge_columns(&expansions, &edge_properties, &edge_type_ids, &graph_ctx).await?;
1033        columns.extend(edge_cols);
1034    } else {
1035        let eid_u64s: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1036        columns.push(Arc::new(UInt64Array::from(eid_u64s)));
1037    }
1038
1039    let expanded_batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
1040
1041    // Append null rows for unmatched optional sources
1042    if optional {
1043        let matched_indices: HashSet<usize> =
1044            expansions.iter().map(|(idx, _, _, _)| *idx).collect();
1045        let unmatched = collect_unmatched_optional_group_rows(
1046            &input,
1047            &matched_indices,
1048            &schema,
1049            &optional_pattern_vars,
1050        )?;
1051
1052        if !unmatched.is_empty() {
1053            let null_batch = build_optional_null_batch_for_rows_with_optional_vars(
1054                &input,
1055                &unmatched,
1056                &schema,
1057                &optional_pattern_vars,
1058            )?;
1059            let combined = arrow::compute::concat_batches(&schema, [&expanded_batch, &null_batch])
1060                .map_err(arrow_err)?;
1061            return Ok(combined);
1062        }
1063    }
1064
1065    Ok(expanded_batch)
1066}
1067
1068/// Build a batch for specific unmatched source rows with NULL target/edge columns.
1069/// Used when OPTIONAL MATCH has some expansions but some source rows had none.
1070fn build_optional_null_batch_for_rows(
1071    input: &RecordBatch,
1072    unmatched_indices: &[usize],
1073    schema: &SchemaRef,
1074) -> DFResult<RecordBatch> {
1075    let num_rows = unmatched_indices.len();
1076    let indices: Vec<u64> = unmatched_indices.iter().map(|&idx| idx as u64).collect();
1077    let indices_array = UInt64Array::from(indices);
1078
1079    // Take the unmatched input rows
1080    let mut columns: Vec<ArrayRef> = Vec::new();
1081    for col in input.columns() {
1082        let taken = take(col.as_ref(), &indices_array, None)?;
1083        columns.push(taken);
1084    }
1085    // Fill remaining columns with nulls
1086    for field in schema.fields().iter().skip(input.num_columns()) {
1087        columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1088    }
1089    RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
1090}
1091
/// Return `true` when `col_name` belongs to one of the OPTIONAL MATCH
/// variables in `optional_vars`.
///
/// A column belongs to variable `v` when it is exactly one of:
/// - `v` itself,
/// - `v.<suffix>`, such as `v._vid` or `v.name`,
/// - `__eid_to_<v>`, the internal edge-ID tracking column.
///
/// All three forms require an exact match on the variable name, so `m` does
/// not claim `mm._vid` or `__eid_to_nm`. The previous suffix check
/// (`ends_with(var)`) let a short variable name capture another variable's
/// internal EID column.
fn is_optional_column_for_vars(col_name: &str, optional_vars: &HashSet<String>) -> bool {
    if optional_vars.contains(col_name) {
        return true;
    }
    // Computed once: the variable named by an `__eid_to_<var>` column, if any.
    let eid_target = col_name.strip_prefix("__eid_to_");
    optional_vars.iter().any(|var| {
        let var = var.as_str();
        col_name
            .strip_prefix(var)
            .is_some_and(|rest| rest.starts_with('.'))
            || eid_target == Some(var)
    })
}
1099
1100fn collect_unmatched_optional_group_rows(
1101    input: &RecordBatch,
1102    matched_indices: &HashSet<usize>,
1103    schema: &SchemaRef,
1104    optional_vars: &HashSet<String>,
1105) -> DFResult<Vec<usize>> {
1106    if input.num_rows() == 0 {
1107        return Ok(Vec::new());
1108    }
1109
1110    if optional_vars.is_empty() {
1111        return Ok((0..input.num_rows())
1112            .filter(|idx| !matched_indices.contains(idx))
1113            .collect());
1114    }
1115
1116    let source_vid_indices: Vec<usize> = schema
1117        .fields()
1118        .iter()
1119        .enumerate()
1120        .filter_map(|(idx, field)| {
1121            if idx >= input.num_columns() {
1122                return None;
1123            }
1124            let name = field.name();
1125            if !is_optional_column_for_vars(name, optional_vars) && name.ends_with("._vid") {
1126                Some(idx)
1127            } else {
1128                None
1129            }
1130        })
1131        .collect();
1132
1133    // Group rows by non-optional VID bindings and preserve group order.
1134    let mut groups: HashMap<Vec<u8>, (usize, bool)> = HashMap::new(); // (first_row_idx, any_matched)
1135    let mut group_order: Vec<Vec<u8>> = Vec::new();
1136
1137    for row_idx in 0..input.num_rows() {
1138        let key = compute_optional_group_key(input, row_idx, &source_vid_indices)?;
1139        let entry = groups.entry(key.clone());
1140        if matches!(entry, std::collections::hash_map::Entry::Vacant(_)) {
1141            group_order.push(key.clone());
1142        }
1143        let matched = matched_indices.contains(&row_idx);
1144        entry
1145            .and_modify(|(_, any_matched)| *any_matched |= matched)
1146            .or_insert((row_idx, matched));
1147    }
1148
1149    Ok(group_order
1150        .into_iter()
1151        .filter_map(|key| {
1152            groups
1153                .get(&key)
1154                .and_then(|(first_idx, any_matched)| (!*any_matched).then_some(*first_idx))
1155        })
1156        .collect())
1157}
1158
1159fn compute_optional_group_key(
1160    batch: &RecordBatch,
1161    row_idx: usize,
1162    source_vid_indices: &[usize],
1163) -> DFResult<Vec<u8>> {
1164    let mut key = Vec::with_capacity(source_vid_indices.len() * std::mem::size_of::<u64>());
1165    for &col_idx in source_vid_indices {
1166        let col = batch.column(col_idx);
1167        let vid_cow = column_as_vid_array(col.as_ref())?;
1168        let arr: &UInt64Array = &vid_cow;
1169        if arr.is_null(row_idx) {
1170            key.extend_from_slice(&u64::MAX.to_le_bytes());
1171        } else {
1172            key.extend_from_slice(&arr.value(row_idx).to_le_bytes());
1173        }
1174    }
1175    Ok(key)
1176}
1177
1178fn build_optional_null_batch_for_rows_with_optional_vars(
1179    input: &RecordBatch,
1180    unmatched_indices: &[usize],
1181    schema: &SchemaRef,
1182    optional_vars: &HashSet<String>,
1183) -> DFResult<RecordBatch> {
1184    if optional_vars.is_empty() {
1185        return build_optional_null_batch_for_rows(input, unmatched_indices, schema);
1186    }
1187
1188    let num_rows = unmatched_indices.len();
1189    let indices: Vec<u64> = unmatched_indices.iter().map(|&idx| idx as u64).collect();
1190    let indices_array = UInt64Array::from(indices);
1191
1192    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
1193    for (col_idx, field) in schema.fields().iter().enumerate() {
1194        if col_idx < input.num_columns() {
1195            if is_optional_column_for_vars(field.name(), optional_vars) {
1196                columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1197            } else {
1198                let taken = take(input.column(col_idx).as_ref(), &indices_array, None)?;
1199                columns.push(taken);
1200            }
1201        } else {
1202            columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1203        }
1204    }
1205
1206    RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
1207}
1208
1209impl Stream for GraphTraverseStream {
1210    type Item = DFResult<RecordBatch>;
1211
1212    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1213        let metrics = self.metrics.clone();
1214        let _timer = metrics.elapsed_compute().timer();
1215        loop {
1216            let state = std::mem::replace(&mut self.state, TraverseStreamState::Done);
1217
1218            match state {
1219                TraverseStreamState::Warming(mut fut) => match fut.as_mut().poll(cx) {
1220                    Poll::Ready(Ok(())) => {
1221                        self.state = TraverseStreamState::Reading;
1222                        // Continue loop to start reading
1223                    }
1224                    Poll::Ready(Err(e)) => {
1225                        self.state = TraverseStreamState::Done;
1226                        return Poll::Ready(Some(Err(e)));
1227                    }
1228                    Poll::Pending => {
1229                        self.state = TraverseStreamState::Warming(fut);
1230                        return Poll::Pending;
1231                    }
1232                },
1233                TraverseStreamState::Reading => {
1234                    // Check timeout
1235                    if let Err(e) = self.graph_ctx.check_timeout() {
1236                        return Poll::Ready(Some(Err(
1237                            datafusion::error::DataFusionError::Execution(e.to_string()),
1238                        )));
1239                    }
1240
1241                    match self.input.poll_next_unpin(cx) {
1242                        Poll::Ready(Some(Ok(batch))) => {
1243                            // Expand neighbors synchronously
1244                            let expansions = match self.expand_neighbors(&batch) {
1245                                Ok(exp) => exp,
1246                                Err(e) => {
1247                                    self.state = TraverseStreamState::Reading;
1248                                    return Poll::Ready(Some(Err(e)));
1249                                }
1250                            };
1251
1252                            // Build output synchronously only when no properties need async hydration
1253                            if self.target_properties.is_empty() && self.edge_properties.is_empty()
1254                            {
1255                                let result = build_traverse_output_batch_sync(
1256                                    &batch,
1257                                    &expansions,
1258                                    &self.schema,
1259                                    self.edge_variable.as_ref(),
1260                                    &self.graph_ctx,
1261                                    self.optional,
1262                                    &self.optional_pattern_vars,
1263                                );
1264                                self.state = TraverseStreamState::Reading;
1265                                if let Ok(ref r) = result {
1266                                    self.metrics.record_output(r.num_rows());
1267                                }
1268                                return Poll::Ready(Some(result));
1269                            }
1270
1271                            // Properties needed — create async future for hydration
1272                            let schema = self.schema.clone();
1273                            let edge_variable = self.edge_variable.clone();
1274                            let edge_properties = self.edge_properties.clone();
1275                            let edge_type_ids = self.edge_type_ids.clone();
1276                            let target_properties = self.target_properties.clone();
1277                            let target_label_name = self.target_label_name.clone();
1278                            let graph_ctx = self.graph_ctx.clone();
1279
1280                            let optional = self.optional;
1281                            let optional_pattern_vars = self.optional_pattern_vars.clone();
1282
1283                            let fut = build_traverse_output_batch(
1284                                batch,
1285                                expansions,
1286                                schema,
1287                                edge_variable,
1288                                edge_properties,
1289                                edge_type_ids,
1290                                target_properties,
1291                                target_label_name,
1292                                graph_ctx,
1293                                optional,
1294                                optional_pattern_vars,
1295                            );
1296
1297                            self.state = TraverseStreamState::Materializing(Box::pin(fut));
1298                            // Continue loop to poll the future
1299                        }
1300                        Poll::Ready(Some(Err(e))) => {
1301                            self.state = TraverseStreamState::Done;
1302                            return Poll::Ready(Some(Err(e)));
1303                        }
1304                        Poll::Ready(None) => {
1305                            self.state = TraverseStreamState::Done;
1306                            return Poll::Ready(None);
1307                        }
1308                        Poll::Pending => {
1309                            self.state = TraverseStreamState::Reading;
1310                            return Poll::Pending;
1311                        }
1312                    }
1313                }
1314                TraverseStreamState::Materializing(mut fut) => match fut.as_mut().poll(cx) {
1315                    Poll::Ready(Ok(batch)) => {
1316                        self.state = TraverseStreamState::Reading;
1317                        self.metrics.record_output(batch.num_rows());
1318                        return Poll::Ready(Some(Ok(batch)));
1319                    }
1320                    Poll::Ready(Err(e)) => {
1321                        self.state = TraverseStreamState::Done;
1322                        return Poll::Ready(Some(Err(e)));
1323                    }
1324                    Poll::Pending => {
1325                        self.state = TraverseStreamState::Materializing(fut);
1326                        return Poll::Pending;
1327                    }
1328                },
1329                TraverseStreamState::Done => {
1330                    return Poll::Ready(None);
1331                }
1332            }
1333        }
1334    }
1335}
1336
1337/// Build output batch synchronously when no properties need async hydration.
1338///
1339/// Only called when both `target_properties` and `edge_properties` are empty,
1340/// so no property columns need to be materialized.
1341fn build_traverse_output_batch_sync(
1342    input: &RecordBatch,
1343    expansions: &[(usize, Vid, u64, u32)],
1344    schema: &SchemaRef,
1345    edge_variable: Option<&String>,
1346    graph_ctx: &GraphExecutionContext,
1347    optional: bool,
1348    optional_pattern_vars: &HashSet<String>,
1349) -> DFResult<RecordBatch> {
1350    if expansions.is_empty() {
1351        if !optional {
1352            return Ok(RecordBatch::new_empty(schema.clone()));
1353        }
1354        let unmatched_reps = collect_unmatched_optional_group_rows(
1355            input,
1356            &HashSet::new(),
1357            schema,
1358            optional_pattern_vars,
1359        )?;
1360        if unmatched_reps.is_empty() {
1361            return Ok(RecordBatch::new_empty(schema.clone()));
1362        }
1363        return build_optional_null_batch_for_rows_with_optional_vars(
1364            input,
1365            &unmatched_reps,
1366            schema,
1367            optional_pattern_vars,
1368        );
1369    }
1370
1371    let indices: Vec<u64> = expansions
1372        .iter()
1373        .map(|(idx, _, _, _)| *idx as u64)
1374        .collect();
1375    let indices_array = UInt64Array::from(indices);
1376
1377    let mut columns: Vec<ArrayRef> = Vec::new();
1378    for col in input.columns() {
1379        let expanded = take(col.as_ref(), &indices_array, None)?;
1380        columns.push(expanded);
1381    }
1382
1383    // Add target VID column
1384    let target_vids: Vec<u64> = expansions
1385        .iter()
1386        .map(|(_, vid, _, _)| vid.as_u64())
1387        .collect();
1388    columns.push(Arc::new(UInt64Array::from(target_vids)));
1389
1390    // Add target ._labels column (from L0 buffers)
1391    {
1392        use arrow_array::builder::{ListBuilder, StringBuilder};
1393        let l0_ctx = graph_ctx.l0_context();
1394        let mut labels_builder = ListBuilder::new(StringBuilder::new());
1395        for (_, vid, _, _) in expansions {
1396            let mut row_labels: Vec<String> = Vec::new();
1397            for l0 in l0_ctx.iter_l0_buffers() {
1398                let guard = l0.read();
1399                if let Some(l0_labels) = guard.vertex_labels.get(vid) {
1400                    for lbl in l0_labels {
1401                        if !row_labels.contains(lbl) {
1402                            row_labels.push(lbl.clone());
1403                        }
1404                    }
1405                }
1406            }
1407            let values = labels_builder.values();
1408            for lbl in &row_labels {
1409                values.append_value(lbl);
1410            }
1411            labels_builder.append(true);
1412        }
1413        columns.push(Arc::new(labels_builder.finish()));
1414    }
1415
1416    // Add edge columns if edge is bound (no properties in sync path)
1417    if edge_variable.is_some() {
1418        let edge_ids: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1419        columns.push(Arc::new(UInt64Array::from(edge_ids)));
1420
1421        // Add edge _type column
1422        let uni_schema = graph_ctx.storage().schema_manager().schema();
1423        let mut type_builder = arrow_array::builder::StringBuilder::new();
1424        for (_, _, _, edge_type_id) in expansions {
1425            if let Some(name) = uni_schema.edge_type_name_by_id_unified(*edge_type_id) {
1426                type_builder.append_value(&name);
1427            } else {
1428                type_builder.append_null();
1429            }
1430        }
1431        columns.push(Arc::new(type_builder.finish()));
1432    } else {
1433        // Internal EID column for relationship uniqueness tracking (matches schema)
1434        let edge_ids: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1435        columns.push(Arc::new(UInt64Array::from(edge_ids)));
1436    }
1437
1438    let expanded_batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
1439
1440    if optional {
1441        let matched_indices: HashSet<usize> =
1442            expansions.iter().map(|(idx, _, _, _)| *idx).collect();
1443        let unmatched = collect_unmatched_optional_group_rows(
1444            input,
1445            &matched_indices,
1446            schema,
1447            optional_pattern_vars,
1448        )?;
1449
1450        if !unmatched.is_empty() {
1451            let null_batch = build_optional_null_batch_for_rows_with_optional_vars(
1452                input,
1453                &unmatched,
1454                schema,
1455                optional_pattern_vars,
1456            )?;
1457            let combined = arrow::compute::concat_batches(schema, [&expanded_batch, &null_batch])
1458                .map_err(|e| {
1459                datafusion::error::DataFusionError::ArrowError(Box::new(e), None)
1460            })?;
1461            return Ok(combined);
1462        }
1463    }
1464
1465    Ok(expanded_batch)
1466}
1467
1468impl RecordBatchStream for GraphTraverseStream {
1469    fn schema(&self) -> SchemaRef {
1470        self.schema.clone()
1471    }
1472}
1473
/// Adjacency map type: maps a source VID to its list of
/// `(target_vid, eid, edge_type_name, properties)` entries.
///
/// The edge type name and property map are `Arc`-shared so one edge can appear
/// in several adjacency lists without deep-cloning its property map. An
/// undirected (`Both`) traversal, for example, stores every edge under both
/// endpoints.
type EdgeAdjacencyMap = HashMap<Vid, Vec<(Vid, Eid, Arc<str>, Arc<uni_common::Properties>)>>;
1480
/// Graph traversal execution plan for schemaless edge types (TraverseMainByType).
///
/// `GraphTraverseExec` uses CSR adjacency for known types. This operator
/// instead queries the main edges table for schemaless types and builds an
/// in-memory adjacency map.
///
/// # Example
///
/// ```ignore
/// // Traverse schemaless "CUSTOM" edges
/// let traverse = GraphTraverseMainExec::new(
///     input_plan,
///     "n._vid",                    // column holding source VIDs
///     vec!["CUSTOM".to_string()],  // edge type names (OR-ed)
///     Direction::Outgoing,
///     "m",                         // target variable
///     Some("r".to_string()),       // edge variable
///     vec![],                      // edge properties
///     vec![],                      // target properties
///     graph_ctx,
///     false,                       // not optional
///     HashSet::new(),              // OPTIONAL MATCH pattern variables
///     None,                        // no already-bound target column
///     vec![],                      // no previous-hop edge ID columns
/// );
/// ```
pub struct GraphTraverseMainExec {
    /// Input execution plan.
    input: Arc<dyn ExecutionPlan>,

    /// Column name containing source VIDs.
    source_column: String,

    /// Edge type names (not IDs, since schemaless types may not have IDs).
    /// Supports OR relationship types like `[:KNOWS|HATES]`.
    type_names: Vec<String>,

    /// Traversal direction.
    direction: Direction,

    /// Variable name for target vertex columns.
    target_variable: String,

    /// Variable name for edge columns (if edge is bound).
    edge_variable: Option<String>,

    /// Edge properties to materialize.
    edge_properties: Vec<String>,

    /// Target vertex properties to materialize.
    target_properties: Vec<String>,

    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Whether this is an OPTIONAL MATCH (preserve unmatched source rows with NULLs).
    optional: bool,

    /// Variables introduced by the OPTIONAL MATCH pattern; their columns are
    /// NULLed on unmatched rows.
    optional_pattern_vars: HashSet<String>,

    /// Column name of an already-bound target VID (for patterns where target is in scope).
    /// When set, only traversals reaching this exact VID are included.
    bound_target_column: Option<String>,

    /// Columns containing edge IDs from previous hops (for relationship uniqueness).
    /// Edges matching any of these IDs are excluded from traversal results.
    used_edge_columns: Vec<String>,

    /// Output schema.
    schema: SchemaRef,

    /// Cached plan properties.
    properties: Arc<PlanProperties>,

    /// Execution metrics.
    metrics: ExecutionPlanMetricsSet,
}
1555
1556impl fmt::Debug for GraphTraverseMainExec {
1557    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1558        f.debug_struct("GraphTraverseMainExec")
1559            .field("type_names", &self.type_names)
1560            .field("direction", &self.direction)
1561            .field("target_variable", &self.target_variable)
1562            .field("edge_variable", &self.edge_variable)
1563            .finish()
1564    }
1565}
1566
1567impl GraphTraverseMainExec {
1568    /// Create a new schemaless traversal executor.
1569    #[expect(clippy::too_many_arguments)]
1570    pub fn new(
1571        input: Arc<dyn ExecutionPlan>,
1572        source_column: impl Into<String>,
1573        type_names: Vec<String>,
1574        direction: Direction,
1575        target_variable: impl Into<String>,
1576        edge_variable: Option<String>,
1577        edge_properties: Vec<String>,
1578        target_properties: Vec<String>,
1579        graph_ctx: Arc<GraphExecutionContext>,
1580        optional: bool,
1581        optional_pattern_vars: HashSet<String>,
1582        bound_target_column: Option<String>,
1583        used_edge_columns: Vec<String>,
1584    ) -> Self {
1585        let source_column = source_column.into();
1586        let target_variable = target_variable.into();
1587
1588        // Build output schema
1589        let schema = Self::build_schema(
1590            &input.schema(),
1591            &target_variable,
1592            &edge_variable,
1593            &edge_properties,
1594            &target_properties,
1595            optional,
1596        );
1597
1598        let properties = compute_plan_properties(schema.clone());
1599
1600        Self {
1601            input,
1602            source_column,
1603            type_names,
1604            direction,
1605            target_variable,
1606            edge_variable,
1607            edge_properties,
1608            target_properties,
1609            graph_ctx,
1610            optional,
1611            optional_pattern_vars,
1612            bound_target_column,
1613            used_edge_columns,
1614            schema,
1615            properties,
1616            metrics: ExecutionPlanMetricsSet::new(),
1617        }
1618    }
1619
1620    /// Build output schema for traversal.
1621    fn build_schema(
1622        input_schema: &SchemaRef,
1623        target_variable: &str,
1624        edge_variable: &Option<String>,
1625        edge_properties: &[String],
1626        target_properties: &[String],
1627        optional: bool,
1628    ) -> SchemaRef {
1629        let mut fields: Vec<Field> = input_schema
1630            .fields()
1631            .iter()
1632            .map(|f| f.as_ref().clone())
1633            .collect();
1634
1635        // Add target ._vid column (only if not already in input, nullable for OPTIONAL MATCH)
1636        let target_vid_name = format!("{}._vid", target_variable);
1637        if input_schema.column_with_name(&target_vid_name).is_none() {
1638            fields.push(Field::new(target_vid_name, DataType::UInt64, true));
1639        }
1640
1641        // Add target ._labels column (only if not already in input)
1642        let target_labels_name = format!("{}._labels", target_variable);
1643        if input_schema.column_with_name(&target_labels_name).is_none() {
1644            fields.push(Field::new(target_labels_name, labels_data_type(), true));
1645        }
1646
1647        // Add edge columns if edge variable is bound
1648        if let Some(edge_var) = edge_variable {
1649            fields.push(Field::new(
1650                format!("{}._eid", edge_var),
1651                DataType::UInt64,
1652                optional,
1653            ));
1654
1655            // Add edge ._type column for type(r) support
1656            fields.push(Field::new(
1657                format!("{}._type", edge_var),
1658                DataType::Utf8,
1659                true,
1660            ));
1661
1662            // Edge properties: LargeBinary (cv_encoded) to preserve value types.
1663            // Schemaless edges store properties as CypherValue blobs so that
1664            // Int, Float, etc. round-trip correctly through Arrow.
1665            for prop in edge_properties {
1666                let col_name = format!("{}.{}", edge_var, prop);
1667                let mut metadata = std::collections::HashMap::new();
1668                metadata.insert("cv_encoded".to_string(), "true".to_string());
1669                fields.push(
1670                    Field::new(&col_name, DataType::LargeBinary, true).with_metadata(metadata),
1671                );
1672            }
1673        } else {
1674            // Add internal edge ID for anonymous relationships so BindPath can
1675            // reconstruct named paths (p = (a)-[:T]->(b)).
1676            fields.push(Field::new(
1677                format!("__eid_to_{}", target_variable),
1678                DataType::UInt64,
1679                optional,
1680            ));
1681        }
1682
1683        // Target properties: all as LargeBinary (deferred to PropertyManager)
1684        for prop in target_properties {
1685            fields.push(Field::new(
1686                format!("{}.{}", target_variable, prop),
1687                DataType::LargeBinary,
1688                true,
1689            ));
1690        }
1691
1692        Arc::new(Schema::new(fields))
1693    }
1694}
1695
1696impl DisplayAs for GraphTraverseMainExec {
1697    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1698        write!(
1699            f,
1700            "GraphTraverseMainExec: types={:?}, direction={:?}",
1701            self.type_names, self.direction
1702        )
1703    }
1704}
1705
1706impl ExecutionPlan for GraphTraverseMainExec {
1707    fn name(&self) -> &str {
1708        "GraphTraverseMainExec"
1709    }
1710
1711    fn as_any(&self) -> &dyn Any {
1712        self
1713    }
1714
1715    fn schema(&self) -> SchemaRef {
1716        self.schema.clone()
1717    }
1718
1719    fn properties(&self) -> &Arc<PlanProperties> {
1720        &self.properties
1721    }
1722
1723    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1724        vec![&self.input]
1725    }
1726
1727    fn with_new_children(
1728        self: Arc<Self>,
1729        children: Vec<Arc<dyn ExecutionPlan>>,
1730    ) -> DFResult<Arc<dyn ExecutionPlan>> {
1731        if children.len() != 1 {
1732            return Err(datafusion::error::DataFusionError::Plan(
1733                "GraphTraverseMainExec expects exactly one child".to_string(),
1734            ));
1735        }
1736
1737        Ok(Arc::new(Self {
1738            input: children[0].clone(),
1739            source_column: self.source_column.clone(),
1740            type_names: self.type_names.clone(),
1741            direction: self.direction,
1742            target_variable: self.target_variable.clone(),
1743            edge_variable: self.edge_variable.clone(),
1744            edge_properties: self.edge_properties.clone(),
1745            target_properties: self.target_properties.clone(),
1746            graph_ctx: self.graph_ctx.clone(),
1747            optional: self.optional,
1748            optional_pattern_vars: self.optional_pattern_vars.clone(),
1749            bound_target_column: self.bound_target_column.clone(),
1750            used_edge_columns: self.used_edge_columns.clone(),
1751            schema: self.schema.clone(),
1752            properties: self.properties.clone(),
1753            metrics: self.metrics.clone(),
1754        }))
1755    }
1756
1757    fn execute(
1758        &self,
1759        partition: usize,
1760        context: Arc<TaskContext>,
1761    ) -> DFResult<SendableRecordBatchStream> {
1762        let input_stream = self.input.execute(partition, context)?;
1763        let metrics = BaselineMetrics::new(&self.metrics, partition);
1764
1765        Ok(Box::pin(GraphTraverseMainStream::new(
1766            input_stream,
1767            self.source_column.clone(),
1768            self.type_names.clone(),
1769            self.direction,
1770            self.target_variable.clone(),
1771            self.edge_variable.clone(),
1772            self.edge_properties.clone(),
1773            self.target_properties.clone(),
1774            self.graph_ctx.clone(),
1775            self.optional,
1776            self.optional_pattern_vars.clone(),
1777            self.bound_target_column.clone(),
1778            self.used_edge_columns.clone(),
1779            self.schema.clone(),
1780            metrics,
1781        )))
1782    }
1783
1784    fn metrics(&self) -> Option<MetricsSet> {
1785        Some(self.metrics.clone_inner())
1786    }
1787}
1788
/// Upper bound on the distinct source vids pushed down into the edge scan.
///
/// A source set of up to this many vids is sent as an endpoint filter;
/// anything larger falls back to a whole-type scan. A huge `IN` list
/// costs more than the scan it saves (measured: a 10k-vid IN over a
/// 100k-edge table is ~2.6x slower than the full scan, while small source
/// sets are ~4.5x faster — see `schemaless_traversal` bench).
const TRAVERSE_PUSHDOWN_MAX_SOURCES: usize = 1 << 10;
1795
/// State machine for GraphTraverseMainStream.
///
/// The input is drained FIRST so the distinct source vids can be pushed into
/// the edge scan (review perf #5) — without this, a 1-source traversal
/// materialized every edge of the type. Memory note: buffering the input does
/// not change the operator's memory class; it already materialized the entire
/// edge type (and with pushdown materializes strictly less).
///
/// The variants are declared in pipeline order (collect → load → process →
/// done). NOTE(review): the transitions themselves live in the stream's poll
/// implementation, which is not in this chunk — confirm the order there.
enum GraphTraverseMainState {
    /// Draining the input stream to learn the source-vid set.
    CollectingInput {
        /// Upstream stream still being drained.
        input_stream: SendableRecordBatchStream,
        /// Input batches received so far; replayed later in `Processing`.
        buffered: Vec<RecordBatch>,
    },
    /// Loading adjacency map from main edges table.
    LoadingEdges {
        /// Pending (pinned, boxed, `Send`) edge load yielding the adjacency map.
        future: Pin<Box<dyn std::future::Future<Output = DFResult<EdgeAdjacencyMap>> + Send>>,
        /// The fully drained input, held until the adjacency is ready.
        buffered: Vec<RecordBatch>,
    },
    /// Replaying the buffered input batches against the loaded adjacency.
    Processing {
        /// Adjacency loaded for this traversal's edge types and direction.
        adjacency: EdgeAdjacencyMap,
        /// Remaining buffered input batches, consumed one at a time.
        buffered: std::vec::IntoIter<RecordBatch>,
    },
    /// Stream is done.
    Done,
}
1822
/// Stream that executes schemaless edge traversal.
///
/// Carries the per-partition copy of the `GraphTraverseMainExec`
/// configuration plus the [`GraphTraverseMainState`] machine that drains the
/// input, loads the edge adjacency and then expands the buffered batches.
struct GraphTraverseMainStream {
    /// Source column name (holds the source vertex ids).
    source_column: String,

    /// Target variable name.
    target_variable: String,

    /// Edge variable name (`None` for an anonymous relationship).
    edge_variable: Option<String>,

    /// Edge properties to materialize.
    edge_properties: Vec<String>,

    /// Target properties to materialize.
    target_properties: Vec<String>,

    /// Graph execution context (storage + L0 buffers).
    graph_ctx: Arc<GraphExecutionContext>,

    /// Whether this is optional (preserve unmatched rows).
    optional: bool,

    /// Variables introduced by OPTIONAL pattern.
    optional_pattern_vars: HashSet<String>,

    /// Column name of an already-bound target VID (for filtering).
    bound_target_column: Option<String>,

    /// Columns containing edge IDs from previous hops (for relationship uniqueness).
    used_edge_columns: Vec<String>,

    /// Output schema.
    schema: SchemaRef,

    /// Edge type names to traverse (retained for the deferred edge load).
    type_names: Vec<String>,

    /// Traversal direction (retained for the deferred edge load).
    direction: Direction,

    /// Stream state.
    state: GraphTraverseMainState,

    /// Metrics.
    metrics: BaselineMetrics,
}
1870
1871impl GraphTraverseMainStream {
1872    /// Create a new traverse main stream.
1873    #[expect(clippy::too_many_arguments)]
1874    fn new(
1875        input_stream: SendableRecordBatchStream,
1876        source_column: String,
1877        type_names: Vec<String>,
1878        direction: Direction,
1879        target_variable: String,
1880        edge_variable: Option<String>,
1881        edge_properties: Vec<String>,
1882        target_properties: Vec<String>,
1883        graph_ctx: Arc<GraphExecutionContext>,
1884        optional: bool,
1885        optional_pattern_vars: HashSet<String>,
1886        bound_target_column: Option<String>,
1887        used_edge_columns: Vec<String>,
1888        schema: SchemaRef,
1889        metrics: BaselineMetrics,
1890    ) -> Self {
1891        // The input is drained first (CollectingInput) so the edge load can
1892        // push the bounded source-vid set into the scan.
1893        Self {
1894            source_column,
1895            target_variable,
1896            edge_variable,
1897            edge_properties,
1898            target_properties,
1899            graph_ctx,
1900            optional,
1901            optional_pattern_vars,
1902            bound_target_column,
1903            used_edge_columns,
1904            schema,
1905            type_names,
1906            direction,
1907            state: GraphTraverseMainState::CollectingInput {
1908                input_stream,
1909                buffered: Vec::new(),
1910            },
1911            metrics,
1912        }
1913    }
1914
1915    /// Distinct source vids across the buffered input, or `None` when the set
1916    /// exceeds [`TRAVERSE_PUSHDOWN_MAX_SOURCES`] (whole-type scan is cheaper)
1917    /// or the source column cannot be read (the edge load then behaves exactly
1918    /// as before pushdown existed).
1919    fn collect_source_vids(&self, buffered: &[RecordBatch]) -> Option<HashSet<Vid>> {
1920        let mut vids: HashSet<Vid> = HashSet::new();
1921        for batch in buffered {
1922            let col = batch.column_by_name(&self.source_column)?;
1923            let arr = column_as_vid_array(col.as_ref()).ok()?;
1924            for i in 0..arr.len() {
1925                if !arr.is_null(i) {
1926                    vids.insert(Vid::from(arr.value(i)));
1927                    if vids.len() > TRAVERSE_PUSHDOWN_MAX_SOURCES {
1928                        return None;
1929                    }
1930                }
1931            }
1932        }
1933        Some(vids)
1934    }
1935
1936    /// Expand input batch using adjacency map (synchronous version).
1937    fn expand_batch(
1938        &self,
1939        input: &RecordBatch,
1940        adjacency: &EdgeAdjacencyMap,
1941    ) -> DFResult<RecordBatch> {
1942        // Extract source VIDs from source column
1943        let source_col = input.column_by_name(&self.source_column).ok_or_else(|| {
1944            datafusion::error::DataFusionError::Execution(format!(
1945                "Source column {} not found",
1946                self.source_column
1947            ))
1948        })?;
1949
1950        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
1951        let source_vids: &UInt64Array = &source_vid_cow;
1952
1953        // Read bound target VIDs if column exists
1954        let bound_target_cow = self
1955            .bound_target_column
1956            .as_ref()
1957            .and_then(|col| input.column_by_name(col))
1958            .map(|c| column_as_vid_array(c.as_ref()))
1959            .transpose()?;
1960        let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
1961
1962        // Collect edge ID arrays from previous hops for relationship uniqueness filtering.
1963        let used_edge_arrays: Vec<&UInt64Array> = self
1964            .used_edge_columns
1965            .iter()
1966            .filter_map(|col| {
1967                input
1968                    .column_by_name(col)
1969                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
1970            })
1971            .collect();
1972
1973        // Build expansions: (input_row_idx, target_vid, eid, edge_type, edge_props).
1974        // Type/props are Arc-shared with the adjacency map (pointer clones).
1975        type Expansion = (usize, Vid, Eid, Arc<str>, Arc<uni_common::Properties>);
1976        let mut expansions: Vec<Expansion> = Vec::new();
1977
1978        for (row_idx, src_u64) in source_vids.iter().enumerate() {
1979            if let Some(src_u64) = src_u64 {
1980                let src_vid = Vid::from(src_u64);
1981
1982                // Collect used edge IDs for this row from all previous hops
1983                let used_eids: HashSet<u64> = used_edge_arrays
1984                    .iter()
1985                    .filter_map(|arr| {
1986                        if arr.is_null(row_idx) {
1987                            None
1988                        } else {
1989                            Some(arr.value(row_idx))
1990                        }
1991                    })
1992                    .collect();
1993
1994                if let Some(neighbors) = adjacency.get(&src_vid) {
1995                    for (target_vid, eid, edge_type, props) in neighbors {
1996                        // Skip edges already used in previous hops (relationship uniqueness)
1997                        if used_eids.contains(&eid.as_u64()) {
1998                            continue;
1999                        }
2000
2001                        // Filter by bound target VID if set (for patterns where target is in scope).
2002                        // Only include traversals where the target matches the expected VID.
2003                        if let Some(targets) = expected_targets {
2004                            if targets.is_null(row_idx) {
2005                                continue;
2006                            }
2007                            let expected_vid = targets.value(row_idx);
2008                            if target_vid.as_u64() != expected_vid {
2009                                continue;
2010                            }
2011                        }
2012
2013                        expansions.push((
2014                            row_idx,
2015                            *target_vid,
2016                            *eid,
2017                            Arc::clone(edge_type),
2018                            Arc::clone(props),
2019                        ));
2020                    }
2021                }
2022            }
2023        }
2024
2025        // Handle OPTIONAL: preserve unmatched rows
2026        if expansions.is_empty() && self.optional {
2027            // No matches - return input with NULL columns appended
2028            let all_indices: Vec<usize> = (0..input.num_rows()).collect();
2029            return build_optional_null_batch_for_rows(input, &all_indices, &self.schema);
2030        }
2031
2032        if expansions.is_empty() {
2033            // No matches, not optional - return empty batch
2034            return Ok(RecordBatch::new_empty(self.schema.clone()));
2035        }
2036
2037        // Track matched rows for OPTIONAL handling
2038        let matched_rows: HashSet<usize> = if self.optional {
2039            expansions.iter().map(|(idx, _, _, _, _)| *idx).collect()
2040        } else {
2041            HashSet::new()
2042        };
2043
2044        // Expand input columns using Arrow take()
2045        let mut columns: Vec<ArrayRef> = Vec::new();
2046        let indices: Vec<u64> = expansions
2047            .iter()
2048            .map(|(idx, _, _, _, _)| *idx as u64)
2049            .collect();
2050        let indices_array = UInt64Array::from(indices);
2051
2052        for col in input.columns() {
2053            let expanded = take(col.as_ref(), &indices_array, None)?;
2054            columns.push(expanded);
2055        }
2056
2057        // Add target ._vid column (only if not already in input)
2058        let target_vid_name = format!("{}._vid", self.target_variable);
2059        let target_vids: Vec<u64> = expansions
2060            .iter()
2061            .map(|(_, vid, _, _, _)| vid.as_u64())
2062            .collect();
2063        if input.schema().column_with_name(&target_vid_name).is_none() {
2064            columns.push(Arc::new(UInt64Array::from(target_vids)));
2065        }
2066
2067        // Add target ._labels column (only if not already in input)
2068        let target_labels_name = format!("{}._labels", self.target_variable);
2069        if input
2070            .schema()
2071            .column_with_name(&target_labels_name)
2072            .is_none()
2073        {
2074            use arrow_array::builder::{ListBuilder, StringBuilder};
2075            let l0_ctx = self.graph_ctx.l0_context();
2076            let mut labels_builder = ListBuilder::new(StringBuilder::new());
2077            for (_, target_vid, _, _, _) in &expansions {
2078                let mut row_labels: Vec<String> = Vec::new();
2079                for l0 in l0_ctx.iter_l0_buffers() {
2080                    let guard = l0.read();
2081                    if let Some(l0_labels) = guard.vertex_labels.get(target_vid) {
2082                        for lbl in l0_labels {
2083                            if !row_labels.contains(lbl) {
2084                                row_labels.push(lbl.clone());
2085                            }
2086                        }
2087                    }
2088                }
2089                let values = labels_builder.values();
2090                for lbl in &row_labels {
2091                    values.append_value(lbl);
2092                }
2093                labels_builder.append(true);
2094            }
2095            columns.push(Arc::new(labels_builder.finish()));
2096        }
2097
2098        // Add edge columns if edge variable is bound.
2099        // For anonymous relationships, emit internal edge IDs for BindPath.
2100        if self.edge_variable.is_some() {
2101            // Add edge ._eid column
2102            let eids: Vec<u64> = expansions
2103                .iter()
2104                .map(|(_, _, eid, _, _)| eid.as_u64())
2105                .collect();
2106            columns.push(Arc::new(UInt64Array::from(eids)));
2107
2108            // Add edge ._type column
2109            {
2110                let mut type_builder = arrow_array::builder::StringBuilder::new();
2111                for (_, _, _, edge_type, _) in &expansions {
2112                    type_builder.append_value(edge_type);
2113                }
2114                columns.push(Arc::new(type_builder.finish()));
2115            }
2116
2117            // Add edge property columns as cv_encoded LargeBinary to preserve types
2118            for prop_name in &self.edge_properties {
2119                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2120                if prop_name == "_all_props" {
2121                    // Serialize all edge properties to a CypherValue blob directly
2122                    // from `Value` so typed values (temporals) are preserved.
2123                    for (_, _, _, _, props) in &expansions {
2124                        if props.is_empty() {
2125                            builder.append_null();
2126                        } else {
2127                            let map: HashMap<String, uni_common::Value> =
2128                                props.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
2129                            builder.append_value(uni_common::cypher_value_codec::encode(
2130                                &uni_common::Value::Map(map),
2131                            ));
2132                        }
2133                    }
2134                } else {
2135                    // Named property as cv_encoded CypherValue
2136                    for (_, _, _, _, props) in &expansions {
2137                        match props.get(prop_name) {
2138                            Some(uni_common::Value::Null) | None => builder.append_null(),
2139                            Some(val) => {
2140                                builder.append_value(uni_common::cypher_value_codec::encode(val));
2141                            }
2142                        }
2143                    }
2144                }
2145                columns.push(Arc::new(builder.finish()));
2146            }
2147        } else {
2148            let eids: Vec<u64> = expansions
2149                .iter()
2150                .map(|(_, _, eid, _, _)| eid.as_u64())
2151                .collect();
2152            columns.push(Arc::new(UInt64Array::from(eids)));
2153        }
2154
2155        // Add target property columns (hydrate from L0 buffers)
2156        {
2157            let l0_ctx = self.graph_ctx.l0_context();
2158
2159            for prop_name in &self.target_properties {
2160                if prop_name == "_all_props" {
2161                    // Build full CypherValue blob from all L0 vertex properties
2162                    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2163                    for (_, target_vid, _, _, _) in &expansions {
2164                        // Merge in `Value` space so typed values (temporals) survive.
2165                        let mut merged_props: HashMap<String, uni_common::Value> = HashMap::new();
2166                        for l0 in l0_ctx.iter_l0_buffers() {
2167                            let guard = l0.read();
2168                            if let Some(props) = guard.vertex_properties.get(target_vid) {
2169                                for (k, v) in props.iter() {
2170                                    merged_props.insert(k.clone(), v.clone());
2171                                }
2172                            }
2173                        }
2174                        if merged_props.is_empty() {
2175                            builder.append_null();
2176                        } else {
2177                            builder.append_value(uni_common::cypher_value_codec::encode(
2178                                &uni_common::Value::Map(merged_props),
2179                            ));
2180                        }
2181                    }
2182                    columns.push(Arc::new(builder.finish()));
2183                } else {
2184                    // Extract individual property from L0 and encode as CypherValue
2185                    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2186                    for (_, target_vid, _, _, _) in &expansions {
2187                        let mut found = false;
2188                        for l0 in l0_ctx.iter_l0_buffers() {
2189                            let guard = l0.read();
2190                            if let Some(props) = guard.vertex_properties.get(target_vid)
2191                                && let Some(val) = props.get(prop_name.as_str())
2192                                && !val.is_null()
2193                            {
2194                                builder.append_value(uni_common::cypher_value_codec::encode(val));
2195                                found = true;
2196                                break;
2197                            }
2198                        }
2199                        if !found {
2200                            builder.append_null();
2201                        }
2202                    }
2203                    columns.push(Arc::new(builder.finish()));
2204                }
2205            }
2206        }
2207
2208        let matched_batch =
2209            RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)?;
2210
2211        // Handle OPTIONAL: append unmatched rows with NULLs
2212        if self.optional {
2213            let unmatched = collect_unmatched_optional_group_rows(
2214                input,
2215                &matched_rows,
2216                &self.schema,
2217                &self.optional_pattern_vars,
2218            )?;
2219
2220            if unmatched.is_empty() {
2221                return Ok(matched_batch);
2222            }
2223
2224            let unmatched_batch = build_optional_null_batch_for_rows_with_optional_vars(
2225                input,
2226                &unmatched,
2227                &self.schema,
2228                &self.optional_pattern_vars,
2229            )?;
2230
2231            // Concatenate matched and unmatched batches
2232            use arrow::compute::concat_batches;
2233            concat_batches(&self.schema, &[matched_batch, unmatched_batch]).map_err(arrow_err)
2234        } else {
2235            Ok(matched_batch)
2236        }
2237    }
2238}
2239
2240impl GraphExecutionContext {
2241    /// Records a schemaless traversal's full edge-type scan into the SSI read-set.
2242    ///
2243    /// `build_edge_adjacency_map` scans every edge of the traversed type(s) via
2244    /// `find_edges_by_type_names` up front, so the transaction's true read
2245    /// footprint is that whole adjacency — not just the neighbourhoods later
2246    /// expanded. Recording it here gives schemaless single-hop and
2247    /// variable-length traversal (which bypass the per-vertex `get_neighbors`
2248    /// path) the same item-level antidependency coverage the schema-typed
2249    /// `record_neighbor_reads` path provides. No-op for read-only transactions
2250    /// (`occ_read_set` is `Some` only for read-write transactions).
2251    fn record_edge_adjacency(&self, adjacency: &EdgeAdjacencyMap) {
2252        let Some(tx_l0) = &self.l0_context.transaction_l0 else {
2253            return;
2254        };
2255        let guard = tx_l0.read();
2256        let Some(read_set) = &guard.occ_read_set else {
2257            return;
2258        };
2259        let mut rs = read_set.lock();
2260        for (src, neighbors) in adjacency {
2261            rs.vertices.insert(*src);
2262            for (nbr, eid, _type, _props) in neighbors {
2263                rs.vertices.insert(*nbr);
2264                rs.edges.insert(*eid);
2265            }
2266        }
2267    }
2268}
2269
2270/// Build adjacency map from main edges table for given type names and direction.
2271///
2272/// Supports OR relationship types like `[:KNOWS|HATES]` via multiple type_names.
2273/// Returns a HashMap mapping source VID -> Vec<(target_vid, eid, properties)>
2274/// Direction determines the key: Outgoing uses src_vid, Incoming uses dst_vid, Both adds entries for both.
2275///
2276/// `source_vids`, when supplied, is pushed into the persisted scan so only the
2277/// sources' incident edges are materialized instead of the whole edge type
2278/// (review perf #5). The L0 overlay below deliberately stays whole-type: L0 is
2279/// small, and unused adjacency entries are never expanded.
2280async fn build_edge_adjacency_map(
2281    graph_ctx: &GraphExecutionContext,
2282    type_names: &[String],
2283    direction: Direction,
2284    source_vids: Option<HashSet<Vid>>,
2285) -> DFResult<EdgeAdjacencyMap> {
2286    let storage = graph_ctx.storage();
2287    let l0_ctx = graph_ctx.l0_context();
2288
2289    // Step 1: Query main edges table for all type names, pushing the bounded
2290    // source set into the scan when present.
2291    let type_refs: Vec<&str> = type_names.iter().map(|s| s.as_str()).collect();
2292    let endpoint_vids: Option<Vec<Vid>> = source_vids.as_ref().map(|s| s.iter().copied().collect());
2293    let endpoint_filter = endpoint_vids.as_deref().map(|vids| {
2294        let side = match direction {
2295            Direction::Outgoing => uni_store::storage::EndpointSide::Src,
2296            Direction::Incoming => uni_store::storage::EndpointSide::Dst,
2297            Direction::Both => uni_store::storage::EndpointSide::Either,
2298        };
2299        (side, vids)
2300    });
2301    let edges_with_type = storage
2302        .find_edges_by_type_names(&type_refs, endpoint_filter)
2303        .await
2304        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
2305
2306    // Edge filter matching the pushed-down scan, applied to the L0 overlay
2307    // below so the map (and therefore the SSI read-set recorded from it) has
2308    // ONE consistent footprint across both storage tiers.
2309    let edge_in_scope = |src: Vid, dst: Vid| -> bool {
2310        match &source_vids {
2311            None => true,
2312            Some(set) => match direction {
2313                Direction::Outgoing => set.contains(&src),
2314                Direction::Incoming => set.contains(&dst),
2315                Direction::Both => set.contains(&src) || set.contains(&dst),
2316            },
2317        }
2318    };
2319
2320    // Preserve edge type name in the adjacency map for type(r) support
2321    let mut edges: Vec<(
2322        uni_common::Eid,
2323        uni_common::Vid,
2324        uni_common::Vid,
2325        String,
2326        uni_common::Properties,
2327    )> = edges_with_type.into_iter().collect();
2328
2329    // Step 2: Overlay L0 buffers for all type names
2330    for l0 in l0_ctx.iter_l0_buffers() {
2331        let l0_guard = l0.read();
2332
2333        for type_name in type_names {
2334            let l0_eids = l0_guard.eids_for_type(type_name);
2335
2336            // For each L0 edge, extract its information
2337            for &eid in &l0_eids {
2338                if let Some(edge_ref) = l0_guard.graph.edge(eid) {
2339                    let src_vid = edge_ref.src_vid;
2340                    let dst_vid = edge_ref.dst_vid;
2341                    if !edge_in_scope(src_vid, dst_vid) {
2342                        continue;
2343                    }
2344
2345                    // Get properties for this edge from L0
2346                    let props = l0_guard
2347                        .edge_properties
2348                        .get(&eid)
2349                        .cloned()
2350                        .unwrap_or_default();
2351
2352                    edges.push((eid, src_vid, dst_vid, type_name.clone(), props));
2353                }
2354            }
2355        }
2356    }
2357
2358    // Step 3: Deduplicate by EID (L0 takes precedence)
2359    let mut seen_eids = HashSet::new();
2360    let mut unique_edges = Vec::new();
2361    for edge in edges.into_iter().rev() {
2362        if seen_eids.insert(edge.0) {
2363            unique_edges.push(edge);
2364        }
2365    }
2366    unique_edges.reverse();
2367
2368    // Step 4: Filter out edges tombstoned in any L0 buffer
2369    let mut tombstoned_eids = HashSet::new();
2370    for l0 in l0_ctx.iter_l0_buffers() {
2371        let l0_guard = l0.read();
2372        for eid in l0_guard.tombstones.keys() {
2373            tombstoned_eids.insert(*eid);
2374        }
2375    }
2376    if !tombstoned_eids.is_empty() {
2377        unique_edges.retain(|edge| !tombstoned_eids.contains(&edge.0));
2378    }
2379
2380    // Step 5: Build adjacency map based on direction. Type name and props are
2381    // Arc-shared so the `Both` arm clones two pointers, not the property map.
2382    let mut adjacency: EdgeAdjacencyMap = HashMap::new();
2383
2384    for (eid, src_vid, dst_vid, edge_type, props) in unique_edges {
2385        let edge_type: Arc<str> = edge_type.into();
2386        let props = Arc::new(props);
2387        match direction {
2388            Direction::Outgoing => {
2389                adjacency
2390                    .entry(src_vid)
2391                    .or_default()
2392                    .push((dst_vid, eid, edge_type, props));
2393            }
2394            Direction::Incoming => {
2395                adjacency
2396                    .entry(dst_vid)
2397                    .or_default()
2398                    .push((src_vid, eid, edge_type, props));
2399            }
2400            Direction::Both => {
2401                adjacency.entry(src_vid).or_default().push((
2402                    dst_vid,
2403                    eid,
2404                    Arc::clone(&edge_type),
2405                    Arc::clone(&props),
2406                ));
2407                adjacency
2408                    .entry(dst_vid)
2409                    .or_default()
2410                    .push((src_vid, eid, edge_type, props));
2411            }
2412        }
2413    }
2414
2415    // SSI: a schemaless traversal physically scans the edge type above —
2416    // whole-type, or with source pushdown every edge incident to every actual
2417    // source — so record that read footprint for read-write transactions
2418    // (no-op otherwise). With pushdown this records exactly what the
2419    // transaction logically read; the whole-type scan was incidentally
2420    // (not load-bearing) more conservative under the item-level,
2421    // phantoms-out-of-scope read-set contract.
2422    graph_ctx.record_edge_adjacency(&adjacency);
2423
2424    Ok(adjacency)
2425}
2426
2427impl Stream for GraphTraverseMainStream {
2428    type Item = DFResult<RecordBatch>;
2429
2430    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2431        let metrics = self.metrics.clone();
2432        let _timer = metrics.elapsed_compute().timer();
2433        loop {
2434            let state = std::mem::replace(&mut self.state, GraphTraverseMainState::Done);
2435
2436            match state {
2437                GraphTraverseMainState::CollectingInput {
2438                    mut input_stream,
2439                    mut buffered,
2440                } => match input_stream.poll_next_unpin(cx) {
2441                    Poll::Ready(Some(Ok(batch))) => {
2442                        buffered.push(batch);
2443                        self.state = GraphTraverseMainState::CollectingInput {
2444                            input_stream,
2445                            buffered,
2446                        };
2447                        // Continue loop to keep draining.
2448                    }
2449                    Poll::Ready(Some(Err(e))) => {
2450                        self.state = GraphTraverseMainState::Done;
2451                        return Poll::Ready(Some(Err(e)));
2452                    }
2453                    Poll::Ready(None) => {
2454                        // Input fully drained: kick off the edge load with the
2455                        // bounded source set pushed into the scan (when small).
2456                        let source_vids = self.collect_source_vids(&buffered);
2457                        let loading_ctx = self.graph_ctx.clone();
2458                        let loading_types = self.type_names.clone();
2459                        let direction = self.direction;
2460                        let fut = async move {
2461                            build_edge_adjacency_map(
2462                                &loading_ctx,
2463                                &loading_types,
2464                                direction,
2465                                source_vids,
2466                            )
2467                            .await
2468                        };
2469                        self.state = GraphTraverseMainState::LoadingEdges {
2470                            future: Box::pin(fut),
2471                            buffered,
2472                        };
2473                        // Continue loop to poll the load future.
2474                    }
2475                    Poll::Pending => {
2476                        self.state = GraphTraverseMainState::CollectingInput {
2477                            input_stream,
2478                            buffered,
2479                        };
2480                        return Poll::Pending;
2481                    }
2482                },
2483                GraphTraverseMainState::LoadingEdges {
2484                    mut future,
2485                    buffered,
2486                } => match future.as_mut().poll(cx) {
2487                    Poll::Ready(Ok(adjacency)) => {
2488                        // Move to processing state with loaded adjacency
2489                        self.state = GraphTraverseMainState::Processing {
2490                            adjacency,
2491                            buffered: buffered.into_iter(),
2492                        };
2493                        // Continue loop to start processing
2494                    }
2495                    Poll::Ready(Err(e)) => {
2496                        self.state = GraphTraverseMainState::Done;
2497                        return Poll::Ready(Some(Err(e)));
2498                    }
2499                    Poll::Pending => {
2500                        self.state = GraphTraverseMainState::LoadingEdges { future, buffered };
2501                        return Poll::Pending;
2502                    }
2503                },
2504                GraphTraverseMainState::Processing {
2505                    adjacency,
2506                    mut buffered,
2507                } => {
2508                    // Check timeout
2509                    if let Err(e) = self.graph_ctx.check_timeout() {
2510                        return Poll::Ready(Some(Err(
2511                            datafusion::error::DataFusionError::Execution(e.to_string()),
2512                        )));
2513                    }
2514
2515                    match buffered.next() {
2516                        Some(batch) => {
2517                            // Expand batch using adjacency map
2518                            let result = self.expand_batch(&batch, &adjacency);
2519
2520                            self.state = GraphTraverseMainState::Processing {
2521                                adjacency,
2522                                buffered,
2523                            };
2524
2525                            if let Ok(ref r) = result {
2526                                self.metrics.record_output(r.num_rows());
2527                            }
2528                            return Poll::Ready(Some(result));
2529                        }
2530                        None => {
2531                            self.state = GraphTraverseMainState::Done;
2532                            return Poll::Ready(None);
2533                        }
2534                    }
2535                }
2536                GraphTraverseMainState::Done => {
2537                    return Poll::Ready(None);
2538                }
2539            }
2540        }
2541    }
2542}
2543
2544impl RecordBatchStream for GraphTraverseMainStream {
2545    fn schema(&self) -> SchemaRef {
2546        self.schema.clone()
2547    }
2548}
2549
2550/// Variable-length graph traversal execution plan.
2551///
2552/// Performs BFS traversal from source vertices with configurable min/max hops.
2553/// Tracks visited nodes to avoid cycles.
2554///
2555/// # Example
2556///
2557/// ```ignore
2558/// // Find all nodes 1-3 hops away via KNOWS edges
2559/// let traverse = GraphVariableLengthTraverseExec::new(
2560///     input_plan,
2561///     "_vid",
2562///     knows_type_id,
2563///     Direction::Outgoing,
2564///     1,  // min_hops
2565///     3,  // max_hops
2566///     Some("p"), // path variable
2567///     graph_ctx,
2568/// );
2569/// ```
2570pub struct GraphVariableLengthTraverseExec {
2571    /// Input execution plan.
2572    input: Arc<dyn ExecutionPlan>,
2573
2574    /// Column name containing source VIDs.
2575    source_column: String,
2576
2577    /// Edge type IDs to traverse.
2578    edge_type_ids: Vec<u32>,
2579
2580    /// Traversal direction.
2581    direction: Direction,
2582
2583    /// Minimum number of hops.
2584    min_hops: usize,
2585
2586    /// Maximum number of hops.
2587    max_hops: usize,
2588
2589    /// Variable name for target vertex columns.
2590    target_variable: String,
2591
2592    /// Variable name for relationship list (r in `[r*]`) - holds `List<Edge>`.
2593    step_variable: Option<String>,
2594
2595    /// Variable name for path (if path is bound).
2596    path_variable: Option<String>,
2597
2598    /// Target vertex properties to materialize.
2599    target_properties: Vec<String>,
2600
2601    /// Target label name for property type resolution.
2602    target_label_name: Option<String>,
2603
2604    /// Whether this is an optional match (LEFT JOIN semantics).
2605    is_optional: bool,
2606
2607    /// Column name of an already-bound target VID (for patterns where target is in scope).
2608    bound_target_column: Option<String>,
2609
2610    /// Lance SQL filter for edge property predicates (VLP bitmap preselection).
2611    edge_lance_filter: Option<String>,
2612
2613    /// Simple property equality conditions for per-edge L0 checking during BFS.
2614    /// Each entry is (property_name, expected_value).
2615    edge_property_conditions: Vec<(String, UniValue)>,
2616
2617    /// Edge ID columns from previous hops for cross-pattern relationship uniqueness.
2618    used_edge_columns: Vec<String>,
2619
2620    /// Path semantics mode (Trail = no repeated edges, default for OpenCypher).
2621    path_mode: super::nfa::PathMode,
2622
2623    /// Output mode determining BFS strategy.
2624    output_mode: super::nfa::VlpOutputMode,
2625
2626    /// Compiled NFA for path pattern matching.
2627    nfa: Arc<PathNfa>,
2628
2629    /// Graph execution context.
2630    graph_ctx: Arc<GraphExecutionContext>,
2631
2632    /// Output schema.
2633    schema: SchemaRef,
2634
2635    /// Cached plan properties.
2636    properties: Arc<PlanProperties>,
2637
2638    /// Execution metrics.
2639    metrics: ExecutionPlanMetricsSet,
2640}
2641
2642impl fmt::Debug for GraphVariableLengthTraverseExec {
2643    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2644        f.debug_struct("GraphVariableLengthTraverseExec")
2645            .field("source_column", &self.source_column)
2646            .field("edge_type_ids", &self.edge_type_ids)
2647            .field("direction", &self.direction)
2648            .field("min_hops", &self.min_hops)
2649            .field("max_hops", &self.max_hops)
2650            .field("target_variable", &self.target_variable)
2651            .finish()
2652    }
2653}
2654
2655impl GraphVariableLengthTraverseExec {
2656    /// Create a new variable-length traversal plan.
2657    ///
2658    /// For QPP (Quantified Path Patterns), pass a pre-compiled NFA via `qpp_nfa`.
2659    /// For simple VLP patterns, pass `None` and the NFA will be compiled from
2660    /// `edge_type_ids`, `direction`, `min_hops`, `max_hops`.
2661    #[expect(clippy::too_many_arguments)]
2662    pub fn new(
2663        input: Arc<dyn ExecutionPlan>,
2664        source_column: impl Into<String>,
2665        edge_type_ids: Vec<u32>,
2666        direction: Direction,
2667        min_hops: usize,
2668        max_hops: usize,
2669        target_variable: impl Into<String>,
2670        step_variable: Option<String>,
2671        path_variable: Option<String>,
2672        target_properties: Vec<String>,
2673        target_label_name: Option<String>,
2674        graph_ctx: Arc<GraphExecutionContext>,
2675        is_optional: bool,
2676        bound_target_column: Option<String>,
2677        edge_lance_filter: Option<String>,
2678        edge_property_conditions: Vec<(String, UniValue)>,
2679        used_edge_columns: Vec<String>,
2680        path_mode: super::nfa::PathMode,
2681        output_mode: super::nfa::VlpOutputMode,
2682        qpp_nfa: Option<PathNfa>,
2683    ) -> Self {
2684        let source_column = source_column.into();
2685        let target_variable = target_variable.into();
2686
2687        // Resolve target property Arrow types from the schema
2688        let uni_schema = graph_ctx.storage().schema_manager().schema();
2689        let label_props = target_label_name
2690            .as_deref()
2691            .and_then(|ln| uni_schema.properties.get(ln));
2692
2693        // Build output schema
2694        let schema = Self::build_schema(
2695            input.schema(),
2696            &target_variable,
2697            step_variable.as_deref(),
2698            path_variable.as_deref(),
2699            &target_properties,
2700            label_props,
2701        );
2702        let properties = compute_plan_properties(schema.clone());
2703
2704        // Use pre-compiled QPP NFA if provided, otherwise compile from VLP parameters
2705        let nfa = Arc::new(qpp_nfa.unwrap_or_else(|| {
2706            PathNfa::from_vlp(edge_type_ids.clone(), direction, min_hops, max_hops)
2707        }));
2708
2709        Self {
2710            input,
2711            source_column,
2712            edge_type_ids,
2713            direction,
2714            min_hops,
2715            max_hops,
2716            target_variable,
2717            step_variable,
2718            path_variable,
2719            target_properties,
2720            target_label_name,
2721            is_optional,
2722            bound_target_column,
2723            edge_lance_filter,
2724            edge_property_conditions,
2725            used_edge_columns,
2726            path_mode,
2727            output_mode,
2728            nfa,
2729            graph_ctx,
2730            schema,
2731            properties,
2732            metrics: ExecutionPlanMetricsSet::new(),
2733        }
2734    }
2735
2736    /// Build output schema.
2737    fn build_schema(
2738        input_schema: SchemaRef,
2739        target_variable: &str,
2740        step_variable: Option<&str>,
2741        path_variable: Option<&str>,
2742        target_properties: &[String],
2743        label_props: Option<
2744            &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
2745        >,
2746    ) -> SchemaRef {
2747        let mut fields: Vec<Field> = input_schema
2748            .fields()
2749            .iter()
2750            .map(|f| f.as_ref().clone())
2751            .collect();
2752
2753        // Add target VID column (only if not already in input)
2754        let target_vid_name = format!("{}._vid", target_variable);
2755        if input_schema.column_with_name(&target_vid_name).is_none() {
2756            fields.push(Field::new(target_vid_name, DataType::UInt64, true));
2757        }
2758
2759        // Add target ._labels column (only if not already in input)
2760        let target_labels_name = format!("{}._labels", target_variable);
2761        if input_schema.column_with_name(&target_labels_name).is_none() {
2762            fields.push(Field::new(target_labels_name, labels_data_type(), true));
2763        }
2764
2765        // Add target vertex property columns (skip if already in input)
2766        for prop_name in target_properties {
2767            let col_name = format!("{}.{}", target_variable, prop_name);
2768            if input_schema.column_with_name(&col_name).is_none() {
2769                let arrow_type = resolve_property_type(prop_name, label_props);
2770                fields.push(Field::new(&col_name, arrow_type, true));
2771            }
2772        }
2773
2774        // Add hop count
2775        fields.push(Field::new("_hop_count", DataType::UInt64, false));
2776
2777        // Add step variable (edge list) if bound
2778        if let Some(step_var) = step_variable {
2779            fields.push(build_edge_list_field(step_var));
2780        }
2781
2782        // Add path struct if bound (only if not already in input from prior BindFixedPath)
2783        if let Some(path_var) = path_variable
2784            && input_schema.column_with_name(path_var).is_none()
2785        {
2786            fields.push(build_path_struct_field(path_var));
2787        }
2788
2789        Arc::new(Schema::new(fields))
2790    }
2791}
2792
2793impl DisplayAs for GraphVariableLengthTraverseExec {
2794    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2795        write!(
2796            f,
2797            "GraphVariableLengthTraverseExec: {} --[{:?}*{}..{}]--> target",
2798            self.source_column, self.edge_type_ids, self.min_hops, self.max_hops
2799        )
2800    }
2801}
2802
2803impl ExecutionPlan for GraphVariableLengthTraverseExec {
2804    fn name(&self) -> &str {
2805        "GraphVariableLengthTraverseExec"
2806    }
2807
2808    fn as_any(&self) -> &dyn Any {
2809        self
2810    }
2811
2812    fn schema(&self) -> SchemaRef {
2813        self.schema.clone()
2814    }
2815
2816    fn properties(&self) -> &Arc<PlanProperties> {
2817        &self.properties
2818    }
2819
2820    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
2821        vec![&self.input]
2822    }
2823
2824    fn with_new_children(
2825        self: Arc<Self>,
2826        children: Vec<Arc<dyn ExecutionPlan>>,
2827    ) -> DFResult<Arc<dyn ExecutionPlan>> {
2828        if children.len() != 1 {
2829            return Err(datafusion::error::DataFusionError::Plan(
2830                "GraphVariableLengthTraverseExec requires exactly one child".to_string(),
2831            ));
2832        }
2833
2834        // Pass the existing NFA to avoid recompilation (important for QPP NFA)
2835        Ok(Arc::new(Self::new(
2836            children[0].clone(),
2837            self.source_column.clone(),
2838            self.edge_type_ids.clone(),
2839            self.direction,
2840            self.min_hops,
2841            self.max_hops,
2842            self.target_variable.clone(),
2843            self.step_variable.clone(),
2844            self.path_variable.clone(),
2845            self.target_properties.clone(),
2846            self.target_label_name.clone(),
2847            self.graph_ctx.clone(),
2848            self.is_optional,
2849            self.bound_target_column.clone(),
2850            self.edge_lance_filter.clone(),
2851            self.edge_property_conditions.clone(),
2852            self.used_edge_columns.clone(),
2853            self.path_mode.clone(),
2854            self.output_mode.clone(),
2855            Some((*self.nfa).clone()),
2856        )))
2857    }
2858
2859    fn execute(
2860        &self,
2861        partition: usize,
2862        context: Arc<TaskContext>,
2863    ) -> DFResult<SendableRecordBatchStream> {
2864        let input_stream = self.input.execute(partition, context)?;
2865
2866        let metrics = BaselineMetrics::new(&self.metrics, partition);
2867
2868        let warm_fut = self
2869            .graph_ctx
2870            .warming_future(self.edge_type_ids.clone(), self.direction);
2871
2872        Ok(Box::pin(GraphVariableLengthTraverseStream {
2873            input: input_stream,
2874            exec: Arc::new(self.clone_for_stream()),
2875            schema: self.schema.clone(),
2876            state: VarLengthStreamState::Warming(warm_fut),
2877            metrics,
2878        }))
2879    }
2880
2881    fn metrics(&self) -> Option<MetricsSet> {
2882        Some(self.metrics.clone_inner())
2883    }
2884}
2885
2886impl GraphVariableLengthTraverseExec {
2887    /// Clone fields needed for stream (avoids cloning the full struct).
2888    fn clone_for_stream(&self) -> GraphVariableLengthTraverseExecData {
2889        GraphVariableLengthTraverseExecData {
2890            source_column: self.source_column.clone(),
2891            edge_type_ids: self.edge_type_ids.clone(),
2892            direction: self.direction,
2893            min_hops: self.min_hops,
2894            max_hops: self.max_hops,
2895            target_variable: self.target_variable.clone(),
2896            step_variable: self.step_variable.clone(),
2897            path_variable: self.path_variable.clone(),
2898            target_properties: self.target_properties.clone(),
2899            target_label_name: self.target_label_name.clone(),
2900            is_optional: self.is_optional,
2901            bound_target_column: self.bound_target_column.clone(),
2902            edge_lance_filter: self.edge_lance_filter.clone(),
2903            edge_property_conditions: self.edge_property_conditions.clone(),
2904            used_edge_columns: self.used_edge_columns.clone(),
2905            path_mode: self.path_mode.clone(),
2906            output_mode: self.output_mode.clone(),
2907            nfa: self.nfa.clone(),
2908            graph_ctx: self.graph_ctx.clone(),
2909        }
2910    }
2911}
2912
/// Data needed by the stream (without ExecutionPlan overhead).
///
/// Produced by `GraphVariableLengthTraverseExec::clone_for_stream`, which
/// copies the plan's traversal configuration but not its input plan, output
/// schema, cached plan properties or metrics set.
#[expect(
    dead_code,
    reason = "Fields accessed via NFA; kept for with_new_children reconstruction"
)]
// NOTE(review): `with_new_children` rebuilds from the plan node's own fields,
// not from this struct, so the `reason` above may be stale — confirm.
struct GraphVariableLengthTraverseExecData {
    /// Input column containing source VIDs.
    source_column: String,
    /// Edge type IDs to traverse.
    edge_type_ids: Vec<u32>,
    /// Traversal direction.
    direction: Direction,
    /// Minimum number of hops.
    min_hops: usize,
    /// Maximum number of hops (upper bound on BFS depth).
    max_hops: usize,
    /// Variable name for target vertex columns.
    target_variable: String,
    /// Variable name for the relationship list (r in `[r*]`), if bound.
    step_variable: Option<String>,
    /// Variable name for the path, if bound.
    path_variable: Option<String>,
    /// Target vertex properties to materialize.
    target_properties: Vec<String>,
    /// Target label; when set, candidate targets are checked against it.
    target_label_name: Option<String>,
    /// Whether this is an optional match (LEFT JOIN semantics).
    is_optional: bool,
    /// Column of an already-bound target VID, if the target is in scope.
    bound_target_column: Option<String>,
    /// Lance SQL filter for edge property predicates (not read by the stream yet).
    #[expect(dead_code, reason = "Used in Phase 3 warming")]
    edge_lance_filter: Option<String>,
    /// Simple property equality conditions for per-edge L0 checking during BFS.
    edge_property_conditions: Vec<(String, UniValue)>,
    /// Edge ID columns from previous hops for cross-pattern relationship uniqueness.
    used_edge_columns: Vec<String>,
    /// Path semantics mode, passed to path enumeration.
    path_mode: super::nfa::PathMode,
    /// Output mode determining BFS strategy.
    output_mode: super::nfa::VlpOutputMode,
    /// Compiled NFA, shared with the plan node.
    nfa: Arc<PathNfa>,
    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,
}
2941
/// Largest per-depth BFS frontier (count of `(vertex, NFA state)` entries)
/// allowed before the traversal stops expanding deeper levels; guards against
/// OOM on pathological graphs.
const MAX_FRONTIER_SIZE: usize = 500_000;
/// Largest predecessor-DAG pool (`PredecessorDag::pool_len`) allowed before
/// the traversal stops expanding deeper levels.
const MAX_PRED_POOL_SIZE: usize = 2_000_000;
2946
2947impl GraphVariableLengthTraverseExecData {
2948    /// Check if a vertex passes the target label filter.
2949    fn check_target_label(&self, vid: Vid) -> bool {
2950        if let Some(ref label_name) = self.target_label_name {
2951            let query_ctx = self.graph_ctx.query_context();
2952            match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
2953                Some(labels) => labels.contains(label_name),
2954                None => true, // not in L0, trust storage
2955            }
2956        } else {
2957            true
2958        }
2959    }
2960
2961    /// Check if a vertex satisfies an NFA state constraint (QPP intermediate node label).
2962    fn check_state_constraint(&self, vid: Vid, constraint: &super::nfa::VertexConstraint) -> bool {
2963        match constraint {
2964            super::nfa::VertexConstraint::Label(label_name) => {
2965                let query_ctx = self.graph_ctx.query_context();
2966                match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
2967                    Some(labels) => labels.contains(label_name),
2968                    None => true, // not in L0, trust storage
2969                }
2970            }
2971        }
2972    }
2973
2974    /// Expand neighbors from a vertex through all NFA transitions from the given state.
2975    /// Returns (neighbor_vid, neighbor_eid, destination_nfa_state) triples.
2976    fn expand_neighbors(
2977        &self,
2978        vid: Vid,
2979        state: NfaStateId,
2980        eid_filter: &EidFilter,
2981        used_eids: &FxHashSet<u64>,
2982    ) -> Vec<(Vid, Eid, NfaStateId)> {
2983        let is_undirected = matches!(self.direction, Direction::Both);
2984        let mut results = Vec::new();
2985
2986        for transition in self.nfa.transitions_from(state) {
2987            let mut seen_edges: FxHashSet<u64> = FxHashSet::default();
2988
2989            for &etype in &transition.edge_type_ids {
2990                for (neighbor, eid) in
2991                    self.graph_ctx
2992                        .get_neighbors(vid, etype, transition.direction)
2993                {
2994                    // Deduplicate edges for undirected patterns
2995                    if is_undirected && !seen_edges.insert(eid.as_u64()) {
2996                        continue;
2997                    }
2998
2999                    // Check EidFilter (edge property bitmap preselection)
3000                    if !eid_filter.contains(eid) {
3001                        continue;
3002                    }
3003
3004                    // Check edge property conditions (L0 in-memory properties)
3005                    if !self.edge_property_conditions.is_empty() {
3006                        let query_ctx = self.graph_ctx.query_context();
3007                        let passes = if let Some(props) =
3008                            l0_visibility::accumulate_edge_props(eid, Some(&query_ctx))
3009                        {
3010                            self.edge_property_conditions
3011                                .iter()
3012                                .all(|(name, expected)| {
3013                                    props.get(name).is_some_and(|actual| actual == expected)
3014                                })
3015                        } else {
3016                            // Edge not in L0 (CSR/Lance) — relies on EidFilter
3017                            // for correctness. TODO: build EidFilter from Lance
3018                            // during warming for flushed edges.
3019                            true
3020                        };
3021                        if !passes {
3022                            continue;
3023                        }
3024                    }
3025
3026                    // Check cross-pattern relationship uniqueness
3027                    if used_eids.contains(&eid.as_u64()) {
3028                        continue;
3029                    }
3030
3031                    // Check NFA state constraint on the destination state (QPP label filters)
3032                    if let Some(constraint) = self.nfa.state_constraint(transition.to)
3033                        && !self.check_state_constraint(neighbor, constraint)
3034                    {
3035                        continue;
3036                    }
3037
3038                    results.push((neighbor, eid, transition.to));
3039                }
3040            }
3041        }
3042
3043        results
3044    }
3045
3046    /// NFA-driven BFS with predecessor DAG for full path enumeration (Mode B).
3047    ///
3048    /// Returns BFS results in the same format as the old bfs() for compatibility
3049    /// with build_output_batch.
3050    fn bfs_with_dag(
3051        &self,
3052        source: Vid,
3053        eid_filter: &EidFilter,
3054        used_eids: &FxHashSet<u64>,
3055        vid_filter: &VidFilter,
3056    ) -> Vec<BfsResult> {
3057        let nfa = &self.nfa;
3058        let selector = PathSelector::All;
3059        let mut dag = PredecessorDag::new(selector);
3060        let mut accepting: Vec<(Vid, NfaStateId, u32)> = Vec::new();
3061
3062        // Handle zero-length paths (min_hops == 0)
3063        if nfa.is_accepting(nfa.start_state())
3064            && self.check_target_label(source)
3065            && vid_filter.contains(source)
3066        {
3067            accepting.push((source, nfa.start_state(), 0));
3068        }
3069
3070        // Per-depth frontier BFS
3071        let mut frontier: Vec<(Vid, NfaStateId)> = vec![(source, nfa.start_state())];
3072        let mut depth: u32 = 0;
3073
3074        while !frontier.is_empty() && depth < self.max_hops as u32 {
3075            depth += 1;
3076            let mut next_frontier: Vec<(Vid, NfaStateId)> = Vec::new();
3077            let mut seen_at_depth: FxHashSet<(Vid, NfaStateId)> = FxHashSet::default();
3078
3079            for &(vid, state) in &frontier {
3080                for (neighbor, eid, dst_state) in
3081                    self.expand_neighbors(vid, state, eid_filter, used_eids)
3082                {
3083                    // Record in predecessor DAG
3084                    dag.add_predecessor(neighbor, dst_state, vid, state, eid, depth);
3085
3086                    // Add to next frontier (deduplicated per depth)
3087                    if seen_at_depth.insert((neighbor, dst_state)) {
3088                        next_frontier.push((neighbor, dst_state));
3089
3090                        // Check if accepting
3091                        if nfa.is_accepting(dst_state)
3092                            && self.check_target_label(neighbor)
3093                            && vid_filter.contains(neighbor)
3094                        {
3095                            accepting.push((neighbor, dst_state, depth));
3096                        }
3097                    }
3098                }
3099            }
3100
3101            // Safety cap
3102            if next_frontier.len() > MAX_FRONTIER_SIZE || dag.pool_len() > MAX_PRED_POOL_SIZE {
3103                break;
3104            }
3105
3106            frontier = next_frontier;
3107        }
3108
3109        // Enumerate paths from DAG to produce BfsResult tuples
3110        let mut results: Vec<BfsResult> = Vec::new();
3111        for &(target, state, depth) in &accepting {
3112            dag.enumerate_paths(
3113                source,
3114                target,
3115                state,
3116                depth,
3117                depth,
3118                &self.path_mode,
3119                &mut |nodes, edges| {
3120                    results.push((target, depth as usize, nodes.to_vec(), edges.to_vec()));
3121                    std::ops::ControlFlow::Continue(())
3122                },
3123            );
3124        }
3125
3126        results
3127    }
3128
3129    /// NFA-driven BFS returning only endpoints and depths (Mode A).
3130    ///
3131    /// More efficient when no path/step variable is bound — skips full path enumeration.
3132    /// Uses lightweight trail verification via has_trail_valid_path().
3133    fn bfs_endpoints_only(
3134        &self,
3135        source: Vid,
3136        eid_filter: &EidFilter,
3137        used_eids: &FxHashSet<u64>,
3138        vid_filter: &VidFilter,
3139    ) -> Vec<(Vid, u32)> {
3140        let nfa = &self.nfa;
3141        let selector = PathSelector::Any; // Only need existence, not all paths
3142        let mut dag = PredecessorDag::new(selector);
3143        let mut results: Vec<(Vid, u32)> = Vec::new();
3144
3145        // Handle zero-length paths
3146        if nfa.is_accepting(nfa.start_state())
3147            && self.check_target_label(source)
3148            && vid_filter.contains(source)
3149        {
3150            results.push((source, 0));
3151        }
3152
3153        // Per-depth frontier BFS
3154        let mut frontier: Vec<(Vid, NfaStateId)> = vec![(source, nfa.start_state())];
3155        let mut depth: u32 = 0;
3156
3157        while !frontier.is_empty() && depth < self.max_hops as u32 {
3158            depth += 1;
3159            let mut next_frontier: Vec<(Vid, NfaStateId)> = Vec::new();
3160            let mut seen_at_depth: FxHashSet<(Vid, NfaStateId)> = FxHashSet::default();
3161
3162            for &(vid, state) in &frontier {
3163                for (neighbor, eid, dst_state) in
3164                    self.expand_neighbors(vid, state, eid_filter, used_eids)
3165                {
3166                    dag.add_predecessor(neighbor, dst_state, vid, state, eid, depth);
3167
3168                    if seen_at_depth.insert((neighbor, dst_state)) {
3169                        next_frontier.push((neighbor, dst_state));
3170
3171                        // Check if accepting with trail verification
3172                        if nfa.is_accepting(dst_state)
3173                            && self.check_target_label(neighbor)
3174                            && vid_filter.contains(neighbor)
3175                            && dag.has_trail_valid_path(source, neighbor, dst_state, depth, depth)
3176                        {
3177                            results.push((neighbor, depth));
3178                        }
3179                    }
3180                }
3181            }
3182
3183            if next_frontier.len() > MAX_FRONTIER_SIZE || dag.pool_len() > MAX_PRED_POOL_SIZE {
3184                break;
3185            }
3186
3187            frontier = next_frontier;
3188        }
3189
3190        results
3191    }
3192}
3193
3194/// State machine for variable-length traverse stream.
3195enum VarLengthStreamState {
3196    /// Warming adjacency CSRs before first batch.
3197    Warming(Pin<Box<dyn std::future::Future<Output = DFResult<()>> + Send>>),
3198    /// Processing input batches.
3199    Reading,
3200    /// Materializing target vertex properties asynchronously.
3201    Materializing(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
3202    /// Stream is done.
3203    Done,
3204}
3205
/// Stream for variable-length traversal.
///
/// Pulls batches from `input` and runs the per-row BFS configured in `exec`. When
/// target properties are requested, it hydrates them asynchronously before
/// yielding each output batch.
struct GraphVariableLengthTraverseStream {
    /// Upstream batches supplying source VIDs (and optional bound-target / used-edge columns).
    input: SendableRecordBatchStream,
    /// Shared plan data: NFA, hop bound, column names, graph context.
    exec: Arc<GraphVariableLengthTraverseExecData>,
    /// Output schema that every emitted batch must conform to.
    schema: SchemaRef,
    /// Current phase of the warm-up / read / hydrate state machine.
    state: VarLengthStreamState,
    /// Elapsed-compute timer and output-row counter.
    metrics: BaselineMetrics,
}
3214
3215impl Stream for GraphVariableLengthTraverseStream {
3216    type Item = DFResult<RecordBatch>;
3217
3218    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3219        let metrics = self.metrics.clone();
3220        let _timer = metrics.elapsed_compute().timer();
3221        loop {
3222            let state = std::mem::replace(&mut self.state, VarLengthStreamState::Done);
3223
3224            match state {
3225                VarLengthStreamState::Warming(mut fut) => match fut.as_mut().poll(cx) {
3226                    Poll::Ready(Ok(())) => {
3227                        self.state = VarLengthStreamState::Reading;
3228                        // Continue loop to start reading
3229                    }
3230                    Poll::Ready(Err(e)) => {
3231                        self.state = VarLengthStreamState::Done;
3232                        return Poll::Ready(Some(Err(e)));
3233                    }
3234                    Poll::Pending => {
3235                        self.state = VarLengthStreamState::Warming(fut);
3236                        return Poll::Pending;
3237                    }
3238                },
3239                VarLengthStreamState::Reading => {
3240                    // Check timeout
3241                    if let Err(e) = self.exec.graph_ctx.check_timeout() {
3242                        return Poll::Ready(Some(Err(
3243                            datafusion::error::DataFusionError::Execution(e.to_string()),
3244                        )));
3245                    }
3246
3247                    match self.input.poll_next_unpin(cx) {
3248                        Poll::Ready(Some(Ok(batch))) => {
3249                            // Build base batch synchronously (BFS + expand)
3250                            // TODO(Phase 3.5): Build real EidFilter/VidFilter during warming
3251                            let eid_filter = EidFilter::AllAllowed;
3252                            let vid_filter = VidFilter::AllAllowed;
3253                            let base_result =
3254                                self.process_batch_base(batch, &eid_filter, &vid_filter);
3255                            let base_batch = match base_result {
3256                                Ok(b) => b,
3257                                Err(e) => {
3258                                    self.state = VarLengthStreamState::Reading;
3259                                    return Poll::Ready(Some(Err(e)));
3260                                }
3261                            };
3262
3263                            // If no properties need async hydration, return directly
3264                            if self.exec.target_properties.is_empty() {
3265                                self.state = VarLengthStreamState::Reading;
3266                                return Poll::Ready(Some(Ok(base_batch)));
3267                            }
3268
3269                            // Properties needed — create async future for hydration
3270                            let schema = self.schema.clone();
3271                            let target_variable = self.exec.target_variable.clone();
3272                            let target_properties = self.exec.target_properties.clone();
3273                            let target_label_name = self.exec.target_label_name.clone();
3274                            let graph_ctx = self.exec.graph_ctx.clone();
3275
3276                            let fut = hydrate_vlp_target_properties(
3277                                base_batch,
3278                                schema,
3279                                target_variable,
3280                                target_properties,
3281                                target_label_name,
3282                                graph_ctx,
3283                            );
3284
3285                            self.state = VarLengthStreamState::Materializing(Box::pin(fut));
3286                            // Continue loop to poll the future
3287                        }
3288                        Poll::Ready(Some(Err(e))) => {
3289                            self.state = VarLengthStreamState::Done;
3290                            return Poll::Ready(Some(Err(e)));
3291                        }
3292                        Poll::Ready(None) => {
3293                            self.state = VarLengthStreamState::Done;
3294                            return Poll::Ready(None);
3295                        }
3296                        Poll::Pending => {
3297                            self.state = VarLengthStreamState::Reading;
3298                            return Poll::Pending;
3299                        }
3300                    }
3301                }
3302                VarLengthStreamState::Materializing(mut fut) => match fut.as_mut().poll(cx) {
3303                    Poll::Ready(Ok(batch)) => {
3304                        self.state = VarLengthStreamState::Reading;
3305                        self.metrics.record_output(batch.num_rows());
3306                        return Poll::Ready(Some(Ok(batch)));
3307                    }
3308                    Poll::Ready(Err(e)) => {
3309                        self.state = VarLengthStreamState::Done;
3310                        return Poll::Ready(Some(Err(e)));
3311                    }
3312                    Poll::Pending => {
3313                        self.state = VarLengthStreamState::Materializing(fut);
3314                        return Poll::Pending;
3315                    }
3316                },
3317                VarLengthStreamState::Done => {
3318                    return Poll::Ready(None);
3319                }
3320            }
3321        }
3322    }
3323}
3324
3325impl GraphVariableLengthTraverseStream {
3326    fn process_batch_base(
3327        &self,
3328        batch: RecordBatch,
3329        eid_filter: &EidFilter,
3330        vid_filter: &VidFilter,
3331    ) -> DFResult<RecordBatch> {
3332        let source_col = batch
3333            .column_by_name(&self.exec.source_column)
3334            .ok_or_else(|| {
3335                datafusion::error::DataFusionError::Execution(format!(
3336                    "Source column '{}' not found",
3337                    self.exec.source_column
3338                ))
3339            })?;
3340
3341        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
3342        let source_vids: &UInt64Array = &source_vid_cow;
3343
3344        // Read bound target VIDs if column exists
3345        let bound_target_cow = self
3346            .exec
3347            .bound_target_column
3348            .as_ref()
3349            .and_then(|col| batch.column_by_name(col))
3350            .map(|c| column_as_vid_array(c.as_ref()))
3351            .transpose()?;
3352        let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
3353
3354        // Extract used edge columns for cross-pattern relationship uniqueness
3355        let used_edge_arrays: Vec<&UInt64Array> = self
3356            .exec
3357            .used_edge_columns
3358            .iter()
3359            .filter_map(|col| {
3360                batch
3361                    .column_by_name(col)?
3362                    .as_any()
3363                    .downcast_ref::<UInt64Array>()
3364            })
3365            .collect();
3366
3367        // Collect all BFS results
3368        let mut expansions: Vec<VarLengthExpansion> = Vec::new();
3369
3370        for (row_idx, source_vid) in source_vids.iter().enumerate() {
3371            let mut emitted_for_row = false;
3372
3373            if let Some(src) = source_vid {
3374                let vid = Vid::from(src);
3375
3376                // Collect used edge IDs from previous hops for this row
3377                let used_eids: FxHashSet<u64> = used_edge_arrays
3378                    .iter()
3379                    .filter_map(|arr| {
3380                        if arr.is_null(row_idx) {
3381                            None
3382                        } else {
3383                            Some(arr.value(row_idx))
3384                        }
3385                    })
3386                    .collect();
3387
3388                // Dispatch to appropriate BFS mode based on output_mode
3389                match &self.exec.output_mode {
3390                    VlpOutputMode::EndpointsOnly => {
3391                        let endpoints = self
3392                            .exec
3393                            .bfs_endpoints_only(vid, eid_filter, &used_eids, vid_filter);
3394                        for (target, depth) in endpoints {
3395                            // Filter by bound target VID
3396                            if let Some(targets) = expected_targets {
3397                                if targets.is_null(row_idx) {
3398                                    continue;
3399                                }
3400                                if target.as_u64() != targets.value(row_idx) {
3401                                    continue;
3402                                }
3403                            }
3404                            expansions.push((row_idx, target, depth as usize, vec![], vec![]));
3405                            emitted_for_row = true;
3406                        }
3407                    }
3408                    _ => {
3409                        // FullPath, StepVariable, CountOnly, etc.
3410                        let bfs_results = self
3411                            .exec
3412                            .bfs_with_dag(vid, eid_filter, &used_eids, vid_filter);
3413                        for (target, hop_count, node_path, edge_path) in bfs_results {
3414                            // Filter by bound target VID
3415                            if let Some(targets) = expected_targets {
3416                                if targets.is_null(row_idx) {
3417                                    continue;
3418                                }
3419                                if target.as_u64() != targets.value(row_idx) {
3420                                    continue;
3421                                }
3422                            }
3423                            expansions.push((row_idx, target, hop_count, node_path, edge_path));
3424                            emitted_for_row = true;
3425                        }
3426                    }
3427                }
3428            }
3429
3430            if self.exec.is_optional && !emitted_for_row {
3431                // Preserve the source row with NULL optional bindings.
3432                // We use empty node/edge paths to mark unmatched rows.
3433                expansions.push((row_idx, Vid::from(u64::MAX), 0, vec![], vec![]));
3434            }
3435        }
3436
3437        self.build_output_batch(&batch, &expansions)
3438    }
3439
3440    fn build_output_batch(
3441        &self,
3442        input: &RecordBatch,
3443        expansions: &[VarLengthExpansion],
3444    ) -> DFResult<RecordBatch> {
3445        if expansions.is_empty() {
3446            return Ok(RecordBatch::new_empty(self.schema.clone()));
3447        }
3448
3449        let num_rows = expansions.len();
3450
3451        // Build index array
3452        let indices: Vec<u64> = expansions
3453            .iter()
3454            .map(|(idx, _, _, _, _)| *idx as u64)
3455            .collect();
3456        let indices_array = UInt64Array::from(indices);
3457
3458        // Expand input columns
3459        let mut columns: Vec<ArrayRef> = Vec::new();
3460        for col in input.columns() {
3461            let expanded = take(col.as_ref(), &indices_array, None)?;
3462            columns.push(expanded);
3463        }
3464
3465        // Collect target VIDs and unmatched markers for use in multiple places.
3466        // Unmatched OPTIONAL rows use the sentinel VID (u64::MAX) — not empty paths,
3467        // because EndpointsOnly mode legitimately uses empty node/edge path vectors.
3468        let unmatched_rows: Vec<bool> = expansions
3469            .iter()
3470            .map(|(_, vid, _, _, _)| vid.as_u64() == u64::MAX)
3471            .collect();
3472        let target_vids: Vec<Option<u64>> = expansions
3473            .iter()
3474            .zip(unmatched_rows.iter())
3475            .map(
3476                |((_, vid, _, _, _), unmatched)| {
3477                    if *unmatched { None } else { Some(vid.as_u64()) }
3478                },
3479            )
3480            .collect();
3481
3482        // Add target VID column (only if not already in input)
3483        let target_vid_name = format!("{}._vid", self.exec.target_variable);
3484        if input.schema().column_with_name(&target_vid_name).is_none() {
3485            columns.push(Arc::new(UInt64Array::from(target_vids.clone())));
3486        }
3487
3488        // Add target ._labels column (only if not already in input)
3489        let target_labels_name = format!("{}._labels", self.exec.target_variable);
3490        if input
3491            .schema()
3492            .column_with_name(&target_labels_name)
3493            .is_none()
3494        {
3495            use arrow_array::builder::{ListBuilder, StringBuilder};
3496            let query_ctx = self.exec.graph_ctx.query_context();
3497            let mut labels_builder = ListBuilder::new(StringBuilder::new());
3498            for target_vid in &target_vids {
3499                let Some(vid_u64) = target_vid else {
3500                    labels_builder.append(false);
3501                    continue;
3502                };
3503                let vid = Vid::from(*vid_u64);
3504                let row_labels: Vec<String> =
3505                    match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
3506                        Some(labels) => {
3507                            // Vertex is in L0 — use actual labels only
3508                            labels
3509                        }
3510                        None => {
3511                            // Vertex not in L0 — trust schema label (storage already filtered)
3512                            if let Some(ref label_name) = self.exec.target_label_name {
3513                                vec![label_name.clone()]
3514                            } else {
3515                                vec![]
3516                            }
3517                        }
3518                    };
3519                let values = labels_builder.values();
3520                for lbl in &row_labels {
3521                    values.append_value(lbl);
3522                }
3523                labels_builder.append(true);
3524            }
3525            columns.push(Arc::new(labels_builder.finish()));
3526        }
3527
3528        // Add null placeholder columns for target properties (hydrated async if needed, skip if already in input)
3529        for prop_name in &self.exec.target_properties {
3530            let full_prop_name = format!("{}.{}", self.exec.target_variable, prop_name);
3531            if input.schema().column_with_name(&full_prop_name).is_none() {
3532                let col_idx = columns.len();
3533                if col_idx < self.schema.fields().len() {
3534                    let field = self.schema.field(col_idx);
3535                    columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
3536                }
3537            }
3538        }
3539
3540        // Add hop count column
3541        let hop_counts: Vec<u64> = expansions
3542            .iter()
3543            .map(|(_, _, hops, _, _)| *hops as u64)
3544            .collect();
3545        columns.push(Arc::new(UInt64Array::from(hop_counts)));
3546
3547        // Add step variable (edge list) column if bound
3548        if self.exec.step_variable.is_some() {
3549            let mut edges_builder = new_edge_list_builder();
3550            let query_ctx = self.exec.graph_ctx.query_context();
3551
3552            for (_, _, _, node_path, edge_path) in expansions {
3553                if node_path.is_empty() && edge_path.is_empty() {
3554                    // Null row for OPTIONAL MATCH unmatched
3555                    edges_builder.append_null();
3556                } else if edge_path.is_empty() {
3557                    // Zero-hop match: empty list
3558                    edges_builder.append(true);
3559                } else {
3560                    for (i, eid) in edge_path.iter().enumerate() {
3561                        let type_name = l0_visibility::get_edge_type(*eid, &query_ctx)
3562                            .unwrap_or_else(|| "UNKNOWN".to_string());
3563                        // Report the relationship's STORED direction, not the
3564                        // traversal order (`node_path[i]` -> `node_path[i + 1]`).
3565                        // Resolves flushed (L1-resident) edges too, where the L0
3566                        // chain no longer holds the stored endpoints.
3567                        let (src, dst) = self.exec.graph_ctx.resolve_stored_edge_endpoints(
3568                            *eid,
3569                            node_path[i],
3570                            node_path[i + 1],
3571                            &self.exec.edge_type_ids,
3572                        );
3573                        append_edge_to_struct(
3574                            edges_builder.values(),
3575                            *eid,
3576                            &type_name,
3577                            src,
3578                            dst,
3579                            &query_ctx,
3580                        );
3581                    }
3582                    edges_builder.append(true);
3583                }
3584            }
3585
3586            columns.push(Arc::new(edges_builder.finish()));
3587        }
3588
3589        // Add path variable column if bound.
3590        // For named paths, we output a Path struct with nodes and relationships arrays.
3591        // If a path column already exists in input (from a prior BindFixedPath), extend it
3592        // rather than building from scratch.
3593        if let Some(path_var_name) = &self.exec.path_variable {
3594            let existing_path_col_idx = input
3595                .schema()
3596                .column_with_name(path_var_name)
3597                .map(|(idx, _)| idx);
3598            // Clone the Arc so we can read existing path without borrowing `columns`
3599            let existing_path_arc = existing_path_col_idx.map(|idx| columns[idx].clone());
3600            let existing_path = existing_path_arc
3601                .as_ref()
3602                .and_then(|arc| arc.as_any().downcast_ref::<arrow_array::StructArray>());
3603
3604            let mut nodes_builder = new_node_list_builder();
3605            let mut rels_builder = new_edge_list_builder();
3606            let query_ctx = self.exec.graph_ctx.query_context();
3607            let mut path_validity = Vec::with_capacity(expansions.len());
3608
3609            for (row_out_idx, (_, _, _, node_path, edge_path)) in expansions.iter().enumerate() {
3610                if node_path.is_empty() && edge_path.is_empty() {
3611                    nodes_builder.append(false);
3612                    rels_builder.append(false);
3613                    path_validity.push(false);
3614                    continue;
3615                }
3616
3617                // Prepend existing path prefix if extending
3618                let skip_first_vlp_node = if let Some(existing) = existing_path {
3619                    if !existing.is_null(row_out_idx) {
3620                        prepend_existing_path(
3621                            existing,
3622                            row_out_idx,
3623                            &mut nodes_builder,
3624                            &mut rels_builder,
3625                            &query_ctx,
3626                        );
3627                        true
3628                    } else {
3629                        false
3630                    }
3631                } else {
3632                    false
3633                };
3634
3635                // Append VLP nodes (skip first if extending — it's the junction point)
3636                let start_idx = if skip_first_vlp_node { 1 } else { 0 };
3637                for vid in &node_path[start_idx..] {
3638                    append_node_to_struct(nodes_builder.values(), *vid, &query_ctx);
3639                }
3640                nodes_builder.append(true);
3641
3642                for (i, eid) in edge_path.iter().enumerate() {
3643                    let type_name = l0_visibility::get_edge_type(*eid, &query_ctx)
3644                        .unwrap_or_else(|| "UNKNOWN".to_string());
3645                    // Report the relationship's STORED direction, not the traversal
3646                    // order (`node_path[i]` -> `node_path[i + 1]`). Resolves flushed
3647                    // (L1-resident) edges too, where the L0 chain no longer holds
3648                    // the stored endpoints.
3649                    let (src, dst) = self.exec.graph_ctx.resolve_stored_edge_endpoints(
3650                        *eid,
3651                        node_path[i],
3652                        node_path[i + 1],
3653                        &self.exec.edge_type_ids,
3654                    );
3655                    append_edge_to_struct(
3656                        rels_builder.values(),
3657                        *eid,
3658                        &type_name,
3659                        src,
3660                        dst,
3661                        &query_ctx,
3662                    );
3663                }
3664                rels_builder.append(true);
3665                path_validity.push(true);
3666            }
3667
3668            // Finish builders and get ListArrays
3669            let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
3670            let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;
3671
3672            // Build the path struct fields
3673            let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
3674            let rels_field = Arc::new(Field::new(
3675                "relationships",
3676                rels_array.data_type().clone(),
3677                true,
3678            ));
3679
3680            // Create the path struct array
3681            let path_struct = arrow_array::StructArray::try_new(
3682                vec![nodes_field, rels_field].into(),
3683                vec![nodes_array, rels_array],
3684                Some(arrow::buffer::NullBuffer::from(path_validity)),
3685            )
3686            .map_err(arrow_err)?;
3687
3688            if let Some(idx) = existing_path_col_idx {
3689                columns[idx] = Arc::new(path_struct);
3690            } else {
3691                columns.push(Arc::new(path_struct));
3692            }
3693        }
3694
3695        self.metrics.record_output(num_rows);
3696
3697        RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)
3698    }
3699}
3700
3701impl RecordBatchStream for GraphVariableLengthTraverseStream {
3702    fn schema(&self) -> SchemaRef {
3703        self.schema.clone()
3704    }
3705}
3706
3707/// Hydrate target vertex properties into a VLP batch.
3708///
3709/// The base batch already has null placeholder columns for target properties.
3710/// This function replaces them with actual property values fetched from storage.
3711async fn hydrate_vlp_target_properties(
3712    base_batch: RecordBatch,
3713    schema: SchemaRef,
3714    target_variable: String,
3715    target_properties: Vec<String>,
3716    target_label_name: Option<String>,
3717    graph_ctx: Arc<GraphExecutionContext>,
3718) -> DFResult<RecordBatch> {
3719    if base_batch.num_rows() == 0 || target_properties.is_empty() {
3720        return Ok(base_batch);
3721    }
3722
3723    // Find the target VID column by exact name.
3724    // Schema layout: [input cols..., target._vid, target.prop1..., _hop_count, path?]
3725    //
3726    // IMPORTANT: When the target variable is already bound in the input (e.g., two MATCH
3727    // clauses referencing the same variable), there may be duplicate column names. We need
3728    // the LAST occurrence of target._vid, which is the one added by the VLP.
3729    let target_vid_col_name = format!("{}._vid", target_variable);
3730    let vid_col_idx = schema
3731        .fields()
3732        .iter()
3733        .enumerate()
3734        .rev()
3735        .find(|(_, f)| f.name() == &target_vid_col_name)
3736        .map(|(i, _)| i);
3737
3738    let Some(vid_col_idx) = vid_col_idx else {
3739        return Ok(base_batch);
3740    };
3741
3742    let vid_col = base_batch.column(vid_col_idx);
3743    let target_vid_cow = column_as_vid_array(vid_col.as_ref())?;
3744    let target_vid_array: &UInt64Array = &target_vid_cow;
3745
3746    let target_vids: Vec<Vid> = target_vid_array
3747        .iter()
3748        // Preserve null rows by mapping them to a sentinel VID that never resolves
3749        // to stored properties. The output property columns remain NULL for these rows.
3750        .map(|v| Vid::from(v.unwrap_or(u64::MAX)))
3751        .collect();
3752
3753    // Fetch properties from storage
3754    let mut property_columns: Vec<ArrayRef> = Vec::new();
3755
3756    if let Some(ref label_name) = target_label_name {
3757        let property_manager = graph_ctx.property_manager();
3758        let query_ctx = graph_ctx.query_context();
3759
3760        let props_map = property_manager
3761            .get_batch_vertex_props_for_label(&target_vids, label_name, Some(&query_ctx))
3762            .await
3763            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
3764
3765        let uni_schema = graph_ctx.storage().schema_manager().schema();
3766        let label_props = uni_schema.properties.get(label_name.as_str());
3767
3768        for prop_name in &target_properties {
3769            let data_type = resolve_property_type(prop_name, label_props);
3770            let column =
3771                build_property_column_static(&target_vids, &props_map, prop_name, &data_type)?;
3772            property_columns.push(column);
3773        }
3774    } else {
3775        // No label name — use label-agnostic property lookup.
3776        // This scans all label datasets, slower but correct for label-less traversals.
3777        let non_internal_props: Vec<&str> = target_properties
3778            .iter()
3779            .filter(|p| *p != "_all_props")
3780            .map(|s| s.as_str())
3781            .collect();
3782        let property_manager = graph_ctx.property_manager();
3783        let query_ctx = graph_ctx.query_context();
3784
3785        let props_map = if !non_internal_props.is_empty() {
3786            property_manager
3787                .get_batch_vertex_props(&target_vids, &non_internal_props, Some(&query_ctx))
3788                .await
3789                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
3790        } else {
3791            std::collections::HashMap::new()
3792        };
3793
3794        for prop_name in &target_properties {
3795            if prop_name == "_all_props" {
3796                // Build CypherValue blob from all vertex properties (L0 + storage),
3797                // staying in `Value` space so typed values (temporals) are preserved.
3798                use arrow_array::builder::LargeBinaryBuilder;
3799
3800                let mut builder = LargeBinaryBuilder::new();
3801                let l0_ctx = graph_ctx.l0_context();
3802                for vid in &target_vids {
3803                    let mut merged_props: HashMap<String, uni_common::Value> = HashMap::new();
3804                    // Collect from storage-hydrated props
3805                    if let Some(vid_props) = props_map.get(vid) {
3806                        for (k, v) in vid_props.iter() {
3807                            merged_props.insert(k.clone(), v.clone());
3808                        }
3809                    }
3810                    // Overlay L0 properties
3811                    for l0 in l0_ctx.iter_l0_buffers() {
3812                        let guard = l0.read();
3813                        if let Some(l0_props) = guard.vertex_properties.get(vid) {
3814                            for (k, v) in l0_props.iter() {
3815                                merged_props.insert(k.clone(), v.clone());
3816                            }
3817                        }
3818                    }
3819                    if merged_props.is_empty() {
3820                        builder.append_null();
3821                    } else {
3822                        builder.append_value(uni_common::cypher_value_codec::encode(
3823                            &uni_common::Value::Map(merged_props),
3824                        ));
3825                    }
3826                }
3827                property_columns.push(Arc::new(builder.finish()));
3828            } else {
3829                let column = build_property_column_static(
3830                    &target_vids,
3831                    &props_map,
3832                    prop_name,
3833                    &arrow::datatypes::DataType::LargeBinary,
3834                )?;
3835                property_columns.push(column);
3836            }
3837        }
3838    }
3839
3840    // Rebuild batch replacing the null placeholder property columns with hydrated ones.
3841    // Find each property column by name — works regardless of column ordering
3842    // (schema-aware puts props before _hop_count; schemaless puts them after).
3843    // Use col_idx > vid_col_idx to only replace this VLP's own property columns,
3844    // not pre-existing input columns with the same name (duplicate variable binding).
3845    let mut new_columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
3846    let mut prop_idx = 0;
3847    for (col_idx, field) in schema.fields().iter().enumerate() {
3848        let is_target_prop = col_idx > vid_col_idx
3849            && target_properties
3850                .iter()
3851                .any(|p| *field.name() == format!("{}.{}", target_variable, p));
3852        if is_target_prop && prop_idx < property_columns.len() {
3853            new_columns.push(property_columns[prop_idx].clone());
3854            prop_idx += 1;
3855        } else {
3856            new_columns.push(base_batch.column(col_idx).clone());
3857        }
3858    }
3859
3860    RecordBatch::try_new(schema, new_columns).map_err(arrow_err)
3861}
3862
3863// ============================================================================
3864// GraphVariableLengthTraverseMainExec - VLP for schemaless edge types
3865// ============================================================================
3866
/// Execution plan for variable-length path traversal on schemaless edge types.
///
/// This is similar to `GraphVariableLengthTraverseExec` but works with edge types
/// that don't have schema-defined IDs. It queries the main edges table by type name.
/// Supports OR relationship types like `[:KNOWS|HATES]` via multiple type_names.
///
/// At execution time the adjacency for all `type_names` is loaded for the whole
/// edge type (no source-VID pushdown, since the VLP frontier is not known ahead of
/// time). Each input row is then expanded into one output row per path found from
/// its source VID.
pub struct GraphVariableLengthTraverseMainExec {
    /// Input execution plan.
    input: Arc<dyn ExecutionPlan>,

    /// Column name containing source VIDs.
    source_column: String,

    /// Edge type names (not IDs, since schemaless types may not have IDs).
    type_names: Vec<String>,

    /// Traversal direction.
    direction: Direction,

    /// Minimum number of hops (inclusive).
    min_hops: usize,

    /// Maximum number of hops (inclusive); paths are never extended past this depth.
    max_hops: usize,

    /// Variable name for target vertex columns.
    target_variable: String,

    /// Variable name for relationship list (r in `[r*]`) - holds `List<Edge>`.
    step_variable: Option<String>,

    /// Variable name for named path (p in `p = ...`) - holds `Path`.
    path_variable: Option<String>,

    /// Target vertex properties to materialize. Each one becomes a nullable
    /// `LargeBinary` output column named `{target_variable}.{prop}`, unless the
    /// input already has a column with that name.
    target_properties: Vec<String>,

    /// Whether this is an optional match (LEFT JOIN semantics): input rows that
    /// produce no path are kept, with NULL target/path bindings.
    is_optional: bool,

    /// Column name of an already-bound target VID (for patterns where target is in scope).
    /// When set, only paths ending at that row's VID are emitted; a NULL value
    /// matches nothing.
    bound_target_column: Option<String>,

    /// Lance SQL filter for edge property predicates (VLP bitmap preselection).
    /// NOTE(review): not referenced by the BFS in this file section; confirm where
    /// it is applied.
    edge_lance_filter: Option<String>,

    /// Edge property conditions to check during BFS (e.g., `{year: 1988}`).
    /// Each entry is (property_name, expected_value). All must match for an edge to be traversed.
    edge_property_conditions: Vec<(String, UniValue)>,

    /// Edge ID columns from previous hops for cross-pattern relationship uniqueness.
    /// Columns that are missing from a batch or are not `UInt64` are silently
    /// ignored during execution.
    used_edge_columns: Vec<String>,

    /// Path semantics mode (Trail = no repeated edges, default for OpenCypher).
    /// NOTE(review): the BFS in this file section always rejects repeated edges
    /// within a path, whatever this value is.
    path_mode: super::nfa::PathMode,

    /// Output mode determining BFS strategy.
    output_mode: super::nfa::VlpOutputMode,

    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Output schema.
    schema: SchemaRef,

    /// Cached plan properties.
    properties: Arc<PlanProperties>,

    /// Execution metrics.
    metrics: ExecutionPlanMetricsSet,
}
3937
3938impl fmt::Debug for GraphVariableLengthTraverseMainExec {
3939    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3940        f.debug_struct("GraphVariableLengthTraverseMainExec")
3941            .field("source_column", &self.source_column)
3942            .field("type_names", &self.type_names)
3943            .field("direction", &self.direction)
3944            .field("min_hops", &self.min_hops)
3945            .field("max_hops", &self.max_hops)
3946            .field("target_variable", &self.target_variable)
3947            .finish()
3948    }
3949}
3950
3951impl GraphVariableLengthTraverseMainExec {
3952    /// Create a new variable-length traversal plan for schemaless edges.
3953    #[expect(clippy::too_many_arguments)]
3954    pub fn new(
3955        input: Arc<dyn ExecutionPlan>,
3956        source_column: impl Into<String>,
3957        type_names: Vec<String>,
3958        direction: Direction,
3959        min_hops: usize,
3960        max_hops: usize,
3961        target_variable: impl Into<String>,
3962        step_variable: Option<String>,
3963        path_variable: Option<String>,
3964        target_properties: Vec<String>,
3965        graph_ctx: Arc<GraphExecutionContext>,
3966        is_optional: bool,
3967        bound_target_column: Option<String>,
3968        edge_lance_filter: Option<String>,
3969        edge_property_conditions: Vec<(String, UniValue)>,
3970        used_edge_columns: Vec<String>,
3971        path_mode: super::nfa::PathMode,
3972        output_mode: super::nfa::VlpOutputMode,
3973    ) -> Self {
3974        let source_column = source_column.into();
3975        let target_variable = target_variable.into();
3976
3977        // Build output schema
3978        let schema = Self::build_schema(
3979            input.schema(),
3980            &target_variable,
3981            step_variable.as_deref(),
3982            path_variable.as_deref(),
3983            &target_properties,
3984        );
3985        let properties = compute_plan_properties(schema.clone());
3986
3987        Self {
3988            input,
3989            source_column,
3990            type_names,
3991            direction,
3992            min_hops,
3993            max_hops,
3994            target_variable,
3995            step_variable,
3996            path_variable,
3997            target_properties,
3998            is_optional,
3999            bound_target_column,
4000            edge_lance_filter,
4001            edge_property_conditions,
4002            used_edge_columns,
4003            path_mode,
4004            output_mode,
4005            graph_ctx,
4006            schema,
4007            properties,
4008            metrics: ExecutionPlanMetricsSet::new(),
4009        }
4010    }
4011
4012    /// Build output schema.
4013    fn build_schema(
4014        input_schema: SchemaRef,
4015        target_variable: &str,
4016        step_variable: Option<&str>,
4017        path_variable: Option<&str>,
4018        target_properties: &[String],
4019    ) -> SchemaRef {
4020        let mut fields: Vec<Field> = input_schema
4021            .fields()
4022            .iter()
4023            .map(|f| f.as_ref().clone())
4024            .collect();
4025
4026        // Add target VID column (only if not already in input)
4027        let target_vid_name = format!("{}._vid", target_variable);
4028        if input_schema.column_with_name(&target_vid_name).is_none() {
4029            fields.push(Field::new(target_vid_name, DataType::UInt64, true));
4030        }
4031
4032        // Add target ._labels column (only if not already in input)
4033        let target_labels_name = format!("{}._labels", target_variable);
4034        if input_schema.column_with_name(&target_labels_name).is_none() {
4035            fields.push(Field::new(target_labels_name, labels_data_type(), true));
4036        }
4037
4038        // Add hop count
4039        fields.push(Field::new("_hop_count", DataType::UInt64, false));
4040
4041        // Add step variable column (list of edge structs) if bound
4042        // This is the relationship variable like `r` in `[r*1..3]`
4043        if let Some(step_var) = step_variable {
4044            fields.push(build_edge_list_field(step_var));
4045        }
4046
4047        // Add path struct if bound (only if not already in input from prior BindFixedPath)
4048        if let Some(path_var) = path_variable
4049            && input_schema.column_with_name(path_var).is_none()
4050        {
4051            fields.push(build_path_struct_field(path_var));
4052        }
4053
4054        // Add target property columns (as LargeBinary for lazy hydration via PropertyManager)
4055        // Skip properties that are already in the input schema
4056        for prop in target_properties {
4057            let prop_name = format!("{}.{}", target_variable, prop);
4058            if input_schema.column_with_name(&prop_name).is_none() {
4059                fields.push(Field::new(prop_name, DataType::LargeBinary, true));
4060            }
4061        }
4062
4063        Arc::new(Schema::new(fields))
4064    }
4065}
4066
4067impl DisplayAs for GraphVariableLengthTraverseMainExec {
4068    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4069        write!(
4070            f,
4071            "GraphVariableLengthTraverseMainExec: {} --[{:?}*{}..{}]--> target",
4072            self.source_column, self.type_names, self.min_hops, self.max_hops
4073        )
4074    }
4075}
4076
4077impl ExecutionPlan for GraphVariableLengthTraverseMainExec {
4078    fn name(&self) -> &str {
4079        "GraphVariableLengthTraverseMainExec"
4080    }
4081
4082    fn as_any(&self) -> &dyn Any {
4083        self
4084    }
4085
4086    fn schema(&self) -> SchemaRef {
4087        self.schema.clone()
4088    }
4089
4090    fn properties(&self) -> &Arc<PlanProperties> {
4091        &self.properties
4092    }
4093
4094    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
4095        vec![&self.input]
4096    }
4097
4098    fn with_new_children(
4099        self: Arc<Self>,
4100        children: Vec<Arc<dyn ExecutionPlan>>,
4101    ) -> DFResult<Arc<dyn ExecutionPlan>> {
4102        if children.len() != 1 {
4103            return Err(datafusion::error::DataFusionError::Plan(
4104                "GraphVariableLengthTraverseMainExec requires exactly one child".to_string(),
4105            ));
4106        }
4107
4108        Ok(Arc::new(Self::new(
4109            children[0].clone(),
4110            self.source_column.clone(),
4111            self.type_names.clone(),
4112            self.direction,
4113            self.min_hops,
4114            self.max_hops,
4115            self.target_variable.clone(),
4116            self.step_variable.clone(),
4117            self.path_variable.clone(),
4118            self.target_properties.clone(),
4119            self.graph_ctx.clone(),
4120            self.is_optional,
4121            self.bound_target_column.clone(),
4122            self.edge_lance_filter.clone(),
4123            self.edge_property_conditions.clone(),
4124            self.used_edge_columns.clone(),
4125            self.path_mode.clone(),
4126            self.output_mode.clone(),
4127        )))
4128    }
4129
4130    fn execute(
4131        &self,
4132        partition: usize,
4133        context: Arc<TaskContext>,
4134    ) -> DFResult<SendableRecordBatchStream> {
4135        let input_stream = self.input.execute(partition, context)?;
4136        let metrics = BaselineMetrics::new(&self.metrics, partition);
4137
4138        // Build adjacency map from main edges table (async)
4139        let graph_ctx = self.graph_ctx.clone();
4140        let type_names = self.type_names.clone();
4141        let direction = self.direction;
4142        let load_fut = async move {
4143            // VLP frontier is unknown ahead of time: no source pushdown (whole-type).
4144            build_edge_adjacency_map(&graph_ctx, &type_names, direction, None).await
4145        };
4146
4147        Ok(Box::pin(GraphVariableLengthTraverseMainStream {
4148            input: input_stream,
4149            source_column: self.source_column.clone(),
4150            type_names: self.type_names.clone(),
4151            direction: self.direction,
4152            min_hops: self.min_hops,
4153            max_hops: self.max_hops,
4154            target_variable: self.target_variable.clone(),
4155            step_variable: self.step_variable.clone(),
4156            path_variable: self.path_variable.clone(),
4157            target_properties: self.target_properties.clone(),
4158            graph_ctx: self.graph_ctx.clone(),
4159            is_optional: self.is_optional,
4160            bound_target_column: self.bound_target_column.clone(),
4161            edge_lance_filter: self.edge_lance_filter.clone(),
4162            edge_property_conditions: self.edge_property_conditions.clone(),
4163            used_edge_columns: self.used_edge_columns.clone(),
4164            path_mode: self.path_mode.clone(),
4165            output_mode: self.output_mode.clone(),
4166            schema: self.schema.clone(),
4167            state: VarLengthMainStreamState::Loading(Box::pin(load_fut)),
4168            metrics,
4169        }))
4170    }
4171
4172    fn metrics(&self) -> Option<MetricsSet> {
4173        Some(self.metrics.clone_inner())
4174    }
4175}
4176
/// State machine for VLP schemaless stream.
///
/// The presumed lifecycle is `Loading` until the adjacency map is built, then
/// `Processing` input batches, temporarily `Materializing` one batch while its
/// property future runs, and finally `Done`. NOTE(review): the transitions live
/// in the stream's poll implementation, which is not in this section.
enum VarLengthMainStreamState {
    /// Loading adjacency map from main edges table.
    Loading(Pin<Box<dyn std::future::Future<Output = DFResult<EdgeAdjacencyMap>> + Send>>),
    /// Processing input batches with loaded adjacency.
    Processing(EdgeAdjacencyMap),
    /// Materializing properties for a batch.
    /// The adjacency map is carried along so processing can resume afterwards.
    Materializing {
        adjacency: EdgeAdjacencyMap,
        fut: Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>,
    },
    /// Stream is done.
    Done,
}
4191
/// Stream for variable-length traversal on schemaless edges.
///
/// Created by `GraphVariableLengthTraverseMainExec::execute`, which copies every
/// traversal parameter from the plan node into this stream. It starts in
/// `VarLengthMainStreamState::Loading` with the adjacency-load future.
#[expect(dead_code, reason = "VLP fields used in Phase 3")]
struct GraphVariableLengthTraverseMainStream {
    input: SendableRecordBatchStream,
    source_column: String,
    type_names: Vec<String>,
    direction: Direction,
    // Inclusive hop bounds used by `bfs`.
    min_hops: usize,
    max_hops: usize,
    target_variable: String,
    /// Relationship variable like `r` in `[r*1..3]` - gets a List of edge structs.
    step_variable: Option<String>,
    path_variable: Option<String>,
    target_properties: Vec<String>,
    graph_ctx: Arc<GraphExecutionContext>,
    is_optional: bool,
    bound_target_column: Option<String>,
    edge_lance_filter: Option<String>,
    /// Edge property conditions to check during BFS.
    edge_property_conditions: Vec<(String, UniValue)>,
    used_edge_columns: Vec<String>,
    path_mode: super::nfa::PathMode,
    output_mode: super::nfa::VlpOutputMode,
    schema: SchemaRef,
    state: VarLengthMainStreamState,
    metrics: BaselineMetrics,
}
4219
/// BFS result type: (target_vid, hop_count, node_path, edge_path)
///
/// In results from `GraphVariableLengthTraverseMainStream::bfs`:
/// - `node_path` starts at the source vertex and ends at `target_vid`.
/// - `edge_path` holds the `hop_count` edges traversed, in traversal order.
/// - Therefore `node_path.len() == hop_count + 1`.
///
/// The same tuple shape is also used for the BFS queue entries.
type MainBfsResult = (Vid, usize, Vec<Vid>, Vec<Eid>);
4222
4223impl GraphVariableLengthTraverseMainStream {
4224    /// Perform BFS from a source vertex using the adjacency map.
4225    ///
4226    /// `used_eids` contains edge IDs already bound by earlier pattern elements
4227    /// in the same MATCH clause, enforcing cross-pattern relationship uniqueness
4228    /// (Cypher semantics require all relationships in a MATCH to be distinct).
4229    fn bfs(
4230        &self,
4231        source: Vid,
4232        adjacency: &EdgeAdjacencyMap,
4233        used_eids: &FxHashSet<u64>,
4234    ) -> Vec<MainBfsResult> {
4235        let mut results = Vec::new();
4236        let mut queue: VecDeque<MainBfsResult> = VecDeque::new();
4237
4238        queue.push_back((source, 0, vec![source], vec![]));
4239
4240        while let Some((current, depth, node_path, edge_path)) = queue.pop_front() {
4241            // Emit result if within hop range (including zero-length patterns)
4242            if depth >= self.min_hops && depth <= self.max_hops {
4243                results.push((current, depth, node_path.clone(), edge_path.clone()));
4244            }
4245
4246            // Stop if at max depth
4247            if depth >= self.max_hops {
4248                continue;
4249            }
4250
4251            // Get neighbors from adjacency map
4252            if let Some(neighbors) = adjacency.get(&current) {
4253                let is_undirected = matches!(self.direction, Direction::Both);
4254                let mut seen_edges_at_hop: HashSet<u64> = HashSet::new();
4255
4256                for (neighbor, eid, _edge_type, props) in neighbors {
4257                    // Deduplicate edges for undirected patterns
4258                    if is_undirected && !seen_edges_at_hop.insert(eid.as_u64()) {
4259                        continue;
4260                    }
4261
4262                    // Enforce relationship uniqueness per-path (Cypher semantics).
4263                    if edge_path.contains(eid) {
4264                        continue;
4265                    }
4266
4267                    // Enforce cross-pattern relationship uniqueness: skip edges
4268                    // already bound by earlier pattern elements in the same MATCH.
4269                    if used_eids.contains(&eid.as_u64()) {
4270                        continue;
4271                    }
4272
4273                    // Check edge property conditions (e.g., {year: 1988}).
4274                    if !self.edge_property_conditions.is_empty() {
4275                        let passes =
4276                            self.edge_property_conditions
4277                                .iter()
4278                                .all(|(name, expected)| {
4279                                    props.get(name).is_some_and(|actual| actual == expected)
4280                                });
4281                        if !passes {
4282                            continue;
4283                        }
4284                    }
4285
4286                    let mut new_node_path = node_path.clone();
4287                    new_node_path.push(*neighbor);
4288                    let mut new_edge_path = edge_path.clone();
4289                    new_edge_path.push(*eid);
4290                    queue.push_back((*neighbor, depth + 1, new_node_path, new_edge_path));
4291                }
4292            }
4293        }
4294
4295        results
4296    }
4297
4298    /// Process a batch using the adjacency map.
4299    fn process_batch(
4300        &self,
4301        batch: RecordBatch,
4302        adjacency: &EdgeAdjacencyMap,
4303    ) -> DFResult<RecordBatch> {
4304        let source_col = batch.column_by_name(&self.source_column).ok_or_else(|| {
4305            datafusion::error::DataFusionError::Execution(format!(
4306                "Source column '{}' not found in input batch",
4307                self.source_column
4308            ))
4309        })?;
4310
4311        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
4312        let source_vids: &UInt64Array = &source_vid_cow;
4313
4314        // Read bound target VIDs if column exists
4315        let bound_target_cow = self
4316            .bound_target_column
4317            .as_ref()
4318            .and_then(|col| batch.column_by_name(col))
4319            .map(|c| column_as_vid_array(c.as_ref()))
4320            .transpose()?;
4321        let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
4322
4323        // Extract used edge columns for cross-pattern relationship uniqueness
4324        let used_edge_arrays: Vec<&UInt64Array> = self
4325            .used_edge_columns
4326            .iter()
4327            .filter_map(|col| {
4328                batch
4329                    .column_by_name(col)?
4330                    .as_any()
4331                    .downcast_ref::<UInt64Array>()
4332            })
4333            .collect();
4334
4335        // Collect BFS results: (original_row_idx, target_vid, hop_count, node_path, edge_path)
4336        let mut expansions: Vec<ExpansionRecord> = Vec::new();
4337
4338        for (row_idx, source_opt) in source_vids.iter().enumerate() {
4339            let mut emitted_for_row = false;
4340
4341            if let Some(source_u64) = source_opt {
4342                let source = Vid::from(source_u64);
4343
4344                // Collect used edge IDs from previous hops for this row
4345                let used_eids: FxHashSet<u64> = used_edge_arrays
4346                    .iter()
4347                    .filter_map(|arr| {
4348                        if arr.is_null(row_idx) {
4349                            None
4350                        } else {
4351                            Some(arr.value(row_idx))
4352                        }
4353                    })
4354                    .collect();
4355
4356                let bfs_results = self.bfs(source, adjacency, &used_eids);
4357
4358                for (target, hops, node_path, edge_path) in bfs_results {
4359                    // Filter by bound target VID if set (for patterns where target is in scope).
4360                    // NULL bound targets do not match anything.
4361                    if let Some(targets) = expected_targets {
4362                        if targets.is_null(row_idx) {
4363                            continue;
4364                        }
4365                        let expected_vid = targets.value(row_idx);
4366                        if target.as_u64() != expected_vid {
4367                            continue;
4368                        }
4369                    }
4370
4371                    expansions.push((row_idx, target, hops, node_path, edge_path));
4372                    emitted_for_row = true;
4373                }
4374            }
4375
4376            if self.is_optional && !emitted_for_row {
4377                // Preserve source row with NULL optional bindings.
4378                expansions.push((row_idx, Vid::from(u64::MAX), 0, vec![], vec![]));
4379            }
4380        }
4381
4382        if expansions.is_empty() {
4383            if self.is_optional {
4384                let all_indices: Vec<usize> = (0..batch.num_rows()).collect();
4385                return build_optional_null_batch_for_rows(&batch, &all_indices, &self.schema);
4386            }
4387            return Ok(RecordBatch::new_empty(self.schema.clone()));
4388        }
4389
4390        let num_rows = expansions.len();
4391        self.metrics.record_output(num_rows);
4392
4393        // Build output columns
4394        let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.schema.fields().len());
4395
4396        // Resolve this operator's edge type names to numeric ids once, so path
4397        // builders can recover the STORED orientation of flushed (L1-resident)
4398        // relationships via a directed-outgoing adjacency probe.
4399        let edge_type_ids: Vec<u32> = {
4400            let uni_schema = self.graph_ctx.storage().schema_manager().schema();
4401            self.type_names
4402                .iter()
4403                .filter_map(|name| uni_schema.edge_type_id_by_name(name))
4404                .collect()
4405        };
4406
4407        // Expand input columns
4408        for col_idx in 0..batch.num_columns() {
4409            let array = batch.column(col_idx);
4410            let indices: Vec<u64> = expansions
4411                .iter()
4412                .map(|(idx, _, _, _, _)| *idx as u64)
4413                .collect();
4414            let take_indices = UInt64Array::from(indices);
4415            let expanded = arrow::compute::take(array, &take_indices, None)?;
4416            columns.push(expanded);
4417        }
4418
4419        // Add target VID column (only if not already in input)
4420        let target_vid_name = format!("{}._vid", self.target_variable);
4421        if batch.schema().column_with_name(&target_vid_name).is_none() {
4422            let target_vids: Vec<Option<u64>> = expansions
4423                .iter()
4424                .map(|(_, vid, _, node_path, edge_path)| {
4425                    if node_path.is_empty() && edge_path.is_empty() {
4426                        None
4427                    } else {
4428                        Some(vid.as_u64())
4429                    }
4430                })
4431                .collect();
4432            columns.push(Arc::new(UInt64Array::from(target_vids)));
4433        }
4434
4435        // Add target ._labels column (only if not already in input)
4436        let target_labels_name = format!("{}._labels", self.target_variable);
4437        if batch
4438            .schema()
4439            .column_with_name(&target_labels_name)
4440            .is_none()
4441        {
4442            use arrow_array::builder::{ListBuilder, StringBuilder};
4443            let mut labels_builder = ListBuilder::new(StringBuilder::new());
4444            for (_, vid, _, node_path, edge_path) in expansions.iter() {
4445                if node_path.is_empty() && edge_path.is_empty() {
4446                    labels_builder.append(false);
4447                    continue;
4448                }
4449                let mut row_labels: Vec<String> = Vec::new();
4450                let labels =
4451                    l0_visibility::get_vertex_labels(*vid, &self.graph_ctx.query_context());
4452                for lbl in &labels {
4453                    if !row_labels.contains(lbl) {
4454                        row_labels.push(lbl.clone());
4455                    }
4456                }
4457                let values = labels_builder.values();
4458                for lbl in &row_labels {
4459                    values.append_value(lbl);
4460                }
4461                labels_builder.append(true);
4462            }
4463            columns.push(Arc::new(labels_builder.finish()));
4464        }
4465
4466        // Add hop count column
4467        let hop_counts: Vec<u64> = expansions
4468            .iter()
4469            .map(|(_, _, hops, _, _)| *hops as u64)
4470            .collect();
4471        columns.push(Arc::new(UInt64Array::from(hop_counts)));
4472
4473        // Add step variable column if bound (list of edge structs).
4474        if self.step_variable.is_some() {
4475            let mut edges_builder = new_edge_list_builder();
4476            let query_ctx = self.graph_ctx.query_context();
4477            let type_names_str = self.type_names.join("|");
4478
4479            for (_, _, _, node_path, edge_path) in expansions.iter() {
4480                if node_path.is_empty() && edge_path.is_empty() {
4481                    edges_builder.append_null();
4482                } else if edge_path.is_empty() {
4483                    // Zero-hop match: empty list.
4484                    edges_builder.append(true);
4485                } else {
4486                    for (i, eid) in edge_path.iter().enumerate() {
4487                        // Report the relationship's STORED direction, not the
4488                        // traversal order (`node_path[i]` -> `node_path[i + 1]`).
4489                        // Resolves flushed (L1-resident) edges too, where the L0
4490                        // chain no longer holds the stored endpoints.
4491                        let (src, dst) = self.graph_ctx.resolve_stored_edge_endpoints(
4492                            *eid,
4493                            node_path[i],
4494                            node_path[i + 1],
4495                            &edge_type_ids,
4496                        );
4497                        append_edge_to_struct(
4498                            edges_builder.values(),
4499                            *eid,
4500                            &type_names_str,
4501                            src,
4502                            dst,
4503                            &query_ctx,
4504                        );
4505                    }
4506                    edges_builder.append(true);
4507                }
4508            }
4509
4510            columns.push(Arc::new(edges_builder.finish()) as ArrayRef);
4511        }
4512
4513        // Add path variable column if bound.
4514        // If a path column already exists in input (from a prior BindFixedPath), extend it
4515        // rather than building from scratch.
4516        if let Some(path_var_name) = &self.path_variable {
4517            let existing_path_col_idx = batch
4518                .schema()
4519                .column_with_name(path_var_name)
4520                .map(|(idx, _)| idx);
4521            let existing_path_arc = existing_path_col_idx.map(|idx| columns[idx].clone());
4522            let existing_path = existing_path_arc
4523                .as_ref()
4524                .and_then(|arc| arc.as_any().downcast_ref::<arrow_array::StructArray>());
4525
4526            let mut nodes_builder = new_node_list_builder();
4527            let mut rels_builder = new_edge_list_builder();
4528            let query_ctx = self.graph_ctx.query_context();
4529            let type_names_str = self.type_names.join("|");
4530            let mut path_validity = Vec::with_capacity(expansions.len());
4531
4532            for (row_out_idx, (_, _, _, node_path, edge_path)) in expansions.iter().enumerate() {
4533                if node_path.is_empty() && edge_path.is_empty() {
4534                    nodes_builder.append(false);
4535                    rels_builder.append(false);
4536                    path_validity.push(false);
4537                    continue;
4538                }
4539
4540                // Prepend existing path prefix if extending
4541                let skip_first_vlp_node = if let Some(existing) = existing_path {
4542                    if !existing.is_null(row_out_idx) {
4543                        prepend_existing_path(
4544                            existing,
4545                            row_out_idx,
4546                            &mut nodes_builder,
4547                            &mut rels_builder,
4548                            &query_ctx,
4549                        );
4550                        true
4551                    } else {
4552                        false
4553                    }
4554                } else {
4555                    false
4556                };
4557
4558                // Append VLP nodes (skip first if extending — it's the junction point)
4559                let start_idx = if skip_first_vlp_node { 1 } else { 0 };
4560                for vid in &node_path[start_idx..] {
4561                    append_node_to_struct(nodes_builder.values(), *vid, &query_ctx);
4562                }
4563                nodes_builder.append(true);
4564
4565                for (i, eid) in edge_path.iter().enumerate() {
4566                    // Report the relationship's STORED direction, not the traversal
4567                    // order (`node_path[i]` -> `node_path[i + 1]`). Resolves flushed
4568                    // (L1-resident) edges too, where the L0 chain no longer holds
4569                    // the stored endpoints.
4570                    let (src, dst) = self.graph_ctx.resolve_stored_edge_endpoints(
4571                        *eid,
4572                        node_path[i],
4573                        node_path[i + 1],
4574                        &edge_type_ids,
4575                    );
4576                    append_edge_to_struct(
4577                        rels_builder.values(),
4578                        *eid,
4579                        &type_names_str,
4580                        src,
4581                        dst,
4582                        &query_ctx,
4583                    );
4584                }
4585                rels_builder.append(true);
4586                path_validity.push(true);
4587            }
4588
4589            // Finish the builders to get the arrays
4590            let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
4591            let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;
4592
4593            // Build the path struct with nodes and relationships fields
4594            let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
4595            let rels_field = Arc::new(Field::new(
4596                "relationships",
4597                rels_array.data_type().clone(),
4598                true,
4599            ));
4600
4601            // Create the path struct array
4602            let path_struct = arrow_array::StructArray::try_new(
4603                vec![nodes_field, rels_field].into(),
4604                vec![nodes_array, rels_array],
4605                Some(arrow::buffer::NullBuffer::from(path_validity)),
4606            )
4607            .map_err(arrow_err)?;
4608
4609            if let Some(idx) = existing_path_col_idx {
4610                columns[idx] = Arc::new(path_struct);
4611            } else {
4612                columns.push(Arc::new(path_struct));
4613            }
4614        }
4615
4616        // Add target property columns as NULL for now (skip if already in input).
4617        // Property hydration happens via PropertyManager in the query execution pipeline.
4618        for prop_name in &self.target_properties {
4619            let full_prop_name = format!("{}.{}", self.target_variable, prop_name);
4620            if batch.schema().column_with_name(&full_prop_name).is_none() {
4621                columns.push(arrow_array::new_null_array(
4622                    &DataType::LargeBinary,
4623                    num_rows,
4624                ));
4625            }
4626        }
4627
4628        RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)
4629    }
4630}
4631
4632impl Stream for GraphVariableLengthTraverseMainStream {
4633    type Item = DFResult<RecordBatch>;
4634
4635    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
4636        let metrics = self.metrics.clone();
4637        let _timer = metrics.elapsed_compute().timer();
4638        loop {
4639            let state = std::mem::replace(&mut self.state, VarLengthMainStreamState::Done);
4640
4641            match state {
4642                VarLengthMainStreamState::Loading(mut fut) => match fut.as_mut().poll(cx) {
4643                    Poll::Ready(Ok(adjacency)) => {
4644                        self.state = VarLengthMainStreamState::Processing(adjacency);
4645                        // Continue loop to start processing
4646                    }
4647                    Poll::Ready(Err(e)) => {
4648                        self.state = VarLengthMainStreamState::Done;
4649                        return Poll::Ready(Some(Err(e)));
4650                    }
4651                    Poll::Pending => {
4652                        self.state = VarLengthMainStreamState::Loading(fut);
4653                        return Poll::Pending;
4654                    }
4655                },
4656                VarLengthMainStreamState::Processing(adjacency) => {
4657                    match self.input.poll_next_unpin(cx) {
4658                        Poll::Ready(Some(Ok(batch))) => {
4659                            let base_batch = match self.process_batch(batch, &adjacency) {
4660                                Ok(b) => b,
4661                                Err(e) => {
4662                                    self.state = VarLengthMainStreamState::Processing(adjacency);
4663                                    return Poll::Ready(Some(Err(e)));
4664                                }
4665                            };
4666
4667                            // If no properties need async hydration, return directly
4668                            if self.target_properties.is_empty() {
4669                                self.state = VarLengthMainStreamState::Processing(adjacency);
4670                                return Poll::Ready(Some(Ok(base_batch)));
4671                            }
4672
4673                            // Create async hydration future
4674                            let schema = self.schema.clone();
4675                            let target_variable = self.target_variable.clone();
4676                            let target_properties = self.target_properties.clone();
4677                            let graph_ctx = self.graph_ctx.clone();
4678
4679                            let fut = hydrate_vlp_target_properties(
4680                                base_batch,
4681                                schema,
4682                                target_variable,
4683                                target_properties,
4684                                None, // schemaless — no label name
4685                                graph_ctx,
4686                            );
4687
4688                            self.state = VarLengthMainStreamState::Materializing {
4689                                adjacency,
4690                                fut: Box::pin(fut),
4691                            };
4692                            // Continue loop to poll the future
4693                        }
4694                        Poll::Ready(Some(Err(e))) => {
4695                            self.state = VarLengthMainStreamState::Done;
4696                            return Poll::Ready(Some(Err(e)));
4697                        }
4698                        Poll::Ready(None) => {
4699                            self.state = VarLengthMainStreamState::Done;
4700                            return Poll::Ready(None);
4701                        }
4702                        Poll::Pending => {
4703                            self.state = VarLengthMainStreamState::Processing(adjacency);
4704                            return Poll::Pending;
4705                        }
4706                    }
4707                }
4708                VarLengthMainStreamState::Materializing { adjacency, mut fut } => {
4709                    match fut.as_mut().poll(cx) {
4710                        Poll::Ready(Ok(batch)) => {
4711                            self.state = VarLengthMainStreamState::Processing(adjacency);
4712                            return Poll::Ready(Some(Ok(batch)));
4713                        }
4714                        Poll::Ready(Err(e)) => {
4715                            self.state = VarLengthMainStreamState::Done;
4716                            return Poll::Ready(Some(Err(e)));
4717                        }
4718                        Poll::Pending => {
4719                            self.state = VarLengthMainStreamState::Materializing { adjacency, fut };
4720                            return Poll::Pending;
4721                        }
4722                    }
4723                }
4724                VarLengthMainStreamState::Done => {
4725                    return Poll::Ready(None);
4726                }
4727            }
4728        }
4729    }
4730}
4731
4732impl RecordBatchStream for GraphVariableLengthTraverseMainStream {
4733    fn schema(&self) -> SchemaRef {
4734        self.schema.clone()
4735    }
4736}
4737
#[cfg(test)]
mod tests {
    use super::*;

    /// Input schema shared by every test: a single non-nullable `a._vid` UInt64 column.
    fn vid_input_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new(
            "a._vid",
            DataType::UInt64,
            false,
        )]))
    }

    /// Asserts that `schema` has exactly the `expected` column names, in order.
    fn assert_column_names(schema: &Schema, expected: &[&str]) {
        assert_eq!(schema.fields().len(), expected.len());
        for (position, expected_name) in expected.iter().enumerate() {
            assert_eq!(schema.field(position).name(), *expected_name);
        }
    }

    #[test]
    fn test_traverse_schema_without_edge() {
        let output_schema = GraphTraverseExec::build_schema(
            vid_input_schema(),
            "m",
            None,
            &[],
            &[],
            None,
            None,
            false,
        );

        // input + target VID + target _labels + internal edge ID
        assert_column_names(
            &output_schema,
            &["a._vid", "m._vid", "m._labels", "__eid_to_m"],
        );
    }

    #[test]
    fn test_traverse_schema_with_edge() {
        let output_schema = GraphTraverseExec::build_schema(
            vid_input_schema(),
            "m",
            Some("r"),
            &[],
            &[],
            None,
            None,
            false,
        );

        // input + target VID + target _labels + edge EID + edge _type
        assert_column_names(
            &output_schema,
            &["a._vid", "m._vid", "m._labels", "r._eid", "r._type"],
        );
    }

    #[test]
    fn test_traverse_schema_with_target_properties() {
        let target_props = vec!["name".to_string(), "age".to_string()];
        let output_schema = GraphTraverseExec::build_schema(
            vid_input_schema(),
            "m",
            Some("r"),
            &[],
            &target_props,
            None,
            None,
            false,
        );

        // Target properties sit between the target columns and the edge columns.
        assert_column_names(
            &output_schema,
            &[
                "a._vid",
                "m._vid",
                "m._labels",
                "m.name",
                "m.age",
                "r._eid",
                "r._type",
            ],
        );
    }

    #[test]
    fn test_variable_length_schema() {
        let output_schema = GraphVariableLengthTraverseExec::build_schema(
            vid_input_schema(),
            "b",
            None,
            Some("p"),
            &[],
            None,
        );

        assert_column_names(
            &output_schema,
            &["a._vid", "b._vid", "b._labels", "_hop_count", "p"],
        );
    }

    #[test]
    fn test_traverse_main_schema_without_edge() {
        let input_schema = vid_input_schema();
        let output_schema =
            GraphTraverseMainExec::build_schema(&input_schema, "m", &None, &[], &[], false);

        assert_column_names(
            &output_schema,
            &["a._vid", "m._vid", "m._labels", "__eid_to_m"],
        );
    }

    #[test]
    fn test_traverse_main_schema_with_edge() {
        let input_schema = vid_input_schema();
        let output_schema = GraphTraverseMainExec::build_schema(
            &input_schema,
            "m",
            &Some("r".to_string()),
            &[],
            &[],
            false,
        );

        assert_column_names(
            &output_schema,
            &["a._vid", "m._vid", "m._labels", "r._eid", "r._type"],
        );
    }

    #[test]
    fn test_traverse_main_schema_with_edge_properties() {
        let input_schema = vid_input_schema();
        let edge_props = vec!["weight".to_string(), "since".to_string()];
        let output_schema = GraphTraverseMainExec::build_schema(
            &input_schema,
            "m",
            &Some("r".to_string()),
            &edge_props,
            &[],
            false,
        );

        assert_column_names(
            &output_schema,
            &[
                "a._vid",
                "m._vid",
                "m._labels",
                "r._eid",
                "r._type",
                "r.weight",
                "r.since",
            ],
        );
        // Edge property columns are typed as LargeBinary in the schema.
        for position in [5, 6] {
            assert_eq!(
                output_schema.field(position).data_type(),
                &DataType::LargeBinary
            );
        }
    }

    #[test]
    fn test_traverse_main_schema_with_target_properties() {
        let input_schema = vid_input_schema();
        let target_props = vec!["name".to_string(), "age".to_string()];
        let output_schema = GraphTraverseMainExec::build_schema(
            &input_schema,
            "m",
            &Some("r".to_string()),
            &[],
            &target_props,
            false,
        );

        // In the main-exec layout, target properties follow the edge columns.
        assert_column_names(
            &output_schema,
            &[
                "a._vid",
                "m._vid",
                "m._labels",
                "r._eid",
                "r._type",
                "m.name",
                "m.age",
            ],
        );
        for position in [5, 6] {
            assert_eq!(
                output_schema.field(position).data_type(),
                &DataType::LargeBinary
            );
        }
    }
}