Skip to main content

uni_query/query/df_graph/
traverse.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Graph traversal execution plans for DataFusion.
5//!
6//! This module provides graph traversal operators as DataFusion [`ExecutionPlan`]s:
7//!
8//! - [`GraphTraverseExec`]: Single-hop edge traversal
9//! - [`GraphVariableLengthTraverseExec`]: Multi-hop BFS traversal (min..max hops)
10//!
11//! # Traversal Algorithm
12//!
13//! Traversal uses the CSR adjacency cache for O(1) neighbor lookups:
14//!
15//! ```text
16//! Input Stream (source VIDs)
17//!        │
18//!        ▼
19//! ┌──────────────────┐
20//! │ For each batch:  │
21//! │  1. Extract VIDs │
22//! │  2. get_neighbors│
23//! │  3. Expand rows  │
24//! └──────────────────┘
25//!        │
26//!        ▼
27//! Output Stream (source, edge, target)
28//! ```
29//!
30//! L0 buffers are automatically overlaid for MVCC visibility.
31
32use crate::query::df_graph::GraphExecutionContext;
33use crate::query::df_graph::bitmap::{EidFilter, VidFilter};
34use crate::query::df_graph::common::{
35    append_edge_to_struct, append_node_to_struct, arrow_err, build_edge_list_field,
36    build_path_struct_field, column_as_vid_array, compute_plan_properties, labels_data_type,
37    new_edge_list_builder, new_node_list_builder,
38};
39use crate::query::df_graph::nfa::{NfaStateId, PathNfa, PathSelector, VlpOutputMode};
40use crate::query::df_graph::pred_dag::PredecessorDag;
41use crate::query::df_graph::scan::{
42    build_property_column_static, property_field, resolve_property_type,
43};
44use arrow::compute::take;
45use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
46use arrow_schema::{DataType, Field, Schema, SchemaRef};
47use datafusion::common::Result as DFResult;
48use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
49use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
50use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
51use futures::{Stream, StreamExt};
52use fxhash::FxHashSet;
53use std::any::Any;
54use std::collections::{HashMap, HashSet, VecDeque};
55use std::fmt;
56use std::pin::Pin;
57use std::sync::Arc;
58use std::task::{Context, Poll};
59use uni_common::Value as UniValue;
60use uni_common::core::id::{Eid, Vid};
61use uni_store::runtime::l0_visibility;
62use uni_store::storage::direction::Direction;
63
64/// BFS result: (target_vid, hop_count, node_path, edge_path)
65type BfsResult = (Vid, usize, Vec<Vid>, Vec<Eid>);
66
67/// Expansion record: (original_row_idx, target_vid, hop_count, node_path, edge_path)
68type ExpansionRecord = (usize, Vid, usize, Vec<Vid>, Vec<Eid>);
69
70/// Prepend nodes and edges from an existing path struct column into builders.
71///
72/// Used when a VLP extends a path that was partially built by a prior `BindFixedPath`.
73/// Reads the nodes and relationships from the existing path at `row_idx` and appends
74/// them to the provided builders. The caller should then skip the first VLP node
75/// (which is the junction point already present in the existing path).
76fn prepend_existing_path(
77    existing_path: &arrow_array::StructArray,
78    row_idx: usize,
79    nodes_builder: &mut arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder>,
80    rels_builder: &mut arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder>,
81    query_ctx: &uni_store::runtime::context::QueryContext,
82) {
83    // Read existing nodes
84    let nodes_list = existing_path
85        .column(0)
86        .as_any()
87        .downcast_ref::<arrow_array::ListArray>()
88        .unwrap();
89    let node_values = nodes_list.value(row_idx);
90    let node_struct = node_values
91        .as_any()
92        .downcast_ref::<arrow_array::StructArray>()
93        .unwrap();
94    let vid_col = node_struct
95        .column(0)
96        .as_any()
97        .downcast_ref::<UInt64Array>()
98        .unwrap();
99    for i in 0..vid_col.len() {
100        append_node_to_struct(
101            nodes_builder.values(),
102            Vid::from(vid_col.value(i)),
103            query_ctx,
104        );
105    }
106
107    // Read existing edges
108    let rels_list = existing_path
109        .column(1)
110        .as_any()
111        .downcast_ref::<arrow_array::ListArray>()
112        .unwrap();
113    let edge_values = rels_list.value(row_idx);
114    let edge_struct = edge_values
115        .as_any()
116        .downcast_ref::<arrow_array::StructArray>()
117        .unwrap();
118    let eid_col = edge_struct
119        .column(0)
120        .as_any()
121        .downcast_ref::<UInt64Array>()
122        .unwrap();
123    let type_col = edge_struct
124        .column(1)
125        .as_any()
126        .downcast_ref::<arrow_array::StringArray>()
127        .unwrap();
128    let src_col = edge_struct
129        .column(2)
130        .as_any()
131        .downcast_ref::<UInt64Array>()
132        .unwrap();
133    let dst_col = edge_struct
134        .column(3)
135        .as_any()
136        .downcast_ref::<UInt64Array>()
137        .unwrap();
138    for i in 0..eid_col.len() {
139        append_edge_to_struct(
140            rels_builder.values(),
141            Eid::from(eid_col.value(i)),
142            type_col.value(i),
143            src_col.value(i),
144            dst_col.value(i),
145            query_ctx,
146        );
147    }
148}
149
150/// Resolve edge property Arrow type, falling back to `LargeBinary` (CypherValue) for
151/// schemaless properties. Unlike vertex properties, schemaless edge properties must
152/// preserve original JSON value types (int, float, etc.) since edge types commonly
153/// lack explicit property definitions.
154fn resolve_edge_property_type(
155    prop: &str,
156    schema_props: Option<
157        &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
158    >,
159) -> DataType {
160    if prop == "overflow_json" {
161        DataType::LargeBinary
162    } else if prop == "_created_at" || prop == "_updated_at" {
163        // System-managed timestamps surfaced via `created_at(r)` /
164        // `updated_at(r)`. Stored on every edge by the L0 buffer and
165        // the on-disk Arrow tables as Timestamp(Nanosecond, UTC).
166        DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, Some("UTC".into()))
167    } else {
168        schema_props
169            .and_then(|props| props.get(prop))
170            .map(|meta| meta.r#type.to_arrow())
171            .unwrap_or(DataType::LargeBinary)
172    }
173}
174
175use crate::query::df_graph::common::merged_edge_schema_props;
176
177/// Expansion tuple for variable-length traversal: (input_row_idx, target_vid, hop_count, node_path, edge_path)
178type VarLengthExpansion = (usize, Vid, usize, Vec<Vid>, Vec<Eid>);
179
180/// Single-hop graph traversal execution plan.
181///
182/// Expands each input row by traversing edges to find neighbors.
183/// For each (source, edge, target) triple, produces one output row
184/// containing the input columns plus target vertex and edge columns.
185///
186/// # Example
187///
188/// ```ignore
189/// // Input: batch with _vid column
190/// // Traverse KNOWS edges outgoing
191/// let traverse = GraphTraverseExec::new(
192///     input_plan,
193///     "_vid",
194///     vec![knows_type_id],
195///     Direction::Outgoing,
196///     "m",           // target variable
197///     Some("r"),     // edge variable
198///     None,          // no target label filter
199///     graph_ctx,
200/// );
201///
202/// // Output: input columns + m._vid + r._eid
203/// ```
204pub struct GraphTraverseExec {
205    /// Input execution plan.
206    input: Arc<dyn ExecutionPlan>,
207
208    /// Column name containing source VIDs.
209    source_column: String,
210
211    /// Edge type IDs to traverse.
212    edge_type_ids: Vec<u32>,
213
214    /// Traversal direction.
215    direction: Direction,
216
217    /// Variable name for target vertex columns.
218    target_variable: String,
219
220    /// Variable name for edge columns (if edge is bound).
221    edge_variable: Option<String>,
222
223    /// Edge properties to materialize (for pushdown hydration).
224    edge_properties: Vec<String>,
225
226    /// Target vertex properties to materialize.
227    target_properties: Vec<String>,
228
229    /// Target label name for property type resolution.
230    target_label_name: Option<String>,
231
232    /// Optional target label filter.
233    target_label_id: Option<u16>,
234
235    /// Graph execution context.
236    graph_ctx: Arc<GraphExecutionContext>,
237
238    /// Whether this is an OPTIONAL MATCH (preserve unmatched source rows with NULLs).
239    optional: bool,
240
241    /// Variables introduced by the OPTIONAL MATCH pattern.
242    /// Used to determine which columns should be null-extended on failure.
243    optional_pattern_vars: HashSet<String>,
244
245    /// Column name of an already-bound target VID (for cycle patterns like n-->k<--n).
246    /// When set, only traversals that reach this VID are included.
247    bound_target_column: Option<String>,
248
249    /// Columns containing edge IDs from previous hops (for relationship uniqueness).
250    /// Edges matching any of these IDs are excluded from traversal results.
251    used_edge_columns: Vec<String>,
252
253    /// Output schema.
254    schema: SchemaRef,
255
256    /// Cached plan properties.
257    properties: Arc<PlanProperties>,
258
259    /// Execution metrics.
260    metrics: ExecutionPlanMetricsSet,
261}
262
263impl fmt::Debug for GraphTraverseExec {
264    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
265        f.debug_struct("GraphTraverseExec")
266            .field("source_column", &self.source_column)
267            .field("edge_type_ids", &self.edge_type_ids)
268            .field("direction", &self.direction)
269            .field("target_variable", &self.target_variable)
270            .field("edge_variable", &self.edge_variable)
271            .finish()
272    }
273}
274
275impl GraphTraverseExec {
276    /// Create a new single-hop traversal plan.
277    ///
278    /// # Arguments
279    ///
280    /// * `input` - Input plan providing source vertices
281    /// * `source_column` - Column name containing source VIDs
282    /// * `edge_type_ids` - Edge types to traverse
283    /// * `direction` - Traversal direction
284    /// * `target_variable` - Variable name for target vertices
285    /// * `edge_variable` - Optional variable name for edges
286    /// * `edge_properties` - Edge properties to materialize
287    /// * `target_label_id` - Optional target label filter
288    /// * `graph_ctx` - Graph execution context
289    /// * `bound_target_column` - Column with already-bound target VID (for cycle patterns)
290    /// * `used_edge_columns` - Columns with edge IDs to exclude (relationship uniqueness)
291    #[expect(clippy::too_many_arguments)]
292    pub fn new(
293        input: Arc<dyn ExecutionPlan>,
294        source_column: impl Into<String>,
295        edge_type_ids: Vec<u32>,
296        direction: Direction,
297        target_variable: impl Into<String>,
298        edge_variable: Option<String>,
299        edge_properties: Vec<String>,
300        target_properties: Vec<String>,
301        target_label_name: Option<String>,
302        target_label_id: Option<u16>,
303        graph_ctx: Arc<GraphExecutionContext>,
304        optional: bool,
305        optional_pattern_vars: HashSet<String>,
306        bound_target_column: Option<String>,
307        used_edge_columns: Vec<String>,
308    ) -> Self {
309        let source_column = source_column.into();
310        let target_variable = target_variable.into();
311
312        // Resolve target property Arrow types from the schema
313        let uni_schema = graph_ctx.storage().schema_manager().schema();
314        let label_props = target_label_name
315            .as_deref()
316            .and_then(|ln| uni_schema.properties.get(ln));
317        let merged_edge_props = merged_edge_schema_props(&uni_schema, &edge_type_ids);
318        let edge_props = if merged_edge_props.is_empty() {
319            None
320        } else {
321            Some(&merged_edge_props)
322        };
323
324        // Build output schema: input schema + target VID + target props + optional edge ID + edge properties
325        let schema = Self::build_schema(
326            input.schema(),
327            &target_variable,
328            edge_variable.as_deref(),
329            &edge_properties,
330            &target_properties,
331            label_props,
332            edge_props,
333            optional,
334        );
335
336        let properties = compute_plan_properties(schema.clone());
337
338        Self {
339            input,
340            source_column,
341            edge_type_ids,
342            direction,
343            target_variable,
344            edge_variable,
345            edge_properties,
346            target_properties,
347            target_label_name,
348            target_label_id,
349            graph_ctx,
350            optional,
351            optional_pattern_vars,
352            bound_target_column,
353            used_edge_columns,
354            schema,
355            properties,
356            metrics: ExecutionPlanMetricsSet::new(),
357        }
358    }
359
360    /// Build output schema.
361    #[expect(
362        clippy::too_many_arguments,
363        reason = "Schema construction needs all field metadata"
364    )]
365    fn build_schema(
366        input_schema: SchemaRef,
367        target_variable: &str,
368        edge_variable: Option<&str>,
369        edge_properties: &[String],
370        target_properties: &[String],
371        label_props: Option<
372            &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
373        >,
374        edge_props: Option<
375            &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
376        >,
377        optional: bool,
378    ) -> SchemaRef {
379        let mut fields: Vec<Field> = input_schema
380            .fields()
381            .iter()
382            .map(|f| f.as_ref().clone())
383            .collect();
384
385        // Add target VID column (nullable when optional — unmatched rows get NULL)
386        let target_vid_name = format!("{}._vid", target_variable);
387        fields.push(Field::new(&target_vid_name, DataType::UInt64, optional));
388
389        // Add target ._labels column (List(Utf8)) for labels() and structural projection support
390        fields.push(Field::new(
391            format!("{}._labels", target_variable),
392            labels_data_type(),
393            true,
394        ));
395
396        // Add target vertex property columns
397        for prop_name in target_properties {
398            let col_name = format!("{}.{}", target_variable, prop_name);
399            let arrow_type = resolve_property_type(prop_name, label_props);
400            let uni_type = label_props
401                .and_then(|p| p.get(prop_name))
402                .map(|m| &m.r#type);
403            fields.push(property_field(&col_name, arrow_type, uni_type));
404        }
405
406        // Add edge ID column if edge variable is bound
407        if let Some(edge_var) = edge_variable {
408            let edge_id_name = format!("{}._eid", edge_var);
409            fields.push(Field::new(&edge_id_name, DataType::UInt64, optional));
410
411            // Add edge _type column for type(r) support
412            fields.push(Field::new(
413                format!("{}._type", edge_var),
414                DataType::Utf8,
415                true,
416            ));
417
418            // Add edge property columns with types resolved from schema
419            for prop_name in edge_properties {
420                let prop_col_name = format!("{}.{}", edge_var, prop_name);
421                let arrow_type = resolve_edge_property_type(prop_name, edge_props);
422                let uni_type = edge_props.and_then(|p| p.get(prop_name)).map(|m| &m.r#type);
423                fields.push(property_field(&prop_col_name, arrow_type, uni_type));
424            }
425        } else {
426            // Add internal edge ID column for relationship uniqueness tracking
427            // even when edge variable is not explicitly bound.
428            let internal_eid_name = format!("__eid_to_{}", target_variable);
429            fields.push(Field::new(&internal_eid_name, DataType::UInt64, optional));
430        }
431
432        Arc::new(Schema::new(fields))
433    }
434}
435
436impl DisplayAs for GraphTraverseExec {
437    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
438        write!(
439            f,
440            "GraphTraverseExec: {} --[{:?}]--> {}",
441            self.source_column, self.edge_type_ids, self.target_variable
442        )?;
443        if let Some(ref edge_var) = self.edge_variable {
444            write!(f, " as {}", edge_var)?;
445        }
446        Ok(())
447    }
448}
449
450impl ExecutionPlan for GraphTraverseExec {
451    fn name(&self) -> &str {
452        "GraphTraverseExec"
453    }
454
455    fn as_any(&self) -> &dyn Any {
456        self
457    }
458
459    fn schema(&self) -> SchemaRef {
460        self.schema.clone()
461    }
462
463    fn properties(&self) -> &Arc<PlanProperties> {
464        &self.properties
465    }
466
467    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
468        vec![&self.input]
469    }
470
471    fn with_new_children(
472        self: Arc<Self>,
473        children: Vec<Arc<dyn ExecutionPlan>>,
474    ) -> DFResult<Arc<dyn ExecutionPlan>> {
475        if children.len() != 1 {
476            return Err(datafusion::error::DataFusionError::Plan(
477                "GraphTraverseExec requires exactly one child".to_string(),
478            ));
479        }
480
481        Ok(Arc::new(Self::new(
482            children[0].clone(),
483            self.source_column.clone(),
484            self.edge_type_ids.clone(),
485            self.direction,
486            self.target_variable.clone(),
487            self.edge_variable.clone(),
488            self.edge_properties.clone(),
489            self.target_properties.clone(),
490            self.target_label_name.clone(),
491            self.target_label_id,
492            self.graph_ctx.clone(),
493            self.optional,
494            self.optional_pattern_vars.clone(),
495            self.bound_target_column.clone(),
496            self.used_edge_columns.clone(),
497        )))
498    }
499
500    fn execute(
501        &self,
502        partition: usize,
503        context: Arc<TaskContext>,
504    ) -> DFResult<SendableRecordBatchStream> {
505        let input_stream = self.input.execute(partition, context)?;
506
507        let metrics = BaselineMetrics::new(&self.metrics, partition);
508
509        let warm_fut = self
510            .graph_ctx
511            .warming_future(self.edge_type_ids.clone(), self.direction);
512
513        Ok(Box::pin(GraphTraverseStream {
514            input: input_stream,
515            source_column: self.source_column.clone(),
516            edge_type_ids: self.edge_type_ids.clone(),
517            direction: self.direction,
518            target_variable: self.target_variable.clone(),
519            edge_variable: self.edge_variable.clone(),
520            edge_properties: self.edge_properties.clone(),
521            target_properties: self.target_properties.clone(),
522            target_label_name: self.target_label_name.clone(),
523            graph_ctx: self.graph_ctx.clone(),
524            optional: self.optional,
525            optional_pattern_vars: self.optional_pattern_vars.clone(),
526            bound_target_column: self.bound_target_column.clone(),
527            used_edge_columns: self.used_edge_columns.clone(),
528            schema: self.schema.clone(),
529            state: TraverseStreamState::Warming(warm_fut),
530            metrics,
531        }))
532    }
533
534    fn metrics(&self) -> Option<MetricsSet> {
535        Some(self.metrics.clone_inner())
536    }
537}
538
539/// State machine for traverse stream execution.
540enum TraverseStreamState {
541    /// Warming adjacency CSRs before first batch.
542    Warming(Pin<Box<dyn std::future::Future<Output = DFResult<()>> + Send>>),
543    /// Polling the input stream for batches.
544    Reading,
545    /// Materializing target vertex properties asynchronously.
546    Materializing(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
547    /// Stream is done.
548    Done,
549}
550
551/// Stream that performs single-hop traversal with async property materialization.
552struct GraphTraverseStream {
553    /// Input stream.
554    input: SendableRecordBatchStream,
555
556    /// Column name containing source VIDs.
557    source_column: String,
558
559    /// Edge type IDs to traverse.
560    edge_type_ids: Vec<u32>,
561
562    /// Traversal direction.
563    direction: Direction,
564
565    /// Variable name for target vertex (retained for diagnostics).
566    #[expect(dead_code, reason = "Retained for debug logging and diagnostics")]
567    target_variable: String,
568
569    /// Variable name for edge (if bound).
570    edge_variable: Option<String>,
571
572    /// Edge properties to materialize.
573    edge_properties: Vec<String>,
574
575    /// Target vertex properties to materialize.
576    target_properties: Vec<String>,
577
578    /// Target label name for property resolution and filtering.
579    target_label_name: Option<String>,
580
581    /// Graph execution context.
582    graph_ctx: Arc<GraphExecutionContext>,
583
584    /// Whether this is an OPTIONAL MATCH.
585    optional: bool,
586
587    /// Variables introduced by the OPTIONAL MATCH pattern.
588    optional_pattern_vars: HashSet<String>,
589
590    /// Column name of an already-bound target VID (for cycle patterns like n-->k<--n).
591    bound_target_column: Option<String>,
592
593    /// Columns containing edge IDs from previous hops (for relationship uniqueness).
594    used_edge_columns: Vec<String>,
595
596    /// Output schema.
597    schema: SchemaRef,
598
599    /// Stream state.
600    state: TraverseStreamState,
601
602    /// Metrics.
603    metrics: BaselineMetrics,
604}
605
606impl GraphTraverseStream {
607    /// Expand neighbors synchronously and return expansions.
608    /// Returns (row_idx, target_vid, eid_u64, edge_type_id).
609    fn expand_neighbors(&self, batch: &RecordBatch) -> DFResult<Vec<(usize, Vid, u64, u32)>> {
610        let source_col = batch.column_by_name(&self.source_column).ok_or_else(|| {
611            datafusion::error::DataFusionError::Execution(format!(
612                "Source column '{}' not found",
613                self.source_column
614            ))
615        })?;
616
617        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
618        let source_vids: &UInt64Array = &source_vid_cow;
619
620        // If bound_target_column is set, get the expected target VIDs for each row.
621        // This is used for cycle patterns like n-->k<--n where the target must match.
622        let bound_target_cow = self
623            .bound_target_column
624            .as_ref()
625            .and_then(|col| batch.column_by_name(col))
626            .map(|c| column_as_vid_array(c.as_ref()))
627            .transpose()?;
628        let bound_target_vids: Option<&UInt64Array> = bound_target_cow.as_deref();
629
630        // Collect edge ID arrays from previous hops for relationship uniqueness filtering.
631        let used_edge_arrays: Vec<&UInt64Array> = self
632            .used_edge_columns
633            .iter()
634            .filter_map(|col| {
635                batch
636                    .column_by_name(col)
637                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
638            })
639            .collect();
640
641        let mut expanded_rows: Vec<(usize, Vid, u64, u32)> = Vec::new();
642        let is_undirected = matches!(self.direction, Direction::Both);
643
644        for (row_idx, source_vid) in source_vids.iter().enumerate() {
645            let Some(src) = source_vid else {
646                continue;
647            };
648
649            // Get expected target VID if this is a bound target pattern.
650            // Distinguish between:
651            // - no bound target column (no filtering),
652            // - bound target present but NULL for this row (must produce no expansion),
653            // - bound target present with VID.
654            let expected_target = bound_target_vids.map(|arr| {
655                if arr.is_null(row_idx) {
656                    None
657                } else {
658                    Some(arr.value(row_idx))
659                }
660            });
661
662            // Collect used edge IDs for this row from all previous hops
663            let used_eids: HashSet<u64> = used_edge_arrays
664                .iter()
665                .filter_map(|arr| {
666                    if arr.is_null(row_idx) {
667                        None
668                    } else {
669                        Some(arr.value(row_idx))
670                    }
671                })
672                .collect();
673
674            let vid = Vid::from(src);
675            // For Direction::Both, deduplicate edges by eid within each source.
676            // This prevents the same edge being counted twice (once outgoing, once incoming).
677            let mut seen_edges: HashSet<u64> = HashSet::new();
678
679            for &edge_type in &self.edge_type_ids {
680                let neighbors = self.graph_ctx.get_neighbors(vid, edge_type, self.direction);
681
682                for (target_vid, eid) in neighbors {
683                    let eid_u64 = eid.as_u64();
684
685                    // Skip edges already used in previous hops (relationship uniqueness)
686                    if used_eids.contains(&eid_u64) {
687                        continue;
688                    }
689
690                    // Deduplicate edges for undirected patterns
691                    if is_undirected && !seen_edges.insert(eid_u64) {
692                        continue;
693                    }
694
695                    // Filter by bound target VID if set (for cycle patterns).
696                    // NULL bound targets do not match anything.
697                    if let Some(expected_opt) = expected_target {
698                        let Some(expected) = expected_opt else {
699                            continue;
700                        };
701                        if target_vid.as_u64() != expected {
702                            continue;
703                        }
704                    }
705
706                    // Filter by target label. VIDs no longer embed label
707                    // information, so we resolve labels from the L0 chain and
708                    // then the persisted VidLabelsIndex (the latter covers
709                    // Lance-only vertices, e.g. on forks).
710                    if let Some(ref label_name) = self.target_label_name {
711                        let query_ctx = self.graph_ctx.query_context();
712                        if let Some(vertex_labels) =
713                            self.graph_ctx.resolve_vertex_labels(target_vid, &query_ctx)
714                        {
715                            // Labels resolved — require an actual match.
716                            if !vertex_labels.contains(label_name) {
717                                continue;
718                            }
719                        }
720                        // else: unknown to L0 and the index → trust storage-level filtering
721                    }
722
723                    expanded_rows.push((row_idx, target_vid, eid_u64, edge_type));
724                }
725            }
726        }
727
728        Ok(expanded_rows)
729    }
730}
731
732/// Build the target-vertex labels column for `hasLabel` filters.
733///
734/// Resolves each target's labels from the L0 chain and then the persisted
735/// `VidLabelsIndex`, so `hasLabel(b, "B")` evaluates correctly for vertices
736/// that live only in Lance storage (e.g. on a fork). Only when a vertex is
737/// absent from both does it fall back to the label that scoped the traversal.
738fn build_target_labels_column(
739    target_vids: &[Vid],
740    target_label_name: &Option<String>,
741    graph_ctx: &GraphExecutionContext,
742) -> ArrayRef {
743    use arrow_array::builder::{ListBuilder, StringBuilder};
744    let mut labels_builder = ListBuilder::new(StringBuilder::new());
745    let query_ctx = graph_ctx.query_context();
746    for vid in target_vids {
747        let row_labels: Vec<String> = match graph_ctx.resolve_vertex_labels(*vid, &query_ctx) {
748            Some(labels) => labels,
749            None => {
750                // Unknown to both L0 and the index — trust the schema label
751                // that scoped the traversal (storage already filtered to it).
752                if let Some(label_name) = target_label_name {
753                    vec![label_name.clone()]
754                } else {
755                    vec![]
756                }
757            }
758        };
759        let values = labels_builder.values();
760        for lbl in &row_labels {
761            values.append_value(lbl);
762        }
763        labels_builder.append(true);
764    }
765    Arc::new(labels_builder.finish())
766}
767
768/// Build target vertex property columns from storage and L0.
769async fn build_target_property_columns(
770    target_vids: &[Vid],
771    target_properties: &[String],
772    target_label_name: &Option<String>,
773    graph_ctx: &Arc<GraphExecutionContext>,
774) -> DFResult<Vec<ArrayRef>> {
775    let mut columns = Vec::new();
776
777    if let Some(label_name) = target_label_name {
778        let property_manager = graph_ctx.property_manager();
779        let query_ctx = graph_ctx.query_context();
780
781        let props_map = property_manager
782            .get_batch_vertex_props_for_label(target_vids, label_name, Some(&query_ctx))
783            .await
784            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
785
786        let uni_schema = graph_ctx.storage().schema_manager().schema();
787        let label_props = uni_schema.properties.get(label_name.as_str());
788
789        for prop_name in target_properties {
790            let data_type = resolve_property_type(prop_name, label_props);
791            let column =
792                build_property_column_static(target_vids, &props_map, prop_name, &data_type)?;
793            columns.push(column);
794        }
795    } else {
796        // No label name — use label-agnostic property lookup.
797        let non_internal_props: Vec<&str> = target_properties
798            .iter()
799            .filter(|p| *p != "_all_props")
800            .map(|s| s.as_str())
801            .collect();
802        let property_manager = graph_ctx.property_manager();
803        let query_ctx = graph_ctx.query_context();
804
805        let props_map = if !non_internal_props.is_empty() {
806            property_manager
807                .get_batch_vertex_props(target_vids, &non_internal_props, Some(&query_ctx))
808                .await
809                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
810        } else {
811            std::collections::HashMap::new()
812        };
813
814        for prop_name in target_properties {
815            if prop_name == "_all_props" {
816                columns.push(build_all_props_column(target_vids, &props_map, graph_ctx));
817            } else {
818                let column = build_property_column_static(
819                    target_vids,
820                    &props_map,
821                    prop_name,
822                    &arrow::datatypes::DataType::LargeBinary,
823                )?;
824                columns.push(column);
825            }
826        }
827    }
828
829    Ok(columns)
830}
831
832/// Build a CypherValue blob column from all vertex properties (L0 + storage).
833fn build_all_props_column(
834    target_vids: &[Vid],
835    props_map: &HashMap<Vid, HashMap<String, uni_common::Value>>,
836    graph_ctx: &Arc<GraphExecutionContext>,
837) -> ArrayRef {
838    use arrow_array::builder::LargeBinaryBuilder;
839
840    let mut builder = LargeBinaryBuilder::new();
841    let l0_ctx = graph_ctx.l0_context();
842    for vid in target_vids {
843        // Merge in `Value` space so typed values (temporals) are preserved.
844        let mut merged_props: HashMap<String, uni_common::Value> = HashMap::new();
845        if let Some(vid_props) = props_map.get(vid) {
846            for (k, v) in vid_props.iter() {
847                merged_props.insert(k.clone(), v.clone());
848            }
849        }
850        for l0 in l0_ctx.iter_l0_buffers() {
851            let guard = l0.read();
852            if let Some(l0_props) = guard.vertex_properties.get(vid) {
853                for (k, v) in l0_props.iter() {
854                    merged_props.insert(k.clone(), v.clone());
855                }
856            }
857        }
858        if merged_props.is_empty() {
859            builder.append_null();
860        } else {
861            builder.append_value(uni_common::cypher_value_codec::encode(
862                &uni_common::Value::Map(merged_props),
863            ));
864        }
865    }
866    Arc::new(builder.finish())
867}
868
869/// Build edge ID, type, and property columns for bound edge variables.
870async fn build_edge_columns(
871    expansions: &[(usize, Vid, u64, u32)],
872    edge_properties: &[String],
873    edge_type_ids: &[u32],
874    graph_ctx: &Arc<GraphExecutionContext>,
875) -> DFResult<Vec<ArrayRef>> {
876    let mut columns = Vec::new();
877
878    let eids: Vec<Eid> = expansions
879        .iter()
880        .map(|(_, _, eid, _)| Eid::from(*eid))
881        .collect();
882    let eid_u64s: Vec<u64> = eids.iter().map(|e| e.as_u64()).collect();
883    columns.push(Arc::new(UInt64Array::from(eid_u64s)) as ArrayRef);
884
885    // Edge _type column
886    {
887        let uni_schema = graph_ctx.storage().schema_manager().schema();
888        let mut type_builder = arrow_array::builder::StringBuilder::new();
889        for (_, _, _, edge_type_id) in expansions {
890            if let Some(name) = uni_schema.edge_type_name_by_id_unified(*edge_type_id) {
891                type_builder.append_value(&name);
892            } else {
893                type_builder.append_null();
894            }
895        }
896        columns.push(Arc::new(type_builder.finish()) as ArrayRef);
897    }
898
899    if !edge_properties.is_empty() {
900        let prop_name_refs: Vec<&str> = edge_properties.iter().map(|s| s.as_str()).collect();
901        let property_manager = graph_ctx.property_manager();
902        let query_ctx = graph_ctx.query_context();
903
904        let props_map = property_manager
905            .get_batch_edge_props(&eids, &prop_name_refs, Some(&query_ctx))
906            .await
907            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
908
909        let uni_schema = graph_ctx.storage().schema_manager().schema();
910        let merged_edge_props = merged_edge_schema_props(&uni_schema, edge_type_ids);
911        let edge_type_props = if merged_edge_props.is_empty() {
912            None
913        } else {
914            Some(&merged_edge_props)
915        };
916
917        let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(e.as_u64())).collect();
918
919        for prop_name in edge_properties {
920            // System-managed edge timestamps live outside the property bag.
921            // Read them directly from L0 (earliest creation, latest update).
922            // Disk-only edges will surface as null — acceptable for the
923            // common case where the queried edges are L0-resident in the
924            // same session.
925            if prop_name == "_created_at" || prop_name == "_updated_at" {
926                let mut builder =
927                    arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
928                let l0_ctx = graph_ctx.l0_context();
929                for eid in &eids {
930                    let mut value: Option<i64> = None;
931                    for l0 in l0_ctx.iter_l0_buffers() {
932                        let guard = l0.read();
933                        let opt = if prop_name == "_created_at" {
934                            guard.edge_created_at.get(eid).copied()
935                        } else {
936                            guard.edge_updated_at.get(eid).copied()
937                        };
938                        if let Some(ts) = opt {
939                            value = Some(match value {
940                                None => ts,
941                                Some(cur) if prop_name == "_created_at" => cur.min(ts),
942                                Some(cur) => cur.max(ts),
943                            });
944                        }
945                    }
946                    match value {
947                        Some(ts) => builder.append_value(ts),
948                        None => builder.append_null(),
949                    }
950                }
951                columns.push(Arc::new(builder.finish()) as ArrayRef);
952                continue;
953            }
954            let data_type = resolve_edge_property_type(prop_name, edge_type_props);
955            let column =
956                build_property_column_static(&vid_keys, &props_map, prop_name, &data_type)?;
957            columns.push(column);
958        }
959    }
960
961    Ok(columns)
962}
963
964/// Build the output batch with target vertex properties.
965///
966/// This is a standalone async function so it can be boxed into a `Send` future
967/// without borrowing from `GraphTraverseStream`.
968#[expect(
969    clippy::too_many_arguments,
970    reason = "Standalone async fn needs all context passed explicitly"
971)]
972async fn build_traverse_output_batch(
973    input: RecordBatch,
974    expansions: Vec<(usize, Vid, u64, u32)>,
975    schema: SchemaRef,
976    edge_variable: Option<String>,
977    edge_properties: Vec<String>,
978    edge_type_ids: Vec<u32>,
979    target_properties: Vec<String>,
980    target_label_name: Option<String>,
981    graph_ctx: Arc<GraphExecutionContext>,
982    optional: bool,
983    optional_pattern_vars: HashSet<String>,
984) -> DFResult<RecordBatch> {
985    if expansions.is_empty() {
986        if !optional {
987            return Ok(RecordBatch::new_empty(schema));
988        }
989        let unmatched_reps = collect_unmatched_optional_group_rows(
990            &input,
991            &HashSet::new(),
992            &schema,
993            &optional_pattern_vars,
994        )?;
995        if unmatched_reps.is_empty() {
996            return Ok(RecordBatch::new_empty(schema));
997        }
998        return build_optional_null_batch_for_rows_with_optional_vars(
999            &input,
1000            &unmatched_reps,
1001            &schema,
1002            &optional_pattern_vars,
1003        );
1004    }
1005
1006    // Expand input columns via index array
1007    let indices: Vec<u64> = expansions
1008        .iter()
1009        .map(|(idx, _, _, _)| *idx as u64)
1010        .collect();
1011    let indices_array = UInt64Array::from(indices);
1012    let mut columns: Vec<ArrayRef> = input
1013        .columns()
1014        .iter()
1015        .map(|col| take(col.as_ref(), &indices_array, None))
1016        .collect::<Result<_, _>>()?;
1017
1018    // Target VID column
1019    let target_vids: Vec<Vid> = expansions.iter().map(|(_, vid, _, _)| *vid).collect();
1020    let target_vid_u64s: Vec<u64> = target_vids.iter().map(|v| v.as_u64()).collect();
1021    columns.push(Arc::new(UInt64Array::from(target_vid_u64s)));
1022
1023    // Target labels column
1024    columns.push(build_target_labels_column(
1025        &target_vids,
1026        &target_label_name,
1027        &graph_ctx,
1028    ));
1029
1030    // Target vertex property columns
1031    if !target_properties.is_empty() {
1032        let prop_cols = build_target_property_columns(
1033            &target_vids,
1034            &target_properties,
1035            &target_label_name,
1036            &graph_ctx,
1037        )
1038        .await?;
1039        columns.extend(prop_cols);
1040    }
1041
1042    // Edge columns (bound or internal tracking)
1043    if edge_variable.is_some() {
1044        let edge_cols =
1045            build_edge_columns(&expansions, &edge_properties, &edge_type_ids, &graph_ctx).await?;
1046        columns.extend(edge_cols);
1047    } else {
1048        let eid_u64s: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1049        columns.push(Arc::new(UInt64Array::from(eid_u64s)));
1050    }
1051
1052    let expanded_batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
1053
1054    // Append null rows for unmatched optional sources
1055    if optional {
1056        let matched_indices: HashSet<usize> =
1057            expansions.iter().map(|(idx, _, _, _)| *idx).collect();
1058        let unmatched = collect_unmatched_optional_group_rows(
1059            &input,
1060            &matched_indices,
1061            &schema,
1062            &optional_pattern_vars,
1063        )?;
1064
1065        if !unmatched.is_empty() {
1066            let null_batch = build_optional_null_batch_for_rows_with_optional_vars(
1067                &input,
1068                &unmatched,
1069                &schema,
1070                &optional_pattern_vars,
1071            )?;
1072            let combined = arrow::compute::concat_batches(&schema, [&expanded_batch, &null_batch])
1073                .map_err(arrow_err)?;
1074            return Ok(combined);
1075        }
1076    }
1077
1078    Ok(expanded_batch)
1079}
1080
1081/// Build a batch for specific unmatched source rows with NULL target/edge columns.
1082/// Used when OPTIONAL MATCH has some expansions but some source rows had none.
1083fn build_optional_null_batch_for_rows(
1084    input: &RecordBatch,
1085    unmatched_indices: &[usize],
1086    schema: &SchemaRef,
1087) -> DFResult<RecordBatch> {
1088    let num_rows = unmatched_indices.len();
1089    let indices: Vec<u64> = unmatched_indices.iter().map(|&idx| idx as u64).collect();
1090    let indices_array = UInt64Array::from(indices);
1091
1092    // Take the unmatched input rows
1093    let mut columns: Vec<ArrayRef> = Vec::new();
1094    for col in input.columns() {
1095        let taken = take(col.as_ref(), &indices_array, None)?;
1096        columns.push(taken);
1097    }
1098    // Fill remaining columns with nulls
1099    for field in schema.fields().iter().skip(input.num_columns()) {
1100        columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1101    }
1102    RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
1103}
1104
/// Report whether `col_name` belongs to one of the OPTIONAL MATCH variables.
///
/// A column is attributed to variable `v` when any of these holds:
/// - it is exactly `v`;
/// - it is a dotted column `v.<suffix>` (e.g. `v._vid`);
/// - it is an internal `__eid_to_*` column whose name ends with `v`.
///
/// NOTE(review): the `__eid_to_` check is a plain suffix test, so variable `b`
/// also claims `__eid_to_ab`. Confirm the column naming scheme rules out such
/// collisions.
fn is_optional_column_for_vars(col_name: &str, optional_vars: &HashSet<String>) -> bool {
    if optional_vars.contains(col_name) {
        return true;
    }
    let is_eid_tracking = col_name.starts_with("__eid_to_");
    optional_vars.iter().map(String::as_str).any(|var| {
        let is_dotted_child = col_name
            .strip_prefix(var)
            .is_some_and(|rest| rest.starts_with('.'));
        is_dotted_child || (is_eid_tracking && col_name.ends_with(var))
    })
}
1112
1113fn collect_unmatched_optional_group_rows(
1114    input: &RecordBatch,
1115    matched_indices: &HashSet<usize>,
1116    schema: &SchemaRef,
1117    optional_vars: &HashSet<String>,
1118) -> DFResult<Vec<usize>> {
1119    if input.num_rows() == 0 {
1120        return Ok(Vec::new());
1121    }
1122
1123    if optional_vars.is_empty() {
1124        return Ok((0..input.num_rows())
1125            .filter(|idx| !matched_indices.contains(idx))
1126            .collect());
1127    }
1128
1129    let source_vid_indices: Vec<usize> = schema
1130        .fields()
1131        .iter()
1132        .enumerate()
1133        .filter_map(|(idx, field)| {
1134            if idx >= input.num_columns() {
1135                return None;
1136            }
1137            let name = field.name();
1138            if !is_optional_column_for_vars(name, optional_vars) && name.ends_with("._vid") {
1139                Some(idx)
1140            } else {
1141                None
1142            }
1143        })
1144        .collect();
1145
1146    // Group rows by non-optional VID bindings and preserve group order.
1147    let mut groups: HashMap<Vec<u8>, (usize, bool)> = HashMap::new(); // (first_row_idx, any_matched)
1148    let mut group_order: Vec<Vec<u8>> = Vec::new();
1149
1150    for row_idx in 0..input.num_rows() {
1151        let key = compute_optional_group_key(input, row_idx, &source_vid_indices)?;
1152        let entry = groups.entry(key.clone());
1153        if matches!(entry, std::collections::hash_map::Entry::Vacant(_)) {
1154            group_order.push(key.clone());
1155        }
1156        let matched = matched_indices.contains(&row_idx);
1157        entry
1158            .and_modify(|(_, any_matched)| *any_matched |= matched)
1159            .or_insert((row_idx, matched));
1160    }
1161
1162    Ok(group_order
1163        .into_iter()
1164        .filter_map(|key| {
1165            groups
1166                .get(&key)
1167                .and_then(|(first_idx, any_matched)| (!*any_matched).then_some(*first_idx))
1168        })
1169        .collect())
1170}
1171
1172fn compute_optional_group_key(
1173    batch: &RecordBatch,
1174    row_idx: usize,
1175    source_vid_indices: &[usize],
1176) -> DFResult<Vec<u8>> {
1177    let mut key = Vec::with_capacity(source_vid_indices.len() * std::mem::size_of::<u64>());
1178    for &col_idx in source_vid_indices {
1179        let col = batch.column(col_idx);
1180        let vid_cow = column_as_vid_array(col.as_ref())?;
1181        let arr: &UInt64Array = &vid_cow;
1182        if arr.is_null(row_idx) {
1183            key.extend_from_slice(&u64::MAX.to_le_bytes());
1184        } else {
1185            key.extend_from_slice(&arr.value(row_idx).to_le_bytes());
1186        }
1187    }
1188    Ok(key)
1189}
1190
1191fn build_optional_null_batch_for_rows_with_optional_vars(
1192    input: &RecordBatch,
1193    unmatched_indices: &[usize],
1194    schema: &SchemaRef,
1195    optional_vars: &HashSet<String>,
1196) -> DFResult<RecordBatch> {
1197    if optional_vars.is_empty() {
1198        return build_optional_null_batch_for_rows(input, unmatched_indices, schema);
1199    }
1200
1201    let num_rows = unmatched_indices.len();
1202    let indices: Vec<u64> = unmatched_indices.iter().map(|&idx| idx as u64).collect();
1203    let indices_array = UInt64Array::from(indices);
1204
1205    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
1206    for (col_idx, field) in schema.fields().iter().enumerate() {
1207        if col_idx < input.num_columns() {
1208            if is_optional_column_for_vars(field.name(), optional_vars) {
1209                columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1210            } else {
1211                let taken = take(input.column(col_idx).as_ref(), &indices_array, None)?;
1212                columns.push(taken);
1213            }
1214        } else {
1215            columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1216        }
1217    }
1218
1219    RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
1220}
1221
/// Drives `GraphTraverseStream` as a small state machine:
///
/// - `Warming`: polls the warm-up future; on success it moves to `Reading`.
/// - `Reading`: checks the query timeout, pulls the next input batch and
///   expands its neighbors. When no target or edge properties are requested,
///   the output batch is built synchronously and returned at once. Otherwise
///   an async hydration future is created and the stream moves to
///   `Materializing`.
/// - `Materializing`: polls the hydration future and returns its batch.
/// - `Done`: yields `None`.
impl Stream for GraphTraverseStream {
    type Item = DFResult<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // NOTE(review): the timer spans the whole poll, including time spent
        // polling the input stream and the hydration future, so
        // `elapsed_compute` also counts that work.
        let metrics = self.metrics.clone();
        let _timer = metrics.elapsed_compute().timer();
        loop {
            // Take the state by value and leave `Done` behind. Any path that
            // should keep the stream alive must store the next state before
            // returning or looping.
            let state = std::mem::replace(&mut self.state, TraverseStreamState::Done);

            match state {
                TraverseStreamState::Warming(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(())) => {
                        self.state = TraverseStreamState::Reading;
                        // Continue loop to start reading
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = TraverseStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        self.state = TraverseStreamState::Warming(fut);
                        return Poll::Pending;
                    }
                },
                TraverseStreamState::Reading => {
                    // Check timeout. On timeout the state is left as `Done`
                    // (from the `mem::replace` above), so the stream ends after
                    // reporting the error.
                    if let Err(e) = self.graph_ctx.check_timeout() {
                        return Poll::Ready(Some(Err(
                            datafusion::error::DataFusionError::Execution(e.to_string()),
                        )));
                    }

                    match self.input.poll_next_unpin(cx) {
                        Poll::Ready(Some(Ok(batch))) => {
                            // Expand neighbors synchronously
                            let expansions = match self.expand_neighbors(&batch) {
                                Ok(exp) => exp,
                                Err(e) => {
                                    // NOTE(review): unlike input and hydration
                                    // errors (which move to `Done`), an expansion
                                    // error keeps the stream in `Reading`, so a
                                    // caller that keeps polling continues with the
                                    // next input batch.
                                    self.state = TraverseStreamState::Reading;
                                    return Poll::Ready(Some(Err(e)));
                                }
                            };

                            // Build output synchronously only when no properties need async hydration
                            if self.target_properties.is_empty() && self.edge_properties.is_empty()
                            {
                                let result = build_traverse_output_batch_sync(
                                    &batch,
                                    &expansions,
                                    &self.schema,
                                    self.edge_variable.as_ref(),
                                    &self.graph_ctx,
                                    self.optional,
                                    &self.optional_pattern_vars,
                                );
                                self.state = TraverseStreamState::Reading;
                                if let Ok(ref r) = result {
                                    self.metrics.record_output(r.num_rows());
                                }
                                return Poll::Ready(Some(result));
                            }

                            // Properties needed — create async future for hydration.
                            // Everything it needs is cloned or moved in, so the
                            // future does not borrow from `self`.
                            let schema = self.schema.clone();
                            let edge_variable = self.edge_variable.clone();
                            let edge_properties = self.edge_properties.clone();
                            let edge_type_ids = self.edge_type_ids.clone();
                            let target_properties = self.target_properties.clone();
                            let target_label_name = self.target_label_name.clone();
                            let graph_ctx = self.graph_ctx.clone();

                            let optional = self.optional;
                            let optional_pattern_vars = self.optional_pattern_vars.clone();

                            let fut = build_traverse_output_batch(
                                batch,
                                expansions,
                                schema,
                                edge_variable,
                                edge_properties,
                                edge_type_ids,
                                target_properties,
                                target_label_name,
                                graph_ctx,
                                optional,
                                optional_pattern_vars,
                            );

                            self.state = TraverseStreamState::Materializing(Box::pin(fut));
                            // Continue loop to poll the future
                        }
                        Poll::Ready(Some(Err(e))) => {
                            self.state = TraverseStreamState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(None) => {
                            self.state = TraverseStreamState::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Pending => {
                            self.state = TraverseStreamState::Reading;
                            return Poll::Pending;
                        }
                    }
                }
                TraverseStreamState::Materializing(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(batch)) => {
                        self.state = TraverseStreamState::Reading;
                        self.metrics.record_output(batch.num_rows());
                        return Poll::Ready(Some(Ok(batch)));
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = TraverseStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        // Put the in-flight future back so the next poll resumes it.
                        self.state = TraverseStreamState::Materializing(fut);
                        return Poll::Pending;
                    }
                },
                TraverseStreamState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
1349
1350/// Build output batch synchronously when no properties need async hydration.
1351///
1352/// Only called when both `target_properties` and `edge_properties` are empty,
1353/// so no property columns need to be materialized.
1354fn build_traverse_output_batch_sync(
1355    input: &RecordBatch,
1356    expansions: &[(usize, Vid, u64, u32)],
1357    schema: &SchemaRef,
1358    edge_variable: Option<&String>,
1359    graph_ctx: &GraphExecutionContext,
1360    optional: bool,
1361    optional_pattern_vars: &HashSet<String>,
1362) -> DFResult<RecordBatch> {
1363    if expansions.is_empty() {
1364        if !optional {
1365            return Ok(RecordBatch::new_empty(schema.clone()));
1366        }
1367        let unmatched_reps = collect_unmatched_optional_group_rows(
1368            input,
1369            &HashSet::new(),
1370            schema,
1371            optional_pattern_vars,
1372        )?;
1373        if unmatched_reps.is_empty() {
1374            return Ok(RecordBatch::new_empty(schema.clone()));
1375        }
1376        return build_optional_null_batch_for_rows_with_optional_vars(
1377            input,
1378            &unmatched_reps,
1379            schema,
1380            optional_pattern_vars,
1381        );
1382    }
1383
1384    let indices: Vec<u64> = expansions
1385        .iter()
1386        .map(|(idx, _, _, _)| *idx as u64)
1387        .collect();
1388    let indices_array = UInt64Array::from(indices);
1389
1390    let mut columns: Vec<ArrayRef> = Vec::new();
1391    for col in input.columns() {
1392        let expanded = take(col.as_ref(), &indices_array, None)?;
1393        columns.push(expanded);
1394    }
1395
1396    // Add target VID column
1397    let target_vids: Vec<u64> = expansions
1398        .iter()
1399        .map(|(_, vid, _, _)| vid.as_u64())
1400        .collect();
1401    columns.push(Arc::new(UInt64Array::from(target_vids)));
1402
1403    // Add target ._labels column (from L0 buffers)
1404    {
1405        use arrow_array::builder::{ListBuilder, StringBuilder};
1406        let l0_ctx = graph_ctx.l0_context();
1407        let mut labels_builder = ListBuilder::new(StringBuilder::new());
1408        for (_, vid, _, _) in expansions {
1409            let mut row_labels: Vec<String> = Vec::new();
1410            for l0 in l0_ctx.iter_l0_buffers() {
1411                let guard = l0.read();
1412                if let Some(l0_labels) = guard.vertex_labels.get(vid) {
1413                    for lbl in l0_labels {
1414                        if !row_labels.contains(lbl) {
1415                            row_labels.push(lbl.clone());
1416                        }
1417                    }
1418                }
1419            }
1420            let values = labels_builder.values();
1421            for lbl in &row_labels {
1422                values.append_value(lbl);
1423            }
1424            labels_builder.append(true);
1425        }
1426        columns.push(Arc::new(labels_builder.finish()));
1427    }
1428
1429    // Add edge columns if edge is bound (no properties in sync path)
1430    if edge_variable.is_some() {
1431        let edge_ids: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1432        columns.push(Arc::new(UInt64Array::from(edge_ids)));
1433
1434        // Add edge _type column
1435        let uni_schema = graph_ctx.storage().schema_manager().schema();
1436        let mut type_builder = arrow_array::builder::StringBuilder::new();
1437        for (_, _, _, edge_type_id) in expansions {
1438            if let Some(name) = uni_schema.edge_type_name_by_id_unified(*edge_type_id) {
1439                type_builder.append_value(&name);
1440            } else {
1441                type_builder.append_null();
1442            }
1443        }
1444        columns.push(Arc::new(type_builder.finish()));
1445    } else {
1446        // Internal EID column for relationship uniqueness tracking (matches schema)
1447        let edge_ids: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1448        columns.push(Arc::new(UInt64Array::from(edge_ids)));
1449    }
1450
1451    let expanded_batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
1452
1453    if optional {
1454        let matched_indices: HashSet<usize> =
1455            expansions.iter().map(|(idx, _, _, _)| *idx).collect();
1456        let unmatched = collect_unmatched_optional_group_rows(
1457            input,
1458            &matched_indices,
1459            schema,
1460            optional_pattern_vars,
1461        )?;
1462
1463        if !unmatched.is_empty() {
1464            let null_batch = build_optional_null_batch_for_rows_with_optional_vars(
1465                input,
1466                &unmatched,
1467                schema,
1468                optional_pattern_vars,
1469            )?;
1470            let combined = arrow::compute::concat_batches(schema, [&expanded_batch, &null_batch])
1471                .map_err(|e| {
1472                datafusion::error::DataFusionError::ArrowError(Box::new(e), None)
1473            })?;
1474            return Ok(combined);
1475        }
1476    }
1477
1478    Ok(expanded_batch)
1479}
1480
1481impl RecordBatchStream for GraphTraverseStream {
1482    fn schema(&self) -> SchemaRef {
1483        self.schema.clone()
1484    }
1485}
1486
/// Adjacency map type: maps a source VID to its list of
/// `(target_vid, eid, edge_type_name, properties)` entries.
///
/// The type name and the property map are `Arc`-shared. An undirected
/// (`Both`) traversal stores every edge under both endpoints; sharing means
/// that only bumps reference counts instead of deep-cloning the whole
/// property map per entry (review perf #5).
type EdgeAdjacencyMap = HashMap<Vid, Vec<(Vid, Eid, Arc<str>, Arc<uni_common::Properties>)>>;
1493
1494/// Graph traversal execution plan for schemaless edge types (TraverseMainByType).
1495///
1496/// Unlike GraphTraverseExec which uses CSR adjacency for known types, this operator
1497/// queries the main edges table for schemaless types and builds an in-memory adjacency map.
1498///
1499/// # Example
1500///
1501/// ```ignore
1502/// // Traverse schemaless "CUSTOM" edges
1503/// let traverse = GraphTraverseMainExec::new(
1504///     input_plan,
1505///     "_vid",
1506///     "CUSTOM",
1507///     Direction::Outgoing,
1508///     "m",           // target variable
1509///     Some("r"),     // edge variable
1510///     vec![],        // edge properties
1511///     vec![],        // target properties
1512///     graph_ctx,
1513///     false,         // not optional
1514/// );
1515/// ```
1516pub struct GraphTraverseMainExec {
1517    /// Input execution plan.
1518    input: Arc<dyn ExecutionPlan>,
1519
1520    /// Column name containing source VIDs.
1521    source_column: String,
1522
1523    /// Edge type names (not IDs, since schemaless types may not have IDs).
1524    /// Supports OR relationship types like `[:KNOWS|HATES]`.
1525    type_names: Vec<String>,
1526
1527    /// Traversal direction.
1528    direction: Direction,
1529
1530    /// Variable name for target vertex columns.
1531    target_variable: String,
1532
1533    /// Variable name for edge columns (if edge is bound).
1534    edge_variable: Option<String>,
1535
1536    /// Edge properties to materialize.
1537    edge_properties: Vec<String>,
1538
1539    /// Target vertex properties to materialize.
1540    target_properties: Vec<String>,
1541
1542    /// Graph execution context.
1543    graph_ctx: Arc<GraphExecutionContext>,
1544
1545    /// Whether this is an OPTIONAL MATCH (preserve unmatched source rows with NULLs).
1546    optional: bool,
1547
1548    /// Variables introduced by the OPTIONAL MATCH pattern.
1549    optional_pattern_vars: HashSet<String>,
1550
1551    /// Column name of an already-bound target VID (for patterns where target is in scope).
1552    /// When set, only traversals reaching this exact VID are included.
1553    bound_target_column: Option<String>,
1554
1555    /// Columns containing edge IDs from previous hops (for relationship uniqueness).
1556    /// Edges matching any of these IDs are excluded from traversal results.
1557    used_edge_columns: Vec<String>,
1558
1559    /// Output schema.
1560    schema: SchemaRef,
1561
1562    /// Cached plan properties.
1563    properties: Arc<PlanProperties>,
1564
1565    /// Execution metrics.
1566    metrics: ExecutionPlanMetricsSet,
1567}
1568
1569impl fmt::Debug for GraphTraverseMainExec {
1570    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1571        f.debug_struct("GraphTraverseMainExec")
1572            .field("type_names", &self.type_names)
1573            .field("direction", &self.direction)
1574            .field("target_variable", &self.target_variable)
1575            .field("edge_variable", &self.edge_variable)
1576            .finish()
1577    }
1578}
1579
1580impl GraphTraverseMainExec {
1581    /// Create a new schemaless traversal executor.
1582    #[expect(clippy::too_many_arguments)]
1583    pub fn new(
1584        input: Arc<dyn ExecutionPlan>,
1585        source_column: impl Into<String>,
1586        type_names: Vec<String>,
1587        direction: Direction,
1588        target_variable: impl Into<String>,
1589        edge_variable: Option<String>,
1590        edge_properties: Vec<String>,
1591        target_properties: Vec<String>,
1592        graph_ctx: Arc<GraphExecutionContext>,
1593        optional: bool,
1594        optional_pattern_vars: HashSet<String>,
1595        bound_target_column: Option<String>,
1596        used_edge_columns: Vec<String>,
1597    ) -> Self {
1598        let source_column = source_column.into();
1599        let target_variable = target_variable.into();
1600
1601        // Build output schema
1602        let schema = Self::build_schema(
1603            &input.schema(),
1604            &target_variable,
1605            &edge_variable,
1606            &edge_properties,
1607            &target_properties,
1608            optional,
1609        );
1610
1611        let properties = compute_plan_properties(schema.clone());
1612
1613        Self {
1614            input,
1615            source_column,
1616            type_names,
1617            direction,
1618            target_variable,
1619            edge_variable,
1620            edge_properties,
1621            target_properties,
1622            graph_ctx,
1623            optional,
1624            optional_pattern_vars,
1625            bound_target_column,
1626            used_edge_columns,
1627            schema,
1628            properties,
1629            metrics: ExecutionPlanMetricsSet::new(),
1630        }
1631    }
1632
1633    /// Build output schema for traversal.
1634    fn build_schema(
1635        input_schema: &SchemaRef,
1636        target_variable: &str,
1637        edge_variable: &Option<String>,
1638        edge_properties: &[String],
1639        target_properties: &[String],
1640        optional: bool,
1641    ) -> SchemaRef {
1642        let mut fields: Vec<Field> = input_schema
1643            .fields()
1644            .iter()
1645            .map(|f| f.as_ref().clone())
1646            .collect();
1647
1648        // Add target ._vid column (only if not already in input, nullable for OPTIONAL MATCH)
1649        let target_vid_name = format!("{}._vid", target_variable);
1650        if input_schema.column_with_name(&target_vid_name).is_none() {
1651            fields.push(Field::new(target_vid_name, DataType::UInt64, true));
1652        }
1653
1654        // Add target ._labels column (only if not already in input)
1655        let target_labels_name = format!("{}._labels", target_variable);
1656        if input_schema.column_with_name(&target_labels_name).is_none() {
1657            fields.push(Field::new(target_labels_name, labels_data_type(), true));
1658        }
1659
1660        // Add edge columns if edge variable is bound
1661        if let Some(edge_var) = edge_variable {
1662            fields.push(Field::new(
1663                format!("{}._eid", edge_var),
1664                DataType::UInt64,
1665                optional,
1666            ));
1667
1668            // Add edge ._type column for type(r) support
1669            fields.push(Field::new(
1670                format!("{}._type", edge_var),
1671                DataType::Utf8,
1672                true,
1673            ));
1674
1675            // Edge properties: LargeBinary (cv_encoded) to preserve value types.
1676            // Schemaless edges store properties as CypherValue blobs so that
1677            // Int, Float, etc. round-trip correctly through Arrow.
1678            for prop in edge_properties {
1679                let col_name = format!("{}.{}", edge_var, prop);
1680                let mut metadata = std::collections::HashMap::new();
1681                metadata.insert("cv_encoded".to_string(), "true".to_string());
1682                fields.push(
1683                    Field::new(&col_name, DataType::LargeBinary, true).with_metadata(metadata),
1684                );
1685            }
1686        } else {
1687            // Add internal edge ID for anonymous relationships so BindPath can
1688            // reconstruct named paths (p = (a)-[:T]->(b)).
1689            fields.push(Field::new(
1690                format!("__eid_to_{}", target_variable),
1691                DataType::UInt64,
1692                optional,
1693            ));
1694        }
1695
1696        // Target properties: all as LargeBinary (deferred to PropertyManager)
1697        for prop in target_properties {
1698            fields.push(Field::new(
1699                format!("{}.{}", target_variable, prop),
1700                DataType::LargeBinary,
1701                true,
1702            ));
1703        }
1704
1705        Arc::new(Schema::new(fields))
1706    }
1707}
1708
1709impl DisplayAs for GraphTraverseMainExec {
1710    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1711        write!(
1712            f,
1713            "GraphTraverseMainExec: types={:?}, direction={:?}",
1714            self.type_names, self.direction
1715        )
1716    }
1717}
1718
1719impl ExecutionPlan for GraphTraverseMainExec {
1720    fn name(&self) -> &str {
1721        "GraphTraverseMainExec"
1722    }
1723
1724    fn as_any(&self) -> &dyn Any {
1725        self
1726    }
1727
1728    fn schema(&self) -> SchemaRef {
1729        self.schema.clone()
1730    }
1731
1732    fn properties(&self) -> &Arc<PlanProperties> {
1733        &self.properties
1734    }
1735
1736    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1737        vec![&self.input]
1738    }
1739
1740    fn with_new_children(
1741        self: Arc<Self>,
1742        children: Vec<Arc<dyn ExecutionPlan>>,
1743    ) -> DFResult<Arc<dyn ExecutionPlan>> {
1744        if children.len() != 1 {
1745            return Err(datafusion::error::DataFusionError::Plan(
1746                "GraphTraverseMainExec expects exactly one child".to_string(),
1747            ));
1748        }
1749
1750        Ok(Arc::new(Self {
1751            input: children[0].clone(),
1752            source_column: self.source_column.clone(),
1753            type_names: self.type_names.clone(),
1754            direction: self.direction,
1755            target_variable: self.target_variable.clone(),
1756            edge_variable: self.edge_variable.clone(),
1757            edge_properties: self.edge_properties.clone(),
1758            target_properties: self.target_properties.clone(),
1759            graph_ctx: self.graph_ctx.clone(),
1760            optional: self.optional,
1761            optional_pattern_vars: self.optional_pattern_vars.clone(),
1762            bound_target_column: self.bound_target_column.clone(),
1763            used_edge_columns: self.used_edge_columns.clone(),
1764            schema: self.schema.clone(),
1765            properties: self.properties.clone(),
1766            metrics: self.metrics.clone(),
1767        }))
1768    }
1769
1770    fn execute(
1771        &self,
1772        partition: usize,
1773        context: Arc<TaskContext>,
1774    ) -> DFResult<SendableRecordBatchStream> {
1775        let input_stream = self.input.execute(partition, context)?;
1776        let metrics = BaselineMetrics::new(&self.metrics, partition);
1777
1778        Ok(Box::pin(GraphTraverseMainStream::new(
1779            input_stream,
1780            self.source_column.clone(),
1781            self.type_names.clone(),
1782            self.direction,
1783            self.target_variable.clone(),
1784            self.edge_variable.clone(),
1785            self.edge_properties.clone(),
1786            self.target_properties.clone(),
1787            self.graph_ctx.clone(),
1788            self.optional,
1789            self.optional_pattern_vars.clone(),
1790            self.bound_target_column.clone(),
1791            self.used_edge_columns.clone(),
1792            self.schema.clone(),
1793            metrics,
1794        )))
1795    }
1796
1797    fn metrics(&self) -> Option<MetricsSet> {
1798        Some(self.metrics.clone_inner())
1799    }
1800}
1801
/// Upper bound on the number of distinct source vids pushed down into the
/// edge scan. Past this size a whole-type scan beats a large `IN` filter.
/// Measured in the `schemaless_traversal` bench: a 10k-vid `IN` over a
/// 100k-edge table ran ~2.6x slower than the full scan, while small source
/// sets ran ~4.5x faster.
const TRAVERSE_PUSHDOWN_MAX_SOURCES: usize = 1024;
1808
1809/// State machine for GraphTraverseMainStream.
1810///
1811/// The input is drained FIRST so the distinct source vids can be pushed into
1812/// the edge scan (review perf #5) — without this, a 1-source traversal
1813/// materialized every edge of the type. Memory note: buffering the input does
1814/// not change the operator's memory class; it already materialized the entire
1815/// edge type (and with pushdown materializes strictly less).
1816enum GraphTraverseMainState {
1817    /// Draining the input stream to learn the source-vid set.
1818    CollectingInput {
1819        input_stream: SendableRecordBatchStream,
1820        buffered: Vec<RecordBatch>,
1821    },
1822    /// Loading adjacency map from main edges table.
1823    LoadingEdges {
1824        future: Pin<Box<dyn std::future::Future<Output = DFResult<EdgeAdjacencyMap>> + Send>>,
1825        buffered: Vec<RecordBatch>,
1826    },
1827    /// Replaying the buffered input batches against the loaded adjacency.
1828    Processing {
1829        adjacency: EdgeAdjacencyMap,
1830        buffered: std::vec::IntoIter<RecordBatch>,
1831    },
1832    /// Stream is done.
1833    Done,
1834}
1835
1836/// Stream that executes schemaless edge traversal.
1837struct GraphTraverseMainStream {
1838    /// Source column name.
1839    source_column: String,
1840
1841    /// Target variable name.
1842    target_variable: String,
1843
1844    /// Edge variable name.
1845    edge_variable: Option<String>,
1846
1847    /// Edge properties to materialize.
1848    edge_properties: Vec<String>,
1849
1850    /// Target properties to materialize.
1851    target_properties: Vec<String>,
1852
1853    /// Graph execution context.
1854    graph_ctx: Arc<GraphExecutionContext>,
1855
1856    /// Whether this is optional (preserve unmatched rows).
1857    optional: bool,
1858
1859    /// Variables introduced by OPTIONAL pattern.
1860    optional_pattern_vars: HashSet<String>,
1861
1862    /// Column name of an already-bound target VID (for filtering).
1863    bound_target_column: Option<String>,
1864
1865    /// Columns containing edge IDs from previous hops (for relationship uniqueness).
1866    used_edge_columns: Vec<String>,
1867
1868    /// Output schema.
1869    schema: SchemaRef,
1870
1871    /// Edge type names to traverse (retained for the deferred edge load).
1872    type_names: Vec<String>,
1873
1874    /// Traversal direction (retained for the deferred edge load).
1875    direction: Direction,
1876
1877    /// Stream state.
1878    state: GraphTraverseMainState,
1879
1880    /// Metrics.
1881    metrics: BaselineMetrics,
1882}
1883
1884impl GraphTraverseMainStream {
1885    /// Create a new traverse main stream.
1886    #[expect(clippy::too_many_arguments)]
1887    fn new(
1888        input_stream: SendableRecordBatchStream,
1889        source_column: String,
1890        type_names: Vec<String>,
1891        direction: Direction,
1892        target_variable: String,
1893        edge_variable: Option<String>,
1894        edge_properties: Vec<String>,
1895        target_properties: Vec<String>,
1896        graph_ctx: Arc<GraphExecutionContext>,
1897        optional: bool,
1898        optional_pattern_vars: HashSet<String>,
1899        bound_target_column: Option<String>,
1900        used_edge_columns: Vec<String>,
1901        schema: SchemaRef,
1902        metrics: BaselineMetrics,
1903    ) -> Self {
1904        // The input is drained first (CollectingInput) so the edge load can
1905        // push the bounded source-vid set into the scan.
1906        Self {
1907            source_column,
1908            target_variable,
1909            edge_variable,
1910            edge_properties,
1911            target_properties,
1912            graph_ctx,
1913            optional,
1914            optional_pattern_vars,
1915            bound_target_column,
1916            used_edge_columns,
1917            schema,
1918            type_names,
1919            direction,
1920            state: GraphTraverseMainState::CollectingInput {
1921                input_stream,
1922                buffered: Vec::new(),
1923            },
1924            metrics,
1925        }
1926    }
1927
1928    /// Distinct source vids across the buffered input, or `None` when the set
1929    /// exceeds [`TRAVERSE_PUSHDOWN_MAX_SOURCES`] (whole-type scan is cheaper)
1930    /// or the source column cannot be read (the edge load then behaves exactly
1931    /// as before pushdown existed).
1932    fn collect_source_vids(&self, buffered: &[RecordBatch]) -> Option<HashSet<Vid>> {
1933        let mut vids: HashSet<Vid> = HashSet::new();
1934        for batch in buffered {
1935            let col = batch.column_by_name(&self.source_column)?;
1936            let arr = column_as_vid_array(col.as_ref()).ok()?;
1937            for i in 0..arr.len() {
1938                if !arr.is_null(i) {
1939                    vids.insert(Vid::from(arr.value(i)));
1940                    if vids.len() > TRAVERSE_PUSHDOWN_MAX_SOURCES {
1941                        return None;
1942                    }
1943                }
1944            }
1945        }
1946        Some(vids)
1947    }
1948
1949    /// Expand input batch using adjacency map (synchronous version).
1950    fn expand_batch(
1951        &self,
1952        input: &RecordBatch,
1953        adjacency: &EdgeAdjacencyMap,
1954    ) -> DFResult<RecordBatch> {
1955        // Extract source VIDs from source column
1956        let source_col = input.column_by_name(&self.source_column).ok_or_else(|| {
1957            datafusion::error::DataFusionError::Execution(format!(
1958                "Source column {} not found",
1959                self.source_column
1960            ))
1961        })?;
1962
1963        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
1964        let source_vids: &UInt64Array = &source_vid_cow;
1965
1966        // Read bound target VIDs if column exists
1967        let bound_target_cow = self
1968            .bound_target_column
1969            .as_ref()
1970            .and_then(|col| input.column_by_name(col))
1971            .map(|c| column_as_vid_array(c.as_ref()))
1972            .transpose()?;
1973        let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
1974
1975        // Collect edge ID arrays from previous hops for relationship uniqueness filtering.
1976        let used_edge_arrays: Vec<&UInt64Array> = self
1977            .used_edge_columns
1978            .iter()
1979            .filter_map(|col| {
1980                input
1981                    .column_by_name(col)
1982                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
1983            })
1984            .collect();
1985
1986        // Build expansions: (input_row_idx, target_vid, eid, edge_type, edge_props).
1987        // Type/props are Arc-shared with the adjacency map (pointer clones).
1988        type Expansion = (usize, Vid, Eid, Arc<str>, Arc<uni_common::Properties>);
1989        let mut expansions: Vec<Expansion> = Vec::new();
1990
1991        for (row_idx, src_u64) in source_vids.iter().enumerate() {
1992            if let Some(src_u64) = src_u64 {
1993                let src_vid = Vid::from(src_u64);
1994
1995                // Collect used edge IDs for this row from all previous hops
1996                let used_eids: HashSet<u64> = used_edge_arrays
1997                    .iter()
1998                    .filter_map(|arr| {
1999                        if arr.is_null(row_idx) {
2000                            None
2001                        } else {
2002                            Some(arr.value(row_idx))
2003                        }
2004                    })
2005                    .collect();
2006
2007                if let Some(neighbors) = adjacency.get(&src_vid) {
2008                    for (target_vid, eid, edge_type, props) in neighbors {
2009                        // Skip edges already used in previous hops (relationship uniqueness)
2010                        if used_eids.contains(&eid.as_u64()) {
2011                            continue;
2012                        }
2013
2014                        // Filter by bound target VID if set (for patterns where target is in scope).
2015                        // Only include traversals where the target matches the expected VID.
2016                        if let Some(targets) = expected_targets {
2017                            if targets.is_null(row_idx) {
2018                                continue;
2019                            }
2020                            let expected_vid = targets.value(row_idx);
2021                            if target_vid.as_u64() != expected_vid {
2022                                continue;
2023                            }
2024                        }
2025
2026                        expansions.push((
2027                            row_idx,
2028                            *target_vid,
2029                            *eid,
2030                            Arc::clone(edge_type),
2031                            Arc::clone(props),
2032                        ));
2033                    }
2034                }
2035            }
2036        }
2037
2038        // Handle OPTIONAL: preserve unmatched rows
2039        if expansions.is_empty() && self.optional {
2040            // No matches - return input with NULL columns appended
2041            let all_indices: Vec<usize> = (0..input.num_rows()).collect();
2042            return build_optional_null_batch_for_rows(input, &all_indices, &self.schema);
2043        }
2044
2045        if expansions.is_empty() {
2046            // No matches, not optional - return empty batch
2047            return Ok(RecordBatch::new_empty(self.schema.clone()));
2048        }
2049
2050        // Track matched rows for OPTIONAL handling
2051        let matched_rows: HashSet<usize> = if self.optional {
2052            expansions.iter().map(|(idx, _, _, _, _)| *idx).collect()
2053        } else {
2054            HashSet::new()
2055        };
2056
2057        // Expand input columns using Arrow take()
2058        let mut columns: Vec<ArrayRef> = Vec::new();
2059        let indices: Vec<u64> = expansions
2060            .iter()
2061            .map(|(idx, _, _, _, _)| *idx as u64)
2062            .collect();
2063        let indices_array = UInt64Array::from(indices);
2064
2065        for col in input.columns() {
2066            let expanded = take(col.as_ref(), &indices_array, None)?;
2067            columns.push(expanded);
2068        }
2069
2070        // Add target ._vid column (only if not already in input)
2071        let target_vid_name = format!("{}._vid", self.target_variable);
2072        let target_vids: Vec<u64> = expansions
2073            .iter()
2074            .map(|(_, vid, _, _, _)| vid.as_u64())
2075            .collect();
2076        if input.schema().column_with_name(&target_vid_name).is_none() {
2077            columns.push(Arc::new(UInt64Array::from(target_vids)));
2078        }
2079
2080        // Add target ._labels column (only if not already in input)
2081        let target_labels_name = format!("{}._labels", self.target_variable);
2082        if input
2083            .schema()
2084            .column_with_name(&target_labels_name)
2085            .is_none()
2086        {
2087            use arrow_array::builder::{ListBuilder, StringBuilder};
2088            // Resolve labels from the L0 chain and then the persisted
2089            // VidLabelsIndex. The index covers vertices that live only in Lance
2090            // storage — notably on a fork, whose data is flushed to Lance before
2091            // branching — so a `hasLabel(b, "B")` filter over this column matches
2092            // there too. (GitHub #99)
2093            let query_ctx = self.graph_ctx.query_context();
2094            let mut labels_builder = ListBuilder::new(StringBuilder::new());
2095            for (_, target_vid, _, _, _) in &expansions {
2096                let row_labels = self
2097                    .graph_ctx
2098                    .resolve_vertex_labels(*target_vid, &query_ctx)
2099                    .unwrap_or_default();
2100                let values = labels_builder.values();
2101                for lbl in &row_labels {
2102                    values.append_value(lbl);
2103                }
2104                labels_builder.append(true);
2105            }
2106            columns.push(Arc::new(labels_builder.finish()));
2107        }
2108
2109        // Add edge columns if edge variable is bound.
2110        // For anonymous relationships, emit internal edge IDs for BindPath.
2111        if self.edge_variable.is_some() {
2112            // Add edge ._eid column
2113            let eids: Vec<u64> = expansions
2114                .iter()
2115                .map(|(_, _, eid, _, _)| eid.as_u64())
2116                .collect();
2117            columns.push(Arc::new(UInt64Array::from(eids)));
2118
2119            // Add edge ._type column
2120            {
2121                let mut type_builder = arrow_array::builder::StringBuilder::new();
2122                for (_, _, _, edge_type, _) in &expansions {
2123                    type_builder.append_value(edge_type);
2124                }
2125                columns.push(Arc::new(type_builder.finish()));
2126            }
2127
2128            // Add edge property columns as cv_encoded LargeBinary to preserve types
2129            for prop_name in &self.edge_properties {
2130                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2131                if prop_name == "_all_props" {
2132                    // Serialize all edge properties to a CypherValue blob directly
2133                    // from `Value` so typed values (temporals) are preserved.
2134                    for (_, _, _, _, props) in &expansions {
2135                        if props.is_empty() {
2136                            builder.append_null();
2137                        } else {
2138                            let map: HashMap<String, uni_common::Value> =
2139                                props.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
2140                            builder.append_value(uni_common::cypher_value_codec::encode(
2141                                &uni_common::Value::Map(map),
2142                            ));
2143                        }
2144                    }
2145                } else {
2146                    // Named property as cv_encoded CypherValue
2147                    for (_, _, _, _, props) in &expansions {
2148                        match props.get(prop_name) {
2149                            Some(uni_common::Value::Null) | None => builder.append_null(),
2150                            Some(val) => {
2151                                builder.append_value(uni_common::cypher_value_codec::encode(val));
2152                            }
2153                        }
2154                    }
2155                }
2156                columns.push(Arc::new(builder.finish()));
2157            }
2158        } else {
2159            let eids: Vec<u64> = expansions
2160                .iter()
2161                .map(|(_, _, eid, _, _)| eid.as_u64())
2162                .collect();
2163            columns.push(Arc::new(UInt64Array::from(eids)));
2164        }
2165
2166        // Add target property columns (hydrate from L0 buffers)
2167        {
2168            let l0_ctx = self.graph_ctx.l0_context();
2169
2170            for prop_name in &self.target_properties {
2171                if prop_name == "_all_props" {
2172                    // Build full CypherValue blob from all L0 vertex properties
2173                    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2174                    for (_, target_vid, _, _, _) in &expansions {
2175                        // Merge in `Value` space so typed values (temporals) survive.
2176                        let mut merged_props: HashMap<String, uni_common::Value> = HashMap::new();
2177                        for l0 in l0_ctx.iter_l0_buffers() {
2178                            let guard = l0.read();
2179                            if let Some(props) = guard.vertex_properties.get(target_vid) {
2180                                for (k, v) in props.iter() {
2181                                    merged_props.insert(k.clone(), v.clone());
2182                                }
2183                            }
2184                        }
2185                        if merged_props.is_empty() {
2186                            builder.append_null();
2187                        } else {
2188                            builder.append_value(uni_common::cypher_value_codec::encode(
2189                                &uni_common::Value::Map(merged_props),
2190                            ));
2191                        }
2192                    }
2193                    columns.push(Arc::new(builder.finish()));
2194                } else {
2195                    // Extract individual property from L0 and encode as CypherValue
2196                    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2197                    for (_, target_vid, _, _, _) in &expansions {
2198                        let mut found = false;
2199                        for l0 in l0_ctx.iter_l0_buffers() {
2200                            let guard = l0.read();
2201                            if let Some(props) = guard.vertex_properties.get(target_vid)
2202                                && let Some(val) = props.get(prop_name.as_str())
2203                                && !val.is_null()
2204                            {
2205                                builder.append_value(uni_common::cypher_value_codec::encode(val));
2206                                found = true;
2207                                break;
2208                            }
2209                        }
2210                        if !found {
2211                            builder.append_null();
2212                        }
2213                    }
2214                    columns.push(Arc::new(builder.finish()));
2215                }
2216            }
2217        }
2218
2219        let matched_batch =
2220            RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)?;
2221
2222        // Handle OPTIONAL: append unmatched rows with NULLs
2223        if self.optional {
2224            let unmatched = collect_unmatched_optional_group_rows(
2225                input,
2226                &matched_rows,
2227                &self.schema,
2228                &self.optional_pattern_vars,
2229            )?;
2230
2231            if unmatched.is_empty() {
2232                return Ok(matched_batch);
2233            }
2234
2235            let unmatched_batch = build_optional_null_batch_for_rows_with_optional_vars(
2236                input,
2237                &unmatched,
2238                &self.schema,
2239                &self.optional_pattern_vars,
2240            )?;
2241
2242            // Concatenate matched and unmatched batches
2243            use arrow::compute::concat_batches;
2244            concat_batches(&self.schema, &[matched_batch, unmatched_batch]).map_err(arrow_err)
2245        } else {
2246            Ok(matched_batch)
2247        }
2248    }
2249}
2250
2251impl GraphExecutionContext {
2252    /// Records a schemaless traversal's full edge-type scan into the SSI read-set.
2253    ///
2254    /// `build_edge_adjacency_map` scans every edge of the traversed type(s) via
2255    /// `find_edges_by_type_names` up front, so the transaction's true read
2256    /// footprint is that whole adjacency — not just the neighbourhoods later
2257    /// expanded. Recording it here gives schemaless single-hop and
2258    /// variable-length traversal (which bypass the per-vertex `get_neighbors`
2259    /// path) the same item-level antidependency coverage the schema-typed
2260    /// `record_neighbor_reads` path provides. No-op for read-only transactions
2261    /// (`occ_read_set` is `Some` only for read-write transactions).
2262    fn record_edge_adjacency(&self, adjacency: &EdgeAdjacencyMap) {
2263        let Some(tx_l0) = &self.l0_context.transaction_l0 else {
2264            return;
2265        };
2266        let guard = tx_l0.read();
2267        let Some(read_set) = &guard.occ_read_set else {
2268            return;
2269        };
2270        let mut rs = read_set.lock();
2271        for (src, neighbors) in adjacency {
2272            rs.vertices.insert(*src);
2273            for (nbr, eid, _type, _props) in neighbors {
2274                rs.vertices.insert(*nbr);
2275                rs.edges.insert(*eid);
2276            }
2277        }
2278    }
2279}
2280
2281/// Build adjacency map from main edges table for given type names and direction.
2282///
2283/// Supports OR relationship types like `[:KNOWS|HATES]` via multiple type_names.
/// Returns a HashMap mapping key VID -> Vec<(neighbor_vid, eid, edge_type, properties)>.
2285/// Direction determines the key: Outgoing uses src_vid, Incoming uses dst_vid, Both adds entries for both.
2286///
2287/// `source_vids`, when supplied, is pushed into the persisted scan so only the
2288/// sources' incident edges are materialized instead of the whole edge type
2289/// (review perf #5). The L0 overlay below deliberately stays whole-type: L0 is
2290/// small, and unused adjacency entries are never expanded.
2291async fn build_edge_adjacency_map(
2292    graph_ctx: &GraphExecutionContext,
2293    type_names: &[String],
2294    direction: Direction,
2295    source_vids: Option<HashSet<Vid>>,
2296) -> DFResult<EdgeAdjacencyMap> {
2297    let storage = graph_ctx.storage();
2298    let l0_ctx = graph_ctx.l0_context();
2299
2300    // Step 1: Query main edges table for all type names, pushing the bounded
2301    // source set into the scan when present.
2302    let type_refs: Vec<&str> = type_names.iter().map(|s| s.as_str()).collect();
2303    let endpoint_vids: Option<Vec<Vid>> = source_vids.as_ref().map(|s| s.iter().copied().collect());
2304    let endpoint_filter = endpoint_vids.as_deref().map(|vids| {
2305        let side = match direction {
2306            Direction::Outgoing => uni_store::storage::EndpointSide::Src,
2307            Direction::Incoming => uni_store::storage::EndpointSide::Dst,
2308            Direction::Both => uni_store::storage::EndpointSide::Either,
2309        };
2310        (side, vids)
2311    });
2312    let edges_with_type = storage
2313        .find_edges_by_type_names(&type_refs, endpoint_filter)
2314        .await
2315        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
2316
2317    // Edge filter matching the pushed-down scan, applied to the L0 overlay
2318    // below so the map (and therefore the SSI read-set recorded from it) has
2319    // ONE consistent footprint across both storage tiers.
2320    let edge_in_scope = |src: Vid, dst: Vid| -> bool {
2321        match &source_vids {
2322            None => true,
2323            Some(set) => match direction {
2324                Direction::Outgoing => set.contains(&src),
2325                Direction::Incoming => set.contains(&dst),
2326                Direction::Both => set.contains(&src) || set.contains(&dst),
2327            },
2328        }
2329    };
2330
2331    // Preserve edge type name in the adjacency map for type(r) support
2332    let mut edges: Vec<(
2333        uni_common::Eid,
2334        uni_common::Vid,
2335        uni_common::Vid,
2336        String,
2337        uni_common::Properties,
2338    )> = edges_with_type.into_iter().collect();
2339
2340    // Step 2: Overlay L0 buffers for all type names
2341    for l0 in l0_ctx.iter_l0_buffers() {
2342        let l0_guard = l0.read();
2343
2344        for type_name in type_names {
2345            let l0_eids = l0_guard.eids_for_type(type_name);
2346
2347            // For each L0 edge, extract its information
2348            for &eid in &l0_eids {
2349                if let Some(edge_ref) = l0_guard.graph.edge(eid) {
2350                    let src_vid = edge_ref.src_vid;
2351                    let dst_vid = edge_ref.dst_vid;
2352                    if !edge_in_scope(src_vid, dst_vid) {
2353                        continue;
2354                    }
2355
2356                    // Get properties for this edge from L0
2357                    let props = l0_guard
2358                        .edge_properties
2359                        .get(&eid)
2360                        .cloned()
2361                        .unwrap_or_default();
2362
2363                    edges.push((eid, src_vid, dst_vid, type_name.clone(), props));
2364                }
2365            }
2366        }
2367    }
2368
2369    // Step 3: Deduplicate by EID (L0 takes precedence)
2370    let mut seen_eids = HashSet::new();
2371    let mut unique_edges = Vec::new();
2372    for edge in edges.into_iter().rev() {
2373        if seen_eids.insert(edge.0) {
2374            unique_edges.push(edge);
2375        }
2376    }
2377    unique_edges.reverse();
2378
2379    // Step 4: Filter out edges tombstoned in any L0 buffer
2380    let mut tombstoned_eids = HashSet::new();
2381    for l0 in l0_ctx.iter_l0_buffers() {
2382        let l0_guard = l0.read();
2383        for eid in l0_guard.tombstones.keys() {
2384            tombstoned_eids.insert(*eid);
2385        }
2386    }
2387    if !tombstoned_eids.is_empty() {
2388        unique_edges.retain(|edge| !tombstoned_eids.contains(&edge.0));
2389    }
2390
2391    // Step 5: Build adjacency map based on direction. Type name and props are
2392    // Arc-shared so the `Both` arm clones two pointers, not the property map.
2393    let mut adjacency: EdgeAdjacencyMap = HashMap::new();
2394
2395    for (eid, src_vid, dst_vid, edge_type, props) in unique_edges {
2396        let edge_type: Arc<str> = edge_type.into();
2397        let props = Arc::new(props);
2398        match direction {
2399            Direction::Outgoing => {
2400                adjacency
2401                    .entry(src_vid)
2402                    .or_default()
2403                    .push((dst_vid, eid, edge_type, props));
2404            }
2405            Direction::Incoming => {
2406                adjacency
2407                    .entry(dst_vid)
2408                    .or_default()
2409                    .push((src_vid, eid, edge_type, props));
2410            }
2411            Direction::Both => {
2412                adjacency.entry(src_vid).or_default().push((
2413                    dst_vid,
2414                    eid,
2415                    Arc::clone(&edge_type),
2416                    Arc::clone(&props),
2417                ));
2418                adjacency
2419                    .entry(dst_vid)
2420                    .or_default()
2421                    .push((src_vid, eid, edge_type, props));
2422            }
2423        }
2424    }
2425
2426    // SSI: a schemaless traversal physically scans the edge type above —
2427    // whole-type, or with source pushdown every edge incident to every actual
2428    // source — so record that read footprint for read-write transactions
2429    // (no-op otherwise). With pushdown this records exactly what the
2430    // transaction logically read; the whole-type scan was incidentally
2431    // (not load-bearing) more conservative under the item-level,
2432    // phantoms-out-of-scope read-set contract.
2433    graph_ctx.record_edge_adjacency(&adjacency);
2434
2435    Ok(adjacency)
2436}
2437
2438/// Build the edge-property `EidFilter` for a typed variable-length traversal.
2439///
2440/// For a pattern like `[r:KNOWS*1..3 {year: 1988}]` the inline edge-property
2441/// conditions must filter *flushed* (CSR/Lance) edges too — not only L0
2442/// in-memory edges as the original code did. We pre-scan every edge of the
2443/// traversed type(s) — flushed edges via `find_edges_by_type_names`, L0 edges
2444/// via the overlay, both handled (with tombstone/dedup semantics) by
2445/// [`build_edge_adjacency_map`] — evaluate the conditions against each edge's
2446/// materialized properties, and collect the passing Eids into an allow-set.
2447///
2448/// Returns [`EidFilter::AllAllowed`] when there are no conditions (nothing to
2449/// filter) or when no edge type id resolves to a name (cannot pre-scan —
2450/// degrade to the prior, less-restrictive behaviour rather than drop every
2451/// edge).
2452async fn build_edge_property_filter(
2453    graph_ctx: &Arc<GraphExecutionContext>,
2454    edge_type_ids: &[u32],
2455    direction: Direction,
2456    conditions: &[(String, UniValue)],
2457) -> DFResult<EidFilter> {
2458    if conditions.is_empty() {
2459        return Ok(EidFilter::AllAllowed);
2460    }
2461
2462    let uni_schema = graph_ctx.storage().schema_manager().schema();
2463    let type_names: Vec<String> = edge_type_ids
2464        .iter()
2465        .filter_map(|id| uni_schema.edge_type_name_by_id_unified(*id))
2466        .collect();
2467    if type_names.is_empty() {
2468        return Ok(EidFilter::AllAllowed);
2469    }
2470
2471    // Enumerate every edge of these types (flushed + L0, tombstone-aware) with
2472    // properties materialized. `None` source-vids => whole-type scan, matching
2473    // the schemaless VLP path.
2474    let adjacency = build_edge_adjacency_map(graph_ctx, &type_names, direction, None).await?;
2475
2476    let mut passing: Vec<u64> = Vec::new();
2477    let mut max_eid: u64 = 0;
2478    let mut seen: FxHashSet<u64> = FxHashSet::default();
2479    for edges in adjacency.values() {
2480        for (_neighbor, eid, _etype, props) in edges {
2481            let raw = eid.as_u64();
2482            // `Direction::Both` lists each edge under both endpoints — dedup so
2483            // the density heuristic in `from_eids` sees a true count.
2484            if !seen.insert(raw) {
2485                continue;
2486            }
2487            max_eid = max_eid.max(raw);
2488            let passes = conditions
2489                .iter()
2490                .all(|(name, expected)| props.get(name).is_some_and(|actual| actual == expected));
2491            if passes {
2492                passing.push(raw);
2493            }
2494        }
2495    }
2496
2497    Ok(EidFilter::from_eids(passing, max_eid as usize + 1))
2498}
2499
/// State-machine driver for the single-hop traversal stream.
///
/// The stream moves through its states as follows:
/// 1. `CollectingInput`: buffers every input batch until the input is exhausted.
/// 2. `LoadingEdges`: awaits `build_edge_adjacency_map`, which receives the
///    source VIDs collected from the buffered batches.
/// 3. `Processing`: expands each buffered batch against the loaded adjacency
///    map and yields one output batch per input batch.
/// 4. `Done`: terminal.
///
/// An error from the input or from the edge load ends the stream after it is
/// yielded.
impl Stream for GraphTraverseMainStream {
    type Item = DFResult<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Clone the metrics handle so the timer guard does not borrow `self`,
        // which is mutated below.
        // NOTE(review): the timer spans the whole poll, including time spent
        // polling the input stream and the edge-load future. Confirm that is
        // the intended elapsed_compute semantics.
        let metrics = self.metrics.clone();
        let _timer = metrics.elapsed_compute().timer();
        loop {
            // Take ownership of the current state, leaving `Done` behind. Every
            // arm that should continue must write a state back. Paths that
            // return without doing so (e.g. the timeout error) leave the stream
            // terminal.
            let state = std::mem::replace(&mut self.state, GraphTraverseMainState::Done);

            match state {
                GraphTraverseMainState::CollectingInput {
                    mut input_stream,
                    mut buffered,
                } => match input_stream.poll_next_unpin(cx) {
                    Poll::Ready(Some(Ok(batch))) => {
                        buffered.push(batch);
                        self.state = GraphTraverseMainState::CollectingInput {
                            input_stream,
                            buffered,
                        };
                        // Continue loop to keep draining.
                    }
                    Poll::Ready(Some(Err(e))) => {
                        self.state = GraphTraverseMainState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Ready(None) => {
                        // Input fully drained: kick off the edge load with the
                        // bounded source set pushed into the scan (when small).
                        let source_vids = self.collect_source_vids(&buffered);
                        let loading_ctx = self.graph_ctx.clone();
                        let loading_types = self.type_names.clone();
                        let direction = self.direction;
                        // The future owns its inputs (`async move`) so it can be
                        // stored in `self.state` and polled across calls.
                        let fut = async move {
                            build_edge_adjacency_map(
                                &loading_ctx,
                                &loading_types,
                                direction,
                                source_vids,
                            )
                            .await
                        };
                        self.state = GraphTraverseMainState::LoadingEdges {
                            future: Box::pin(fut),
                            buffered,
                        };
                        // Continue loop to poll the load future.
                    }
                    Poll::Pending => {
                        // Restore the state before yielding so the next poll
                        // resumes collecting.
                        self.state = GraphTraverseMainState::CollectingInput {
                            input_stream,
                            buffered,
                        };
                        return Poll::Pending;
                    }
                },
                GraphTraverseMainState::LoadingEdges {
                    mut future,
                    buffered,
                } => match future.as_mut().poll(cx) {
                    Poll::Ready(Ok(adjacency)) => {
                        // Move to processing state with loaded adjacency
                        self.state = GraphTraverseMainState::Processing {
                            adjacency,
                            buffered: buffered.into_iter(),
                        };
                        // Continue loop to start processing
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = GraphTraverseMainState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        self.state = GraphTraverseMainState::LoadingEdges { future, buffered };
                        return Poll::Pending;
                    }
                },
                GraphTraverseMainState::Processing {
                    adjacency,
                    mut buffered,
                } => {
                    // Check timeout. On error the state stays `Done` (set by the
                    // `mem::replace` above), so the stream ends after this item.
                    if let Err(e) = self.graph_ctx.check_timeout() {
                        return Poll::Ready(Some(Err(
                            datafusion::error::DataFusionError::Execution(e.to_string()),
                        )));
                    }

                    match buffered.next() {
                        Some(batch) => {
                            // Expand batch using adjacency map
                            let result = self.expand_batch(&batch, &adjacency);

                            self.state = GraphTraverseMainState::Processing {
                                adjacency,
                                buffered,
                            };

                            // Only successful batches count toward output rows.
                            if let Ok(ref r) = result {
                                self.metrics.record_output(r.num_rows());
                            }
                            return Poll::Ready(Some(result));
                        }
                        None => {
                            self.state = GraphTraverseMainState::Done;
                            return Poll::Ready(None);
                        }
                    }
                }
                GraphTraverseMainState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
2616
2617impl RecordBatchStream for GraphTraverseMainStream {
2618    fn schema(&self) -> SchemaRef {
2619        self.schema.clone()
2620    }
2621}
2622
2623/// Variable-length graph traversal execution plan.
2624///
2625/// Performs BFS traversal from source vertices with configurable min/max hops.
2626/// Tracks visited nodes to avoid cycles.
2627///
2628/// # Example
2629///
2630/// ```ignore
2631/// // Find all nodes 1-3 hops away via KNOWS edges
2632/// let traverse = GraphVariableLengthTraverseExec::new(
2633///     input_plan,
2634///     "_vid",
2635///     knows_type_id,
2636///     Direction::Outgoing,
2637///     1,  // min_hops
2638///     3,  // max_hops
2639///     Some("p"), // path variable
2640///     graph_ctx,
2641/// );
2642/// ```
2643pub struct GraphVariableLengthTraverseExec {
2644    /// Input execution plan.
2645    input: Arc<dyn ExecutionPlan>,
2646
2647    /// Column name containing source VIDs.
2648    source_column: String,
2649
2650    /// Edge type IDs to traverse.
2651    edge_type_ids: Vec<u32>,
2652
2653    /// Traversal direction.
2654    direction: Direction,
2655
2656    /// Minimum number of hops.
2657    min_hops: usize,
2658
2659    /// Maximum number of hops.
2660    max_hops: usize,
2661
2662    /// Variable name for target vertex columns.
2663    target_variable: String,
2664
2665    /// Variable name for relationship list (r in `[r*]`) - holds `List<Edge>`.
2666    step_variable: Option<String>,
2667
2668    /// Variable name for path (if path is bound).
2669    path_variable: Option<String>,
2670
2671    /// Target vertex properties to materialize.
2672    target_properties: Vec<String>,
2673
2674    /// Target label name for property type resolution.
2675    target_label_name: Option<String>,
2676
2677    /// Whether this is an optional match (LEFT JOIN semantics).
2678    is_optional: bool,
2679
2680    /// Column name of an already-bound target VID (for patterns where target is in scope).
2681    bound_target_column: Option<String>,
2682
2683    /// Lance SQL filter for edge property predicates (VLP bitmap preselection).
2684    edge_lance_filter: Option<String>,
2685
2686    /// Simple property equality conditions for per-edge L0 checking during BFS.
2687    /// Each entry is (property_name, expected_value).
2688    edge_property_conditions: Vec<(String, UniValue)>,
2689
2690    /// Edge ID columns from previous hops for cross-pattern relationship uniqueness.
2691    used_edge_columns: Vec<String>,
2692
2693    /// Path semantics mode (Trail = no repeated edges, default for OpenCypher).
2694    path_mode: super::nfa::PathMode,
2695
2696    /// Output mode determining BFS strategy.
2697    output_mode: super::nfa::VlpOutputMode,
2698
2699    /// Compiled NFA for path pattern matching.
2700    nfa: Arc<PathNfa>,
2701
2702    /// Graph execution context.
2703    graph_ctx: Arc<GraphExecutionContext>,
2704
2705    /// Output schema.
2706    schema: SchemaRef,
2707
2708    /// Cached plan properties.
2709    properties: Arc<PlanProperties>,
2710
2711    /// Execution metrics.
2712    metrics: ExecutionPlanMetricsSet,
2713}
2714
2715impl fmt::Debug for GraphVariableLengthTraverseExec {
2716    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2717        f.debug_struct("GraphVariableLengthTraverseExec")
2718            .field("source_column", &self.source_column)
2719            .field("edge_type_ids", &self.edge_type_ids)
2720            .field("direction", &self.direction)
2721            .field("min_hops", &self.min_hops)
2722            .field("max_hops", &self.max_hops)
2723            .field("target_variable", &self.target_variable)
2724            .finish()
2725    }
2726}
2727
2728impl GraphVariableLengthTraverseExec {
2729    /// Create a new variable-length traversal plan.
2730    ///
2731    /// For QPP (Quantified Path Patterns), pass a pre-compiled NFA via `qpp_nfa`.
2732    /// For simple VLP patterns, pass `None` and the NFA will be compiled from
2733    /// `edge_type_ids`, `direction`, `min_hops`, `max_hops`.
2734    #[expect(clippy::too_many_arguments)]
2735    pub fn new(
2736        input: Arc<dyn ExecutionPlan>,
2737        source_column: impl Into<String>,
2738        edge_type_ids: Vec<u32>,
2739        direction: Direction,
2740        min_hops: usize,
2741        max_hops: usize,
2742        target_variable: impl Into<String>,
2743        step_variable: Option<String>,
2744        path_variable: Option<String>,
2745        target_properties: Vec<String>,
2746        target_label_name: Option<String>,
2747        graph_ctx: Arc<GraphExecutionContext>,
2748        is_optional: bool,
2749        bound_target_column: Option<String>,
2750        edge_lance_filter: Option<String>,
2751        edge_property_conditions: Vec<(String, UniValue)>,
2752        used_edge_columns: Vec<String>,
2753        path_mode: super::nfa::PathMode,
2754        output_mode: super::nfa::VlpOutputMode,
2755        qpp_nfa: Option<PathNfa>,
2756    ) -> Self {
2757        let source_column = source_column.into();
2758        let target_variable = target_variable.into();
2759
2760        // Resolve target property Arrow types from the schema
2761        let uni_schema = graph_ctx.storage().schema_manager().schema();
2762        let label_props = target_label_name
2763            .as_deref()
2764            .and_then(|ln| uni_schema.properties.get(ln));
2765
2766        // Build output schema
2767        let schema = Self::build_schema(
2768            input.schema(),
2769            &target_variable,
2770            step_variable.as_deref(),
2771            path_variable.as_deref(),
2772            &target_properties,
2773            label_props,
2774        );
2775        let properties = compute_plan_properties(schema.clone());
2776
2777        // Use pre-compiled QPP NFA if provided, otherwise compile from VLP parameters
2778        let nfa = Arc::new(qpp_nfa.unwrap_or_else(|| {
2779            PathNfa::from_vlp(edge_type_ids.clone(), direction, min_hops, max_hops)
2780        }));
2781
2782        Self {
2783            input,
2784            source_column,
2785            edge_type_ids,
2786            direction,
2787            min_hops,
2788            max_hops,
2789            target_variable,
2790            step_variable,
2791            path_variable,
2792            target_properties,
2793            target_label_name,
2794            is_optional,
2795            bound_target_column,
2796            edge_lance_filter,
2797            edge_property_conditions,
2798            used_edge_columns,
2799            path_mode,
2800            output_mode,
2801            nfa,
2802            graph_ctx,
2803            schema,
2804            properties,
2805            metrics: ExecutionPlanMetricsSet::new(),
2806        }
2807    }
2808
2809    /// Build output schema.
2810    fn build_schema(
2811        input_schema: SchemaRef,
2812        target_variable: &str,
2813        step_variable: Option<&str>,
2814        path_variable: Option<&str>,
2815        target_properties: &[String],
2816        label_props: Option<
2817            &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
2818        >,
2819    ) -> SchemaRef {
2820        let mut fields: Vec<Field> = input_schema
2821            .fields()
2822            .iter()
2823            .map(|f| f.as_ref().clone())
2824            .collect();
2825
2826        // Add target VID column (only if not already in input)
2827        let target_vid_name = format!("{}._vid", target_variable);
2828        if input_schema.column_with_name(&target_vid_name).is_none() {
2829            fields.push(Field::new(target_vid_name, DataType::UInt64, true));
2830        }
2831
2832        // Add target ._labels column (only if not already in input)
2833        let target_labels_name = format!("{}._labels", target_variable);
2834        if input_schema.column_with_name(&target_labels_name).is_none() {
2835            fields.push(Field::new(target_labels_name, labels_data_type(), true));
2836        }
2837
2838        // Add target vertex property columns (skip if already in input)
2839        for prop_name in target_properties {
2840            let col_name = format!("{}.{}", target_variable, prop_name);
2841            if input_schema.column_with_name(&col_name).is_none() {
2842                let arrow_type = resolve_property_type(prop_name, label_props);
2843                let uni_type = label_props
2844                    .and_then(|p| p.get(prop_name))
2845                    .map(|m| &m.r#type);
2846                fields.push(property_field(&col_name, arrow_type, uni_type));
2847            }
2848        }
2849
2850        // Add hop count
2851        fields.push(Field::new("_hop_count", DataType::UInt64, false));
2852
2853        // Add step variable (edge list) if bound
2854        if let Some(step_var) = step_variable {
2855            fields.push(build_edge_list_field(step_var));
2856        }
2857
2858        // Add path struct if bound (only if not already in input from prior BindFixedPath)
2859        if let Some(path_var) = path_variable
2860            && input_schema.column_with_name(path_var).is_none()
2861        {
2862            fields.push(build_path_struct_field(path_var));
2863        }
2864
2865        Arc::new(Schema::new(fields))
2866    }
2867}
2868
2869impl DisplayAs for GraphVariableLengthTraverseExec {
2870    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2871        write!(
2872            f,
2873            "GraphVariableLengthTraverseExec: {} --[{:?}*{}..{}]--> target",
2874            self.source_column, self.edge_type_ids, self.min_hops, self.max_hops
2875        )
2876    }
2877}
2878
2879impl ExecutionPlan for GraphVariableLengthTraverseExec {
2880    fn name(&self) -> &str {
2881        "GraphVariableLengthTraverseExec"
2882    }
2883
2884    fn as_any(&self) -> &dyn Any {
2885        self
2886    }
2887
2888    fn schema(&self) -> SchemaRef {
2889        self.schema.clone()
2890    }
2891
2892    fn properties(&self) -> &Arc<PlanProperties> {
2893        &self.properties
2894    }
2895
2896    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
2897        vec![&self.input]
2898    }
2899
2900    fn with_new_children(
2901        self: Arc<Self>,
2902        children: Vec<Arc<dyn ExecutionPlan>>,
2903    ) -> DFResult<Arc<dyn ExecutionPlan>> {
2904        if children.len() != 1 {
2905            return Err(datafusion::error::DataFusionError::Plan(
2906                "GraphVariableLengthTraverseExec requires exactly one child".to_string(),
2907            ));
2908        }
2909
2910        // Pass the existing NFA to avoid recompilation (important for QPP NFA)
2911        Ok(Arc::new(Self::new(
2912            children[0].clone(),
2913            self.source_column.clone(),
2914            self.edge_type_ids.clone(),
2915            self.direction,
2916            self.min_hops,
2917            self.max_hops,
2918            self.target_variable.clone(),
2919            self.step_variable.clone(),
2920            self.path_variable.clone(),
2921            self.target_properties.clone(),
2922            self.target_label_name.clone(),
2923            self.graph_ctx.clone(),
2924            self.is_optional,
2925            self.bound_target_column.clone(),
2926            self.edge_lance_filter.clone(),
2927            self.edge_property_conditions.clone(),
2928            self.used_edge_columns.clone(),
2929            self.path_mode.clone(),
2930            self.output_mode.clone(),
2931            Some((*self.nfa).clone()),
2932        )))
2933    }
2934
2935    fn execute(
2936        &self,
2937        partition: usize,
2938        context: Arc<TaskContext>,
2939    ) -> DFResult<SendableRecordBatchStream> {
2940        let input_stream = self.input.execute(partition, context)?;
2941
2942        let metrics = BaselineMetrics::new(&self.metrics, partition);
2943
2944        // Warm the adjacency CSRs, then (when the pattern carries inline
2945        // edge-property conditions) build the real `EidFilter` from a property
2946        // pre-scan so that flushed CSR/Lance edges are filtered too — not just
2947        // L0 in-memory edges. See `build_edge_property_filter`.
2948        let graph_ctx = self.graph_ctx.clone();
2949        let edge_type_ids = self.edge_type_ids.clone();
2950        let direction = self.direction;
2951        let edge_property_conditions = self.edge_property_conditions.clone();
2952        let warm_fut: Pin<Box<dyn std::future::Future<Output = DFResult<EidFilter>> + Send>> =
2953            Box::pin(async move {
2954                graph_ctx
2955                    .ensure_adjacency_warmed(&edge_type_ids, direction)
2956                    .await
2957                    .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
2958                build_edge_property_filter(
2959                    &graph_ctx,
2960                    &edge_type_ids,
2961                    direction,
2962                    &edge_property_conditions,
2963                )
2964                .await
2965            });
2966
2967        Ok(Box::pin(GraphVariableLengthTraverseStream {
2968            input: input_stream,
2969            exec: Arc::new(self.clone_for_stream()),
2970            schema: self.schema.clone(),
2971            state: VarLengthStreamState::Warming(warm_fut),
2972            edge_property_filter: EidFilter::AllAllowed,
2973            metrics,
2974        }))
2975    }
2976
2977    fn metrics(&self) -> Option<MetricsSet> {
2978        Some(self.metrics.clone_inner())
2979    }
2980}
2981
2982impl GraphVariableLengthTraverseExec {
2983    /// Clone fields needed for stream (avoids cloning the full struct).
2984    fn clone_for_stream(&self) -> GraphVariableLengthTraverseExecData {
2985        GraphVariableLengthTraverseExecData {
2986            source_column: self.source_column.clone(),
2987            edge_type_ids: self.edge_type_ids.clone(),
2988            direction: self.direction,
2989            min_hops: self.min_hops,
2990            max_hops: self.max_hops,
2991            target_variable: self.target_variable.clone(),
2992            step_variable: self.step_variable.clone(),
2993            path_variable: self.path_variable.clone(),
2994            target_properties: self.target_properties.clone(),
2995            target_label_name: self.target_label_name.clone(),
2996            is_optional: self.is_optional,
2997            bound_target_column: self.bound_target_column.clone(),
2998            edge_lance_filter: self.edge_lance_filter.clone(),
2999            edge_property_conditions: self.edge_property_conditions.clone(),
3000            used_edge_columns: self.used_edge_columns.clone(),
3001            path_mode: self.path_mode.clone(),
3002            output_mode: self.output_mode.clone(),
3003            nfa: self.nfa.clone(),
3004            graph_ctx: self.graph_ctx.clone(),
3005        }
3006    }
3007}
3008
3009/// Data needed by the stream (without ExecutionPlan overhead).
3010#[expect(
3011    dead_code,
3012    reason = "Fields accessed via NFA; kept for with_new_children reconstruction"
3013)]
3014struct GraphVariableLengthTraverseExecData {
3015    source_column: String,
3016    edge_type_ids: Vec<u32>,
3017    direction: Direction,
3018    min_hops: usize,
3019    max_hops: usize,
3020    target_variable: String,
3021    step_variable: Option<String>,
3022    path_variable: Option<String>,
3023    target_properties: Vec<String>,
3024    target_label_name: Option<String>,
3025    is_optional: bool,
3026    bound_target_column: Option<String>,
3027    #[expect(dead_code, reason = "Used in Phase 3 warming")]
3028    edge_lance_filter: Option<String>,
3029    /// Simple property equality conditions for per-edge L0 checking during BFS.
3030    edge_property_conditions: Vec<(String, UniValue)>,
3031    used_edge_columns: Vec<String>,
3032    path_mode: super::nfa::PathMode,
3033    output_mode: super::nfa::VlpOutputMode,
3034    nfa: Arc<PathNfa>,
3035    graph_ctx: Arc<GraphExecutionContext>,
3036}
3037
/// Upper bound on BFS frontier size; a safety valve against running out of
/// memory on pathological graphs.
const MAX_FRONTIER_SIZE: usize = 500_000;
/// Upper bound on the predecessor pool size (a safety valve, like
/// `MAX_FRONTIER_SIZE`).
const MAX_PRED_POOL_SIZE: usize = 2_000_000;
3042
3043impl GraphVariableLengthTraverseExecData {
3044    /// Check if a vertex passes the target label filter.
3045    fn check_target_label(&self, vid: Vid) -> bool {
3046        if let Some(ref label_name) = self.target_label_name {
3047            let query_ctx = self.graph_ctx.query_context();
3048            match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
3049                Some(labels) => labels.contains(label_name),
3050                None => true, // not in L0, trust storage
3051            }
3052        } else {
3053            true
3054        }
3055    }
3056
3057    /// Check if a vertex satisfies an NFA state constraint (QPP intermediate node label).
3058    fn check_state_constraint(&self, vid: Vid, constraint: &super::nfa::VertexConstraint) -> bool {
3059        match constraint {
3060            super::nfa::VertexConstraint::Label(label_name) => {
3061                let query_ctx = self.graph_ctx.query_context();
3062                match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
3063                    Some(labels) => labels.contains(label_name),
3064                    None => true, // not in L0, trust storage
3065                }
3066            }
3067        }
3068    }
3069
3070    /// Expand neighbors from a vertex through all NFA transitions from the given state.
3071    /// Returns (neighbor_vid, neighbor_eid, destination_nfa_state) triples.
3072    fn expand_neighbors(
3073        &self,
3074        vid: Vid,
3075        state: NfaStateId,
3076        eid_filter: &EidFilter,
3077        used_eids: &FxHashSet<u64>,
3078    ) -> Vec<(Vid, Eid, NfaStateId)> {
3079        let is_undirected = matches!(self.direction, Direction::Both);
3080        let mut results = Vec::new();
3081
3082        for transition in self.nfa.transitions_from(state) {
3083            let mut seen_edges: FxHashSet<u64> = FxHashSet::default();
3084
3085            for &etype in &transition.edge_type_ids {
3086                for (neighbor, eid) in
3087                    self.graph_ctx
3088                        .get_neighbors(vid, etype, transition.direction)
3089                {
3090                    // Deduplicate edges for undirected patterns
3091                    if is_undirected && !seen_edges.insert(eid.as_u64()) {
3092                        continue;
3093                    }
3094
3095                    // Edge-property predicate gate. When the pattern carries
3096                    // inline `{prop: value}` conditions, `eid_filter` is the
3097                    // allow-set of edges (flushed *and* L0) that satisfy them,
3098                    // built during warming by `build_edge_property_filter`. When
3099                    // there are no conditions it is `AllAllowed`. This replaces
3100                    // the old bifurcated check that filtered only L0 edges and
3101                    // let every flushed CSR/Lance edge through (H4).
3102                    if !eid_filter.contains(eid) {
3103                        continue;
3104                    }
3105
3106                    // Check cross-pattern relationship uniqueness
3107                    if used_eids.contains(&eid.as_u64()) {
3108                        continue;
3109                    }
3110
3111                    // Check NFA state constraint on the destination state (QPP label filters)
3112                    if let Some(constraint) = self.nfa.state_constraint(transition.to)
3113                        && !self.check_state_constraint(neighbor, constraint)
3114                    {
3115                        continue;
3116                    }
3117
3118                    results.push((neighbor, eid, transition.to));
3119                }
3120            }
3121        }
3122
3123        results
3124    }
3125
3126    /// NFA-driven BFS with predecessor DAG for full path enumeration (Mode B).
3127    ///
3128    /// Returns BFS results in the same format as the old bfs() for compatibility
3129    /// with build_output_batch.
3130    fn bfs_with_dag(
3131        &self,
3132        source: Vid,
3133        eid_filter: &EidFilter,
3134        used_eids: &FxHashSet<u64>,
3135        vid_filter: &VidFilter,
3136    ) -> Vec<BfsResult> {
3137        let nfa = &self.nfa;
3138        let selector = PathSelector::All;
3139        let mut dag = PredecessorDag::new(selector);
3140        let mut accepting: Vec<(Vid, NfaStateId, u32)> = Vec::new();
3141
3142        // Handle zero-length paths (min_hops == 0)
3143        if nfa.is_accepting(nfa.start_state())
3144            && self.check_target_label(source)
3145            && vid_filter.contains(source)
3146        {
3147            accepting.push((source, nfa.start_state(), 0));
3148        }
3149
3150        // Per-depth frontier BFS
3151        let mut frontier: Vec<(Vid, NfaStateId)> = vec![(source, nfa.start_state())];
3152        let mut depth: u32 = 0;
3153
3154        while !frontier.is_empty() && depth < self.max_hops as u32 {
3155            depth += 1;
3156            let mut next_frontier: Vec<(Vid, NfaStateId)> = Vec::new();
3157            let mut seen_at_depth: FxHashSet<(Vid, NfaStateId)> = FxHashSet::default();
3158
3159            for &(vid, state) in &frontier {
3160                for (neighbor, eid, dst_state) in
3161                    self.expand_neighbors(vid, state, eid_filter, used_eids)
3162                {
3163                    // Record in predecessor DAG
3164                    dag.add_predecessor(neighbor, dst_state, vid, state, eid, depth);
3165
3166                    // Add to next frontier (deduplicated per depth)
3167                    if seen_at_depth.insert((neighbor, dst_state)) {
3168                        next_frontier.push((neighbor, dst_state));
3169
3170                        // Check if accepting
3171                        if nfa.is_accepting(dst_state)
3172                            && self.check_target_label(neighbor)
3173                            && vid_filter.contains(neighbor)
3174                        {
3175                            accepting.push((neighbor, dst_state, depth));
3176                        }
3177                    }
3178                }
3179            }
3180
3181            // Safety cap
3182            if next_frontier.len() > MAX_FRONTIER_SIZE || dag.pool_len() > MAX_PRED_POOL_SIZE {
3183                break;
3184            }
3185
3186            frontier = next_frontier;
3187        }
3188
3189        // Enumerate paths from DAG to produce BfsResult tuples
3190        let mut results: Vec<BfsResult> = Vec::new();
3191        for &(target, state, depth) in &accepting {
3192            dag.enumerate_paths(
3193                source,
3194                target,
3195                state,
3196                depth,
3197                depth,
3198                &self.path_mode,
3199                &mut |nodes, edges| {
3200                    results.push((target, depth as usize, nodes.to_vec(), edges.to_vec()));
3201                    std::ops::ControlFlow::Continue(())
3202                },
3203            );
3204        }
3205
3206        results
3207    }
3208
3209    /// NFA-driven BFS returning only endpoints and depths (Mode A).
3210    ///
3211    /// More efficient when no path/step variable is bound — skips full path enumeration.
3212    /// Uses lightweight trail verification via has_trail_valid_path().
3213    fn bfs_endpoints_only(
3214        &self,
3215        source: Vid,
3216        eid_filter: &EidFilter,
3217        used_eids: &FxHashSet<u64>,
3218        vid_filter: &VidFilter,
3219    ) -> Vec<(Vid, u32)> {
3220        let nfa = &self.nfa;
3221        let selector = PathSelector::Any; // Only need existence, not all paths
3222        let mut dag = PredecessorDag::new(selector);
3223        let mut results: Vec<(Vid, u32)> = Vec::new();
3224
3225        // Handle zero-length paths
3226        if nfa.is_accepting(nfa.start_state())
3227            && self.check_target_label(source)
3228            && vid_filter.contains(source)
3229        {
3230            results.push((source, 0));
3231        }
3232
3233        // Per-depth frontier BFS
3234        let mut frontier: Vec<(Vid, NfaStateId)> = vec![(source, nfa.start_state())];
3235        let mut depth: u32 = 0;
3236
3237        while !frontier.is_empty() && depth < self.max_hops as u32 {
3238            depth += 1;
3239            let mut next_frontier: Vec<(Vid, NfaStateId)> = Vec::new();
3240            let mut seen_at_depth: FxHashSet<(Vid, NfaStateId)> = FxHashSet::default();
3241
3242            for &(vid, state) in &frontier {
3243                for (neighbor, eid, dst_state) in
3244                    self.expand_neighbors(vid, state, eid_filter, used_eids)
3245                {
3246                    dag.add_predecessor(neighbor, dst_state, vid, state, eid, depth);
3247
3248                    if seen_at_depth.insert((neighbor, dst_state)) {
3249                        next_frontier.push((neighbor, dst_state));
3250
3251                        // Check if accepting with trail verification
3252                        if nfa.is_accepting(dst_state)
3253                            && self.check_target_label(neighbor)
3254                            && vid_filter.contains(neighbor)
3255                            && dag.has_trail_valid_path(source, neighbor, dst_state, depth, depth)
3256                        {
3257                            results.push((neighbor, depth));
3258                        }
3259                    }
3260                }
3261            }
3262
3263            if next_frontier.len() > MAX_FRONTIER_SIZE || dag.pool_len() > MAX_PRED_POOL_SIZE {
3264                break;
3265            }
3266
3267            frontier = next_frontier;
3268        }
3269
3270        results
3271    }
3272}
3273
3274/// State machine for variable-length traverse stream.
3275enum VarLengthStreamState {
3276    /// Warming adjacency CSRs before first batch. Resolves to the prebuilt
3277    /// edge-property `EidFilter` (the allow-set of edges — flushed *and* L0 —
3278    /// that satisfy the inline `{prop: value}` conditions), or
3279    /// `EidFilter::AllAllowed` when there are no conditions.
3280    Warming(Pin<Box<dyn std::future::Future<Output = DFResult<EidFilter>> + Send>>),
3281    /// Processing input batches.
3282    Reading,
3283    /// Materializing target vertex properties asynchronously.
3284    Materializing(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
3285    /// Stream is done.
3286    Done,
3287}
3288
/// Stream for variable-length traversal.
///
/// Expands each input batch via NFA-driven BFS. Its progress is tracked by
/// [`VarLengthStreamState`] (warming → reading ⇄ materializing → done).
struct GraphVariableLengthTraverseStream {
    /// Upstream batches supplying source (and optionally bound-target) VIDs.
    input: SendableRecordBatchStream,
    /// Shared plan data: NFA, hop limit, column names and graph context.
    exec: Arc<GraphVariableLengthTraverseExecData>,
    /// Schema of every batch this stream emits.
    schema: SchemaRef,
    /// Current state of the poll state machine.
    state: VarLengthStreamState,
    /// Edge-property allow-set built during warming (see [`VarLengthStreamState::Warming`]).
    /// `AllAllowed` until warming completes (or when there are no edge-property
    /// conditions).
    edge_property_filter: EidFilter,
    /// Output-row count and elapsed-compute metrics.
    metrics: BaselineMetrics,
}
3301
3302impl Stream for GraphVariableLengthTraverseStream {
3303    type Item = DFResult<RecordBatch>;
3304
3305    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3306        let metrics = self.metrics.clone();
3307        let _timer = metrics.elapsed_compute().timer();
3308        loop {
3309            let state = std::mem::replace(&mut self.state, VarLengthStreamState::Done);
3310
3311            match state {
3312                VarLengthStreamState::Warming(mut fut) => match fut.as_mut().poll(cx) {
3313                    Poll::Ready(Ok(filter)) => {
3314                        self.edge_property_filter = filter;
3315                        self.state = VarLengthStreamState::Reading;
3316                        // Continue loop to start reading
3317                    }
3318                    Poll::Ready(Err(e)) => {
3319                        self.state = VarLengthStreamState::Done;
3320                        return Poll::Ready(Some(Err(e)));
3321                    }
3322                    Poll::Pending => {
3323                        self.state = VarLengthStreamState::Warming(fut);
3324                        return Poll::Pending;
3325                    }
3326                },
3327                VarLengthStreamState::Reading => {
3328                    // Check timeout
3329                    if let Err(e) = self.exec.graph_ctx.check_timeout() {
3330                        return Poll::Ready(Some(Err(
3331                            datafusion::error::DataFusionError::Execution(e.to_string()),
3332                        )));
3333                    }
3334
3335                    match self.input.poll_next_unpin(cx) {
3336                        Poll::Ready(Some(Ok(batch))) => {
3337                            // Build base batch synchronously (BFS + expand).
3338                            // `edge_property_filter` was built during warming and
3339                            // gates flushed edges by their properties; VidFilter
3340                            // is still unconstrained (TODO: source pre-scan).
3341                            let vid_filter = VidFilter::AllAllowed;
3342                            let base_result = self.process_batch_base(
3343                                batch,
3344                                &self.edge_property_filter,
3345                                &vid_filter,
3346                            );
3347                            let base_batch = match base_result {
3348                                Ok(b) => b,
3349                                Err(e) => {
3350                                    self.state = VarLengthStreamState::Reading;
3351                                    return Poll::Ready(Some(Err(e)));
3352                                }
3353                            };
3354
3355                            // If no properties need async hydration, return directly
3356                            if self.exec.target_properties.is_empty() {
3357                                self.state = VarLengthStreamState::Reading;
3358                                return Poll::Ready(Some(Ok(base_batch)));
3359                            }
3360
3361                            // Properties needed — create async future for hydration
3362                            let schema = self.schema.clone();
3363                            let target_variable = self.exec.target_variable.clone();
3364                            let target_properties = self.exec.target_properties.clone();
3365                            let target_label_name = self.exec.target_label_name.clone();
3366                            let graph_ctx = self.exec.graph_ctx.clone();
3367
3368                            let fut = hydrate_vlp_target_properties(
3369                                base_batch,
3370                                schema,
3371                                target_variable,
3372                                target_properties,
3373                                target_label_name,
3374                                graph_ctx,
3375                            );
3376
3377                            self.state = VarLengthStreamState::Materializing(Box::pin(fut));
3378                            // Continue loop to poll the future
3379                        }
3380                        Poll::Ready(Some(Err(e))) => {
3381                            self.state = VarLengthStreamState::Done;
3382                            return Poll::Ready(Some(Err(e)));
3383                        }
3384                        Poll::Ready(None) => {
3385                            self.state = VarLengthStreamState::Done;
3386                            return Poll::Ready(None);
3387                        }
3388                        Poll::Pending => {
3389                            self.state = VarLengthStreamState::Reading;
3390                            return Poll::Pending;
3391                        }
3392                    }
3393                }
3394                VarLengthStreamState::Materializing(mut fut) => match fut.as_mut().poll(cx) {
3395                    Poll::Ready(Ok(batch)) => {
3396                        self.state = VarLengthStreamState::Reading;
3397                        self.metrics.record_output(batch.num_rows());
3398                        return Poll::Ready(Some(Ok(batch)));
3399                    }
3400                    Poll::Ready(Err(e)) => {
3401                        self.state = VarLengthStreamState::Done;
3402                        return Poll::Ready(Some(Err(e)));
3403                    }
3404                    Poll::Pending => {
3405                        self.state = VarLengthStreamState::Materializing(fut);
3406                        return Poll::Pending;
3407                    }
3408                },
3409                VarLengthStreamState::Done => {
3410                    return Poll::Ready(None);
3411                }
3412            }
3413        }
3414    }
3415}
3416
3417impl GraphVariableLengthTraverseStream {
3418    fn process_batch_base(
3419        &self,
3420        batch: RecordBatch,
3421        eid_filter: &EidFilter,
3422        vid_filter: &VidFilter,
3423    ) -> DFResult<RecordBatch> {
3424        let source_col = batch
3425            .column_by_name(&self.exec.source_column)
3426            .ok_or_else(|| {
3427                datafusion::error::DataFusionError::Execution(format!(
3428                    "Source column '{}' not found",
3429                    self.exec.source_column
3430                ))
3431            })?;
3432
3433        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
3434        let source_vids: &UInt64Array = &source_vid_cow;
3435
3436        // Read bound target VIDs if column exists
3437        let bound_target_cow = self
3438            .exec
3439            .bound_target_column
3440            .as_ref()
3441            .and_then(|col| batch.column_by_name(col))
3442            .map(|c| column_as_vid_array(c.as_ref()))
3443            .transpose()?;
3444        let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
3445
3446        // Extract used edge columns for cross-pattern relationship uniqueness
3447        let used_edge_arrays: Vec<&UInt64Array> = self
3448            .exec
3449            .used_edge_columns
3450            .iter()
3451            .filter_map(|col| {
3452                batch
3453                    .column_by_name(col)?
3454                    .as_any()
3455                    .downcast_ref::<UInt64Array>()
3456            })
3457            .collect();
3458
3459        // Collect all BFS results
3460        let mut expansions: Vec<VarLengthExpansion> = Vec::new();
3461
3462        for (row_idx, source_vid) in source_vids.iter().enumerate() {
3463            let mut emitted_for_row = false;
3464
3465            if let Some(src) = source_vid {
3466                let vid = Vid::from(src);
3467
3468                // Collect used edge IDs from previous hops for this row
3469                let used_eids: FxHashSet<u64> = used_edge_arrays
3470                    .iter()
3471                    .filter_map(|arr| {
3472                        if arr.is_null(row_idx) {
3473                            None
3474                        } else {
3475                            Some(arr.value(row_idx))
3476                        }
3477                    })
3478                    .collect();
3479
3480                // Dispatch to appropriate BFS mode based on output_mode
3481                match &self.exec.output_mode {
3482                    VlpOutputMode::EndpointsOnly => {
3483                        let endpoints = self
3484                            .exec
3485                            .bfs_endpoints_only(vid, eid_filter, &used_eids, vid_filter);
3486                        for (target, depth) in endpoints {
3487                            // Filter by bound target VID
3488                            if let Some(targets) = expected_targets {
3489                                if targets.is_null(row_idx) {
3490                                    continue;
3491                                }
3492                                if target.as_u64() != targets.value(row_idx) {
3493                                    continue;
3494                                }
3495                            }
3496                            expansions.push((row_idx, target, depth as usize, vec![], vec![]));
3497                            emitted_for_row = true;
3498                        }
3499                    }
3500                    _ => {
3501                        // FullPath, StepVariable, CountOnly, etc.
3502                        let bfs_results = self
3503                            .exec
3504                            .bfs_with_dag(vid, eid_filter, &used_eids, vid_filter);
3505                        for (target, hop_count, node_path, edge_path) in bfs_results {
3506                            // Filter by bound target VID
3507                            if let Some(targets) = expected_targets {
3508                                if targets.is_null(row_idx) {
3509                                    continue;
3510                                }
3511                                if target.as_u64() != targets.value(row_idx) {
3512                                    continue;
3513                                }
3514                            }
3515                            expansions.push((row_idx, target, hop_count, node_path, edge_path));
3516                            emitted_for_row = true;
3517                        }
3518                    }
3519                }
3520            }
3521
3522            if self.exec.is_optional && !emitted_for_row {
3523                // Preserve the source row with NULL optional bindings.
3524                // We use empty node/edge paths to mark unmatched rows.
3525                expansions.push((row_idx, Vid::from(u64::MAX), 0, vec![], vec![]));
3526            }
3527        }
3528
3529        self.build_output_batch(&batch, &expansions)
3530    }
3531
3532    fn build_output_batch(
3533        &self,
3534        input: &RecordBatch,
3535        expansions: &[VarLengthExpansion],
3536    ) -> DFResult<RecordBatch> {
3537        if expansions.is_empty() {
3538            return Ok(RecordBatch::new_empty(self.schema.clone()));
3539        }
3540
3541        let num_rows = expansions.len();
3542
3543        // Build index array
3544        let indices: Vec<u64> = expansions
3545            .iter()
3546            .map(|(idx, _, _, _, _)| *idx as u64)
3547            .collect();
3548        let indices_array = UInt64Array::from(indices);
3549
3550        // Expand input columns
3551        let mut columns: Vec<ArrayRef> = Vec::new();
3552        for col in input.columns() {
3553            let expanded = take(col.as_ref(), &indices_array, None)?;
3554            columns.push(expanded);
3555        }
3556
3557        // Collect target VIDs and unmatched markers for use in multiple places.
3558        // Unmatched OPTIONAL rows use the sentinel VID (u64::MAX) — not empty paths,
3559        // because EndpointsOnly mode legitimately uses empty node/edge path vectors.
3560        let unmatched_rows: Vec<bool> = expansions
3561            .iter()
3562            .map(|(_, vid, _, _, _)| vid.as_u64() == u64::MAX)
3563            .collect();
3564        let target_vids: Vec<Option<u64>> = expansions
3565            .iter()
3566            .zip(unmatched_rows.iter())
3567            .map(
3568                |((_, vid, _, _, _), unmatched)| {
3569                    if *unmatched { None } else { Some(vid.as_u64()) }
3570                },
3571            )
3572            .collect();
3573
3574        // Add target VID column (only if not already in input)
3575        let target_vid_name = format!("{}._vid", self.exec.target_variable);
3576        if input.schema().column_with_name(&target_vid_name).is_none() {
3577            columns.push(Arc::new(UInt64Array::from(target_vids.clone())));
3578        }
3579
3580        // Add target ._labels column (only if not already in input)
3581        let target_labels_name = format!("{}._labels", self.exec.target_variable);
3582        if input
3583            .schema()
3584            .column_with_name(&target_labels_name)
3585            .is_none()
3586        {
3587            use arrow_array::builder::{ListBuilder, StringBuilder};
3588            let query_ctx = self.exec.graph_ctx.query_context();
3589            let mut labels_builder = ListBuilder::new(StringBuilder::new());
3590            for target_vid in &target_vids {
3591                let Some(vid_u64) = target_vid else {
3592                    labels_builder.append(false);
3593                    continue;
3594                };
3595                let vid = Vid::from(*vid_u64);
3596                let row_labels: Vec<String> =
3597                    match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
3598                        Some(labels) => {
3599                            // Vertex is in L0 — use actual labels only
3600                            labels
3601                        }
3602                        None => {
3603                            // Vertex not in L0 — trust schema label (storage already filtered)
3604                            if let Some(ref label_name) = self.exec.target_label_name {
3605                                vec![label_name.clone()]
3606                            } else {
3607                                vec![]
3608                            }
3609                        }
3610                    };
3611                let values = labels_builder.values();
3612                for lbl in &row_labels {
3613                    values.append_value(lbl);
3614                }
3615                labels_builder.append(true);
3616            }
3617            columns.push(Arc::new(labels_builder.finish()));
3618        }
3619
3620        // Add null placeholder columns for target properties (hydrated async if needed, skip if already in input)
3621        for prop_name in &self.exec.target_properties {
3622            let full_prop_name = format!("{}.{}", self.exec.target_variable, prop_name);
3623            if input.schema().column_with_name(&full_prop_name).is_none() {
3624                let col_idx = columns.len();
3625                if col_idx < self.schema.fields().len() {
3626                    let field = self.schema.field(col_idx);
3627                    columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
3628                }
3629            }
3630        }
3631
3632        // Add hop count column
3633        let hop_counts: Vec<u64> = expansions
3634            .iter()
3635            .map(|(_, _, hops, _, _)| *hops as u64)
3636            .collect();
3637        columns.push(Arc::new(UInt64Array::from(hop_counts)));
3638
3639        // Add step variable (edge list) column if bound
3640        if self.exec.step_variable.is_some() {
3641            let mut edges_builder = new_edge_list_builder();
3642            let query_ctx = self.exec.graph_ctx.query_context();
3643
3644            for (_, _, _, node_path, edge_path) in expansions {
3645                if node_path.is_empty() && edge_path.is_empty() {
3646                    // Null row for OPTIONAL MATCH unmatched
3647                    edges_builder.append_null();
3648                } else if edge_path.is_empty() {
3649                    // Zero-hop match: empty list
3650                    edges_builder.append(true);
3651                } else {
3652                    for (i, eid) in edge_path.iter().enumerate() {
3653                        let type_name = l0_visibility::get_edge_type(*eid, &query_ctx)
3654                            .unwrap_or_else(|| "UNKNOWN".to_string());
3655                        // Report the relationship's STORED direction, not the
3656                        // traversal order (`node_path[i]` -> `node_path[i + 1]`).
3657                        // Resolves flushed (L1-resident) edges too, where the L0
3658                        // chain no longer holds the stored endpoints.
3659                        let (src, dst) = self.exec.graph_ctx.resolve_stored_edge_endpoints(
3660                            *eid,
3661                            node_path[i],
3662                            node_path[i + 1],
3663                            &self.exec.edge_type_ids,
3664                        );
3665                        append_edge_to_struct(
3666                            edges_builder.values(),
3667                            *eid,
3668                            &type_name,
3669                            src,
3670                            dst,
3671                            &query_ctx,
3672                        );
3673                    }
3674                    edges_builder.append(true);
3675                }
3676            }
3677
3678            columns.push(Arc::new(edges_builder.finish()));
3679        }
3680
3681        // Add path variable column if bound.
3682        // For named paths, we output a Path struct with nodes and relationships arrays.
3683        // If a path column already exists in input (from a prior BindFixedPath), extend it
3684        // rather than building from scratch.
3685        if let Some(path_var_name) = &self.exec.path_variable {
3686            let existing_path_col_idx = input
3687                .schema()
3688                .column_with_name(path_var_name)
3689                .map(|(idx, _)| idx);
3690            // Clone the Arc so we can read existing path without borrowing `columns`
3691            let existing_path_arc = existing_path_col_idx.map(|idx| columns[idx].clone());
3692            let existing_path = existing_path_arc
3693                .as_ref()
3694                .and_then(|arc| arc.as_any().downcast_ref::<arrow_array::StructArray>());
3695
3696            let mut nodes_builder = new_node_list_builder();
3697            let mut rels_builder = new_edge_list_builder();
3698            let query_ctx = self.exec.graph_ctx.query_context();
3699            let mut path_validity = Vec::with_capacity(expansions.len());
3700
3701            for (row_out_idx, (_, _, _, node_path, edge_path)) in expansions.iter().enumerate() {
3702                if node_path.is_empty() && edge_path.is_empty() {
3703                    nodes_builder.append(false);
3704                    rels_builder.append(false);
3705                    path_validity.push(false);
3706                    continue;
3707                }
3708
3709                // Prepend existing path prefix if extending
3710                let skip_first_vlp_node = if let Some(existing) = existing_path {
3711                    if !existing.is_null(row_out_idx) {
3712                        prepend_existing_path(
3713                            existing,
3714                            row_out_idx,
3715                            &mut nodes_builder,
3716                            &mut rels_builder,
3717                            &query_ctx,
3718                        );
3719                        true
3720                    } else {
3721                        false
3722                    }
3723                } else {
3724                    false
3725                };
3726
3727                // Append VLP nodes (skip first if extending — it's the junction point)
3728                let start_idx = if skip_first_vlp_node { 1 } else { 0 };
3729                for vid in &node_path[start_idx..] {
3730                    append_node_to_struct(nodes_builder.values(), *vid, &query_ctx);
3731                }
3732                nodes_builder.append(true);
3733
3734                for (i, eid) in edge_path.iter().enumerate() {
3735                    let type_name = l0_visibility::get_edge_type(*eid, &query_ctx)
3736                        .unwrap_or_else(|| "UNKNOWN".to_string());
3737                    // Report the relationship's STORED direction, not the traversal
3738                    // order (`node_path[i]` -> `node_path[i + 1]`). Resolves flushed
3739                    // (L1-resident) edges too, where the L0 chain no longer holds
3740                    // the stored endpoints.
3741                    let (src, dst) = self.exec.graph_ctx.resolve_stored_edge_endpoints(
3742                        *eid,
3743                        node_path[i],
3744                        node_path[i + 1],
3745                        &self.exec.edge_type_ids,
3746                    );
3747                    append_edge_to_struct(
3748                        rels_builder.values(),
3749                        *eid,
3750                        &type_name,
3751                        src,
3752                        dst,
3753                        &query_ctx,
3754                    );
3755                }
3756                rels_builder.append(true);
3757                path_validity.push(true);
3758            }
3759
3760            // Finish builders and get ListArrays
3761            let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
3762            let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;
3763
3764            // Build the path struct fields
3765            let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
3766            let rels_field = Arc::new(Field::new(
3767                "relationships",
3768                rels_array.data_type().clone(),
3769                true,
3770            ));
3771
3772            // Create the path struct array
3773            let path_struct = arrow_array::StructArray::try_new(
3774                vec![nodes_field, rels_field].into(),
3775                vec![nodes_array, rels_array],
3776                Some(arrow::buffer::NullBuffer::from(path_validity)),
3777            )
3778            .map_err(arrow_err)?;
3779
3780            if let Some(idx) = existing_path_col_idx {
3781                columns[idx] = Arc::new(path_struct);
3782            } else {
3783                columns.push(Arc::new(path_struct));
3784            }
3785        }
3786
3787        self.metrics.record_output(num_rows);
3788
3789        RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)
3790    }
3791}
3792
3793impl RecordBatchStream for GraphVariableLengthTraverseStream {
3794    fn schema(&self) -> SchemaRef {
3795        self.schema.clone()
3796    }
3797}
3798
3799/// Hydrate target vertex properties into a VLP batch.
3800///
3801/// The base batch already has null placeholder columns for target properties.
3802/// This function replaces them with actual property values fetched from storage.
3803async fn hydrate_vlp_target_properties(
3804    base_batch: RecordBatch,
3805    schema: SchemaRef,
3806    target_variable: String,
3807    target_properties: Vec<String>,
3808    target_label_name: Option<String>,
3809    graph_ctx: Arc<GraphExecutionContext>,
3810) -> DFResult<RecordBatch> {
3811    if base_batch.num_rows() == 0 || target_properties.is_empty() {
3812        return Ok(base_batch);
3813    }
3814
3815    // Find the target VID column by exact name.
3816    // Schema layout: [input cols..., target._vid, target.prop1..., _hop_count, path?]
3817    //
3818    // IMPORTANT: When the target variable is already bound in the input (e.g., two MATCH
3819    // clauses referencing the same variable), there may be duplicate column names. We need
3820    // the LAST occurrence of target._vid, which is the one added by the VLP.
3821    let target_vid_col_name = format!("{}._vid", target_variable);
3822    let vid_col_idx = schema
3823        .fields()
3824        .iter()
3825        .enumerate()
3826        .rev()
3827        .find(|(_, f)| f.name() == &target_vid_col_name)
3828        .map(|(i, _)| i);
3829
3830    let Some(vid_col_idx) = vid_col_idx else {
3831        return Ok(base_batch);
3832    };
3833
3834    let vid_col = base_batch.column(vid_col_idx);
3835    let target_vid_cow = column_as_vid_array(vid_col.as_ref())?;
3836    let target_vid_array: &UInt64Array = &target_vid_cow;
3837
3838    let target_vids: Vec<Vid> = target_vid_array
3839        .iter()
3840        // Preserve null rows by mapping them to a sentinel VID that never resolves
3841        // to stored properties. The output property columns remain NULL for these rows.
3842        .map(|v| Vid::from(v.unwrap_or(u64::MAX)))
3843        .collect();
3844
3845    // Fetch properties from storage
3846    let mut property_columns: Vec<ArrayRef> = Vec::new();
3847
3848    if let Some(ref label_name) = target_label_name {
3849        let property_manager = graph_ctx.property_manager();
3850        let query_ctx = graph_ctx.query_context();
3851
3852        let props_map = property_manager
3853            .get_batch_vertex_props_for_label(&target_vids, label_name, Some(&query_ctx))
3854            .await
3855            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
3856
3857        let uni_schema = graph_ctx.storage().schema_manager().schema();
3858        let label_props = uni_schema.properties.get(label_name.as_str());
3859
3860        for prop_name in &target_properties {
3861            let data_type = resolve_property_type(prop_name, label_props);
3862            let column =
3863                build_property_column_static(&target_vids, &props_map, prop_name, &data_type)?;
3864            property_columns.push(column);
3865        }
3866    } else {
3867        // No label name — use label-agnostic property lookup.
3868        // This scans all label datasets, slower but correct for label-less traversals.
3869        let non_internal_props: Vec<&str> = target_properties
3870            .iter()
3871            .filter(|p| *p != "_all_props")
3872            .map(|s| s.as_str())
3873            .collect();
3874        let property_manager = graph_ctx.property_manager();
3875        let query_ctx = graph_ctx.query_context();
3876
3877        let props_map = if !non_internal_props.is_empty() {
3878            property_manager
3879                .get_batch_vertex_props(&target_vids, &non_internal_props, Some(&query_ctx))
3880                .await
3881                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
3882        } else {
3883            std::collections::HashMap::new()
3884        };
3885
3886        for prop_name in &target_properties {
3887            if prop_name == "_all_props" {
3888                // Build CypherValue blob from all vertex properties (L0 + storage),
3889                // staying in `Value` space so typed values (temporals) are preserved.
3890                use arrow_array::builder::LargeBinaryBuilder;
3891
3892                let mut builder = LargeBinaryBuilder::new();
3893                let l0_ctx = graph_ctx.l0_context();
3894                for vid in &target_vids {
3895                    let mut merged_props: HashMap<String, uni_common::Value> = HashMap::new();
3896                    // Collect from storage-hydrated props
3897                    if let Some(vid_props) = props_map.get(vid) {
3898                        for (k, v) in vid_props.iter() {
3899                            merged_props.insert(k.clone(), v.clone());
3900                        }
3901                    }
3902                    // Overlay L0 properties
3903                    for l0 in l0_ctx.iter_l0_buffers() {
3904                        let guard = l0.read();
3905                        if let Some(l0_props) = guard.vertex_properties.get(vid) {
3906                            for (k, v) in l0_props.iter() {
3907                                merged_props.insert(k.clone(), v.clone());
3908                            }
3909                        }
3910                    }
3911                    if merged_props.is_empty() {
3912                        builder.append_null();
3913                    } else {
3914                        builder.append_value(uni_common::cypher_value_codec::encode(
3915                            &uni_common::Value::Map(merged_props),
3916                        ));
3917                    }
3918                }
3919                property_columns.push(Arc::new(builder.finish()));
3920            } else {
3921                let column = build_property_column_static(
3922                    &target_vids,
3923                    &props_map,
3924                    prop_name,
3925                    &arrow::datatypes::DataType::LargeBinary,
3926                )?;
3927                property_columns.push(column);
3928            }
3929        }
3930    }
3931
3932    // Rebuild batch replacing the null placeholder property columns with hydrated ones.
3933    // Find each property column by name — works regardless of column ordering
3934    // (schema-aware puts props before _hop_count; schemaless puts them after).
3935    // Use col_idx > vid_col_idx to only replace this VLP's own property columns,
3936    // not pre-existing input columns with the same name (duplicate variable binding).
3937    let mut new_columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
3938    let mut prop_idx = 0;
3939    for (col_idx, field) in schema.fields().iter().enumerate() {
3940        let is_target_prop = col_idx > vid_col_idx
3941            && target_properties
3942                .iter()
3943                .any(|p| *field.name() == format!("{}.{}", target_variable, p));
3944        if is_target_prop && prop_idx < property_columns.len() {
3945            new_columns.push(property_columns[prop_idx].clone());
3946            prop_idx += 1;
3947        } else {
3948            new_columns.push(base_batch.column(col_idx).clone());
3949        }
3950    }
3951
3952    RecordBatch::try_new(schema, new_columns).map_err(arrow_err)
3953}
3954
3955// ============================================================================
3956// GraphVariableLengthTraverseMainExec - VLP for schemaless edge types
3957// ============================================================================
3958
/// Execution plan for variable-length path traversal on schemaless edge types.
///
/// This is similar to `GraphVariableLengthTraverseExec` but works with edge types
/// that don't have schema-defined IDs. It queries the main edges table by type name.
/// Supports OR relationship types like `[:KNOWS|HATES]` via multiple type_names.
///
/// The plan has exactly one child. Its output is every input column followed by
/// the target VID and labels columns (when not already in the input), `_hop_count`,
/// the optional relationship-list and path columns, and nullable `LargeBinary`
/// placeholder columns for requested target properties (see `build_schema`).
pub struct GraphVariableLengthTraverseMainExec {
    /// Input execution plan (the single child).
    input: Arc<dyn ExecutionPlan>,

