Skip to main content

uni_query/query/df_graph/
scan.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Graph scan execution plan for DataFusion.
5//!
6//! This module provides [`GraphScanExec`], a DataFusion `ExecutionPlan` that scans
7//! vertices or edges from storage with property materialization. It wraps the
8//! underlying Lance table scan with:
9//!
10//! - MVCC resolution via L0 buffer overlays
11//! - Property column materialization from `PropertyManager`
12//! - Filter pushdown to storage layer
13//!
14//! # Column Naming Convention
15//!
16//! Properties are materialized as columns named `{variable}.{property}`:
17//! - `n.name` - property "name" for variable "n"
18//! - `n.age` - property "age" for variable "n"
19//!
20//! System columns use underscore prefix:
21//! - `_vid` - vertex ID
22//! - `_eid` - edge ID
23//! - `_src_vid` - source vertex ID (edges only)
24//! - `_dst_vid` - destination vertex ID (edges only)
25
26use crate::query::datetime::parse_datetime_utc;
27use crate::query::df_graph::GraphExecutionContext;
28use crate::query::df_graph::common::{compute_plan_properties, labels_data_type};
29use arrow_array::builder::{
30    BinaryBuilder, BooleanBuilder, Date32Builder, FixedSizeListBuilder, Float32Builder,
31    Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
32    Time64NanosecondBuilder, TimestampNanosecondBuilder, UInt64Builder,
33};
34use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
35use arrow_schema::{DataType, Field, Fields, IntervalUnit, Schema, SchemaRef, TimeUnit};
36use chrono::{NaiveDate, NaiveTime, Timelike};
37use datafusion::common::Result as DFResult;
38use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
39use datafusion::physical_expr::PhysicalExpr;
40use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
41use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
42use futures::Stream;
43use std::any::Any;
44use std::collections::{HashMap, HashSet};
45use std::fmt;
46use std::pin::Pin;
47use std::sync::Arc;
48use std::task::{Context, Poll};
49use uni_common::Properties;
50use uni_common::Value;
51use uni_common::core::id::Vid;
52use uni_common::core::schema::Schema as UniSchema;
53
/// Graph scan execution plan.
///
/// Scans vertices or edges from storage with property materialization.
/// This wraps the underlying Lance table scan with MVCC resolution and
/// property loading.
///
/// # Example
///
/// ```ignore
/// // Create a scan for Person vertices with name and age properties
/// let scan = GraphScanExec::new(
///     graph_ctx,
///     "Person",
///     "n",
///     vec!["name".to_string(), "age".to_string()],
///     None, // No filter
/// );
///
/// let stream = scan.execute(0, task_ctx)?;
/// // Stream yields batches with columns: _vid, n.name, n.age
/// ```
pub struct GraphScanExec {
    /// Graph execution context with storage and L0 access.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Label name for vertex scan, or edge type for edge scan.
    ///
    /// Multi-label scans store the labels joined with `:` (see
    /// `new_multi_label_vertex_scan`); an empty string means "scan all
    /// vertices" (see `new_schemaless_all_scan`).
    label: String,

    /// Variable name for column prefixing (e.g. "n" in `n.name`).
    variable: String,

    /// Properties to materialize as columns.
    projected_properties: Vec<String>,

    /// Filter expression to push down to the storage layer.
    filter: Option<Arc<dyn PhysicalExpr>>,

    /// Whether this is an edge scan (vs vertex scan).
    is_edge_scan: bool,

    /// Whether this is a schemaless scan (uses main table instead of per-label table).
    is_schemaless: bool,

    /// Output schema with materialized property columns.
    schema: SchemaRef,

    /// Cached plan properties (computed once from `schema` at construction).
    properties: PlanProperties,

    /// Metrics for execution tracking.
    metrics: ExecutionPlanMetricsSet,
}
106
107impl fmt::Debug for GraphScanExec {
108    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
109        f.debug_struct("GraphScanExec")
110            .field("label", &self.label)
111            .field("variable", &self.variable)
112            .field("projected_properties", &self.projected_properties)
113            .field("is_edge_scan", &self.is_edge_scan)
114            .finish()
115    }
116}
117
118impl GraphScanExec {
119    /// Create a new graph scan for vertices.
120    ///
121    /// Scans all vertices of the given label from storage and L0 buffers,
122    /// then materializes the requested properties.
123    pub fn new_vertex_scan(
124        graph_ctx: Arc<GraphExecutionContext>,
125        label: impl Into<String>,
126        variable: impl Into<String>,
127        projected_properties: Vec<String>,
128        filter: Option<Arc<dyn PhysicalExpr>>,
129    ) -> Self {
130        let label = label.into();
131        let variable = variable.into();
132
133        // Build output schema with proper types from Uni schema
134        let uni_schema = graph_ctx.storage().schema_manager().schema();
135        let schema =
136            Self::build_vertex_schema(&variable, &label, &projected_properties, &uni_schema);
137
138        let properties = compute_plan_properties(schema.clone());
139
140        Self {
141            graph_ctx,
142            label,
143            variable,
144            projected_properties,
145            filter,
146            is_edge_scan: false,
147            is_schemaless: false,
148            schema,
149            properties,
150            metrics: ExecutionPlanMetricsSet::new(),
151        }
152    }
153
154    /// Create a new schemaless vertex scan.
155    ///
156    /// Scans the main vertices table for vertices with the given label name.
157    /// Properties are extracted from props_json (all treated as Utf8/JSON).
158    /// This is used for labels that aren't in the schema.
159    pub fn new_schemaless_vertex_scan(
160        graph_ctx: Arc<GraphExecutionContext>,
161        label_name: impl Into<String>,
162        variable: impl Into<String>,
163        projected_properties: Vec<String>,
164        filter: Option<Arc<dyn PhysicalExpr>>,
165    ) -> Self {
166        let label = label_name.into();
167        let variable = variable.into();
168
169        // Filter out system columns that are already materialized as dedicated columns
170        // (_vid as UInt64, _labels as List<Utf8>). If these appear in projected_properties
171        // (e.g., from collect_properties_from_plan extracting _vid from filter expressions),
172        // they would create duplicate columns with conflicting types.
173        let projected_properties: Vec<String> = projected_properties
174            .into_iter()
175            .filter(|p| p != "_vid" && p != "_labels")
176            .collect();
177
178        // Build schema - all properties are Utf8 (from JSON) except _vid
179        let schema = Self::build_schemaless_vertex_schema(&variable, &projected_properties);
180        let properties = compute_plan_properties(schema.clone());
181
182        Self {
183            graph_ctx,
184            label,
185            variable,
186            projected_properties,
187            filter,
188            is_edge_scan: false,
189            is_schemaless: true,
190            schema,
191            properties,
192            metrics: ExecutionPlanMetricsSet::new(),
193        }
194    }
195
196    /// Create a new multi-label vertex scan using the main vertices table.
197    ///
198    /// Scans for vertices that have ALL specified labels (intersection semantics).
199    /// Properties are extracted from props_json (schemaless).
200    pub fn new_multi_label_vertex_scan(
201        graph_ctx: Arc<GraphExecutionContext>,
202        labels: Vec<String>,
203        variable: impl Into<String>,
204        projected_properties: Vec<String>,
205        filter: Option<Arc<dyn PhysicalExpr>>,
206    ) -> Self {
207        let variable = variable.into();
208        let projected_properties: Vec<String> = projected_properties
209            .into_iter()
210            .filter(|p| p != "_vid" && p != "_labels")
211            .collect();
212        let schema = Self::build_schemaless_vertex_schema(&variable, &projected_properties);
213        let properties = compute_plan_properties(schema.clone());
214
215        // Encode labels as colon-separated for the stream to parse
216        let encoded_labels = labels.join(":");
217
218        Self {
219            graph_ctx,
220            label: encoded_labels,
221            variable,
222            projected_properties,
223            filter,
224            is_edge_scan: false,
225            is_schemaless: true,
226            schema,
227            properties,
228            metrics: ExecutionPlanMetricsSet::new(),
229        }
230    }
231
232    /// Create a new schemaless scan for all vertices.
233    ///
234    /// Scans the main vertices table for all vertices regardless of label.
235    /// Properties are extracted from props_json (all treated as Utf8/JSON).
236    /// This is used for `MATCH (n)` without label filter.
237    pub fn new_schemaless_all_scan(
238        graph_ctx: Arc<GraphExecutionContext>,
239        variable: impl Into<String>,
240        projected_properties: Vec<String>,
241        filter: Option<Arc<dyn PhysicalExpr>>,
242    ) -> Self {
243        let variable = variable.into();
244        let projected_properties: Vec<String> = projected_properties
245            .into_iter()
246            .filter(|p| p != "_vid" && p != "_labels")
247            .collect();
248
249        // Build schema - all properties are Utf8 (from JSON) except _vid
250        let schema = Self::build_schemaless_vertex_schema(&variable, &projected_properties);
251        let properties = compute_plan_properties(schema.clone());
252
253        Self {
254            graph_ctx,
255            label: String::new(), // Empty label signals "scan all vertices"
256            variable,
257            projected_properties,
258            filter,
259            is_edge_scan: false,
260            is_schemaless: true,
261            schema,
262            properties,
263            metrics: ExecutionPlanMetricsSet::new(),
264        }
265    }
266
267    /// Build schema for schemaless vertex scan (all properties as LargeBinary/CypherValue).
268    fn build_schemaless_vertex_schema(variable: &str, properties: &[String]) -> SchemaRef {
269        let mut fields = vec![
270            Field::new(format!("{}._vid", variable), DataType::UInt64, false),
271            Field::new(format!("{}._labels", variable), labels_data_type(), true),
272        ];
273
274        for prop in properties {
275            let col_name = format!("{}.{}", variable, prop);
276            // Schemaless properties use LargeBinary with CypherValue encoding to preserve types
277            fields.push(Field::new(&col_name, DataType::LargeBinary, true));
278        }
279
280        Arc::new(Schema::new(fields))
281    }
282
283    /// Create a new graph scan for edges.
284    ///
285    /// Scans all edges of the given type from storage and L0 buffers,
286    /// then materializes the requested properties.
287    pub fn new_edge_scan(
288        graph_ctx: Arc<GraphExecutionContext>,
289        edge_type: impl Into<String>,
290        variable: impl Into<String>,
291        projected_properties: Vec<String>,
292        filter: Option<Arc<dyn PhysicalExpr>>,
293    ) -> Self {
294        let label = edge_type.into();
295        let variable = variable.into();
296
297        // Build output schema with proper types from Uni schema
298        let uni_schema = graph_ctx.storage().schema_manager().schema();
299        let schema = Self::build_edge_schema(&variable, &label, &projected_properties, &uni_schema);
300
301        let properties = compute_plan_properties(schema.clone());
302
303        Self {
304            graph_ctx,
305            label,
306            variable,
307            projected_properties,
308            filter,
309            is_edge_scan: true,
310            is_schemaless: false,
311            schema,
312            properties,
313            metrics: ExecutionPlanMetricsSet::new(),
314        }
315    }
316
317    /// Build output schema for vertex scan with proper Arrow types.
318    fn build_vertex_schema(
319        variable: &str,
320        label: &str,
321        properties: &[String],
322        uni_schema: &UniSchema,
323    ) -> SchemaRef {
324        let mut fields = vec![
325            Field::new(format!("{}._vid", variable), DataType::UInt64, false),
326            Field::new(format!("{}._labels", variable), labels_data_type(), true),
327        ];
328        let label_props = uni_schema.properties.get(label);
329        for prop in properties {
330            let col_name = format!("{}.{}", variable, prop);
331            let arrow_type = resolve_property_type(prop, label_props);
332            fields.push(Field::new(&col_name, arrow_type, true));
333        }
334        Arc::new(Schema::new(fields))
335    }
336
337    /// Build output schema for edge scan with proper Arrow types.
338    fn build_edge_schema(
339        variable: &str,
340        edge_type: &str,
341        properties: &[String],
342        uni_schema: &UniSchema,
343    ) -> SchemaRef {
344        let mut fields = vec![
345            Field::new(format!("{}._eid", variable), DataType::UInt64, false),
346            Field::new(format!("{}._src_vid", variable), DataType::UInt64, false),
347            Field::new(format!("{}._dst_vid", variable), DataType::UInt64, false),
348        ];
349        let edge_props = uni_schema.properties.get(edge_type);
350        for prop in properties {
351            let col_name = format!("{}.{}", variable, prop);
352            let arrow_type = resolve_property_type(prop, edge_props);
353            fields.push(Field::new(&col_name, arrow_type, true));
354        }
355        Arc::new(Schema::new(fields))
356    }
357}
358
359impl DisplayAs for GraphScanExec {
360    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
361        let scan_type = if self.is_edge_scan { "Edge" } else { "Vertex" };
362        write!(
363            f,
364            "GraphScanExec: {}={}, properties={:?}",
365            scan_type, self.label, self.projected_properties
366        )?;
367        if self.filter.is_some() {
368            write!(f, ", filter=<pushed>")?;
369        }
370        Ok(())
371    }
372}
373
374impl ExecutionPlan for GraphScanExec {
375    fn name(&self) -> &str {
376        "GraphScanExec"
377    }
378
379    fn as_any(&self) -> &dyn Any {
380        self
381    }
382
383    fn schema(&self) -> SchemaRef {
384        self.schema.clone()
385    }
386
387    fn properties(&self) -> &PlanProperties {
388        &self.properties
389    }
390
391    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
392        vec![]
393    }
394
395    fn with_new_children(
396        self: Arc<Self>,
397        children: Vec<Arc<dyn ExecutionPlan>>,
398    ) -> DFResult<Arc<dyn ExecutionPlan>> {
399        if children.is_empty() {
400            Ok(self)
401        } else {
402            Err(datafusion::error::DataFusionError::Plan(
403                "GraphScanExec does not accept children".to_string(),
404            ))
405        }
406    }
407
408    fn execute(
409        &self,
410        partition: usize,
411        _context: Arc<TaskContext>,
412    ) -> DFResult<SendableRecordBatchStream> {
413        let metrics = BaselineMetrics::new(&self.metrics, partition);
414
415        Ok(Box::pin(GraphScanStream::new(
416            self.graph_ctx.clone(),
417            self.label.clone(),
418            self.variable.clone(),
419            self.projected_properties.clone(),
420            self.is_edge_scan,
421            self.is_schemaless,
422            self.schema.clone(),
423            metrics,
424        )))
425    }
426
427    fn metrics(&self) -> Option<MetricsSet> {
428        Some(self.metrics.clone_inner())
429    }
430}
431
/// State machine for graph scan stream execution.
///
/// The scan produces at most one output batch: the in-flight future
/// resolves to `Option<RecordBatch>`, so there is a single `Executing`
/// step between `Init` and `Done`.
enum GraphScanState {
    /// Initial state, ready to start scanning.
    Init,
    /// Executing the async scan; the boxed future resolves to the scan's
    /// result batch, or `None` when there is nothing to emit.
    Executing(Pin<Box<dyn std::future::Future<Output = DFResult<Option<RecordBatch>>> + Send>>),
    /// Stream is done.
    Done,
}
441
/// Stream that scans vertices or edges and materializes properties.
///
/// For known-label vertex scans, uses a single columnar Lance query with
/// MVCC dedup and L0 overlay. For edge and schemaless scans, falls back
/// to the two-phase VID-scan + property-materialize flow.
struct GraphScanStream {
    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Label (vertex) or edge type name.
    ///
    /// Follows the same encoding as `GraphScanExec::label` (colon-joined for
    /// multi-label, empty for scan-all).
    label: String,

