// File: uni_query/query/df_graph/scan.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Graph scan execution plan for DataFusion.
5//!
6//! This module provides [`GraphScanExec`], a DataFusion `ExecutionPlan` that scans
7//! vertices or edges from storage with property materialization. It wraps the
8//! underlying Lance table scan with:
9//!
10//! - MVCC resolution via L0 buffer overlays
11//! - Property column materialization from `PropertyManager`
12//! - Filter pushdown to storage layer
13//!
14//! # Column Naming Convention
15//!
16//! Properties are materialized as columns named `{variable}.{property}`:
17//! - `n.name` - property "name" for variable "n"
18//! - `n.age` - property "age" for variable "n"
19//!
20//! System columns use underscore prefix:
21//! - `_vid` - vertex ID
22//! - `_eid` - edge ID
23//! - `_src_vid` - source vertex ID (edges only)
24//! - `_dst_vid` - destination vertex ID (edges only)
25
26use crate::query::datetime::parse_datetime_utc;
27use crate::query::df_graph::GraphExecutionContext;
28use crate::query::df_graph::common::{arrow_err, compute_plan_properties, labels_data_type};
29use arrow_array::builder::{
30    BinaryBuilder, BooleanBuilder, Date32Builder, FixedSizeListBuilder, Float32Builder,
31    Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
32    Time64NanosecondBuilder, TimestampNanosecondBuilder, UInt64Builder,
33};
34use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
35use arrow_schema::{DataType, Field, Fields, IntervalUnit, Schema, SchemaRef, TimeUnit};
36use chrono::{NaiveDate, NaiveTime, Timelike};
37use datafusion::common::Result as DFResult;
38use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
39use datafusion::physical_expr::PhysicalExpr;
40use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
41use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
42use futures::Stream;
43use std::any::Any;
44use std::collections::{HashMap, HashSet};
45use std::fmt;
46use std::pin::Pin;
47use std::sync::Arc;
48use std::task::{Context, Poll};
49use uni_common::Properties;
50use uni_common::Value;
51use uni_common::core::id::Vid;
52use uni_common::core::schema::Schema as UniSchema;
53
/// Graph scan execution plan.
///
/// Scans vertices or edges from storage with property materialization.
/// This wraps the underlying Lance table scan with MVCC resolution and
/// property loading.
///
/// # Example
///
/// ```ignore
/// // Create a scan for Person vertices with name and age properties
/// let scan = GraphScanExec::new_vertex_scan(
///     graph_ctx,
///     "Person",
///     "n",
///     vec!["name".to_string(), "age".to_string()],
///     None, // No filter
/// );
///
/// let stream = scan.execute(0, task_ctx)?;
/// // Stream yields batches with columns: _vid, n.name, n.age
/// ```
pub struct GraphScanExec {
    /// Graph execution context with storage and L0 access.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Label name for vertex scan, or edge type for edge scan.
    /// For multi-label scans this holds the colon-joined label list; an
    /// empty string means "scan all vertices" (see `new_schemaless_all_scan`).
    label: String,

    /// Variable name for column prefixing (e.g. "n" in `n.name`).
    variable: String,

    /// Properties to materialize as columns.
    projected_properties: Vec<String>,

    /// Filter expression to push down to the storage layer.
    filter: Option<Arc<dyn PhysicalExpr>>,

    /// Whether this is an edge scan (vs vertex scan).
    is_edge_scan: bool,

    /// Whether this is a schemaless scan (uses main table instead of per-label table).
    is_schemaless: bool,

    /// Output schema with materialized property columns.
    schema: SchemaRef,

    /// Cached plan properties (computed once at construction).
    properties: PlanProperties,

    /// Metrics for execution tracking.
    metrics: ExecutionPlanMetricsSet,
}
106
107impl fmt::Debug for GraphScanExec {
108    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
109        f.debug_struct("GraphScanExec")
110            .field("label", &self.label)
111            .field("variable", &self.variable)
112            .field("projected_properties", &self.projected_properties)
113            .field("is_edge_scan", &self.is_edge_scan)
114            .finish()
115    }
116}
117
118impl GraphScanExec {
119    /// Create a new graph scan for vertices.
120    ///
121    /// Scans all vertices of the given label from storage and L0 buffers,
122    /// then materializes the requested properties.
123    pub fn new_vertex_scan(
124        graph_ctx: Arc<GraphExecutionContext>,
125        label: impl Into<String>,
126        variable: impl Into<String>,
127        projected_properties: Vec<String>,
128        filter: Option<Arc<dyn PhysicalExpr>>,
129    ) -> Self {
130        let label = label.into();
131        let variable = variable.into();
132
133        // Build output schema with proper types from Uni schema
134        let uni_schema = graph_ctx.storage().schema_manager().schema();
135        let schema =
136            Self::build_vertex_schema(&variable, &label, &projected_properties, &uni_schema);
137
138        let properties = compute_plan_properties(schema.clone());
139
140        Self {
141            graph_ctx,
142            label,
143            variable,
144            projected_properties,
145            filter,
146            is_edge_scan: false,
147            is_schemaless: false,
148            schema,
149            properties,
150            metrics: ExecutionPlanMetricsSet::new(),
151        }
152    }
153
154    /// Create a new schemaless vertex scan.
155    ///
156    /// Scans the main vertices table for vertices with the given label name.
157    /// Properties are extracted from props_json (all treated as Utf8/JSON).
158    /// This is used for labels that aren't in the schema.
159    pub fn new_schemaless_vertex_scan(
160        graph_ctx: Arc<GraphExecutionContext>,
161        label_name: impl Into<String>,
162        variable: impl Into<String>,
163        projected_properties: Vec<String>,
164        filter: Option<Arc<dyn PhysicalExpr>>,
165    ) -> Self {
166        let label = label_name.into();
167        let variable = variable.into();
168
169        // Filter out system columns that are already materialized as dedicated columns
170        // (_vid as UInt64, _labels as List<Utf8>). If these appear in projected_properties
171        // (e.g., from collect_properties_from_plan extracting _vid from filter expressions),
172        // they would create duplicate columns with conflicting types.
173        let projected_properties: Vec<String> = projected_properties
174            .into_iter()
175            .filter(|p| p != "_vid" && p != "_labels")
176            .collect();
177
178        let uni_schema = graph_ctx.storage().schema_manager().schema();
179        let schema =
180            Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
181        let properties = compute_plan_properties(schema.clone());
182
183        Self {
184            graph_ctx,
185            label,
186            variable,
187            projected_properties,
188            filter,
189            is_edge_scan: false,
190            is_schemaless: true,
191            schema,
192            properties,
193            metrics: ExecutionPlanMetricsSet::new(),
194        }
195    }
196
197    /// Create a new multi-label vertex scan using the main vertices table.
198    ///
199    /// Scans for vertices that have ALL specified labels (intersection semantics).
200    /// Properties are extracted from props_json (schemaless).
201    pub fn new_multi_label_vertex_scan(
202        graph_ctx: Arc<GraphExecutionContext>,
203        labels: Vec<String>,
204        variable: impl Into<String>,
205        projected_properties: Vec<String>,
206        filter: Option<Arc<dyn PhysicalExpr>>,
207    ) -> Self {
208        let variable = variable.into();
209        let projected_properties: Vec<String> = projected_properties
210            .into_iter()
211            .filter(|p| p != "_vid" && p != "_labels")
212            .collect();
213        let uni_schema = graph_ctx.storage().schema_manager().schema();
214        let schema =
215            Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
216        let properties = compute_plan_properties(schema.clone());
217
218        // Encode labels as colon-separated for the stream to parse
219        let encoded_labels = labels.join(":");
220
221        Self {
222            graph_ctx,
223            label: encoded_labels,
224            variable,
225            projected_properties,
226            filter,
227            is_edge_scan: false,
228            is_schemaless: true,
229            schema,
230            properties,
231            metrics: ExecutionPlanMetricsSet::new(),
232        }
233    }
234
235    /// Create a new schemaless scan for all vertices.
236    ///
237    /// Scans the main vertices table for all vertices regardless of label.
238    /// Properties are extracted from props_json with types resolved from the schema.
239    /// This is used for `MATCH (n)` without label filter.
240    pub fn new_schemaless_all_scan(
241        graph_ctx: Arc<GraphExecutionContext>,
242        variable: impl Into<String>,
243        projected_properties: Vec<String>,
244        filter: Option<Arc<dyn PhysicalExpr>>,
245    ) -> Self {
246        let variable = variable.into();
247        let projected_properties: Vec<String> = projected_properties
248            .into_iter()
249            .filter(|p| p != "_vid" && p != "_labels")
250            .collect();
251
252        let uni_schema = graph_ctx.storage().schema_manager().schema();
253        let schema =
254            Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
255        let properties = compute_plan_properties(schema.clone());
256
257        Self {
258            graph_ctx,
259            label: String::new(), // Empty label signals "scan all vertices"
260            variable,
261            projected_properties,
262            filter,
263            is_edge_scan: false,
264            is_schemaless: true,
265            schema,
266            properties,
267            metrics: ExecutionPlanMetricsSet::new(),
268        }
269    }
270
271    /// Build schema for schemaless vertex scan.
272    ///
273    /// Resolves property types from all labels in the schema. Falls back to
274    /// LargeBinary (CypherValue encoding) for properties not found in any
275    /// label's schema.
276    fn build_schemaless_vertex_schema(
277        variable: &str,
278        properties: &[String],
279        uni_schema: &uni_common::core::schema::Schema,
280    ) -> SchemaRef {
281        // Merge property metadata from all labels for type resolution.
282        let mut merged: std::collections::HashMap<&str, &uni_common::core::schema::PropertyMeta> =
283            std::collections::HashMap::new();
284        for label_props in uni_schema.properties.values() {
285            for (name, meta) in label_props {
286                merged.entry(name.as_str()).or_insert(meta);
287            }
288        }
289
290        let mut fields = vec![
291            Field::new(format!("{}._vid", variable), DataType::UInt64, false),
292            Field::new(format!("{}._labels", variable), labels_data_type(), true),
293        ];
294
295        for prop in properties {
296            let col_name = format!("{}.{}", variable, prop);
297            let arrow_type = merged
298                .get(prop.as_str())
299                .map(|meta| meta.r#type.to_arrow())
300                .unwrap_or(DataType::LargeBinary);
301            fields.push(Field::new(&col_name, arrow_type, true));
302        }
303
304        Arc::new(Schema::new(fields))
305    }
306
307    /// Create a new graph scan for edges.
308    ///
309    /// Scans all edges of the given type from storage and L0 buffers,
310    /// then materializes the requested properties.
311    pub fn new_edge_scan(
312        graph_ctx: Arc<GraphExecutionContext>,
313        edge_type: impl Into<String>,
314        variable: impl Into<String>,
315        projected_properties: Vec<String>,
316        filter: Option<Arc<dyn PhysicalExpr>>,
317    ) -> Self {
318        let label = edge_type.into();
319        let variable = variable.into();
320
321        // Build output schema with proper types from Uni schema
322        let uni_schema = graph_ctx.storage().schema_manager().schema();
323        let schema = Self::build_edge_schema(&variable, &label, &projected_properties, &uni_schema);
324
325        let properties = compute_plan_properties(schema.clone());
326
327        Self {
328            graph_ctx,
329            label,
330            variable,
331            projected_properties,
332            filter,
333            is_edge_scan: true,
334            is_schemaless: false,
335            schema,
336            properties,
337            metrics: ExecutionPlanMetricsSet::new(),
338        }
339    }
340
341    /// Build output schema for vertex scan with proper Arrow types.
342    fn build_vertex_schema(
343        variable: &str,
344        label: &str,
345        properties: &[String],
346        uni_schema: &UniSchema,
347    ) -> SchemaRef {
348        let mut fields = vec![
349            Field::new(format!("{}._vid", variable), DataType::UInt64, false),
350            Field::new(format!("{}._labels", variable), labels_data_type(), true),
351        ];
352        let label_props = uni_schema.properties.get(label);
353        for prop in properties {
354            let col_name = format!("{}.{}", variable, prop);
355            let arrow_type = resolve_property_type(prop, label_props);
356            fields.push(Field::new(&col_name, arrow_type, true));
357        }
358        Arc::new(Schema::new(fields))
359    }
360
361    /// Build output schema for edge scan with proper Arrow types.
362    fn build_edge_schema(
363        variable: &str,
364        edge_type: &str,
365        properties: &[String],
366        uni_schema: &UniSchema,
367    ) -> SchemaRef {
368        let mut fields = vec![
369            Field::new(format!("{}._eid", variable), DataType::UInt64, false),
370            Field::new(format!("{}._src_vid", variable), DataType::UInt64, false),
371            Field::new(format!("{}._dst_vid", variable), DataType::UInt64, false),
372        ];
373        let edge_props = uni_schema.properties.get(edge_type);
374        for prop in properties {
375            let col_name = format!("{}.{}", variable, prop);
376            let arrow_type = resolve_property_type(prop, edge_props);
377            fields.push(Field::new(&col_name, arrow_type, true));
378        }
379        Arc::new(Schema::new(fields))
380    }
381}
382
383impl DisplayAs for GraphScanExec {
384    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
385        let scan_type = if self.is_edge_scan { "Edge" } else { "Vertex" };
386        write!(
387            f,
388            "GraphScanExec: {}={}, properties={:?}",
389            scan_type, self.label, self.projected_properties
390        )?;
391        if self.filter.is_some() {
392            write!(f, ", filter=<pushed>")?;
393        }
394        Ok(())
395    }
396}
397
398impl ExecutionPlan for GraphScanExec {
399    fn name(&self) -> &str {
400        "GraphScanExec"
401    }
402
403    fn as_any(&self) -> &dyn Any {
404        self
405    }
406
407    fn schema(&self) -> SchemaRef {
408        self.schema.clone()
409    }
410
411    fn properties(&self) -> &PlanProperties {
412        &self.properties
413    }
414
415    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
416        vec![]
417    }
418
419    fn with_new_children(
420        self: Arc<Self>,
421        children: Vec<Arc<dyn ExecutionPlan>>,
422    ) -> DFResult<Arc<dyn ExecutionPlan>> {
423        if children.is_empty() {
424            Ok(self)
425        } else {
426            Err(datafusion::error::DataFusionError::Plan(
427                "GraphScanExec does not accept children".to_string(),
428            ))
429        }
430    }
431
432    fn execute(
433        &self,
434        partition: usize,
435        _context: Arc<TaskContext>,
436    ) -> DFResult<SendableRecordBatchStream> {
437        let metrics = BaselineMetrics::new(&self.metrics, partition);
438
439        Ok(Box::pin(GraphScanStream::new(
440            self.graph_ctx.clone(),
441            self.label.clone(),
442            self.variable.clone(),
443            self.projected_properties.clone(),
444            self.is_edge_scan,
445            self.is_schemaless,
446            self.schema.clone(),
447            metrics,
448        )))
449    }
450
451    fn metrics(&self) -> Option<MetricsSet> {
452        Some(self.metrics.clone_inner())
453    }
454}
455
/// State machine for graph scan stream execution.
enum GraphScanState {
    /// Initial state, ready to start scanning; no work has been kicked off yet.
    Init,
    /// Executing the async scan; holds the in-flight future that resolves to
    /// at most one `RecordBatch` (or `None` when the scan produced no rows).
    Executing(Pin<Box<dyn std::future::Future<Output = DFResult<Option<RecordBatch>>> + Send>>),
    /// Stream is done; subsequent polls yield end-of-stream.
    Done,
}
465
/// Stream that scans vertices or edges and materializes properties.
///
/// For known-label vertex scans, uses a single columnar Lance query with
/// MVCC dedup and L0 overlay. For edge and schemaless scans, falls back
/// to the two-phase VID-scan + property-materialize flow.
struct GraphScanStream {
    /// Graph execution context (storage + L0 buffer access).
    graph_ctx: Arc<GraphExecutionContext>,