    /// Column name containing source VIDs.
    source_column: String,

    /// Edge type names (not IDs, since schemaless types may not have IDs).
    /// Joined with `|`, they also form the type label on emitted relationships.
    type_names: Vec<String>,

    /// Traversal direction. `Direction::Both` additionally expands each edge ID
    /// at most once per visited vertex.
    direction: Direction,

    /// Minimum number of hops (inclusive); `0` admits the zero-length path.
    min_hops: usize,

    /// Maximum number of hops (inclusive); paths are never extended past it.
    max_hops: usize,

    /// Variable name for target vertex columns (`{target}._vid`, `{target}._labels`,
    /// `{target}.{prop}`).
    target_variable: String,

    /// Variable name for relationship list (r in `[r*]`) - holds `List<Edge>`.
    step_variable: Option<String>,

    /// Variable name for named path (p in `p = ...`) - holds `Path`.
    /// Not re-added to the schema when the input already has a column with this name.
    path_variable: Option<String>,

    /// Target vertex properties to materialize. Each gets a nullable `LargeBinary`
    /// column unless the input already carries `{target}.{prop}`.
    target_properties: Vec<String>,

    /// Whether this is an optional match (LEFT JOIN semantics): a source row with
    /// no matching path is kept once, with NULL target bindings.
    is_optional: bool,

