Skip to main content

uni_query/query/df_graph/
scan.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Graph scan execution plan for DataFusion.
5//!
6//! This module provides [`GraphScanExec`], a DataFusion `ExecutionPlan` that scans
7//! vertices or edges from storage with property materialization. It wraps the
8//! underlying Lance table scan with:
9//!
10//! - MVCC resolution via L0 buffer overlays
11//! - Property column materialization from `PropertyManager`
12//! - Filter pushdown to storage layer
13//!
14//! # Column Naming Convention
15//!
16//! Properties are materialized as columns named `{variable}.{property}`:
17//! - `n.name` - property "name" for variable "n"
18//! - `n.age` - property "age" for variable "n"
19//!
20//! System columns use underscore prefix:
21//! - `_vid` - vertex ID
22//! - `_eid` - edge ID
23//! - `_src_vid` - source vertex ID (edges only)
24//! - `_dst_vid` - destination vertex ID (edges only)
25
26use crate::query::datetime::parse_datetime_utc;
27use crate::query::df_graph::GraphExecutionContext;
28use crate::query::df_graph::common::{arrow_err, compute_plan_properties, labels_data_type};
29use arrow_array::builder::{
30    BinaryBuilder, BooleanBuilder, Date32Builder, FixedSizeListBuilder, Float32Builder,
31    Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
32    Time64NanosecondBuilder, TimestampNanosecondBuilder, UInt64Builder,
33};
34use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
35use arrow_schema::{DataType, Field, Fields, IntervalUnit, Schema, SchemaRef, TimeUnit};
36use chrono::{NaiveDate, NaiveTime, Timelike};
37use datafusion::common::Result as DFResult;
38use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
39use datafusion::physical_expr::PhysicalExpr;
40use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
41use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
42use futures::Stream;
43use std::any::Any;
44use std::collections::{HashMap, HashSet};
45use std::fmt;
46use std::pin::Pin;
47use std::sync::Arc;
48use std::task::{Context, Poll};
49use uni_common::Properties;
50use uni_common::Value;
51use uni_common::core::id::Vid;
52use uni_common::core::schema::Schema as UniSchema;
53
/// Graph scan execution plan.
///
/// Scans vertices or edges from storage with property materialization.
/// This wraps the underlying Lance table scan with MVCC resolution and
/// property loading.
///
/// # Example
///
/// ```ignore
/// // Create a scan for Person vertices with name and age properties
/// let scan = GraphScanExec::new(
///     graph_ctx,
///     "Person",
///     "n",
///     vec!["name".to_string(), "age".to_string()],
///     None, // No filter
/// );
///
/// let stream = scan.execute(0, task_ctx)?;
/// // Stream yields batches with columns: _vid, n.name, n.age
/// ```
pub struct GraphScanExec {
    /// Graph execution context with storage and L0 access.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Label name for vertex scan, or edge type for edge scan.
    ///
    /// For schemaless scans this field is overloaded: an empty string means
    /// "scan all vertices", and a colon-separated list encodes a
    /// multi-label (ALL-labels) scan — see the constructors.
    label: String,

    /// Variable name for column prefixing (the `n` in `n.name`).
    variable: String,

    /// Properties to materialize as columns. For schemaless scans the system
    /// properties `_vid`/`_labels` are stripped at construction time.
    projected_properties: Vec<String>,

    /// Filter expression to push down.
    filter: Option<Arc<dyn PhysicalExpr>>,

    /// Whether this is an edge scan (vs vertex scan).
    is_edge_scan: bool,

    /// Whether this is a schemaless scan (uses main table instead of per-label table).
    is_schemaless: bool,

    /// Output schema with materialized property columns.
    schema: SchemaRef,

    /// Cached plan properties (computed once at construction).
    properties: PlanProperties,

    /// Metrics for execution tracking.
    metrics: ExecutionPlanMetricsSet,
}
106
/// Manual `Debug` that prints only the scan configuration fields.
/// The runtime handles (`graph_ctx`, `filter`, `schema`, `properties`,
/// `metrics`) are intentionally omitted to keep output concise.
impl fmt::Debug for GraphScanExec {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("GraphScanExec")
            .field("label", &self.label)
            .field("variable", &self.variable)
            .field("projected_properties", &self.projected_properties)
            .field("is_edge_scan", &self.is_edge_scan)
            .finish()
    }
}
117
118impl GraphScanExec {
119    /// Create a new graph scan for vertices.
120    ///
121    /// Scans all vertices of the given label from storage and L0 buffers,
122    /// then materializes the requested properties.
123    pub fn new_vertex_scan(
124        graph_ctx: Arc<GraphExecutionContext>,
125        label: impl Into<String>,
126        variable: impl Into<String>,
127        projected_properties: Vec<String>,
128        filter: Option<Arc<dyn PhysicalExpr>>,
129    ) -> Self {
130        let label = label.into();
131        let variable = variable.into();
132
133        // Build output schema with proper types from Uni schema
134        let uni_schema = graph_ctx.storage().schema_manager().schema();
135        let schema =
136            Self::build_vertex_schema(&variable, &label, &projected_properties, &uni_schema);
137
138        let properties = compute_plan_properties(schema.clone());
139
140        Self {
141            graph_ctx,
142            label,
143            variable,
144            projected_properties,
145            filter,
146            is_edge_scan: false,
147            is_schemaless: false,
148            schema,
149            properties,
150            metrics: ExecutionPlanMetricsSet::new(),
151        }
152    }
153
154    /// Create a new schemaless vertex scan.
155    ///
156    /// Scans the main vertices table for vertices with the given label name.
157    /// Properties are extracted from props_json (all treated as Utf8/JSON).
158    /// This is used for labels that aren't in the schema.
159    pub fn new_schemaless_vertex_scan(
160        graph_ctx: Arc<GraphExecutionContext>,
161        label_name: impl Into<String>,
162        variable: impl Into<String>,
163        projected_properties: Vec<String>,
164        filter: Option<Arc<dyn PhysicalExpr>>,
165    ) -> Self {
166        let label = label_name.into();
167        let variable = variable.into();
168
169        // Filter out system columns that are already materialized as dedicated columns
170        // (_vid as UInt64, _labels as List<Utf8>). If these appear in projected_properties
171        // (e.g., from collect_properties_from_plan extracting _vid from filter expressions),
172        // they would create duplicate columns with conflicting types.
173        let projected_properties: Vec<String> = projected_properties
174            .into_iter()
175            .filter(|p| p != "_vid" && p != "_labels")
176            .collect();
177
178        let uni_schema = graph_ctx.storage().schema_manager().schema();
179        let schema =
180            Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
181        let properties = compute_plan_properties(schema.clone());
182
183        Self {
184            graph_ctx,
185            label,
186            variable,
187            projected_properties,
188            filter,
189            is_edge_scan: false,
190            is_schemaless: true,
191            schema,
192            properties,
193            metrics: ExecutionPlanMetricsSet::new(),
194        }
195    }
196
197    /// Create a new multi-label vertex scan using the main vertices table.
198    ///
199    /// Scans for vertices that have ALL specified labels (intersection semantics).
200    /// Properties are extracted from props_json (schemaless).
201    pub fn new_multi_label_vertex_scan(
202        graph_ctx: Arc<GraphExecutionContext>,
203        labels: Vec<String>,
204        variable: impl Into<String>,
205        projected_properties: Vec<String>,
206        filter: Option<Arc<dyn PhysicalExpr>>,
207    ) -> Self {
208        let variable = variable.into();
209        let projected_properties: Vec<String> = projected_properties
210            .into_iter()
211            .filter(|p| p != "_vid" && p != "_labels")
212            .collect();
213        let uni_schema = graph_ctx.storage().schema_manager().schema();
214        let schema =
215            Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
216        let properties = compute_plan_properties(schema.clone());
217
218        // Encode labels as colon-separated for the stream to parse
219        let encoded_labels = labels.join(":");
220
221        Self {
222            graph_ctx,
223            label: encoded_labels,
224            variable,
225            projected_properties,
226            filter,
227            is_edge_scan: false,
228            is_schemaless: true,
229            schema,
230            properties,
231            metrics: ExecutionPlanMetricsSet::new(),
232        }
233    }
234
235    /// Create a new schemaless scan for all vertices.
236    ///
237    /// Scans the main vertices table for all vertices regardless of label.
238    /// Properties are extracted from props_json with types resolved from the schema.
239    /// This is used for `MATCH (n)` without label filter.
240    pub fn new_schemaless_all_scan(
241        graph_ctx: Arc<GraphExecutionContext>,
242        variable: impl Into<String>,
243        projected_properties: Vec<String>,
244        filter: Option<Arc<dyn PhysicalExpr>>,
245    ) -> Self {
246        let variable = variable.into();
247        let projected_properties: Vec<String> = projected_properties
248            .into_iter()
249            .filter(|p| p != "_vid" && p != "_labels")
250            .collect();
251
252        let uni_schema = graph_ctx.storage().schema_manager().schema();
253        let schema =
254            Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
255        let properties = compute_plan_properties(schema.clone());
256
257        Self {
258            graph_ctx,
259            label: String::new(), // Empty label signals "scan all vertices"
260            variable,
261            projected_properties,
262            filter,
263            is_edge_scan: false,
264            is_schemaless: true,
265            schema,
266            properties,
267            metrics: ExecutionPlanMetricsSet::new(),
268        }
269    }
270
271    /// Build schema for schemaless vertex scan.
272    ///
273    /// Resolves property types from all labels in the schema. Falls back to
274    /// LargeBinary (CypherValue encoding) for properties not found in any
275    /// label's schema.
276    fn build_schemaless_vertex_schema(
277        variable: &str,
278        properties: &[String],
279        uni_schema: &uni_common::core::schema::Schema,
280    ) -> SchemaRef {
281        // Merge property metadata from all labels for type resolution.
282        let mut merged: std::collections::HashMap<&str, &uni_common::core::schema::PropertyMeta> =
283            std::collections::HashMap::new();
284        for label_props in uni_schema.properties.values() {
285            for (name, meta) in label_props {
286                merged.entry(name.as_str()).or_insert(meta);
287            }
288        }
289
290        let mut fields = vec![
291            Field::new(format!("{}._vid", variable), DataType::UInt64, false),
292            Field::new(format!("{}._labels", variable), labels_data_type(), true),
293        ];
294
295        for prop in properties {
296            let col_name = format!("{}.{}", variable, prop);
297            let arrow_type = merged
298                .get(prop.as_str())
299                .map(|meta| meta.r#type.to_arrow())
300                .unwrap_or(DataType::LargeBinary);
301            fields.push(Field::new(&col_name, arrow_type, true));
302        }
303
304        Arc::new(Schema::new(fields))
305    }
306
307    /// Create a new graph scan for edges.
308    ///
309    /// Scans all edges of the given type from storage and L0 buffers,
310    /// then materializes the requested properties.
311    pub fn new_edge_scan(
312        graph_ctx: Arc<GraphExecutionContext>,
313        edge_type: impl Into<String>,
314        variable: impl Into<String>,
315        projected_properties: Vec<String>,
316        filter: Option<Arc<dyn PhysicalExpr>>,
317    ) -> Self {
318        let label = edge_type.into();
319        let variable = variable.into();
320
321        // Build output schema with proper types from Uni schema
322        let uni_schema = graph_ctx.storage().schema_manager().schema();
323        let schema = Self::build_edge_schema(&variable, &label, &projected_properties, &uni_schema);
324
325        let properties = compute_plan_properties(schema.clone());
326
327        Self {
328            graph_ctx,
329            label,
330            variable,
331            projected_properties,
332            filter,
333            is_edge_scan: true,
334            is_schemaless: false,
335            schema,
336            properties,
337            metrics: ExecutionPlanMetricsSet::new(),
338        }
339    }
340
341    /// Build output schema for vertex scan with proper Arrow types.
342    fn build_vertex_schema(
343        variable: &str,
344        label: &str,
345        properties: &[String],
346        uni_schema: &UniSchema,
347    ) -> SchemaRef {
348        let mut fields = vec![
349            Field::new(format!("{}._vid", variable), DataType::UInt64, false),
350            Field::new(format!("{}._labels", variable), labels_data_type(), true),
351        ];
352        let label_props = uni_schema.properties.get(label);
353        for prop in properties {
354            let col_name = format!("{}.{}", variable, prop);
355            let arrow_type = resolve_property_type(prop, label_props);
356            fields.push(Field::new(&col_name, arrow_type, true));
357        }
358        Arc::new(Schema::new(fields))
359    }
360
361    /// Build output schema for edge scan with proper Arrow types.
362    fn build_edge_schema(
363        variable: &str,
364        edge_type: &str,
365        properties: &[String],
366        uni_schema: &UniSchema,
367    ) -> SchemaRef {
368        let mut fields = vec![
369            Field::new(format!("{}._eid", variable), DataType::UInt64, false),
370            Field::new(format!("{}._src_vid", variable), DataType::UInt64, false),
371            Field::new(format!("{}._dst_vid", variable), DataType::UInt64, false),
372        ];
373        let edge_props = uni_schema.properties.get(edge_type);
374        for prop in properties {
375            let col_name = format!("{}.{}", variable, prop);
376            let arrow_type = resolve_property_type(prop, edge_props);
377            fields.push(Field::new(&col_name, arrow_type, true));
378        }
379        Arc::new(Schema::new(fields))
380    }
381}
382
383impl DisplayAs for GraphScanExec {
384    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
385        let scan_type = if self.is_edge_scan { "Edge" } else { "Vertex" };
386        write!(
387            f,
388            "GraphScanExec: {}={}, properties={:?}",
389            scan_type, self.label, self.projected_properties
390        )?;
391        if self.filter.is_some() {
392            write!(f, ", filter=<pushed>")?;
393        }
394        Ok(())
395    }
396}
397
impl ExecutionPlan for GraphScanExec {
    fn name(&self) -> &str {
        "GraphScanExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    /// Leaf node: the scan reads directly from storage and has no inputs.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        // A leaf node only accepts an empty child list; anything else is a
        // planner error.
        if children.is_empty() {
            Ok(self)
        } else {
            Err(datafusion::error::DataFusionError::Plan(
                "GraphScanExec does not accept children".to_string(),
            ))
        }
    }

    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let metrics = BaselineMetrics::new(&self.metrics, partition);

        // NOTE(review): `self.filter` is not forwarded to `GraphScanStream`
        // here, even though `DisplayAs` reports `filter=<pushed>` when one is
        // set. Confirm the predicate is applied elsewhere (e.g. inside the
        // scan future or by a parent FilterExec); otherwise pushed-down
        // filters are silently dropped.
        Ok(Box::pin(GraphScanStream::new(
            self.graph_ctx.clone(),
            self.label.clone(),
            self.variable.clone(),
            self.projected_properties.clone(),
            self.is_edge_scan,
            self.is_schemaless,
            self.schema.clone(),
            metrics,
        )))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
455
/// State machine for graph scan stream execution.
enum GraphScanState {
    /// Initial state, ready to start scanning; no work has been issued yet.
    Init,
    /// Executing the async scan. The future resolves to at most one output
    /// batch (`Ok(None)` when the scan produced no rows).
    Executing(Pin<Box<dyn std::future::Future<Output = DFResult<Option<RecordBatch>>> + Send>>),
    /// Stream is done.
    Done,
}
465
/// Stream that scans vertices or edges and materializes properties.
///
/// For known-label vertex scans, uses a single columnar Lance query with
/// MVCC dedup and L0 overlay. For edge and schemaless scans, falls back
/// to the two-phase VID-scan + property-materialize flow.
struct GraphScanStream {
    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Label (vertex) or edge type name.
    ///
    /// For schemaless scans this is overloaded the same way as on
    /// `GraphScanExec`: empty = all vertices, colon-separated = multi-label.
    label: String,