    /// Variable name for column prefixing (e.g., "n" in `n.name`).
    variable: String,

    /// Properties to materialize.
    properties: Vec<String>,

    /// Whether this is an edge scan.
    is_edge_scan: bool,

    /// Whether this is a schemaless scan.
    is_schemaless: bool,

    /// Output schema.
    schema: SchemaRef,

    /// Stream state (`Init` → `Executing` → `Done`).
    state: GraphScanState,

    /// Metrics.
    metrics: BaselineMetrics,
}
475
476impl GraphScanStream {
477    /// Create a new graph scan stream.
478    #[expect(clippy::too_many_arguments)]
479    fn new(
480        graph_ctx: Arc<GraphExecutionContext>,
481        label: String,
482        variable: String,
483        properties: Vec<String>,
484        is_edge_scan: bool,
485        is_schemaless: bool,
486        schema: SchemaRef,
487        metrics: BaselineMetrics,
488    ) -> Self {
489        Self {
490            graph_ctx,
491            label,
492            variable,
493            properties,
494            is_edge_scan,
495            is_schemaless,
496            schema,
497            state: GraphScanState::Init,
498            metrics,
499        }
500    }
501}
502
503/// Resolve the Arrow data type for a property, handling system columns like `overflow_json`.
504///
505/// Falls back to `LargeBinary` (CypherValue) if the property is not found in the schema,
506/// preserving original value types for overflow/unknown properties.
507pub(crate) fn resolve_property_type(
508    prop: &str,
509    schema_props: Option<
510        &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
511    >,
512) -> DataType {
513    if prop == "overflow_json" {
514        DataType::LargeBinary
515    } else {
516        schema_props
517            .and_then(|props| props.get(prop))
518            .map(|meta| meta.r#type.to_arrow())
519            .unwrap_or(DataType::LargeBinary)
520    }
521}
522
523// ============================================================================
524// Columnar-first scan helpers
525// ============================================================================
526
/// MVCC deduplication: keep only the highest-version row for each `_vid`.
///
/// Sorts by (_vid ASC, _version DESC), then keeps the first occurrence of each
/// _vid (= the highest version). This is a pure Arrow-compute operation.
///
/// Test-only convenience wrapper around [`mvcc_dedup_batch_by`] with the
/// vertex id column hard-coded; non-test paths call the `_by` variant
/// directly with the appropriate id column.
#[cfg(test)]
fn mvcc_dedup_batch(batch: &RecordBatch) -> DFResult<RecordBatch> {
    mvcc_dedup_batch_by(batch, "_vid")
}
535
536/// Dedup a Lance batch and return `Some` only when rows remain.
537///
538/// Wraps the common pattern of dedup + empty-check that appears in every
539/// columnar scan path (vertex, edge, schemaless).
540fn mvcc_dedup_to_option(
541    batch: Option<RecordBatch>,
542    id_column: &str,
543) -> DFResult<Option<RecordBatch>> {
544    match batch {
545        Some(b) => {
546            let deduped = mvcc_dedup_batch_by(&b, id_column)?;
547            Ok(if deduped.num_rows() > 0 {
548                Some(deduped)
549            } else {
550                None
551            })
552        }
553        None => Ok(None),
554    }
555}
556
557/// Merge a deduped Lance batch with an L0 batch, re-deduplicating the combined
558/// result. Returns an empty batch (against `output_schema`) when both inputs
559/// are empty.
560fn merge_lance_and_l0(
561    lance_deduped: Option<RecordBatch>,
562    l0_batch: RecordBatch,
563    internal_schema: &SchemaRef,
564    id_column: &str,
565) -> DFResult<Option<RecordBatch>> {
566    let has_l0 = l0_batch.num_rows() > 0;
567    match (lance_deduped, has_l0) {
568        (Some(lance), true) => {
569            let combined = arrow::compute::concat_batches(internal_schema, &[lance, l0_batch])
570                .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))?;
571            Ok(Some(mvcc_dedup_batch_by(&combined, id_column)?))
572        }
573        (Some(lance), false) => Ok(Some(lance)),
574        (None, true) => Ok(Some(l0_batch)),
575        (None, false) => Ok(None),
576    }
577}
578
/// Push `col_name` into `columns` if not already present.
///
/// Compares against `&str` directly so no temporary `String` is allocated
/// for the membership check; allocation happens only on actual insert.
fn push_column_if_absent(columns: &mut Vec<String>, col_name: &str) {
    let already_present = columns.iter().any(|c| c == col_name);
    if !already_present {
        columns.push(col_name.to_owned());
    }
}
588
589/// Extract a property value from an overflow_json CypherValue blob.
590///
591/// Returns the raw CypherValue bytes for `prop` if found in the blob,
592/// or `None` if the blob is null or the key is absent.
593fn extract_from_overflow_blob(
594    overflow_arr: Option<&arrow_array::LargeBinaryArray>,
595    row: usize,
596    prop: &str,
597) -> Option<Vec<u8>> {
598    let arr = overflow_arr?;
599    if arr.is_null(row) {
600        return None;
601    }
602    uni_common::cypher_value_codec::extract_map_entry_raw(arr.value(row), prop)
603}
604
605/// Build a `LargeBinary` column by extracting a property from overflow_json
606/// blobs, with L0 buffer overlay.
607///
608/// For each row, checks L0 buffers first (later buffers take precedence).
609/// If the property is not in L0, falls back to extracting from the
610/// overflow_json CypherValue blob.
611fn build_overflow_property_column(
612    num_rows: usize,
613    vid_arr: &UInt64Array,
614    overflow_arr: Option<&arrow_array::LargeBinaryArray>,
615    prop: &str,
616    l0_ctx: &crate::query::df_graph::L0Context,
617) -> ArrayRef {
618    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
619    for i in 0..num_rows {
620        let vid = Vid::from(vid_arr.value(i));
621
622        // Check L0 buffers (later overwrites earlier)
623        let l0_val = resolve_l0_property(&vid, prop, l0_ctx);
624
625        if let Some(val_opt) = l0_val {
626            append_value_as_cypher_binary(&mut builder, val_opt.as_ref());
627        } else if let Some(bytes) = extract_from_overflow_blob(overflow_arr, i, prop) {
628            builder.append_value(&bytes);
629        } else {
630            builder.append_null();
631        }
632    }
633    Arc::new(builder.finish())
634}
635
636/// Resolve a property value from the L0 visibility chain.
637///
638/// Returns `Some(Some(val))` when the property exists with a non-null value,
639/// `Some(None)` when it exists but is null, and `None` when no L0 buffer
640/// has the property.
641fn resolve_l0_property(
642    vid: &Vid,
643    prop: &str,
644    l0_ctx: &crate::query::df_graph::L0Context,
645) -> Option<Option<Value>> {
646    let mut result = None;
647    for l0 in l0_ctx.iter_l0_buffers() {
648        let guard = l0.read();
649        if let Some(props) = guard.vertex_properties.get(vid)
650            && let Some(val) = props.get(prop)
651        {
652            result = Some(Some(val.clone()));
653        }
654    }
655    result
656}
657
658/// Append a `Value` to a `LargeBinaryBuilder` as CypherValue bytes.
659///
660/// Non-null values are JSON-encoded then CypherValue-encoded.
661/// Null values (or encoding failures) produce null entries.
662fn append_value_as_cypher_binary(
663    builder: &mut arrow_array::builder::LargeBinaryBuilder,
664    val: Option<&Value>,
665) {
666    match val {
667        Some(v) if !v.is_null() => {
668            let json_val: serde_json::Value = v.clone().into();
669            match encode_cypher_value(&json_val) {
670                Ok(bytes) => builder.append_value(bytes),
671                Err(_) => builder.append_null(),
672            }
673        }
674        _ => builder.append_null(),
675    }
676}
677
678/// MVCC deduplication: keep only the highest-version row for each unique value
679/// in the given `id_column`.
680///
681/// Sorts by (id_column ASC, _version DESC), then keeps the first occurrence of
682/// each id (= the highest version). This is a pure Arrow-compute operation.
683fn mvcc_dedup_batch_by(batch: &RecordBatch, id_column: &str) -> DFResult<RecordBatch> {
684    if batch.num_rows() == 0 {
685        return Ok(batch.clone());
686    }
687
688    let id_col = batch
689        .column_by_name(id_column)
690        .ok_or_else(|| {
691            datafusion::error::DataFusionError::Internal(format!("Missing {} column", id_column))
692        })?
693        .clone();
694    let version_col = batch
695        .column_by_name("_version")
696        .ok_or_else(|| {
697            datafusion::error::DataFusionError::Internal("Missing _version column".to_string())
698        })?
699        .clone();
700
701    // Sort by (id_column ASC, _version DESC)
702    let sort_columns = vec![
703        arrow::compute::SortColumn {
704            values: id_col,
705            options: Some(arrow::compute::SortOptions {
706                descending: false,
707                nulls_first: false,
708            }),
709        },
710        arrow::compute::SortColumn {
711            values: version_col,
712            options: Some(arrow::compute::SortOptions {
713                descending: true,
714                nulls_first: false,
715            }),
716        },
717    ];
718    let indices = arrow::compute::lexsort_to_indices(&sort_columns, None)
719        .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))?;
720
721    // Reorder all columns by sorted indices
722    let sorted_columns: Vec<ArrayRef> = batch
723        .columns()
724        .iter()
725        .map(|col| arrow::compute::take(col.as_ref(), &indices, None))
726        .collect::<Result<_, _>>()
727        .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))?;
728    let sorted = RecordBatch::try_new(batch.schema(), sorted_columns)
729        .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))?;
730
731    // Build dedup mask: keep first occurrence of each id
732    let sorted_id = sorted
733        .column_by_name(id_column)
734        .unwrap()
735        .as_any()
736        .downcast_ref::<UInt64Array>()
737        .unwrap();
738
739    let mut keep = vec![false; sorted.num_rows()];
740    if !keep.is_empty() {
741        keep[0] = true;
742        for (i, flag) in keep.iter_mut().enumerate().skip(1) {
743            if sorted_id.value(i) != sorted_id.value(i - 1) {
744                *flag = true;
745            }
746        }
747    }
748
749    let mask = arrow_array::BooleanArray::from(keep);
750    arrow::compute::filter_record_batch(&sorted, &mask)
751        .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
752}
753
754/// Filter out edge rows where `op != 0` (non-INSERT) after MVCC dedup.
755fn filter_deleted_edge_ops(batch: &RecordBatch) -> DFResult<RecordBatch> {
756    if batch.num_rows() == 0 {
757        return Ok(batch.clone());
758    }
759    let op_col = match batch.column_by_name("op") {
760        Some(col) => col
761            .as_any()
762            .downcast_ref::<arrow_array::UInt8Array>()
763            .unwrap(),
764        None => return Ok(batch.clone()),
765    };
766    let keep: Vec<bool> = (0..op_col.len()).map(|i| op_col.value(i) == 0).collect();
767    let mask = arrow_array::BooleanArray::from(keep);
768    arrow::compute::filter_record_batch(batch, &mask)
769        .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
770}
771
772/// Filter out rows where `_deleted = true` after MVCC dedup.
773fn filter_deleted_rows(batch: &RecordBatch) -> DFResult<RecordBatch> {
774    if batch.num_rows() == 0 {
775        return Ok(batch.clone());
776    }
777    let deleted_col = match batch.column_by_name("_deleted") {
778        Some(col) => col
779            .as_any()
780            .downcast_ref::<arrow_array::BooleanArray>()
781            .unwrap(),
782        None => return Ok(batch.clone()),
783    };
784    let keep: Vec<bool> = (0..deleted_col.len())
785        .map(|i| !deleted_col.value(i))
786        .collect();
787    let mask = arrow_array::BooleanArray::from(keep);
788    arrow::compute::filter_record_batch(batch, &mask)
789        .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
790}
791
792/// Filter out rows whose `_vid` appears in L0 tombstones.
793fn filter_l0_tombstones(
794    batch: &RecordBatch,
795    l0_ctx: &crate::query::df_graph::L0Context,
796) -> DFResult<RecordBatch> {
797    if batch.num_rows() == 0 {
798        return Ok(batch.clone());
799    }
800
801    let mut tombstones: HashSet<u64> = HashSet::new();
802    for l0 in l0_ctx.iter_l0_buffers() {
803        let guard = l0.read();
804        for vid in guard.vertex_tombstones.iter() {
805            tombstones.insert(vid.as_u64());
806        }
807    }
808
809    if tombstones.is_empty() {
810        return Ok(batch.clone());
811    }
812
813    let vid_col = batch
814        .column_by_name("_vid")
815        .ok_or_else(|| {
816            datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
817        })?
818        .as_any()
819        .downcast_ref::<UInt64Array>()
820        .unwrap();
821
822    let keep: Vec<bool> = (0..vid_col.len())
823        .map(|i| !tombstones.contains(&vid_col.value(i)))
824        .collect();
825    let mask = arrow_array::BooleanArray::from(keep);
826    arrow::compute::filter_record_batch(batch, &mask)
827        .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
828}
829
830/// Filter out rows whose `eid` appears in L0 edge tombstones.
831fn filter_l0_edge_tombstones(
832    batch: &RecordBatch,
833    l0_ctx: &crate::query::df_graph::L0Context,
834) -> DFResult<RecordBatch> {
835    if batch.num_rows() == 0 {
836        return Ok(batch.clone());
837    }
838
839    let mut tombstones: HashSet<u64> = HashSet::new();
840    for l0 in l0_ctx.iter_l0_buffers() {
841        let guard = l0.read();
842        for eid in guard.tombstones.keys() {
843            tombstones.insert(eid.as_u64());
844        }
845    }
846
847    if tombstones.is_empty() {
848        return Ok(batch.clone());
849    }
850
851    let eid_col = batch
852        .column_by_name("eid")
853        .ok_or_else(|| {
854            datafusion::error::DataFusionError::Internal("Missing eid column".to_string())
855        })?
856        .as_any()
857        .downcast_ref::<UInt64Array>()
858        .unwrap();
859
860    let keep: Vec<bool> = (0..eid_col.len())
861        .map(|i| !tombstones.contains(&eid_col.value(i)))
862        .collect();
863    let mask = arrow_array::BooleanArray::from(keep);
864    arrow::compute::filter_record_batch(batch, &mask)
865        .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
866}
867
/// Build a RecordBatch from L0 buffer data for a given label, matching the
/// Lance query's column set.
///
/// Merges L0 buffers in visibility order (pending_flush → current → transaction),
/// with later buffers overwriting earlier ones for the same VID.
///
/// Output columns are driven by `lance_schema` so the result can be merged
/// positionally with the Lance-side batch:
/// - `_vid`: VIDs sorted ascending for deterministic output
/// - `_deleted`: always `false` (tombstoned VIDs are excluded before building)
/// - `_version`: highest version observed across buffers for each VID
/// - `overflow_json`: CypherValue-encoded map of non-schema properties
/// - any other field: typed column built by `build_l0_property_column`
///
/// NOTE(review): a VID tombstoned in ANY visible buffer is removed at the end,
/// even if a later (more visible) buffer re-created it — confirm that
/// create-after-delete within a single scan window is not expected here.
fn build_l0_vertex_batch(
    l0_ctx: &crate::query::df_graph::L0Context,
    label: &str,
    lance_schema: &SchemaRef,
    label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
) -> DFResult<RecordBatch> {
    // Collect all L0 vertex data, merging in visibility order
    let mut vid_data: HashMap<u64, (Properties, u64)> = HashMap::new(); // vid -> (props, version)
    let mut tombstones: HashSet<u64> = HashSet::new();

    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();
        // Collect tombstones (accumulated across buffers; checked below)
        for vid in guard.vertex_tombstones.iter() {
            tombstones.insert(vid.as_u64());
        }
        // Collect vertices for this label
        for vid in guard.vids_for_label(label) {
            let vid_u64 = vid.as_u64();
            // Skip VIDs already tombstoned by this or an earlier buffer.
            if tombstones.contains(&vid_u64) {
                continue;
            }
            let version = guard.vertex_versions.get(&vid).copied().unwrap_or(0);
            let entry = vid_data
                .entry(vid_u64)
                .or_insert_with(|| (Properties::new(), 0));
            // Merge properties (later L0 overwrites)
            if let Some(props) = guard.vertex_properties.get(&vid) {
                for (k, v) in props {
                    entry.0.insert(k.clone(), v.clone());
                }
            }
            // Take the highest version
            if version > entry.1 {
                entry.1 = version;
            }
        }
    }

    // Remove tombstoned VIDs (regardless of which buffer tombstoned them)
    for t in &tombstones {
        vid_data.remove(t);
    }

    if vid_data.is_empty() {
        return Ok(RecordBatch::new_empty(lance_schema.clone()));
    }

    // Sort VIDs for deterministic output
    let mut vids: Vec<u64> = vid_data.keys().copied().collect();
    vids.sort_unstable();

    let num_rows = vids.len();
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(lance_schema.fields().len());

    // Determine which schema property names exist; anything else goes to overflow_json
    let schema_prop_names: HashSet<&str> = label_props
        .map(|lp| lp.keys().map(|k| k.as_str()).collect())
        .unwrap_or_default();

    // Build each column in lance_schema field order so the batch lines up
    // positionally with the Lance-side batch.
    for field in lance_schema.fields() {
        let col_name = field.name().as_str();
        match col_name {
            "_vid" => {
                columns.push(Arc::new(UInt64Array::from(vids.clone())));
            }
            "_deleted" => {
                // L0 vertices are always live (tombstoned ones are already excluded)
                let vals = vec![false; num_rows];
                columns.push(Arc::new(arrow_array::BooleanArray::from(vals)));
            }
            "_version" => {
                let vals: Vec<u64> = vids.iter().map(|v| vid_data[v].1).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "overflow_json" => {
                // Collect non-schema properties as CypherValue
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                for vid_u64 in &vids {
                    let (props, _) = &vid_data[vid_u64];
                    let mut overflow = serde_json::Map::new();
                    for (k, v) in props {
                        // ext_id and underscore-prefixed keys are system-managed;
                        // they never go into the overflow blob.
                        if k == "ext_id" || k.starts_with('_') {
                            continue;
                        }
                        if !schema_prop_names.contains(k.as_str()) {
                            let json_val: serde_json::Value = v.clone().into();
                            overflow.insert(k.clone(), json_val);
                        }
                    }
                    if overflow.is_empty() {
                        builder.append_null();
                    } else {
                        let json_val = serde_json::Value::Object(overflow);
                        // Encoding failures degrade to NULL rather than aborting the scan.
                        match encode_cypher_value(&json_val) {
                            Ok(bytes) => builder.append_value(bytes),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            _ => {
                // Schema property column: convert L0 Value → Arrow typed value
                let col = build_l0_property_column(&vids, &vid_data, col_name, field.data_type())?;
                columns.push(col);
            }
        }
    }

    RecordBatch::try_new(lance_schema.clone(), columns)
        .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
}
986
987/// Build a single Arrow column from L0 property values.
988///
989/// Operates on the `vid_data` map produced by `build_l0_vertex_batch`.
990fn build_l0_property_column(
991    vids: &[u64],
992    vid_data: &HashMap<u64, (Properties, u64)>,
993    prop_name: &str,
994    data_type: &DataType,
995) -> DFResult<ArrayRef> {
996    // Convert to Vid keys for reuse of existing build_property_column_static
997    let vid_keys: Vec<Vid> = vids.iter().map(|v| Vid::from(*v)).collect();
998    let props_map: HashMap<Vid, Properties> = vid_data
999        .iter()
1000        .map(|(k, (props, _))| (Vid::from(*k), props.clone()))
1001        .collect();
1002
1003    build_property_column_static(&vid_keys, &props_map, prop_name, data_type)
1004}
1005
/// Build a RecordBatch from L0 buffer data for a given edge type, matching
/// the DeltaDataset Lance table's column set.
///
/// Merges L0 buffers in visibility order (pending_flush → current → transaction),
/// with later buffers overwriting earlier ones for the same EID.
///
/// Output columns are driven by `internal_schema` so the result can be merged
/// positionally with the Lance-side batch:
/// - `eid`: EIDs sorted ascending for deterministic output
/// - `src_vid` / `dst_vid`: endpoints taken from the latest visible L0 layer
/// - `op`: always 0 (live) — tombstoned EIDs are excluded before building
/// - `_version`: highest version observed across buffers for each EID
/// - `overflow_json`: CypherValue-encoded map of non-schema properties
/// - any other field: typed column built by `build_l0_edge_property_column`
///
/// NOTE(review): an EID tombstoned in ANY visible buffer is removed at the
/// end, even if a later buffer re-created it — mirrors the vertex path;
/// confirm create-after-delete within a single scan window is not expected.
fn build_l0_edge_batch(
    l0_ctx: &crate::query::df_graph::L0Context,
    edge_type: &str,
    internal_schema: &SchemaRef,
    type_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
) -> DFResult<RecordBatch> {
    // Collect all L0 edge data, merging in visibility order
    // eid -> (src_vid, dst_vid, properties, version)
    let mut eid_data: HashMap<u64, (u64, u64, Properties, u64)> = HashMap::new();
    let mut tombstones: HashSet<u64> = HashSet::new();

    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();
        // Collect tombstones (accumulated across buffers; checked below)
        for eid in guard.tombstones.keys() {
            tombstones.insert(eid.as_u64());
        }
        // Collect edges for this type
        for eid in guard.eids_for_type(edge_type) {
            let eid_u64 = eid.as_u64();
            // Skip EIDs already tombstoned by this or an earlier buffer.
            if tombstones.contains(&eid_u64) {
                continue;
            }
            // Edges without recorded endpoints cannot be materialized — skip.
            let (src_vid, dst_vid) = match guard.get_edge_endpoints(eid) {
                Some(endpoints) => (endpoints.0.as_u64(), endpoints.1.as_u64()),
                None => continue,
            };
            let version = guard.edge_versions.get(&eid).copied().unwrap_or(0);
            let entry = eid_data
                .entry(eid_u64)
                .or_insert_with(|| (src_vid, dst_vid, Properties::new(), 0));
            // Merge properties (later L0 overwrites)
            if let Some(props) = guard.edge_properties.get(&eid) {
                for (k, v) in props {
                    entry.2.insert(k.clone(), v.clone());
                }
            }
            // Update endpoints from latest L0 layer
            entry.0 = src_vid;
            entry.1 = dst_vid;
            // Take the highest version
            if version > entry.3 {
                entry.3 = version;
            }
        }
    }

    // Remove tombstoned EIDs (regardless of which buffer tombstoned them)
    for t in &tombstones {
        eid_data.remove(t);
    }

    if eid_data.is_empty() {
        return Ok(RecordBatch::new_empty(internal_schema.clone()));
    }

    // Sort EIDs for deterministic output
    let mut eids: Vec<u64> = eid_data.keys().copied().collect();
    eids.sort_unstable();

    let num_rows = eids.len();
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(internal_schema.fields().len());

    // Determine which schema property names exist; anything else goes to overflow_json
    let schema_prop_names: HashSet<&str> = type_props
        .map(|tp| tp.keys().map(|k| k.as_str()).collect())
        .unwrap_or_default();

    // Build each column in internal_schema field order so the batch lines up
    // positionally with the Lance-side batch.
    for field in internal_schema.fields() {
        let col_name = field.name().as_str();
        match col_name {
            "eid" => {
                columns.push(Arc::new(UInt64Array::from(eids.clone())));
            }
            "src_vid" => {
                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].0).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "dst_vid" => {
                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].1).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "op" => {
                // L0 edges are always live (tombstoned ones already excluded)
                let vals = vec![0u8; num_rows];
                columns.push(Arc::new(arrow_array::UInt8Array::from(vals)));
            }
            "_version" => {
                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].3).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "overflow_json" => {
                // Collect non-schema properties as CypherValue
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                for eid_u64 in &eids {
                    let (_, _, props, _) = &eid_data[eid_u64];
                    let mut overflow = serde_json::Map::new();
                    for (k, v) in props {
                        // Underscore-prefixed keys are system-managed; never overflow them.
                        if k.starts_with('_') {
                            continue;
                        }
                        if !schema_prop_names.contains(k.as_str()) {
                            let json_val: serde_json::Value = v.clone().into();
                            overflow.insert(k.clone(), json_val);
                        }
                    }
                    if overflow.is_empty() {
                        builder.append_null();
                    } else {
                        let json_val = serde_json::Value::Object(overflow);
                        // Encoding failures degrade to NULL rather than aborting the scan.
                        match encode_cypher_value(&json_val) {
                            Ok(bytes) => builder.append_value(bytes),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            _ => {
                // Schema property column: convert L0 Value → Arrow typed value
                let col =
                    build_l0_edge_property_column(&eids, &eid_data, col_name, field.data_type())?;
                columns.push(col);
            }
        }
    }

    RecordBatch::try_new(internal_schema.clone(), columns)
        .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
}
1141
1142/// Build a single Arrow column from L0 edge property values.
1143///
1144/// Operates on the `eid_data` map produced by `build_l0_edge_batch`.
1145fn build_l0_edge_property_column(
1146    eids: &[u64],
1147    eid_data: &HashMap<u64, (u64, u64, Properties, u64)>,
1148    prop_name: &str,
1149    data_type: &DataType,
1150) -> DFResult<ArrayRef> {
1151    // Convert to Vid keys for reuse of existing build_property_column_static
1152    let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(*e)).collect();
1153    let props_map: HashMap<Vid, Properties> = eid_data
1154        .iter()
1155        .map(|(k, (_, _, props, _))| (Vid::from(*k), props.clone()))
1156        .collect();
1157
1158    build_property_column_static(&vid_keys, &props_map, prop_name, data_type)
1159}
1160
1161/// Build the `_labels` column for known-label vertices.
1162///
1163/// Reads `_labels` from the stored Lance batch if available. Falls back to
1164/// `[label]` when the column is absent (legacy data). Additional labels from
1165/// L0 buffers are merged in.
1166fn build_labels_column_for_known_label(
1167    vid_arr: &UInt64Array,
1168    label: &str,
1169    l0_ctx: &crate::query::df_graph::L0Context,
1170    batch_labels_col: Option<&arrow_array::ListArray>,
1171) -> DFResult<ArrayRef> {
1172    use uni_store::storage::arrow_convert::labels_from_list_array;
1173
1174    let mut labels_builder = ListBuilder::new(StringBuilder::new());
1175
1176    for i in 0..vid_arr.len() {
1177        let vid = Vid::from(vid_arr.value(i));
1178
1179        // Start with labels from the stored column, falling back to [label]
1180        let mut labels = match batch_labels_col {
1181            Some(list_arr) => {
1182                let stored = labels_from_list_array(list_arr, i);
1183                if stored.is_empty() {
1184                    vec![label.to_string()]
1185                } else {
1186                    stored
1187                }
1188            }
1189            None => vec![label.to_string()],
1190        };
1191
1192        // Ensure the scanned label is present (defensive)
1193        if !labels.iter().any(|l| l == label) {
1194            labels.push(label.to_string());
1195        }
1196
1197        // Merge additional labels from L0 buffers
1198        for l0 in l0_ctx.iter_l0_buffers() {
1199            let guard = l0.read();
1200            if let Some(l0_labels) = guard.vertex_labels.get(&vid) {
1201                for lbl in l0_labels {
1202                    if !labels.contains(lbl) {
1203                        labels.push(lbl.clone());
1204                    }
1205                }
1206            }
1207        }
1208
1209        let values = labels_builder.values();
1210        for lbl in &labels {
1211            values.append_value(lbl);
1212        }
1213        labels_builder.append(true);
1214    }
1215
1216    Ok(Arc::new(labels_builder.finish()))
1217}
1218
1219/// Map a Lance-schema batch to the DataFusion output schema.
1220///
1221/// The output schema has `{variable}.{property}` column names, while Lance
1222/// uses bare property names. This function performs the positional mapping,
1223/// adds the `_labels` column, and drops internal columns like `_deleted`/`_version`.
1224fn map_to_output_schema(
1225    batch: &RecordBatch,
1226    label: &str,
1227    _variable: &str,
1228    projected_properties: &[String],
1229    output_schema: &SchemaRef,
1230    l0_ctx: &crate::query::df_graph::L0Context,
1231) -> DFResult<RecordBatch> {
1232    if batch.num_rows() == 0 {
1233        return Ok(RecordBatch::new_empty(output_schema.clone()));
1234    }
1235
1236    let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
1237
1238    // 1. {var}._vid
1239    let vid_col = batch
1240        .column_by_name("_vid")
1241        .ok_or_else(|| {
1242            datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
1243        })?
1244        .clone();
1245    let vid_arr = vid_col
1246        .as_any()
1247        .downcast_ref::<UInt64Array>()
1248        .ok_or_else(|| {
1249            datafusion::error::DataFusionError::Internal("_vid not UInt64".to_string())
1250        })?;
1251
1252    // 2. {var}._labels — read from stored column, overlay L0 additions
1253    let batch_labels_col = batch
1254        .column_by_name("_labels")
1255        .and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());
1256    let labels_col = build_labels_column_for_known_label(vid_arr, label, l0_ctx, batch_labels_col)?;
1257    columns.push(vid_col.clone());
1258    columns.push(labels_col);
1259
1260    // 3. Projected properties
1261    // Pre-load overflow_json column for extracting non-schema properties
1262    let overflow_arr = batch
1263        .column_by_name("overflow_json")
1264        .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
1265
1266    for prop in projected_properties {
1267        if prop == "overflow_json" {
1268            match batch.column_by_name("overflow_json") {
1269                Some(col) => columns.push(col.clone()),
1270                None => {
1271                    // No overflow_json in Lance — return null column
1272                    columns.push(arrow_array::new_null_array(
1273                        &DataType::LargeBinary,
1274                        batch.num_rows(),
1275                    ));
1276                }
1277            }
1278        } else {
1279            match batch.column_by_name(prop) {
1280                Some(col) => columns.push(col.clone()),
1281                None => {
1282                    // Column missing in Lance -- extract from overflow_json
1283                    // CypherValue blob with L0 overlay
1284                    let col = build_overflow_property_column(
1285                        batch.num_rows(),
1286                        vid_arr,
1287                        overflow_arr,
1288                        prop,
1289                        l0_ctx,
1290                    );
1291                    columns.push(col);
1292                }
1293            }
1294        }
1295    }
1296
1297    RecordBatch::try_new(output_schema.clone(), columns)
1298        .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
1299}
1300
1301/// Map an internal DeltaDataset-schema edge batch to the DataFusion output schema.
1302///
1303/// The internal batch has `eid`, `src_vid`, `dst_vid`, `op`, `_version`, and property
1304/// columns. The output schema has `{variable}._eid`, `{variable}._src_vid`,
1305/// `{variable}._dst_vid`, and per-property columns. Internal columns `op` and
1306/// `_version` are dropped.
1307fn map_edge_to_output_schema(
1308    batch: &RecordBatch,
1309    variable: &str,
1310    projected_properties: &[String],
1311    output_schema: &SchemaRef,
1312) -> DFResult<RecordBatch> {
1313    if batch.num_rows() == 0 {
1314        return Ok(RecordBatch::new_empty(output_schema.clone()));
1315    }
1316
1317    let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
1318
1319    // 1. {var}._eid
1320    let eid_col = batch
1321        .column_by_name("eid")
1322        .ok_or_else(|| {
1323            datafusion::error::DataFusionError::Internal("Missing eid column".to_string())
1324        })?
1325        .clone();
1326    columns.push(eid_col);
1327
1328    // 2. {var}._src_vid
1329    let src_col = batch
1330        .column_by_name("src_vid")
1331        .ok_or_else(|| {
1332            datafusion::error::DataFusionError::Internal("Missing src_vid column".to_string())
1333        })?
1334        .clone();
1335    columns.push(src_col);
1336
1337    // 3. {var}._dst_vid
1338    let dst_col = batch
1339        .column_by_name("dst_vid")
1340        .ok_or_else(|| {
1341            datafusion::error::DataFusionError::Internal("Missing dst_vid column".to_string())
1342        })?
1343        .clone();
1344    columns.push(dst_col);
1345
1346    // 4. Projected properties
1347    for prop in projected_properties {
1348        if prop == "overflow_json" {
1349            match batch.column_by_name("overflow_json") {
1350                Some(col) => columns.push(col.clone()),
1351                None => {
1352                    columns.push(arrow_array::new_null_array(
1353                        &DataType::LargeBinary,
1354                        batch.num_rows(),
1355                    ));
1356                }
1357            }
1358        } else {
1359            match batch.column_by_name(prop) {
1360                Some(col) => columns.push(col.clone()),
1361                None => {
1362                    // Column missing in Lance — extract from overflow_json CypherValue blob
1363                    // (mirrors the vertex path in map_to_output_schema)
1364                    let overflow_arr = batch
1365                        .column_by_name("overflow_json")
1366                        .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
1367
1368                    if let Some(arr) = overflow_arr {
1369                        let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
1370                        for i in 0..batch.num_rows() {
1371                            if !arr.is_null(i) {
1372                                let blob = arr.value(i);
1373                                // Fast path: extract map entry without decoding entire map
1374                                if let Some(sub_bytes) =
1375                                    uni_common::cypher_value_codec::extract_map_entry_raw(
1376                                        blob, prop,
1377                                    )
1378                                {
1379                                    builder.append_value(&sub_bytes);
1380                                } else {
1381                                    builder.append_null();
1382                                }
1383                            } else {
1384                                builder.append_null();
1385                            }
1386                        }
1387                        columns.push(Arc::new(builder.finish()));
1388                    } else {
1389                        // No overflow_json column either — return null column
1390                        let target_field = output_schema
1391                            .fields()
1392                            .iter()
1393                            .find(|f| f.name() == &format!("{}.{}", variable, prop));
1394                        let dt = target_field
1395                            .map(|f| f.data_type().clone())
1396                            .unwrap_or(DataType::LargeBinary);
1397                        columns.push(arrow_array::new_null_array(&dt, batch.num_rows()));
1398                    }
1399                }
1400            }
1401        }
1402    }
1403
1404    RecordBatch::try_new(output_schema.clone(), columns)
1405        .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
1406}
1407
/// Columnar-first vertex scan: single Lance query with MVCC dedup and L0 overlay.
///
/// Replaces the two-phase `scan_vertex_vids_static()` + `materialize_vertex_batch_static()`
/// for known-label vertex scans. Reads all needed columns in a single Lance query,
/// performs MVCC dedup via Arrow compute, merges L0 buffer data, filters tombstones,
/// and maps to the output schema.
///
/// Pipeline:
/// 1. Build the Lance column list from the projection (always includes
///    `_vid`/`_deleted`/`_version`; `overflow_json` when any projected
///    property is outside the label's schema).
/// 2. Query Lance, restricted to `_version <= HWM` when a high-water mark is set.
/// 3. MVCC-dedup the Lance batch on `_vid`.
/// 4. Build an L0 overlay batch and merge it over the Lance result.
/// 5. Drop `_deleted = true` rows, then L0 tombstoned rows.
/// 6. Map to the `{variable}.{property}` output schema.
///
/// # Errors
/// Propagates storage, Arrow, and Lance schema errors as `DataFusionError`.
async fn columnar_scan_vertex_batch_static(
    graph_ctx: &GraphExecutionContext,
    label: &str,
    variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
    let storage = graph_ctx.storage();
    let l0_ctx = graph_ctx.l0_context();
    let uni_schema = storage.schema_manager().schema();
    let label_props = uni_schema.properties.get(label);

    // Build the list of columns to request from Lance
    let mut lance_columns: Vec<String> = vec![
        "_vid".to_string(),
        "_deleted".to_string(),
        "_version".to_string(),
    ];
    for prop in projected_properties {
        if prop == "overflow_json" {
            push_column_if_absent(&mut lance_columns, "overflow_json");
        } else {
            // Only request columns the label's schema declares; others live
            // in overflow_json.
            let exists_in_schema = label_props.is_some_and(|lp| lp.contains_key(prop));
            if exists_in_schema {
                push_column_if_absent(&mut lance_columns, prop);
            }
        }
    }

    // Ensure overflow_json is present when any projected property is not in the schema
    let needs_overflow = projected_properties
        .iter()
        .any(|p| p == "overflow_json" || !label_props.is_some_and(|lp| lp.contains_key(p)));
    if needs_overflow {
        push_column_if_absent(&mut lance_columns, "overflow_json");
    }

    // Try to query Lance
    let ds = storage
        .vertex_dataset(label)
        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
    let lancedb_store = storage.lancedb_store();

    let lance_batch = match ds.open_lancedb(lancedb_store).await {
        Ok(table) => {
            use lancedb::query::{ExecutableQuery, QueryBase, Select};

            // Check which columns actually exist in the Lance table
            let table_schema = table
                .schema()
                .await
                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
            let table_field_names: HashSet<&str> = table_schema
                .fields()
                .iter()
                .map(|f| f.name().as_str())
                .collect();

            let actual_columns: Vec<&str> = lance_columns
                .iter()
                .filter(|c| table_field_names.contains(c.as_str()))
                .map(|c| c.as_str())
                .collect();

            let query = table.query().select(Select::columns(&actual_columns));
            // Do NOT filter _deleted here — MVCC dedup must see the
            // highest-version row first (which may be a deletion tombstone).
            // Deleted rows are filtered out after dedup.
            let query = match storage.version_high_water_mark() {
                Some(hwm) => query.only_if(format!("_version <= {}", hwm)),
                None => query,
            };

            match query.execute().await {
                Ok(stream) => {
                    use futures::TryStreamExt;
                    // NOTE(review): unwrap_or_default() swallows stream read
                    // errors into an empty result (best-effort, consistent
                    // with the Err(_) => None fallbacks below) — confirm this
                    // is intended rather than propagating the error.
                    let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap_or_default();
                    if batches.is_empty() {
                        None
                    } else {
                        Some(
                            arrow::compute::concat_batches(&batches[0].schema(), &batches)
                                .map_err(|e| {
                                    datafusion::error::DataFusionError::ArrowError(
                                        Box::new(e),
                                        None,
                                    )
                                })?,
                        )
                    }
                }
                Err(_) => None,
            }
        }
        Err(_) => None, // Table doesn't exist yet — only L0 data
    };

    // MVCC dedup the Lance batch
    let lance_deduped = mvcc_dedup_to_option(lance_batch, "_vid")?;

    // Build the internal Lance schema for L0 batch construction.
    // Use the Lance batch schema if available, otherwise build from scratch.
    let internal_schema = match &lance_deduped {
        Some(batch) => batch.schema(),
        None => {
            let mut fields = vec![
                Field::new("_vid", DataType::UInt64, false),
                Field::new("_deleted", DataType::Boolean, false),
                Field::new("_version", DataType::UInt64, false),
            ];
            for col in &lance_columns {
                if matches!(col.as_str(), "_vid" | "_deleted" | "_version") {
                    continue;
                }
                if col == "overflow_json" {
                    fields.push(Field::new("overflow_json", DataType::LargeBinary, true));
                } else {
                    // Schema-declared properties get their declared Arrow type;
                    // anything unknown falls back to LargeBinary.
                    let arrow_type = label_props
                        .and_then(|lp| lp.get(col.as_str()))
                        .map(|meta| meta.r#type.to_arrow())
                        .unwrap_or(DataType::LargeBinary);
                    fields.push(Field::new(col, arrow_type, true));
                }
            }
            Arc::new(Schema::new(fields))
        }
    };

    // Build L0 batch
    let l0_batch = build_l0_vertex_batch(l0_ctx, label, &internal_schema, label_props)?;

    // Merge Lance + L0
    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "_vid")?
    else {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    };

    // Filter out MVCC deletion tombstones (_deleted = true)
    let merged = filter_deleted_rows(&merged)?;
    if merged.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    // Filter L0 tombstones
    let filtered = filter_l0_tombstones(&merged, l0_ctx)?;

    if filtered.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    // Map to output schema
    map_to_output_schema(
        &filtered,
        label,
        variable,
        projected_properties,
        output_schema,
        l0_ctx,
    )
}
1574
1575/// Columnar-first edge scan: single Lance query with MVCC dedup and L0 overlay.
1576///
1577/// Replaces the two-phase `scan_edge_eids_static()` + `materialize_edge_batch_static()`
1578/// for edge scans. Reads all needed columns in a single DeltaDataset query, performs
1579/// MVCC dedup via Arrow compute, merges L0 buffer data, filters tombstones, and maps
1580/// to the output schema.
/// # Arguments
/// - `graph_ctx`: execution context providing storage and L0 buffer access
/// - `edge_type`: edge type (Lance table family) to scan
/// - `variable`: Cypher variable name used to prefix output property columns
/// - `projected_properties`: property names requested by the query
/// - `output_schema`: DataFusion schema the returned batch must match
///
/// # Errors
/// Returns a `DataFusionError` if the DeltaDataset cannot be resolved, the
/// Lance table schema cannot be read, or Arrow batch construction fails.
async fn columnar_scan_edge_batch_static(
    graph_ctx: &GraphExecutionContext,
    edge_type: &str,
    variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
    let storage = graph_ctx.storage();
    let l0_ctx = graph_ctx.l0_context();
    let uni_schema = storage.schema_manager().schema();
    // Per-property metadata for this edge type; None when the type is not
    // declared in the schema (then every property falls back to overflow_json).
    let type_props = uni_schema.properties.get(edge_type);

    // Build the list of columns to request from DeltaDataset Lance table
    let mut lance_columns: Vec<String> = vec![
        "eid".to_string(),
        "src_vid".to_string(),
        "dst_vid".to_string(),
        "op".to_string(),
        "_version".to_string(),
    ];
    for prop in projected_properties {
        if prop == "overflow_json" {
            push_column_if_absent(&mut lance_columns, "overflow_json");
        } else {
            // Only schema-declared properties have dedicated Lance columns;
            // anything else is served from the overflow_json blob below.
            let exists_in_schema = type_props.is_some_and(|tp| tp.contains_key(prop));
            if exists_in_schema {
                push_column_if_absent(&mut lance_columns, prop);
            }
        }
    }

    // Ensure overflow_json is present when any projected property is not in the schema
    let needs_overflow = projected_properties
        .iter()
        .any(|p| p == "overflow_json" || !type_props.is_some_and(|tp| tp.contains_key(p)));
    if needs_overflow {
        push_column_if_absent(&mut lance_columns, "overflow_json");
    }

    // Try to query DeltaDataset (forward direction)
    let ds = storage
        .delta_dataset(edge_type, "fwd")
        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
    let lancedb_store = storage.lancedb_store();

    let lance_batch = match ds.open_lancedb(lancedb_store).await {
        Ok(table) => {
            use lancedb::query::{ExecutableQuery, QueryBase, Select};

            // Check which columns actually exist in the Lance table
            let table_schema = table
                .schema()
                .await
                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
            let table_field_names: HashSet<&str> = table_schema
                .fields()
                .iter()
                .map(|f| f.name().as_str())
                .collect();

            // Project only the intersection of wanted and available columns:
            // selecting a column the table lacks would fail the query.
            let actual_columns: Vec<&str> = lance_columns
                .iter()
                .filter(|c| table_field_names.contains(c.as_str()))
                .map(|c| c.as_str())
                .collect();

            // Do NOT filter op here — MVCC dedup must see DELETE ops
            // (op != 0) to pick the highest version. Deleted edges are
            // filtered out after dedup.
            let query = table.query().select(Select::columns(&actual_columns));
            // Only read rows at or below the version high-water mark (when set).
            let query = match storage.version_high_water_mark() {
                Some(hwm) => query.only_if(format!("_version <= {}", hwm)),
                None => query,
            };

            match query.execute().await {
                Ok(stream) => {
                    use futures::TryStreamExt;
                    // NOTE(review): a mid-stream error is silently collapsed to
                    // an empty result here (unwrap_or_default), mirroring the
                    // Err(_) => None arms below — presumably deliberate
                    // best-effort; confirm it should not propagate instead.
                    let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap_or_default();
                    if batches.is_empty() {
                        None
                    } else {
                        Some(
                            arrow::compute::concat_batches(&batches[0].schema(), &batches)
                                .map_err(|e| {
                                    datafusion::error::DataFusionError::ArrowError(
                                        Box::new(e),
                                        None,
                                    )
                                })?,
                        )
                    }
                }
                Err(_) => None,
            }
        }
        Err(_) => None, // Table doesn't exist yet — only L0 data
    };

    // MVCC dedup the Lance batch (by eid)
    let lance_deduped = mvcc_dedup_to_option(lance_batch, "eid")?;

    // Build the internal schema for L0 batch construction.
    // Use the Lance batch schema if available, otherwise build from scratch.
    let internal_schema = match &lance_deduped {
        Some(batch) => batch.schema(),
        None => {
            let mut fields = vec![
                Field::new("eid", DataType::UInt64, false),
                Field::new("src_vid", DataType::UInt64, false),
                Field::new("dst_vid", DataType::UInt64, false),
                Field::new("op", DataType::UInt8, false),
                Field::new("_version", DataType::UInt64, false),
            ];
            for col in &lance_columns {
                if matches!(
                    col.as_str(),
                    "eid" | "src_vid" | "dst_vid" | "op" | "_version"
                ) {
                    continue;
                }
                if col == "overflow_json" {
                    fields.push(Field::new("overflow_json", DataType::LargeBinary, true));
                } else {
                    // Schema-declared properties get their declared Arrow type;
                    // unknown columns default to LargeBinary.
                    let arrow_type = type_props
                        .and_then(|tp| tp.get(col.as_str()))
                        .map(|meta| meta.r#type.to_arrow())
                        .unwrap_or(DataType::LargeBinary);
                    fields.push(Field::new(col, arrow_type, true));
                }
            }
            Arc::new(Schema::new(fields))
        }
    };

    // Build L0 batch
    let l0_batch = build_l0_edge_batch(l0_ctx, edge_type, &internal_schema, type_props)?;

    // Merge Lance + L0; None means no rows from either source.
    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "eid")? else {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    };

    // Filter out MVCC deletion ops (op != 0) after dedup
    let merged = filter_deleted_edge_ops(&merged)?;
    if merged.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    // Filter L0 edge tombstones
    let filtered = filter_l0_edge_tombstones(&merged, l0_ctx)?;

    if filtered.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    // Map to output schema
    map_edge_to_output_schema(&filtered, variable, projected_properties, output_schema)
}
1740
1741/// Columnar-first schemaless vertex scan: single Lance query with MVCC dedup and L0 overlay.
1742///
1743/// Replaces the two-phase `scan_*_vids_*()` + `materialize_schemaless_vertex_batch_static()`
1744/// for schemaless vertex scans. Reads `_vid`, `labels`, `props_json`, `_version` in a single
1745/// Lance query on the main vertices table, performs MVCC dedup via Arrow compute, merges L0
1746/// buffer data, filters tombstones, and maps to the output schema.
1747async fn columnar_scan_schemaless_vertex_batch_static(
1748    graph_ctx: &GraphExecutionContext,
1749    label: &str,
1750    variable: &str,
1751    projected_properties: &[String],
1752    output_schema: &SchemaRef,
1753) -> DFResult<RecordBatch> {
1754    use uni_store::storage::main_vertex::MainVertexDataset;
1755
1756    let storage = graph_ctx.storage();
1757    let l0_ctx = graph_ctx.l0_context();
1758    let lancedb_store = storage.lancedb_store();
1759
1760    // Build the Lance filter expression — do NOT filter _deleted here;
1761    // MVCC dedup must see deletion tombstones to pick the highest version.
1762    let filter = {
1763        let mut parts = Vec::new();
1764
1765        // Label filter
1766        if !label.is_empty() {
1767            if label.contains(':') {
1768                // Multi-label: each label must be present
1769                for lbl in label.split(':') {
1770                    parts.push(format!("array_contains(labels, '{}')", lbl));
1771                }
1772            } else {
1773                parts.push(format!("array_contains(labels, '{}')", label));
1774            }
1775        }
1776
1777        // MVCC version filter
1778        if let Some(hwm) = storage.version_high_water_mark() {
1779            parts.push(format!("_version <= {}", hwm));
1780        }
1781
1782        if parts.is_empty() {
1783            None
1784        } else {
1785            Some(parts.join(" AND "))
1786        }
1787    };
1788
1789    // Single Lance query: SELECT _vid, _deleted, labels, props_json, _version WHERE <filter>
1790    let lance_batch = match MainVertexDataset::open_table(lancedb_store).await {
1791        Ok(table) => {
1792            use lancedb::query::{ExecutableQuery, QueryBase, Select};
1793
1794            let query = table.query().select(Select::columns(&[
1795                "_vid",
1796                "_deleted",
1797                "labels",
1798                "props_json",
1799                "_version",
1800            ]));
1801            let query = match filter {
1802                Some(f) => query.only_if(f),
1803                None => query,
1804            };
1805
1806            match query.execute().await {
1807                Ok(stream) => {
1808                    use futures::TryStreamExt;
1809                    let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap_or_default();
1810                    if batches.is_empty() {
1811                        None
1812                    } else {
1813                        Some(
1814                            arrow::compute::concat_batches(&batches[0].schema(), &batches)
1815                                .map_err(|e| {
1816                                    datafusion::error::DataFusionError::ArrowError(
1817                                        Box::new(e),
1818                                        None,
1819                                    )
1820                                })?,
1821                        )
1822                    }
1823                }
1824                Err(_) => None,
1825            }
1826        }
1827        Err(_) => None, // Table doesn't exist yet — only L0 data
1828    };
1829
1830    // MVCC dedup the Lance batch
1831    let lance_deduped = mvcc_dedup_to_option(lance_batch, "_vid")?;
1832
1833    // Build the internal schema for L0 batch construction.
1834    // Use the Lance batch schema if available, otherwise build from scratch.
1835    let internal_schema = match &lance_deduped {
1836        Some(batch) => batch.schema(),
1837        None => Arc::new(Schema::new(vec![
1838            Field::new("_vid", DataType::UInt64, false),
1839            Field::new("_deleted", DataType::Boolean, false),
1840            Field::new("labels", labels_data_type(), false),
1841            Field::new("props_json", DataType::LargeBinary, true),
1842            Field::new("_version", DataType::UInt64, false),
1843        ])),
1844    };
1845
1846    // Build L0 batch
1847    let l0_batch = build_l0_schemaless_vertex_batch(l0_ctx, label, &internal_schema)?;
1848
1849    // Merge Lance + L0
1850    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "_vid")?
1851    else {
1852        return Ok(RecordBatch::new_empty(output_schema.clone()));
1853    };
1854
1855    // Filter out MVCC deletion tombstones (_deleted = true)
1856    let merged = filter_deleted_rows(&merged)?;
1857    if merged.num_rows() == 0 {
1858        return Ok(RecordBatch::new_empty(output_schema.clone()));
1859    }
1860
1861    // Filter L0 tombstones
1862    let filtered = filter_l0_tombstones(&merged, l0_ctx)?;
1863
1864    if filtered.num_rows() == 0 {
1865        return Ok(RecordBatch::new_empty(output_schema.clone()));
1866    }
1867
1868    // Map to output schema
1869    map_to_schemaless_output_schema(
1870        &filtered,
1871        variable,
1872        projected_properties,
1873        output_schema,
1874        l0_ctx,
1875    )
1876}
1877
/// Build a RecordBatch from L0 buffer data for schemaless vertices.
///
/// Merges L0 buffers in visibility order (pending_flush → current → transaction),
/// with later buffers overwriting earlier ones for the same VID. Produces a batch
/// matching the internal schema: `_vid, labels, props_json, _version`.
///
/// `label` may be empty (all vertices), a single label, or a colon-separated
/// conjunction (vertex must carry all listed labels). Tombstoned VIDs are
/// excluded from the result.
fn build_l0_schemaless_vertex_batch(
    l0_ctx: &crate::query::df_graph::L0Context,
    label: &str,
    internal_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
    // Collect all L0 vertex data, merging in visibility order
    // vid -> (merged_props, highest_version, labels)
    let mut vid_data: HashMap<u64, (Properties, u64, Vec<String>)> = HashMap::new();
    let mut tombstones: HashSet<u64> = HashSet::new();

    // Parse multi-label filter
    let label_filter: Vec<&str> = if label.is_empty() {
        vec![]
    } else if label.contains(':') {
        label.split(':').collect()
    } else {
        vec![label]
    };

    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();

        // Collect tombstones. NOTE(review): tombstones accumulate across
        // buffers and are applied globally below, so a VID deleted in an
        // earlier buffer and re-created in a later one would still be dropped
        // — confirm re-creation within the same L0 window cannot occur or is
        // handled upstream.
        for vid in guard.vertex_tombstones.iter() {
            tombstones.insert(vid.as_u64());
        }

        // Collect VIDs matching the label filter
        let vids: Vec<Vid> = if label_filter.is_empty() {
            guard.all_vertex_vids()
        } else if label_filter.len() == 1 {
            guard.vids_for_label(label_filter[0])
        } else {
            guard.vids_with_all_labels(&label_filter)
        };

        for vid in vids {
            let vid_u64 = vid.as_u64();
            // Skip VIDs already known to be deleted so no stale data is merged.
            if tombstones.contains(&vid_u64) {
                continue;
            }
            let version = guard.vertex_versions.get(&vid).copied().unwrap_or(0);
            let entry = vid_data
                .entry(vid_u64)
                .or_insert_with(|| (Properties::new(), 0, Vec::new()));

            // Merge properties (later L0 overwrites)
            if let Some(props) = guard.vertex_properties.get(&vid) {
                for (k, v) in props {
                    entry.0.insert(k.clone(), v.clone());
                }
            }
            // Take the highest version
            if version > entry.1 {
                entry.1 = version;
            }
            // Update labels from latest L0 layer
            if let Some(labels) = guard.vertex_labels.get(&vid) {
                entry.2 = labels.clone();
            }
        }
    }

    // Remove tombstoned VIDs (covers tombstones discovered after a VID was
    // already merged from an earlier buffer).
    for t in &tombstones {
        vid_data.remove(t);
    }

    if vid_data.is_empty() {
        return Ok(RecordBatch::new_empty(internal_schema.clone()));
    }

    // Sort VIDs for deterministic output
    let mut vids: Vec<u64> = vid_data.keys().copied().collect();
    vids.sort_unstable();

    let num_rows = vids.len();
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(internal_schema.fields().len());

    // Build one Arrow column per internal-schema field, in schema order.
    for field in internal_schema.fields() {
        match field.name().as_str() {
            "_vid" => {
                columns.push(Arc::new(UInt64Array::from(vids.clone())));
            }
            "labels" => {
                let mut labels_builder = ListBuilder::new(StringBuilder::new());
                for vid_u64 in &vids {
                    let (_, _, labels) = &vid_data[vid_u64];
                    let values = labels_builder.values();
                    for lbl in labels {
                        values.append_value(lbl);
                    }
                    labels_builder.append(true);
                }
                columns.push(Arc::new(labels_builder.finish()));
            }
            "props_json" => {
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                for vid_u64 in &vids {
                    let (props, _, _) = &vid_data[vid_u64];
                    if props.is_empty() {
                        builder.append_null();
                    } else {
                        // Encode properties as CypherValue blob
                        let json_obj: serde_json::Value = {
                            let mut map = serde_json::Map::new();
                            for (k, v) in props {
                                let json_val: serde_json::Value = v.clone().into();
                                map.insert(k.clone(), json_val);
                            }
                            serde_json::Value::Object(map)
                        };
                        // Encoding failure degrades to null rather than
                        // failing the whole scan.
                        match encode_cypher_value(&json_obj) {
                            Ok(bytes) => builder.append_value(bytes),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            "_deleted" => {
                // L0 vertices are always live (tombstoned ones already excluded)
                columns.push(Arc::new(arrow_array::BooleanArray::from(vec![
                    false;
                    num_rows
                ])));
            }
            "_version" => {
                let vals: Vec<u64> = vids.iter().map(|v| vid_data[v].1).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            _ => {
                // Unexpected column — fill with nulls
                columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
            }
        }
    }

    RecordBatch::try_new(internal_schema.clone(), columns)
        .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
}
2024
/// Map an internal-schema schemaless batch to the DataFusion output schema.
///
/// The internal batch has `_vid, labels, props_json, _version` columns. The output
/// schema has `{variable}._vid`, `{variable}._labels`, and per-property columns.
/// Individual properties are extracted from the `props_json` CypherValue blob by
/// decoding to a Map and extracting the sub-value.
///
/// Columns are appended in the order: vid, labels, then one column per
/// projected property — `output_schema` must list its fields in that same
/// order or `RecordBatch::try_new` will fail.
fn map_to_schemaless_output_schema(
    batch: &RecordBatch,
    // NOTE(review): unused — the `{variable}.`-prefixed names appear to come
    // pre-baked in `output_schema`; confirm with the caller.
    _variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
    l0_ctx: &crate::query::df_graph::L0Context,
) -> DFResult<RecordBatch> {
    if batch.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());

    // 1. {var}._vid — passthrough
    let vid_col = batch
        .column_by_name("_vid")
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
        })?
        .clone();
    let vid_arr = vid_col
        .as_any()
        .downcast_ref::<UInt64Array>()
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("_vid not UInt64".to_string())
        })?;
    // ArrayRef clone is a cheap Arc refcount bump, not a data copy.
    columns.push(vid_col.clone());

    // 2. {var}._labels — from labels column with L0 overlay
    let labels_col = batch.column_by_name("labels");
    let labels_arr = labels_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());

    let mut labels_builder = ListBuilder::new(StringBuilder::new());
    for i in 0..vid_arr.len() {
        let vid_u64 = vid_arr.value(i);
        let vid = Vid::from(vid_u64);

        // Start with labels from the batch
        let mut row_labels: Vec<String> = Vec::new();
        if let Some(arr) = labels_arr
            && !arr.is_null(i)
        {
            let list_val = arr.value(i);
            if let Some(str_arr) = list_val.as_any().downcast_ref::<arrow_array::StringArray>() {
                for j in 0..str_arr.len() {
                    if !str_arr.is_null(j) {
                        row_labels.push(str_arr.value(j).to_string());
                    }
                }
            }
        }

        // Overlay L0 labels (union, preserving batch order first).
        // NOTE(review): this takes an L0 read lock per buffer per row —
        // O(rows × buffers) acquisitions; consider hoisting if scans are hot.
        for l0 in l0_ctx.iter_l0_buffers() {
            let guard = l0.read();
            if let Some(l0_labels) = guard.vertex_labels.get(&vid) {
                for lbl in l0_labels {
                    if !row_labels.contains(lbl) {
                        row_labels.push(lbl.clone());
                    }
                }
            }
        }

        let values = labels_builder.values();
        for lbl in &row_labels {
            values.append_value(lbl);
        }
        labels_builder.append(true);
    }
    columns.push(Arc::new(labels_builder.finish()));

    // 3. Projected properties — extract from props_json
    let props_col = batch.column_by_name("props_json");
    let props_arr =
        props_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());

    for prop in projected_properties {
        if prop == "_all_props" {
            // Optimization: _all_props IS props_json — passthrough directly
            match props_col {
                Some(col) => columns.push(col.clone()),
                None => {
                    columns.push(arrow_array::new_null_array(
                        &DataType::LargeBinary,
                        batch.num_rows(),
                    ));
                }
            }
        } else {
            // Extract individual property from CypherValue blob with L0 overlay
            let col =
                build_overflow_property_column(batch.num_rows(), vid_arr, props_arr, prop, l0_ctx);
            columns.push(col);
        }
    }

    RecordBatch::try_new(output_schema.clone(), columns)
        .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
}
2131
2132/// Get the property value for a VID, returning None if not found.
2133pub(crate) fn get_property_value(
2134    vid: &Vid,
2135    props_map: &HashMap<Vid, Properties>,
2136    prop_name: &str,
2137) -> Option<Value> {
2138    if prop_name == "_all_props" {
2139        return props_map.get(vid).map(|p| {
2140            let map: HashMap<String, Value> =
2141                p.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
2142            Value::Map(map)
2143        });
2144    }
2145    props_map
2146        .get(vid)
2147        .and_then(|props| props.get(prop_name))
2148        .cloned()
2149}
2150
2151/// Encode a `serde_json::Value` as CypherValue binary (MessagePack-tagged).
2152///
2153/// Converts from serde_json::Value -> uni_common::Value -> CypherValue bytes.
2154pub(crate) fn encode_cypher_value(val: &serde_json::Value) -> Result<Vec<u8>, String> {
2155    let uni_val: uni_common::Value = val.clone().into();
2156    Ok(uni_common::cypher_value_codec::encode(&uni_val))
2157}
2158
/// Build a numeric column from property values using the specified builder and extractor.
///
/// `$extractor` converts a `&Value` into an `Option` of the source numeric
/// type (e.g. `|v: &Value| v.as_i64()`); `$cast` then converts that value to
/// the builder's native type (identity or a narrowing cast). Both a missing
/// property and a failed extraction produce a null entry.
macro_rules! build_numeric_column {
    ($vids:expr, $props_map:expr, $prop_name:expr, $builder_ty:ty, $extractor:expr, $cast:expr) => {{
        let mut builder = <$builder_ty>::new();
        for vid in $vids {
            match get_property_value(vid, $props_map, $prop_name) {
                Some(ref v) => {
                    // Extraction can fail for type-mismatched values — null it.
                    if let Some(val) = $extractor(v) {
                        builder.append_value($cast(val));
                    } else {
                        builder.append_null();
                    }
                }
                None => builder.append_null(),
            }
        }
        // The block evaluates to DFResult<ArrayRef>, matching the call sites'
        // return type so the macro invocation can be the match-arm expression.
        Ok(Arc::new(builder.finish()) as ArrayRef)
    }};
}
2178
2179/// Build an Arrow column from property values (static version).
2180pub(crate) fn build_property_column_static(
2181    vids: &[Vid],
2182    props_map: &HashMap<Vid, Properties>,
2183    prop_name: &str,
2184    data_type: &DataType,
2185) -> DFResult<ArrayRef> {
2186    match data_type {
2187        DataType::LargeBinary => {
2188            // Handle CypherValue binary columns (overflow_json and Json-typed properties).
2189            use arrow_array::builder::LargeBinaryBuilder;
2190            let mut builder = LargeBinaryBuilder::new();
2191
2192            for vid in vids {
2193                match get_property_value(vid, props_map, prop_name) {
2194                    Some(Value::Null) | None => builder.append_null(),
2195                    Some(Value::Bytes(bytes)) => {
2196                        builder.append_value(&bytes);
2197                    }
2198                    Some(Value::List(arr)) if arr.iter().all(|v| v.as_u64().is_some()) => {
2199                        // Potential raw CypherValue bytes stored as list<u8> from PropertyManager.
2200                        // Guard against misclassifying normal integer lists (e.g. [42, 43]) as bytes.
2201                        let bytes: Vec<u8> = arr
2202                            .iter()
2203                            .filter_map(|v| v.as_u64().map(|n| n as u8))
2204                            .collect();
2205                        if uni_common::cypher_value_codec::decode(&bytes).is_ok() {
2206                            builder.append_value(&bytes);
2207                        } else {
2208                            let json_val: serde_json::Value = Value::List(arr).into();
2209                            match encode_cypher_value(&json_val) {
2210                                Ok(encoded) => builder.append_value(encoded),
2211                                Err(_) => builder.append_null(),
2212                            }
2213                        }
2214                    }
2215                    Some(val) => {
2216                        // Value from PropertyManager — convert to serde_json and re-encode to CypherValue binary
2217                        let json_val: serde_json::Value = val.into();
2218                        match encode_cypher_value(&json_val) {
2219                            Ok(bytes) => builder.append_value(bytes),
2220                            Err(_) => builder.append_null(),
2221                        }
2222                    }
2223                }
2224            }
2225            Ok(Arc::new(builder.finish()))
2226        }
2227        DataType::Binary => {
2228            // CRDT binary properties: JSON-decoded CRDTs re-encoded to MessagePack
2229            let mut builder = BinaryBuilder::new();
2230            for vid in vids {
2231                let bytes = get_property_value(vid, props_map, prop_name)
2232                    .filter(|v| !v.is_null())
2233                    .and_then(|v| {
2234                        let json_val: serde_json::Value = v.into();
2235                        serde_json::from_value::<uni_crdt::Crdt>(json_val).ok()
2236                    })
2237                    .and_then(|crdt| crdt.to_msgpack().ok());
2238                match bytes {
2239                    Some(b) => builder.append_value(&b),
2240                    None => builder.append_null(),
2241                }
2242            }
2243            Ok(Arc::new(builder.finish()))
2244        }
2245        DataType::Utf8 => {
2246            let mut builder = StringBuilder::new();
2247            for vid in vids {
2248                match get_property_value(vid, props_map, prop_name) {
2249                    Some(Value::String(s)) => builder.append_value(s),
2250                    Some(Value::Null) | None => builder.append_null(),
2251                    Some(other) => builder.append_value(other.to_string()),
2252                }
2253            }
2254            Ok(Arc::new(builder.finish()))
2255        }
2256        DataType::Int64 => {
2257            build_numeric_column!(
2258                vids,
2259                props_map,
2260                prop_name,
2261                Int64Builder,
2262                |v: &Value| v.as_i64(),
2263                |v| v
2264            )
2265        }
2266        DataType::Int32 => {
2267            build_numeric_column!(
2268                vids,
2269                props_map,
2270                prop_name,
2271                Int32Builder,
2272                |v: &Value| v.as_i64(),
2273                |v: i64| v as i32
2274            )
2275        }
2276        DataType::Float64 => {
2277            build_numeric_column!(
2278                vids,
2279                props_map,
2280                prop_name,
2281                Float64Builder,
2282                |v: &Value| v.as_f64(),
2283                |v| v
2284            )
2285        }
2286        DataType::Float32 => {
2287            build_numeric_column!(
2288                vids,
2289                props_map,
2290                prop_name,
2291                Float32Builder,
2292                |v: &Value| v.as_f64(),
2293                |v: f64| v as f32
2294            )
2295        }
2296        DataType::Boolean => {
2297            let mut builder = BooleanBuilder::new();
2298            for vid in vids {
2299                match get_property_value(vid, props_map, prop_name) {
2300                    Some(Value::Bool(b)) => builder.append_value(b),
2301                    _ => builder.append_null(),
2302                }
2303            }
2304            Ok(Arc::new(builder.finish()))
2305        }
2306        DataType::UInt64 => {
2307            build_numeric_column!(
2308                vids,
2309                props_map,
2310                prop_name,
2311                UInt64Builder,
2312                |v: &Value| v.as_u64(),
2313                |v| v
2314            )
2315        }
2316        DataType::FixedSizeList(inner, dim) if *inner.data_type() == DataType::Float32 => {
2317            // Vector properties: FixedSizeList(Float32, N)
2318            let values_builder = Float32Builder::new();
2319            let mut list_builder = FixedSizeListBuilder::new(values_builder, *dim);
2320            for vid in vids {
2321                match get_property_value(vid, props_map, prop_name) {
2322                    Some(Value::Vector(v)) => {
2323                        for val in v {
2324                            list_builder.values().append_value(val);
2325                        }
2326                        list_builder.append(true);
2327                    }
2328                    Some(Value::List(arr)) => {
2329                        for v in arr {
2330                            list_builder
2331                                .values()
2332                                .append_value(v.as_f64().unwrap_or(0.0) as f32);
2333                        }
2334                        list_builder.append(true);
2335                    }
2336                    _ => {
2337                        // Append dim nulls to inner values, then mark row as null
2338                        for _ in 0..*dim {
2339                            list_builder.values().append_null();
2340                        }
2341                        list_builder.append(false);
2342                    }
2343                }
2344            }
2345            Ok(Arc::new(list_builder.finish()))
2346        }
2347        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
2348            // Timestamp properties stored as Value::Temporal, ISO 8601 strings, or i64 nanoseconds
2349            let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
2350            for vid in vids {
2351                match get_property_value(vid, props_map, prop_name) {
2352                    Some(Value::Temporal(tv)) => match tv {
2353                        uni_common::TemporalValue::DateTime {
2354                            nanos_since_epoch, ..
2355                        }
2356                        | uni_common::TemporalValue::LocalDateTime {
2357                            nanos_since_epoch, ..
2358                        } => {
2359                            builder.append_value(nanos_since_epoch);
2360                        }
2361                        uni_common::TemporalValue::Date { days_since_epoch } => {
2362                            builder.append_value(days_since_epoch as i64 * 86_400_000_000_000);
2363                        }
2364                        _ => builder.append_null(),
2365                    },
2366                    Some(Value::String(s)) => match parse_datetime_utc(&s) {
2367                        Ok(dt) => builder.append_value(dt.timestamp_nanos_opt().unwrap_or(0)),
2368                        Err(_) => builder.append_null(),
2369                    },
2370                    Some(Value::Int(n)) => {
2371                        builder.append_value(n);
2372                    }
2373                    _ => builder.append_null(),
2374                }
2375            }
2376            Ok(Arc::new(builder.finish()))
2377        }
2378        DataType::Date32 => {
2379            let mut builder = Date32Builder::new();
2380            let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
2381            for vid in vids {
2382                match get_property_value(vid, props_map, prop_name) {
2383                    Some(Value::Temporal(uni_common::TemporalValue::Date { days_since_epoch })) => {
2384                        builder.append_value(days_since_epoch);
2385                    }
2386                    Some(Value::String(s)) => match NaiveDate::parse_from_str(&s, "%Y-%m-%d") {
2387                        Ok(d) => builder.append_value((d - epoch).num_days() as i32),
2388                        Err(_) => builder.append_null(),
2389                    },
2390                    Some(Value::Int(n)) => {
2391                        builder.append_value(n as i32);
2392                    }
2393                    _ => builder.append_null(),
2394                }
2395            }
2396            Ok(Arc::new(builder.finish()))
2397        }
2398        DataType::Time64(TimeUnit::Nanosecond) => {
2399            let mut builder = Time64NanosecondBuilder::new();
2400            for vid in vids {
2401                match get_property_value(vid, props_map, prop_name) {
2402                    Some(Value::Temporal(
2403                        uni_common::TemporalValue::LocalTime {
2404                            nanos_since_midnight,
2405                        }
2406                        | uni_common::TemporalValue::Time {
2407                            nanos_since_midnight,
2408                            ..
2409                        },
2410                    )) => {
2411                        builder.append_value(nanos_since_midnight);
2412                    }
2413                    Some(Value::Temporal(_)) => builder.append_null(),
2414                    Some(Value::String(s)) => {
2415                        match NaiveTime::parse_from_str(&s, "%H:%M:%S%.f")
2416                            .or_else(|_| NaiveTime::parse_from_str(&s, "%H:%M:%S"))
2417                        {
2418                            Ok(t) => {
2419                                let nanos = t.num_seconds_from_midnight() as i64 * 1_000_000_000
2420                                    + t.nanosecond() as i64;
2421                                builder.append_value(nanos);
2422                            }
2423                            Err(_) => builder.append_null(),
2424                        }
2425                    }
2426                    Some(Value::Int(n)) => {
2427                        builder.append_value(n);
2428                    }
2429                    _ => builder.append_null(),
2430                }
2431            }
2432            Ok(Arc::new(builder.finish()))
2433        }
2434        DataType::Interval(IntervalUnit::MonthDayNano) => {
2435            let mut values: Vec<Option<arrow::datatypes::IntervalMonthDayNano>> =
2436                Vec::with_capacity(vids.len());
2437            for vid in vids {
2438                match get_property_value(vid, props_map, prop_name) {
2439                    Some(Value::Temporal(uni_common::TemporalValue::Duration {
2440                        months,
2441                        days,
2442                        nanos,
2443                    })) => {
2444                        values.push(Some(arrow::datatypes::IntervalMonthDayNano {
2445                            months: months as i32,
2446                            days: days as i32,
2447                            nanoseconds: nanos,
2448                        }));
2449                    }
2450                    Some(Value::Int(_n)) => {
2451                        values.push(None);
2452                    }
2453                    _ => values.push(None),
2454                }
2455            }
2456            let arr: arrow_array::IntervalMonthDayNanoArray = values.into_iter().collect();
2457            Ok(Arc::new(arr))
2458        }
2459        DataType::List(inner_field) => {
2460            build_list_property_column(vids, props_map, prop_name, inner_field)
2461        }
2462        DataType::Struct(fields) => {
2463            build_struct_property_column(vids, props_map, prop_name, fields)
2464        }
2465        // Default: convert to string
2466        _ => {
2467            let mut builder = StringBuilder::new();
2468            for vid in vids {
2469                match get_property_value(vid, props_map, prop_name) {
2470                    Some(Value::Null) | None => builder.append_null(),
2471                    Some(other) => builder.append_value(other.to_string()),
2472                }
2473            }
2474            Ok(Arc::new(builder.finish()))
2475        }
2476    }
2477}
2478
2479/// Build a List-typed Arrow column from list property values.
2480fn build_list_property_column(
2481    vids: &[Vid],
2482    props_map: &HashMap<Vid, Properties>,
2483    prop_name: &str,
2484    inner_field: &Arc<Field>,
2485) -> DFResult<ArrayRef> {
2486    match inner_field.data_type() {
2487        DataType::Utf8 => {
2488            let mut builder = ListBuilder::new(StringBuilder::new());
2489            for vid in vids {
2490                match get_property_value(vid, props_map, prop_name) {
2491                    Some(Value::List(arr)) => {
2492                        for v in arr {
2493                            match v {
2494                                Value::String(s) => builder.values().append_value(s),
2495                                Value::Null => builder.values().append_null(),
2496                                other => builder.values().append_value(format!("{other:?}")),
2497                            }
2498                        }
2499                        builder.append(true);
2500                    }
2501                    _ => builder.append(false),
2502                }
2503            }
2504            Ok(Arc::new(builder.finish()))
2505        }
2506        DataType::Int64 => {
2507            let mut builder = ListBuilder::new(Int64Builder::new());
2508            for vid in vids {
2509                match get_property_value(vid, props_map, prop_name) {
2510                    Some(Value::List(arr)) => {
2511                        for v in arr {
2512                            match v.as_i64() {
2513                                Some(n) => builder.values().append_value(n),
2514                                None => builder.values().append_null(),
2515                            }
2516                        }
2517                        builder.append(true);
2518                    }
2519                    _ => builder.append(false),
2520                }
2521            }
2522            Ok(Arc::new(builder.finish()))
2523        }
2524        DataType::Float64 => {
2525            let mut builder = ListBuilder::new(Float64Builder::new());
2526            for vid in vids {
2527                match get_property_value(vid, props_map, prop_name) {
2528                    Some(Value::List(arr)) => {
2529                        for v in arr {
2530                            match v.as_f64() {
2531                                Some(n) => builder.values().append_value(n),
2532                                None => builder.values().append_null(),
2533                            }
2534                        }
2535                        builder.append(true);
2536                    }
2537                    _ => builder.append(false),
2538                }
2539            }
2540            Ok(Arc::new(builder.finish()))
2541        }
2542        DataType::Boolean => {
2543            let mut builder = ListBuilder::new(BooleanBuilder::new());
2544            for vid in vids {
2545                match get_property_value(vid, props_map, prop_name) {
2546                    Some(Value::List(arr)) => {
2547                        for v in arr {
2548                            match v.as_bool() {
2549                                Some(b) => builder.values().append_value(b),
2550                                None => builder.values().append_null(),
2551                            }
2552                        }
2553                        builder.append(true);
2554                    }
2555                    _ => builder.append(false),
2556                }
2557            }
2558            Ok(Arc::new(builder.finish()))
2559        }
2560        DataType::Struct(fields) => {
2561            // Map types are List(Struct(key, value)) — build struct inner elements
2562            build_list_of_structs_column(vids, props_map, prop_name, fields)
2563        }
2564        // Fallback: serialize inner elements as strings
2565        _ => {
2566            let mut builder = ListBuilder::new(StringBuilder::new());
2567            for vid in vids {
2568                match get_property_value(vid, props_map, prop_name) {
2569                    Some(Value::List(arr)) => {
2570                        for v in arr {
2571                            match v {
2572                                Value::Null => builder.values().append_null(),
2573                                other => builder.values().append_value(format!("{other:?}")),
2574                            }
2575                        }
2576                        builder.append(true);
2577                    }
2578                    _ => builder.append(false),
2579                }
2580            }
2581            Ok(Arc::new(builder.finish()))
2582        }
2583    }
2584}
2585
/// Build a List(Struct(...)) column, used for Map-type properties.
///
/// Handles two value representations:
/// - `Value::List([Map{key: k, value: v}, ...])` — pre-converted kv pairs
/// - `Value::Map({k1: v1, k2: v2})` — raw map objects (converted to kv pairs)
///
/// Rows whose value is neither representation materialize as null list entries.
fn build_list_of_structs_column(
    vids: &[Vid],
    props_map: &HashMap<Vid, Properties>,
    prop_name: &str,
    fields: &Fields,
) -> DFResult<ArrayRef> {
    use arrow_array::StructArray;

    // One raw property value per requested vid, preserving vid order.
    let values: Vec<Option<Value>> = vids
        .iter()
        .map(|vid| get_property_value(vid, props_map, prop_name))
        .collect();

    // Convert each row's value to an owned Vec of Maps (key-value pairs).
    // This normalizes both List-of-maps and Map representations.
    let rows: Vec<Option<Vec<HashMap<String, Value>>>> = values
        .iter()
        .map(|val| match val {
            Some(Value::List(arr)) => {
                // Keep only map-shaped elements; non-map list items are dropped.
                let objs: Vec<HashMap<String, Value>> = arr
                    .iter()
                    .filter_map(|v| {
                        if let Value::Map(m) = v {
                            Some(m.clone())
                        } else {
                            None
                        }
                    })
                    .collect();
                // NOTE(review): an empty (or all-non-map) list becomes a null
                // row rather than an empty list — confirm this is intended.
                if objs.is_empty() { None } else { Some(objs) }
            }
            Some(Value::Map(obj)) => {
                // Map property: convert {k1: v1, k2: v2} -> [{key: k1, value: v1}, ...]
                let kv_pairs: Vec<HashMap<String, Value>> = obj
                    .iter()
                    .map(|(k, v)| {
                        let mut m = HashMap::new();
                        m.insert("key".to_string(), Value::String(k.clone()));
                        m.insert("value".to_string(), v.clone());
                        m
                    })
                    .collect();
                Some(kv_pairs)
            }
            _ => None,
        })
        .collect();

    // Total struct elements across all non-null rows; sizes the child builders.
    let total_items: usize = rows
        .iter()
        .filter_map(|r| r.as_ref())
        .map(|v| v.len())
        .sum();

    // Build child arrays for each field in the struct.
    // `rows.iter().flatten().flatten()` walks every kv object across rows in
    // row order, so the child lengths line up with the offsets computed below.
    let child_arrays: Vec<ArrayRef> = fields
        .iter()
        .map(|field| {
            let field_name = field.name();
            match field.data_type() {
                DataType::Utf8 => {
                    let mut builder = StringBuilder::with_capacity(total_items, total_items * 16);
                    for obj in rows.iter().flatten().flatten() {
                        match obj.get(field_name) {
                            Some(Value::String(s)) => builder.append_value(s),
                            Some(Value::Null) | None => builder.append_null(),
                            // Non-string values are debug-formatted rather than dropped.
                            Some(other) => builder.append_value(format!("{other:?}")),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                DataType::Int64 => {
                    let mut builder = Int64Builder::with_capacity(total_items);
                    for obj in rows.iter().flatten().flatten() {
                        match obj.get(field_name).and_then(|v| v.as_i64()) {
                            Some(n) => builder.append_value(n),
                            None => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                DataType::Float64 => {
                    let mut builder = Float64Builder::with_capacity(total_items);
                    for obj in rows.iter().flatten().flatten() {
                        match obj.get(field_name).and_then(|v| v.as_f64()) {
                            Some(n) => builder.append_value(n),
                            None => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                // Fallback: serialize as string
                _ => {
                    let mut builder = StringBuilder::with_capacity(total_items, total_items * 16);
                    for obj in rows.iter().flatten().flatten() {
                        match obj.get(field_name) {
                            Some(Value::Null) | None => builder.append_null(),
                            Some(other) => builder.append_value(format!("{other:?}")),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
            }
        })
        .collect();

    // Build struct array from children
    let struct_array = StructArray::try_new(fields.clone(), child_arrays, None)
        .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;

    // Build list offsets: each non-null row advances the running offset by its
    // element count; null rows repeat the previous offset and clear validity.
    let mut offsets = Vec::with_capacity(vids.len() + 1);
    let mut nulls = Vec::with_capacity(vids.len());
    let mut offset = 0i32;
    offsets.push(offset);
    for row in &rows {
        match row {
            Some(objs) => {
                offset += objs.len() as i32;
                offsets.push(offset);
                nulls.push(true);
            }
            None => {
                offsets.push(offset);
                nulls.push(false);
            }
        }
    }

    // "item" is Arrow's conventional child-field name for lists; nullable so
    // element-level nulls from the struct children are representable.
    let list_field = Arc::new(Field::new("item", DataType::Struct(fields.clone()), true));
    let list_array = arrow_array::ListArray::try_new(
        list_field,
        arrow::buffer::OffsetBuffer::new(arrow::buffer::ScalarBuffer::from(offsets)),
        Arc::new(struct_array),
        Some(arrow::buffer::NullBuffer::from(nulls)),
    )
    .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;

    Ok(Arc::new(list_array))
}
2731
2732/// Convert a TemporalValue into a HashMap matching the Arrow struct field names,
2733/// so that `build_struct_property_column` can extract fields uniformly.
2734fn temporal_to_struct_map(tv: &uni_common::value::TemporalValue) -> HashMap<String, Value> {
2735    use uni_common::value::TemporalValue;
2736    let mut m = HashMap::new();
2737    match tv {
2738        TemporalValue::DateTime {
2739            nanos_since_epoch,
2740            offset_seconds,
2741            timezone_name,
2742        } => {
2743            m.insert("nanos_since_epoch".into(), Value::Int(*nanos_since_epoch));
2744            m.insert("offset_seconds".into(), Value::Int(*offset_seconds as i64));
2745            if let Some(tz) = timezone_name {
2746                m.insert("timezone_name".into(), Value::String(tz.clone()));
2747            }
2748        }
2749        TemporalValue::LocalDateTime { nanos_since_epoch } => {
2750            m.insert("nanos_since_epoch".into(), Value::Int(*nanos_since_epoch));
2751        }
2752        TemporalValue::Time {
2753            nanos_since_midnight,
2754            offset_seconds,
2755        } => {
2756            m.insert(
2757                "nanos_since_midnight".into(),
2758                Value::Int(*nanos_since_midnight),
2759            );
2760            m.insert("offset_seconds".into(), Value::Int(*offset_seconds as i64));
2761        }
2762        TemporalValue::LocalTime {
2763            nanos_since_midnight,
2764        } => {
2765            m.insert(
2766                "nanos_since_midnight".into(),
2767                Value::Int(*nanos_since_midnight),
2768            );
2769        }
2770        TemporalValue::Date { days_since_epoch } => {
2771            m.insert(
2772                "days_since_epoch".into(),
2773                Value::Int(*days_since_epoch as i64),
2774            );
2775        }
2776        TemporalValue::Duration {
2777            months,
2778            days,
2779            nanos,
2780        } => {
2781            m.insert("months".into(), Value::Int(*months));
2782            m.insert("days".into(), Value::Int(*days));
2783            m.insert("nanos".into(), Value::Int(*nanos));
2784        }
2785    }
2786    m
2787}
2788
2789/// Build a Struct-typed Arrow column from Map property values (e.g. Point types).
2790fn build_struct_property_column(
2791    vids: &[Vid],
2792    props_map: &HashMap<Vid, Properties>,
2793    prop_name: &str,
2794    fields: &Fields,
2795) -> DFResult<ArrayRef> {
2796    use arrow_array::StructArray;
2797
2798    // Convert raw values, expanding Temporal values into Map representation
2799    // so the struct field extraction below works uniformly.
2800    let values: Vec<Option<Value>> = vids
2801        .iter()
2802        .map(|vid| {
2803            let val = get_property_value(vid, props_map, prop_name);
2804            match val {
2805                Some(Value::Temporal(ref tv)) => Some(Value::Map(temporal_to_struct_map(tv))),
2806                other => other,
2807            }
2808        })
2809        .collect();
2810
2811    let child_arrays: Vec<ArrayRef> = fields
2812        .iter()
2813        .map(|field| {
2814            let field_name = field.name();
2815            match field.data_type() {
2816                DataType::Float64 => {
2817                    let mut builder = Float64Builder::with_capacity(vids.len());
2818                    for val in &values {
2819                        match val {
2820                            Some(Value::Map(obj)) => {
2821                                match obj.get(field_name).and_then(|v| v.as_f64()) {
2822                                    Some(n) => builder.append_value(n),
2823                                    None => builder.append_null(),
2824                                }
2825                            }
2826                            _ => builder.append_null(),
2827                        }
2828                    }
2829                    Arc::new(builder.finish()) as ArrayRef
2830                }
2831                DataType::Utf8 => {
2832                    let mut builder = StringBuilder::with_capacity(vids.len(), vids.len() * 16);
2833                    for val in &values {
2834                        match val {
2835                            Some(Value::Map(obj)) => match obj.get(field_name) {
2836                                Some(Value::String(s)) => builder.append_value(s),
2837                                Some(Value::Null) | None => builder.append_null(),
2838                                Some(other) => builder.append_value(format!("{other:?}")),
2839                            },
2840                            _ => builder.append_null(),
2841                        }
2842                    }
2843                    Arc::new(builder.finish()) as ArrayRef
2844                }
2845                DataType::Int64 => {
2846                    let mut builder = Int64Builder::with_capacity(vids.len());
2847                    for val in &values {
2848                        match val {
2849                            Some(Value::Map(obj)) => {
2850                                match obj.get(field_name).and_then(|v| v.as_i64()) {
2851                                    Some(n) => builder.append_value(n),
2852                                    None => builder.append_null(),
2853                                }
2854                            }
2855                            _ => builder.append_null(),
2856                        }
2857                    }
2858                    Arc::new(builder.finish()) as ArrayRef
2859                }
2860                DataType::Timestamp(_, _) => {
2861                    let mut builder = TimestampNanosecondBuilder::with_capacity(vids.len());
2862                    for val in &values {
2863                        match val {
2864                            Some(Value::Map(obj)) => {
2865                                match obj.get(field_name).and_then(|v| v.as_i64()) {
2866                                    Some(n) => builder.append_value(n),
2867                                    None => builder.append_null(),
2868                                }
2869                            }
2870                            _ => builder.append_null(),
2871                        }
2872                    }
2873                    Arc::new(builder.finish()) as ArrayRef
2874                }
2875                DataType::Int32 => {
2876                    let mut builder = Int32Builder::with_capacity(vids.len());
2877                    for val in &values {
2878                        match val {
2879                            Some(Value::Map(obj)) => {
2880                                match obj.get(field_name).and_then(|v| v.as_i64()) {
2881                                    Some(n) => builder.append_value(n as i32),
2882                                    None => builder.append_null(),
2883                                }
2884                            }
2885                            _ => builder.append_null(),
2886                        }
2887                    }
2888                    Arc::new(builder.finish()) as ArrayRef
2889                }
2890                DataType::Time64(_) => {
2891                    let mut builder = Time64NanosecondBuilder::with_capacity(vids.len());
2892                    for val in &values {
2893                        match val {
2894                            Some(Value::Map(obj)) => {
2895                                match obj.get(field_name).and_then(|v| v.as_i64()) {
2896                                    Some(n) => builder.append_value(n),
2897                                    None => builder.append_null(),
2898                                }
2899                            }
2900                            _ => builder.append_null(),
2901                        }
2902                    }
2903                    Arc::new(builder.finish()) as ArrayRef
2904                }
2905                // Fallback: serialize as string
2906                _ => {
2907                    let mut builder = StringBuilder::with_capacity(vids.len(), vids.len() * 16);
2908                    for val in &values {
2909                        match val {
2910                            Some(Value::Map(obj)) => match obj.get(field_name) {
2911                                Some(Value::Null) | None => builder.append_null(),
2912                                Some(other) => builder.append_value(format!("{other:?}")),
2913                            },
2914                            _ => builder.append_null(),
2915                        }
2916                    }
2917                    Arc::new(builder.finish()) as ArrayRef
2918                }
2919            }
2920        })
2921        .collect();
2922
2923    // Build null bitmap — null when the value is null/missing
2924    let nulls: Vec<bool> = values
2925        .iter()
2926        .map(|v| matches!(v, Some(Value::Map(_))))
2927        .collect();
2928
2929    let struct_array = StructArray::try_new(
2930        fields.clone(),
2931        child_arrays,
2932        Some(arrow::buffer::NullBuffer::from(nulls)),
2933    )
2934    .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;
2935
2936    Ok(Arc::new(struct_array))
2937}
2938
impl Stream for GraphScanStream {
    type Item = DFResult<RecordBatch>;

    /// Poll the scan state machine.
    ///
    /// The stream yields at most one `RecordBatch`: `Init` constructs the
    /// async scan future, `Executing` polls it to completion, and `Done`
    /// terminates the stream on subsequent polls.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Use a temporary to avoid borrow issues
            // (state is moved out and restored before every return; `Done`
            // is only the in-between placeholder).
            let state = std::mem::replace(&mut self.state, GraphScanState::Done);

            match state {
                GraphScanState::Init => {
                    // Create the future with cloned data for ownership
                    let graph_ctx = self.graph_ctx.clone();
                    let label = self.label.clone();
                    let variable = self.variable.clone();
                    let properties = self.properties.clone();
                    let is_edge_scan = self.is_edge_scan;
                    let is_schemaless = self.is_schemaless;
                    let schema = self.schema.clone();

                    let fut = async move {
                        // Abort before scanning if the query's time budget is spent.
                        graph_ctx.check_timeout().map_err(|e| {
                            datafusion::error::DataFusionError::Execution(e.to_string())
                        })?;

                        // Dispatch to the edge / schemaless-vertex / vertex scan;
                        // each variant materializes the full result as one batch.
                        let batch = if is_edge_scan {
                            columnar_scan_edge_batch_static(
                                &graph_ctx,
                                &label,
                                &variable,
                                &properties,
                                &schema,
                            )
                            .await?
                        } else if is_schemaless {
                            columnar_scan_schemaless_vertex_batch_static(
                                &graph_ctx,
                                &label,
                                &variable,
                                &properties,
                                &schema,
                            )
                            .await?
                        } else {
                            columnar_scan_vertex_batch_static(
                                &graph_ctx,
                                &label,
                                &variable,
                                &properties,
                                &schema,
                            )
                            .await?
                        };
                        Ok(Some(batch))
                    };

                    self.state = GraphScanState::Executing(Box::pin(fut));
                    // Continue loop to poll the future
                }
                GraphScanState::Executing(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(batch)) => {
                        self.state = GraphScanState::Done;
                        // Record output rows in the baseline metrics
                        // (0 when the scan produced no batch).
                        self.metrics
                            .record_output(batch.as_ref().map(|b| b.num_rows()).unwrap_or(0));
                        return Poll::Ready(batch.map(Ok));
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = GraphScanState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        // Put the in-flight future back so the next poll resumes it.
                        self.state = GraphScanState::Executing(fut);
                        return Poll::Pending;
                    }
                },
                GraphScanState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
3020
3021impl RecordBatchStream for GraphScanStream {
3022    fn schema(&self) -> SchemaRef {
3023        self.schema.clone()
3024    }
3025}
3026
3027#[cfg(test)]
3028mod tests {
3029    use super::*;
3030
3031    #[test]
3032    fn test_build_vertex_schema() {
3033        let uni_schema = UniSchema::default();
3034        let schema = GraphScanExec::build_vertex_schema(
3035            "n",
3036            "Person",
3037            &["name".to_string(), "age".to_string()],
3038            &uni_schema,
3039        );
3040
3041        assert_eq!(schema.fields().len(), 4);
3042        assert_eq!(schema.field(0).name(), "n._vid");
3043        assert_eq!(schema.field(1).name(), "n._labels");
3044        assert_eq!(schema.field(2).name(), "n.name");
3045        assert_eq!(schema.field(3).name(), "n.age");
3046    }
3047
3048    #[test]
3049    fn test_build_edge_schema() {
3050        let uni_schema = UniSchema::default();
3051        let schema =
3052            GraphScanExec::build_edge_schema("r", "KNOWS", &["weight".to_string()], &uni_schema);
3053
3054        assert_eq!(schema.fields().len(), 4);
3055        assert_eq!(schema.field(0).name(), "r._eid");
3056        assert_eq!(schema.field(1).name(), "r._src_vid");
3057        assert_eq!(schema.field(2).name(), "r._dst_vid");
3058        assert_eq!(schema.field(3).name(), "r.weight");
3059    }
3060
3061    #[test]
3062    fn test_build_schemaless_vertex_schema() {
3063        let schema = GraphScanExec::build_schemaless_vertex_schema(
3064            "n",
3065            &["name".to_string(), "age".to_string()],
3066        );
3067
3068        assert_eq!(schema.fields().len(), 4);
3069        assert_eq!(schema.field(0).name(), "n._vid");
3070        assert_eq!(schema.field(0).data_type(), &DataType::UInt64);
3071        assert_eq!(schema.field(1).name(), "n._labels");
3072        assert_eq!(schema.field(2).name(), "n.name");
3073        assert_eq!(schema.field(2).data_type(), &DataType::LargeBinary);
3074        assert_eq!(schema.field(3).name(), "n.age");
3075        assert_eq!(schema.field(3).data_type(), &DataType::LargeBinary);
3076    }
3077
3078    #[test]
3079    fn test_schemaless_all_scan_has_empty_label() {
3080        // This test verifies that new_schemaless_all_scan creates a scan with empty label
3081        // We can't fully test execution without a GraphExecutionContext, but we can verify
3082        // the constructor sets the empty label correctly by checking the struct internals
3083        // This is a structural test to ensure the "empty label signals scan all" pattern works
3084        let schema = GraphScanExec::build_schemaless_vertex_schema("n", &[]);
3085
3086        // Verify the schema has _vid and _labels columns for a scan with no properties
3087        assert_eq!(schema.fields().len(), 2);
3088        assert_eq!(schema.field(0).name(), "n._vid");
3089        assert_eq!(schema.field(1).name(), "n._labels");
3090    }
3091
3092    #[test]
3093    fn test_cypher_value_all_props_extraction() {
3094        // Simulate _all_props encoding using encode_cypher_value helper
3095        let json_obj = serde_json::json!({"age": 30, "name": "Alice"});
3096        let cv_bytes = encode_cypher_value(&json_obj).unwrap();
3097
3098        // Decode and extract "age" value
3099        let decoded = uni_common::cypher_value_codec::decode(&cv_bytes).unwrap();
3100        match decoded {
3101            uni_common::Value::Map(map) => {
3102                let age_val = map.get("age").unwrap();
3103                assert_eq!(age_val, &uni_common::Value::Int(30));
3104            }
3105            _ => panic!("Expected Map"),
3106        }
3107
3108        // Also test single value encoding
3109        let single_val = serde_json::json!(30);
3110        let single_bytes = encode_cypher_value(&single_val).unwrap();
3111        let single_decoded = uni_common::cypher_value_codec::decode(&single_bytes).unwrap();
3112        assert_eq!(single_decoded, uni_common::Value::Int(30));
3113    }
3114
3115    /// Helper to build a RecordBatch with _vid, _deleted, _version columns for testing.
3116    fn make_mvcc_batch(vids: &[u64], versions: &[u64], deleted: &[bool]) -> RecordBatch {
3117        let schema = Arc::new(Schema::new(vec![
3118            Field::new("_vid", DataType::UInt64, false),
3119            Field::new("_deleted", DataType::Boolean, false),
3120            Field::new("_version", DataType::UInt64, false),
3121            Field::new("name", DataType::Utf8, true),
3122        ]));
3123        // Generate name values like "v{vid}_ver{version}" for tracking which row wins
3124        let names: Vec<String> = vids
3125            .iter()
3126            .zip(versions.iter())
3127            .map(|(v, ver)| format!("v{}_ver{}", v, ver))
3128            .collect();
3129        let name_arr: arrow_array::StringArray = names.iter().map(|s| Some(s.as_str())).collect();
3130
3131        RecordBatch::try_new(
3132            schema,
3133            vec![
3134                Arc::new(UInt64Array::from(vids.to_vec())),
3135                Arc::new(arrow_array::BooleanArray::from(deleted.to_vec())),
3136                Arc::new(UInt64Array::from(versions.to_vec())),
3137                Arc::new(name_arr),
3138            ],
3139        )
3140        .unwrap()
3141    }
3142
3143    #[test]
3144    fn test_mvcc_dedup_multiple_versions() {
3145        // VID 1 at versions 3, 1, 5 — should keep version 5
3146        // VID 2 at versions 2, 4 — should keep version 4
3147        let batch = make_mvcc_batch(
3148            &[1, 1, 1, 2, 2],
3149            &[3, 1, 5, 2, 4],
3150            &[false, false, false, false, false],
3151        );
3152
3153        let result = mvcc_dedup_batch(&batch).unwrap();
3154        assert_eq!(result.num_rows(), 2);
3155
3156        let vid_col = result
3157            .column_by_name("_vid")
3158            .unwrap()
3159            .as_any()
3160            .downcast_ref::<UInt64Array>()
3161            .unwrap();
3162        let ver_col = result
3163            .column_by_name("_version")
3164            .unwrap()
3165            .as_any()
3166            .downcast_ref::<UInt64Array>()
3167            .unwrap();
3168        let name_col = result
3169            .column_by_name("name")
3170            .unwrap()
3171            .as_any()
3172            .downcast_ref::<arrow_array::StringArray>()
3173            .unwrap();
3174
3175        // VID 1 → version 5, VID 2 → version 4
3176        assert_eq!(vid_col.value(0), 1);
3177        assert_eq!(ver_col.value(0), 5);
3178        assert_eq!(name_col.value(0), "v1_ver5");
3179
3180        assert_eq!(vid_col.value(1), 2);
3181        assert_eq!(ver_col.value(1), 4);
3182        assert_eq!(name_col.value(1), "v2_ver4");
3183    }
3184
3185    #[test]
3186    fn test_mvcc_dedup_single_rows() {
3187        // Each VID appears once — nothing should change
3188        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
3189        let result = mvcc_dedup_batch(&batch).unwrap();
3190        assert_eq!(result.num_rows(), 3);
3191    }
3192
3193    #[test]
3194    fn test_mvcc_dedup_empty() {
3195        let batch = make_mvcc_batch(&[], &[], &[]);
3196        let result = mvcc_dedup_batch(&batch).unwrap();
3197        assert_eq!(result.num_rows(), 0);
3198    }
3199
3200    #[test]
3201    fn test_filter_l0_tombstones_removes_tombstoned() {
3202        use crate::query::df_graph::L0Context;
3203
3204        // Create a batch with VIDs 1, 2, 3
3205        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
3206
3207        // Create L0 context with VID 2 tombstoned
3208        let l0 = uni_store::runtime::l0::L0Buffer::new(1, None);
3209        {
3210            // We need to insert a tombstone — L0Buffer has pub vertex_tombstones
3211            // But we can't easily create one with tombstones through the constructor.
3212            // Use a direct approach.
3213        }
3214        let l0_buf = std::sync::Arc::new(parking_lot::RwLock::new(l0));
3215        l0_buf.write().vertex_tombstones.insert(Vid::from(2u64));
3216
3217        let l0_ctx = L0Context {
3218            current_l0: Some(l0_buf),
3219            transaction_l0: None,
3220            pending_flush_l0s: vec![],
3221        };
3222
3223        let result = filter_l0_tombstones(&batch, &l0_ctx).unwrap();
3224        assert_eq!(result.num_rows(), 2);
3225
3226        let vid_col = result
3227            .column_by_name("_vid")
3228            .unwrap()
3229            .as_any()
3230            .downcast_ref::<UInt64Array>()
3231            .unwrap();
3232        assert_eq!(vid_col.value(0), 1);
3233        assert_eq!(vid_col.value(1), 3);
3234    }
3235
3236    #[test]
3237    fn test_filter_l0_tombstones_none() {
3238        use crate::query::df_graph::L0Context;
3239
3240        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
3241        let l0_ctx = L0Context::default();
3242
3243        let result = filter_l0_tombstones(&batch, &l0_ctx).unwrap();
3244        assert_eq!(result.num_rows(), 3);
3245    }
3246
3247    #[test]
3248    fn test_map_to_output_schema_basic() {
3249        use crate::query::df_graph::L0Context;
3250
3251        // Input: Lance-schema batch with _vid, _deleted, _version, name columns
3252        let lance_schema = Arc::new(Schema::new(vec![
3253            Field::new("_vid", DataType::UInt64, false),
3254            Field::new("_deleted", DataType::Boolean, false),
3255            Field::new("_version", DataType::UInt64, false),
3256            Field::new("name", DataType::Utf8, true),
3257        ]));
3258        let name_arr: arrow_array::StringArray =
3259            vec![Some("Alice"), Some("Bob")].into_iter().collect();
3260        let batch = RecordBatch::try_new(
3261            lance_schema,
3262            vec![
3263                Arc::new(UInt64Array::from(vec![1u64, 2])),
3264                Arc::new(arrow_array::BooleanArray::from(vec![false, false])),
3265                Arc::new(UInt64Array::from(vec![1u64, 1])),
3266                Arc::new(name_arr),
3267            ],
3268        )
3269        .unwrap();
3270
3271        // Output schema: n._vid, n._labels, n.name
3272        let output_schema = Arc::new(Schema::new(vec![
3273            Field::new("n._vid", DataType::UInt64, false),
3274            Field::new("n._labels", labels_data_type(), true),
3275            Field::new("n.name", DataType::Utf8, true),
3276        ]));
3277
3278        let l0_ctx = L0Context::default();
3279        let result = map_to_output_schema(
3280            &batch,
3281            "Person",
3282            "n",
3283            &["name".to_string()],
3284            &output_schema,
3285            &l0_ctx,
3286        )
3287        .unwrap();
3288
3289        assert_eq!(result.num_rows(), 2);
3290        assert_eq!(result.schema().fields().len(), 3);
3291        assert_eq!(result.schema().field(0).name(), "n._vid");
3292        assert_eq!(result.schema().field(1).name(), "n._labels");
3293        assert_eq!(result.schema().field(2).name(), "n.name");
3294
3295        // Check name values carried through
3296        let name_col = result
3297            .column(2)
3298            .as_any()
3299            .downcast_ref::<arrow_array::StringArray>()
3300            .unwrap();
3301        assert_eq!(name_col.value(0), "Alice");
3302        assert_eq!(name_col.value(1), "Bob");
3303    }
3304}