    /// Column name of an already-bound target VID (for patterns where target is in scope).
    /// Only paths ending at that VID are emitted; a NULL bound target matches nothing.
    bound_target_column: Option<String>,

    /// Lance SQL filter for edge property predicates (VLP bitmap preselection).
    /// NOTE(review): `GraphVariableLengthTraverseMainStream::bfs` does not read this;
    /// confirm where it takes effect.
    edge_lance_filter: Option<String>,

    /// Edge property conditions to check during BFS (e.g., `{year: 1988}`).
    /// Each entry is (property_name, expected_value). All must match for an edge to be traversed.
    /// An edge lacking one of the properties fails the check.
    edge_property_conditions: Vec<(String, UniValue)>,

    /// Edge ID columns from previous hops for cross-pattern relationship uniqueness.
    /// Columns that are missing or not `UInt64`, and NULL entries, are ignored.
    used_edge_columns: Vec<String>,

    /// Path semantics mode (Trail = no repeated edges, default for OpenCypher).
    /// NOTE(review): the schemaless `bfs` always prunes repeated edges without
    /// consulting this value — confirm that is intended for every mode.
    path_mode: super::nfa::PathMode,

    /// Output mode determining BFS strategy.
    /// NOTE(review): not read by the schemaless `bfs`; confirm where it applies.
    output_mode: super::nfa::VlpOutputMode,

    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Output schema, computed by `build_schema` in `new`.
    schema: SchemaRef,