    /// Variable name for column prefixing (e.g., "n" in `n.name`).
    variable: String,

    /// Properties to materialize.
    properties: Vec<String>,

    /// Whether this is an edge scan.
    is_edge_scan: bool,

    /// Whether this is a schemaless scan.
    is_schemaless: bool,

    /// Output schema.
    schema: SchemaRef,

    /// Stream state (see [`GraphScanState`]).
    state: GraphScanState,

    /// Metrics.
    metrics: BaselineMetrics,
}
499
500impl GraphScanStream {
501    /// Create a new graph scan stream.
502    #[expect(clippy::too_many_arguments)]
503    fn new(
504        graph_ctx: Arc<GraphExecutionContext>,
505        label: String,
506        variable: String,
507        properties: Vec<String>,
508        is_edge_scan: bool,
509        is_schemaless: bool,
510        schema: SchemaRef,
511        metrics: BaselineMetrics,
512    ) -> Self {
513        Self {
514            graph_ctx,
515            label,
516            variable,
517            properties,
518            is_edge_scan,
519            is_schemaless,
520            schema,
521            state: GraphScanState::Init,
522            metrics,
523        }
524    }
525}
526
527/// Resolve the Arrow data type for a property, handling system columns like `overflow_json`.
528///
529/// Falls back to `LargeBinary` (CypherValue) if the property is not found in the schema,
530/// preserving original value types for overflow/unknown properties.
531pub(crate) fn resolve_property_type(
532    prop: &str,
533    schema_props: Option<
534        &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
535    >,
536) -> DataType {
537    if prop == "overflow_json" {
538        DataType::LargeBinary
539    } else {
540        schema_props
541            .and_then(|props| props.get(prop))
542            .map(|meta| meta.r#type.to_arrow())
543            .unwrap_or(DataType::LargeBinary)
544    }
545}
546
547// ============================================================================
548// Columnar-first scan helpers
549// ============================================================================
550
/// MVCC deduplication: keep only the highest-version row for each `_vid`.
///
/// Sorts by (_vid ASC, _version DESC), then keeps the first occurrence of each
/// _vid (= the highest version). This is a pure Arrow-compute operation.
///
/// Test-only convenience wrapper around [`mvcc_dedup_batch_by`] hard-wired to
/// the vertex id column; production paths call `mvcc_dedup_batch_by` directly
/// with the appropriate id column.
#[cfg(test)]
fn mvcc_dedup_batch(batch: &RecordBatch) -> DFResult<RecordBatch> {
    mvcc_dedup_batch_by(batch, "_vid")
}
559
560/// Dedup a Lance batch and return `Some` only when rows remain.
561///
562/// Wraps the common pattern of dedup + empty-check that appears in every
563/// columnar scan path (vertex, edge, schemaless).
564fn mvcc_dedup_to_option(
565    batch: Option<RecordBatch>,
566    id_column: &str,
567) -> DFResult<Option<RecordBatch>> {
568    match batch {
569        Some(b) => {
570            let deduped = mvcc_dedup_batch_by(&b, id_column)?;
571            Ok(if deduped.num_rows() > 0 {
572                Some(deduped)
573            } else {
574                None
575            })
576        }
577        None => Ok(None),
578    }
579}
580
581/// Merge a deduped Lance batch with an L0 batch, re-deduplicating the combined
582/// result. Returns an empty batch (against `output_schema`) when both inputs
583/// are empty.
584fn merge_lance_and_l0(
585    lance_deduped: Option<RecordBatch>,
586    l0_batch: RecordBatch,
587    internal_schema: &SchemaRef,
588    id_column: &str,
589) -> DFResult<Option<RecordBatch>> {
590    let has_l0 = l0_batch.num_rows() > 0;
591    match (lance_deduped, has_l0) {
592        (Some(lance), true) => {
593            let combined = arrow::compute::concat_batches(internal_schema, &[lance, l0_batch])
594                .map_err(arrow_err)?;
595            Ok(Some(mvcc_dedup_batch_by(&combined, id_column)?))
596        }
597        (Some(lance), false) => Ok(Some(lance)),
598        (None, true) => Ok(Some(l0_batch)),
599        (None, false) => Ok(None),
600    }
601}
602
/// Append `col_name` to `columns` unless it is already listed.
///
/// Compares against borrowed entries, so no temporary `String` is allocated
/// when the column is already present.
fn push_column_if_absent(columns: &mut Vec<String>, col_name: &str) {
    if columns.iter().all(|existing| existing != col_name) {
        columns.push(col_name.to_string());
    }
}
612
613/// Extract a property value from an overflow_json CypherValue blob.
614///
615/// Returns the raw CypherValue bytes for `prop` if found in the blob,
616/// or `None` if the blob is null or the key is absent.
617fn extract_from_overflow_blob(
618    overflow_arr: Option<&arrow_array::LargeBinaryArray>,
619    row: usize,
620    prop: &str,
621) -> Option<Vec<u8>> {
622    let arr = overflow_arr?;
623    if arr.is_null(row) {
624        return None;
625    }
626    uni_common::cypher_value_codec::extract_map_entry_raw(arr.value(row), prop)
627}
628
629/// Build a `LargeBinary` column by extracting a property from overflow_json
630/// blobs, with L0 buffer overlay.
631///
632/// For each row, checks L0 buffers first (later buffers take precedence).
633/// If the property is not in L0, falls back to extracting from the
634/// overflow_json CypherValue blob.
635fn build_overflow_property_column(
636    num_rows: usize,
637    vid_arr: &UInt64Array,
638    overflow_arr: Option<&arrow_array::LargeBinaryArray>,
639    prop: &str,
640    l0_ctx: &crate::query::df_graph::L0Context,
641) -> ArrayRef {
642    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
643    for i in 0..num_rows {
644        let vid = Vid::from(vid_arr.value(i));
645
646        // Check L0 buffers (later overwrites earlier)
647        let l0_val = resolve_l0_property(&vid, prop, l0_ctx);
648
649        if let Some(val_opt) = l0_val {
650            append_value_as_cypher_binary(&mut builder, val_opt.as_ref());
651        } else if let Some(bytes) = extract_from_overflow_blob(overflow_arr, i, prop) {
652            builder.append_value(&bytes);
653        } else {
654            builder.append_null();
655        }
656    }
657    Arc::new(builder.finish())
658}
659
660/// Resolve a property value from the L0 visibility chain.
661///
662/// Returns `Some(Some(val))` when the property exists with a non-null value,
663/// `Some(None)` when it exists but is null, and `None` when no L0 buffer
664/// has the property.
665fn resolve_l0_property(
666    vid: &Vid,
667    prop: &str,
668    l0_ctx: &crate::query::df_graph::L0Context,
669) -> Option<Option<Value>> {
670    let mut result = None;
671    for l0 in l0_ctx.iter_l0_buffers() {
672        let guard = l0.read();
673        if let Some(props) = guard.vertex_properties.get(vid)
674            && let Some(val) = props.get(prop)
675        {
676            result = Some(Some(val.clone()));
677        }
678    }
679    result
680}
681
682/// Append a `Value` to a `LargeBinaryBuilder` as CypherValue bytes.
683///
684/// Non-null values are JSON-encoded then CypherValue-encoded.
685/// Null values (or encoding failures) produce null entries.
686fn append_value_as_cypher_binary(
687    builder: &mut arrow_array::builder::LargeBinaryBuilder,
688    val: Option<&Value>,
689) {
690    match val {
691        Some(v) if !v.is_null() => {
692            let json_val: serde_json::Value = v.clone().into();
693            match encode_cypher_value(&json_val) {
694                Ok(bytes) => builder.append_value(bytes),
695                Err(_) => builder.append_null(),
696            }
697        }
698        _ => builder.append_null(),
699    }
700}
701
702/// Build the `_all_props` column by overlaying L0 buffer properties onto
703/// the batch's `props_json` column.
704///
705/// For each row, decodes the stored CypherValue blob, merges in any L0 buffer
706/// properties (in visibility order: pending → current → transaction), and
707/// re-encodes the result. This ensures `properties()` and `keys()` reflect
708/// uncommitted L0 mutations.
709fn build_all_props_column_with_l0_overlay(
710    num_rows: usize,
711    vid_arr: &UInt64Array,
712    props_arr: Option<&arrow_array::LargeBinaryArray>,
713    l0_ctx: &crate::query::df_graph::L0Context,
714) -> ArrayRef {
715    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
716    for i in 0..num_rows {
717        let vid = Vid::from(vid_arr.value(i));
718
719        // 1. Decode props_json blob from storage
720        let mut merged_props = serde_json::Map::new();
721        if let Some(arr) = props_arr
722            && !arr.is_null(i)
723            && let Ok(uni_common::Value::Map(map)) =
724                uni_common::cypher_value_codec::decode(arr.value(i))
725        {
726            for (k, v) in map {
727                let json_val: serde_json::Value = v.into();
728                merged_props.insert(k, json_val);
729            }
730        }
731
732        // 2. Overlay L0 properties (visibility order: pending → current → transaction)
733        for l0 in l0_ctx.iter_l0_buffers() {
734            let guard = l0.read();
735            if let Some(l0_props) = guard.vertex_properties.get(&vid) {
736                for (k, v) in l0_props {
737                    let json_val: serde_json::Value = v.clone().into();
738                    merged_props.insert(k.clone(), json_val);
739                }
740            }
741        }
742
743        // 3. Encode merged result
744        if merged_props.is_empty() {
745            builder.append_null();
746        } else {
747            let json_obj = serde_json::Value::Object(merged_props);
748            match encode_cypher_value(&json_obj) {
749                Ok(bytes) => builder.append_value(bytes),
750                Err(_) => builder.append_null(),
751            }
752        }
753    }
754    Arc::new(builder.finish())
755}
756
/// Build `_all_props` for a schema-based scan by merging, in increasing
/// precedence (a later stage overwrites earlier keys):
/// 1. Schema-defined columns from the batch
/// 2. Overflow_json properties
/// 3. L0 buffer properties
fn build_all_props_column_for_schema_scan(
    batch: &RecordBatch,
    vid_arr: &UInt64Array,
    overflow_arr: Option<&arrow_array::LargeBinaryArray>,
    projected_properties: &[String],
    l0_ctx: &crate::query::df_graph::L0Context,
) -> ArrayRef {
    // Collect schema-defined property column names (non-internal, non-overflow, non-_all_props)
    let schema_props: Vec<&str> = projected_properties
        .iter()
        .filter(|p| *p != "overflow_json" && *p != "_all_props" && !p.starts_with('_'))
        .map(String::as_str)
        .collect();

    let num_rows = batch.num_rows();
    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
    for i in 0..num_rows {
        let vid = Vid::from(vid_arr.value(i));
        let mut merged_props = serde_json::Map::new();

        // 1. Schema-defined columns. Null cells are skipped entirely (the key
        //    is omitted rather than mapped to JSON null).
        for &prop in &schema_props {
            if let Some(col) = batch.column_by_name(prop) {
                // NOTE(review): the trailing `None` is presumably a "no type
                // hint" argument for the arrow→Value conversion — confirm
                // against uni_store::storage::arrow_convert.
                let val = uni_store::storage::arrow_convert::arrow_to_value(col.as_ref(), i, None);
                if !val.is_null() {
                    let json_val: serde_json::Value = val.into();
                    merged_props.insert(prop.to_string(), json_val);
                }
            }
        }

        // 2. Overflow_json properties (overwrite schema columns on key clash)
        if let Some(arr) = overflow_arr
            && !arr.is_null(i)
            && let Ok(uni_common::Value::Map(map)) =
                uni_common::cypher_value_codec::decode(arr.value(i))
        {
            for (k, v) in map {
                let json_val: serde_json::Value = v.into();
                merged_props.insert(k, json_val);
            }
        }

        // 3. L0 buffer overlay (pending → current → transaction); the last
        //    buffer to insert a key wins.
        for l0 in l0_ctx.iter_l0_buffers() {
            let guard = l0.read();
            if let Some(l0_props) = guard.vertex_properties.get(&vid) {
                for (k, v) in l0_props {
                    let json_val: serde_json::Value = v.clone().into();
                    merged_props.insert(k.clone(), json_val);
                }
            }
        }

        // A row with no properties anywhere encodes as null, not as an empty
        // map; encoding failures also degrade to null.
        if merged_props.is_empty() {
            builder.append_null();
        } else {
            let json_obj = serde_json::Value::Object(merged_props);
            match encode_cypher_value(&json_obj) {
                Ok(bytes) => builder.append_value(bytes),
                Err(_) => builder.append_null(),
            }
        }
    }
    Arc::new(builder.finish())
}
827
828/// MVCC deduplication: keep only the highest-version row for each unique value
829/// in the given `id_column`.
830///
831/// Sorts by (id_column ASC, _version DESC), then keeps the first occurrence of
832/// each id (= the highest version). This is a pure Arrow-compute operation.
833fn mvcc_dedup_batch_by(batch: &RecordBatch, id_column: &str) -> DFResult<RecordBatch> {
834    if batch.num_rows() == 0 {
835        return Ok(batch.clone());
836    }
837
838    let id_col = batch
839        .column_by_name(id_column)
840        .ok_or_else(|| {
841            datafusion::error::DataFusionError::Internal(format!("Missing {} column", id_column))
842        })?
843        .clone();
844    let version_col = batch
845        .column_by_name("_version")
846        .ok_or_else(|| {
847            datafusion::error::DataFusionError::Internal("Missing _version column".to_string())
848        })?
849        .clone();
850
851    // Sort by (id_column ASC, _version DESC)
852    let sort_columns = vec![
853        arrow::compute::SortColumn {
854            values: id_col,
855            options: Some(arrow::compute::SortOptions {
856                descending: false,
857                nulls_first: false,
858            }),
859        },
860        arrow::compute::SortColumn {
861            values: version_col,
862            options: Some(arrow::compute::SortOptions {
863                descending: true,
864                nulls_first: false,
865            }),
866        },
867    ];
868    let indices = arrow::compute::lexsort_to_indices(&sort_columns, None).map_err(arrow_err)?;
869
870    // Reorder all columns by sorted indices
871    let sorted_columns: Vec<ArrayRef> = batch
872        .columns()
873        .iter()
874        .map(|col| arrow::compute::take(col.as_ref(), &indices, None))
875        .collect::<Result<_, _>>()
876        .map_err(arrow_err)?;
877    let sorted = RecordBatch::try_new(batch.schema(), sorted_columns).map_err(arrow_err)?;
878
879    // Build dedup mask: keep first occurrence of each id
880    let sorted_id = sorted
881        .column_by_name(id_column)
882        .unwrap()
883        .as_any()
884        .downcast_ref::<UInt64Array>()
885        .unwrap();
886
887    let mut keep = vec![false; sorted.num_rows()];
888    if !keep.is_empty() {
889        keep[0] = true;
890        for (i, flag) in keep.iter_mut().enumerate().skip(1) {
891            if sorted_id.value(i) != sorted_id.value(i - 1) {
892                *flag = true;
893            }
894        }
895    }
896
897    let mask = arrow_array::BooleanArray::from(keep);
898    arrow::compute::filter_record_batch(&sorted, &mask).map_err(arrow_err)
899}
900
901/// Filter out edge rows where `op != 0` (non-INSERT) after MVCC dedup.
902fn filter_deleted_edge_ops(batch: &RecordBatch) -> DFResult<RecordBatch> {
903    if batch.num_rows() == 0 {
904        return Ok(batch.clone());
905    }
906    let op_col = match batch.column_by_name("op") {
907        Some(col) => col
908            .as_any()
909            .downcast_ref::<arrow_array::UInt8Array>()
910            .unwrap(),
911        None => return Ok(batch.clone()),
912    };
913    let keep: Vec<bool> = (0..op_col.len()).map(|i| op_col.value(i) == 0).collect();
914    let mask = arrow_array::BooleanArray::from(keep);
915    arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
916}
917
918/// Filter out rows where `_deleted = true` after MVCC dedup.
919fn filter_deleted_rows(batch: &RecordBatch) -> DFResult<RecordBatch> {
920    if batch.num_rows() == 0 {
921        return Ok(batch.clone());
922    }
923    let deleted_col = match batch.column_by_name("_deleted") {
924        Some(col) => col
925            .as_any()
926            .downcast_ref::<arrow_array::BooleanArray>()
927            .unwrap(),
928        None => return Ok(batch.clone()),
929    };
930    let keep: Vec<bool> = (0..deleted_col.len())
931        .map(|i| !deleted_col.value(i))
932        .collect();
933    let mask = arrow_array::BooleanArray::from(keep);
934    arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
935}
936
937/// Filter out rows whose `_vid` appears in L0 tombstones.
938fn filter_l0_tombstones(
939    batch: &RecordBatch,
940    l0_ctx: &crate::query::df_graph::L0Context,
941) -> DFResult<RecordBatch> {
942    if batch.num_rows() == 0 {
943        return Ok(batch.clone());
944    }
945
946    let mut tombstones: HashSet<u64> = HashSet::new();
947    for l0 in l0_ctx.iter_l0_buffers() {
948        let guard = l0.read();
949        for vid in guard.vertex_tombstones.iter() {
950            tombstones.insert(vid.as_u64());
951        }
952    }
953
954    if tombstones.is_empty() {
955        return Ok(batch.clone());
956    }
957
958    let vid_col = batch
959        .column_by_name("_vid")
960        .ok_or_else(|| {
961            datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
962        })?
963        .as_any()
964        .downcast_ref::<UInt64Array>()
965        .unwrap();
966
967    let keep: Vec<bool> = (0..vid_col.len())
968        .map(|i| !tombstones.contains(&vid_col.value(i)))
969        .collect();
970    let mask = arrow_array::BooleanArray::from(keep);
971    arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
972}
973
974/// Filter out rows whose `eid` appears in L0 edge tombstones.
975fn filter_l0_edge_tombstones(
976    batch: &RecordBatch,
977    l0_ctx: &crate::query::df_graph::L0Context,
978) -> DFResult<RecordBatch> {
979    if batch.num_rows() == 0 {
980        return Ok(batch.clone());
981    }
982
983    let mut tombstones: HashSet<u64> = HashSet::new();
984    for l0 in l0_ctx.iter_l0_buffers() {
985        let guard = l0.read();
986        for eid in guard.tombstones.keys() {
987            tombstones.insert(eid.as_u64());
988        }
989    }
990
991    if tombstones.is_empty() {
992        return Ok(batch.clone());
993    }
994
995    let eid_col = batch
996        .column_by_name("eid")
997        .ok_or_else(|| {
998            datafusion::error::DataFusionError::Internal("Missing eid column".to_string())
999        })?
1000        .as_any()
1001        .downcast_ref::<UInt64Array>()
1002        .unwrap();
1003
1004    let keep: Vec<bool> = (0..eid_col.len())
1005        .map(|i| !tombstones.contains(&eid_col.value(i)))
1006        .collect();
1007    let mask = arrow_array::BooleanArray::from(keep);
1008    arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
1009}
1010
/// Build a RecordBatch from L0 buffer data for a given label, matching the
/// Lance query's column set.
///
/// Merges L0 buffers in visibility order (pending_flush → current → transaction),
/// with later buffers overwriting earlier ones for the same VID.
///
/// # Arguments
///
/// * `l0_ctx` - supplies the L0 buffers to merge, in visibility order
/// * `label` - vertex label whose VIDs are collected from each buffer
/// * `lance_schema` - target schema; one output column is built per field
/// * `label_props` - schema-defined properties for `label`; properties absent
///   from this map (and not `ext_id` / underscore-prefixed) are routed into
///   the `overflow_json` column instead of a typed column
///
/// Returns an empty batch (with `lance_schema`) when no live L0 vertices
/// exist for the label. Rows are sorted by VID for deterministic output.
fn build_l0_vertex_batch(
    l0_ctx: &crate::query::df_graph::L0Context,
    label: &str,
    lance_schema: &SchemaRef,
    label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
) -> DFResult<RecordBatch> {
    // Collect all L0 vertex data, merging in visibility order
    let mut vid_data: HashMap<u64, (Properties, u64)> = HashMap::new(); // vid -> (props, version)
    let mut tombstones: HashSet<u64> = HashSet::new();

    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();
        // Collect tombstones
        for vid in guard.vertex_tombstones.iter() {
            tombstones.insert(vid.as_u64());
        }
        // Collect vertices for this label
        for vid in guard.vids_for_label(label) {
            let vid_u64 = vid.as_u64();
            // This check only sees tombstones from buffers processed so far;
            // tombstones from later buffers are handled by the removal sweep
            // after this loop.
            if tombstones.contains(&vid_u64) {
                continue;
            }
            let version = guard.vertex_versions.get(&vid).copied().unwrap_or(0);
            let entry = vid_data
                .entry(vid_u64)
                .or_insert_with(|| (Properties::new(), 0));
            // Merge properties (later L0 overwrites earlier, key by key)
            if let Some(props) = guard.vertex_properties.get(&vid) {
                for (k, v) in props {
                    entry.0.insert(k.clone(), v.clone());
                }
            }
            // Take the highest version seen across buffers
            if version > entry.1 {
                entry.1 = version;
            }
        }
    }

    // Remove tombstoned VIDs (catches tombstones discovered after a VID was
    // already inserted above)
    for t in &tombstones {
        vid_data.remove(t);
    }

    if vid_data.is_empty() {
        return Ok(RecordBatch::new_empty(lance_schema.clone()));
    }

    // Sort VIDs for deterministic output
    let mut vids: Vec<u64> = vid_data.keys().copied().collect();
    vids.sort_unstable();

    let num_rows = vids.len();
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(lance_schema.fields().len());

    // Determine which schema property names exist (anything else overflows)
    let schema_prop_names: HashSet<&str> = label_props
        .map(|lp| lp.keys().map(|k| k.as_str()).collect())
        .unwrap_or_default();

    // Build one column per schema field, in schema order.
    for field in lance_schema.fields() {
        let col_name = field.name().as_str();
        match col_name {
            "_vid" => {
                columns.push(Arc::new(UInt64Array::from(vids.clone())));
            }
            "_deleted" => {
                // L0 vertices are always live (tombstoned ones are already excluded)
                let vals = vec![false; num_rows];
                columns.push(Arc::new(arrow_array::BooleanArray::from(vals)));
            }
            "_version" => {
                let vals: Vec<u64> = vids.iter().map(|v| vid_data[v].1).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "overflow_json" => {
                // Collect non-schema properties as an encoded CypherValue blob;
                // null when a vertex has no overflow properties.
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                for vid_u64 in &vids {
                    let (props, _) = &vid_data[vid_u64];
                    let mut overflow = serde_json::Map::new();
                    for (k, v) in props {
                        // ext_id and system (underscore) keys never overflow
                        if k == "ext_id" || k.starts_with('_') {
                            continue;
                        }
                        if !schema_prop_names.contains(k.as_str()) {
                            let json_val: serde_json::Value = v.clone().into();
                            overflow.insert(k.clone(), json_val);
                        }
                    }
                    if overflow.is_empty() {
                        builder.append_null();
                    } else {
                        let json_val = serde_json::Value::Object(overflow);
                        // Encoding failures degrade to null rather than erroring
                        match encode_cypher_value(&json_val) {
                            Ok(bytes) => builder.append_value(bytes),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            _ => {
                // Schema property column: convert L0 Value → Arrow typed value
                let col = build_l0_property_column(&vids, &vid_data, col_name, field.data_type())?;
                columns.push(col);
            }
        }
    }

    RecordBatch::try_new(lance_schema.clone(), columns).map_err(arrow_err)
}
1128
1129/// Build a single Arrow column from L0 property values.
1130///
1131/// Operates on the `vid_data` map produced by `build_l0_vertex_batch`.
1132fn build_l0_property_column(
1133    vids: &[u64],
1134    vid_data: &HashMap<u64, (Properties, u64)>,
1135    prop_name: &str,
1136    data_type: &DataType,
1137) -> DFResult<ArrayRef> {
1138    // Convert to Vid keys for reuse of existing build_property_column_static
1139    let vid_keys: Vec<Vid> = vids.iter().map(|v| Vid::from(*v)).collect();
1140    let props_map: HashMap<Vid, Properties> = vid_data
1141        .iter()
1142        .map(|(k, (props, _))| (Vid::from(*k), props.clone()))
1143        .collect();
1144
1145    build_property_column_static(&vid_keys, &props_map, prop_name, data_type)
1146}
1147
/// Build a RecordBatch from L0 buffer data for a given edge type, matching
/// the DeltaDataset Lance table's column set.
///
/// Merges L0 buffers in visibility order (pending_flush → current → transaction),
/// with later buffers overwriting earlier ones for the same EID.
///
/// # Arguments
///
/// * `l0_ctx` - supplies the L0 buffers to merge, in visibility order
/// * `edge_type` - edge type whose EIDs are collected from each buffer
/// * `internal_schema` - target schema; one output column is built per field
/// * `type_props` - schema-defined properties for `edge_type`; properties
///   absent from this map (and not underscore-prefixed) are routed into the
///   `overflow_json` column instead of a typed column
///
/// Returns an empty batch (with `internal_schema`) when no live L0 edges
/// exist for the type. Rows are sorted by EID for deterministic output.
fn build_l0_edge_batch(
    l0_ctx: &crate::query::df_graph::L0Context,
    edge_type: &str,
    internal_schema: &SchemaRef,
    type_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
) -> DFResult<RecordBatch> {
    // Collect all L0 edge data, merging in visibility order
    // eid -> (src_vid, dst_vid, properties, version)
    let mut eid_data: HashMap<u64, (u64, u64, Properties, u64)> = HashMap::new();
    let mut tombstones: HashSet<u64> = HashSet::new();

    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();
        // Collect tombstones
        for eid in guard.tombstones.keys() {
            tombstones.insert(eid.as_u64());
        }
        // Collect edges for this type
        for eid in guard.eids_for_type(edge_type) {
            let eid_u64 = eid.as_u64();
            // This check only sees tombstones from buffers processed so far;
            // tombstones from later buffers are handled by the removal sweep
            // after this loop.
            if tombstones.contains(&eid_u64) {
                continue;
            }
            // Edges without recorded endpoints cannot be materialized; skip.
            let (src_vid, dst_vid) = match guard.get_edge_endpoints(eid) {
                Some(endpoints) => (endpoints.0.as_u64(), endpoints.1.as_u64()),
                None => continue,
            };
            let version = guard.edge_versions.get(&eid).copied().unwrap_or(0);
            let entry = eid_data
                .entry(eid_u64)
                .or_insert_with(|| (src_vid, dst_vid, Properties::new(), 0));
            // Merge properties (later L0 overwrites earlier, key by key)
            if let Some(props) = guard.edge_properties.get(&eid) {
                for (k, v) in props {
                    entry.2.insert(k.clone(), v.clone());
                }
            }
            // Update endpoints from latest L0 layer (unconditional overwrite)
            entry.0 = src_vid;
            entry.1 = dst_vid;
            // Take the highest version seen across buffers
            if version > entry.3 {
                entry.3 = version;
            }
        }
    }

    // Remove tombstoned EIDs (catches tombstones discovered after an EID was
    // already inserted above)
    for t in &tombstones {
        eid_data.remove(t);
    }

    if eid_data.is_empty() {
        return Ok(RecordBatch::new_empty(internal_schema.clone()));
    }

    // Sort EIDs for deterministic output
    let mut eids: Vec<u64> = eid_data.keys().copied().collect();
    eids.sort_unstable();

    let num_rows = eids.len();
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(internal_schema.fields().len());

    // Determine which schema property names exist (anything else overflows)
    let schema_prop_names: HashSet<&str> = type_props
        .map(|tp| tp.keys().map(|k| k.as_str()).collect())
        .unwrap_or_default();

    // Build one column per schema field, in schema order.
    for field in internal_schema.fields() {
        let col_name = field.name().as_str();
        match col_name {
            "eid" => {
                columns.push(Arc::new(UInt64Array::from(eids.clone())));
            }
            "src_vid" => {
                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].0).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "dst_vid" => {
                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].1).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "op" => {
                // L0 edges are always live (tombstoned ones already excluded),
                // so every row is op = 0 (INSERT)
                let vals = vec![0u8; num_rows];
                columns.push(Arc::new(arrow_array::UInt8Array::from(vals)));
            }
            "_version" => {
                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].3).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "overflow_json" => {
                // Collect non-schema properties as an encoded CypherValue blob;
                // null when an edge has no overflow properties.
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                for eid_u64 in &eids {
                    let (_, _, props, _) = &eid_data[eid_u64];
                    let mut overflow = serde_json::Map::new();
                    for (k, v) in props {
                        // System (underscore) keys never overflow
                        if k.starts_with('_') {
                            continue;
                        }
                        if !schema_prop_names.contains(k.as_str()) {
                            let json_val: serde_json::Value = v.clone().into();
                            overflow.insert(k.clone(), json_val);
                        }
                    }
                    if overflow.is_empty() {
                        builder.append_null();
                    } else {
                        let json_val = serde_json::Value::Object(overflow);
                        // Encoding failures degrade to null rather than erroring
                        match encode_cypher_value(&json_val) {
                            Ok(bytes) => builder.append_value(bytes),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            _ => {
                // Schema property column: convert L0 Value → Arrow typed value
                let col =
                    build_l0_edge_property_column(&eids, &eid_data, col_name, field.data_type())?;
                columns.push(col);
            }
        }
    }

    RecordBatch::try_new(internal_schema.clone(), columns).map_err(arrow_err)
}
1282
1283/// Build a single Arrow column from L0 edge property values.
1284///
1285/// Operates on the `eid_data` map produced by `build_l0_edge_batch`.
1286fn build_l0_edge_property_column(
1287    eids: &[u64],
1288    eid_data: &HashMap<u64, (u64, u64, Properties, u64)>,
1289    prop_name: &str,
1290    data_type: &DataType,
1291) -> DFResult<ArrayRef> {
1292    // Convert to Vid keys for reuse of existing build_property_column_static
1293    let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(*e)).collect();
1294    let props_map: HashMap<Vid, Properties> = eid_data
1295        .iter()
1296        .map(|(k, (_, _, props, _))| (Vid::from(*k), props.clone()))
1297        .collect();
1298
1299    build_property_column_static(&vid_keys, &props_map, prop_name, data_type)
1300}
1301
1302/// Build the `_labels` column for known-label vertices.
1303///
1304/// Reads `_labels` from the stored Lance batch if available. Falls back to
1305/// `[label]` when the column is absent (legacy data). Additional labels from
1306/// L0 buffers are merged in.
1307fn build_labels_column_for_known_label(
1308    vid_arr: &UInt64Array,
1309    label: &str,
1310    l0_ctx: &crate::query::df_graph::L0Context,
1311    batch_labels_col: Option<&arrow_array::ListArray>,
1312) -> DFResult<ArrayRef> {
1313    use uni_store::storage::arrow_convert::labels_from_list_array;
1314
1315    let mut labels_builder = ListBuilder::new(StringBuilder::new());
1316
1317    for i in 0..vid_arr.len() {
1318        let vid = Vid::from(vid_arr.value(i));
1319
1320        // Start with labels from the stored column, falling back to [label]
1321        let mut labels = match batch_labels_col {
1322            Some(list_arr) => {
1323                let stored = labels_from_list_array(list_arr, i);
1324                if stored.is_empty() {
1325                    vec![label.to_string()]
1326                } else {
1327                    stored
1328                }
1329            }
1330            None => vec![label.to_string()],
1331        };
1332
1333        // Ensure the scanned label is present (defensive)
1334        if !labels.iter().any(|l| l == label) {
1335            labels.push(label.to_string());
1336        }
1337
1338        // Merge additional labels from L0 buffers
1339        for l0 in l0_ctx.iter_l0_buffers() {
1340            let guard = l0.read();
1341            if let Some(l0_labels) = guard.vertex_labels.get(&vid) {
1342                for lbl in l0_labels {
1343                    if !labels.contains(lbl) {
1344                        labels.push(lbl.clone());
1345                    }
1346                }
1347            }
1348        }
1349
1350        let values = labels_builder.values();
1351        for lbl in &labels {
1352            values.append_value(lbl);
1353        }
1354        labels_builder.append(true);
1355    }
1356
1357    Ok(Arc::new(labels_builder.finish()))
1358}
1359
/// Map a Lance-schema batch to the DataFusion output schema.
///
/// The output schema has `{variable}.{property}` column names, while Lance
/// uses bare property names. This function performs the positional mapping,
/// adds the `_labels` column, and drops internal columns like `_deleted`/`_version`.
///
/// Output columns are assembled positionally to match `output_schema`:
/// `_vid`, `_labels`, then one column per entry of `projected_properties`.
///
/// # Errors
///
/// Returns an `Internal` error if the `_vid` column is missing or not UInt64,
/// or if the assembled columns do not match `output_schema`.
fn map_to_output_schema(
    batch: &RecordBatch,
    label: &str,
    _variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
    l0_ctx: &crate::query::df_graph::L0Context,
) -> DFResult<RecordBatch> {
    if batch.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());

    // 1. {var}._vid
    let vid_col = batch
        .column_by_name("_vid")
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
        })?
        .clone();
    let vid_arr = vid_col
        .as_any()
        .downcast_ref::<UInt64Array>()
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("_vid not UInt64".to_string())
        })?;