    /// Label (vertex) or edge type name. May be colon-joined labels for a
    /// multi-label scan, or empty for "scan all vertices".
    label: String,

    /// Variable name for column prefixing (e.g., "n" in `n.name`).
    variable: String,

    /// Properties to materialize.
    properties: Vec<String>,

    /// Whether this is an edge scan.
    is_edge_scan: bool,

    /// Whether this is a schemaless scan.
    is_schemaless: bool,

    /// Output schema (must match the batches the scan future produces).
    schema: SchemaRef,

    /// Stream state (Init → Executing → Done).
    state: GraphScanState,

    /// Metrics.
    metrics: BaselineMetrics,
}
499
500impl GraphScanStream {
501    /// Create a new graph scan stream.
502    #[expect(clippy::too_many_arguments)]
503    fn new(
504        graph_ctx: Arc<GraphExecutionContext>,
505        label: String,
506        variable: String,
507        properties: Vec<String>,
508        is_edge_scan: bool,
509        is_schemaless: bool,
510        schema: SchemaRef,
511        metrics: BaselineMetrics,
512    ) -> Self {
513        Self {
514            graph_ctx,
515            label,
516            variable,
517            properties,
518            is_edge_scan,
519            is_schemaless,
520            schema,
521            state: GraphScanState::Init,
522            metrics,
523        }
524    }
525}
526
527/// Resolve the Arrow data type for a property, handling system columns like `overflow_json`.
528///
529/// Falls back to `LargeBinary` (CypherValue) if the property is not found in the schema,
530/// preserving original value types for overflow/unknown properties.
531pub(crate) fn resolve_property_type(
532    prop: &str,
533    schema_props: Option<
534        &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
535    >,
536) -> DataType {
537    if prop == "overflow_json" {
538        DataType::LargeBinary
539    } else {
540        schema_props
541            .and_then(|props| props.get(prop))
542            .map(|meta| meta.r#type.to_arrow())
543            .unwrap_or(DataType::LargeBinary)
544    }
545}
546
547// ============================================================================
548// Columnar-first scan helpers
549// ============================================================================
550
/// MVCC deduplication: keep only the highest-version row for each `_vid`.
///
/// Sorts by (_vid ASC, _version DESC), then keeps the first occurrence of each
/// _vid (= the highest version). This is a pure Arrow-compute operation.
///
/// Test-only convenience wrapper around [`mvcc_dedup_batch_by`] with the
/// vertex id column hard-wired.
#[cfg(test)]
fn mvcc_dedup_batch(batch: &RecordBatch) -> DFResult<RecordBatch> {
    mvcc_dedup_batch_by(batch, "_vid")
}
559
560/// Dedup a Lance batch and return `Some` only when rows remain.
561///
562/// Wraps the common pattern of dedup + empty-check that appears in every
563/// columnar scan path (vertex, edge, schemaless).
564fn mvcc_dedup_to_option(
565    batch: Option<RecordBatch>,
566    id_column: &str,
567) -> DFResult<Option<RecordBatch>> {
568    match batch {
569        Some(b) => {
570            let deduped = mvcc_dedup_batch_by(&b, id_column)?;
571            Ok(if deduped.num_rows() > 0 {
572                Some(deduped)
573            } else {
574                None
575            })
576        }
577        None => Ok(None),
578    }
579}
580
581/// Merge a deduped Lance batch with an L0 batch, re-deduplicating the combined
582/// result. Returns an empty batch (against `output_schema`) when both inputs
583/// are empty.
584fn merge_lance_and_l0(
585    lance_deduped: Option<RecordBatch>,
586    l0_batch: RecordBatch,
587    internal_schema: &SchemaRef,
588    id_column: &str,
589) -> DFResult<Option<RecordBatch>> {
590    let has_l0 = l0_batch.num_rows() > 0;
591    match (lance_deduped, has_l0) {
592        (Some(lance), true) => {
593            let combined = arrow::compute::concat_batches(internal_schema, &[lance, l0_batch])
594                .map_err(arrow_err)?;
595            Ok(Some(mvcc_dedup_batch_by(&combined, id_column)?))
596        }
597        (Some(lance), false) => Ok(Some(lance)),
598        (None, true) => Ok(Some(l0_batch)),
599        (None, false) => Ok(None),
600    }
601}
602
/// Push `col_name` into `columns` if not already present.
///
/// Compares against `&str` directly, avoiding the temporary `String`
/// allocation a naive `columns.contains(&col_name.to_string())` would make.
fn push_column_if_absent(columns: &mut Vec<String>, col_name: &str) {
    let already_present = columns.iter().any(|existing| existing == col_name);
    if !already_present {
        columns.push(col_name.to_owned());
    }
}
612
613/// Extract a property value from an overflow_json CypherValue blob.
614///
615/// Returns the raw CypherValue bytes for `prop` if found in the blob,
616/// or `None` if the blob is null or the key is absent.
617fn extract_from_overflow_blob(
618    overflow_arr: Option<&arrow_array::LargeBinaryArray>,
619    row: usize,
620    prop: &str,
621) -> Option<Vec<u8>> {
622    let arr = overflow_arr?;
623    if arr.is_null(row) {
624        return None;
625    }
626    uni_common::cypher_value_codec::extract_map_entry_raw(arr.value(row), prop)
627}
628
629/// Build a `LargeBinary` column by extracting a property from overflow_json
630/// blobs, with L0 buffer overlay.
631///
632/// For each row, checks L0 buffers first (later buffers take precedence).
633/// If the property is not in L0, falls back to extracting from the
634/// overflow_json CypherValue blob.
635fn build_overflow_property_column(
636    num_rows: usize,
637    vid_arr: &UInt64Array,
638    overflow_arr: Option<&arrow_array::LargeBinaryArray>,
639    prop: &str,
640    l0_ctx: &crate::query::df_graph::L0Context,
641) -> ArrayRef {
642    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
643    for i in 0..num_rows {
644        let vid = Vid::from(vid_arr.value(i));
645
646        // Check L0 buffers (later overwrites earlier)
647        let l0_val = resolve_l0_property(&vid, prop, l0_ctx);
648
649        if let Some(val_opt) = l0_val {
650            append_value_as_cypher_binary(&mut builder, val_opt.as_ref());
651        } else if let Some(bytes) = extract_from_overflow_blob(overflow_arr, i, prop) {
652            builder.append_value(&bytes);
653        } else {
654            builder.append_null();
655        }
656    }
657    Arc::new(builder.finish())
658}
659
660/// Resolve a property value from the L0 visibility chain.
661///
662/// Returns `Some(Some(val))` when the property exists with a non-null value,
663/// `Some(None)` when it exists but is null, and `None` when no L0 buffer
664/// has the property.
665fn resolve_l0_property(
666    vid: &Vid,
667    prop: &str,
668    l0_ctx: &crate::query::df_graph::L0Context,
669) -> Option<Option<Value>> {
670    let mut result = None;
671    for l0 in l0_ctx.iter_l0_buffers() {
672        let guard = l0.read();
673        if let Some(props) = guard.vertex_properties.get(vid)
674            && let Some(val) = props.get(prop)
675        {
676            result = Some(Some(val.clone()));
677        }
678    }
679    result
680}
681
682/// Append a `Value` to a `LargeBinaryBuilder` as CypherValue bytes.
683///
684/// Non-null values are JSON-encoded then CypherValue-encoded.
685/// Null values (or encoding failures) produce null entries.
686fn append_value_as_cypher_binary(
687    builder: &mut arrow_array::builder::LargeBinaryBuilder,
688    val: Option<&Value>,
689) {
690    match val {
691        Some(v) if !v.is_null() => {
692            let json_val: serde_json::Value = v.clone().into();
693            match encode_cypher_value(&json_val) {
694                Ok(bytes) => builder.append_value(bytes),
695                Err(_) => builder.append_null(),
696            }
697        }
698        _ => builder.append_null(),
699    }
700}
701
702/// Build the `_all_props` column by overlaying L0 buffer properties onto
703/// the batch's `props_json` column.
704///
705/// For each row, decodes the stored CypherValue blob, merges in any L0 buffer
706/// properties (in visibility order: pending → current → transaction), and
707/// re-encodes the result. This ensures `properties()` and `keys()` reflect
708/// uncommitted L0 mutations.
709fn build_all_props_column_with_l0_overlay(
710    num_rows: usize,
711    vid_arr: &UInt64Array,
712    props_arr: Option<&arrow_array::LargeBinaryArray>,
713    l0_ctx: &crate::query::df_graph::L0Context,
714) -> ArrayRef {
715    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
716    for i in 0..num_rows {
717        let vid = Vid::from(vid_arr.value(i));
718
719        // 1. Decode props_json blob from storage
720        let mut merged_props = serde_json::Map::new();
721        if let Some(arr) = props_arr
722            && !arr.is_null(i)
723            && let Ok(uni_common::Value::Map(map)) =
724                uni_common::cypher_value_codec::decode(arr.value(i))
725        {
726            for (k, v) in map {
727                let json_val: serde_json::Value = v.into();
728                merged_props.insert(k, json_val);
729            }
730        }
731
732        // 2. Overlay L0 properties (visibility order: pending → current → transaction)
733        for l0 in l0_ctx.iter_l0_buffers() {
734            let guard = l0.read();
735            if let Some(l0_props) = guard.vertex_properties.get(&vid) {
736                for (k, v) in l0_props {
737                    let json_val: serde_json::Value = v.clone().into();
738                    merged_props.insert(k.clone(), json_val);
739                }
740            }
741        }
742
743        // 3. Encode merged result
744        if merged_props.is_empty() {
745            builder.append_null();
746        } else {
747            let json_obj = serde_json::Value::Object(merged_props);
748            match encode_cypher_value(&json_obj) {
749                Ok(bytes) => builder.append_value(bytes),
750                Err(_) => builder.append_null(),
751            }
752        }
753    }
754    Arc::new(builder.finish())
755}
756
/// Build `_all_props` for a schema-based scan by merging, in precedence
/// order (later steps overwrite earlier keys):
/// 1. Schema-defined columns from the batch
/// 2. Overflow_json properties
/// 3. L0 buffer properties
///
/// Rows with no properties at all (or whose merged map fails to encode)
/// produce null entries.
fn build_all_props_column_for_schema_scan(
    batch: &RecordBatch,
    vid_arr: &UInt64Array,
    overflow_arr: Option<&arrow_array::LargeBinaryArray>,
    projected_properties: &[String],
    l0_ctx: &crate::query::df_graph::L0Context,
) -> ArrayRef {
    // Collect schema-defined property column names (non-internal, non-overflow,
    // non-_all_props). The underscore check also excludes system columns.
    let schema_props: Vec<&str> = projected_properties
        .iter()
        .filter(|p| *p != "overflow_json" && *p != "_all_props" && !p.starts_with('_'))
        .map(String::as_str)
        .collect();

    let num_rows = batch.num_rows();
    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
    for i in 0..num_rows {
        let vid = Vid::from(vid_arr.value(i));
        let mut merged_props = serde_json::Map::new();

        // 1. Schema-defined columns: convert each cell to a Value, skipping
        //    nulls so they don't mask overflow/L0 entries below.
        for &prop in &schema_props {
            if let Some(col) = batch.column_by_name(prop) {
                let val = uni_store::storage::arrow_convert::arrow_to_value(col.as_ref(), i, None);
                if !val.is_null() {
                    let json_val: serde_json::Value = val.into();
                    merged_props.insert(prop.to_string(), json_val);
                }
            }
        }

        // 2. Overflow_json properties (decoded from the CypherValue blob).
        if let Some(arr) = overflow_arr
            && !arr.is_null(i)
            && let Ok(uni_common::Value::Map(map)) =
                uni_common::cypher_value_codec::decode(arr.value(i))
        {
            for (k, v) in map {
                let json_val: serde_json::Value = v.into();
                merged_props.insert(k, json_val);
            }
        }

        // 3. L0 buffer overlay (pending → current → transaction); later
        //    buffers win via repeated insert.
        for l0 in l0_ctx.iter_l0_buffers() {
            let guard = l0.read();
            if let Some(l0_props) = guard.vertex_properties.get(&vid) {
                for (k, v) in l0_props {
                    let json_val: serde_json::Value = v.clone().into();
                    merged_props.insert(k.clone(), json_val);
                }
            }
        }

        // Encode the merged result; encoding failure degrades to null.
        if merged_props.is_empty() {
            builder.append_null();
        } else {
            let json_obj = serde_json::Value::Object(merged_props);
            match encode_cypher_value(&json_obj) {
                Ok(bytes) => builder.append_value(bytes),
                Err(_) => builder.append_null(),
            }
        }
    }
    Arc::new(builder.finish())
}
827
828/// MVCC deduplication: keep only the highest-version row for each unique value
829/// in the given `id_column`.
830///
831/// Sorts by (id_column ASC, _version DESC), then keeps the first occurrence of
832/// each id (= the highest version). This is a pure Arrow-compute operation.
833fn mvcc_dedup_batch_by(batch: &RecordBatch, id_column: &str) -> DFResult<RecordBatch> {
834    if batch.num_rows() == 0 {
835        return Ok(batch.clone());
836    }
837
838    let id_col = batch
839        .column_by_name(id_column)
840        .ok_or_else(|| {
841            datafusion::error::DataFusionError::Internal(format!("Missing {} column", id_column))
842        })?
843        .clone();
844    let version_col = batch
845        .column_by_name("_version")
846        .ok_or_else(|| {
847            datafusion::error::DataFusionError::Internal("Missing _version column".to_string())
848        })?
849        .clone();
850
851    // Sort by (id_column ASC, _version DESC)
852    let sort_columns = vec![
853        arrow::compute::SortColumn {
854            values: id_col,
855            options: Some(arrow::compute::SortOptions {
856                descending: false,
857                nulls_first: false,
858            }),
859        },
860        arrow::compute::SortColumn {
861            values: version_col,
862            options: Some(arrow::compute::SortOptions {
863                descending: true,
864                nulls_first: false,
865            }),
866        },
867    ];
868    let indices = arrow::compute::lexsort_to_indices(&sort_columns, None).map_err(arrow_err)?;
869
870    // Reorder all columns by sorted indices
871    let sorted_columns: Vec<ArrayRef> = batch
872        .columns()
873        .iter()
874        .map(|col| arrow::compute::take(col.as_ref(), &indices, None))
875        .collect::<Result<_, _>>()
876        .map_err(arrow_err)?;
877    let sorted = RecordBatch::try_new(batch.schema(), sorted_columns).map_err(arrow_err)?;
878
879    // Build dedup mask: keep first occurrence of each id
880    let sorted_id = sorted
881        .column_by_name(id_column)
882        .unwrap()
883        .as_any()
884        .downcast_ref::<UInt64Array>()
885        .unwrap();
886
887    let mut keep = vec![false; sorted.num_rows()];
888    if !keep.is_empty() {
889        keep[0] = true;
890        for (i, flag) in keep.iter_mut().enumerate().skip(1) {
891            if sorted_id.value(i) != sorted_id.value(i - 1) {
892                *flag = true;
893            }
894        }
895    }
896
897    let mask = arrow_array::BooleanArray::from(keep);
898    arrow::compute::filter_record_batch(&sorted, &mask).map_err(arrow_err)
899}
900
901/// Filter out edge rows where `op != 0` (non-INSERT) after MVCC dedup.
902fn filter_deleted_edge_ops(batch: &RecordBatch) -> DFResult<RecordBatch> {
903    if batch.num_rows() == 0 {
904        return Ok(batch.clone());
905    }
906    let op_col = match batch.column_by_name("op") {
907        Some(col) => col
908            .as_any()
909            .downcast_ref::<arrow_array::UInt8Array>()
910            .unwrap(),
911        None => return Ok(batch.clone()),
912    };
913    let keep: Vec<bool> = (0..op_col.len()).map(|i| op_col.value(i) == 0).collect();
914    let mask = arrow_array::BooleanArray::from(keep);
915    arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
916}
917
918/// Filter out rows where `_deleted = true` after MVCC dedup.
919fn filter_deleted_rows(batch: &RecordBatch) -> DFResult<RecordBatch> {
920    if batch.num_rows() == 0 {
921        return Ok(batch.clone());
922    }
923    let deleted_col = match batch.column_by_name("_deleted") {
924        Some(col) => col
925            .as_any()
926            .downcast_ref::<arrow_array::BooleanArray>()
927            .unwrap(),
928        None => return Ok(batch.clone()),
929    };
930    let keep: Vec<bool> = (0..deleted_col.len())
931        .map(|i| !deleted_col.value(i))
932        .collect();
933    let mask = arrow_array::BooleanArray::from(keep);
934    arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
935}
936
937/// Filter out rows whose `_vid` appears in L0 tombstones.
938fn filter_l0_tombstones(
939    batch: &RecordBatch,
940    l0_ctx: &crate::query::df_graph::L0Context,
941) -> DFResult<RecordBatch> {
942    if batch.num_rows() == 0 {
943        return Ok(batch.clone());
944    }
945
946    let mut tombstones: HashSet<u64> = HashSet::new();
947    for l0 in l0_ctx.iter_l0_buffers() {
948        let guard = l0.read();
949        for vid in guard.vertex_tombstones.iter() {
950            tombstones.insert(vid.as_u64());
951        }
952    }
953
954    if tombstones.is_empty() {
955        return Ok(batch.clone());
956    }
957
958    let vid_col = batch
959        .column_by_name("_vid")
960        .ok_or_else(|| {
961            datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
962        })?
963        .as_any()
964        .downcast_ref::<UInt64Array>()
965        .unwrap();
966
967    let keep: Vec<bool> = (0..vid_col.len())
968        .map(|i| !tombstones.contains(&vid_col.value(i)))
969        .collect();
970    let mask = arrow_array::BooleanArray::from(keep);
971    arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
972}
973
974/// Filter out rows whose `eid` appears in L0 edge tombstones.
975fn filter_l0_edge_tombstones(
976    batch: &RecordBatch,
977    l0_ctx: &crate::query::df_graph::L0Context,
978) -> DFResult<RecordBatch> {
979    if batch.num_rows() == 0 {
980        return Ok(batch.clone());
981    }
982
983    let mut tombstones: HashSet<u64> = HashSet::new();
984    for l0 in l0_ctx.iter_l0_buffers() {
985        let guard = l0.read();
986        for eid in guard.tombstones.keys() {
987            tombstones.insert(eid.as_u64());
988        }
989    }
990
991    if tombstones.is_empty() {
992        return Ok(batch.clone());
993    }
994
995    let eid_col = batch
996        .column_by_name("eid")
997        .ok_or_else(|| {
998            datafusion::error::DataFusionError::Internal("Missing eid column".to_string())
999        })?
1000        .as_any()
1001        .downcast_ref::<UInt64Array>()
1002        .unwrap();
1003
1004    let keep: Vec<bool> = (0..eid_col.len())
1005        .map(|i| !tombstones.contains(&eid_col.value(i)))
1006        .collect();
1007    let mask = arrow_array::BooleanArray::from(keep);
1008    arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
1009}
1010
/// Build a RecordBatch from L0 buffer data for a given label, matching the
/// Lance query's column set.
///
/// Merges L0 buffers in visibility order (pending_flush → current → transaction),
/// with later buffers overwriting earlier ones for the same VID.
///
/// The batch is produced with `lance_schema` so the caller can combine it with
/// the deduped Lance batch. Tombstoned VIDs are excluded entirely.
fn build_l0_vertex_batch(
    l0_ctx: &crate::query::df_graph::L0Context,
    label: &str,
    lance_schema: &SchemaRef,
    label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
) -> DFResult<RecordBatch> {
    // Collect all L0 vertex data, merging in visibility order
    let mut vid_data: HashMap<u64, (Properties, u64)> = HashMap::new(); // vid -> (props, version)
    let mut tombstones: HashSet<u64> = HashSet::new();

    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();
        // Collect tombstones
        for vid in guard.vertex_tombstones.iter() {
            tombstones.insert(vid.as_u64());
        }
        // Collect vertices for this label
        for vid in guard.vids_for_label(label) {
            let vid_u64 = vid.as_u64();
            // NOTE(review): tombstones accumulate across buffers, so a
            // tombstone recorded by an *earlier* buffer also suppresses a VID
            // re-inserted by a *later* buffer (and the post-loop removal below
            // does the same). Confirm this global-tombstone semantic is the
            // intended delete/re-insert behavior.
            if tombstones.contains(&vid_u64) {
                continue;
            }
            let version = guard.vertex_versions.get(&vid).copied().unwrap_or(0);
            let entry = vid_data
                .entry(vid_u64)
                .or_insert_with(|| (Properties::new(), 0));
            // Merge properties (later L0 overwrites)
            if let Some(props) = guard.vertex_properties.get(&vid) {
                for (k, v) in props {
                    entry.0.insert(k.clone(), v.clone());
                }
            }
            // Take the highest version
            if version > entry.1 {
                entry.1 = version;
            }
        }
    }

    // Remove tombstoned VIDs
    for t in &tombstones {
        vid_data.remove(t);
    }

    if vid_data.is_empty() {
        return Ok(RecordBatch::new_empty(lance_schema.clone()));
    }

    // Sort VIDs for deterministic output
    let mut vids: Vec<u64> = vid_data.keys().copied().collect();
    vids.sort_unstable();

    let num_rows = vids.len();
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(lance_schema.fields().len());

    // Determine which schema property names exist
    let schema_prop_names: HashSet<&str> = label_props
        .map(|lp| lp.keys().map(|k| k.as_str()).collect())
        .unwrap_or_default();

    // Build one Arrow column per field of the target (Lance) schema, in
    // schema order so the final RecordBatch lines up positionally.
    for field in lance_schema.fields() {
        let col_name = field.name().as_str();
        match col_name {
            "_vid" => {
                columns.push(Arc::new(UInt64Array::from(vids.clone())));
            }
            "_deleted" => {
                // L0 vertices are always live (tombstoned ones are already excluded)
                let vals = vec![false; num_rows];
                columns.push(Arc::new(arrow_array::BooleanArray::from(vals)));
            }
            "_version" => {
                let vals: Vec<u64> = vids.iter().map(|v| vid_data[v].1).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "overflow_json" => {
                // Collect non-schema properties as CypherValue
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                for vid_u64 in &vids {
                    let (props, _) = &vid_data[vid_u64];
                    let mut overflow = serde_json::Map::new();
                    for (k, v) in props {
                        // `ext_id` and underscore-prefixed (system) keys never
                        // go into the overflow blob.
                        if k == "ext_id" || k.starts_with('_') {
                            continue;
                        }
                        if !schema_prop_names.contains(k.as_str()) {
                            let json_val: serde_json::Value = v.clone().into();
                            overflow.insert(k.clone(), json_val);
                        }
                    }
                    if overflow.is_empty() {
                        builder.append_null();
                    } else {
                        let json_val = serde_json::Value::Object(overflow);
                        // Encoding failures degrade to NULL rather than
                        // failing the whole scan.
                        match encode_cypher_value(&json_val) {
                            Ok(bytes) => builder.append_value(bytes),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            _ => {
                // Schema property column: convert L0 Value → Arrow typed value
                let col = build_l0_property_column(&vids, &vid_data, col_name, field.data_type())?;
                columns.push(col);
            }
        }
    }

    RecordBatch::try_new(lance_schema.clone(), columns).map_err(arrow_err)
}
1128
1129/// Build a single Arrow column from L0 property values.
1130///
1131/// Operates on the `vid_data` map produced by `build_l0_vertex_batch`.
1132fn build_l0_property_column(
1133    vids: &[u64],
1134    vid_data: &HashMap<u64, (Properties, u64)>,
1135    prop_name: &str,
1136    data_type: &DataType,
1137) -> DFResult<ArrayRef> {
1138    // Convert to Vid keys for reuse of existing build_property_column_static
1139    let vid_keys: Vec<Vid> = vids.iter().map(|v| Vid::from(*v)).collect();
1140    let props_map: HashMap<Vid, Properties> = vid_data
1141        .iter()
1142        .map(|(k, (props, _))| (Vid::from(*k), props.clone()))
1143        .collect();
1144
1145    build_property_column_static(&vid_keys, &props_map, prop_name, data_type)
1146}
1147
/// Build a RecordBatch from L0 buffer data for a given edge type, matching
/// the DeltaDataset Lance table's column set.
///
/// Merges L0 buffers in visibility order (pending_flush → current → transaction),
/// with later buffers overwriting earlier ones for the same EID.
///
/// Tombstoned EIDs are excluded entirely; the caller combines the result with
/// the deduped Lance edge batch.
fn build_l0_edge_batch(
    l0_ctx: &crate::query::df_graph::L0Context,
    edge_type: &str,
    internal_schema: &SchemaRef,
    type_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
) -> DFResult<RecordBatch> {
    // Collect all L0 edge data, merging in visibility order
    // eid -> (src_vid, dst_vid, properties, version)
    let mut eid_data: HashMap<u64, (u64, u64, Properties, u64)> = HashMap::new();
    let mut tombstones: HashSet<u64> = HashSet::new();

    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();
        // Collect tombstones
        for eid in guard.tombstones.keys() {
            tombstones.insert(eid.as_u64());
        }
        // Collect edges for this type
        for eid in guard.eids_for_type(edge_type) {
            let eid_u64 = eid.as_u64();
            // NOTE(review): as in the vertex path, a tombstone from an
            // *earlier* buffer also suppresses an EID re-inserted by a
            // *later* buffer — confirm global-tombstone semantics are
            // intended.
            if tombstones.contains(&eid_u64) {
                continue;
            }
            // Edges with unresolvable endpoints are skipped entirely.
            let (src_vid, dst_vid) = match guard.get_edge_endpoints(eid) {
                Some(endpoints) => (endpoints.0.as_u64(), endpoints.1.as_u64()),
                None => continue,
            };
            let version = guard.edge_versions.get(&eid).copied().unwrap_or(0);
            let entry = eid_data
                .entry(eid_u64)
                .or_insert_with(|| (src_vid, dst_vid, Properties::new(), 0));
            // Merge properties (later L0 overwrites)
            if let Some(props) = guard.edge_properties.get(&eid) {
                for (k, v) in props {
                    entry.2.insert(k.clone(), v.clone());
                }
            }
            // Update endpoints from latest L0 layer
            entry.0 = src_vid;
            entry.1 = dst_vid;
            // Take the highest version
            if version > entry.3 {
                entry.3 = version;
            }
        }
    }

    // Remove tombstoned EIDs
    for t in &tombstones {
        eid_data.remove(t);
    }

    if eid_data.is_empty() {
        return Ok(RecordBatch::new_empty(internal_schema.clone()));
    }

    // Sort EIDs for deterministic output
    let mut eids: Vec<u64> = eid_data.keys().copied().collect();
    eids.sort_unstable();

    let num_rows = eids.len();
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(internal_schema.fields().len());

    // Determine which schema property names exist
    let schema_prop_names: HashSet<&str> = type_props
        .map(|tp| tp.keys().map(|k| k.as_str()).collect())
        .unwrap_or_default();

    // Build one Arrow column per field of the internal schema, in schema
    // order so the final RecordBatch lines up positionally.
    for field in internal_schema.fields() {
        let col_name = field.name().as_str();
        match col_name {
            "eid" => {
                columns.push(Arc::new(UInt64Array::from(eids.clone())));
            }
            "src_vid" => {
                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].0).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "dst_vid" => {
                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].1).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "op" => {
                // L0 edges are always live (tombstoned ones already excluded)
                let vals = vec![0u8; num_rows];
                columns.push(Arc::new(arrow_array::UInt8Array::from(vals)));
            }
            "_version" => {
                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].3).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "overflow_json" => {
                // Collect non-schema properties as CypherValue
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                for eid_u64 in &eids {
                    let (_, _, props, _) = &eid_data[eid_u64];
                    let mut overflow = serde_json::Map::new();
                    for (k, v) in props {
                        // Underscore-prefixed (system) keys are excluded.
                        // NOTE(review): the vertex path also excludes
                        // "ext_id"; it is not excluded here — confirm the
                        // asymmetry is intentional.
                        if k.starts_with('_') {
                            continue;
                        }
                        if !schema_prop_names.contains(k.as_str()) {
                            let json_val: serde_json::Value = v.clone().into();
                            overflow.insert(k.clone(), json_val);
                        }
                    }
                    if overflow.is_empty() {
                        builder.append_null();
                    } else {
                        let json_val = serde_json::Value::Object(overflow);
                        // Encoding failures degrade to NULL rather than
                        // failing the scan.
                        match encode_cypher_value(&json_val) {
                            Ok(bytes) => builder.append_value(bytes),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            _ => {
                // Schema property column: convert L0 Value → Arrow typed value
                let col =
                    build_l0_edge_property_column(&eids, &eid_data, col_name, field.data_type())?;
                columns.push(col);
            }
        }
    }

    RecordBatch::try_new(internal_schema.clone(), columns).map_err(arrow_err)
}
1282
1283/// Build a single Arrow column from L0 edge property values.
1284///
1285/// Operates on the `eid_data` map produced by `build_l0_edge_batch`.
1286fn build_l0_edge_property_column(
1287    eids: &[u64],
1288    eid_data: &HashMap<u64, (u64, u64, Properties, u64)>,
1289    prop_name: &str,
1290    data_type: &DataType,
1291) -> DFResult<ArrayRef> {
1292    // Convert to Vid keys for reuse of existing build_property_column_static
1293    let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(*e)).collect();
1294    let props_map: HashMap<Vid, Properties> = eid_data
1295        .iter()
1296        .map(|(k, (_, _, props, _))| (Vid::from(*k), props.clone()))
1297        .collect();
1298
1299    build_property_column_static(&vid_keys, &props_map, prop_name, data_type)
1300}
1301
1302/// Build the `_labels` column for known-label vertices.
1303///
1304/// Reads `_labels` from the stored Lance batch if available. Falls back to
1305/// `[label]` when the column is absent (legacy data). Additional labels from
1306/// L0 buffers are merged in.
1307fn build_labels_column_for_known_label(
1308    vid_arr: &UInt64Array,
1309    label: &str,
1310    l0_ctx: &crate::query::df_graph::L0Context,
1311    batch_labels_col: Option<&arrow_array::ListArray>,
1312) -> DFResult<ArrayRef> {
1313    use uni_store::storage::arrow_convert::labels_from_list_array;
1314
1315    let mut labels_builder = ListBuilder::new(StringBuilder::new());
1316
1317    for i in 0..vid_arr.len() {
1318        let vid = Vid::from(vid_arr.value(i));
1319
1320        // Start with labels from the stored column, falling back to [label]
1321        let mut labels = match batch_labels_col {
1322            Some(list_arr) => {
1323                let stored = labels_from_list_array(list_arr, i);
1324                if stored.is_empty() {
1325                    vec![label.to_string()]
1326                } else {
1327                    stored
1328                }
1329            }
1330            None => vec![label.to_string()],
1331        };
1332
1333        // Ensure the scanned label is present (defensive)
1334        if !labels.iter().any(|l| l == label) {
1335            labels.push(label.to_string());
1336        }
1337
1338        // Merge additional labels from L0 buffers
1339        for l0 in l0_ctx.iter_l0_buffers() {
1340            let guard = l0.read();
1341            if let Some(l0_labels) = guard.vertex_labels.get(&vid) {
1342                for lbl in l0_labels {
1343                    if !labels.contains(lbl) {
1344                        labels.push(lbl.clone());
1345                    }
1346                }
1347            }
1348        }
1349
1350        let values = labels_builder.values();
1351        for lbl in &labels {
1352            values.append_value(lbl);
1353        }
1354        labels_builder.append(true);
1355    }
1356
1357    Ok(Arc::new(labels_builder.finish()))
1358}
1359
/// Map a Lance-schema batch to the DataFusion output schema.
///
/// The output schema has `{variable}.{property}` column names, while Lance
/// uses bare property names. This function performs the positional mapping,
/// adds the `_labels` column, and drops internal columns like `_deleted`/`_version`.
///
/// Columns are pushed positionally in the order `_vid`, `_labels`, then one
/// column per entry of `projected_properties` — `output_schema` must list its
/// fields in exactly that order. `_variable` is currently unused; the mapping
/// is positional rather than name-based.
fn map_to_output_schema(
    batch: &RecordBatch,
    label: &str,
    _variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
    l0_ctx: &crate::query::df_graph::L0Context,
) -> DFResult<RecordBatch> {
    if batch.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());

    // 1. {var}._vid
    let vid_col = batch
        .column_by_name("_vid")
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
        })?
        .clone();
    let vid_arr = vid_col
        .as_any()
        .downcast_ref::<UInt64Array>()
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("_vid not UInt64".to_string())
        })?;

    // 2. {var}._labels — read from stored column, overlay L0 additions
    let batch_labels_col = batch
        .column_by_name("_labels")
        .and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());
    let labels_col = build_labels_column_for_known_label(vid_arr, label, l0_ctx, batch_labels_col)?;
    columns.push(vid_col.clone());
    columns.push(labels_col);

    // 3. Projected properties
    // Pre-load overflow_json column for extracting non-schema properties
    let overflow_arr = batch
        .column_by_name("overflow_json")
        .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());

    for prop in projected_properties {
        if prop == "overflow_json" {
            match batch.column_by_name("overflow_json") {
                Some(col) => columns.push(col.clone()),
                None => {
                    // No overflow_json in Lance — return null column
                    columns.push(arrow_array::new_null_array(
                        &DataType::LargeBinary,
                        batch.num_rows(),
                    ));
                }
            }
        } else if prop == "_all_props" {
            // Build _all_props from overflow_json + L0 overlay.
            // Fast path: if no L0 buffer has vertex property mutations AND
            // there are no schema columns to merge, pass through overflow_json.
            let any_l0_has_vertex_props = l0_ctx.iter_l0_buffers().any(|l0| {
                let guard = l0.read();
                !guard.vertex_properties.is_empty()
            });
            // Check if this label has schema-defined columns (besides system columns)
            let has_schema_cols = projected_properties
                .iter()
                .any(|p| p != "overflow_json" && p != "_all_props" && !p.starts_with('_'));

            if !any_l0_has_vertex_props && !has_schema_cols {
                // No L0 mutations, no schema cols to merge: overflow_json IS _all_props
                match batch.column_by_name("overflow_json") {
                    Some(col) => columns.push(col.clone()),
                    None => {
                        columns.push(arrow_array::new_null_array(
                            &DataType::LargeBinary,
                            batch.num_rows(),
                        ));
                    }
                }
            } else {
                // Need to merge: schema columns + overflow_json + L0 overlay
                let col = build_all_props_column_for_schema_scan(
                    batch,
                    vid_arr,
                    overflow_arr,
                    projected_properties,
                    l0_ctx,
                );
                columns.push(col);
            }
        } else {
            match batch.column_by_name(prop) {
                Some(col) => columns.push(col.clone()),
                None => {
                    // Column missing in Lance -- extract from overflow_json
                    // CypherValue blob with L0 overlay
                    let col = build_overflow_property_column(
                        batch.num_rows(),
                        vid_arr,
                        overflow_arr,
                        prop,
                        l0_ctx,
                    );
                    columns.push(col);
                }
            }
        }
    }

    RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
}
1475
/// Map an internal DeltaDataset-schema edge batch to the DataFusion output schema.
///
/// The internal batch has `eid`, `src_vid`, `dst_vid`, `op`, `_version`, and property
/// columns. The output schema has `{variable}._eid`, `{variable}._src_vid`,
/// `{variable}._dst_vid`, and per-property columns. Internal columns `op` and
/// `_version` are dropped.
///
/// Columns are pushed positionally (`eid`, `src_vid`, `dst_vid`, then one per
/// projected property), so `output_schema` must list its fields in that order.
fn map_edge_to_output_schema(
    batch: &RecordBatch,
    variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
    if batch.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());

    // 1. {var}._eid
    let eid_col = batch
        .column_by_name("eid")
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("Missing eid column".to_string())
        })?
        .clone();
    columns.push(eid_col);

    // 2. {var}._src_vid
    let src_col = batch
        .column_by_name("src_vid")
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("Missing src_vid column".to_string())
        })?
        .clone();
    columns.push(src_col);

    // 3. {var}._dst_vid
    let dst_col = batch
        .column_by_name("dst_vid")
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("Missing dst_vid column".to_string())
        })?
        .clone();
    columns.push(dst_col);

    // 4. Projected properties
    for prop in projected_properties {
        if prop == "overflow_json" {
            match batch.column_by_name("overflow_json") {
                Some(col) => columns.push(col.clone()),
                None => {
                    // No overflow_json column in the internal batch — emit nulls.
                    columns.push(arrow_array::new_null_array(
                        &DataType::LargeBinary,
                        batch.num_rows(),
                    ));
                }
            }
        } else {
            match batch.column_by_name(prop) {
                Some(col) => columns.push(col.clone()),
                None => {
                    // Column missing in Lance — extract from overflow_json CypherValue blob
                    // (mirrors the vertex path in map_to_output_schema)
                    let overflow_arr = batch
                        .column_by_name("overflow_json")
                        .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());

                    if let Some(arr) = overflow_arr {
                        let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                        for i in 0..batch.num_rows() {
                            if !arr.is_null(i) {
                                let blob = arr.value(i);
                                // Fast path: extract map entry without decoding entire map
                                if let Some(sub_bytes) =
                                    uni_common::cypher_value_codec::extract_map_entry_raw(
                                        blob, prop,
                                    )
                                {
                                    builder.append_value(&sub_bytes);
                                } else {
                                    // Property absent from the blob for this row.
                                    builder.append_null();
                                }
                            } else {
                                builder.append_null();
                            }
                        }
                        columns.push(Arc::new(builder.finish()));
                    } else {
                        // No overflow_json column either — return null column
                        // typed after the matching output field (LargeBinary
                        // fallback if the field is not found).
                        let target_field = output_schema
                            .fields()
                            .iter()
                            .find(|f| f.name() == &format!("{}.{}", variable, prop));
                        let dt = target_field
                            .map(|f| f.data_type().clone())
                            .unwrap_or(DataType::LargeBinary);
                        columns.push(arrow_array::new_null_array(&dt, batch.num_rows()));
                    }
                }
            }
        }
    }

    RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
}
1581
1582/// Columnar-first vertex scan: single Lance query with MVCC dedup and L0 overlay.
1583///
1584/// Replaces the two-phase `scan_vertex_vids_static()` + `materialize_vertex_batch_static()`
1585/// for known-label vertex scans. Reads all needed columns in a single Lance query,
1586/// performs MVCC dedup via Arrow compute, merges L0 buffer data, filters tombstones,
1587/// and maps to the output schema.
async fn columnar_scan_vertex_batch_static(
    graph_ctx: &GraphExecutionContext,
    label: &str,
    variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
    let storage = graph_ctx.storage();
    let l0_ctx = graph_ctx.l0_context();
    let uni_schema = storage.schema_manager().schema();
    // Per-property metadata for this label; None when the label has no
    // schema-declared properties.
    let label_props = uni_schema.properties.get(label);

    // Build the list of columns to request from Lance.
    // System columns are always fetched: _vid (row key), _deleted (MVCC
    // tombstone flag), _version (MVCC ordering).
    let mut lance_columns: Vec<String> = vec![
        "_vid".to_string(),
        "_deleted".to_string(),
        "_version".to_string(),
    ];
    for prop in projected_properties {
        if prop == "overflow_json" {
            push_column_if_absent(&mut lance_columns, "overflow_json");
        } else {
            // Only schema-declared properties exist as dedicated Lance columns;
            // everything else must be read from the overflow_json blob (below).
            let exists_in_schema = label_props.is_some_and(|lp| lp.contains_key(prop));
            if exists_in_schema {
                push_column_if_absent(&mut lance_columns, prop);
            }
        }
    }

    // Ensure overflow_json is present when any projected property is not in the schema
    let needs_overflow = projected_properties
        .iter()
        .any(|p| p == "overflow_json" || !label_props.is_some_and(|lp| lp.contains_key(p)));
    if needs_overflow {
        push_column_if_absent(&mut lance_columns, "overflow_json");
    }

    // Try to query Lance via StorageManager domain method.
    // No filter pushed down here (None); filtering happens after the merge.
    let lance_columns_refs: Vec<&str> = lance_columns.iter().map(|s| s.as_str()).collect();
    let lance_batch = storage
        .scan_vertex_table(label, &lance_columns_refs, None)
        .await
        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;

    // MVCC dedup the Lance batch (keyed by _vid)
    let lance_deduped = mvcc_dedup_to_option(lance_batch, "_vid")?;

    // Build the internal Lance schema for L0 batch construction.
    // Use the Lance batch schema if available, otherwise build from scratch.
    let internal_schema = match &lance_deduped {
        Some(batch) => batch.schema(),
        None => {
            let mut fields = vec![
                Field::new("_vid", DataType::UInt64, false),
                Field::new("_deleted", DataType::Boolean, false),
                Field::new("_version", DataType::UInt64, false),
            ];
            for col in &lance_columns {
                // System columns were already added above.
                if matches!(col.as_str(), "_vid" | "_deleted" | "_version") {
                    continue;
                }
                if col == "overflow_json" {
                    fields.push(Field::new("overflow_json", DataType::LargeBinary, true));
                } else {
                    // Use the schema-declared Arrow type for the property;
                    // fall back to LargeBinary when the type is unknown.
                    let arrow_type = label_props
                        .and_then(|lp| lp.get(col.as_str()))
                        .map(|meta| meta.r#type.to_arrow())
                        .unwrap_or(DataType::LargeBinary);
                    fields.push(Field::new(col, arrow_type, true));
                }
            }
            Arc::new(Schema::new(fields))
        }
    };

    // Build L0 batch (in-memory buffer overlay shaped like the Lance rows)
    let l0_batch = build_l0_vertex_batch(l0_ctx, label, &internal_schema, label_props)?;

    // Merge Lance + L0; a None result means there is nothing to scan at all.
    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "_vid")?
    else {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    };

    // Filter out MVCC deletion tombstones (_deleted = true)
    let merged = filter_deleted_rows(&merged)?;
    if merged.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    // Filter L0 tombstones
    let filtered = filter_l0_tombstones(&merged, l0_ctx)?;

    if filtered.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    // Map to output schema ({variable}.{property} column naming)
    map_to_output_schema(
        &filtered,
        label,
        variable,
        projected_properties,
        output_schema,
        l0_ctx,
    )
}
1695
/// Columnar-first edge scan: single Lance query with MVCC dedup and L0 overlay.
///
/// Replaces the two-phase `scan_edge_eids_static()` + `materialize_edge_batch_static()`
/// for edge scans. Reads all needed columns in a single DeltaDataset query, performs
/// MVCC dedup via Arrow compute, merges L0 buffer data, filters tombstones, and maps
/// to the output schema.
///
/// Structurally mirrors [`columnar_scan_vertex_batch_static`], but rows are keyed
/// by `eid`, the system columns are `eid`/`src_vid`/`dst_vid`/`op`/`_version`, and
/// deletions are expressed via the `op` column instead of `_deleted`.
async fn columnar_scan_edge_batch_static(
    graph_ctx: &GraphExecutionContext,
    edge_type: &str,
    variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
    let storage = graph_ctx.storage();
    let l0_ctx = graph_ctx.l0_context();
    let uni_schema = storage.schema_manager().schema();
    // Per-property metadata for this edge type; None when the type has no
    // schema-declared properties.
    let type_props = uni_schema.properties.get(edge_type);

    // Build the list of columns to request from DeltaDataset Lance table.
    // System columns are always fetched.
    let mut lance_columns: Vec<String> = vec![
        "eid".to_string(),
        "src_vid".to_string(),
        "dst_vid".to_string(),
        "op".to_string(),
        "_version".to_string(),
    ];
    for prop in projected_properties {
        if prop == "overflow_json" {
            push_column_if_absent(&mut lance_columns, "overflow_json");
        } else {
            // Only schema-declared properties have dedicated Lance columns;
            // anything else is served from the overflow_json blob (below).
            let exists_in_schema = type_props.is_some_and(|tp| tp.contains_key(prop));
            if exists_in_schema {
                push_column_if_absent(&mut lance_columns, prop);
            }
        }
    }

    // Ensure overflow_json is present when any projected property is not in the schema
    let needs_overflow = projected_properties
        .iter()
        .any(|p| p == "overflow_json" || !type_props.is_some_and(|tp| tp.contains_key(p)));
    if needs_overflow {
        push_column_if_absent(&mut lance_columns, "overflow_json");
    }

    // Try to query DeltaDataset (forward direction) via StorageManager domain method
    let lance_columns_refs: Vec<&str> = lance_columns.iter().map(|s| s.as_str()).collect();
    let lance_batch = storage
        .scan_delta_table(edge_type, "fwd", &lance_columns_refs, None)
        .await
        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;

    // MVCC dedup the Lance batch (by eid)
    let lance_deduped = mvcc_dedup_to_option(lance_batch, "eid")?;

    // Build the internal schema for L0 batch construction.
    // Use the Lance batch schema if available, otherwise build from scratch.
    let internal_schema = match &lance_deduped {
        Some(batch) => batch.schema(),
        None => {
            let mut fields = vec![
                Field::new("eid", DataType::UInt64, false),
                Field::new("src_vid", DataType::UInt64, false),
                Field::new("dst_vid", DataType::UInt64, false),
                Field::new("op", DataType::UInt8, false),
                Field::new("_version", DataType::UInt64, false),
            ];
            for col in &lance_columns {
                // System columns were already added above.
                if matches!(
                    col.as_str(),
                    "eid" | "src_vid" | "dst_vid" | "op" | "_version"
                ) {
                    continue;
                }
                if col == "overflow_json" {
                    fields.push(Field::new("overflow_json", DataType::LargeBinary, true));
                } else {
                    // Use the schema-declared Arrow type for the property;
                    // fall back to LargeBinary when the type is unknown.
                    let arrow_type = type_props
                        .and_then(|tp| tp.get(col.as_str()))
                        .map(|meta| meta.r#type.to_arrow())
                        .unwrap_or(DataType::LargeBinary);
                    fields.push(Field::new(col, arrow_type, true));
                }
            }
            Arc::new(Schema::new(fields))
        }
    };

    // Build L0 batch (in-memory buffer overlay shaped like the Lance rows)
    let l0_batch = build_l0_edge_batch(l0_ctx, edge_type, &internal_schema, type_props)?;

    // Merge Lance + L0; a None result means there is nothing to scan at all.
    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "eid")? else {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    };

    // Filter out MVCC deletion ops (op != 0) after dedup
    let merged = filter_deleted_edge_ops(&merged)?;
    if merged.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    // Filter L0 edge tombstones
    let filtered = filter_l0_edge_tombstones(&merged, l0_ctx)?;

    if filtered.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    // Map to output schema ({variable}.{property} column naming)
    map_edge_to_output_schema(&filtered, variable, projected_properties, output_schema)
}
1808
1809/// Columnar-first schemaless vertex scan: single Lance query with MVCC dedup and L0 overlay.
1810///
1811/// Replaces the two-phase `scan_*_vids_*()` + `materialize_schemaless_vertex_batch_static()`
1812/// for schemaless vertex scans. Reads `_vid`, `labels`, `props_json`, `_version` in a single
1813/// Lance query on the main vertices table, performs MVCC dedup via Arrow compute, merges L0
1814/// buffer data, filters tombstones, and maps to the output schema.
1815async fn columnar_scan_schemaless_vertex_batch_static(
1816    graph_ctx: &GraphExecutionContext,
1817    label: &str,
1818    variable: &str,
1819    projected_properties: &[String],
1820    output_schema: &SchemaRef,
1821) -> DFResult<RecordBatch> {
1822    let storage = graph_ctx.storage();
1823    let l0_ctx = graph_ctx.l0_context();
1824
1825    // Build the Lance filter expression — do NOT filter _deleted here;
1826    // MVCC dedup must see deletion tombstones to pick the highest version.
1827    let filter = {
1828        let mut parts = Vec::new();
1829
1830        // Label filter
1831        if !label.is_empty() {
1832            if label.contains(':') {
1833                // Multi-label: each label must be present
1834                for lbl in label.split(':') {
1835                    parts.push(format!("array_contains(labels, '{}')", lbl));
1836                }
1837            } else {
1838                parts.push(format!("array_contains(labels, '{}')", label));
1839            }
1840        }
1841
1842        if parts.is_empty() {
1843            None
1844        } else {
1845            Some(parts.join(" AND "))
1846        }
1847    };
1848
1849    // Single Lance query via StorageManager domain method
1850    let lance_batch = storage
1851        .scan_main_vertex_table(
1852            &["_vid", "_deleted", "labels", "props_json", "_version"],
1853            filter.as_deref(),
1854        )
1855        .await
1856        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
1857
1858    // MVCC dedup the Lance batch
1859    let lance_deduped = mvcc_dedup_to_option(lance_batch, "_vid")?;
1860
1861    // Build the internal schema for L0 batch construction.
1862    // Use the Lance batch schema if available, otherwise build from scratch.
1863    let internal_schema = match &lance_deduped {
1864        Some(batch) => batch.schema(),
1865        None => Arc::new(Schema::new(vec![
1866            Field::new("_vid", DataType::UInt64, false),
1867            Field::new("_deleted", DataType::Boolean, false),
1868            Field::new("labels", labels_data_type(), false),
1869            Field::new("props_json", DataType::LargeBinary, true),
1870            Field::new("_version", DataType::UInt64, false),
1871        ])),
1872    };
1873
1874    // Build L0 batch
1875    let l0_batch = build_l0_schemaless_vertex_batch(l0_ctx, label, &internal_schema)?;
1876
1877    // Merge Lance + L0
1878    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "_vid")?
1879    else {
1880        return Ok(RecordBatch::new_empty(output_schema.clone()));
1881    };
1882
1883    // Filter out MVCC deletion tombstones (_deleted = true)
1884    let merged = filter_deleted_rows(&merged)?;
1885    if merged.num_rows() == 0 {
1886        return Ok(RecordBatch::new_empty(output_schema.clone()));
1887    }
1888
1889    // Filter L0 tombstones
1890    let filtered = filter_l0_tombstones(&merged, l0_ctx)?;
1891
1892    if filtered.num_rows() == 0 {
1893        return Ok(RecordBatch::new_empty(output_schema.clone()));
1894    }
1895
1896    // Map to output schema
1897    map_to_schemaless_output_schema(
1898        &filtered,
1899        variable,
1900        projected_properties,
1901        output_schema,
1902        l0_ctx,
1903    )
1904}
1905
/// Build a RecordBatch from L0 buffer data for schemaless vertices.
///
/// Merges L0 buffers in visibility order (pending_flush → current → transaction),
/// with later buffers overwriting earlier ones for the same VID. Produces a batch
/// matching the internal schema: `_vid, labels, props_json, _version`.
///
/// `label` may be empty (no label filter), a single label, or a ':'-joined
/// multi-label spec (every listed label must be present).
fn build_l0_schemaless_vertex_batch(
    l0_ctx: &crate::query::df_graph::L0Context,
    label: &str,
    internal_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
    // Collect all L0 vertex data, merging in visibility order
    // vid -> (merged_props, highest_version, labels)
    let mut vid_data: HashMap<u64, (Properties, u64, Vec<String>)> = HashMap::new();
    let mut tombstones: HashSet<u64> = HashSet::new();

    // Parse multi-label filter
    let label_filter: Vec<&str> = if label.is_empty() {
        vec![]
    } else if label.contains(':') {
        label.split(':').collect()
    } else {
        vec![label]
    };

    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();

        // Collect tombstones (accumulated across all visible L0 layers)
        for vid in guard.vertex_tombstones.iter() {
            tombstones.insert(vid.as_u64());
        }

        // Collect VIDs matching the label filter
        let vids: Vec<Vid> = if label_filter.is_empty() {
            guard.all_vertex_vids()
        } else if label_filter.len() == 1 {
            guard.vids_for_label(label_filter[0])
        } else {
            guard.vids_with_all_labels(&label_filter)
        };

        for vid in vids {
            let vid_u64 = vid.as_u64();
            // NOTE(review): a tombstone from any earlier layer suppresses this
            // VID even if the current layer re-created it — confirm this
            // tombstone-wins behavior matches the intended visibility-order
            // semantics for delete-then-recreate sequences.
            if tombstones.contains(&vid_u64) {
                continue;
            }
            let version = guard.vertex_versions.get(&vid).copied().unwrap_or(0);
            let entry = vid_data
                .entry(vid_u64)
                .or_insert_with(|| (Properties::new(), 0, Vec::new()));

            // Merge properties (later L0 overwrites)
            if let Some(props) = guard.vertex_properties.get(&vid) {
                for (k, v) in props {
                    entry.0.insert(k.clone(), v.clone());
                }
            }
            // Take the highest version
            if version > entry.1 {
                entry.1 = version;
            }
            // Update labels from latest L0 layer
            if let Some(labels) = guard.vertex_labels.get(&vid) {
                entry.2 = labels.clone();
            }
        }
    }

    // Remove tombstoned VIDs (covers VIDs inserted before their tombstone
    // was seen)
    for t in &tombstones {
        vid_data.remove(t);
    }

    if vid_data.is_empty() {
        return Ok(RecordBatch::new_empty(internal_schema.clone()));
    }

    // Sort VIDs for deterministic output
    let mut vids: Vec<u64> = vid_data.keys().copied().collect();
    vids.sort_unstable();

    let num_rows = vids.len();
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(internal_schema.fields().len());

    // Build one Arrow column per internal-schema field, in schema order.
    for field in internal_schema.fields() {
        match field.name().as_str() {
            "_vid" => {
                columns.push(Arc::new(UInt64Array::from(vids.clone())));
            }
            "labels" => {
                let mut labels_builder = ListBuilder::new(StringBuilder::new());
                for vid_u64 in &vids {
                    let (_, _, labels) = &vid_data[vid_u64];
                    let values = labels_builder.values();
                    for lbl in labels {
                        values.append_value(lbl);
                    }
                    labels_builder.append(true);
                }
                columns.push(Arc::new(labels_builder.finish()));
            }
            "props_json" => {
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                for vid_u64 in &vids {
                    let (props, _, _) = &vid_data[vid_u64];
                    if props.is_empty() {
                        // No properties: NULL rather than an empty map blob.
                        builder.append_null();
                    } else {
                        // Encode properties as CypherValue blob
                        let json_obj: serde_json::Value = {
                            let mut map = serde_json::Map::new();
                            for (k, v) in props {
                                let json_val: serde_json::Value = v.clone().into();
                                map.insert(k.clone(), json_val);
                            }
                            serde_json::Value::Object(map)
                        };
                        match encode_cypher_value(&json_obj) {
                            Ok(bytes) => builder.append_value(bytes),
                            // Best-effort: an unencodable value degrades to NULL.
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            "_deleted" => {
                // L0 vertices are always live (tombstoned ones already excluded)
                columns.push(Arc::new(arrow_array::BooleanArray::from(vec![
                    false;
                    num_rows
                ])));
            }
            "_version" => {
                let vals: Vec<u64> = vids.iter().map(|v| vid_data[v].1).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            _ => {
                // Unexpected column — fill with nulls
                columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
            }
        }
    }

    RecordBatch::try_new(internal_schema.clone(), columns).map_err(arrow_err)
}
2051
2052/// Map an internal-schema schemaless batch to the DataFusion output schema.
2053///
2054/// The internal batch has `_vid, labels, props_json, _version` columns. The output
2055/// schema has `{variable}._vid`, `{variable}._labels`, and per-property columns.
2056/// Individual properties are extracted from the `props_json` CypherValue blob by
2057/// decoding to a Map and extracting the sub-value.
2058fn map_to_schemaless_output_schema(
2059    batch: &RecordBatch,
2060    _variable: &str,
2061    projected_properties: &[String],
2062    output_schema: &SchemaRef,
2063    l0_ctx: &crate::query::df_graph::L0Context,
2064) -> DFResult<RecordBatch> {
2065    if batch.num_rows() == 0 {
2066        return Ok(RecordBatch::new_empty(output_schema.clone()));
2067    }
2068
2069    let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
2070
2071    // 1. {var}._vid — passthrough
2072    let vid_col = batch
2073        .column_by_name("_vid")
2074        .ok_or_else(|| {
2075            datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
2076        })?
2077        .clone();
2078    let vid_arr = vid_col
2079        .as_any()
2080        .downcast_ref::<UInt64Array>()
2081        .ok_or_else(|| {
2082            datafusion::error::DataFusionError::Internal("_vid not UInt64".to_string())
2083        })?;
2084    columns.push(vid_col.clone());
2085
2086    // 2. {var}._labels — from labels column with L0 overlay
2087    let labels_col = batch.column_by_name("labels");
2088    let labels_arr = labels_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());
2089
2090    let mut labels_builder = ListBuilder::new(StringBuilder::new());
2091    for i in 0..vid_arr.len() {
2092        let vid_u64 = vid_arr.value(i);
2093        let vid = Vid::from(vid_u64);
2094
2095        // Start with labels from the batch
2096        let mut row_labels: Vec<String> = Vec::new();
2097        if let Some(arr) = labels_arr
2098            && !arr.is_null(i)
2099        {
2100            let list_val = arr.value(i);
2101            if let Some(str_arr) = list_val.as_any().downcast_ref::<arrow_array::StringArray>() {
2102                for j in 0..str_arr.len() {
2103                    if !str_arr.is_null(j) {
2104                        row_labels.push(str_arr.value(j).to_string());
2105                    }
2106                }
2107            }
2108        }
2109
2110        // Overlay L0 labels
2111        for l0 in l0_ctx.iter_l0_buffers() {
2112            let guard = l0.read();
2113            if let Some(l0_labels) = guard.vertex_labels.get(&vid) {
2114                for lbl in l0_labels {
2115                    if !row_labels.contains(lbl) {
2116                        row_labels.push(lbl.clone());
2117                    }
2118                }
2119            }
2120        }
2121
2122        let values = labels_builder.values();
2123        for lbl in &row_labels {
2124            values.append_value(lbl);
2125        }
2126        labels_builder.append(true);
2127    }
2128    columns.push(Arc::new(labels_builder.finish()));
2129
2130    // 3. Projected properties — extract from props_json
2131    let props_col = batch.column_by_name("props_json");
2132    let props_arr =
2133        props_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
2134
2135    for prop in projected_properties {
2136        if prop == "_all_props" {
2137            // Fast path: if no L0 buffer has vertex property mutations,
2138            // the raw props_json passthrough is correct.
2139            let any_l0_has_vertex_props = l0_ctx.iter_l0_buffers().any(|l0| {
2140                let guard = l0.read();
2141                !guard.vertex_properties.is_empty()
2142            });
2143            if !any_l0_has_vertex_props {
2144                match props_col {
2145                    Some(col) => columns.push(col.clone()),
2146                    None => {
2147                        columns.push(arrow_array::new_null_array(
2148                            &DataType::LargeBinary,
2149                            batch.num_rows(),
2150                        ));
2151                    }
2152                }
2153            } else {
2154                let col = build_all_props_column_with_l0_overlay(
2155                    batch.num_rows(),
2156                    vid_arr,
2157                    props_arr,
2158                    l0_ctx,
2159                );
2160                columns.push(col);
2161            }
2162        } else {
2163            // Extract individual property from CypherValue blob with L0 overlay.
2164            // The raw column is LargeBinary (CypherValue-encoded). If the output
2165            // schema expects a typed column (e.g., Utf8 for String properties),
2166            // decode the CypherValue and build the correct Arrow type.
2167            let expected_type = output_schema
2168                .field_with_name(&format!("{_variable}.{prop}"))
2169                .map(|f| f.data_type().clone())
2170                .unwrap_or(DataType::LargeBinary);
2171
2172            if expected_type == DataType::LargeBinary {
2173                let col = build_overflow_property_column(
2174                    batch.num_rows(),
2175                    vid_arr,
2176                    props_arr,
2177                    prop,
2178                    l0_ctx,
2179                );
2180                columns.push(col);
2181            } else {
2182                // Decode CypherValue to the expected type via build_property_column_static.
2183                let mut prop_values: HashMap<Vid, Properties> = HashMap::new();
2184                for i in 0..batch.num_rows() {
2185                    let vid = Vid::from(vid_arr.value(i));
2186                    let resolved =
2187                        resolve_l0_property(&vid, prop, l0_ctx)
2188                            .flatten()
2189                            .or_else(|| {
2190                                extract_from_overflow_blob(props_arr, i, prop).and_then(|bytes| {
2191                                    uni_common::cypher_value_codec::decode(&bytes).ok()
2192                                })
2193                            });
2194                    if let Some(val) = resolved {
2195                        prop_values.insert(vid, HashMap::from([(prop.to_string(), val)]));
2196                    }
2197                }
2198                let vids: Vec<Vid> = (0..batch.num_rows())
2199                    .map(|i| Vid::from(vid_arr.value(i)))
2200                    .collect();
2201                let col = build_property_column_static(&vids, &prop_values, prop, &expected_type)
2202                    .unwrap_or_else(|_| {
2203                        arrow_array::new_null_array(&expected_type, batch.num_rows())
2204                    });
2205                columns.push(col);
2206            }
2207        }
2208    }
2209
2210    RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
2211}
2212
2213/// Get the property value for a VID, returning None if not found.
2214pub(crate) fn get_property_value(
2215    vid: &Vid,
2216    props_map: &HashMap<Vid, Properties>,
2217    prop_name: &str,
2218) -> Option<Value> {
2219    if prop_name == "_all_props" {
2220        return props_map.get(vid).map(|p| {
2221            let map: HashMap<String, Value> =
2222                p.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
2223            Value::Map(map)
2224        });
2225    }
2226    props_map
2227        .get(vid)
2228        .and_then(|props| props.get(prop_name))
2229        .cloned()
2230}
2231
2232/// Encode a `serde_json::Value` as CypherValue binary (MessagePack-tagged).
2233///
2234/// Converts from serde_json::Value -> uni_common::Value -> CypherValue bytes.
2235pub(crate) fn encode_cypher_value(val: &serde_json::Value) -> Result<Vec<u8>, String> {
2236    let uni_val: uni_common::Value = val.clone().into();
2237    Ok(uni_common::cypher_value_codec::encode(&uni_val))
2238}
2239
/// Build a numeric Arrow column from property values.
///
/// For each VID, looks up `$prop_name`, applies `$extractor` (`&Value ->
/// Option<N>`) and `$cast` on success, and appends NULL when the property is
/// missing or not extractable. Evaluates to `Ok(ArrayRef)`.
macro_rules! build_numeric_column {
    ($vids:expr, $props_map:expr, $prop_name:expr, $builder_ty:ty, $extractor:expr, $cast:expr) => {{
        let mut builder = <$builder_ty>::new();
        for vid in $vids {
            let extracted = get_property_value(vid, $props_map, $prop_name)
                .as_ref()
                .and_then($extractor);
            match extracted {
                Some(raw) => builder.append_value($cast(raw)),
                None => builder.append_null(),
            }
        }
        Ok(Arc::new(builder.finish()) as ArrayRef)
    }};
}
2259
2260/// Build an Arrow column from property values (static version).
2261pub(crate) fn build_property_column_static(
2262    vids: &[Vid],
2263    props_map: &HashMap<Vid, Properties>,
2264    prop_name: &str,
2265    data_type: &DataType,
2266) -> DFResult<ArrayRef> {
2267    match data_type {
2268        DataType::LargeBinary => {
2269            // Handle CypherValue binary columns (overflow_json and Json-typed properties).
2270            use arrow_array::builder::LargeBinaryBuilder;
2271            let mut builder = LargeBinaryBuilder::new();
2272
2273            for vid in vids {
2274                match get_property_value(vid, props_map, prop_name) {
2275                    Some(Value::Null) | None => builder.append_null(),
2276                    Some(Value::Bytes(bytes)) => {
2277                        builder.append_value(&bytes);
2278                    }
2279                    Some(Value::List(arr)) if arr.iter().all(|v| v.as_u64().is_some()) => {
2280                        // Potential raw CypherValue bytes stored as list<u8> from PropertyManager.
2281                        // Guard against misclassifying normal integer lists (e.g. [42, 43]) as bytes.
2282                        let bytes: Vec<u8> = arr
2283                            .iter()
2284                            .filter_map(|v| v.as_u64().map(|n| n as u8))
2285                            .collect();
2286                        if uni_common::cypher_value_codec::decode(&bytes).is_ok() {
2287                            builder.append_value(&bytes);
2288                        } else {
2289                            let json_val: serde_json::Value = Value::List(arr).into();
2290                            match encode_cypher_value(&json_val) {
2291                                Ok(encoded) => builder.append_value(encoded),
2292                                Err(_) => builder.append_null(),
2293                            }
2294                        }
2295                    }
2296                    Some(val) => {
2297                        // Value from PropertyManager — convert to serde_json and re-encode to CypherValue binary
2298                        let json_val: serde_json::Value = val.into();
2299                        match encode_cypher_value(&json_val) {
2300                            Ok(bytes) => builder.append_value(bytes),
2301                            Err(_) => builder.append_null(),
2302                        }
2303                    }
2304                }
2305            }
2306            Ok(Arc::new(builder.finish()))
2307        }
2308        DataType::Binary => {
2309            // CRDT binary properties: JSON-decoded CRDTs re-encoded to MessagePack
2310            let mut builder = BinaryBuilder::new();
2311            for vid in vids {
2312                let bytes = get_property_value(vid, props_map, prop_name)
2313                    .filter(|v| !v.is_null())
2314                    .and_then(|v| {
2315                        let json_val: serde_json::Value = v.into();
2316                        serde_json::from_value::<uni_crdt::Crdt>(json_val).ok()
2317                    })
2318                    .and_then(|crdt| crdt.to_msgpack().ok());
2319                match bytes {
2320                    Some(b) => builder.append_value(&b),
2321                    None => builder.append_null(),
2322                }
2323            }
2324            Ok(Arc::new(builder.finish()))
2325        }
2326        DataType::Utf8 => {
2327            let mut builder = StringBuilder::new();
2328            for vid in vids {
2329                match get_property_value(vid, props_map, prop_name) {
2330                    Some(Value::String(s)) => builder.append_value(s),
2331                    Some(Value::Null) | None => builder.append_null(),
2332                    Some(other) => builder.append_value(other.to_string()),
2333                }
2334            }
2335            Ok(Arc::new(builder.finish()))
2336        }
2337        DataType::Int64 => {
2338            build_numeric_column!(
2339                vids,
2340                props_map,
2341                prop_name,
2342                Int64Builder,
2343                |v: &Value| v.as_i64(),
2344                |v| v
2345            )
2346        }
2347        DataType::Int32 => {
2348            build_numeric_column!(
2349                vids,
2350                props_map,
2351                prop_name,
2352                Int32Builder,
2353                |v: &Value| v.as_i64(),
2354                |v: i64| v as i32
2355            )
2356        }
2357        DataType::Float64 => {
2358            build_numeric_column!(
2359                vids,
2360                props_map,
2361                prop_name,
2362                Float64Builder,
2363                |v: &Value| v.as_f64(),
2364                |v| v
2365            )
2366        }
2367        DataType::Float32 => {
2368            build_numeric_column!(
2369                vids,
2370                props_map,
2371                prop_name,
2372                Float32Builder,
2373                |v: &Value| v.as_f64(),
2374                |v: f64| v as f32
2375            )
2376        }
2377        DataType::Boolean => {
2378            let mut builder = BooleanBuilder::new();
2379            for vid in vids {
2380                match get_property_value(vid, props_map, prop_name) {
2381                    Some(Value::Bool(b)) => builder.append_value(b),
2382                    _ => builder.append_null(),
2383                }
2384            }
2385            Ok(Arc::new(builder.finish()))
2386        }
2387        DataType::UInt64 => {
2388            build_numeric_column!(
2389                vids,
2390                props_map,
2391                prop_name,
2392                UInt64Builder,
2393                |v: &Value| v.as_u64(),
2394                |v| v
2395            )
2396        }
2397        DataType::FixedSizeList(inner, dim) if *inner.data_type() == DataType::Float32 => {
2398            // Vector properties: FixedSizeList(Float32, N)
2399            let values_builder = Float32Builder::new();
2400            let mut list_builder = FixedSizeListBuilder::new(values_builder, *dim);
2401            for vid in vids {
2402                match get_property_value(vid, props_map, prop_name) {
2403                    Some(Value::Vector(v)) => {
2404                        for val in v {
2405                            list_builder.values().append_value(val);
2406                        }
2407                        list_builder.append(true);
2408                    }
2409                    Some(Value::List(arr)) => {
2410                        for v in arr {
2411                            list_builder
2412                                .values()
2413                                .append_value(v.as_f64().unwrap_or(0.0) as f32);
2414                        }
2415                        list_builder.append(true);
2416                    }
2417                    _ => {
2418                        // Append dim nulls to inner values, then mark row as null
2419                        for _ in 0..*dim {
2420                            list_builder.values().append_null();
2421                        }
2422                        list_builder.append(false);
2423                    }
2424                }
2425            }
2426            Ok(Arc::new(list_builder.finish()))
2427        }
2428        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
2429            // Timestamp properties stored as Value::Temporal, ISO 8601 strings, or i64 nanoseconds
2430            let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
2431            for vid in vids {
2432                match get_property_value(vid, props_map, prop_name) {
2433                    Some(Value::Temporal(tv)) => match tv {
2434                        uni_common::TemporalValue::DateTime {
2435                            nanos_since_epoch, ..
2436                        }
2437                        | uni_common::TemporalValue::LocalDateTime {
2438                            nanos_since_epoch, ..
2439                        } => {
2440                            builder.append_value(nanos_since_epoch);
2441                        }
2442                        uni_common::TemporalValue::Date { days_since_epoch } => {
2443                            builder.append_value(days_since_epoch as i64 * 86_400_000_000_000);
2444                        }
2445                        _ => builder.append_null(),
2446                    },
2447                    Some(Value::String(s)) => match parse_datetime_utc(&s) {
2448                        Ok(dt) => builder.append_value(dt.timestamp_nanos_opt().unwrap_or(0)),
2449                        Err(_) => builder.append_null(),
2450                    },
2451                    Some(Value::Int(n)) => {
2452                        builder.append_value(n);
2453                    }
2454                    _ => builder.append_null(),
2455                }
2456            }
2457            Ok(Arc::new(builder.finish()))
2458        }
2459        DataType::Date32 => {
2460            let mut builder = Date32Builder::new();
2461            let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
2462            for vid in vids {
2463                match get_property_value(vid, props_map, prop_name) {
2464                    Some(Value::Temporal(uni_common::TemporalValue::Date { days_since_epoch })) => {
2465                        builder.append_value(days_since_epoch);
2466                    }
2467                    Some(Value::String(s)) => match NaiveDate::parse_from_str(&s, "%Y-%m-%d") {
2468                        Ok(d) => builder.append_value((d - epoch).num_days() as i32),
2469                        Err(_) => builder.append_null(),
2470                    },
2471                    Some(Value::Int(n)) => {
2472                        builder.append_value(n as i32);
2473                    }
2474                    _ => builder.append_null(),
2475                }
2476            }
2477            Ok(Arc::new(builder.finish()))
2478        }
2479        DataType::Time64(TimeUnit::Nanosecond) => {
2480            let mut builder = Time64NanosecondBuilder::new();
2481            for vid in vids {
2482                match get_property_value(vid, props_map, prop_name) {
2483                    Some(Value::Temporal(
2484                        uni_common::TemporalValue::LocalTime {
2485                            nanos_since_midnight,
2486                        }
2487                        | uni_common::TemporalValue::Time {
2488                            nanos_since_midnight,
2489                            ..
2490                        },
2491                    )) => {
2492                        builder.append_value(nanos_since_midnight);
2493                    }
2494                    Some(Value::Temporal(_)) => builder.append_null(),
2495                    Some(Value::String(s)) => {
2496                        match NaiveTime::parse_from_str(&s, "%H:%M:%S%.f")
2497                            .or_else(|_| NaiveTime::parse_from_str(&s, "%H:%M:%S"))
2498                        {
2499                            Ok(t) => {
2500                                let nanos = t.num_seconds_from_midnight() as i64 * 1_000_000_000
2501                                    + t.nanosecond() as i64;
2502                                builder.append_value(nanos);
2503                            }
2504                            Err(_) => builder.append_null(),
2505                        }
2506                    }
2507                    Some(Value::Int(n)) => {
2508                        builder.append_value(n);
2509                    }
2510                    _ => builder.append_null(),
2511                }
2512            }
2513            Ok(Arc::new(builder.finish()))
2514        }
2515        DataType::Interval(IntervalUnit::MonthDayNano) => {
2516            let mut values: Vec<Option<arrow::datatypes::IntervalMonthDayNano>> =
2517                Vec::with_capacity(vids.len());
2518            for vid in vids {
2519                match get_property_value(vid, props_map, prop_name) {
2520                    Some(Value::Temporal(uni_common::TemporalValue::Duration {
2521                        months,
2522                        days,
2523                        nanos,
2524                    })) => {
2525                        values.push(Some(arrow::datatypes::IntervalMonthDayNano {
2526                            months: months as i32,
2527                            days: days as i32,
2528                            nanoseconds: nanos,
2529                        }));
2530                    }
2531                    Some(Value::Int(_n)) => {
2532                        values.push(None);
2533                    }
2534                    _ => values.push(None),
2535                }
2536            }
2537            let arr: arrow_array::IntervalMonthDayNanoArray = values.into_iter().collect();
2538            Ok(Arc::new(arr))
2539        }
2540        DataType::List(inner_field) => {
2541            build_list_property_column(vids, props_map, prop_name, inner_field)
2542        }
2543        DataType::Struct(fields) => {
2544            build_struct_property_column(vids, props_map, prop_name, fields)
2545        }
2546        // Default: convert to string
2547        _ => {
2548            let mut builder = StringBuilder::new();
2549            for vid in vids {
2550                match get_property_value(vid, props_map, prop_name) {
2551                    Some(Value::Null) | None => builder.append_null(),
2552                    Some(other) => builder.append_value(other.to_string()),
2553                }
2554            }
2555            Ok(Arc::new(builder.finish()))
2556        }
2557    }
2558}
2559
2560/// Build a List-typed Arrow column from list property values.
2561fn build_list_property_column(
2562    vids: &[Vid],
2563    props_map: &HashMap<Vid, Properties>,
2564    prop_name: &str,
2565    inner_field: &Arc<Field>,
2566) -> DFResult<ArrayRef> {
2567    match inner_field.data_type() {
2568        DataType::Utf8 => {
2569            let mut builder = ListBuilder::new(StringBuilder::new());
2570            for vid in vids {
2571                match get_property_value(vid, props_map, prop_name) {
2572                    Some(Value::List(arr)) => {
2573                        for v in arr {
2574                            match v {
2575                                Value::String(s) => builder.values().append_value(s),
2576                                Value::Null => builder.values().append_null(),
2577                                other => builder.values().append_value(format!("{other:?}")),
2578                            }
2579                        }
2580                        builder.append(true);
2581                    }
2582                    _ => builder.append(false),
2583                }
2584            }
2585            Ok(Arc::new(builder.finish()))
2586        }
2587        DataType::Int64 => {
2588            let mut builder = ListBuilder::new(Int64Builder::new());
2589            for vid in vids {
2590                match get_property_value(vid, props_map, prop_name) {
2591                    Some(Value::List(arr)) => {
2592                        for v in arr {
2593                            match v.as_i64() {
2594                                Some(n) => builder.values().append_value(n),
2595                                None => builder.values().append_null(),
2596                            }
2597                        }
2598                        builder.append(true);
2599                    }
2600                    _ => builder.append(false),
2601                }
2602            }
2603            Ok(Arc::new(builder.finish()))
2604        }
2605        DataType::Float64 => {
2606            let mut builder = ListBuilder::new(Float64Builder::new());
2607            for vid in vids {
2608                match get_property_value(vid, props_map, prop_name) {
2609                    Some(Value::List(arr)) => {
2610                        for v in arr {
2611                            match v.as_f64() {
2612                                Some(n) => builder.values().append_value(n),
2613                                None => builder.values().append_null(),
2614                            }
2615                        }
2616                        builder.append(true);
2617                    }
2618                    _ => builder.append(false),
2619                }
2620            }
2621            Ok(Arc::new(builder.finish()))
2622        }
2623        DataType::Boolean => {
2624            let mut builder = ListBuilder::new(BooleanBuilder::new());
2625            for vid in vids {
2626                match get_property_value(vid, props_map, prop_name) {
2627                    Some(Value::List(arr)) => {
2628                        for v in arr {
2629                            match v.as_bool() {
2630                                Some(b) => builder.values().append_value(b),
2631                                None => builder.values().append_null(),
2632                            }
2633                        }
2634                        builder.append(true);
2635                    }
2636                    _ => builder.append(false),
2637                }
2638            }
2639            Ok(Arc::new(builder.finish()))
2640        }
2641        DataType::Struct(fields) => {
2642            // Map types are List(Struct(key, value)) — build struct inner elements
2643            build_list_of_structs_column(vids, props_map, prop_name, fields)
2644        }
2645        // Fallback: serialize inner elements as strings
2646        _ => {
2647            let mut builder = ListBuilder::new(StringBuilder::new());
2648            for vid in vids {
2649                match get_property_value(vid, props_map, prop_name) {
2650                    Some(Value::List(arr)) => {
2651                        for v in arr {
2652                            match v {
2653                                Value::Null => builder.values().append_null(),
2654                                other => builder.values().append_value(format!("{other:?}")),
2655                            }
2656                        }
2657                        builder.append(true);
2658                    }
2659                    _ => builder.append(false),
2660                }
2661            }
2662            Ok(Arc::new(builder.finish()))
2663        }
2664    }
2665}
2666
/// Build a List(Struct(...)) column, used for Map-type properties.
///
/// Handles two value representations:
/// - `Value::List([Map{key: k, value: v}, ...])` — pre-converted kv pairs
/// - `Value::Map({k1: v1, k2: v2})` — raw map objects (converted to kv pairs)
///
/// Rows with any other value shape become NULL list entries. Struct fields
/// are extracted by name from each kv map; field types other than
/// Utf8/Int64/Float64 fall back to Debug-formatted strings.
fn build_list_of_structs_column(
    vids: &[Vid],
    props_map: &HashMap<Vid, Properties>,
    prop_name: &str,
    fields: &Fields,
) -> DFResult<ArrayRef> {
    use arrow_array::StructArray;

    // Fetch each row's raw property value (None when the property is absent).
    let values: Vec<Option<Value>> = vids
        .iter()
        .map(|vid| get_property_value(vid, props_map, prop_name))
        .collect();

    // Convert each row's value to an owned Vec of Maps (key-value pairs).
    // This normalizes both List-of-maps and Map representations.
    let rows: Vec<Option<Vec<HashMap<String, Value>>>> = values
        .iter()
        .map(|val| match val {
            Some(Value::List(arr)) => {
                // Keep only Map elements; non-map list items are dropped.
                let objs: Vec<HashMap<String, Value>> = arr
                    .iter()
                    .filter_map(|v| {
                        if let Value::Map(m) = v {
                            Some(m.clone())
                        } else {
                            None
                        }
                    })
                    .collect();
                // NOTE(review): an empty (or all-non-map) list maps to None and
                // therefore to a NULL row rather than an empty list — confirm
                // this is the intended semantics for empty map properties.
                if objs.is_empty() { None } else { Some(objs) }
            }
            Some(Value::Map(obj)) => {
                // Map property: convert {k1: v1, k2: v2} -> [{key: k1, value: v1}, ...]
                let kv_pairs: Vec<HashMap<String, Value>> = obj
                    .iter()
                    .map(|(k, v)| {
                        let mut m = HashMap::new();
                        m.insert("key".to_string(), Value::String(k.clone()));
                        m.insert("value".to_string(), v.clone());
                        m
                    })
                    .collect();
                Some(kv_pairs)
            }
            _ => None,
        })
        .collect();

    // Total number of inner struct elements across all non-null rows; used to
    // pre-size the child builders below.
    let total_items: usize = rows
        .iter()
        .filter_map(|r| r.as_ref())
        .map(|v| v.len())
        .sum();

    // Build child arrays for each field in the struct
    let child_arrays: Vec<ArrayRef> = fields
        .iter()
        .map(|field| {
            let field_name = field.name();
            match field.data_type() {
                DataType::Utf8 => {
                    let mut builder = StringBuilder::with_capacity(total_items, total_items * 16);
                    for obj in rows.iter().flatten().flatten() {
                        match obj.get(field_name) {
                            Some(Value::String(s)) => builder.append_value(s),
                            Some(Value::Null) | None => builder.append_null(),
                            Some(other) => builder.append_value(format!("{other:?}")),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                DataType::Int64 => {
                    let mut builder = Int64Builder::with_capacity(total_items);
                    for obj in rows.iter().flatten().flatten() {
                        match obj.get(field_name).and_then(|v| v.as_i64()) {
                            Some(n) => builder.append_value(n),
                            None => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                DataType::Float64 => {
                    let mut builder = Float64Builder::with_capacity(total_items);
                    for obj in rows.iter().flatten().flatten() {
                        match obj.get(field_name).and_then(|v| v.as_f64()) {
                            Some(n) => builder.append_value(n),
                            None => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                // Fallback: serialize as string
                _ => {
                    let mut builder = StringBuilder::with_capacity(total_items, total_items * 16);
                    for obj in rows.iter().flatten().flatten() {
                        match obj.get(field_name) {
                            Some(Value::Null) | None => builder.append_null(),
                            Some(other) => builder.append_value(format!("{other:?}")),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
            }
        })
        .collect();

    // Build struct array from children
    let struct_array = StructArray::try_new(fields.clone(), child_arrays, None)
        .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;

    // Build list offsets: each non-null row advances the running offset by its
    // element count; null rows repeat the previous offset and clear the
    // validity bit, so offsets stay consistent with the child element count.
    let mut offsets = Vec::with_capacity(vids.len() + 1);
    let mut nulls = Vec::with_capacity(vids.len());
    let mut offset = 0i32;
    offsets.push(offset);
    for row in &rows {
        match row {
            Some(objs) => {
                offset += objs.len() as i32;
                offsets.push(offset);
                nulls.push(true);
            }
            None => {
                offsets.push(offset);
                nulls.push(false);
            }
        }
    }

    let list_field = Arc::new(Field::new("item", DataType::Struct(fields.clone()), true));
    let list_array = arrow_array::ListArray::try_new(
        list_field,
        arrow::buffer::OffsetBuffer::new(arrow::buffer::ScalarBuffer::from(offsets)),
        Arc::new(struct_array),
        Some(arrow::buffer::NullBuffer::from(nulls)),
    )
    .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;

    Ok(Arc::new(list_array))
}
2812
2813/// Convert a TemporalValue into a HashMap matching the Arrow struct field names,
2814/// so that `build_struct_property_column` can extract fields uniformly.
2815fn temporal_to_struct_map(tv: &uni_common::value::TemporalValue) -> HashMap<String, Value> {
2816    use uni_common::value::TemporalValue;
2817    let mut m = HashMap::new();
2818    match tv {
2819        TemporalValue::DateTime {
2820            nanos_since_epoch,
2821            offset_seconds,
2822            timezone_name,
2823        } => {
2824            m.insert("nanos_since_epoch".into(), Value::Int(*nanos_since_epoch));
2825            m.insert("offset_seconds".into(), Value::Int(*offset_seconds as i64));
2826            if let Some(tz) = timezone_name {
2827                m.insert("timezone_name".into(), Value::String(tz.clone()));
2828            }
2829        }
2830        TemporalValue::LocalDateTime { nanos_since_epoch } => {
2831            m.insert("nanos_since_epoch".into(), Value::Int(*nanos_since_epoch));
2832        }
2833        TemporalValue::Time {
2834            nanos_since_midnight,
2835            offset_seconds,
2836        } => {
2837            m.insert(
2838                "nanos_since_midnight".into(),
2839                Value::Int(*nanos_since_midnight),
2840            );
2841            m.insert("offset_seconds".into(), Value::Int(*offset_seconds as i64));
2842        }
2843        TemporalValue::LocalTime {
2844            nanos_since_midnight,
2845        } => {
2846            m.insert(
2847                "nanos_since_midnight".into(),
2848                Value::Int(*nanos_since_midnight),
2849            );
2850        }
2851        TemporalValue::Date { days_since_epoch } => {
2852            m.insert(
2853                "days_since_epoch".into(),
2854                Value::Int(*days_since_epoch as i64),
2855            );
2856        }
2857        TemporalValue::Duration {
2858            months,
2859            days,
2860            nanos,
2861        } => {
2862            m.insert("months".into(), Value::Int(*months));
2863            m.insert("days".into(), Value::Int(*days));
2864            m.insert("nanos".into(), Value::Int(*nanos));
2865        }
2866    }
2867    m
2868}
2869
2870/// Build a Struct-typed Arrow column from Map property values (e.g. Point types).
2871fn build_struct_property_column(
2872    vids: &[Vid],
2873    props_map: &HashMap<Vid, Properties>,
2874    prop_name: &str,
2875    fields: &Fields,
2876) -> DFResult<ArrayRef> {
2877    use arrow_array::StructArray;
2878
2879    // Convert raw values, expanding Temporal values into Map representation
2880    // so the struct field extraction below works uniformly.
2881    let values: Vec<Option<Value>> = vids
2882        .iter()
2883        .map(|vid| {
2884            let val = get_property_value(vid, props_map, prop_name);
2885            match val {
2886                Some(Value::Temporal(ref tv)) => Some(Value::Map(temporal_to_struct_map(tv))),
2887                other => other,
2888            }
2889        })
2890        .collect();
2891
2892    let child_arrays: Vec<ArrayRef> = fields
2893        .iter()
2894        .map(|field| {
2895            let field_name = field.name();
2896            match field.data_type() {
2897                DataType::Float64 => {
2898                    let mut builder = Float64Builder::with_capacity(vids.len());
2899                    for val in &values {
2900                        match val {
2901                            Some(Value::Map(obj)) => {
2902                                match obj.get(field_name).and_then(|v| v.as_f64()) {
2903                                    Some(n) => builder.append_value(n),
2904                                    None => builder.append_null(),
2905                                }
2906                            }
2907                            _ => builder.append_null(),
2908                        }
2909                    }
2910                    Arc::new(builder.finish()) as ArrayRef
2911                }
2912                DataType::Utf8 => {
2913                    let mut builder = StringBuilder::with_capacity(vids.len(), vids.len() * 16);
2914                    for val in &values {
2915                        match val {
2916                            Some(Value::Map(obj)) => match obj.get(field_name) {
2917                                Some(Value::String(s)) => builder.append_value(s),
2918                                Some(Value::Null) | None => builder.append_null(),
2919                                Some(other) => builder.append_value(format!("{other:?}")),
2920                            },
2921                            _ => builder.append_null(),
2922                        }
2923                    }
2924                    Arc::new(builder.finish()) as ArrayRef
2925                }
2926                DataType::Int64 => {
2927                    let mut builder = Int64Builder::with_capacity(vids.len());
2928                    for val in &values {
2929                        match val {
2930                            Some(Value::Map(obj)) => {
2931                                match obj.get(field_name).and_then(|v| v.as_i64()) {
2932                                    Some(n) => builder.append_value(n),
2933                                    None => builder.append_null(),
2934                                }
2935                            }
2936                            _ => builder.append_null(),
2937                        }
2938                    }
2939                    Arc::new(builder.finish()) as ArrayRef
2940                }
2941                DataType::Timestamp(_, _) => {
2942                    let mut builder = TimestampNanosecondBuilder::with_capacity(vids.len());
2943                    for val in &values {
2944                        match val {
2945                            Some(Value::Map(obj)) => {
2946                                match obj.get(field_name).and_then(|v| v.as_i64()) {
2947                                    Some(n) => builder.append_value(n),
2948                                    None => builder.append_null(),
2949                                }
2950                            }
2951                            _ => builder.append_null(),
2952                        }
2953                    }
2954                    Arc::new(builder.finish()) as ArrayRef
2955                }
2956                DataType::Int32 => {
2957                    let mut builder = Int32Builder::with_capacity(vids.len());
2958                    for val in &values {
2959                        match val {
2960                            Some(Value::Map(obj)) => {
2961                                match obj.get(field_name).and_then(|v| v.as_i64()) {
2962                                    Some(n) => builder.append_value(n as i32),
2963                                    None => builder.append_null(),
2964                                }
2965                            }
2966                            _ => builder.append_null(),
2967                        }
2968                    }
2969                    Arc::new(builder.finish()) as ArrayRef
2970                }
2971                DataType::Time64(_) => {
2972                    let mut builder = Time64NanosecondBuilder::with_capacity(vids.len());
2973                    for val in &values {
2974                        match val {
2975                            Some(Value::Map(obj)) => {
2976                                match obj.get(field_name).and_then(|v| v.as_i64()) {
2977                                    Some(n) => builder.append_value(n),
2978                                    None => builder.append_null(),
2979                                }
2980                            }
2981                            _ => builder.append_null(),
2982                        }
2983                    }
2984                    Arc::new(builder.finish()) as ArrayRef
2985                }
2986                // Fallback: serialize as string
2987                _ => {
2988                    let mut builder = StringBuilder::with_capacity(vids.len(), vids.len() * 16);
2989                    for val in &values {
2990                        match val {
2991                            Some(Value::Map(obj)) => match obj.get(field_name) {
2992                                Some(Value::Null) | None => builder.append_null(),
2993                                Some(other) => builder.append_value(format!("{other:?}")),
2994                            },
2995                            _ => builder.append_null(),
2996                        }
2997                    }
2998                    Arc::new(builder.finish()) as ArrayRef
2999                }
3000            }
3001        })
3002        .collect();
3003
3004    // Build null bitmap — null when the value is null/missing
3005    let nulls: Vec<bool> = values
3006        .iter()
3007        .map(|v| matches!(v, Some(Value::Map(_))))
3008        .collect();
3009
3010    let struct_array = StructArray::try_new(
3011        fields.clone(),
3012        child_arrays,
3013        Some(arrow::buffer::NullBuffer::from(nulls)),
3014    )
3015    .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;
3016
3017    Ok(Arc::new(struct_array))
3018}
3019
impl Stream for GraphScanStream {
    type Item = DFResult<RecordBatch>;

    /// Drives the scan state machine: `Init` builds the async scan future,
    /// `Executing` polls it, and `Done` terminates the stream. The scan
    /// yields at most one `RecordBatch` before the stream ends.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Use a temporary to avoid borrow issues
            // (ownership of the state is taken here; every arm below must
            // store the next state back — Pending restores the same future).
            let state = std::mem::replace(&mut self.state, GraphScanState::Done);

            match state {
                GraphScanState::Init => {
                    // Create the future with cloned data for ownership
                    // (the future is boxed and may outlive this poll, so it
                    // cannot borrow from `self`).
                    let graph_ctx = self.graph_ctx.clone();
                    let label = self.label.clone();
                    let variable = self.variable.clone();
                    let properties = self.properties.clone();
                    let is_edge_scan = self.is_edge_scan;
                    let is_schemaless = self.is_schemaless;
                    let schema = self.schema.clone();

                    let fut = async move {
                        // Abort before scanning if the query deadline passed.
                        graph_ctx.check_timeout().map_err(|e| {
                            datafusion::error::DataFusionError::Execution(e.to_string())
                        })?;

                        // Dispatch to edge, schemaless-vertex, or vertex scan.
                        let batch = if is_edge_scan {
                            columnar_scan_edge_batch_static(
                                &graph_ctx,
                                &label,
                                &variable,
                                &properties,
                                &schema,
                            )
                            .await?
                        } else if is_schemaless {
                            columnar_scan_schemaless_vertex_batch_static(
                                &graph_ctx,
                                &label,
                                &variable,
                                &properties,
                                &schema,
                            )
                            .await?
                        } else {
                            columnar_scan_vertex_batch_static(
                                &graph_ctx,
                                &label,
                                &variable,
                                &properties,
                                &schema,
                            )
                            .await?
                        };
                        Ok(Some(batch))
                    };

                    self.state = GraphScanState::Executing(Box::pin(fut));
                    // Continue loop to poll the future
                }
                GraphScanState::Executing(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(batch)) => {
                        self.state = GraphScanState::Done;
                        // Record output row count (0 when no batch was produced).
                        self.metrics
                            .record_output(batch.as_ref().map(|b| b.num_rows()).unwrap_or(0));
                        return Poll::Ready(batch.map(Ok));
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = GraphScanState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        // Put the in-flight future back so the next poll resumes it.
                        self.state = GraphScanState::Executing(fut);
                        return Poll::Pending;
                    }
                },
                GraphScanState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
3101
3102impl RecordBatchStream for GraphScanStream {
3103    fn schema(&self) -> SchemaRef {
3104        self.schema.clone()
3105    }
3106}
3107
#[cfg(test)]
mod tests {
    use super::*;

    /// Vertex schema layout: system columns (`_vid`, `_labels`) come first,
    /// followed by the requested properties, all prefixed with the variable.
    #[test]
    fn test_build_vertex_schema() {
        let uni_schema = UniSchema::default();
        let schema = GraphScanExec::build_vertex_schema(
            "n",
            "Person",
            &["name".to_string(), "age".to_string()],
            &uni_schema,
        );

        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(1).name(), "n._labels");
        assert_eq!(schema.field(2).name(), "n.name");
        assert_eq!(schema.field(3).name(), "n.age");
    }

    /// Edge schema layout: `_eid`, `_src_vid`, `_dst_vid` system columns
    /// precede the requested edge properties.
    #[test]
    fn test_build_edge_schema() {
        let uni_schema = UniSchema::default();
        let schema =
            GraphScanExec::build_edge_schema("r", "KNOWS", &["weight".to_string()], &uni_schema);

        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "r._eid");
        assert_eq!(schema.field(1).name(), "r._src_vid");
        assert_eq!(schema.field(2).name(), "r._dst_vid");
        assert_eq!(schema.field(3).name(), "r.weight");
    }

    /// With an empty schema, schemaless vertex properties fall back to
    /// `LargeBinary` (raw encoded values) instead of typed columns.
    #[test]
    fn test_build_schemaless_vertex_schema() {
        let empty_schema = uni_common::core::schema::Schema::default();
        let schema = GraphScanExec::build_schemaless_vertex_schema(
            "n",
            &["name".to_string(), "age".to_string()],
            &empty_schema,
        );

        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(0).data_type(), &DataType::UInt64);
        assert_eq!(schema.field(1).name(), "n._labels");
        assert_eq!(schema.field(2).name(), "n.name");
        // With empty schema, falls back to LargeBinary
        assert_eq!(schema.field(2).data_type(), &DataType::LargeBinary);
        assert_eq!(schema.field(3).name(), "n.age");
        assert_eq!(schema.field(3).data_type(), &DataType::LargeBinary);
    }

    /// A schemaless scan with no requested properties still exposes the two
    /// system columns (`_vid`, `_labels`).
    #[test]
    fn test_schemaless_all_scan_has_empty_label() {
        let empty_schema = uni_common::core::schema::Schema::default();
        let schema = GraphScanExec::build_schemaless_vertex_schema("n", &[], &empty_schema);

        // Verify the schema has _vid and _labels columns for a scan with no properties
        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(1).name(), "n._labels");
    }

    /// Round-trips a JSON map and a single scalar through the cypher-value
    /// codec, mirroring how `_all_props` columns are encoded.
    #[test]
    fn test_cypher_value_all_props_extraction() {
        // Simulate _all_props encoding using encode_cypher_value helper
        let json_obj = serde_json::json!({"age": 30, "name": "Alice"});
        let cv_bytes = encode_cypher_value(&json_obj).unwrap();

        // Decode and extract "age" value
        let decoded = uni_common::cypher_value_codec::decode(&cv_bytes).unwrap();
        match decoded {
            uni_common::Value::Map(map) => {
                let age_val = map.get("age").unwrap();
                assert_eq!(age_val, &uni_common::Value::Int(30));
            }
            _ => panic!("Expected Map"),
        }

        // Also test single value encoding
        let single_val = serde_json::json!(30);
        let single_bytes = encode_cypher_value(&single_val).unwrap();
        let single_decoded = uni_common::cypher_value_codec::decode(&single_bytes).unwrap();
        assert_eq!(single_decoded, uni_common::Value::Int(30));
    }

    /// Helper to build a RecordBatch with _vid, _deleted, _version columns for testing.
    ///
    /// All three slices must have the same length. Each row also gets a
    /// synthetic `name` value of the form `v{vid}_ver{version}` so tests can
    /// tell which physical row survived MVCC deduplication.
    fn make_mvcc_batch(vids: &[u64], versions: &[u64], deleted: &[bool]) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("_vid", DataType::UInt64, false),
            Field::new("_deleted", DataType::Boolean, false),
            Field::new("_version", DataType::UInt64, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        // Generate name values like "v{vid}_ver{version}" for tracking which row wins
        let names: Vec<String> = vids
            .iter()
            .zip(versions.iter())
            .map(|(v, ver)| format!("v{}_ver{}", v, ver))
            .collect();
        let name_arr: arrow_array::StringArray = names.iter().map(|s| Some(s.as_str())).collect();

        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(vids.to_vec())),
                Arc::new(arrow_array::BooleanArray::from(deleted.to_vec())),
                Arc::new(UInt64Array::from(versions.to_vec())),
                Arc::new(name_arr),
            ],
        )
        .unwrap()
    }

    /// MVCC dedup keeps exactly one row per VID: the one with the highest
    /// `_version`, regardless of input order.
    #[test]
    fn test_mvcc_dedup_multiple_versions() {
        // VID 1 at versions 3, 1, 5 — should keep version 5
        // VID 2 at versions 2, 4 — should keep version 4
        let batch = make_mvcc_batch(
            &[1, 1, 1, 2, 2],
            &[3, 1, 5, 2, 4],
            &[false, false, false, false, false],
        );

        let result = mvcc_dedup_batch(&batch).unwrap();
        assert_eq!(result.num_rows(), 2);

        let vid_col = result
            .column_by_name("_vid")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        let ver_col = result
            .column_by_name("_version")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        let name_col = result
            .column_by_name("name")
            .unwrap()
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .unwrap();

        // VID 1 → version 5, VID 2 → version 4
        assert_eq!(vid_col.value(0), 1);
        assert_eq!(ver_col.value(0), 5);
        assert_eq!(name_col.value(0), "v1_ver5");

        assert_eq!(vid_col.value(1), 2);
        assert_eq!(ver_col.value(1), 4);
        assert_eq!(name_col.value(1), "v2_ver4");
    }

    /// Dedup is a no-op when every VID already appears exactly once.
    #[test]
    fn test_mvcc_dedup_single_rows() {
        // Each VID appears once — nothing should change
        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
        let result = mvcc_dedup_batch(&batch).unwrap();
        assert_eq!(result.num_rows(), 3);
    }

    /// Dedup handles an empty batch without error.
    #[test]
    fn test_mvcc_dedup_empty() {
        let batch = make_mvcc_batch(&[], &[], &[]);
        let result = mvcc_dedup_batch(&batch).unwrap();
        assert_eq!(result.num_rows(), 0);
    }

    /// Rows whose VID is tombstoned in the current L0 buffer are filtered out.
    #[test]
    fn test_filter_l0_tombstones_removes_tombstoned() {
        use crate::query::df_graph::L0Context;

        // Create a batch with VIDs 1, 2, 3
        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);

        // Create L0 context with VID 2 tombstoned. The constructor takes no
        // tombstones, so insert directly via the public `vertex_tombstones`
        // field after wrapping the buffer in its lock.
        let l0 = uni_store::runtime::l0::L0Buffer::new(1, None);
        let l0_buf = std::sync::Arc::new(parking_lot::RwLock::new(l0));
        l0_buf.write().vertex_tombstones.insert(Vid::from(2u64));

        let l0_ctx = L0Context {
            current_l0: Some(l0_buf),
            transaction_l0: None,
            pending_flush_l0s: vec![],
        };

        let result = filter_l0_tombstones(&batch, &l0_ctx).unwrap();
        assert_eq!(result.num_rows(), 2);

        let vid_col = result
            .column_by_name("_vid")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(vid_col.value(0), 1);
        assert_eq!(vid_col.value(1), 3);
    }

    /// With no L0 buffers at all, tombstone filtering passes every row through.
    #[test]
    fn test_filter_l0_tombstones_none() {
        use crate::query::df_graph::L0Context;

        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
        let l0_ctx = L0Context::default();

        let result = filter_l0_tombstones(&batch, &l0_ctx).unwrap();
        assert_eq!(result.num_rows(), 3);
    }

    /// Mapping a Lance-schema batch to the output schema renames columns to
    /// the `{variable}.{name}` convention, synthesizes `_labels`, drops MVCC
    /// bookkeeping columns, and carries property values through unchanged.
    #[test]
    fn test_map_to_output_schema_basic() {
        use crate::query::df_graph::L0Context;

        // Input: Lance-schema batch with _vid, _deleted, _version, name columns
        let lance_schema = Arc::new(Schema::new(vec![
            Field::new("_vid", DataType::UInt64, false),
            Field::new("_deleted", DataType::Boolean, false),
            Field::new("_version", DataType::UInt64, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let name_arr: arrow_array::StringArray =
            vec![Some("Alice"), Some("Bob")].into_iter().collect();
        let batch = RecordBatch::try_new(
            lance_schema,
            vec![
                Arc::new(UInt64Array::from(vec![1u64, 2])),
                Arc::new(arrow_array::BooleanArray::from(vec![false, false])),
                Arc::new(UInt64Array::from(vec![1u64, 1])),
                Arc::new(name_arr),
            ],
        )
        .unwrap();

        // Output schema: n._vid, n._labels, n.name
        let output_schema = Arc::new(Schema::new(vec![
            Field::new("n._vid", DataType::UInt64, false),
            Field::new("n._labels", labels_data_type(), true),
            Field::new("n.name", DataType::Utf8, true),
        ]));

        let l0_ctx = L0Context::default();
        let result = map_to_output_schema(
            &batch,
            "Person",
            "n",
            &["name".to_string()],
            &output_schema,
            &l0_ctx,
        )
        .unwrap();

        assert_eq!(result.num_rows(), 2);
        assert_eq!(result.schema().fields().len(), 3);
        assert_eq!(result.schema().field(0).name(), "n._vid");
        assert_eq!(result.schema().field(1).name(), "n._labels");
        assert_eq!(result.schema().field(2).name(), "n.name");

        // Check name values carried through
        let name_col = result
            .column(2)
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .unwrap();
        assert_eq!(name_col.value(0), "Alice");
        assert_eq!(name_col.value(1), "Bob");
    }
}