    /// Cached plan properties, computed once from `schema` in `new`.
    properties: Arc<PlanProperties>,

    /// Execution metrics.
    metrics: ExecutionPlanMetricsSet,
}
4029
4030impl fmt::Debug for GraphVariableLengthTraverseMainExec {
4031    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4032        f.debug_struct("GraphVariableLengthTraverseMainExec")
4033            .field("source_column", &self.source_column)
4034            .field("type_names", &self.type_names)
4035            .field("direction", &self.direction)
4036            .field("min_hops", &self.min_hops)
4037            .field("max_hops", &self.max_hops)
4038            .field("target_variable", &self.target_variable)
4039            .finish()
4040    }
4041}
4042
4043impl GraphVariableLengthTraverseMainExec {
4044    /// Create a new variable-length traversal plan for schemaless edges.
4045    #[expect(clippy::too_many_arguments)]
4046    pub fn new(
4047        input: Arc<dyn ExecutionPlan>,
4048        source_column: impl Into<String>,
4049        type_names: Vec<String>,
4050        direction: Direction,
4051        min_hops: usize,
4052        max_hops: usize,
4053        target_variable: impl Into<String>,
4054        step_variable: Option<String>,
4055        path_variable: Option<String>,
4056        target_properties: Vec<String>,
4057        graph_ctx: Arc<GraphExecutionContext>,
4058        is_optional: bool,
4059        bound_target_column: Option<String>,
4060        edge_lance_filter: Option<String>,
4061        edge_property_conditions: Vec<(String, UniValue)>,
4062        used_edge_columns: Vec<String>,
4063        path_mode: super::nfa::PathMode,
4064        output_mode: super::nfa::VlpOutputMode,
4065    ) -> Self {
4066        let source_column = source_column.into();
4067        let target_variable = target_variable.into();
4068
4069        // Build output schema
4070        let schema = Self::build_schema(
4071            input.schema(),
4072            &target_variable,
4073            step_variable.as_deref(),
4074            path_variable.as_deref(),
4075            &target_properties,
4076        );
4077        let properties = compute_plan_properties(schema.clone());
4078
4079        Self {
4080            input,
4081            source_column,
4082            type_names,
4083            direction,
4084            min_hops,
4085            max_hops,
4086            target_variable,
4087            step_variable,
4088            path_variable,
4089            target_properties,
4090            is_optional,
4091            bound_target_column,
4092            edge_lance_filter,
4093            edge_property_conditions,
4094            used_edge_columns,
4095            path_mode,
4096            output_mode,
4097            graph_ctx,
4098            schema,
4099            properties,
4100            metrics: ExecutionPlanMetricsSet::new(),
4101        }
4102    }
4103
4104    /// Build output schema.
4105    fn build_schema(
4106        input_schema: SchemaRef,
4107        target_variable: &str,
4108        step_variable: Option<&str>,
4109        path_variable: Option<&str>,
4110        target_properties: &[String],
4111    ) -> SchemaRef {
4112        let mut fields: Vec<Field> = input_schema
4113            .fields()
4114            .iter()
4115            .map(|f| f.as_ref().clone())
4116            .collect();
4117
4118        // Add target VID column (only if not already in input)
4119        let target_vid_name = format!("{}._vid", target_variable);
4120        if input_schema.column_with_name(&target_vid_name).is_none() {
4121            fields.push(Field::new(target_vid_name, DataType::UInt64, true));
4122        }
4123
4124        // Add target ._labels column (only if not already in input)
4125        let target_labels_name = format!("{}._labels", target_variable);
4126        if input_schema.column_with_name(&target_labels_name).is_none() {
4127            fields.push(Field::new(target_labels_name, labels_data_type(), true));
4128        }
4129
4130        // Add hop count
4131        fields.push(Field::new("_hop_count", DataType::UInt64, false));
4132
4133        // Add step variable column (list of edge structs) if bound
4134        // This is the relationship variable like `r` in `[r*1..3]`
4135        if let Some(step_var) = step_variable {
4136            fields.push(build_edge_list_field(step_var));
4137        }
4138
4139        // Add path struct if bound (only if not already in input from prior BindFixedPath)
4140        if let Some(path_var) = path_variable
4141            && input_schema.column_with_name(path_var).is_none()
4142        {
4143            fields.push(build_path_struct_field(path_var));
4144        }
4145
4146        // Add target property columns (as LargeBinary for lazy hydration via PropertyManager)
4147        // Skip properties that are already in the input schema
4148        for prop in target_properties {
4149            let prop_name = format!("{}.{}", target_variable, prop);
4150            if input_schema.column_with_name(&prop_name).is_none() {
4151                fields.push(Field::new(prop_name, DataType::LargeBinary, true));
4152            }
4153        }
4154
4155        Arc::new(Schema::new(fields))
4156    }
4157}
4158
4159impl DisplayAs for GraphVariableLengthTraverseMainExec {
4160    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4161        write!(
4162            f,
4163            "GraphVariableLengthTraverseMainExec: {} --[{:?}*{}..{}]--> target",
4164            self.source_column, self.type_names, self.min_hops, self.max_hops
4165        )
4166    }
4167}
4168
4169impl ExecutionPlan for GraphVariableLengthTraverseMainExec {
4170    fn name(&self) -> &str {
4171        "GraphVariableLengthTraverseMainExec"
4172    }
4173
4174    fn as_any(&self) -> &dyn Any {
4175        self
4176    }
4177
4178    fn schema(&self) -> SchemaRef {
4179        self.schema.clone()
4180    }
4181
4182    fn properties(&self) -> &Arc<PlanProperties> {
4183        &self.properties
4184    }
4185
4186    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
4187        vec![&self.input]
4188    }
4189
4190    fn with_new_children(
4191        self: Arc<Self>,
4192        children: Vec<Arc<dyn ExecutionPlan>>,
4193    ) -> DFResult<Arc<dyn ExecutionPlan>> {
4194        if children.len() != 1 {
4195            return Err(datafusion::error::DataFusionError::Plan(
4196                "GraphVariableLengthTraverseMainExec requires exactly one child".to_string(),
4197            ));
4198        }
4199
4200        Ok(Arc::new(Self::new(
4201            children[0].clone(),
4202            self.source_column.clone(),
4203            self.type_names.clone(),
4204            self.direction,
4205            self.min_hops,
4206            self.max_hops,
4207            self.target_variable.clone(),
4208            self.step_variable.clone(),
4209            self.path_variable.clone(),
4210            self.target_properties.clone(),
4211            self.graph_ctx.clone(),
4212            self.is_optional,
4213            self.bound_target_column.clone(),
4214            self.edge_lance_filter.clone(),
4215            self.edge_property_conditions.clone(),
4216            self.used_edge_columns.clone(),
4217            self.path_mode.clone(),
4218            self.output_mode.clone(),
4219        )))
4220    }
4221
4222    fn execute(
4223        &self,
4224        partition: usize,
4225        context: Arc<TaskContext>,
4226    ) -> DFResult<SendableRecordBatchStream> {
4227        let input_stream = self.input.execute(partition, context)?;
4228        let metrics = BaselineMetrics::new(&self.metrics, partition);
4229
4230        // Build adjacency map from main edges table (async)
4231        let graph_ctx = self.graph_ctx.clone();
4232        let type_names = self.type_names.clone();
4233        let direction = self.direction;
4234        let load_fut = async move {
4235            // VLP frontier is unknown ahead of time: no source pushdown (whole-type).
4236            build_edge_adjacency_map(&graph_ctx, &type_names, direction, None).await
4237        };
4238
4239        Ok(Box::pin(GraphVariableLengthTraverseMainStream {
4240            input: input_stream,
4241            source_column: self.source_column.clone(),
4242            type_names: self.type_names.clone(),
4243            direction: self.direction,
4244            min_hops: self.min_hops,
4245            max_hops: self.max_hops,
4246            target_variable: self.target_variable.clone(),
4247            step_variable: self.step_variable.clone(),
4248            path_variable: self.path_variable.clone(),
4249            target_properties: self.target_properties.clone(),
4250            graph_ctx: self.graph_ctx.clone(),
4251            is_optional: self.is_optional,
4252            bound_target_column: self.bound_target_column.clone(),
4253            edge_lance_filter: self.edge_lance_filter.clone(),
4254            edge_property_conditions: self.edge_property_conditions.clone(),
4255            used_edge_columns: self.used_edge_columns.clone(),
4256            path_mode: self.path_mode.clone(),
4257            output_mode: self.output_mode.clone(),
4258            schema: self.schema.clone(),
4259            state: VarLengthMainStreamState::Loading(Box::pin(load_fut)),
4260            metrics,
4261        }))
4262    }
4263
4264    fn metrics(&self) -> Option<MetricsSet> {
4265        Some(self.metrics.clone_inner())
4266    }
4267}
4268
/// State machine for VLP schemaless stream.
///
/// `GraphVariableLengthTraverseMainExec::execute` creates the stream in `Loading`.
/// NOTE(review): transitions between the remaining states are driven by the
/// stream's poll implementation, which is not shown here.
enum VarLengthMainStreamState {
    /// Loading adjacency map from main edges table.
    /// The future covers every edge of the operator's edge types (no source pushdown).
    Loading(Pin<Box<dyn std::future::Future<Output = DFResult<EdgeAdjacencyMap>> + Send>>),
    /// Processing input batches with loaded adjacency.
    Processing(EdgeAdjacencyMap),
    /// Materializing properties for a batch.
    /// `adjacency` is presumably carried along so that processing can resume once
    /// `fut` completes — confirm against the poll implementation.
    Materializing {
        adjacency: EdgeAdjacencyMap,
        fut: Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>,
    },
    /// Stream is done.
    Done,
}
4283
/// Stream for variable-length traversal on schemaless edges.
///
/// Built by `GraphVariableLengthTraverseMainExec::execute`, which copies the plan's
/// configuration into these fields and starts in `VarLengthMainStreamState::Loading`.
#[expect(dead_code, reason = "VLP fields used in Phase 3")]
struct GraphVariableLengthTraverseMainStream {
    /// Child plan's record-batch stream for this partition.
    input: SendableRecordBatchStream,
    /// Column holding source VIDs in each input batch.
    source_column: String,
    /// Edge type names; joined with `|` as the type label on emitted relationships.
    type_names: Vec<String>,
    /// Traversal direction; `Direction::Both` expands each edge ID once per vertex.
    direction: Direction,
    /// Inclusive lower bound on path length.
    min_hops: usize,
    /// Inclusive upper bound on path length.
    max_hops: usize,
    /// Variable name used to build the target's output column names.
    target_variable: String,
    /// Relationship variable like `r` in `[r*1..3]` - gets a List of edge structs.
    step_variable: Option<String>,
    /// Named path variable (`p` in `p = ...`), if bound.
    path_variable: Option<String>,
    /// Target vertex properties to materialize.
    target_properties: Vec<String>,
    /// Graph execution context (schema lookups, query context, edge endpoint resolution).
    graph_ctx: Arc<GraphExecutionContext>,
    /// OPTIONAL MATCH semantics: unmatched source rows are kept with NULL bindings.
    is_optional: bool,
    /// Already-bound target VID column; only paths ending at that VID are kept.
    bound_target_column: Option<String>,
    /// Lance SQL filter for edge property predicates (bitmap preselection).
    edge_lance_filter: Option<String>,
    /// Edge property conditions to check during BFS.
    edge_property_conditions: Vec<(String, UniValue)>,
    /// `UInt64` edge-ID columns bound by earlier pattern elements; those edges are
    /// excluded from traversal for the same row.
    used_edge_columns: Vec<String>,
    /// Path semantics mode. NOTE(review): not read by `bfs`.
    path_mode: super::nfa::PathMode,
    /// Output mode. NOTE(review): not read by `bfs`.
    output_mode: super::nfa::VlpOutputMode,
    /// Output schema (identical to the plan's schema).
    schema: SchemaRef,
    /// Current lifecycle state (loading adjacency, processing, materializing, done).
    state: VarLengthMainStreamState,
    /// Per-partition baseline metrics; output row counts are recorded here.
    metrics: BaselineMetrics,
}
4311
/// BFS result type: (target_vid, hop_count, node_path, edge_path)
///
/// As produced by `GraphVariableLengthTraverseMainStream::bfs`: `node_path` starts
/// at the source vertex and ends at `target_vid`, `edge_path` lists the traversed
/// edge IDs in order, and `node_path.len() == edge_path.len() + 1 == hop_count + 1`.
type MainBfsResult = (Vid, usize, Vec<Vid>, Vec<Eid>);
4314
4315impl GraphVariableLengthTraverseMainStream {
4316    /// Perform BFS from a source vertex using the adjacency map.
4317    ///
4318    /// `used_eids` contains edge IDs already bound by earlier pattern elements
4319    /// in the same MATCH clause, enforcing cross-pattern relationship uniqueness
4320    /// (Cypher semantics require all relationships in a MATCH to be distinct).
4321    fn bfs(
4322        &self,
4323        source: Vid,
4324        adjacency: &EdgeAdjacencyMap,
4325        used_eids: &FxHashSet<u64>,
4326    ) -> Vec<MainBfsResult> {
4327        let mut results = Vec::new();
4328        let mut queue: VecDeque<MainBfsResult> = VecDeque::new();
4329
4330        queue.push_back((source, 0, vec![source], vec![]));
4331
4332        while let Some((current, depth, node_path, edge_path)) = queue.pop_front() {
4333            // Emit result if within hop range (including zero-length patterns)
4334            if depth >= self.min_hops && depth <= self.max_hops {
4335                results.push((current, depth, node_path.clone(), edge_path.clone()));
4336            }
4337
4338            // Stop if at max depth
4339            if depth >= self.max_hops {
4340                continue;
4341            }
4342
4343            // Get neighbors from adjacency map
4344            if let Some(neighbors) = adjacency.get(&current) {
4345                let is_undirected = matches!(self.direction, Direction::Both);
4346                let mut seen_edges_at_hop: HashSet<u64> = HashSet::new();
4347
4348                for (neighbor, eid, _edge_type, props) in neighbors {
4349                    // Deduplicate edges for undirected patterns
4350                    if is_undirected && !seen_edges_at_hop.insert(eid.as_u64()) {
4351                        continue;
4352                    }
4353
4354                    // Enforce relationship uniqueness per-path (Cypher semantics).
4355                    if edge_path.contains(eid) {
4356                        continue;
4357                    }
4358
4359                    // Enforce cross-pattern relationship uniqueness: skip edges
4360                    // already bound by earlier pattern elements in the same MATCH.
4361                    if used_eids.contains(&eid.as_u64()) {
4362                        continue;
4363                    }
4364
4365                    // Check edge property conditions (e.g., {year: 1988}).
4366                    if !self.edge_property_conditions.is_empty() {
4367                        let passes =
4368                            self.edge_property_conditions
4369                                .iter()
4370                                .all(|(name, expected)| {
4371                                    props.get(name).is_some_and(|actual| actual == expected)
4372                                });
4373                        if !passes {
4374                            continue;
4375                        }
4376                    }
4377
4378                    let mut new_node_path = node_path.clone();
4379                    new_node_path.push(*neighbor);
4380                    let mut new_edge_path = edge_path.clone();
4381                    new_edge_path.push(*eid);
4382                    queue.push_back((*neighbor, depth + 1, new_node_path, new_edge_path));
4383                }
4384            }
4385        }
4386
4387        results
4388    }
4389
4390    /// Process a batch using the adjacency map.
4391    fn process_batch(
4392        &self,
4393        batch: RecordBatch,
4394        adjacency: &EdgeAdjacencyMap,
4395    ) -> DFResult<RecordBatch> {
4396        let source_col = batch.column_by_name(&self.source_column).ok_or_else(|| {
4397            datafusion::error::DataFusionError::Execution(format!(
4398                "Source column '{}' not found in input batch",
4399                self.source_column
4400            ))
4401        })?;
4402
4403        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
4404        let source_vids: &UInt64Array = &source_vid_cow;
4405
4406        // Read bound target VIDs if column exists
4407        let bound_target_cow = self
4408            .bound_target_column
4409            .as_ref()
4410            .and_then(|col| batch.column_by_name(col))
4411            .map(|c| column_as_vid_array(c.as_ref()))
4412            .transpose()?;
4413        let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
4414
4415        // Extract used edge columns for cross-pattern relationship uniqueness
4416        let used_edge_arrays: Vec<&UInt64Array> = self
4417            .used_edge_columns
4418            .iter()
4419            .filter_map(|col| {
4420                batch
4421                    .column_by_name(col)?
4422                    .as_any()
4423                    .downcast_ref::<UInt64Array>()
4424            })
4425            .collect();
4426
4427        // Collect BFS results: (original_row_idx, target_vid, hop_count, node_path, edge_path)
4428        let mut expansions: Vec<ExpansionRecord> = Vec::new();
4429
4430        for (row_idx, source_opt) in source_vids.iter().enumerate() {
4431            let mut emitted_for_row = false;
4432
4433            if let Some(source_u64) = source_opt {
4434                let source = Vid::from(source_u64);
4435
4436                // Collect used edge IDs from previous hops for this row
4437                let used_eids: FxHashSet<u64> = used_edge_arrays
4438                    .iter()
4439                    .filter_map(|arr| {
4440                        if arr.is_null(row_idx) {
4441                            None
4442                        } else {
4443                            Some(arr.value(row_idx))
4444                        }
4445                    })
4446                    .collect();
4447
4448                let bfs_results = self.bfs(source, adjacency, &used_eids);
4449
4450                for (target, hops, node_path, edge_path) in bfs_results {
4451                    // Filter by bound target VID if set (for patterns where target is in scope).
4452                    // NULL bound targets do not match anything.
4453                    if let Some(targets) = expected_targets {
4454                        if targets.is_null(row_idx) {
4455                            continue;
4456                        }
4457                        let expected_vid = targets.value(row_idx);
4458                        if target.as_u64() != expected_vid {
4459                            continue;
4460                        }
4461                    }
4462
4463                    expansions.push((row_idx, target, hops, node_path, edge_path));
4464                    emitted_for_row = true;
4465                }
4466            }
4467
4468            if self.is_optional && !emitted_for_row {
4469                // Preserve source row with NULL optional bindings.
4470                expansions.push((row_idx, Vid::from(u64::MAX), 0, vec![], vec![]));
4471            }
4472        }
4473
4474        if expansions.is_empty() {
4475            if self.is_optional {
4476                let all_indices: Vec<usize> = (0..batch.num_rows()).collect();
4477                return build_optional_null_batch_for_rows(&batch, &all_indices, &self.schema);
4478            }
4479            return Ok(RecordBatch::new_empty(self.schema.clone()));
4480        }
4481
4482        let num_rows = expansions.len();
4483        self.metrics.record_output(num_rows);
4484
4485        // Build output columns
4486        let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.schema.fields().len());
4487
4488        // Resolve this operator's edge type names to numeric ids once, so path
4489        // builders can recover the STORED orientation of flushed (L1-resident)
4490        // relationships via a directed-outgoing adjacency probe.
4491        let edge_type_ids: Vec<u32> = {
4492            let uni_schema = self.graph_ctx.storage().schema_manager().schema();
4493            self.type_names
4494                .iter()
4495                .filter_map(|name| uni_schema.edge_type_id_by_name(name))
4496                .collect()
4497        };
4498
4499        // Expand input columns
4500        for col_idx in 0..batch.num_columns() {
4501            let array = batch.column(col_idx);
4502            let indices: Vec<u64> = expansions
4503                .iter()
4504                .map(|(idx, _, _, _, _)| *idx as u64)
4505                .collect();
4506            let take_indices = UInt64Array::from(indices);
4507            let expanded = arrow::compute::take(array, &take_indices, None)?;
4508            columns.push(expanded);
4509        }
4510
4511        // Add target VID column (only if not already in input)
4512        let target_vid_name = format!("{}._vid", self.target_variable);
4513        if batch.schema().column_with_name(&target_vid_name).is_none() {
4514            let target_vids: Vec<Option<u64>> = expansions
4515                .iter()
4516                .map(|(_, vid, _, node_path, edge_path)| {
4517                    if node_path.is_empty() && edge_path.is_empty() {
4518                        None
4519                    } else {
4520                        Some(vid.as_u64())
4521                    }
4522                })
4523                .collect();
4524            columns.push(Arc::new(UInt64Array::from(target_vids)));
4525        }
4526
4527        // Add target ._labels column (only if not already in input)
4528        let target_labels_name = format!("{}._labels", self.target_variable);
4529        if batch
4530            .schema()
4531            .column_with_name(&target_labels_name)
4532            .is_none()
4533        {
4534            use arrow_array::builder::{ListBuilder, StringBuilder};
4535            let mut labels_builder = ListBuilder::new(StringBuilder::new());
4536            for (_, vid, _, node_path, edge_path) in expansions.iter() {
4537                if node_path.is_empty() && edge_path.is_empty() {
4538                    labels_builder.append(false);
4539                    continue;
4540                }
4541                let mut row_labels: Vec<String> = Vec::new();
4542                let labels =
4543                    l0_visibility::get_vertex_labels(*vid, &self.graph_ctx.query_context());
4544                for lbl in &labels {
4545                    if !row_labels.contains(lbl) {
4546                        row_labels.push(lbl.clone());
4547                    }
4548                }
4549                let values = labels_builder.values();
4550                for lbl in &row_labels {
4551                    values.append_value(lbl);
4552                }
4553                labels_builder.append(true);
4554            }
4555            columns.push(Arc::new(labels_builder.finish()));
4556        }
4557
4558        // Add hop count column
4559        let hop_counts: Vec<u64> = expansions
4560            .iter()
4561            .map(|(_, _, hops, _, _)| *hops as u64)
4562            .collect();
4563        columns.push(Arc::new(UInt64Array::from(hop_counts)));
4564
4565        // Add step variable column if bound (list of edge structs).
4566        if self.step_variable.is_some() {
4567            let mut edges_builder = new_edge_list_builder();
4568            let query_ctx = self.graph_ctx.query_context();
4569            let type_names_str = self.type_names.join("|");
4570
4571            for (_, _, _, node_path, edge_path) in expansions.iter() {
4572                if node_path.is_empty() && edge_path.is_empty() {
4573                    edges_builder.append_null();
4574                } else if edge_path.is_empty() {
4575                    // Zero-hop match: empty list.
4576                    edges_builder.append(true);
4577                } else {
4578                    for (i, eid) in edge_path.iter().enumerate() {
4579                        // Report the relationship's STORED direction, not the
4580                        // traversal order (`node_path[i]` -> `node_path[i + 1]`).
4581                        // Resolves flushed (L1-resident) edges too, where the L0
4582                        // chain no longer holds the stored endpoints.
4583                        let (src, dst) = self.graph_ctx.resolve_stored_edge_endpoints(
4584                            *eid,
4585                            node_path[i],
4586                            node_path[i + 1],
4587                            &edge_type_ids,
4588                        );
4589                        append_edge_to_struct(
4590                            edges_builder.values(),
4591                            *eid,
4592                            &type_names_str,
4593                            src,
4594                            dst,
4595                            &query_ctx,
4596                        );
4597                    }
4598                    edges_builder.append(true);
4599                }
4600            }
4601
4602            columns.push(Arc::new(edges_builder.finish()) as ArrayRef);
4603        }
4604
4605        // Add path variable column if bound.
4606        // If a path column already exists in input (from a prior BindFixedPath), extend it
4607        // rather than building from scratch.
4608        if let Some(path_var_name) = &self.path_variable {
4609            let existing_path_col_idx = batch
4610                .schema()
4611                .column_with_name(path_var_name)
4612                .map(|(idx, _)| idx);
4613            let existing_path_arc = existing_path_col_idx.map(|idx| columns[idx].clone());
4614            let existing_path = existing_path_arc
4615                .as_ref()
4616                .and_then(|arc| arc.as_any().downcast_ref::<arrow_array::StructArray>());
4617
4618            let mut nodes_builder = new_node_list_builder();
4619            let mut rels_builder = new_edge_list_builder();
4620            let query_ctx = self.graph_ctx.query_context();
4621            let type_names_str = self.type_names.join("|");
4622            let mut path_validity = Vec::with_capacity(expansions.len());
4623
4624            for (row_out_idx, (_, _, _, node_path, edge_path)) in expansions.iter().enumerate() {
4625                if node_path.is_empty() && edge_path.is_empty() {
4626                    nodes_builder.append(false);
4627                    rels_builder.append(false);
4628                    path_validity.push(false);
4629                    continue;
4630                }
4631
4632                // Prepend existing path prefix if extending
4633                let skip_first_vlp_node = if let Some(existing) = existing_path {
4634                    if !existing.is_null(row_out_idx) {
4635                        prepend_existing_path(
4636                            existing,
4637                            row_out_idx,
4638                            &mut nodes_builder,
4639                            &mut rels_builder,
4640                            &query_ctx,
4641                        );
4642                        true
4643                    } else {
4644                        false
4645                    }
4646                } else {
4647                    false
4648                };
4649
4650                // Append VLP nodes (skip first if extending — it's the junction point)
4651                let start_idx = if skip_first_vlp_node { 1 } else { 0 };
4652                for vid in &node_path[start_idx..] {
4653                    append_node_to_struct(nodes_builder.values(), *vid, &query_ctx);
4654                }
4655                nodes_builder.append(true);
4656
4657                for (i, eid) in edge_path.iter().enumerate() {
4658                    // Report the relationship's STORED direction, not the traversal
4659                    // order (`node_path[i]` -> `node_path[i + 1]`). Resolves flushed
4660                    // (L1-resident) edges too, where the L0 chain no longer holds
4661                    // the stored endpoints.
4662                    let (src, dst) = self.graph_ctx.resolve_stored_edge_endpoints(
4663                        *eid,
4664                        node_path[i],
4665                        node_path[i + 1],
4666                        &edge_type_ids,
4667                    );
4668                    append_edge_to_struct(
4669                        rels_builder.values(),
4670                        *eid,
4671                        &type_names_str,
4672                        src,
4673                        dst,
4674                        &query_ctx,
4675                    );
4676                }
4677                rels_builder.append(true);
4678                path_validity.push(true);
4679            }
4680
4681            // Finish the builders to get the arrays
4682            let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
4683            let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;
4684
4685            // Build the path struct with nodes and relationships fields
4686            let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
4687            let rels_field = Arc::new(Field::new(
4688                "relationships",
4689                rels_array.data_type().clone(),
4690                true,
4691            ));
4692
4693            // Create the path struct array
4694            let path_struct = arrow_array::StructArray::try_new(
4695                vec![nodes_field, rels_field].into(),
4696                vec![nodes_array, rels_array],
4697                Some(arrow::buffer::NullBuffer::from(path_validity)),
4698            )
4699            .map_err(arrow_err)?;
4700
4701            if let Some(idx) = existing_path_col_idx {
4702                columns[idx] = Arc::new(path_struct);
4703            } else {
4704                columns.push(Arc::new(path_struct));
4705            }
4706        }
4707
4708        // Add target property columns as NULL for now (skip if already in input).
4709        // Property hydration happens via PropertyManager in the query execution pipeline.
4710        for prop_name in &self.target_properties {
4711            let full_prop_name = format!("{}.{}", self.target_variable, prop_name);
4712            if batch.schema().column_with_name(&full_prop_name).is_none() {
4713                columns.push(arrow_array::new_null_array(
4714                    &DataType::LargeBinary,
4715                    num_rows,
4716                ));
4717            }
4718        }
4719
4720        RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)
4721    }
4722}
4723
impl Stream for GraphVariableLengthTraverseMainStream {
    type Item = DFResult<RecordBatch>;