    // 2. {var}._labels — read from stored column, overlay L0 additions
    let batch_labels_col = batch
        .column_by_name("_labels")
        .and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());
    let labels_col = build_labels_column_for_known_label(vid_arr, label, l0_ctx, batch_labels_col)?;
    columns.push(vid_col.clone());
    columns.push(labels_col);

    // 3. Projected properties
    // Pre-load overflow_json column for extracting non-schema properties
    let overflow_arr = batch
        .column_by_name("overflow_json")
        .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());

    for prop in projected_properties {
        if prop == "overflow_json" {
            // Pass the raw overflow blob through unchanged
            match batch.column_by_name("overflow_json") {
                Some(col) => columns.push(col.clone()),
                None => {
                    // No overflow_json in Lance — return null column
                    columns.push(arrow_array::new_null_array(
                        &DataType::LargeBinary,
                        batch.num_rows(),
                    ));
                }
            }
        } else if prop == "_all_props" {
            // Build _all_props from overflow_json + L0 overlay.
            // Fast path: if no L0 buffer has vertex property mutations AND
            // there are no schema columns to merge, pass through overflow_json.
            let any_l0_has_vertex_props = l0_ctx.iter_l0_buffers().any(|l0| {
                let guard = l0.read();
                !guard.vertex_properties.is_empty()
            });
            // Check if this label has schema-defined columns (besides system columns).
            // NOTE(review): this is derived from the projection list, not the
            // label's schema itself — assumes projected_properties reflects
            // the schema columns; confirm against the caller.
            let has_schema_cols = projected_properties
                .iter()
                .any(|p| p != "overflow_json" && p != "_all_props" && !p.starts_with('_'));

            if !any_l0_has_vertex_props && !has_schema_cols {
                // No L0 mutations, no schema cols to merge: overflow_json IS _all_props
                match batch.column_by_name("overflow_json") {
                    Some(col) => columns.push(col.clone()),
                    None => {
                        columns.push(arrow_array::new_null_array(
                            &DataType::LargeBinary,
                            batch.num_rows(),
                        ));
                    }
                }
            } else {
                // Need to merge: schema columns + overflow_json + L0 overlay
                let col = build_all_props_column_for_schema_scan(
                    batch,
                    vid_arr,
                    overflow_arr,
                    projected_properties,
                    l0_ctx,
                );
                columns.push(col);
            }
        } else {
            match batch.column_by_name(prop) {
                Some(col) => columns.push(col.clone()),
                None => {
                    // Column missing in Lance -- extract from overflow_json
                    // CypherValue blob with L0 overlay
                    let col = build_overflow_property_column(
                        batch.num_rows(),
                        vid_arr,
                        overflow_arr,
                        prop,
                        l0_ctx,
                    );
                    columns.push(col);
                }
            }
        }
    }

    RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
}
1475
1476/// Map an internal DeltaDataset-schema edge batch to the DataFusion output schema.
1477///
1478/// The internal batch has `eid`, `src_vid`, `dst_vid`, `op`, `_version`, and property
1479/// columns. The output schema has `{variable}._eid`, `{variable}._src_vid`,
1480/// `{variable}._dst_vid`, and per-property columns. Internal columns `op` and
1481/// `_version` are dropped.
1482fn map_edge_to_output_schema(
1483    batch: &RecordBatch,
1484    variable: &str,
1485    projected_properties: &[String],
1486    output_schema: &SchemaRef,
1487) -> DFResult<RecordBatch> {
1488    if batch.num_rows() == 0 {
1489        return Ok(RecordBatch::new_empty(output_schema.clone()));
1490    }
1491
1492    let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
1493
1494    // 1. {var}._eid
1495    let eid_col = batch
1496        .column_by_name("eid")
1497        .ok_or_else(|| {
1498            datafusion::error::DataFusionError::Internal("Missing eid column".to_string())
1499        })?
1500        .clone();
1501    columns.push(eid_col);
1502
1503    // 2. {var}._src_vid
1504    let src_col = batch
1505        .column_by_name("src_vid")
1506        .ok_or_else(|| {
1507            datafusion::error::DataFusionError::Internal("Missing src_vid column".to_string())
1508        })?
1509        .clone();
1510    columns.push(src_col);
1511
1512    // 3. {var}._dst_vid
1513    let dst_col = batch
1514        .column_by_name("dst_vid")
1515        .ok_or_else(|| {
1516            datafusion::error::DataFusionError::Internal("Missing dst_vid column".to_string())
1517        })?
1518        .clone();
1519    columns.push(dst_col);
1520
1521    // 4. Projected properties
1522    for prop in projected_properties {
1523        if prop == "overflow_json" {
1524            match batch.column_by_name("overflow_json") {
1525                Some(col) => columns.push(col.clone()),
1526                None => {
1527                    columns.push(arrow_array::new_null_array(
1528                        &DataType::LargeBinary,
1529                        batch.num_rows(),
1530                    ));
1531                }
1532            }
1533        } else {
1534            match batch.column_by_name(prop) {
1535                Some(col) => columns.push(col.clone()),
1536                None => {
1537                    // Column missing in Lance — extract from overflow_json CypherValue blob
1538                    // (mirrors the vertex path in map_to_output_schema)
1539                    let overflow_arr = batch
1540                        .column_by_name("overflow_json")
1541                        .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
1542
1543                    if let Some(arr) = overflow_arr {
1544                        let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
1545                        for i in 0..batch.num_rows() {
1546                            if !arr.is_null(i) {
1547                                let blob = arr.value(i);
1548                                // Fast path: extract map entry without decoding entire map
1549                                if let Some(sub_bytes) =
1550                                    uni_common::cypher_value_codec::extract_map_entry_raw(
1551                                        blob, prop,
1552                                    )
1553                                {
1554                                    builder.append_value(&sub_bytes);
1555                                } else {
1556                                    builder.append_null();
1557                                }
1558                            } else {
1559                                builder.append_null();
1560                            }
1561                        }
1562                        columns.push(Arc::new(builder.finish()));
1563                    } else {
1564                        // No overflow_json column either — return null column
1565                        let target_field = output_schema
1566                            .fields()
1567                            .iter()
1568                            .find(|f| f.name() == &format!("{}.{}", variable, prop));
1569                        let dt = target_field
1570                            .map(|f| f.data_type().clone())
1571                            .unwrap_or(DataType::LargeBinary);
1572                        columns.push(arrow_array::new_null_array(&dt, batch.num_rows()));
1573                    }
1574                }
1575            }
1576        }
1577    }
1578
1579    RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
1580}
1581
1582/// Columnar-first vertex scan: single Lance query with MVCC dedup and L0 overlay.
1583///
1584/// Replaces the two-phase `scan_vertex_vids_static()` + `materialize_vertex_batch_static()`
1585/// for known-label vertex scans. Reads all needed columns in a single Lance query,
1586/// performs MVCC dedup via Arrow compute, merges L0 buffer data, filters tombstones,
1587/// and maps to the output schema.
1588async fn columnar_scan_vertex_batch_static(
1589    graph_ctx: &GraphExecutionContext,
1590    label: &str,
1591    variable: &str,
1592    projected_properties: &[String],
1593    output_schema: &SchemaRef,
1594) -> DFResult<RecordBatch> {
1595    let storage = graph_ctx.storage();
1596    let l0_ctx = graph_ctx.l0_context();
1597    let uni_schema = storage.schema_manager().schema();
1598    let label_props = uni_schema.properties.get(label);
1599
1600    // Build the list of columns to request from Lance
1601    let mut lance_columns: Vec<String> = vec![
1602        "_vid".to_string(),
1603        "_deleted".to_string(),
1604        "_version".to_string(),
1605    ];
1606    for prop in projected_properties {
1607        if prop == "overflow_json" {
1608            push_column_if_absent(&mut lance_columns, "overflow_json");
1609        } else {
1610            let exists_in_schema = label_props.is_some_and(|lp| lp.contains_key(prop));
1611            if exists_in_schema {
1612                push_column_if_absent(&mut lance_columns, prop);
1613            }
1614        }
1615    }
1616
1617    // Ensure overflow_json is present when any projected property is not in the schema
1618    let needs_overflow = projected_properties
1619        .iter()
1620        .any(|p| p == "overflow_json" || !label_props.is_some_and(|lp| lp.contains_key(p)));
1621    if needs_overflow {
1622        push_column_if_absent(&mut lance_columns, "overflow_json");
1623    }
1624
1625    // Try to query Lance via StorageManager domain method
1626    let lance_columns_refs: Vec<&str> = lance_columns.iter().map(|s| s.as_str()).collect();
1627    let lance_batch = storage
1628        .scan_vertex_table(label, &lance_columns_refs, None)
1629        .await
1630        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
1631
1632    // MVCC dedup the Lance batch
1633    let lance_deduped = mvcc_dedup_to_option(lance_batch, "_vid")?;
1634
1635    // Build the internal Lance schema for L0 batch construction.
1636    // Use the Lance batch schema if available, otherwise build from scratch.
1637    let internal_schema = match &lance_deduped {
1638        Some(batch) => batch.schema(),
1639        None => {
1640            let mut fields = vec![
1641                Field::new("_vid", DataType::UInt64, false),
1642                Field::new("_deleted", DataType::Boolean, false),
1643                Field::new("_version", DataType::UInt64, false),
1644            ];
1645            for col in &lance_columns {
1646                if matches!(col.as_str(), "_vid" | "_deleted" | "_version") {
1647                    continue;
1648                }
1649                if col == "overflow_json" {
1650                    fields.push(Field::new("overflow_json", DataType::LargeBinary, true));
1651                } else {
1652                    let arrow_type = label_props
1653                        .and_then(|lp| lp.get(col.as_str()))
1654                        .map(|meta| meta.r#type.to_arrow())
1655                        .unwrap_or(DataType::LargeBinary);
1656                    fields.push(Field::new(col, arrow_type, true));
1657                }
1658            }
1659            Arc::new(Schema::new(fields))
1660        }
1661    };
1662
1663    // Build L0 batch
1664    let l0_batch = build_l0_vertex_batch(l0_ctx, label, &internal_schema, label_props)?;
1665
1666    // Merge Lance + L0
1667    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "_vid")?
1668    else {
1669        return Ok(RecordBatch::new_empty(output_schema.clone()));
1670    };
1671
1672    // Filter out MVCC deletion tombstones (_deleted = true)
1673    let merged = filter_deleted_rows(&merged)?;
1674    if merged.num_rows() == 0 {
1675        return Ok(RecordBatch::new_empty(output_schema.clone()));
1676    }
1677
1678    // Filter L0 tombstones
1679    let filtered = filter_l0_tombstones(&merged, l0_ctx)?;
1680
1681    if filtered.num_rows() == 0 {
1682        return Ok(RecordBatch::new_empty(output_schema.clone()));
1683    }
1684
1685    // Map to output schema
1686    map_to_output_schema(
1687        &filtered,
1688        label,
1689        variable,
1690        projected_properties,
1691        output_schema,
1692        l0_ctx,
1693    )
1694}
1695
1696/// Columnar-first edge scan: single Lance query with MVCC dedup and L0 overlay.
1697///
1698/// Replaces the two-phase `scan_edge_eids_static()` + `materialize_edge_batch_static()`
1699/// for edge scans. Reads all needed columns in a single DeltaDataset query, performs
1700/// MVCC dedup via Arrow compute, merges L0 buffer data, filters tombstones, and maps
1701/// to the output schema.
1702async fn columnar_scan_edge_batch_static(
1703    graph_ctx: &GraphExecutionContext,
1704    edge_type: &str,
1705    variable: &str,
1706    projected_properties: &[String],
1707    output_schema: &SchemaRef,
1708) -> DFResult<RecordBatch> {
1709    let storage = graph_ctx.storage();
1710    let l0_ctx = graph_ctx.l0_context();
1711    let uni_schema = storage.schema_manager().schema();
1712    let type_props = uni_schema.properties.get(edge_type);
1713
1714    // Build the list of columns to request from DeltaDataset Lance table
1715    let mut lance_columns: Vec<String> = vec![
1716        "eid".to_string(),
1717        "src_vid".to_string(),
1718        "dst_vid".to_string(),
1719        "op".to_string(),
1720        "_version".to_string(),
1721    ];
1722    for prop in projected_properties {
1723        if prop == "overflow_json" {
1724            push_column_if_absent(&mut lance_columns, "overflow_json");
1725        } else {
1726            let exists_in_schema = type_props.is_some_and(|tp| tp.contains_key(prop));
1727            if exists_in_schema {
1728                push_column_if_absent(&mut lance_columns, prop);
1729            }
1730        }
1731    }
1732
1733    // Ensure overflow_json is present when any projected property is not in the schema
1734    let needs_overflow = projected_properties
1735        .iter()
1736        .any(|p| p == "overflow_json" || !type_props.is_some_and(|tp| tp.contains_key(p)));
1737    if needs_overflow {
1738        push_column_if_absent(&mut lance_columns, "overflow_json");
1739    }
1740
1741    // Try to query DeltaDataset (forward direction) via StorageManager domain method
1742    let lance_columns_refs: Vec<&str> = lance_columns.iter().map(|s| s.as_str()).collect();
1743    let lance_batch = storage
1744        .scan_delta_table(edge_type, "fwd", &lance_columns_refs, None)
1745        .await
1746        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
1747
1748    // MVCC dedup the Lance batch (by eid)
1749    let lance_deduped = mvcc_dedup_to_option(lance_batch, "eid")?;
1750
1751    // Build the internal schema for L0 batch construction.
1752    // Use the Lance batch schema if available, otherwise build from scratch.
1753    let internal_schema = match &lance_deduped {
1754        Some(batch) => batch.schema(),
1755        None => {
1756            let mut fields = vec![
1757                Field::new("eid", DataType::UInt64, false),
1758                Field::new("src_vid", DataType::UInt64, false),
1759                Field::new("dst_vid", DataType::UInt64, false),
1760                Field::new("op", DataType::UInt8, false),
1761                Field::new("_version", DataType::UInt64, false),
1762            ];
1763            for col in &lance_columns {
1764                if matches!(
1765                    col.as_str(),
1766                    "eid" | "src_vid" | "dst_vid" | "op" | "_version"
1767                ) {
1768                    continue;
1769                }
1770                if col == "overflow_json" {
1771                    fields.push(Field::new("overflow_json", DataType::LargeBinary, true));
1772                } else {
1773                    let arrow_type = type_props
1774                        .and_then(|tp| tp.get(col.as_str()))
1775                        .map(|meta| meta.r#type.to_arrow())
1776                        .unwrap_or(DataType::LargeBinary);
1777                    fields.push(Field::new(col, arrow_type, true));
1778                }
1779            }
1780            Arc::new(Schema::new(fields))
1781        }
1782    };
1783
1784    // Build L0 batch
1785    let l0_batch = build_l0_edge_batch(l0_ctx, edge_type, &internal_schema, type_props)?;
1786
1787    // Merge Lance + L0
1788    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "eid")? else {
1789        return Ok(RecordBatch::new_empty(output_schema.clone()));
1790    };
1791
1792    // Filter out MVCC deletion ops (op != 0) after dedup
1793    let merged = filter_deleted_edge_ops(&merged)?;
1794    if merged.num_rows() == 0 {
1795        return Ok(RecordBatch::new_empty(output_schema.clone()));
1796    }
1797
1798    // Filter L0 edge tombstones
1799    let filtered = filter_l0_edge_tombstones(&merged, l0_ctx)?;
1800
1801    if filtered.num_rows() == 0 {
1802        return Ok(RecordBatch::new_empty(output_schema.clone()));
1803    }
1804
1805    // Map to output schema
1806    map_edge_to_output_schema(&filtered, variable, projected_properties, output_schema)
1807}
1808
1809/// Columnar-first schemaless vertex scan: single Lance query with MVCC dedup and L0 overlay.
1810///
1811/// Replaces the two-phase `scan_*_vids_*()` + `materialize_schemaless_vertex_batch_static()`
1812/// for schemaless vertex scans. Reads `_vid`, `labels`, `props_json`, `_version` in a single
1813/// Lance query on the main vertices table, performs MVCC dedup via Arrow compute, merges L0
1814/// buffer data, filters tombstones, and maps to the output schema.
1815async fn columnar_scan_schemaless_vertex_batch_static(
1816    graph_ctx: &GraphExecutionContext,
1817    label: &str,
1818    variable: &str,
1819    projected_properties: &[String],
1820    output_schema: &SchemaRef,
1821) -> DFResult<RecordBatch> {
1822    let storage = graph_ctx.storage();
1823    let l0_ctx = graph_ctx.l0_context();
1824
1825    // Build the Lance filter expression — do NOT filter _deleted here;
1826    // MVCC dedup must see deletion tombstones to pick the highest version.
1827    let filter = {
1828        let mut parts = Vec::new();
1829
1830        // Label filter
1831        if !label.is_empty() {
1832            if label.contains(':') {
1833                // Multi-label: each label must be present
1834                for lbl in label.split(':') {
1835                    parts.push(format!("array_contains(labels, '{}')", lbl));
1836                }
1837            } else {
1838                parts.push(format!("array_contains(labels, '{}')", label));
1839            }
1840        }
1841
1842        if parts.is_empty() {
1843            None
1844        } else {
1845            Some(parts.join(" AND "))
1846        }
1847    };
1848
1849    // Single Lance query via StorageManager domain method
1850    let lance_batch = storage
1851        .scan_main_vertex_table(
1852            &["_vid", "_deleted", "labels", "props_json", "_version"],
1853            filter.as_deref(),
1854        )
1855        .await
1856        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
1857
1858    // MVCC dedup the Lance batch
1859    let lance_deduped = mvcc_dedup_to_option(lance_batch, "_vid")?;
1860
1861    // Build the internal schema for L0 batch construction.
1862    // Use the Lance batch schema if available, otherwise build from scratch.
1863    let internal_schema = match &lance_deduped {
1864        Some(batch) => batch.schema(),
1865        None => Arc::new(Schema::new(vec![
1866            Field::new("_vid", DataType::UInt64, false),
1867            Field::new("_deleted", DataType::Boolean, false),
1868            Field::new("labels", labels_data_type(), false),
1869            Field::new("props_json", DataType::LargeBinary, true),
1870            Field::new("_version", DataType::UInt64, false),
1871        ])),
1872    };
1873
1874    // Build L0 batch
1875    let l0_batch = build_l0_schemaless_vertex_batch(l0_ctx, label, &internal_schema)?;
1876
1877    // Merge Lance + L0
1878    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "_vid")?
1879    else {
1880        return Ok(RecordBatch::new_empty(output_schema.clone()));
1881    };
1882
1883    // Filter out MVCC deletion tombstones (_deleted = true)
1884    let merged = filter_deleted_rows(&merged)?;
1885    if merged.num_rows() == 0 {
1886        return Ok(RecordBatch::new_empty(output_schema.clone()));
1887    }
1888
1889    // Filter L0 tombstones
1890    let filtered = filter_l0_tombstones(&merged, l0_ctx)?;
1891
1892    if filtered.num_rows() == 0 {
1893        return Ok(RecordBatch::new_empty(output_schema.clone()));
1894    }
1895
1896    // Map to output schema
1897    map_to_schemaless_output_schema(
1898        &filtered,
1899        variable,
1900        projected_properties,
1901        output_schema,
1902        l0_ctx,
1903    )
1904}
1905
/// Build a RecordBatch from L0 buffer data for schemaless vertices.
///
/// Merges L0 buffers in visibility order (pending_flush → current → transaction),
/// with later buffers overwriting earlier ones for the same VID. Produces a batch
/// matching the internal schema: `_vid, labels, props_json, _version`.
///
/// `label` may be empty (no filter), a single label, or a `:`-separated
/// multi-label requirement (the vertex must carry every listed label).
/// Tombstoned VIDs are excluded from the result; VIDs are emitted in
/// ascending order for deterministic output.
fn build_l0_schemaless_vertex_batch(
    l0_ctx: &crate::query::df_graph::L0Context,
    label: &str,
    internal_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
    // Collect all L0 vertex data, merging in visibility order
    // vid -> (merged_props, highest_version, labels)
    let mut vid_data: HashMap<u64, (Properties, u64, Vec<String>)> = HashMap::new();
    let mut tombstones: HashSet<u64> = HashSet::new();

    // Parse multi-label filter
    let label_filter: Vec<&str> = if label.is_empty() {
        vec![]
    } else if label.contains(':') {
        label.split(':').collect()
    } else {
        vec![label]
    };

    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();

        // Collect tombstones (accumulated across every buffer visited so far)
        for vid in guard.vertex_tombstones.iter() {
            tombstones.insert(vid.as_u64());
        }

        // Collect VIDs matching the label filter
        let vids: Vec<Vid> = if label_filter.is_empty() {
            guard.all_vertex_vids()
        } else if label_filter.len() == 1 {
            guard.vids_for_label(label_filter[0])
        } else {
            guard.vids_with_all_labels(&label_filter)
        };

        for vid in vids {
            let vid_u64 = vid.as_u64();
            // Skip VIDs already tombstoned by this or an earlier buffer.
            if tombstones.contains(&vid_u64) {
                continue;
            }
            // Missing version entries default to 0 (lowest possible version).
            let version = guard.vertex_versions.get(&vid).copied().unwrap_or(0);
            let entry = vid_data
                .entry(vid_u64)
                .or_insert_with(|| (Properties::new(), 0, Vec::new()));

            // Merge properties (later L0 overwrites)
            if let Some(props) = guard.vertex_properties.get(&vid) {
                for (k, v) in props {
                    entry.0.insert(k.clone(), v.clone());
                }
            }
            // Take the highest version
            if version > entry.1 {
                entry.1 = version;
            }
            // Update labels from latest L0 layer
            if let Some(labels) = guard.vertex_labels.get(&vid) {
                entry.2 = labels.clone();
            }
        }
    }

    // Remove tombstoned VIDs.
    // NOTE(review): tombstones from ALL buffers are applied here, so a vertex
    // tombstoned in an earlier buffer but re-created in a later one would still
    // be dropped — presumably re-creation clears the earlier tombstone upstream;
    // verify against the L0 buffer maintenance code.
    for t in &tombstones {
        vid_data.remove(t);
    }

    if vid_data.is_empty() {
        return Ok(RecordBatch::new_empty(internal_schema.clone()));
    }

    // Sort VIDs for deterministic output
    let mut vids: Vec<u64> = vid_data.keys().copied().collect();
    vids.sort_unstable();

    let num_rows = vids.len();
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(internal_schema.fields().len());

    // Build one Arrow column per field of the internal schema, in schema order.
    for field in internal_schema.fields() {
        match field.name().as_str() {
            "_vid" => {
                columns.push(Arc::new(UInt64Array::from(vids.clone())));
            }
            "labels" => {
                // list<string> of the vertex's labels, one sub-list per row.
                let mut labels_builder = ListBuilder::new(StringBuilder::new());
                for vid_u64 in &vids {
                    let (_, _, labels) = &vid_data[vid_u64];
                    let values = labels_builder.values();
                    for lbl in labels {
                        values.append_value(lbl);
                    }
                    labels_builder.append(true);
                }
                columns.push(Arc::new(labels_builder.finish()));
            }
            "props_json" => {
                // CypherValue-encoded property map; null when the vertex has no
                // properties or encoding fails.
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                for vid_u64 in &vids {
                    let (props, _, _) = &vid_data[vid_u64];
                    if props.is_empty() {
                        builder.append_null();
                    } else {
                        // Encode properties as CypherValue blob
                        let json_obj: serde_json::Value = {
                            let mut map = serde_json::Map::new();
                            for (k, v) in props {
                                let json_val: serde_json::Value = v.clone().into();
                                map.insert(k.clone(), json_val);
                            }
                            serde_json::Value::Object(map)
                        };
                        match encode_cypher_value(&json_obj) {
                            Ok(bytes) => builder.append_value(bytes),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            "_deleted" => {
                // L0 vertices are always live (tombstoned ones already excluded)
                columns.push(Arc::new(arrow_array::BooleanArray::from(vec![
                    false;
                    num_rows
                ])));
            }
            "_version" => {
                let vals: Vec<u64> = vids.iter().map(|v| vid_data[v].1).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            _ => {
                // Unexpected column — fill with nulls
                columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
            }
        }
    }

    RecordBatch::try_new(internal_schema.clone(), columns).map_err(arrow_err)
}
2051
2052/// Map an internal-schema schemaless batch to the DataFusion output schema.
2053///
2054/// The internal batch has `_vid, labels, props_json, _version` columns. The output
2055/// schema has `{variable}._vid`, `{variable}._labels`, and per-property columns.
2056/// Individual properties are extracted from the `props_json` CypherValue blob by
2057/// decoding to a Map and extracting the sub-value.
2058fn map_to_schemaless_output_schema(
2059    batch: &RecordBatch,
2060    _variable: &str,
2061    projected_properties: &[String],
2062    output_schema: &SchemaRef,
2063    l0_ctx: &crate::query::df_graph::L0Context,
2064) -> DFResult<RecordBatch> {
2065    if batch.num_rows() == 0 {
2066        return Ok(RecordBatch::new_empty(output_schema.clone()));
2067    }
2068
2069    let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
2070
2071    // 1. {var}._vid — passthrough
2072    let vid_col = batch
2073        .column_by_name("_vid")
2074        .ok_or_else(|| {
2075            datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
2076        })?
2077        .clone();
2078    let vid_arr = vid_col
2079        .as_any()
2080        .downcast_ref::<UInt64Array>()
2081        .ok_or_else(|| {
2082            datafusion::error::DataFusionError::Internal("_vid not UInt64".to_string())
2083        })?;
2084    columns.push(vid_col.clone());
2085
2086    // 2. {var}._labels — from labels column with L0 overlay
2087    let labels_col = batch.column_by_name("labels");
2088    let labels_arr = labels_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());
2089
2090    let mut labels_builder = ListBuilder::new(StringBuilder::new());
2091    for i in 0..vid_arr.len() {
2092        let vid_u64 = vid_arr.value(i);
2093        let vid = Vid::from(vid_u64);
2094
2095        // Start with labels from the batch
2096        let mut row_labels: Vec<String> = Vec::new();
2097        if let Some(arr) = labels_arr
2098            && !arr.is_null(i)
2099        {
2100            let list_val = arr.value(i);
2101            if let Some(str_arr) = list_val.as_any().downcast_ref::<arrow_array::StringArray>() {
2102                for j in 0..str_arr.len() {
2103                    if !str_arr.is_null(j) {
2104                        row_labels.push(str_arr.value(j).to_string());
2105                    }
2106                }
2107            }
2108        }
2109
2110        // Overlay L0 labels
2111        for l0 in l0_ctx.iter_l0_buffers() {
2112            let guard = l0.read();
2113            if let Some(l0_labels) = guard.vertex_labels.get(&vid) {
2114                for lbl in l0_labels {
2115                    if !row_labels.contains(lbl) {
2116                        row_labels.push(lbl.clone());
2117                    }
2118                }
2119            }
2120        }
2121
2122        let values = labels_builder.values();
2123        for lbl in &row_labels {
2124            values.append_value(lbl);
2125        }
2126        labels_builder.append(true);
2127    }
2128    columns.push(Arc::new(labels_builder.finish()));
2129
2130    // 3. Projected properties — extract from props_json
2131    let props_col = batch.column_by_name("props_json");
2132    let props_arr =
2133        props_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
2134
2135    for prop in projected_properties {
2136        if prop == "_all_props" {
2137            // Fast path: if no L0 buffer has vertex property mutations,
2138            // the raw props_json passthrough is correct.
2139            let any_l0_has_vertex_props = l0_ctx.iter_l0_buffers().any(|l0| {
2140                let guard = l0.read();
2141                !guard.vertex_properties.is_empty()
2142            });
2143            if !any_l0_has_vertex_props {
2144                match props_col {
2145                    Some(col) => columns.push(col.clone()),
2146                    None => {
2147                        columns.push(arrow_array::new_null_array(
2148                            &DataType::LargeBinary,
2149                            batch.num_rows(),
2150                        ));
2151                    }
2152                }
2153            } else {
2154                let col = build_all_props_column_with_l0_overlay(
2155                    batch.num_rows(),
2156                    vid_arr,
2157                    props_arr,
2158                    l0_ctx,
2159                );
2160                columns.push(col);
2161            }
2162        } else {
2163            // Extract individual property from CypherValue blob with L0 overlay.
2164            // The raw column is LargeBinary (CypherValue-encoded). If the output
2165            // schema expects a typed column (e.g., Utf8 for String properties),
2166            // decode the CypherValue and build the correct Arrow type.
2167            let expected_type = output_schema
2168                .field_with_name(&format!("{_variable}.{prop}"))
2169                .map(|f| f.data_type().clone())
2170                .unwrap_or(DataType::LargeBinary);
2171
2172            if expected_type == DataType::LargeBinary {
2173                let col = build_overflow_property_column(
2174                    batch.num_rows(),
2175                    vid_arr,
2176                    props_arr,
2177                    prop,
2178                    l0_ctx,
2179                );
2180                columns.push(col);
2181            } else {
2182                // Decode CypherValue to the expected type via build_property_column_static.
2183                let mut prop_values: HashMap<Vid, Properties> = HashMap::new();
2184                for i in 0..batch.num_rows() {
2185                    let vid = Vid::from(vid_arr.value(i));
2186                    let resolved =
2187                        resolve_l0_property(&vid, prop, l0_ctx)
2188                            .flatten()
2189                            .or_else(|| {
2190                                extract_from_overflow_blob(props_arr, i, prop).and_then(|bytes| {
2191                                    uni_common::cypher_value_codec::decode(&bytes).ok()
2192                                })
2193                            });
2194                    if let Some(val) = resolved {
2195                        prop_values.insert(vid, HashMap::from([(prop.to_string(), val)]));
2196                    }
2197                }
2198                let vids: Vec<Vid> = (0..batch.num_rows())
2199                    .map(|i| Vid::from(vid_arr.value(i)))
2200                    .collect();
2201                let col = build_property_column_static(&vids, &prop_values, prop, &expected_type)
2202                    .unwrap_or_else(|_| {
2203                        arrow_array::new_null_array(&expected_type, batch.num_rows())
2204                    });
2205                columns.push(col);
2206            }
2207        }
2208    }
2209
2210    RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
2211}
2212
2213/// Get the property value for a VID, returning None if not found.
2214pub(crate) fn get_property_value(
2215    vid: &Vid,
2216    props_map: &HashMap<Vid, Properties>,
2217    prop_name: &str,
2218) -> Option<Value> {
2219    if prop_name == "_all_props" {
2220        return props_map.get(vid).map(|p| {
2221            let map: HashMap<String, Value> =
2222                p.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
2223            Value::Map(map)
2224        });
2225    }
2226    props_map
2227        .get(vid)
2228        .and_then(|props| props.get(prop_name))
2229        .cloned()
2230}
2231
2232/// Encode a `serde_json::Value` as CypherValue binary (MessagePack-tagged).
2233///
2234/// Converts from serde_json::Value -> uni_common::Value -> CypherValue bytes.
2235pub(crate) fn encode_cypher_value(val: &serde_json::Value) -> Result<Vec<u8>, String> {
2236    let uni_val: uni_common::Value = val.clone().into();
2237    Ok(uni_common::cypher_value_codec::encode(&uni_val))
2238}
2239
/// Build a numeric Arrow column from property values.
///
/// `$extractor` pulls a native value out of a property `Value` (returning
/// `Option`), and `$cast` converts it to the builder's item type; rows with
/// a missing or non-convertible value become nulls.
macro_rules! build_numeric_column {
    ($vids:expr, $props_map:expr, $prop_name:expr, $builder_ty:ty, $extractor:expr, $cast:expr) => {{
        let mut builder = <$builder_ty>::new();
        for vid in $vids {
            let extracted = get_property_value(vid, $props_map, $prop_name)
                .as_ref()
                .and_then($extractor);
            match extracted {
                Some(raw) => builder.append_value($cast(raw)),
                None => builder.append_null(),
            }
        }
        Ok(Arc::new(builder.finish()) as ArrayRef)
    }};
}
2259
2260/// Build an Arrow column from property values (static version).
2261pub(crate) fn build_property_column_static(
2262    vids: &[Vid],
2263    props_map: &HashMap<Vid, Properties>,
2264    prop_name: &str,
2265    data_type: &DataType,
2266) -> DFResult<ArrayRef> {
2267    match data_type {
2268        DataType::LargeBinary => {
2269            // Handle CypherValue binary columns (overflow_json and Json-typed properties).
2270            use arrow_array::builder::LargeBinaryBuilder;
2271            let mut builder = LargeBinaryBuilder::new();
2272
2273            for vid in vids {
2274                match get_property_value(vid, props_map, prop_name) {
2275                    Some(Value::Null) | None => builder.append_null(),
2276                    Some(Value::Bytes(bytes)) => {
2277                        builder.append_value(&bytes);
2278                    }
2279                    Some(Value::List(arr)) if arr.iter().all(|v| v.as_u64().is_some()) => {
2280                        // Potential raw CypherValue bytes stored as list<u8> from PropertyManager.
2281                        // Guard against misclassifying normal integer lists (e.g. [42, 43]) as bytes.
2282                        let bytes: Vec<u8> = arr
2283                            .iter()
2284                            .filter_map(|v| v.as_u64().map(|n| n as u8))
2285                            .collect();
2286                        if uni_common::cypher_value_codec::decode(&bytes).is_ok() {
2287                            builder.append_value(&bytes);
2288                        } else {
2289                            let json_val: serde_json::Value = Value::List(arr).into();
2290                            match encode_cypher_value(&json_val) {
2291                                Ok(encoded) => builder.append_value(encoded),
2292                                Err(_) => builder.append_null(),
2293                            }
2294                        }
2295                    }
2296                    Some(val @ Value::Temporal(_)) => {
2297                        // Temporal values (including BTIC) must be encoded directly
2298                        // via CypherValue codec — serde_json round-trip loses the type.
2299                        builder.append_value(uni_common::cypher_value_codec::encode(&val));
2300                    }
2301                    Some(val) => {
2302                        // Value from PropertyManager — convert to serde_json and re-encode to CypherValue binary
2303                        let json_val: serde_json::Value = val.into();
2304                        match encode_cypher_value(&json_val) {
2305                            Ok(bytes) => builder.append_value(bytes),
2306                            Err(_) => builder.append_null(),
2307                        }
2308                    }
2309                }
2310            }
2311            Ok(Arc::new(builder.finish()))
2312        }
2313        DataType::Binary => {
2314            // CRDT binary properties: JSON-decoded CRDTs re-encoded to MessagePack
2315            let mut builder = BinaryBuilder::new();
2316            for vid in vids {
2317                let bytes = get_property_value(vid, props_map, prop_name)
2318                    .filter(|v| !v.is_null())
2319                    .and_then(|v| {
2320                        let json_val: serde_json::Value = v.into();
2321                        serde_json::from_value::<uni_crdt::Crdt>(json_val).ok()
2322                    })
2323                    .and_then(|crdt| crdt.to_msgpack().ok());
2324                match bytes {
2325                    Some(b) => builder.append_value(&b),
2326                    None => builder.append_null(),
2327                }
2328            }
2329            Ok(Arc::new(builder.finish()))
2330        }
2331        DataType::Utf8 => {
2332            let mut builder = StringBuilder::new();
2333            for vid in vids {
2334                match get_property_value(vid, props_map, prop_name) {
2335                    Some(Value::String(s)) => builder.append_value(s),
2336                    Some(Value::Null) | None => builder.append_null(),
2337                    Some(other) => builder.append_value(other.to_string()),
2338                }
2339            }
2340            Ok(Arc::new(builder.finish()))
2341        }
2342        DataType::Int64 => {
2343            build_numeric_column!(
2344                vids,
2345                props_map,
2346                prop_name,
2347                Int64Builder,
2348                |v: &Value| v.as_i64(),
2349                |v| v
2350            )
2351        }
2352        DataType::Int32 => {
2353            build_numeric_column!(
2354                vids,
2355                props_map,
2356                prop_name,
2357                Int32Builder,
2358                |v: &Value| v.as_i64(),
2359                |v: i64| v as i32
2360            )
2361        }
2362        DataType::Float64 => {
2363            build_numeric_column!(
2364                vids,
2365                props_map,
2366                prop_name,
2367                Float64Builder,
2368                |v: &Value| v.as_f64(),
2369                |v| v
2370            )
2371        }
2372        DataType::Float32 => {
2373            build_numeric_column!(
2374                vids,
2375                props_map,
2376                prop_name,
2377                Float32Builder,
2378                |v: &Value| v.as_f64(),
2379                |v: f64| v as f32
2380            )
2381        }
2382        DataType::Boolean => {
2383            let mut builder = BooleanBuilder::new();
2384            for vid in vids {
2385                match get_property_value(vid, props_map, prop_name) {
2386                    Some(Value::Bool(b)) => builder.append_value(b),
2387                    _ => builder.append_null(),
2388                }
2389            }
2390            Ok(Arc::new(builder.finish()))
2391        }
2392        DataType::UInt64 => {
2393            build_numeric_column!(
2394                vids,
2395                props_map,
2396                prop_name,
2397                UInt64Builder,
2398                |v: &Value| v.as_u64(),
2399                |v| v
2400            )
2401        }
2402        DataType::FixedSizeList(inner, dim) if *inner.data_type() == DataType::Float32 => {
2403            // Vector properties: FixedSizeList(Float32, N)
2404            let values_builder = Float32Builder::new();
2405            let mut list_builder = FixedSizeListBuilder::new(values_builder, *dim);
2406            for vid in vids {
2407                match get_property_value(vid, props_map, prop_name) {
2408                    Some(Value::Vector(v)) => {
2409                        for val in v {
2410                            list_builder.values().append_value(val);
2411                        }
2412                        list_builder.append(true);
2413                    }
2414                    Some(Value::List(arr)) => {
2415                        for v in arr {
2416                            list_builder
2417                                .values()
2418                                .append_value(v.as_f64().unwrap_or(0.0) as f32);
2419                        }
2420                        list_builder.append(true);
2421                    }
2422                    _ => {
2423                        // Append dim nulls to inner values, then mark row as null
2424                        for _ in 0..*dim {
2425                            list_builder.values().append_null();
2426                        }
2427                        list_builder.append(false);
2428                    }
2429                }
2430            }
2431            Ok(Arc::new(list_builder.finish()))
2432        }
2433        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
2434            // Timestamp properties stored as Value::Temporal, ISO 8601 strings, or i64 nanoseconds
2435            let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
2436            for vid in vids {
2437                match get_property_value(vid, props_map, prop_name) {
2438                    Some(Value::Temporal(tv)) => match tv {
2439                        uni_common::TemporalValue::DateTime {
2440                            nanos_since_epoch, ..
2441                        }
2442                        | uni_common::TemporalValue::LocalDateTime {
2443                            nanos_since_epoch, ..
2444                        } => {
2445                            builder.append_value(nanos_since_epoch);
2446                        }
2447                        uni_common::TemporalValue::Date { days_since_epoch } => {
2448                            builder.append_value(days_since_epoch as i64 * 86_400_000_000_000);
2449                        }
2450                        _ => builder.append_null(),
2451                    },
2452                    Some(Value::String(s)) => match parse_datetime_utc(&s) {
2453                        Ok(dt) => builder.append_value(dt.timestamp_nanos_opt().unwrap_or(0)),
2454                        Err(_) => builder.append_null(),
2455                    },
2456                    Some(Value::Int(n)) => {
2457                        builder.append_value(n);
2458                    }
2459                    _ => builder.append_null(),
2460                }
2461            }
2462            Ok(Arc::new(builder.finish()))
2463        }
2464        DataType::Date32 => {
2465            let mut builder = Date32Builder::new();
2466            let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
2467            for vid in vids {
2468                match get_property_value(vid, props_map, prop_name) {
2469                    Some(Value::Temporal(uni_common::TemporalValue::Date { days_since_epoch })) => {
2470                        builder.append_value(days_since_epoch);
2471                    }
2472                    Some(Value::String(s)) => match NaiveDate::parse_from_str(&s, "%Y-%m-%d") {
2473                        Ok(d) => builder.append_value((d - epoch).num_days() as i32),
2474                        Err(_) => builder.append_null(),
2475                    },
2476                    Some(Value::Int(n)) => {
2477                        builder.append_value(n as i32);
2478                    }
2479                    _ => builder.append_null(),
2480                }
2481            }
2482            Ok(Arc::new(builder.finish()))
2483        }
2484        DataType::Time64(TimeUnit::Nanosecond) => {
2485            let mut builder = Time64NanosecondBuilder::new();
2486            for vid in vids {
2487                match get_property_value(vid, props_map, prop_name) {
2488                    Some(Value::Temporal(
2489                        uni_common::TemporalValue::LocalTime {
2490                            nanos_since_midnight,
2491                        }
2492                        | uni_common::TemporalValue::Time {
2493                            nanos_since_midnight,
2494                            ..
2495                        },
2496                    )) => {
2497                        builder.append_value(nanos_since_midnight);
2498                    }
2499                    Some(Value::Temporal(_)) => builder.append_null(),
2500                    Some(Value::String(s)) => {
2501                        match NaiveTime::parse_from_str(&s, "%H:%M:%S%.f")
2502                            .or_else(|_| NaiveTime::parse_from_str(&s, "%H:%M:%S"))
2503                        {
2504                            Ok(t) => {
2505                                let nanos = t.num_seconds_from_midnight() as i64 * 1_000_000_000
2506                                    + t.nanosecond() as i64;
2507                                builder.append_value(nanos);
2508                            }
2509                            Err(_) => builder.append_null(),
2510                        }
2511                    }
2512                    Some(Value::Int(n)) => {
2513                        builder.append_value(n);
2514                    }
2515                    _ => builder.append_null(),
2516                }
2517            }
2518            Ok(Arc::new(builder.finish()))
2519        }
2520        DataType::Interval(IntervalUnit::MonthDayNano) => {
2521            let mut values: Vec<Option<arrow::datatypes::IntervalMonthDayNano>> =
2522                Vec::with_capacity(vids.len());
2523            for vid in vids {
2524                match get_property_value(vid, props_map, prop_name) {
2525                    Some(Value::Temporal(uni_common::TemporalValue::Duration {
2526                        months,
2527                        days,
2528                        nanos,
2529                    })) => {
2530                        values.push(Some(arrow::datatypes::IntervalMonthDayNano {
2531                            months: months as i32,
2532                            days: days as i32,
2533                            nanoseconds: nanos,
2534                        }));
2535                    }
2536                    Some(Value::Int(_n)) => {
2537                        values.push(None);
2538                    }
2539                    _ => values.push(None),
2540                }
2541            }
2542            let arr: arrow_array::IntervalMonthDayNanoArray = values.into_iter().collect();
2543            Ok(Arc::new(arr))
2544        }
2545        DataType::List(inner_field) => {
2546            build_list_property_column(vids, props_map, prop_name, inner_field)
2547        }
2548        DataType::Struct(fields) => {
2549            build_struct_property_column(vids, props_map, prop_name, fields)
2550        }
2551        DataType::FixedSizeBinary(24) => {
2552            // BTIC temporal interval columns: encode as FixedSizeBinary(24)
2553            use arrow_array::builder::FixedSizeBinaryBuilder;
2554            const BTIC_LEN: i32 = 24;
2555            let mut builder = FixedSizeBinaryBuilder::with_capacity(vids.len(), BTIC_LEN);
2556            for vid in vids {
2557                match get_property_value(vid, props_map, prop_name) {
2558                    Some(Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta })) => {
2559                        match uni_btic::Btic::new(lo, hi, meta) {
2560                            Ok(b) => {
2561                                builder
2562                                    .append_value(uni_btic::encode::encode(&b))
2563                                    .map_err(arrow_err)?;
2564                            }
2565                            Err(e) => {
2566                                tracing::warn!(
2567                                    "BTIC coercion failed for property '{}': invalid value (lo={}, hi={}, meta={:#x}): {}",
2568                                    prop_name,
2569                                    lo,
2570                                    hi,
2571                                    meta,
2572                                    e
2573                                );
2574                                builder.append_null()
2575                            }
2576                        }
2577                    }
2578                    Some(Value::String(s)) => match uni_btic::parse::parse_btic_literal(&s) {
2579                        Ok(b) => {
2580                            builder
2581                                .append_value(uni_btic::encode::encode(&b))
2582                                .map_err(arrow_err)?;
2583                        }
2584                        Err(e) => {
2585                            tracing::warn!(
2586                                "BTIC coercion failed for property '{}': '{}' is not a valid BTIC literal: {}",
2587                                prop_name,
2588                                s,
2589                                e
2590                            );
2591                            builder.append_null()
2592                        }
2593                    },
2594                    _ => builder.append_null(),
2595                }
2596            }
2597            Ok(Arc::new(builder.finish()))
2598        }
2599        // Default: convert to string
2600        _ => {
2601            let mut builder = StringBuilder::new();
2602            for vid in vids {
2603                match get_property_value(vid, props_map, prop_name) {
2604                    Some(Value::Null) | None => builder.append_null(),
2605                    Some(other) => builder.append_value(other.to_string()),
2606                }
2607            }
2608            Ok(Arc::new(builder.finish()))
2609        }
2610    }
2611}
2612
2613/// Build a List-typed Arrow column from list property values.
2614fn build_list_property_column(
2615    vids: &[Vid],
2616    props_map: &HashMap<Vid, Properties>,
2617    prop_name: &str,
2618    inner_field: &Arc<Field>,
2619) -> DFResult<ArrayRef> {
2620    match inner_field.data_type() {
2621        DataType::Utf8 => {
2622            let mut builder = ListBuilder::new(StringBuilder::new());
2623            for vid in vids {
2624                match get_property_value(vid, props_map, prop_name) {
2625                    Some(Value::List(arr)) => {
2626                        for v in arr {
2627                            match v {
2628                                Value::String(s) => builder.values().append_value(s),
2629                                Value::Null => builder.values().append_null(),
2630                                other => builder.values().append_value(format!("{other:?}")),
2631                            }
2632                        }
2633                        builder.append(true);
2634                    }
2635                    _ => builder.append(false),
2636                }
2637            }
2638            Ok(Arc::new(builder.finish()))
2639        }
2640        DataType::Int64 => {
2641            let mut builder = ListBuilder::new(Int64Builder::new());
2642            for vid in vids {
2643                match get_property_value(vid, props_map, prop_name) {
2644                    Some(Value::List(arr)) => {
2645                        for v in arr {
2646                            match v.as_i64() {
2647                                Some(n) => builder.values().append_value(n),
2648                                None => builder.values().append_null(),
2649                            }
2650                        }
2651                        builder.append(true);
2652                    }
2653                    _ => builder.append(false),
2654                }
2655            }
2656            Ok(Arc::new(builder.finish()))
2657        }
2658        DataType::Float64 => {
2659            let mut builder = ListBuilder::new(Float64Builder::new());
2660            for vid in vids {
2661                match get_property_value(vid, props_map, prop_name) {
2662                    Some(Value::List(arr)) => {
2663                        for v in arr {
2664                            match v.as_f64() {
2665                                Some(n) => builder.values().append_value(n),
2666                                None => builder.values().append_null(),
2667                            }
2668                        }
2669                        builder.append(true);
2670                    }
2671                    _ => builder.append(false),
2672                }
2673            }
2674            Ok(Arc::new(builder.finish()))
2675        }
2676        DataType::Boolean => {
2677            let mut builder = ListBuilder::new(BooleanBuilder::new());
2678            for vid in vids {
2679                match get_property_value(vid, props_map, prop_name) {
2680                    Some(Value::List(arr)) => {
2681                        for v in arr {
2682                            match v.as_bool() {
2683                                Some(b) => builder.values().append_value(b),
2684                                None => builder.values().append_null(),
2685                            }
2686                        }
2687                        builder.append(true);
2688                    }
2689                    _ => builder.append(false),
2690                }
2691            }
2692            Ok(Arc::new(builder.finish()))
2693        }
2694        DataType::Struct(fields) => {
2695            // Map types are List(Struct(key, value)) — build struct inner elements
2696            build_list_of_structs_column(vids, props_map, prop_name, fields)
2697        }
2698        // Fallback: serialize inner elements as strings
2699        _ => {
2700            let mut builder = ListBuilder::new(StringBuilder::new());
2701            for vid in vids {
2702                match get_property_value(vid, props_map, prop_name) {
2703                    Some(Value::List(arr)) => {
2704                        for v in arr {
2705                            match v {
2706                                Value::Null => builder.values().append_null(),
2707                                other => builder.values().append_value(format!("{other:?}")),
2708                            }
2709                        }
2710                        builder.append(true);
2711                    }
2712                    _ => builder.append(false),
2713                }
2714            }
2715            Ok(Arc::new(builder.finish()))
2716        }
2717    }
2718}
2719
/// Build a List(Struct(...)) column, used for Map-type properties.
///
/// Handles two value representations:
/// - `Value::List([Map{key: k, value: v}, ...])` — pre-converted kv pairs
/// - `Value::Map({k1: v1, k2: v2})` — raw map objects (converted to kv pairs)
///
/// Row-level NULL semantics: a row is NULL when the property is missing, is
/// neither a List nor a Map, or is a List containing no Map elements.
/// Non-Map elements inside a List are silently dropped. Within a present
/// row, struct fields that are missing or uncoercible become NULL children.
fn build_list_of_structs_column(
    vids: &[Vid],
    props_map: &HashMap<Vid, Properties>,
    prop_name: &str,
    fields: &Fields,
) -> DFResult<ArrayRef> {
    use arrow_array::StructArray;

    // Raw property value per row (None = property absent for this vid).
    let values: Vec<Option<Value>> = vids
        .iter()
        .map(|vid| get_property_value(vid, props_map, prop_name))
        .collect();

    // Convert each row's value to an owned Vec of Maps (key-value pairs).
    // This normalizes both List-of-maps and Map representations.
    let rows: Vec<Option<Vec<HashMap<String, Value>>>> = values
        .iter()
        .map(|val| match val {
            Some(Value::List(arr)) => {
                // Keep only Map elements; anything else in the list is dropped.
                let objs: Vec<HashMap<String, Value>> = arr
                    .iter()
                    .filter_map(|v| {
                        if let Value::Map(m) = v {
                            Some(m.clone())
                        } else {
                            None
                        }
                    })
                    .collect();
                // A list with no Map elements is treated as a NULL row, not an
                // empty list.
                if objs.is_empty() { None } else { Some(objs) }
            }
            Some(Value::Map(obj)) => {
                // Map property: convert {k1: v1, k2: v2} -> [{key: k1, value: v1}, ...]
                let kv_pairs: Vec<HashMap<String, Value>> = obj
                    .iter()
                    .map(|(k, v)| {
                        let mut m = HashMap::new();
                        m.insert("key".to_string(), Value::String(k.clone()));
                        m.insert("value".to_string(), v.clone());
                        m
                    })
                    .collect();
                Some(kv_pairs)
            }
            _ => None,
        })
        .collect();

    // Total flattened element count across all non-null rows; used only for
    // builder capacity pre-allocation below.
    let total_items: usize = rows
        .iter()
        .filter_map(|r| r.as_ref())
        .map(|v| v.len())
        .sum();

    // Build child arrays for each field in the struct. Iterating
    // `rows.iter().flatten().flatten()` visits every kv object in row order,
    // so each child array's length equals `total_items` and lines up with the
    // offsets computed further down.
    let child_arrays: Vec<ArrayRef> = fields
        .iter()
        .map(|field| {
            let field_name = field.name();
            match field.data_type() {
                DataType::Utf8 => {
                    let mut builder = StringBuilder::with_capacity(total_items, total_items * 16);
                    for obj in rows.iter().flatten().flatten() {
                        match obj.get(field_name) {
                            Some(Value::String(s)) => builder.append_value(s),
                            Some(Value::Null) | None => builder.append_null(),
                            // Non-string values are serialized via Debug.
                            Some(other) => builder.append_value(format!("{other:?}")),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                DataType::Int64 => {
                    let mut builder = Int64Builder::with_capacity(total_items);
                    for obj in rows.iter().flatten().flatten() {
                        match obj.get(field_name).and_then(|v| v.as_i64()) {
                            Some(n) => builder.append_value(n),
                            None => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                DataType::Float64 => {
                    let mut builder = Float64Builder::with_capacity(total_items);
                    for obj in rows.iter().flatten().flatten() {
                        match obj.get(field_name).and_then(|v| v.as_f64()) {
                            Some(n) => builder.append_value(n),
                            None => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                // Fallback: serialize as string
                _ => {
                    let mut builder = StringBuilder::with_capacity(total_items, total_items * 16);
                    for obj in rows.iter().flatten().flatten() {
                        match obj.get(field_name) {
                            Some(Value::Null) | None => builder.append_null(),
                            Some(other) => builder.append_value(format!("{other:?}")),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
            }
        })
        .collect();

    // Build struct array from children
    let struct_array = StructArray::try_new(fields.clone(), child_arrays, None)
        .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;

    // Build list offsets. NULL rows contribute zero-length slices (the offset
    // is repeated), which keeps the offsets monotonic as Arrow requires.
    let mut offsets = Vec::with_capacity(vids.len() + 1);
    let mut nulls = Vec::with_capacity(vids.len());
    let mut offset = 0i32;
    offsets.push(offset);
    for row in &rows {
        match row {
            Some(objs) => {
                offset += objs.len() as i32;
                offsets.push(offset);
                nulls.push(true);
            }
            None => {
                offsets.push(offset);
                nulls.push(false);
            }
        }
    }

    let list_field = Arc::new(Field::new("item", DataType::Struct(fields.clone()), true));
    let list_array = arrow_array::ListArray::try_new(
        list_field,
        arrow::buffer::OffsetBuffer::new(arrow::buffer::ScalarBuffer::from(offsets)),
        Arc::new(struct_array),
        Some(arrow::buffer::NullBuffer::from(nulls)),
    )
    .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;

    Ok(Arc::new(list_array))
}
2865
2866/// Convert a TemporalValue into a HashMap matching the Arrow struct field names,
2867/// so that `build_struct_property_column` can extract fields uniformly.
2868fn temporal_to_struct_map(tv: &uni_common::value::TemporalValue) -> HashMap<String, Value> {
2869    use uni_common::value::TemporalValue;
2870    let mut m = HashMap::new();
2871    match tv {
2872        TemporalValue::DateTime {
2873            nanos_since_epoch,
2874            offset_seconds,
2875            timezone_name,
2876        } => {
2877            m.insert("nanos_since_epoch".into(), Value::Int(*nanos_since_epoch));
2878            m.insert("offset_seconds".into(), Value::Int(*offset_seconds as i64));
2879            if let Some(tz) = timezone_name {
2880                m.insert("timezone_name".into(), Value::String(tz.clone()));
2881            }
2882        }
2883        TemporalValue::LocalDateTime { nanos_since_epoch } => {
2884            m.insert("nanos_since_epoch".into(), Value::Int(*nanos_since_epoch));
2885        }
2886        TemporalValue::Time {
2887            nanos_since_midnight,
2888            offset_seconds,
2889        } => {
2890            m.insert(
2891                "nanos_since_midnight".into(),
2892                Value::Int(*nanos_since_midnight),
2893            );
2894            m.insert("offset_seconds".into(), Value::Int(*offset_seconds as i64));
2895        }
2896        TemporalValue::LocalTime {
2897            nanos_since_midnight,
2898        } => {
2899            m.insert(
2900                "nanos_since_midnight".into(),
2901                Value::Int(*nanos_since_midnight),
2902            );
2903        }
2904        TemporalValue::Date { days_since_epoch } => {
2905            m.insert(
2906                "days_since_epoch".into(),
2907                Value::Int(*days_since_epoch as i64),
2908            );
2909        }
2910        TemporalValue::Duration {
2911            months,
2912            days,
2913            nanos,
2914        } => {
2915            m.insert("months".into(), Value::Int(*months));
2916            m.insert("days".into(), Value::Int(*days));
2917            m.insert("nanos".into(), Value::Int(*nanos));
2918        }
2919        TemporalValue::Btic { lo, hi, meta } => {
2920            m.insert("lo".into(), Value::Int(*lo));
2921            m.insert("hi".into(), Value::Int(*hi));
2922            m.insert("meta".into(), Value::Int(*meta as i64));
2923        }
2924    }
2925    m
2926}
2927
2928/// Build a Struct-typed Arrow column from Map property values (e.g. Point types).
2929fn build_struct_property_column(
2930    vids: &[Vid],
2931    props_map: &HashMap<Vid, Properties>,
2932    prop_name: &str,
2933    fields: &Fields,
2934) -> DFResult<ArrayRef> {
2935    use arrow_array::StructArray;
2936
2937    // Convert raw values, expanding Temporal values into Map representation
2938    // so the struct field extraction below works uniformly.
2939    let values: Vec<Option<Value>> = vids
2940        .iter()
2941        .map(|vid| {
2942            let val = get_property_value(vid, props_map, prop_name);
2943            match val {
2944                Some(Value::Temporal(ref tv)) => Some(Value::Map(temporal_to_struct_map(tv))),
2945                other => other,
2946            }
2947        })
2948        .collect();
2949
2950    let child_arrays: Vec<ArrayRef> = fields
2951        .iter()
2952        .map(|field| {
2953            let field_name = field.name();
2954            match field.data_type() {
2955                DataType::Float64 => {
2956                    let mut builder = Float64Builder::with_capacity(vids.len());
2957                    for val in &values {
2958                        match val {
2959                            Some(Value::Map(obj)) => {
2960                                match obj.get(field_name).and_then(|v| v.as_f64()) {
2961                                    Some(n) => builder.append_value(n),
2962                                    None => builder.append_null(),
2963                                }
2964                            }
2965                            _ => builder.append_null(),
2966                        }
2967                    }
2968                    Arc::new(builder.finish()) as ArrayRef
2969                }
2970                DataType::Utf8 => {
2971                    let mut builder = StringBuilder::with_capacity(vids.len(), vids.len() * 16);
2972                    for val in &values {
2973                        match val {
2974                            Some(Value::Map(obj)) => match obj.get(field_name) {
2975                                Some(Value::String(s)) => builder.append_value(s),
2976                                Some(Value::Null) | None => builder.append_null(),
2977                                Some(other) => builder.append_value(format!("{other:?}")),
2978                            },
2979                            _ => builder.append_null(),
2980                        }
2981                    }
2982                    Arc::new(builder.finish()) as ArrayRef
2983                }
2984                DataType::Int64 => {
2985                    let mut builder = Int64Builder::with_capacity(vids.len());
2986                    for val in &values {
2987                        match val {
2988                            Some(Value::Map(obj)) => {
2989                                match obj.get(field_name).and_then(|v| v.as_i64()) {
2990                                    Some(n) => builder.append_value(n),
2991                                    None => builder.append_null(),
2992                                }
2993                            }
2994                            _ => builder.append_null(),
2995                        }
2996                    }
2997                    Arc::new(builder.finish()) as ArrayRef
2998                }
2999                DataType::Timestamp(_, _) => {
3000                    let mut builder = TimestampNanosecondBuilder::with_capacity(vids.len());
3001                    for val in &values {
3002                        match val {
3003                            Some(Value::Map(obj)) => {
3004                                match obj.get(field_name).and_then(|v| v.as_i64()) {
3005                                    Some(n) => builder.append_value(n),
3006                                    None => builder.append_null(),
3007                                }
3008                            }
3009                            _ => builder.append_null(),
3010                        }
3011                    }
3012                    Arc::new(builder.finish()) as ArrayRef
3013                }
3014                DataType::Int32 => {
3015                    let mut builder = Int32Builder::with_capacity(vids.len());
3016                    for val in &values {
3017                        match val {
3018                            Some(Value::Map(obj)) => {
3019                                match obj.get(field_name).and_then(|v| v.as_i64()) {
3020                                    Some(n) => builder.append_value(n as i32),
3021                                    None => builder.append_null(),
3022                                }
3023                            }
3024                            _ => builder.append_null(),
3025                        }
3026                    }
3027                    Arc::new(builder.finish()) as ArrayRef
3028                }
3029                DataType::Time64(_) => {
3030                    let mut builder = Time64NanosecondBuilder::with_capacity(vids.len());
3031                    for val in &values {
3032                        match val {
3033                            Some(Value::Map(obj)) => {
3034                                match obj.get(field_name).and_then(|v| v.as_i64()) {
3035                                    Some(n) => builder.append_value(n),
3036                                    None => builder.append_null(),
3037                                }
3038                            }
3039                            _ => builder.append_null(),
3040                        }
3041                    }
3042                    Arc::new(builder.finish()) as ArrayRef
3043                }
3044                // Fallback: serialize as string
3045                _ => {
3046                    let mut builder = StringBuilder::with_capacity(vids.len(), vids.len() * 16);
3047                    for val in &values {
3048                        match val {
3049                            Some(Value::Map(obj)) => match obj.get(field_name) {
3050                                Some(Value::Null) | None => builder.append_null(),
3051                                Some(other) => builder.append_value(format!("{other:?}")),
3052                            },
3053                            _ => builder.append_null(),
3054                        }
3055                    }
3056                    Arc::new(builder.finish()) as ArrayRef
3057                }
3058            }
3059        })
3060        .collect();
3061
3062    // Build null bitmap — null when the value is null/missing
3063    let nulls: Vec<bool> = values
3064        .iter()
3065        .map(|v| matches!(v, Some(Value::Map(_))))
3066        .collect();
3067
3068    let struct_array = StructArray::try_new(
3069        fields.clone(),
3070        child_arrays,
3071        Some(arrow::buffer::NullBuffer::from(nulls)),
3072    )
3073    .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;
3074
3075    Ok(Arc::new(struct_array))
3076}
3077
impl Stream for GraphScanStream {
    type Item = DFResult<RecordBatch>;

    /// Drives the scan as a three-state machine:
    ///
    /// - `Init`: builds the async scan future (edge / schemaless-vertex /
    ///   vertex, chosen by the stream's flags) and transitions to `Executing`.
    /// - `Executing`: polls the boxed future; on completion records output
    ///   row-count metrics and yields the batch (or the error).
    /// - `Done`: terminal — the stream yields `None` forever after.
    ///
    /// The whole scan is produced as a single batch, so the stream emits at
    /// most one `Some(Ok(batch))` before ending.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Use a temporary to avoid borrow issues
            // (move the state out, leaving `Done` as a placeholder; every arm
            // below either returns or writes the correct state back).
            let state = std::mem::replace(&mut self.state, GraphScanState::Done);

            match state {
                GraphScanState::Init => {
                    // Create the future with cloned data for ownership
                    // (the future is 'static-boxed, so it cannot borrow `self`).
                    let graph_ctx = self.graph_ctx.clone();
                    let label = self.label.clone();
                    let variable = self.variable.clone();
                    let properties = self.properties.clone();
                    let is_edge_scan = self.is_edge_scan;
                    let is_schemaless = self.is_schemaless;
                    let schema = self.schema.clone();

                    let fut = async move {
                        // Abort early if the query deadline has already passed.
                        graph_ctx.check_timeout().map_err(|e| {
                            datafusion::error::DataFusionError::Execution(e.to_string())
                        })?;

                        // Dispatch to the scan flavor selected at plan time.
                        let batch = if is_edge_scan {
                            columnar_scan_edge_batch_static(
                                &graph_ctx,
                                &label,
                                &variable,
                                &properties,
                                &schema,
                            )
                            .await?
                        } else if is_schemaless {
                            columnar_scan_schemaless_vertex_batch_static(
                                &graph_ctx,
                                &label,
                                &variable,
                                &properties,
                                &schema,
                            )
                            .await?
                        } else {
                            columnar_scan_vertex_batch_static(
                                &graph_ctx,
                                &label,
                                &variable,
                                &properties,
                                &schema,
                            )
                            .await?
                        };
                        Ok(Some(batch))
                    };

                    self.state = GraphScanState::Executing(Box::pin(fut));
                    // Continue loop to poll the future
                }
                GraphScanState::Executing(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(batch)) => {
                        self.state = GraphScanState::Done;
                        // Record output rows for DataFusion metrics (0 if the
                        // scan produced no batch).
                        self.metrics
                            .record_output(batch.as_ref().map(|b| b.num_rows()).unwrap_or(0));
                        return Poll::Ready(batch.map(Ok));
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = GraphScanState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        // Not ready yet — restore the in-flight future so the
                        // next poll resumes it instead of restarting the scan.
                        self.state = GraphScanState::Executing(fut);
                        return Poll::Pending;
                    }
                },
                GraphScanState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
3159
3160impl RecordBatchStream for GraphScanStream {
3161    fn schema(&self) -> SchemaRef {
3162        self.schema.clone()
3163    }
3164}
3165
#[cfg(test)]
mod tests {
    use super::*;

    /// Vertex scan schema: system columns (`_vid`, `_labels`) first, then the
    /// requested properties, all prefixed with the variable name.
    #[test]
    fn test_build_vertex_schema() {
        let uni_schema = UniSchema::default();
        let schema = GraphScanExec::build_vertex_schema(
            "n",
            "Person",
            &["name".to_string(), "age".to_string()],
            &uni_schema,
        );

        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(1).name(), "n._labels");
        assert_eq!(schema.field(2).name(), "n.name");
        assert_eq!(schema.field(3).name(), "n.age");
    }

    /// Edge scan schema: `_eid`, `_src_vid`, `_dst_vid` system columns
    /// followed by requested properties.
    #[test]
    fn test_build_edge_schema() {
        let uni_schema = UniSchema::default();
        let schema =
            GraphScanExec::build_edge_schema("r", "KNOWS", &["weight".to_string()], &uni_schema);

        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "r._eid");
        assert_eq!(schema.field(1).name(), "r._src_vid");
        assert_eq!(schema.field(2).name(), "r._dst_vid");
        assert_eq!(schema.field(3).name(), "r.weight");
    }

    /// Without schema-declared property types, schemaless vertex columns fall
    /// back to `LargeBinary` (encoded cypher values).
    #[test]
    fn test_build_schemaless_vertex_schema() {
        let empty_schema = uni_common::core::schema::Schema::default();
        let schema = GraphScanExec::build_schemaless_vertex_schema(
            "n",
            &["name".to_string(), "age".to_string()],
            &empty_schema,
        );

        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(0).data_type(), &DataType::UInt64);
        assert_eq!(schema.field(1).name(), "n._labels");
        assert_eq!(schema.field(2).name(), "n.name");
        // With empty schema, falls back to LargeBinary
        assert_eq!(schema.field(2).data_type(), &DataType::LargeBinary);
        assert_eq!(schema.field(3).name(), "n.age");
        assert_eq!(schema.field(3).data_type(), &DataType::LargeBinary);
    }

    #[test]
    fn test_schemaless_all_scan_has_empty_label() {
        let empty_schema = uni_common::core::schema::Schema::default();
        let schema = GraphScanExec::build_schemaless_vertex_schema("n", &[], &empty_schema);

        // Verify the schema has _vid and _labels columns for a scan with no properties
        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(1).name(), "n._labels");
    }

    /// Round-trips `_all_props`-style maps and scalar values through the
    /// cypher-value codec.
    #[test]
    fn test_cypher_value_all_props_extraction() {
        // Simulate _all_props encoding using encode_cypher_value helper
        let json_obj = serde_json::json!({"age": 30, "name": "Alice"});
        let cv_bytes = encode_cypher_value(&json_obj).unwrap();

        // Decode and extract "age" value
        let decoded = uni_common::cypher_value_codec::decode(&cv_bytes).unwrap();
        match decoded {
            uni_common::Value::Map(map) => {
                let age_val = map.get("age").unwrap();
                assert_eq!(age_val, &uni_common::Value::Int(30));
            }
            _ => panic!("Expected Map"),
        }

        // Also test single value encoding
        let single_val = serde_json::json!(30);
        let single_bytes = encode_cypher_value(&single_val).unwrap();
        let single_decoded = uni_common::cypher_value_codec::decode(&single_bytes).unwrap();
        assert_eq!(single_decoded, uni_common::Value::Int(30));
    }

    /// Helper to build a RecordBatch with _vid, _deleted, _version columns for testing.
    fn make_mvcc_batch(vids: &[u64], versions: &[u64], deleted: &[bool]) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("_vid", DataType::UInt64, false),
            Field::new("_deleted", DataType::Boolean, false),
            Field::new("_version", DataType::UInt64, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        // Generate name values like "v{vid}_ver{version}" for tracking which row wins
        let names: Vec<String> = vids
            .iter()
            .zip(versions.iter())
            .map(|(v, ver)| format!("v{v}_ver{ver}"))
            .collect();
        let name_arr: arrow_array::StringArray = names.iter().map(|s| Some(s.as_str())).collect();

        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(vids.to_vec())),
                Arc::new(arrow_array::BooleanArray::from(deleted.to_vec())),
                Arc::new(UInt64Array::from(versions.to_vec())),
                Arc::new(name_arr),
            ],
        )
        .unwrap()
    }

    /// MVCC dedup keeps exactly one row per VID — the highest version.
    #[test]
    fn test_mvcc_dedup_multiple_versions() {
        // VID 1 at versions 3, 1, 5 — should keep version 5
        // VID 2 at versions 2, 4 — should keep version 4
        let batch = make_mvcc_batch(
            &[1, 1, 1, 2, 2],
            &[3, 1, 5, 2, 4],
            &[false, false, false, false, false],
        );

        let result = mvcc_dedup_batch(&batch).unwrap();
        assert_eq!(result.num_rows(), 2);

        let vid_col = result
            .column_by_name("_vid")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        let ver_col = result
            .column_by_name("_version")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        let name_col = result
            .column_by_name("name")
            .unwrap()
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .unwrap();

        // VID 1 → version 5, VID 2 → version 4
        assert_eq!(vid_col.value(0), 1);
        assert_eq!(ver_col.value(0), 5);
        assert_eq!(name_col.value(0), "v1_ver5");

        assert_eq!(vid_col.value(1), 2);
        assert_eq!(ver_col.value(1), 4);
        assert_eq!(name_col.value(1), "v2_ver4");
    }

    #[test]
    fn test_mvcc_dedup_single_rows() {
        // Each VID appears once — nothing should change
        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
        let result = mvcc_dedup_batch(&batch).unwrap();
        assert_eq!(result.num_rows(), 3);
    }

    /// Dedup of an empty batch is a no-op, not an error.
    #[test]
    fn test_mvcc_dedup_empty() {
        let batch = make_mvcc_batch(&[], &[], &[]);
        let result = mvcc_dedup_batch(&batch).unwrap();
        assert_eq!(result.num_rows(), 0);
    }

    /// Rows whose VID is tombstoned in the L0 overlay are filtered out.
    #[test]
    fn test_filter_l0_tombstones_removes_tombstoned() {
        use crate::query::df_graph::L0Context;

        // Create a batch with VIDs 1, 2, 3
        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);

        // Create an L0 context with VID 2 tombstoned. The constructor cannot
        // seed tombstones, so insert one directly via the public
        // `vertex_tombstones` field after wrapping the buffer in a lock.
        let l0 = uni_store::runtime::l0::L0Buffer::new(1, None);
        let l0_buf = std::sync::Arc::new(parking_lot::RwLock::new(l0));
        l0_buf.write().vertex_tombstones.insert(Vid::from(2u64));

        let l0_ctx = L0Context {
            current_l0: Some(l0_buf),
            transaction_l0: None,
            pending_flush_l0s: vec![],
        };

        let result = filter_l0_tombstones(&batch, &l0_ctx).unwrap();
        assert_eq!(result.num_rows(), 2);

        let vid_col = result
            .column_by_name("_vid")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(vid_col.value(0), 1);
        assert_eq!(vid_col.value(1), 3);
    }

    /// An empty L0 context filters nothing.
    #[test]
    fn test_filter_l0_tombstones_none() {
        use crate::query::df_graph::L0Context;

        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
        let l0_ctx = L0Context::default();

        let result = filter_l0_tombstones(&batch, &l0_ctx).unwrap();
        assert_eq!(result.num_rows(), 3);
    }

    /// Lance-schema input columns are renamed/reshaped into the
    /// `{variable}.{column}` output schema, dropping MVCC bookkeeping columns.
    #[test]
    fn test_map_to_output_schema_basic() {
        use crate::query::df_graph::L0Context;

        // Input: Lance-schema batch with _vid, _deleted, _version, name columns
        let lance_schema = Arc::new(Schema::new(vec![
            Field::new("_vid", DataType::UInt64, false),
            Field::new("_deleted", DataType::Boolean, false),
            Field::new("_version", DataType::UInt64, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let name_arr: arrow_array::StringArray =
            vec![Some("Alice"), Some("Bob")].into_iter().collect();
        let batch = RecordBatch::try_new(
            lance_schema,
            vec![
                Arc::new(UInt64Array::from(vec![1u64, 2])),
                Arc::new(arrow_array::BooleanArray::from(vec![false, false])),
                Arc::new(UInt64Array::from(vec![1u64, 1])),
                Arc::new(name_arr),
            ],
        )
        .unwrap();

        // Output schema: n._vid, n._labels, n.name
        let output_schema = Arc::new(Schema::new(vec![
            Field::new("n._vid", DataType::UInt64, false),
            Field::new("n._labels", labels_data_type(), true),
            Field::new("n.name", DataType::Utf8, true),
        ]));

        let l0_ctx = L0Context::default();
        let result = map_to_output_schema(
            &batch,
            "Person",
            "n",
            &["name".to_string()],
            &output_schema,
            &l0_ctx,
        )
        .unwrap();

        assert_eq!(result.num_rows(), 2);
        assert_eq!(result.schema().fields().len(), 3);
        assert_eq!(result.schema().field(0).name(), "n._vid");
        assert_eq!(result.schema().field(1).name(), "n._labels");
        assert_eq!(result.schema().field(2).name(), "n.name");

        // Check name values carried through
        let name_col = result
            .column(2)
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .unwrap();
        assert_eq!(name_col.value(0), "Alice");
        assert_eq!(name_col.value(1), "Bob");
    }
}
3443}