    /// Drives the variable-length traversal as a state machine.
    ///
    /// - `Loading`: polls the future that yields the adjacency data; on success
    ///   switches to `Processing`, on error switches to `Done` and yields the error.
    /// - `Processing`: pulls the next input batch and expands it with
    ///   `process_batch`. If no target properties are requested the expanded batch
    ///   is yielded immediately; otherwise a boxed `hydrate_vlp_target_properties`
    ///   future is created and the stream switches to `Materializing`.
    /// - `Materializing`: polls the hydration future; on success yields the hydrated
    ///   batch and returns to `Processing` with the same adjacency.
    /// - `Done`: terminal; always yields `None`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let metrics = self.metrics.clone();
        // NOTE(review): this timer is alive for the whole poll, so it also counts
        // time spent polling the input stream and the loading/hydration futures.
        // Confirm that is the intended meaning of elapsed_compute here.
        let _timer = metrics.elapsed_compute().timer();
        loop {
            // Take ownership of the current state, leaving `Done` as a placeholder.
            // This lets each arm move the variant's owned fields (futures,
            // adjacency). Every non-terminal arm must write a state back before
            // returning or looping again.
            let state = std::mem::replace(&mut self.state, VarLengthMainStreamState::Done);

            match state {
                VarLengthMainStreamState::Loading(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(adjacency)) => {
                        self.state = VarLengthMainStreamState::Processing(adjacency);
                        // Continue loop to start processing
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = VarLengthMainStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        // Put the (still pending) future back so the next poll resumes it.
                        self.state = VarLengthMainStreamState::Loading(fut);
                        return Poll::Pending;
                    }
                },
                VarLengthMainStreamState::Processing(adjacency) => {
                    match self.input.poll_next_unpin(cx) {
                        Poll::Ready(Some(Ok(batch))) => {
                            let base_batch = match self.process_batch(batch, &adjacency) {
                                Ok(b) => b,
                                Err(e) => {
                                    // Unlike the input/hydration error paths below (which
                                    // switch to `Done`), this restores `Processing`, so a
                                    // caller that keeps polling resumes with the next
                                    // input batch.
                                    // NOTE(review): confirm this asymmetry is intentional.
                                    self.state = VarLengthMainStreamState::Processing(adjacency);
                                    return Poll::Ready(Some(Err(e)));
                                }
                            };

                            // If no properties need async hydration, return directly
                            if self.target_properties.is_empty() {
                                self.state = VarLengthMainStreamState::Processing(adjacency);
                                return Poll::Ready(Some(Ok(base_batch)));
                            }

                            // Create async hydration future.
                            // Its inputs are cloned so the future owns them: it is stored
                            // in `self.state` and therefore cannot borrow from `self`.
                            let schema = self.schema.clone();
                            let target_variable = self.target_variable.clone();
                            let target_properties = self.target_properties.clone();
                            let graph_ctx = self.graph_ctx.clone();

                            let fut = hydrate_vlp_target_properties(
                                base_batch,
                                schema,
                                target_variable,
                                target_properties,
                                None, // schemaless — no label name
                                graph_ctx,
                            );

                            self.state = VarLengthMainStreamState::Materializing {
                                adjacency,
                                fut: Box::pin(fut),
                            };
                            // Continue loop to poll the future
                        }
                        Poll::Ready(Some(Err(e))) => {
                            // An error from the input stream ends this stream.
                            self.state = VarLengthMainStreamState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(None) => {
                            // Input exhausted: the traversal is complete.
                            self.state = VarLengthMainStreamState::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Pending => {
                            self.state = VarLengthMainStreamState::Processing(adjacency);
                            return Poll::Pending;
                        }
                    }
                }
                VarLengthMainStreamState::Materializing { adjacency, mut fut } => {
                    match fut.as_mut().poll(cx) {
                        Poll::Ready(Ok(batch)) => {
                            // Hydration finished: go back to pulling input batches with
                            // the same adjacency.
                            self.state = VarLengthMainStreamState::Processing(adjacency);
                            return Poll::Ready(Some(Ok(batch)));
                        }
                        Poll::Ready(Err(e)) => {
                            self.state = VarLengthMainStreamState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Pending => {
                            self.state = VarLengthMainStreamState::Materializing { adjacency, fut };
                            return Poll::Pending;
                        }
                    }
                }
                VarLengthMainStreamState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
4823
4824impl RecordBatchStream for GraphVariableLengthTraverseMainStream {
4825    fn schema(&self) -> SchemaRef {
4826        self.schema.clone()
4827    }
4828}
4829
#[cfg(test)]
mod tests {
    use super::*;

    /// One-column input schema (`a._vid`, non-nullable `UInt64`) that every test
    /// below feeds into a `build_schema` call.
    fn vid_input_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new(
            "a._vid",
            DataType::UInt64,
            false,
        )]))
    }

    /// Asserts that `schema` holds exactly the `expected` field names, in order.
    /// Both the field count and every individual name are checked.
    fn assert_field_names(schema: &Schema, expected: &[&str]) {
        let names: Vec<&str> = schema
            .fields()
            .iter()
            .map(|field| field.name().as_str())
            .collect();
        assert_eq!(names, expected);
    }

    /// Asserts that every field at the given positions is typed `LargeBinary`.
    fn assert_large_binary(schema: &Schema, indices: &[usize]) {
        for &idx in indices {
            assert_eq!(schema.field(idx).data_type(), &DataType::LargeBinary);
        }
    }

    #[test]
    fn test_traverse_schema_without_edge() {
        let input = vid_input_schema();
        let schema =
            GraphTraverseExec::build_schema(input, "m", None, &[], &[], None, None, false);

        // Input + target VID + target _labels + internal edge ID.
        assert_field_names(&schema, &["a._vid", "m._vid", "m._labels", "__eid_to_m"]);
    }

    #[test]
    fn test_traverse_schema_with_edge() {
        let input = vid_input_schema();
        let schema =
            GraphTraverseExec::build_schema(input, "m", Some("r"), &[], &[], None, None, false);

        // Input + target VID + target _labels + edge EID + edge _type.
        assert_field_names(
            &schema,
            &["a._vid", "m._vid", "m._labels", "r._eid", "r._type"],
        );
    }

    #[test]
    fn test_traverse_schema_with_target_properties() {
        let input = vid_input_schema();
        let target_props = vec!["name".to_string(), "age".to_string()];
        let schema = GraphTraverseExec::build_schema(
            input,
            "m",
            Some("r"),
            &[],
            &target_props,
            None,
            None,
            false,
        );

        // Target properties sit right after the target's _labels column,
        // ahead of the edge columns.
        assert_field_names(
            &schema,
            &[
                "a._vid",
                "m._vid",
                "m._labels",
                "m.name",
                "m.age",
                "r._eid",
                "r._type",
            ],
        );
    }

    #[test]
    fn test_variable_length_schema() {
        let input = vid_input_schema();
        let schema =
            GraphVariableLengthTraverseExec::build_schema(input, "b", None, Some("p"), &[], None);

        // Input + target VID + target _labels + hop count + path variable.
        assert_field_names(
            &schema,
            &["a._vid", "b._vid", "b._labels", "_hop_count", "p"],
        );
    }

    #[test]
    fn test_traverse_main_schema_without_edge() {
        let input = vid_input_schema();
        let schema = GraphTraverseMainExec::build_schema(&input, "m", &None, &[], &[], false);

        // Input + target VID + target _labels + internal edge ID.
        assert_field_names(&schema, &["a._vid", "m._vid", "m._labels", "__eid_to_m"]);
    }

    #[test]
    fn test_traverse_main_schema_with_edge() {
        let input = vid_input_schema();
        let edge_var = Some("r".to_string());
        let schema = GraphTraverseMainExec::build_schema(&input, "m", &edge_var, &[], &[], false);

        // Input + target VID + target _labels + edge EID + edge _type.
        assert_field_names(
            &schema,
            &["a._vid", "m._vid", "m._labels", "r._eid", "r._type"],
        );
    }

    #[test]
    fn test_traverse_main_schema_with_edge_properties() {
        let input = vid_input_schema();
        let edge_var = Some("r".to_string());
        let edge_props = vec!["weight".to_string(), "since".to_string()];
        let schema =
            GraphTraverseMainExec::build_schema(&input, "m", &edge_var, &edge_props, &[], false);

        // Edge properties follow the edge's _eid/_type columns and are LargeBinary.
        assert_field_names(
            &schema,
            &[
                "a._vid",
                "m._vid",
                "m._labels",
                "r._eid",
                "r._type",
                "r.weight",
                "r.since",
            ],
        );
        assert_large_binary(&schema, &[5, 6]);
    }

    #[test]
    fn test_traverse_main_schema_with_target_properties() {
        let input = vid_input_schema();
        let edge_var = Some("r".to_string());
        let target_props = vec!["name".to_string(), "age".to_string()];
        let schema =
            GraphTraverseMainExec::build_schema(&input, "m", &edge_var, &[], &target_props, false);

        // Unlike GraphTraverseExec, the main variant places target properties
        // after the edge columns; they are LargeBinary.
        assert_field_names(
            &schema,
            &[
                "a._vid",
                "m._vid",
                "m._labels",
                "r._eid",
                "r._type",
                "m.name",
                "m.age",
            ],
        );
        assert_large_binary(&schema, &[5, 6]);
    }
}