Skip to main content

uni_query/query/df_graph/
scan.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Graph scan execution plan for DataFusion.
5//!
6//! This module provides [`GraphScanExec`], a DataFusion `ExecutionPlan` that scans
7//! vertices or edges from storage with property materialization. It wraps the
8//! underlying Lance table scan with:
9//!
10//! - MVCC resolution via L0 buffer overlays
11//! - Property column materialization from `PropertyManager`
12//! - Filter pushdown to storage layer
13//!
14//! # Column Naming Convention
15//!
16//! Properties are materialized as columns named `{variable}.{property}`:
17//! - `n.name` - property "name" for variable "n"
18//! - `n.age` - property "age" for variable "n"
19//!
20//! System columns use underscore prefix:
21//! - `_vid` - vertex ID
22//! - `_eid` - edge ID
23//! - `_src_vid` - source vertex ID (edges only)
24//! - `_dst_vid` - destination vertex ID (edges only)
25
26use crate::query::datetime::parse_datetime_utc;
27use crate::query::df_graph::GraphExecutionContext;
28use crate::query::df_graph::common::{arrow_err, compute_plan_properties, labels_data_type};
29use arrow_array::builder::{
30    BinaryBuilder, BooleanBuilder, Date32Builder, FixedSizeListBuilder, Float32Builder,
31    Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
32    Time64NanosecondBuilder, TimestampNanosecondBuilder, UInt64Builder,
33};
34use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
35use arrow_schema::{DataType, Field, Fields, IntervalUnit, Schema, SchemaRef, TimeUnit};
36use chrono::{NaiveDate, NaiveTime, Timelike};
37use datafusion::common::Result as DFResult;
38use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
39use datafusion::physical_expr::PhysicalExpr;
40use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
41use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
42use futures::Stream;
43use std::any::Any;
44use std::collections::{HashMap, HashSet};
45use std::fmt;
46use std::pin::Pin;
47use std::sync::Arc;
48use std::task::{Context, Poll};
49use uni_common::Properties;
50use uni_common::Value;
51use uni_common::core::id::Vid;
52use uni_common::core::schema::Schema as UniSchema;
53
/// Graph scan execution plan.
///
/// Scans vertices or edges from storage with property materialization.
/// This wraps the underlying Lance table scan with MVCC resolution and
/// property loading.
///
/// # Example
///
/// ```ignore
/// // Create a scan for Person vertices with name and age properties
/// let scan = GraphScanExec::new(
///     graph_ctx,
///     "Person",
///     "n",
///     vec!["name".to_string(), "age".to_string()],
///     None, // No filter
/// );
///
/// let stream = scan.execute(0, task_ctx)?;
/// // Stream yields batches with columns: _vid, n.name, n.age
/// ```
pub struct GraphScanExec {
    /// Graph execution context with storage and L0 access.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Label name for vertex scan, or edge type for edge scan.
    ///
    /// Schemaless scans use special encodings: an empty string means
    /// "scan all vertices", and a colon-separated list encodes a
    /// multi-label scan (see `new_multi_label_vertex_scan`).
    label: String,

    /// Variable name for column prefixing (e.g., "n" in `n.name`).
    variable: String,

    /// Properties to materialize as columns.
    projected_properties: Vec<String>,

    /// Filter expression to push down to the storage layer.
    filter: Option<Arc<dyn PhysicalExpr>>,

    /// Whether this is an edge scan (vs vertex scan).
    is_edge_scan: bool,

    /// Whether this is a schemaless scan (uses main table instead of per-label table).
    is_schemaless: bool,

    /// Output schema with materialized property columns.
    schema: SchemaRef,

    /// Cached plan properties (computed once at construction).
    properties: PlanProperties,

    /// Metrics for execution tracking.
    metrics: ExecutionPlanMetricsSet,
}
106
107impl fmt::Debug for GraphScanExec {
108    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
109        f.debug_struct("GraphScanExec")
110            .field("label", &self.label)
111            .field("variable", &self.variable)
112            .field("projected_properties", &self.projected_properties)
113            .field("is_edge_scan", &self.is_edge_scan)
114            .finish()
115    }
116}
117
118impl GraphScanExec {
119    /// Create a new graph scan for vertices.
120    ///
121    /// Scans all vertices of the given label from storage and L0 buffers,
122    /// then materializes the requested properties.
123    pub fn new_vertex_scan(
124        graph_ctx: Arc<GraphExecutionContext>,
125        label: impl Into<String>,
126        variable: impl Into<String>,
127        projected_properties: Vec<String>,
128        filter: Option<Arc<dyn PhysicalExpr>>,
129    ) -> Self {
130        let label = label.into();
131        let variable = variable.into();
132
133        // Build output schema with proper types from Uni schema
134        let uni_schema = graph_ctx.storage().schema_manager().schema();
135        let schema =
136            Self::build_vertex_schema(&variable, &label, &projected_properties, &uni_schema);
137
138        let properties = compute_plan_properties(schema.clone());
139
140        Self {
141            graph_ctx,
142            label,
143            variable,
144            projected_properties,
145            filter,
146            is_edge_scan: false,
147            is_schemaless: false,
148            schema,
149            properties,
150            metrics: ExecutionPlanMetricsSet::new(),
151        }
152    }
153
154    /// Create a new schemaless vertex scan.
155    ///
156    /// Scans the main vertices table for vertices with the given label name.
157    /// Properties are extracted from props_json (all treated as Utf8/JSON).
158    /// This is used for labels that aren't in the schema.
159    pub fn new_schemaless_vertex_scan(
160        graph_ctx: Arc<GraphExecutionContext>,
161        label_name: impl Into<String>,
162        variable: impl Into<String>,
163        projected_properties: Vec<String>,
164        filter: Option<Arc<dyn PhysicalExpr>>,
165    ) -> Self {
166        let label = label_name.into();
167        let variable = variable.into();
168
169        // Filter out system columns that are already materialized as dedicated columns
170        // (_vid as UInt64, _labels as List<Utf8>). If these appear in projected_properties
171        // (e.g., from collect_properties_from_plan extracting _vid from filter expressions),
172        // they would create duplicate columns with conflicting types.
173        let projected_properties: Vec<String> = projected_properties
174            .into_iter()
175            .filter(|p| p != "_vid" && p != "_labels")
176            .collect();
177
178        // Build schema - all properties are Utf8 (from JSON) except _vid
179        let schema = Self::build_schemaless_vertex_schema(&variable, &projected_properties);
180        let properties = compute_plan_properties(schema.clone());
181
182        Self {
183            graph_ctx,
184            label,
185            variable,
186            projected_properties,
187            filter,
188            is_edge_scan: false,
189            is_schemaless: true,
190            schema,
191            properties,
192            metrics: ExecutionPlanMetricsSet::new(),
193        }
194    }
195
196    /// Create a new multi-label vertex scan using the main vertices table.
197    ///
198    /// Scans for vertices that have ALL specified labels (intersection semantics).
199    /// Properties are extracted from props_json (schemaless).
200    pub fn new_multi_label_vertex_scan(
201        graph_ctx: Arc<GraphExecutionContext>,
202        labels: Vec<String>,
203        variable: impl Into<String>,
204        projected_properties: Vec<String>,
205        filter: Option<Arc<dyn PhysicalExpr>>,
206    ) -> Self {
207        let variable = variable.into();
208        let projected_properties: Vec<String> = projected_properties
209            .into_iter()
210            .filter(|p| p != "_vid" && p != "_labels")
211            .collect();
212        let schema = Self::build_schemaless_vertex_schema(&variable, &projected_properties);
213        let properties = compute_plan_properties(schema.clone());
214
215        // Encode labels as colon-separated for the stream to parse
216        let encoded_labels = labels.join(":");
217
218        Self {
219            graph_ctx,
220            label: encoded_labels,
221            variable,
222            projected_properties,
223            filter,
224            is_edge_scan: false,
225            is_schemaless: true,
226            schema,
227            properties,
228            metrics: ExecutionPlanMetricsSet::new(),
229        }
230    }
231
232    /// Create a new schemaless scan for all vertices.
233    ///
234    /// Scans the main vertices table for all vertices regardless of label.
235    /// Properties are extracted from props_json (all treated as Utf8/JSON).
236    /// This is used for `MATCH (n)` without label filter.
237    pub fn new_schemaless_all_scan(
238        graph_ctx: Arc<GraphExecutionContext>,
239        variable: impl Into<String>,
240        projected_properties: Vec<String>,
241        filter: Option<Arc<dyn PhysicalExpr>>,
242    ) -> Self {
243        let variable = variable.into();
244        let projected_properties: Vec<String> = projected_properties
245            .into_iter()
246            .filter(|p| p != "_vid" && p != "_labels")
247            .collect();
248
249        // Build schema - all properties are Utf8 (from JSON) except _vid
250        let schema = Self::build_schemaless_vertex_schema(&variable, &projected_properties);
251        let properties = compute_plan_properties(schema.clone());
252
253        Self {
254            graph_ctx,
255            label: String::new(), // Empty label signals "scan all vertices"
256            variable,
257            projected_properties,
258            filter,
259            is_edge_scan: false,
260            is_schemaless: true,
261            schema,
262            properties,
263            metrics: ExecutionPlanMetricsSet::new(),
264        }
265    }
266
267    /// Build schema for schemaless vertex scan (all properties as LargeBinary/CypherValue).
268    fn build_schemaless_vertex_schema(variable: &str, properties: &[String]) -> SchemaRef {
269        let mut fields = vec![
270            Field::new(format!("{}._vid", variable), DataType::UInt64, false),
271            Field::new(format!("{}._labels", variable), labels_data_type(), true),
272        ];
273
274        for prop in properties {
275            let col_name = format!("{}.{}", variable, prop);
276            // Schemaless properties use LargeBinary with CypherValue encoding to preserve types
277            fields.push(Field::new(&col_name, DataType::LargeBinary, true));
278        }
279
280        Arc::new(Schema::new(fields))
281    }
282
283    /// Create a new graph scan for edges.
284    ///
285    /// Scans all edges of the given type from storage and L0 buffers,
286    /// then materializes the requested properties.
287    pub fn new_edge_scan(
288        graph_ctx: Arc<GraphExecutionContext>,
289        edge_type: impl Into<String>,
290        variable: impl Into<String>,
291        projected_properties: Vec<String>,
292        filter: Option<Arc<dyn PhysicalExpr>>,
293    ) -> Self {
294        let label = edge_type.into();
295        let variable = variable.into();
296
297        // Build output schema with proper types from Uni schema
298        let uni_schema = graph_ctx.storage().schema_manager().schema();
299        let schema = Self::build_edge_schema(&variable, &label, &projected_properties, &uni_schema);
300
301        let properties = compute_plan_properties(schema.clone());
302
303        Self {
304            graph_ctx,
305            label,
306            variable,
307            projected_properties,
308            filter,
309            is_edge_scan: true,
310            is_schemaless: false,
311            schema,
312            properties,
313            metrics: ExecutionPlanMetricsSet::new(),
314        }
315    }
316
317    /// Build output schema for vertex scan with proper Arrow types.
318    fn build_vertex_schema(
319        variable: &str,
320        label: &str,
321        properties: &[String],
322        uni_schema: &UniSchema,
323    ) -> SchemaRef {
324        let mut fields = vec![
325            Field::new(format!("{}._vid", variable), DataType::UInt64, false),
326            Field::new(format!("{}._labels", variable), labels_data_type(), true),
327        ];
328        let label_props = uni_schema.properties.get(label);
329        for prop in properties {
330            let col_name = format!("{}.{}", variable, prop);
331            let arrow_type = resolve_property_type(prop, label_props);
332            fields.push(Field::new(&col_name, arrow_type, true));
333        }
334        Arc::new(Schema::new(fields))
335    }
336
337    /// Build output schema for edge scan with proper Arrow types.
338    fn build_edge_schema(
339        variable: &str,
340        edge_type: &str,
341        properties: &[String],
342        uni_schema: &UniSchema,
343    ) -> SchemaRef {
344        let mut fields = vec![
345            Field::new(format!("{}._eid", variable), DataType::UInt64, false),
346            Field::new(format!("{}._src_vid", variable), DataType::UInt64, false),
347            Field::new(format!("{}._dst_vid", variable), DataType::UInt64, false),
348        ];
349        let edge_props = uni_schema.properties.get(edge_type);
350        for prop in properties {
351            let col_name = format!("{}.{}", variable, prop);
352            let arrow_type = resolve_property_type(prop, edge_props);
353            fields.push(Field::new(&col_name, arrow_type, true));
354        }
355        Arc::new(Schema::new(fields))
356    }
357}
358
359impl DisplayAs for GraphScanExec {
360    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
361        let scan_type = if self.is_edge_scan { "Edge" } else { "Vertex" };
362        write!(
363            f,
364            "GraphScanExec: {}={}, properties={:?}",
365            scan_type, self.label, self.projected_properties
366        )?;
367        if self.filter.is_some() {
368            write!(f, ", filter=<pushed>")?;
369        }
370        Ok(())
371    }
372}
373
374impl ExecutionPlan for GraphScanExec {
375    fn name(&self) -> &str {
376        "GraphScanExec"
377    }
378
379    fn as_any(&self) -> &dyn Any {
380        self
381    }
382
383    fn schema(&self) -> SchemaRef {
384        self.schema.clone()
385    }
386
387    fn properties(&self) -> &PlanProperties {
388        &self.properties
389    }
390
391    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
392        vec![]
393    }
394
395    fn with_new_children(
396        self: Arc<Self>,
397        children: Vec<Arc<dyn ExecutionPlan>>,
398    ) -> DFResult<Arc<dyn ExecutionPlan>> {
399        if children.is_empty() {
400            Ok(self)
401        } else {
402            Err(datafusion::error::DataFusionError::Plan(
403                "GraphScanExec does not accept children".to_string(),
404            ))
405        }
406    }
407
408    fn execute(
409        &self,
410        partition: usize,
411        _context: Arc<TaskContext>,
412    ) -> DFResult<SendableRecordBatchStream> {
413        let metrics = BaselineMetrics::new(&self.metrics, partition);
414
415        Ok(Box::pin(GraphScanStream::new(
416            self.graph_ctx.clone(),
417            self.label.clone(),
418            self.variable.clone(),
419            self.projected_properties.clone(),
420            self.is_edge_scan,
421            self.is_schemaless,
422            self.schema.clone(),
423            metrics,
424        )))
425    }
426
427    fn metrics(&self) -> Option<MetricsSet> {
428        Some(self.metrics.clone_inner())
429    }
430}
431
/// State machine for graph scan stream execution.
enum GraphScanState {
    /// Initial state, ready to start scanning; no work has been issued yet.
    Init,
    /// Executing the async scan: holds the pinned in-flight future that
    /// resolves to at most one result batch (or `None` when empty).
    Executing(Pin<Box<dyn std::future::Future<Output = DFResult<Option<RecordBatch>>> + Send>>),
    /// Stream is done; no further batches will be produced.
    Done,
}
441
/// Stream that scans vertices or edges and materializes properties.
///
/// For known-label vertex scans, uses a single columnar Lance query with
/// MVCC dedup and L0 overlay. For edge and schemaless scans, falls back
/// to the two-phase VID-scan + property-materialize flow.
struct GraphScanStream {
    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Label (vertex) or edge type name.
    ///
    /// For schemaless scans this carries the same special encodings as
    /// `GraphScanExec::label`: empty = all vertices, colon-separated =
    /// multi-label intersection.
    label: String,

    /// Variable name for column prefixing (e.g., "n" in `n.name`).
    variable: String,

    /// Properties to materialize.
    properties: Vec<String>,

    /// Whether this is an edge scan.
    is_edge_scan: bool,

    /// Whether this is a schemaless scan.
    is_schemaless: bool,

    /// Output schema.
    schema: SchemaRef,

    /// Stream state (Init / Executing / Done).
    state: GraphScanState,

    /// Metrics.
    metrics: BaselineMetrics,
}
475
476impl GraphScanStream {
477    /// Create a new graph scan stream.
478    #[expect(clippy::too_many_arguments)]
479    fn new(
480        graph_ctx: Arc<GraphExecutionContext>,
481        label: String,
482        variable: String,
483        properties: Vec<String>,
484        is_edge_scan: bool,
485        is_schemaless: bool,
486        schema: SchemaRef,
487        metrics: BaselineMetrics,
488    ) -> Self {
489        Self {
490            graph_ctx,
491            label,
492            variable,
493            properties,
494            is_edge_scan,
495            is_schemaless,
496            schema,
497            state: GraphScanState::Init,
498            metrics,
499        }
500    }
501}
502
503/// Resolve the Arrow data type for a property, handling system columns like `overflow_json`.
504///
505/// Falls back to `LargeBinary` (CypherValue) if the property is not found in the schema,
506/// preserving original value types for overflow/unknown properties.
507pub(crate) fn resolve_property_type(
508    prop: &str,
509    schema_props: Option<
510        &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
511    >,
512) -> DataType {
513    if prop == "overflow_json" {
514        DataType::LargeBinary
515    } else {
516        schema_props
517            .and_then(|props| props.get(prop))
518            .map(|meta| meta.r#type.to_arrow())
519            .unwrap_or(DataType::LargeBinary)
520    }
521}
522
523// ============================================================================
524// Columnar-first scan helpers
525// ============================================================================
526
/// MVCC deduplication: keep only the highest-version row for each `_vid`.
///
/// Sorts by (_vid ASC, _version DESC), then keeps the first occurrence of each
/// _vid (= the highest version). This is a pure Arrow-compute operation.
///
/// Test-only convenience wrapper around [`mvcc_dedup_batch_by`] with the
/// vertex id column hard-wired.
#[cfg(test)]
fn mvcc_dedup_batch(batch: &RecordBatch) -> DFResult<RecordBatch> {
    mvcc_dedup_batch_by(batch, "_vid")
}
535
536/// Dedup a Lance batch and return `Some` only when rows remain.
537///
538/// Wraps the common pattern of dedup + empty-check that appears in every
539/// columnar scan path (vertex, edge, schemaless).
540fn mvcc_dedup_to_option(
541    batch: Option<RecordBatch>,
542    id_column: &str,
543) -> DFResult<Option<RecordBatch>> {
544    match batch {
545        Some(b) => {
546            let deduped = mvcc_dedup_batch_by(&b, id_column)?;
547            Ok(if deduped.num_rows() > 0 {
548                Some(deduped)
549            } else {
550                None
551            })
552        }
553        None => Ok(None),
554    }
555}
556
557/// Merge a deduped Lance batch with an L0 batch, re-deduplicating the combined
558/// result. Returns an empty batch (against `output_schema`) when both inputs
559/// are empty.
560fn merge_lance_and_l0(
561    lance_deduped: Option<RecordBatch>,
562    l0_batch: RecordBatch,
563    internal_schema: &SchemaRef,
564    id_column: &str,
565) -> DFResult<Option<RecordBatch>> {
566    let has_l0 = l0_batch.num_rows() > 0;
567    match (lance_deduped, has_l0) {
568        (Some(lance), true) => {
569            let combined = arrow::compute::concat_batches(internal_schema, &[lance, l0_batch])
570                .map_err(arrow_err)?;
571            Ok(Some(mvcc_dedup_batch_by(&combined, id_column)?))
572        }
573        (Some(lance), false) => Ok(Some(lance)),
574        (None, true) => Ok(Some(l0_batch)),
575        (None, false) => Ok(None),
576    }
577}
578
/// Push `col_name` into `columns` if not already present.
///
/// Compares against `&str` directly so the membership check allocates no
/// temporary `String`.
fn push_column_if_absent(columns: &mut Vec<String>, col_name: &str) {
    let already_present = columns.iter().any(|existing| existing == col_name);
    if !already_present {
        columns.push(col_name.to_string());
    }
}
588
589/// Extract a property value from an overflow_json CypherValue blob.
590///
591/// Returns the raw CypherValue bytes for `prop` if found in the blob,
592/// or `None` if the blob is null or the key is absent.
593fn extract_from_overflow_blob(
594    overflow_arr: Option<&arrow_array::LargeBinaryArray>,
595    row: usize,
596    prop: &str,
597) -> Option<Vec<u8>> {
598    let arr = overflow_arr?;
599    if arr.is_null(row) {
600        return None;
601    }
602    uni_common::cypher_value_codec::extract_map_entry_raw(arr.value(row), prop)
603}
604
605/// Build a `LargeBinary` column by extracting a property from overflow_json
606/// blobs, with L0 buffer overlay.
607///
608/// For each row, checks L0 buffers first (later buffers take precedence).
609/// If the property is not in L0, falls back to extracting from the
610/// overflow_json CypherValue blob.
611fn build_overflow_property_column(
612    num_rows: usize,
613    vid_arr: &UInt64Array,
614    overflow_arr: Option<&arrow_array::LargeBinaryArray>,
615    prop: &str,
616    l0_ctx: &crate::query::df_graph::L0Context,
617) -> ArrayRef {
618    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
619    for i in 0..num_rows {
620        let vid = Vid::from(vid_arr.value(i));
621
622        // Check L0 buffers (later overwrites earlier)
623        let l0_val = resolve_l0_property(&vid, prop, l0_ctx);
624
625        if let Some(val_opt) = l0_val {
626            append_value_as_cypher_binary(&mut builder, val_opt.as_ref());
627        } else if let Some(bytes) = extract_from_overflow_blob(overflow_arr, i, prop) {
628            builder.append_value(&bytes);
629        } else {
630            builder.append_null();
631        }
632    }
633    Arc::new(builder.finish())
634}
635
636/// Resolve a property value from the L0 visibility chain.
637///
638/// Returns `Some(Some(val))` when the property exists with a non-null value,
639/// `Some(None)` when it exists but is null, and `None` when no L0 buffer
640/// has the property.
641fn resolve_l0_property(
642    vid: &Vid,
643    prop: &str,
644    l0_ctx: &crate::query::df_graph::L0Context,
645) -> Option<Option<Value>> {
646    let mut result = None;
647    for l0 in l0_ctx.iter_l0_buffers() {
648        let guard = l0.read();
649        if let Some(props) = guard.vertex_properties.get(vid)
650            && let Some(val) = props.get(prop)
651        {
652            result = Some(Some(val.clone()));
653        }
654    }
655    result
656}
657
658/// Append a `Value` to a `LargeBinaryBuilder` as CypherValue bytes.
659///
660/// Non-null values are JSON-encoded then CypherValue-encoded.
661/// Null values (or encoding failures) produce null entries.
662fn append_value_as_cypher_binary(
663    builder: &mut arrow_array::builder::LargeBinaryBuilder,
664    val: Option<&Value>,
665) {
666    match val {
667        Some(v) if !v.is_null() => {
668            let json_val: serde_json::Value = v.clone().into();
669            match encode_cypher_value(&json_val) {
670                Ok(bytes) => builder.append_value(bytes),
671                Err(_) => builder.append_null(),
672            }
673        }
674        _ => builder.append_null(),
675    }
676}
677
678/// Build the `_all_props` column by overlaying L0 buffer properties onto
679/// the batch's `props_json` column.
680///
681/// For each row, decodes the stored CypherValue blob, merges in any L0 buffer
682/// properties (in visibility order: pending → current → transaction), and
683/// re-encodes the result. This ensures `properties()` and `keys()` reflect
684/// uncommitted L0 mutations.
685fn build_all_props_column_with_l0_overlay(
686    num_rows: usize,
687    vid_arr: &UInt64Array,
688    props_arr: Option<&arrow_array::LargeBinaryArray>,
689    l0_ctx: &crate::query::df_graph::L0Context,
690) -> ArrayRef {
691    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
692    for i in 0..num_rows {
693        let vid = Vid::from(vid_arr.value(i));
694
695        // 1. Decode props_json blob from storage
696        let mut merged_props = serde_json::Map::new();
697        if let Some(arr) = props_arr
698            && !arr.is_null(i)
699            && let Ok(uni_common::Value::Map(map)) =
700                uni_common::cypher_value_codec::decode(arr.value(i))
701        {
702            for (k, v) in map {
703                let json_val: serde_json::Value = v.into();
704                merged_props.insert(k, json_val);
705            }
706        }
707
708        // 2. Overlay L0 properties (visibility order: pending → current → transaction)
709        for l0 in l0_ctx.iter_l0_buffers() {
710            let guard = l0.read();
711            if let Some(l0_props) = guard.vertex_properties.get(&vid) {
712                for (k, v) in l0_props {
713                    let json_val: serde_json::Value = v.clone().into();
714                    merged_props.insert(k.clone(), json_val);
715                }
716            }
717        }
718
719        // 3. Encode merged result
720        if merged_props.is_empty() {
721            builder.append_null();
722        } else {
723            let json_obj = serde_json::Value::Object(merged_props);
724            match encode_cypher_value(&json_obj) {
725                Ok(bytes) => builder.append_value(bytes),
726                Err(_) => builder.append_null(),
727            }
728        }
729    }
730    Arc::new(builder.finish())
731}
732
/// Build `_all_props` for a schema-based scan by merging:
/// 1. Schema-defined columns from the batch
/// 2. Overflow_json properties
/// 3. L0 buffer properties
///
/// Merge order matters: later sources overwrite earlier ones on key
/// collision, so L0 buffers win over overflow_json, which wins over
/// schema columns. Rows with no properties at all become null entries.
fn build_all_props_column_for_schema_scan(
    batch: &RecordBatch,
    vid_arr: &UInt64Array,
    overflow_arr: Option<&arrow_array::LargeBinaryArray>,
    projected_properties: &[String],
    l0_ctx: &crate::query::df_graph::L0Context,
) -> ArrayRef {
    // Collect schema-defined property column names (non-internal, non-overflow, non-_all_props)
    let schema_props: Vec<&str> = projected_properties
        .iter()
        .filter(|p| *p != "overflow_json" && *p != "_all_props" && !p.starts_with('_'))
        .map(String::as_str)
        .collect();

    let num_rows = batch.num_rows();
    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
    for i in 0..num_rows {
        let vid = Vid::from(vid_arr.value(i));
        let mut merged_props = serde_json::Map::new();

        // 1. Schema-defined columns (null cells are skipped, not inserted)
        for &prop in &schema_props {
            if let Some(col) = batch.column_by_name(prop) {
                let val = uni_store::storage::arrow_convert::arrow_to_value(col.as_ref(), i, None);
                if !val.is_null() {
                    let json_val: serde_json::Value = val.into();
                    merged_props.insert(prop.to_string(), json_val);
                }
            }
        }

        // 2. Overflow_json properties (decoded from the CypherValue blob)
        if let Some(arr) = overflow_arr
            && !arr.is_null(i)
            && let Ok(uni_common::Value::Map(map)) =
                uni_common::cypher_value_codec::decode(arr.value(i))
        {
            for (k, v) in map {
                let json_val: serde_json::Value = v.into();
                merged_props.insert(k, json_val);
            }
        }

        // 3. L0 buffer overlay (pending → current → transaction)
        for l0 in l0_ctx.iter_l0_buffers() {
            let guard = l0.read();
            if let Some(l0_props) = guard.vertex_properties.get(&vid) {
                for (k, v) in l0_props {
                    let json_val: serde_json::Value = v.clone().into();
                    merged_props.insert(k.clone(), json_val);
                }
            }
        }

        // Re-encode the merged map; empty maps (and encode failures) become null.
        if merged_props.is_empty() {
            builder.append_null();
        } else {
            let json_obj = serde_json::Value::Object(merged_props);
            match encode_cypher_value(&json_obj) {
                Ok(bytes) => builder.append_value(bytes),
                Err(_) => builder.append_null(),
            }
        }
    }
    Arc::new(builder.finish())
}
803
804/// MVCC deduplication: keep only the highest-version row for each unique value
805/// in the given `id_column`.
806///
807/// Sorts by (id_column ASC, _version DESC), then keeps the first occurrence of
808/// each id (= the highest version). This is a pure Arrow-compute operation.
809fn mvcc_dedup_batch_by(batch: &RecordBatch, id_column: &str) -> DFResult<RecordBatch> {
810    if batch.num_rows() == 0 {
811        return Ok(batch.clone());
812    }
813
814    let id_col = batch
815        .column_by_name(id_column)
816        .ok_or_else(|| {
817            datafusion::error::DataFusionError::Internal(format!("Missing {} column", id_column))
818        })?
819        .clone();
820    let version_col = batch
821        .column_by_name("_version")
822        .ok_or_else(|| {
823            datafusion::error::DataFusionError::Internal("Missing _version column".to_string())
824        })?
825        .clone();
826
827    // Sort by (id_column ASC, _version DESC)
828    let sort_columns = vec![
829        arrow::compute::SortColumn {
830            values: id_col,
831            options: Some(arrow::compute::SortOptions {
832                descending: false,
833                nulls_first: false,
834            }),
835        },
836        arrow::compute::SortColumn {
837            values: version_col,
838            options: Some(arrow::compute::SortOptions {
839                descending: true,
840                nulls_first: false,
841            }),
842        },
843    ];
844    let indices = arrow::compute::lexsort_to_indices(&sort_columns, None).map_err(arrow_err)?;
845
846    // Reorder all columns by sorted indices
847    let sorted_columns: Vec<ArrayRef> = batch
848        .columns()
849        .iter()
850        .map(|col| arrow::compute::take(col.as_ref(), &indices, None))
851        .collect::<Result<_, _>>()
852        .map_err(arrow_err)?;
853    let sorted = RecordBatch::try_new(batch.schema(), sorted_columns).map_err(arrow_err)?;
854
855    // Build dedup mask: keep first occurrence of each id
856    let sorted_id = sorted
857        .column_by_name(id_column)
858        .unwrap()
859        .as_any()
860        .downcast_ref::<UInt64Array>()
861        .unwrap();
862
863    let mut keep = vec![false; sorted.num_rows()];
864    if !keep.is_empty() {
865        keep[0] = true;
866        for (i, flag) in keep.iter_mut().enumerate().skip(1) {
867            if sorted_id.value(i) != sorted_id.value(i - 1) {
868                *flag = true;
869            }
870        }
871    }
872
873    let mask = arrow_array::BooleanArray::from(keep);
874    arrow::compute::filter_record_batch(&sorted, &mask).map_err(arrow_err)
875}
876
877/// Filter out edge rows where `op != 0` (non-INSERT) after MVCC dedup.
878fn filter_deleted_edge_ops(batch: &RecordBatch) -> DFResult<RecordBatch> {
879    if batch.num_rows() == 0 {
880        return Ok(batch.clone());
881    }
882    let op_col = match batch.column_by_name("op") {
883        Some(col) => col
884            .as_any()
885            .downcast_ref::<arrow_array::UInt8Array>()
886            .unwrap(),
887        None => return Ok(batch.clone()),
888    };
889    let keep: Vec<bool> = (0..op_col.len()).map(|i| op_col.value(i) == 0).collect();
890    let mask = arrow_array::BooleanArray::from(keep);
891    arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
892}
893
894/// Filter out rows where `_deleted = true` after MVCC dedup.
895fn filter_deleted_rows(batch: &RecordBatch) -> DFResult<RecordBatch> {
896    if batch.num_rows() == 0 {
897        return Ok(batch.clone());
898    }
899    let deleted_col = match batch.column_by_name("_deleted") {
900        Some(col) => col
901            .as_any()
902            .downcast_ref::<arrow_array::BooleanArray>()
903            .unwrap(),
904        None => return Ok(batch.clone()),
905    };
906    let keep: Vec<bool> = (0..deleted_col.len())
907        .map(|i| !deleted_col.value(i))
908        .collect();
909    let mask = arrow_array::BooleanArray::from(keep);
910    arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
911}
912
913/// Filter out rows whose `_vid` appears in L0 tombstones.
914fn filter_l0_tombstones(
915    batch: &RecordBatch,
916    l0_ctx: &crate::query::df_graph::L0Context,
917) -> DFResult<RecordBatch> {
918    if batch.num_rows() == 0 {
919        return Ok(batch.clone());
920    }
921
922    let mut tombstones: HashSet<u64> = HashSet::new();
923    for l0 in l0_ctx.iter_l0_buffers() {
924        let guard = l0.read();
925        for vid in guard.vertex_tombstones.iter() {
926            tombstones.insert(vid.as_u64());
927        }
928    }
929
930    if tombstones.is_empty() {
931        return Ok(batch.clone());
932    }
933
934    let vid_col = batch
935        .column_by_name("_vid")
936        .ok_or_else(|| {
937            datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
938        })?
939        .as_any()
940        .downcast_ref::<UInt64Array>()
941        .unwrap();
942
943    let keep: Vec<bool> = (0..vid_col.len())
944        .map(|i| !tombstones.contains(&vid_col.value(i)))
945        .collect();
946    let mask = arrow_array::BooleanArray::from(keep);
947    arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
948}
949
950/// Filter out rows whose `eid` appears in L0 edge tombstones.
951fn filter_l0_edge_tombstones(
952    batch: &RecordBatch,
953    l0_ctx: &crate::query::df_graph::L0Context,
954) -> DFResult<RecordBatch> {
955    if batch.num_rows() == 0 {
956        return Ok(batch.clone());
957    }
958
959    let mut tombstones: HashSet<u64> = HashSet::new();
960    for l0 in l0_ctx.iter_l0_buffers() {
961        let guard = l0.read();
962        for eid in guard.tombstones.keys() {
963            tombstones.insert(eid.as_u64());
964        }
965    }
966
967    if tombstones.is_empty() {
968        return Ok(batch.clone());
969    }
970
971    let eid_col = batch
972        .column_by_name("eid")
973        .ok_or_else(|| {
974            datafusion::error::DataFusionError::Internal("Missing eid column".to_string())
975        })?
976        .as_any()
977        .downcast_ref::<UInt64Array>()
978        .unwrap();
979
980    let keep: Vec<bool> = (0..eid_col.len())
981        .map(|i| !tombstones.contains(&eid_col.value(i)))
982        .collect();
983    let mask = arrow_array::BooleanArray::from(keep);
984    arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
985}
986
/// Build a RecordBatch from L0 buffer data for a given label, matching the
/// Lance query's column set.
///
/// Merges L0 buffers in visibility order (pending_flush → current → transaction),
/// with later buffers overwriting earlier ones for the same VID.
fn build_l0_vertex_batch(
    l0_ctx: &crate::query::df_graph::L0Context,
    label: &str,
    lance_schema: &SchemaRef,
    label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
) -> DFResult<RecordBatch> {
    // Collect all L0 vertex data, merging in visibility order
    let mut vid_data: HashMap<u64, (Properties, u64)> = HashMap::new(); // vid -> (props, version)
    let mut tombstones: HashSet<u64> = HashSet::new();

    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();
        // Collect tombstones
        for vid in guard.vertex_tombstones.iter() {
            tombstones.insert(vid.as_u64());
        }
        // Collect vertices for this label
        for vid in guard.vids_for_label(label) {
            let vid_u64 = vid.as_u64();
            // NOTE(review): a VID tombstoned by this or an *earlier* buffer is
            // skipped here, and the sweep below removes tombstoned VIDs from
            // all buffers — so a VID deleted in one buffer and re-created in a
            // later one would still be dropped. Confirm re-creation after
            // deletion inside L0 is not an expected case.
            if tombstones.contains(&vid_u64) {
                continue;
            }
            let version = guard.vertex_versions.get(&vid).copied().unwrap_or(0);
            let entry = vid_data
                .entry(vid_u64)
                .or_insert_with(|| (Properties::new(), 0));
            // Merge properties (later L0 overwrites)
            if let Some(props) = guard.vertex_properties.get(&vid) {
                for (k, v) in props {
                    entry.0.insert(k.clone(), v.clone());
                }
            }
            // Take the highest version
            if version > entry.1 {
                entry.1 = version;
            }
        }
    }

    // Remove tombstoned VIDs (covers VIDs inserted by a buffer processed
    // before the buffer that tombstoned them)
    for t in &tombstones {
        vid_data.remove(t);
    }

    if vid_data.is_empty() {
        return Ok(RecordBatch::new_empty(lance_schema.clone()));
    }

    // Sort VIDs for deterministic output
    let mut vids: Vec<u64> = vid_data.keys().copied().collect();
    vids.sort_unstable();

    let num_rows = vids.len();
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(lance_schema.fields().len());

    // Determine which schema property names exist
    let schema_prop_names: HashSet<&str> = label_props
        .map(|lp| lp.keys().map(|k| k.as_str()).collect())
        .unwrap_or_default();

    // Build one Arrow column per field of the Lance schema, in schema order.
    for field in lance_schema.fields() {
        let col_name = field.name().as_str();
        match col_name {
            "_vid" => {
                columns.push(Arc::new(UInt64Array::from(vids.clone())));
            }
            "_deleted" => {
                // L0 vertices are always live (tombstoned ones are already excluded)
                let vals = vec![false; num_rows];
                columns.push(Arc::new(arrow_array::BooleanArray::from(vals)));
            }
            "_version" => {
                let vals: Vec<u64> = vids.iter().map(|v| vid_data[v].1).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "overflow_json" => {
                // Collect non-schema properties as CypherValue
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                for vid_u64 in &vids {
                    let (props, _) = &vid_data[vid_u64];
                    let mut overflow = serde_json::Map::new();
                    for (k, v) in props {
                        // `ext_id` and underscore-prefixed (system) keys never
                        // go into the overflow blob.
                        if k == "ext_id" || k.starts_with('_') {
                            continue;
                        }
                        if !schema_prop_names.contains(k.as_str()) {
                            let json_val: serde_json::Value = v.clone().into();
                            overflow.insert(k.clone(), json_val);
                        }
                    }
                    if overflow.is_empty() {
                        builder.append_null();
                    } else {
                        let json_val = serde_json::Value::Object(overflow);
                        // Encoding failure degrades this row's overflow to NULL
                        // instead of failing the whole scan.
                        match encode_cypher_value(&json_val) {
                            Ok(bytes) => builder.append_value(bytes),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            _ => {
                // Schema property column: convert L0 Value → Arrow typed value
                let col = build_l0_property_column(&vids, &vid_data, col_name, field.data_type())?;
                columns.push(col);
            }
        }
    }

    RecordBatch::try_new(lance_schema.clone(), columns).map_err(arrow_err)
}
1104
1105/// Build a single Arrow column from L0 property values.
1106///
1107/// Operates on the `vid_data` map produced by `build_l0_vertex_batch`.
1108fn build_l0_property_column(
1109    vids: &[u64],
1110    vid_data: &HashMap<u64, (Properties, u64)>,
1111    prop_name: &str,
1112    data_type: &DataType,
1113) -> DFResult<ArrayRef> {
1114    // Convert to Vid keys for reuse of existing build_property_column_static
1115    let vid_keys: Vec<Vid> = vids.iter().map(|v| Vid::from(*v)).collect();
1116    let props_map: HashMap<Vid, Properties> = vid_data
1117        .iter()
1118        .map(|(k, (props, _))| (Vid::from(*k), props.clone()))
1119        .collect();
1120
1121    build_property_column_static(&vid_keys, &props_map, prop_name, data_type)
1122}
1123
/// Build a RecordBatch from L0 buffer data for a given edge type, matching
/// the DeltaDataset Lance table's column set.
///
/// Merges L0 buffers in visibility order (pending_flush → current → transaction),
/// with later buffers overwriting earlier ones for the same EID.
fn build_l0_edge_batch(
    l0_ctx: &crate::query::df_graph::L0Context,
    edge_type: &str,
    internal_schema: &SchemaRef,
    type_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
) -> DFResult<RecordBatch> {
    // Collect all L0 edge data, merging in visibility order
    // eid -> (src_vid, dst_vid, properties, version)
    let mut eid_data: HashMap<u64, (u64, u64, Properties, u64)> = HashMap::new();
    let mut tombstones: HashSet<u64> = HashSet::new();

    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();
        // Collect tombstones
        for eid in guard.tombstones.keys() {
            tombstones.insert(eid.as_u64());
        }
        // Collect edges for this type
        for eid in guard.eids_for_type(edge_type) {
            let eid_u64 = eid.as_u64();
            // NOTE(review): like the vertex path, an EID tombstoned by this or
            // an earlier buffer is skipped here and swept again below — an edge
            // re-created after deletion inside L0 would be lost. Confirm that
            // is intended.
            if tombstones.contains(&eid_u64) {
                continue;
            }
            // Edges whose endpoints cannot be resolved are skipped entirely.
            let (src_vid, dst_vid) = match guard.get_edge_endpoints(eid) {
                Some(endpoints) => (endpoints.0.as_u64(), endpoints.1.as_u64()),
                None => continue,
            };
            let version = guard.edge_versions.get(&eid).copied().unwrap_or(0);
            let entry = eid_data
                .entry(eid_u64)
                .or_insert_with(|| (src_vid, dst_vid, Properties::new(), 0));
            // Merge properties (later L0 overwrites)
            if let Some(props) = guard.edge_properties.get(&eid) {
                for (k, v) in props {
                    entry.2.insert(k.clone(), v.clone());
                }
            }
            // Update endpoints from latest L0 layer
            entry.0 = src_vid;
            entry.1 = dst_vid;
            // Take the highest version
            if version > entry.3 {
                entry.3 = version;
            }
        }
    }

    // Remove tombstoned EIDs (covers EIDs inserted by a buffer processed
    // before the buffer that tombstoned them)
    for t in &tombstones {
        eid_data.remove(t);
    }

    if eid_data.is_empty() {
        return Ok(RecordBatch::new_empty(internal_schema.clone()));
    }

    // Sort EIDs for deterministic output
    let mut eids: Vec<u64> = eid_data.keys().copied().collect();
    eids.sort_unstable();

    let num_rows = eids.len();
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(internal_schema.fields().len());

    // Determine which schema property names exist
    let schema_prop_names: HashSet<&str> = type_props
        .map(|tp| tp.keys().map(|k| k.as_str()).collect())
        .unwrap_or_default();

    // Build one Arrow column per field of the internal schema, in schema order.
    for field in internal_schema.fields() {
        let col_name = field.name().as_str();
        match col_name {
            "eid" => {
                columns.push(Arc::new(UInt64Array::from(eids.clone())));
            }
            "src_vid" => {
                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].0).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "dst_vid" => {
                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].1).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "op" => {
                // L0 edges are always live (tombstoned ones already excluded),
                // so `op` is uniformly 0 (INSERT).
                let vals = vec![0u8; num_rows];
                columns.push(Arc::new(arrow_array::UInt8Array::from(vals)));
            }
            "_version" => {
                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].3).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "overflow_json" => {
                // Collect non-schema properties as CypherValue
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                for eid_u64 in &eids {
                    let (_, _, props, _) = &eid_data[eid_u64];
                    let mut overflow = serde_json::Map::new();
                    for (k, v) in props {
                        // Underscore-prefixed (system) keys never go into overflow.
                        if k.starts_with('_') {
                            continue;
                        }
                        if !schema_prop_names.contains(k.as_str()) {
                            let json_val: serde_json::Value = v.clone().into();
                            overflow.insert(k.clone(), json_val);
                        }
                    }
                    if overflow.is_empty() {
                        builder.append_null();
                    } else {
                        let json_val = serde_json::Value::Object(overflow);
                        // Encoding failure degrades this row's overflow to NULL
                        // instead of failing the whole scan.
                        match encode_cypher_value(&json_val) {
                            Ok(bytes) => builder.append_value(bytes),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            _ => {
                // Schema property column: convert L0 Value → Arrow typed value
                let col =
                    build_l0_edge_property_column(&eids, &eid_data, col_name, field.data_type())?;
                columns.push(col);
            }
        }
    }

    RecordBatch::try_new(internal_schema.clone(), columns).map_err(arrow_err)
}
1258
1259/// Build a single Arrow column from L0 edge property values.
1260///
1261/// Operates on the `eid_data` map produced by `build_l0_edge_batch`.
1262fn build_l0_edge_property_column(
1263    eids: &[u64],
1264    eid_data: &HashMap<u64, (u64, u64, Properties, u64)>,
1265    prop_name: &str,
1266    data_type: &DataType,
1267) -> DFResult<ArrayRef> {
1268    // Convert to Vid keys for reuse of existing build_property_column_static
1269    let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(*e)).collect();
1270    let props_map: HashMap<Vid, Properties> = eid_data
1271        .iter()
1272        .map(|(k, (_, _, props, _))| (Vid::from(*k), props.clone()))
1273        .collect();
1274
1275    build_property_column_static(&vid_keys, &props_map, prop_name, data_type)
1276}
1277
1278/// Build the `_labels` column for known-label vertices.
1279///
1280/// Reads `_labels` from the stored Lance batch if available. Falls back to
1281/// `[label]` when the column is absent (legacy data). Additional labels from
1282/// L0 buffers are merged in.
1283fn build_labels_column_for_known_label(
1284    vid_arr: &UInt64Array,
1285    label: &str,
1286    l0_ctx: &crate::query::df_graph::L0Context,
1287    batch_labels_col: Option<&arrow_array::ListArray>,
1288) -> DFResult<ArrayRef> {
1289    use uni_store::storage::arrow_convert::labels_from_list_array;
1290
1291    let mut labels_builder = ListBuilder::new(StringBuilder::new());
1292
1293    for i in 0..vid_arr.len() {
1294        let vid = Vid::from(vid_arr.value(i));
1295
1296        // Start with labels from the stored column, falling back to [label]
1297        let mut labels = match batch_labels_col {
1298            Some(list_arr) => {
1299                let stored = labels_from_list_array(list_arr, i);
1300                if stored.is_empty() {
1301                    vec![label.to_string()]
1302                } else {
1303                    stored
1304                }
1305            }
1306            None => vec![label.to_string()],
1307        };
1308
1309        // Ensure the scanned label is present (defensive)
1310        if !labels.iter().any(|l| l == label) {
1311            labels.push(label.to_string());
1312        }
1313
1314        // Merge additional labels from L0 buffers
1315        for l0 in l0_ctx.iter_l0_buffers() {
1316            let guard = l0.read();
1317            if let Some(l0_labels) = guard.vertex_labels.get(&vid) {
1318                for lbl in l0_labels {
1319                    if !labels.contains(lbl) {
1320                        labels.push(lbl.clone());
1321                    }
1322                }
1323            }
1324        }
1325
1326        let values = labels_builder.values();
1327        for lbl in &labels {
1328            values.append_value(lbl);
1329        }
1330        labels_builder.append(true);
1331    }
1332
1333    Ok(Arc::new(labels_builder.finish()))
1334}
1335
/// Map a Lance-schema batch to the DataFusion output schema.
///
/// The output schema has `{variable}.{property}` column names, while Lance
/// uses bare property names. This function performs the positional mapping,
/// adds the `_labels` column, and drops internal columns like `_deleted`/`_version`.
///
/// Columns are pushed positionally in output-schema order: `{var}._vid`,
/// `{var}._labels`, then one column per entry of `projected_properties`.
fn map_to_output_schema(
    batch: &RecordBatch,
    label: &str,
    _variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
    l0_ctx: &crate::query::df_graph::L0Context,
) -> DFResult<RecordBatch> {
    if batch.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());

    // 1. {var}._vid
    let vid_col = batch
        .column_by_name("_vid")
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
        })?
        .clone();
    let vid_arr = vid_col
        .as_any()
        .downcast_ref::<UInt64Array>()
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("_vid not UInt64".to_string())
        })?;

    // 2. {var}._labels — read from stored column, overlay L0 additions
    let batch_labels_col = batch
        .column_by_name("_labels")
        .and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());
    let labels_col = build_labels_column_for_known_label(vid_arr, label, l0_ctx, batch_labels_col)?;
    columns.push(vid_col.clone());
    columns.push(labels_col);

    // 3. Projected properties
    // Pre-load overflow_json column for extracting non-schema properties
    let overflow_arr = batch
        .column_by_name("overflow_json")
        .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());

    for prop in projected_properties {
        if prop == "overflow_json" {
            match batch.column_by_name("overflow_json") {
                Some(col) => columns.push(col.clone()),
                None => {
                    // No overflow_json in Lance — return null column
                    columns.push(arrow_array::new_null_array(
                        &DataType::LargeBinary,
                        batch.num_rows(),
                    ));
                }
            }
        } else if prop == "_all_props" {
            // Build _all_props from overflow_json + L0 overlay.
            // Fast path: if no L0 buffer has vertex property mutations AND
            // there are no schema columns to merge, pass through overflow_json.
            let any_l0_has_vertex_props = l0_ctx.iter_l0_buffers().any(|l0| {
                let guard = l0.read();
                !guard.vertex_properties.is_empty()
            });
            // Check if this label has schema-defined columns (besides system columns)
            // NOTE(review): this inspects the *projection*, not the label's
            // schema — a schema column that was not projected won't force the
            // merge path. Confirm that is intended.
            let has_schema_cols = projected_properties
                .iter()
                .any(|p| p != "overflow_json" && p != "_all_props" && !p.starts_with('_'));

            if !any_l0_has_vertex_props && !has_schema_cols {
                // No L0 mutations, no schema cols to merge: overflow_json IS _all_props
                match batch.column_by_name("overflow_json") {
                    Some(col) => columns.push(col.clone()),
                    None => {
                        columns.push(arrow_array::new_null_array(
                            &DataType::LargeBinary,
                            batch.num_rows(),
                        ));
                    }
                }
            } else {
                // Need to merge: schema columns + overflow_json + L0 overlay
                let col = build_all_props_column_for_schema_scan(
                    batch,
                    vid_arr,
                    overflow_arr,
                    projected_properties,
                    l0_ctx,
                );
                columns.push(col);
            }
        } else {
            match batch.column_by_name(prop) {
                Some(col) => columns.push(col.clone()),
                None => {
                    // Column missing in Lance -- extract from overflow_json
                    // CypherValue blob with L0 overlay
                    let col = build_overflow_property_column(
                        batch.num_rows(),
                        vid_arr,
                        overflow_arr,
                        prop,
                        l0_ctx,
                    );
                    columns.push(col);
                }
            }
        }
    }

    RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
}
1451
1452/// Map an internal DeltaDataset-schema edge batch to the DataFusion output schema.
1453///
1454/// The internal batch has `eid`, `src_vid`, `dst_vid`, `op`, `_version`, and property
1455/// columns. The output schema has `{variable}._eid`, `{variable}._src_vid`,
1456/// `{variable}._dst_vid`, and per-property columns. Internal columns `op` and
1457/// `_version` are dropped.
1458fn map_edge_to_output_schema(
1459    batch: &RecordBatch,
1460    variable: &str,
1461    projected_properties: &[String],
1462    output_schema: &SchemaRef,
1463) -> DFResult<RecordBatch> {
1464    if batch.num_rows() == 0 {
1465        return Ok(RecordBatch::new_empty(output_schema.clone()));
1466    }
1467
1468    let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
1469
1470    // 1. {var}._eid
1471    let eid_col = batch
1472        .column_by_name("eid")
1473        .ok_or_else(|| {
1474            datafusion::error::DataFusionError::Internal("Missing eid column".to_string())
1475        })?
1476        .clone();
1477    columns.push(eid_col);
1478
1479    // 2. {var}._src_vid
1480    let src_col = batch
1481        .column_by_name("src_vid")
1482        .ok_or_else(|| {
1483            datafusion::error::DataFusionError::Internal("Missing src_vid column".to_string())
1484        })?
1485        .clone();
1486    columns.push(src_col);
1487
1488    // 3. {var}._dst_vid
1489    let dst_col = batch
1490        .column_by_name("dst_vid")
1491        .ok_or_else(|| {
1492            datafusion::error::DataFusionError::Internal("Missing dst_vid column".to_string())
1493        })?
1494        .clone();
1495    columns.push(dst_col);
1496
1497    // 4. Projected properties
1498    for prop in projected_properties {
1499        if prop == "overflow_json" {
1500            match batch.column_by_name("overflow_json") {
1501                Some(col) => columns.push(col.clone()),
1502                None => {
1503                    columns.push(arrow_array::new_null_array(
1504                        &DataType::LargeBinary,
1505                        batch.num_rows(),
1506                    ));
1507                }
1508            }
1509        } else {
1510            match batch.column_by_name(prop) {
1511                Some(col) => columns.push(col.clone()),
1512                None => {
1513                    // Column missing in Lance — extract from overflow_json CypherValue blob
1514                    // (mirrors the vertex path in map_to_output_schema)
1515                    let overflow_arr = batch
1516                        .column_by_name("overflow_json")
1517                        .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
1518
1519                    if let Some(arr) = overflow_arr {
1520                        let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
1521                        for i in 0..batch.num_rows() {
1522                            if !arr.is_null(i) {
1523                                let blob = arr.value(i);
1524                                // Fast path: extract map entry without decoding entire map
1525                                if let Some(sub_bytes) =
1526                                    uni_common::cypher_value_codec::extract_map_entry_raw(
1527                                        blob, prop,
1528                                    )
1529                                {
1530                                    builder.append_value(&sub_bytes);
1531                                } else {
1532                                    builder.append_null();
1533                                }
1534                            } else {
1535                                builder.append_null();
1536                            }
1537                        }
1538                        columns.push(Arc::new(builder.finish()));
1539                    } else {
1540                        // No overflow_json column either — return null column
1541                        let target_field = output_schema
1542                            .fields()
1543                            .iter()
1544                            .find(|f| f.name() == &format!("{}.{}", variable, prop));
1545                        let dt = target_field
1546                            .map(|f| f.data_type().clone())
1547                            .unwrap_or(DataType::LargeBinary);
1548                        columns.push(arrow_array::new_null_array(&dt, batch.num_rows()));
1549                    }
1550                }
1551            }
1552        }
1553    }
1554
1555    RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
1556}
1557
1558/// Columnar-first vertex scan: single Lance query with MVCC dedup and L0 overlay.
1559///
1560/// Replaces the two-phase `scan_vertex_vids_static()` + `materialize_vertex_batch_static()`
1561/// for known-label vertex scans. Reads all needed columns in a single Lance query,
1562/// performs MVCC dedup via Arrow compute, merges L0 buffer data, filters tombstones,
1563/// and maps to the output schema.
1564async fn columnar_scan_vertex_batch_static(
1565    graph_ctx: &GraphExecutionContext,
1566    label: &str,
1567    variable: &str,
1568    projected_properties: &[String],
1569    output_schema: &SchemaRef,
1570) -> DFResult<RecordBatch> {
1571    let storage = graph_ctx.storage();
1572    let l0_ctx = graph_ctx.l0_context();
1573    let uni_schema = storage.schema_manager().schema();
1574    let label_props = uni_schema.properties.get(label);
1575
1576    // Build the list of columns to request from Lance
1577    let mut lance_columns: Vec<String> = vec![
1578        "_vid".to_string(),
1579        "_deleted".to_string(),
1580        "_version".to_string(),
1581    ];
1582    for prop in projected_properties {
1583        if prop == "overflow_json" {
1584            push_column_if_absent(&mut lance_columns, "overflow_json");
1585        } else {
1586            let exists_in_schema = label_props.is_some_and(|lp| lp.contains_key(prop));
1587            if exists_in_schema {
1588                push_column_if_absent(&mut lance_columns, prop);
1589            }
1590        }
1591    }
1592
1593    // Ensure overflow_json is present when any projected property is not in the schema
1594    let needs_overflow = projected_properties
1595        .iter()
1596        .any(|p| p == "overflow_json" || !label_props.is_some_and(|lp| lp.contains_key(p)));
1597    if needs_overflow {
1598        push_column_if_absent(&mut lance_columns, "overflow_json");
1599    }
1600
1601    // Try to query Lance
1602    let ds = storage
1603        .vertex_dataset(label)
1604        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
1605    let lancedb_store = storage.lancedb_store();
1606
1607    let lance_batch = match ds.open_lancedb(lancedb_store).await {
1608        Ok(table) => {
1609            use lancedb::query::{ExecutableQuery, QueryBase, Select};
1610
1611            // Check which columns actually exist in the Lance table
1612            let table_schema = table
1613                .schema()
1614                .await
1615                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
1616            let table_field_names: HashSet<&str> = table_schema
1617                .fields()
1618                .iter()
1619                .map(|f| f.name().as_str())
1620                .collect();
1621
1622            let actual_columns: Vec<&str> = lance_columns
1623                .iter()
1624                .filter(|c| table_field_names.contains(c.as_str()))
1625                .map(|c| c.as_str())
1626                .collect();
1627
1628            let query = table.query().select(Select::columns(&actual_columns));
1629            // Do NOT filter _deleted here — MVCC dedup must see the
1630            // highest-version row first (which may be a deletion tombstone).
1631            // Deleted rows are filtered out after dedup.
1632            let query = match storage.version_high_water_mark() {
1633                Some(hwm) => query.only_if(format!("_version <= {}", hwm)),
1634                None => query,
1635            };
1636
1637            match query.execute().await {
1638                Ok(stream) => {
1639                    use futures::TryStreamExt;
1640                    let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap_or_default();
1641                    if batches.is_empty() {
1642                        None
1643                    } else {
1644                        Some(
1645                            arrow::compute::concat_batches(&batches[0].schema(), &batches)
1646                                .map_err(|e| {
1647                                    datafusion::error::DataFusionError::ArrowError(
1648                                        Box::new(e),
1649                                        None,
1650                                    )
1651                                })?,
1652                        )
1653                    }
1654                }
1655                Err(_) => None,
1656            }
1657        }
1658        Err(_) => None, // Table doesn't exist yet — only L0 data
1659    };
1660
1661    // MVCC dedup the Lance batch
1662    let lance_deduped = mvcc_dedup_to_option(lance_batch, "_vid")?;
1663
1664    // Build the internal Lance schema for L0 batch construction.
1665    // Use the Lance batch schema if available, otherwise build from scratch.
1666    let internal_schema = match &lance_deduped {
1667        Some(batch) => batch.schema(),
1668        None => {
1669            let mut fields = vec![
1670                Field::new("_vid", DataType::UInt64, false),
1671                Field::new("_deleted", DataType::Boolean, false),
1672                Field::new("_version", DataType::UInt64, false),
1673            ];
1674            for col in &lance_columns {
1675                if matches!(col.as_str(), "_vid" | "_deleted" | "_version") {
1676                    continue;
1677                }
1678                if col == "overflow_json" {
1679                    fields.push(Field::new("overflow_json", DataType::LargeBinary, true));
1680                } else {
1681                    let arrow_type = label_props
1682                        .and_then(|lp| lp.get(col.as_str()))
1683                        .map(|meta| meta.r#type.to_arrow())
1684                        .unwrap_or(DataType::LargeBinary);
1685                    fields.push(Field::new(col, arrow_type, true));
1686                }
1687            }
1688            Arc::new(Schema::new(fields))
1689        }
1690    };
1691
1692    // Build L0 batch
1693    let l0_batch = build_l0_vertex_batch(l0_ctx, label, &internal_schema, label_props)?;
1694
1695    // Merge Lance + L0
1696    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "_vid")?
1697    else {
1698        return Ok(RecordBatch::new_empty(output_schema.clone()));
1699    };
1700
1701    // Filter out MVCC deletion tombstones (_deleted = true)
1702    let merged = filter_deleted_rows(&merged)?;
1703    if merged.num_rows() == 0 {
1704        return Ok(RecordBatch::new_empty(output_schema.clone()));
1705    }
1706
1707    // Filter L0 tombstones
1708    let filtered = filter_l0_tombstones(&merged, l0_ctx)?;
1709
1710    if filtered.num_rows() == 0 {
1711        return Ok(RecordBatch::new_empty(output_schema.clone()));
1712    }
1713
1714    // Map to output schema
1715    map_to_output_schema(
1716        &filtered,
1717        label,
1718        variable,
1719        projected_properties,
1720        output_schema,
1721        l0_ctx,
1722    )
1723}
1724
/// Columnar-first edge scan: single Lance query with MVCC dedup and L0 overlay.
///
/// Replaces the two-phase `scan_edge_eids_static()` + `materialize_edge_batch_static()`
/// for edge scans. Reads all needed columns in a single DeltaDataset query, performs
/// MVCC dedup via Arrow compute, merges L0 buffer data, filters tombstones, and maps
/// to the output schema.
///
/// Pipeline: Lance query → MVCC dedup by `eid` → merge with L0 buffer rows →
/// drop DELETE ops (`op != 0`) → drop L0 edge tombstones → map to `output_schema`.
/// Returns an empty batch with `output_schema` whenever no edge survives a stage.
async fn columnar_scan_edge_batch_static(
    graph_ctx: &GraphExecutionContext,
    edge_type: &str,
    variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
    let storage = graph_ctx.storage();
    let l0_ctx = graph_ctx.l0_context();
    let uni_schema = storage.schema_manager().schema();
    // Per-type property metadata; None when the edge type has no declared properties.
    let type_props = uni_schema.properties.get(edge_type);

    // Build the list of columns to request from DeltaDataset Lance table.
    // System columns are always required for topology and MVCC dedup.
    let mut lance_columns: Vec<String> = vec![
        "eid".to_string(),
        "src_vid".to_string(),
        "dst_vid".to_string(),
        "op".to_string(),
        "_version".to_string(),
    ];
    for prop in projected_properties {
        if prop == "overflow_json" {
            push_column_if_absent(&mut lance_columns, "overflow_json");
        } else {
            // Schema-declared properties have native columns; anything else is
            // carried inside the overflow_json blob (handled just below).
            let exists_in_schema = type_props.is_some_and(|tp| tp.contains_key(prop));
            if exists_in_schema {
                push_column_if_absent(&mut lance_columns, prop);
            }
        }
    }

    // Ensure overflow_json is present when any projected property is not in the schema
    let needs_overflow = projected_properties
        .iter()
        .any(|p| p == "overflow_json" || !type_props.is_some_and(|tp| tp.contains_key(p)));
    if needs_overflow {
        push_column_if_absent(&mut lance_columns, "overflow_json");
    }

    // Try to query DeltaDataset (forward direction)
    let ds = storage
        .delta_dataset(edge_type, "fwd")
        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
    let lancedb_store = storage.lancedb_store();

    let lance_batch = match ds.open_lancedb(lancedb_store).await {
        Ok(table) => {
            use lancedb::query::{ExecutableQuery, QueryBase, Select};

            // Check which columns actually exist in the Lance table
            // (older tables may predate newly added properties).
            let table_schema = table
                .schema()
                .await
                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
            let table_field_names: HashSet<&str> = table_schema
                .fields()
                .iter()
                .map(|f| f.name().as_str())
                .collect();

            let actual_columns: Vec<&str> = lance_columns
                .iter()
                .filter(|c| table_field_names.contains(c.as_str()))
                .map(|c| c.as_str())
                .collect();

            // Do NOT filter op here — MVCC dedup must see DELETE ops
            // (op != 0) to pick the highest version. Deleted edges are
            // filtered out after dedup.
            let query = table.query().select(Select::columns(&actual_columns));
            let query = match storage.version_high_water_mark() {
                Some(hwm) => query.only_if(format!("_version <= {}", hwm)),
                None => query,
            };

            match query.execute().await {
                Ok(stream) => {
                    use futures::TryStreamExt;
                    // NOTE(review): stream errors are silently mapped to "no
                    // Lance data" (unwrap_or_default / Err(_) => None below) —
                    // confirm this best-effort fallback is intended rather
                    // than a lost I/O error.
                    let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap_or_default();
                    if batches.is_empty() {
                        None
                    } else {
                        // Concatenate into a single batch so dedup/merge can
                        // operate on one contiguous set of rows.
                        Some(
                            arrow::compute::concat_batches(&batches[0].schema(), &batches)
                                .map_err(|e| {
                                    datafusion::error::DataFusionError::ArrowError(
                                        Box::new(e),
                                        None,
                                    )
                                })?,
                        )
                    }
                }
                Err(_) => None,
            }
        }
        Err(_) => None, // Table doesn't exist yet — only L0 data
    };

    // MVCC dedup the Lance batch (by eid)
    let lance_deduped = mvcc_dedup_to_option(lance_batch, "eid")?;

    // Build the internal schema for L0 batch construction.
    // Use the Lance batch schema if available, otherwise build from scratch.
    let internal_schema = match &lance_deduped {
        Some(batch) => batch.schema(),
        None => {
            let mut fields = vec![
                Field::new("eid", DataType::UInt64, false),
                Field::new("src_vid", DataType::UInt64, false),
                Field::new("dst_vid", DataType::UInt64, false),
                Field::new("op", DataType::UInt8, false),
                Field::new("_version", DataType::UInt64, false),
            ];
            for col in &lance_columns {
                // System columns were already added above.
                if matches!(
                    col.as_str(),
                    "eid" | "src_vid" | "dst_vid" | "op" | "_version"
                ) {
                    continue;
                }
                if col == "overflow_json" {
                    fields.push(Field::new("overflow_json", DataType::LargeBinary, true));
                } else {
                    // Declared property → its declared Arrow type; fall back
                    // to LargeBinary for anything unknown.
                    let arrow_type = type_props
                        .and_then(|tp| tp.get(col.as_str()))
                        .map(|meta| meta.r#type.to_arrow())
                        .unwrap_or(DataType::LargeBinary);
                    fields.push(Field::new(col, arrow_type, true));
                }
            }
            Arc::new(Schema::new(fields))
        }
    };

    // Build L0 batch
    let l0_batch = build_l0_edge_batch(l0_ctx, edge_type, &internal_schema, type_props)?;

    // Merge Lance + L0; None means neither source produced rows.
    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "eid")? else {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    };

    // Filter out MVCC deletion ops (op != 0) after dedup
    let merged = filter_deleted_edge_ops(&merged)?;
    if merged.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    // Filter L0 edge tombstones
    let filtered = filter_l0_edge_tombstones(&merged, l0_ctx)?;

    if filtered.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    // Map to output schema
    map_edge_to_output_schema(&filtered, variable, projected_properties, output_schema)
}
1890
/// Columnar-first schemaless vertex scan: single Lance query with MVCC dedup and L0 overlay.
///
/// Replaces the two-phase `scan_*_vids_*()` + `materialize_schemaless_vertex_batch_static()`
/// for schemaless vertex scans. Reads `_vid`, `labels`, `props_json`, `_version` in a single
/// Lance query on the main vertices table, performs MVCC dedup via Arrow compute, merges L0
/// buffer data, filters tombstones, and maps to the output schema.
///
/// `label` may be empty (match all vertices), a single label, or colon-joined
/// labels which must ALL be present on a matching vertex.
/// Returns an empty batch with `output_schema` whenever no vertex survives a stage.
async fn columnar_scan_schemaless_vertex_batch_static(
    graph_ctx: &GraphExecutionContext,
    label: &str,
    variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
    use uni_store::storage::main_vertex::MainVertexDataset;

    let storage = graph_ctx.storage();
    let l0_ctx = graph_ctx.l0_context();
    let lancedb_store = storage.lancedb_store();

    // Build the Lance filter expression — do NOT filter _deleted here;
    // MVCC dedup must see deletion tombstones to pick the highest version.
    let filter = {
        let mut parts = Vec::new();

        // Label filter
        // NOTE(review): labels are interpolated into the filter string
        // unescaped — a label containing a single quote would break the
        // expression. Presumably label identifiers are validated upstream;
        // TODO confirm.
        if !label.is_empty() {
            if label.contains(':') {
                // Multi-label: each label must be present
                for lbl in label.split(':') {
                    parts.push(format!("array_contains(labels, '{}')", lbl));
                }
            } else {
                parts.push(format!("array_contains(labels, '{}')", label));
            }
        }

        // MVCC version filter
        if let Some(hwm) = storage.version_high_water_mark() {
            parts.push(format!("_version <= {}", hwm));
        }

        if parts.is_empty() {
            None
        } else {
            Some(parts.join(" AND "))
        }
    };

    // Single Lance query: SELECT _vid, _deleted, labels, props_json, _version WHERE <filter>
    let lance_batch = match MainVertexDataset::open_table(lancedb_store).await {
        Ok(table) => {
            use lancedb::query::{ExecutableQuery, QueryBase, Select};

            let query = table.query().select(Select::columns(&[
                "_vid",
                "_deleted",
                "labels",
                "props_json",
                "_version",
            ]));
            let query = match filter {
                Some(f) => query.only_if(f),
                None => query,
            };

            match query.execute().await {
                Ok(stream) => {
                    use futures::TryStreamExt;
                    // NOTE(review): stream errors are silently mapped to "no
                    // Lance data" (unwrap_or_default / Err(_) => None below) —
                    // confirm this best-effort fallback is intended.
                    let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap_or_default();
                    if batches.is_empty() {
                        None
                    } else {
                        // Concatenate into a single batch for dedup/merge.
                        Some(
                            arrow::compute::concat_batches(&batches[0].schema(), &batches)
                                .map_err(|e| {
                                    datafusion::error::DataFusionError::ArrowError(
                                        Box::new(e),
                                        None,
                                    )
                                })?,
                        )
                    }
                }
                Err(_) => None,
            }
        }
        Err(_) => None, // Table doesn't exist yet — only L0 data
    };

    // MVCC dedup the Lance batch
    let lance_deduped = mvcc_dedup_to_option(lance_batch, "_vid")?;

    // Build the internal schema for L0 batch construction.
    // Use the Lance batch schema if available, otherwise build from scratch.
    let internal_schema = match &lance_deduped {
        Some(batch) => batch.schema(),
        None => Arc::new(Schema::new(vec![
            Field::new("_vid", DataType::UInt64, false),
            Field::new("_deleted", DataType::Boolean, false),
            Field::new("labels", labels_data_type(), false),
            Field::new("props_json", DataType::LargeBinary, true),
            Field::new("_version", DataType::UInt64, false),
        ])),
    };

    // Build L0 batch
    let l0_batch = build_l0_schemaless_vertex_batch(l0_ctx, label, &internal_schema)?;

    // Merge Lance + L0; None means neither source produced rows.
    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "_vid")?
    else {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    };

    // Filter out MVCC deletion tombstones (_deleted = true)
    let merged = filter_deleted_rows(&merged)?;
    if merged.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    // Filter L0 tombstones
    let filtered = filter_l0_tombstones(&merged, l0_ctx)?;

    if filtered.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    // Map to output schema
    map_to_schemaless_output_schema(
        &filtered,
        variable,
        projected_properties,
        output_schema,
        l0_ctx,
    )
}
2027
/// Build a RecordBatch from L0 buffer data for schemaless vertices.
///
/// Merges L0 buffers in visibility order (pending_flush → current → transaction),
/// with later buffers overwriting earlier ones for the same VID. Produces a batch
/// matching the internal schema: `_vid, labels, props_json, _version`.
///
/// Rows are emitted in ascending VID order for deterministic output. Columns in
/// `internal_schema` that this function does not know are filled with nulls.
fn build_l0_schemaless_vertex_batch(
    l0_ctx: &crate::query::df_graph::L0Context,
    label: &str,
    internal_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
    // Collect all L0 vertex data, merging in visibility order
    // vid -> (merged_props, highest_version, labels)
    let mut vid_data: HashMap<u64, (Properties, u64, Vec<String>)> = HashMap::new();
    let mut tombstones: HashSet<u64> = HashSet::new();

    // Parse multi-label filter: empty = match all, colon-joined = all labels
    // must be present.
    let label_filter: Vec<&str> = if label.is_empty() {
        vec![]
    } else if label.contains(':') {
        label.split(':').collect()
    } else {
        vec![label]
    };

    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();

        // Collect tombstones
        for vid in guard.vertex_tombstones.iter() {
            tombstones.insert(vid.as_u64());
        }

        // Collect VIDs matching the label filter
        let vids: Vec<Vid> = if label_filter.is_empty() {
            guard.all_vertex_vids()
        } else if label_filter.len() == 1 {
            guard.vids_for_label(label_filter[0])
        } else {
            guard.vids_with_all_labels(&label_filter)
        };

        for vid in vids {
            let vid_u64 = vid.as_u64();
            // Skip vids already tombstoned by this or an earlier buffer.
            if tombstones.contains(&vid_u64) {
                continue;
            }
            let version = guard.vertex_versions.get(&vid).copied().unwrap_or(0);
            let entry = vid_data
                .entry(vid_u64)
                .or_insert_with(|| (Properties::new(), 0, Vec::new()));

            // Merge properties (later L0 overwrites)
            if let Some(props) = guard.vertex_properties.get(&vid) {
                for (k, v) in props {
                    entry.0.insert(k.clone(), v.clone());
                }
            }
            // Take the highest version
            if version > entry.1 {
                entry.1 = version;
            }
            // Update labels from latest L0 layer
            if let Some(labels) = guard.vertex_labels.get(&vid) {
                entry.2 = labels.clone();
            }
        }
    }

    // Remove tombstoned VIDs
    // NOTE(review): a tombstone from ANY buffer removes the vid here, even if
    // a later (more visible) buffer re-created it — confirm delete-then-
    // recreate cannot coexist across L0 buffers, or that tombstones are
    // cleared on re-insert.
    for t in &tombstones {
        vid_data.remove(t);
    }

    if vid_data.is_empty() {
        return Ok(RecordBatch::new_empty(internal_schema.clone()));
    }

    // Sort VIDs for deterministic output
    let mut vids: Vec<u64> = vid_data.keys().copied().collect();
    vids.sort_unstable();

    let num_rows = vids.len();
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(internal_schema.fields().len());

    // Build one Arrow column per field of the internal schema.
    for field in internal_schema.fields() {
        match field.name().as_str() {
            "_vid" => {
                columns.push(Arc::new(UInt64Array::from(vids.clone())));
            }
            "labels" => {
                let mut labels_builder = ListBuilder::new(StringBuilder::new());
                for vid_u64 in &vids {
                    let (_, _, labels) = &vid_data[vid_u64];
                    let values = labels_builder.values();
                    for lbl in labels {
                        values.append_value(lbl);
                    }
                    labels_builder.append(true);
                }
                columns.push(Arc::new(labels_builder.finish()));
            }
            "props_json" => {
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                for vid_u64 in &vids {
                    let (props, _, _) = &vid_data[vid_u64];
                    if props.is_empty() {
                        builder.append_null();
                    } else {
                        // Encode properties as CypherValue blob
                        let json_obj: serde_json::Value = {
                            let mut map = serde_json::Map::new();
                            for (k, v) in props {
                                let json_val: serde_json::Value = v.clone().into();
                                map.insert(k.clone(), json_val);
                            }
                            serde_json::Value::Object(map)
                        };
                        // Encoding failures degrade to null rather than
                        // failing the whole batch.
                        match encode_cypher_value(&json_obj) {
                            Ok(bytes) => builder.append_value(bytes),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            "_deleted" => {
                // L0 vertices are always live (tombstoned ones already excluded)
                columns.push(Arc::new(arrow_array::BooleanArray::from(vec![
                    false;
                    num_rows
                ])));
            }
            "_version" => {
                let vals: Vec<u64> = vids.iter().map(|v| vid_data[v].1).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            _ => {
                // Unexpected column — fill with nulls
                columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
            }
        }
    }

    RecordBatch::try_new(internal_schema.clone(), columns).map_err(arrow_err)
}
2173
2174/// Map an internal-schema schemaless batch to the DataFusion output schema.
2175///
2176/// The internal batch has `_vid, labels, props_json, _version` columns. The output
2177/// schema has `{variable}._vid`, `{variable}._labels`, and per-property columns.
2178/// Individual properties are extracted from the `props_json` CypherValue blob by
2179/// decoding to a Map and extracting the sub-value.
2180fn map_to_schemaless_output_schema(
2181    batch: &RecordBatch,
2182    _variable: &str,
2183    projected_properties: &[String],
2184    output_schema: &SchemaRef,
2185    l0_ctx: &crate::query::df_graph::L0Context,
2186) -> DFResult<RecordBatch> {
2187    if batch.num_rows() == 0 {
2188        return Ok(RecordBatch::new_empty(output_schema.clone()));
2189    }
2190
2191    let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
2192
2193    // 1. {var}._vid — passthrough
2194    let vid_col = batch
2195        .column_by_name("_vid")
2196        .ok_or_else(|| {
2197            datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
2198        })?
2199        .clone();
2200    let vid_arr = vid_col
2201        .as_any()
2202        .downcast_ref::<UInt64Array>()
2203        .ok_or_else(|| {
2204            datafusion::error::DataFusionError::Internal("_vid not UInt64".to_string())
2205        })?;
2206    columns.push(vid_col.clone());
2207
2208    // 2. {var}._labels — from labels column with L0 overlay
2209    let labels_col = batch.column_by_name("labels");
2210    let labels_arr = labels_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());
2211
2212    let mut labels_builder = ListBuilder::new(StringBuilder::new());
2213    for i in 0..vid_arr.len() {
2214        let vid_u64 = vid_arr.value(i);
2215        let vid = Vid::from(vid_u64);
2216
2217        // Start with labels from the batch
2218        let mut row_labels: Vec<String> = Vec::new();
2219        if let Some(arr) = labels_arr
2220            && !arr.is_null(i)
2221        {
2222            let list_val = arr.value(i);
2223            if let Some(str_arr) = list_val.as_any().downcast_ref::<arrow_array::StringArray>() {
2224                for j in 0..str_arr.len() {
2225                    if !str_arr.is_null(j) {
2226                        row_labels.push(str_arr.value(j).to_string());
2227                    }
2228                }
2229            }
2230        }
2231
2232        // Overlay L0 labels
2233        for l0 in l0_ctx.iter_l0_buffers() {
2234            let guard = l0.read();
2235            if let Some(l0_labels) = guard.vertex_labels.get(&vid) {
2236                for lbl in l0_labels {
2237                    if !row_labels.contains(lbl) {
2238                        row_labels.push(lbl.clone());
2239                    }
2240                }
2241            }
2242        }
2243
2244        let values = labels_builder.values();
2245        for lbl in &row_labels {
2246            values.append_value(lbl);
2247        }
2248        labels_builder.append(true);
2249    }
2250    columns.push(Arc::new(labels_builder.finish()));
2251
2252    // 3. Projected properties — extract from props_json
2253    let props_col = batch.column_by_name("props_json");
2254    let props_arr =
2255        props_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
2256
2257    for prop in projected_properties {
2258        if prop == "_all_props" {
2259            // Fast path: if no L0 buffer has vertex property mutations,
2260            // the raw props_json passthrough is correct.
2261            let any_l0_has_vertex_props = l0_ctx.iter_l0_buffers().any(|l0| {
2262                let guard = l0.read();
2263                !guard.vertex_properties.is_empty()
2264            });
2265            if !any_l0_has_vertex_props {
2266                match props_col {
2267                    Some(col) => columns.push(col.clone()),
2268                    None => {
2269                        columns.push(arrow_array::new_null_array(
2270                            &DataType::LargeBinary,
2271                            batch.num_rows(),
2272                        ));
2273                    }
2274                }
2275            } else {
2276                let col = build_all_props_column_with_l0_overlay(
2277                    batch.num_rows(),
2278                    vid_arr,
2279                    props_arr,
2280                    l0_ctx,
2281                );
2282                columns.push(col);
2283            }
2284        } else {
2285            // Extract individual property from CypherValue blob with L0 overlay
2286            let col =
2287                build_overflow_property_column(batch.num_rows(), vid_arr, props_arr, prop, l0_ctx);
2288            columns.push(col);
2289        }
2290    }
2291
2292    RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
2293}
2294
2295/// Get the property value for a VID, returning None if not found.
2296pub(crate) fn get_property_value(
2297    vid: &Vid,
2298    props_map: &HashMap<Vid, Properties>,
2299    prop_name: &str,
2300) -> Option<Value> {
2301    if prop_name == "_all_props" {
2302        return props_map.get(vid).map(|p| {
2303            let map: HashMap<String, Value> =
2304                p.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
2305            Value::Map(map)
2306        });
2307    }
2308    props_map
2309        .get(vid)
2310        .and_then(|props| props.get(prop_name))
2311        .cloned()
2312}
2313
2314/// Encode a `serde_json::Value` as CypherValue binary (MessagePack-tagged).
2315///
2316/// Converts from serde_json::Value -> uni_common::Value -> CypherValue bytes.
2317pub(crate) fn encode_cypher_value(val: &serde_json::Value) -> Result<Vec<u8>, String> {
2318    let uni_val: uni_common::Value = val.clone().into();
2319    Ok(uni_common::cypher_value_codec::encode(&uni_val))
2320}
2321
/// Build a numeric column from property values using the specified builder and extractor.
///
/// Macro arguments:
/// - `$vids`: iterable of `&Vid`, one per output row
/// - `$props_map`: `&HashMap<Vid, Properties>` to look values up in
/// - `$prop_name`: name of the property to extract
/// - `$builder_ty`: Arrow builder type (e.g. `Int64Builder`)
/// - `$extractor`: closure `&Value -> Option<T>` pulling the raw value
/// - `$cast`: closure converting the extracted value to the builder's native type
///
/// Missing properties and extraction failures both append null, so the
/// resulting column always has one entry per input VID.
macro_rules! build_numeric_column {
    ($vids:expr, $props_map:expr, $prop_name:expr, $builder_ty:ty, $extractor:expr, $cast:expr) => {{
        let mut builder = <$builder_ty>::new();
        for vid in $vids {
            match get_property_value(vid, $props_map, $prop_name) {
                Some(ref v) => {
                    if let Some(val) = $extractor(v) {
                        builder.append_value($cast(val));
                    } else {
                        // Property exists but is not the expected type — null.
                        builder.append_null();
                    }
                }
                None => builder.append_null(),
            }
        }
        Ok(Arc::new(builder.finish()) as ArrayRef)
    }};
}
2341
2342/// Build an Arrow column from property values (static version).
2343pub(crate) fn build_property_column_static(
2344    vids: &[Vid],
2345    props_map: &HashMap<Vid, Properties>,
2346    prop_name: &str,
2347    data_type: &DataType,
2348) -> DFResult<ArrayRef> {
2349    match data_type {
2350        DataType::LargeBinary => {
2351            // Handle CypherValue binary columns (overflow_json and Json-typed properties).
2352            use arrow_array::builder::LargeBinaryBuilder;
2353            let mut builder = LargeBinaryBuilder::new();
2354
2355            for vid in vids {
2356                match get_property_value(vid, props_map, prop_name) {
2357                    Some(Value::Null) | None => builder.append_null(),
2358                    Some(Value::Bytes(bytes)) => {
2359                        builder.append_value(&bytes);
2360                    }
2361                    Some(Value::List(arr)) if arr.iter().all(|v| v.as_u64().is_some()) => {
2362                        // Potential raw CypherValue bytes stored as list<u8> from PropertyManager.
2363                        // Guard against misclassifying normal integer lists (e.g. [42, 43]) as bytes.
2364                        let bytes: Vec<u8> = arr
2365                            .iter()
2366                            .filter_map(|v| v.as_u64().map(|n| n as u8))
2367                            .collect();
2368                        if uni_common::cypher_value_codec::decode(&bytes).is_ok() {
2369                            builder.append_value(&bytes);
2370                        } else {
2371                            let json_val: serde_json::Value = Value::List(arr).into();
2372                            match encode_cypher_value(&json_val) {
2373                                Ok(encoded) => builder.append_value(encoded),
2374                                Err(_) => builder.append_null(),
2375                            }
2376                        }
2377                    }
2378                    Some(val) => {
2379                        // Value from PropertyManager — convert to serde_json and re-encode to CypherValue binary
2380                        let json_val: serde_json::Value = val.into();
2381                        match encode_cypher_value(&json_val) {
2382                            Ok(bytes) => builder.append_value(bytes),
2383                            Err(_) => builder.append_null(),
2384                        }
2385                    }
2386                }
2387            }
2388            Ok(Arc::new(builder.finish()))
2389        }
2390        DataType::Binary => {
2391            // CRDT binary properties: JSON-decoded CRDTs re-encoded to MessagePack
2392            let mut builder = BinaryBuilder::new();
2393            for vid in vids {
2394                let bytes = get_property_value(vid, props_map, prop_name)
2395                    .filter(|v| !v.is_null())
2396                    .and_then(|v| {
2397                        let json_val: serde_json::Value = v.into();
2398                        serde_json::from_value::<uni_crdt::Crdt>(json_val).ok()
2399                    })
2400                    .and_then(|crdt| crdt.to_msgpack().ok());
2401                match bytes {
2402                    Some(b) => builder.append_value(&b),
2403                    None => builder.append_null(),
2404                }
2405            }
2406            Ok(Arc::new(builder.finish()))
2407        }
2408        DataType::Utf8 => {
2409            let mut builder = StringBuilder::new();
2410            for vid in vids {
2411                match get_property_value(vid, props_map, prop_name) {
2412                    Some(Value::String(s)) => builder.append_value(s),
2413                    Some(Value::Null) | None => builder.append_null(),
2414                    Some(other) => builder.append_value(other.to_string()),
2415                }
2416            }
2417            Ok(Arc::new(builder.finish()))
2418        }
2419        DataType::Int64 => {
2420            build_numeric_column!(
2421                vids,
2422                props_map,
2423                prop_name,
2424                Int64Builder,
2425                |v: &Value| v.as_i64(),
2426                |v| v
2427            )
2428        }
2429        DataType::Int32 => {
2430            build_numeric_column!(
2431                vids,
2432                props_map,
2433                prop_name,
2434                Int32Builder,
2435                |v: &Value| v.as_i64(),
2436                |v: i64| v as i32
2437            )
2438        }
2439        DataType::Float64 => {
2440            build_numeric_column!(
2441                vids,
2442                props_map,
2443                prop_name,
2444                Float64Builder,
2445                |v: &Value| v.as_f64(),
2446                |v| v
2447            )
2448        }
2449        DataType::Float32 => {
2450            build_numeric_column!(
2451                vids,
2452                props_map,
2453                prop_name,
2454                Float32Builder,
2455                |v: &Value| v.as_f64(),
2456                |v: f64| v as f32
2457            )
2458        }
2459        DataType::Boolean => {
2460            let mut builder = BooleanBuilder::new();
2461            for vid in vids {
2462                match get_property_value(vid, props_map, prop_name) {
2463                    Some(Value::Bool(b)) => builder.append_value(b),
2464                    _ => builder.append_null(),
2465                }
2466            }
2467            Ok(Arc::new(builder.finish()))
2468        }
2469        DataType::UInt64 => {
2470            build_numeric_column!(
2471                vids,
2472                props_map,
2473                prop_name,
2474                UInt64Builder,
2475                |v: &Value| v.as_u64(),
2476                |v| v
2477            )
2478        }
2479        DataType::FixedSizeList(inner, dim) if *inner.data_type() == DataType::Float32 => {
2480            // Vector properties: FixedSizeList(Float32, N)
2481            let values_builder = Float32Builder::new();
2482            let mut list_builder = FixedSizeListBuilder::new(values_builder, *dim);
2483            for vid in vids {
2484                match get_property_value(vid, props_map, prop_name) {
2485                    Some(Value::Vector(v)) => {
2486                        for val in v {
2487                            list_builder.values().append_value(val);
2488                        }
2489                        list_builder.append(true);
2490                    }
2491                    Some(Value::List(arr)) => {
2492                        for v in arr {
2493                            list_builder
2494                                .values()
2495                                .append_value(v.as_f64().unwrap_or(0.0) as f32);
2496                        }
2497                        list_builder.append(true);
2498                    }
2499                    _ => {
2500                        // Append dim nulls to inner values, then mark row as null
2501                        for _ in 0..*dim {
2502                            list_builder.values().append_null();
2503                        }
2504                        list_builder.append(false);
2505                    }
2506                }
2507            }
2508            Ok(Arc::new(list_builder.finish()))
2509        }
2510        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
2511            // Timestamp properties stored as Value::Temporal, ISO 8601 strings, or i64 nanoseconds
2512            let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
2513            for vid in vids {
2514                match get_property_value(vid, props_map, prop_name) {
2515                    Some(Value::Temporal(tv)) => match tv {
2516                        uni_common::TemporalValue::DateTime {
2517                            nanos_since_epoch, ..
2518                        }
2519                        | uni_common::TemporalValue::LocalDateTime {
2520                            nanos_since_epoch, ..
2521                        } => {
2522                            builder.append_value(nanos_since_epoch);
2523                        }
2524                        uni_common::TemporalValue::Date { days_since_epoch } => {
2525                            builder.append_value(days_since_epoch as i64 * 86_400_000_000_000);
2526                        }
2527                        _ => builder.append_null(),
2528                    },
2529                    Some(Value::String(s)) => match parse_datetime_utc(&s) {
2530                        Ok(dt) => builder.append_value(dt.timestamp_nanos_opt().unwrap_or(0)),
2531                        Err(_) => builder.append_null(),
2532                    },
2533                    Some(Value::Int(n)) => {
2534                        builder.append_value(n);
2535                    }
2536                    _ => builder.append_null(),
2537                }
2538            }
2539            Ok(Arc::new(builder.finish()))
2540        }
2541        DataType::Date32 => {
2542            let mut builder = Date32Builder::new();
2543            let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
2544            for vid in vids {
2545                match get_property_value(vid, props_map, prop_name) {
2546                    Some(Value::Temporal(uni_common::TemporalValue::Date { days_since_epoch })) => {
2547                        builder.append_value(days_since_epoch);
2548                    }
2549                    Some(Value::String(s)) => match NaiveDate::parse_from_str(&s, "%Y-%m-%d") {
2550                        Ok(d) => builder.append_value((d - epoch).num_days() as i32),
2551                        Err(_) => builder.append_null(),
2552                    },
2553                    Some(Value::Int(n)) => {
2554                        builder.append_value(n as i32);
2555                    }
2556                    _ => builder.append_null(),
2557                }
2558            }
2559            Ok(Arc::new(builder.finish()))
2560        }
2561        DataType::Time64(TimeUnit::Nanosecond) => {
2562            let mut builder = Time64NanosecondBuilder::new();
2563            for vid in vids {
2564                match get_property_value(vid, props_map, prop_name) {
2565                    Some(Value::Temporal(
2566                        uni_common::TemporalValue::LocalTime {
2567                            nanos_since_midnight,
2568                        }
2569                        | uni_common::TemporalValue::Time {
2570                            nanos_since_midnight,
2571                            ..
2572                        },
2573                    )) => {
2574                        builder.append_value(nanos_since_midnight);
2575                    }
2576                    Some(Value::Temporal(_)) => builder.append_null(),
2577                    Some(Value::String(s)) => {
2578                        match NaiveTime::parse_from_str(&s, "%H:%M:%S%.f")
2579                            .or_else(|_| NaiveTime::parse_from_str(&s, "%H:%M:%S"))
2580                        {
2581                            Ok(t) => {
2582                                let nanos = t.num_seconds_from_midnight() as i64 * 1_000_000_000
2583                                    + t.nanosecond() as i64;
2584                                builder.append_value(nanos);
2585                            }
2586                            Err(_) => builder.append_null(),
2587                        }
2588                    }
2589                    Some(Value::Int(n)) => {
2590                        builder.append_value(n);
2591                    }
2592                    _ => builder.append_null(),
2593                }
2594            }
2595            Ok(Arc::new(builder.finish()))
2596        }
2597        DataType::Interval(IntervalUnit::MonthDayNano) => {
2598            let mut values: Vec<Option<arrow::datatypes::IntervalMonthDayNano>> =
2599                Vec::with_capacity(vids.len());
2600            for vid in vids {
2601                match get_property_value(vid, props_map, prop_name) {
2602                    Some(Value::Temporal(uni_common::TemporalValue::Duration {
2603                        months,
2604                        days,
2605                        nanos,
2606                    })) => {
2607                        values.push(Some(arrow::datatypes::IntervalMonthDayNano {
2608                            months: months as i32,
2609                            days: days as i32,
2610                            nanoseconds: nanos,
2611                        }));
2612                    }
2613                    Some(Value::Int(_n)) => {
2614                        values.push(None);
2615                    }
2616                    _ => values.push(None),
2617                }
2618            }
2619            let arr: arrow_array::IntervalMonthDayNanoArray = values.into_iter().collect();
2620            Ok(Arc::new(arr))
2621        }
2622        DataType::List(inner_field) => {
2623            build_list_property_column(vids, props_map, prop_name, inner_field)
2624        }
2625        DataType::Struct(fields) => {
2626            build_struct_property_column(vids, props_map, prop_name, fields)
2627        }
2628        // Default: convert to string
2629        _ => {
2630            let mut builder = StringBuilder::new();
2631            for vid in vids {
2632                match get_property_value(vid, props_map, prop_name) {
2633                    Some(Value::Null) | None => builder.append_null(),
2634                    Some(other) => builder.append_value(other.to_string()),
2635                }
2636            }
2637            Ok(Arc::new(builder.finish()))
2638        }
2639    }
2640}
2641
2642/// Build a List-typed Arrow column from list property values.
2643fn build_list_property_column(
2644    vids: &[Vid],
2645    props_map: &HashMap<Vid, Properties>,
2646    prop_name: &str,
2647    inner_field: &Arc<Field>,
2648) -> DFResult<ArrayRef> {
2649    match inner_field.data_type() {
2650        DataType::Utf8 => {
2651            let mut builder = ListBuilder::new(StringBuilder::new());
2652            for vid in vids {
2653                match get_property_value(vid, props_map, prop_name) {
2654                    Some(Value::List(arr)) => {
2655                        for v in arr {
2656                            match v {
2657                                Value::String(s) => builder.values().append_value(s),
2658                                Value::Null => builder.values().append_null(),
2659                                other => builder.values().append_value(format!("{other:?}")),
2660                            }
2661                        }
2662                        builder.append(true);
2663                    }
2664                    _ => builder.append(false),
2665                }
2666            }
2667            Ok(Arc::new(builder.finish()))
2668        }
2669        DataType::Int64 => {
2670            let mut builder = ListBuilder::new(Int64Builder::new());
2671            for vid in vids {
2672                match get_property_value(vid, props_map, prop_name) {
2673                    Some(Value::List(arr)) => {
2674                        for v in arr {
2675                            match v.as_i64() {
2676                                Some(n) => builder.values().append_value(n),
2677                                None => builder.values().append_null(),
2678                            }
2679                        }
2680                        builder.append(true);
2681                    }
2682                    _ => builder.append(false),
2683                }
2684            }
2685            Ok(Arc::new(builder.finish()))
2686        }
2687        DataType::Float64 => {
2688            let mut builder = ListBuilder::new(Float64Builder::new());
2689            for vid in vids {
2690                match get_property_value(vid, props_map, prop_name) {
2691                    Some(Value::List(arr)) => {
2692                        for v in arr {
2693                            match v.as_f64() {
2694                                Some(n) => builder.values().append_value(n),
2695                                None => builder.values().append_null(),
2696                            }
2697                        }
2698                        builder.append(true);
2699                    }
2700                    _ => builder.append(false),
2701                }
2702            }
2703            Ok(Arc::new(builder.finish()))
2704        }
2705        DataType::Boolean => {
2706            let mut builder = ListBuilder::new(BooleanBuilder::new());
2707            for vid in vids {
2708                match get_property_value(vid, props_map, prop_name) {
2709                    Some(Value::List(arr)) => {
2710                        for v in arr {
2711                            match v.as_bool() {
2712                                Some(b) => builder.values().append_value(b),
2713                                None => builder.values().append_null(),
2714                            }
2715                        }
2716                        builder.append(true);
2717                    }
2718                    _ => builder.append(false),
2719                }
2720            }
2721            Ok(Arc::new(builder.finish()))
2722        }
2723        DataType::Struct(fields) => {
2724            // Map types are List(Struct(key, value)) — build struct inner elements
2725            build_list_of_structs_column(vids, props_map, prop_name, fields)
2726        }
2727        // Fallback: serialize inner elements as strings
2728        _ => {
2729            let mut builder = ListBuilder::new(StringBuilder::new());
2730            for vid in vids {
2731                match get_property_value(vid, props_map, prop_name) {
2732                    Some(Value::List(arr)) => {
2733                        for v in arr {
2734                            match v {
2735                                Value::Null => builder.values().append_null(),
2736                                other => builder.values().append_value(format!("{other:?}")),
2737                            }
2738                        }
2739                        builder.append(true);
2740                    }
2741                    _ => builder.append(false),
2742                }
2743            }
2744            Ok(Arc::new(builder.finish()))
2745        }
2746    }
2747}
2748
/// Build a List(Struct(...)) column, used for Map-type properties.
///
/// Handles two value representations:
/// - `Value::List([Map{key: k, value: v}, ...])` — pre-converted kv pairs
/// - `Value::Map({k1: v1, k2: v2})` — raw map objects (converted to kv pairs)
///
/// Rows whose property is missing, not a list/map, or a list containing no
/// map elements are emitted as null list entries. Struct fields typed other
/// than Utf8/Int64/Float64 fall back to a `Debug`-string rendering.
fn build_list_of_structs_column(
    vids: &[Vid],
    props_map: &HashMap<Vid, Properties>,
    prop_name: &str,
    fields: &Fields,
) -> DFResult<ArrayRef> {
    use arrow_array::StructArray;

    // Raw property value per row; None means the property is absent for that vid.
    let values: Vec<Option<Value>> = vids
        .iter()
        .map(|vid| get_property_value(vid, props_map, prop_name))
        .collect();

    // Convert each row's value to an owned Vec of Maps (key-value pairs).
    // This normalizes both List-of-maps and Map representations.
    let rows: Vec<Option<Vec<HashMap<String, Value>>>> = values
        .iter()
        .map(|val| match val {
            Some(Value::List(arr)) => {
                // Keep only the Map elements; non-map entries are silently dropped.
                let objs: Vec<HashMap<String, Value>> = arr
                    .iter()
                    .filter_map(|v| {
                        if let Value::Map(m) = v {
                            Some(m.clone())
                        } else {
                            None
                        }
                    })
                    .collect();
                // NOTE(review): an empty list (or one with no map elements)
                // becomes a null row rather than an empty list — confirm this
                // matches the intended semantics for `[]`-valued properties.
                if objs.is_empty() { None } else { Some(objs) }
            }
            Some(Value::Map(obj)) => {
                // Map property: convert {k1: v1, k2: v2} -> [{key: k1, value: v1}, ...]
                // NOTE(review): HashMap iteration order is unspecified, so the
                // kv-pair order within a row is nondeterministic.
                let kv_pairs: Vec<HashMap<String, Value>> = obj
                    .iter()
                    .map(|(k, v)| {
                        let mut m = HashMap::new();
                        m.insert("key".to_string(), Value::String(k.clone()));
                        m.insert("value".to_string(), v.clone());
                        m
                    })
                    .collect();
                Some(kv_pairs)
            }
            _ => None,
        })
        .collect();

    // Total number of struct elements across all non-null rows; used to size
    // the child builders below.
    let total_items: usize = rows
        .iter()
        .filter_map(|r| r.as_ref())
        .map(|v| v.len())
        .sum();

    // Build child arrays for each field in the struct. Each child is built by
    // flattening all rows' kv-pair maps in row order, so every child's length
    // equals `total_items` and lines up with the list offsets computed below.
    let child_arrays: Vec<ArrayRef> = fields
        .iter()
        .map(|field| {
            let field_name = field.name();
            match field.data_type() {
                DataType::Utf8 => {
                    let mut builder = StringBuilder::with_capacity(total_items, total_items * 16);
                    for obj in rows.iter().flatten().flatten() {
                        match obj.get(field_name) {
                            Some(Value::String(s)) => builder.append_value(s),
                            Some(Value::Null) | None => builder.append_null(),
                            Some(other) => builder.append_value(format!("{other:?}")),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                DataType::Int64 => {
                    let mut builder = Int64Builder::with_capacity(total_items);
                    for obj in rows.iter().flatten().flatten() {
                        match obj.get(field_name).and_then(|v| v.as_i64()) {
                            Some(n) => builder.append_value(n),
                            None => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                DataType::Float64 => {
                    let mut builder = Float64Builder::with_capacity(total_items);
                    for obj in rows.iter().flatten().flatten() {
                        match obj.get(field_name).and_then(|v| v.as_f64()) {
                            Some(n) => builder.append_value(n),
                            None => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                // Fallback: serialize as string (Debug form).
                _ => {
                    let mut builder = StringBuilder::with_capacity(total_items, total_items * 16);
                    for obj in rows.iter().flatten().flatten() {
                        match obj.get(field_name) {
                            Some(Value::Null) | None => builder.append_null(),
                            Some(other) => builder.append_value(format!("{other:?}")),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
            }
        })
        .collect();

    // Build struct array from children. Struct-level validity is left None
    // here; row nullity is expressed on the outer list instead.
    let struct_array = StructArray::try_new(fields.clone(), child_arrays, None)
        .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;

    // Build list offsets and the row-level validity bitmap. A null row
    // repeats the previous offset (zero-length slice) and is marked invalid.
    let mut offsets = Vec::with_capacity(vids.len() + 1);
    let mut nulls = Vec::with_capacity(vids.len());
    let mut offset = 0i32;
    offsets.push(offset);
    for row in &rows {
        match row {
            Some(objs) => {
                offset += objs.len() as i32;
                offsets.push(offset);
                nulls.push(true);
            }
            None => {
                offsets.push(offset);
                nulls.push(false);
            }
        }
    }

    let list_field = Arc::new(Field::new("item", DataType::Struct(fields.clone()), true));
    let list_array = arrow_array::ListArray::try_new(
        list_field,
        arrow::buffer::OffsetBuffer::new(arrow::buffer::ScalarBuffer::from(offsets)),
        Arc::new(struct_array),
        Some(arrow::buffer::NullBuffer::from(nulls)),
    )
    .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;

    Ok(Arc::new(list_array))
}
2894
2895/// Convert a TemporalValue into a HashMap matching the Arrow struct field names,
2896/// so that `build_struct_property_column` can extract fields uniformly.
2897fn temporal_to_struct_map(tv: &uni_common::value::TemporalValue) -> HashMap<String, Value> {
2898    use uni_common::value::TemporalValue;
2899    let mut m = HashMap::new();
2900    match tv {
2901        TemporalValue::DateTime {
2902            nanos_since_epoch,
2903            offset_seconds,
2904            timezone_name,
2905        } => {
2906            m.insert("nanos_since_epoch".into(), Value::Int(*nanos_since_epoch));
2907            m.insert("offset_seconds".into(), Value::Int(*offset_seconds as i64));
2908            if let Some(tz) = timezone_name {
2909                m.insert("timezone_name".into(), Value::String(tz.clone()));
2910            }
2911        }
2912        TemporalValue::LocalDateTime { nanos_since_epoch } => {
2913            m.insert("nanos_since_epoch".into(), Value::Int(*nanos_since_epoch));
2914        }
2915        TemporalValue::Time {
2916            nanos_since_midnight,
2917            offset_seconds,
2918        } => {
2919            m.insert(
2920                "nanos_since_midnight".into(),
2921                Value::Int(*nanos_since_midnight),
2922            );
2923            m.insert("offset_seconds".into(), Value::Int(*offset_seconds as i64));
2924        }
2925        TemporalValue::LocalTime {
2926            nanos_since_midnight,
2927        } => {
2928            m.insert(
2929                "nanos_since_midnight".into(),
2930                Value::Int(*nanos_since_midnight),
2931            );
2932        }
2933        TemporalValue::Date { days_since_epoch } => {
2934            m.insert(
2935                "days_since_epoch".into(),
2936                Value::Int(*days_since_epoch as i64),
2937            );
2938        }
2939        TemporalValue::Duration {
2940            months,
2941            days,
2942            nanos,
2943        } => {
2944            m.insert("months".into(), Value::Int(*months));
2945            m.insert("days".into(), Value::Int(*days));
2946            m.insert("nanos".into(), Value::Int(*nanos));
2947        }
2948    }
2949    m
2950}
2951
2952/// Build a Struct-typed Arrow column from Map property values (e.g. Point types).
2953fn build_struct_property_column(
2954    vids: &[Vid],
2955    props_map: &HashMap<Vid, Properties>,
2956    prop_name: &str,
2957    fields: &Fields,
2958) -> DFResult<ArrayRef> {
2959    use arrow_array::StructArray;
2960
2961    // Convert raw values, expanding Temporal values into Map representation
2962    // so the struct field extraction below works uniformly.
2963    let values: Vec<Option<Value>> = vids
2964        .iter()
2965        .map(|vid| {
2966            let val = get_property_value(vid, props_map, prop_name);
2967            match val {
2968                Some(Value::Temporal(ref tv)) => Some(Value::Map(temporal_to_struct_map(tv))),
2969                other => other,
2970            }
2971        })
2972        .collect();
2973
2974    let child_arrays: Vec<ArrayRef> = fields
2975        .iter()
2976        .map(|field| {
2977            let field_name = field.name();
2978            match field.data_type() {
2979                DataType::Float64 => {
2980                    let mut builder = Float64Builder::with_capacity(vids.len());
2981                    for val in &values {
2982                        match val {
2983                            Some(Value::Map(obj)) => {
2984                                match obj.get(field_name).and_then(|v| v.as_f64()) {
2985                                    Some(n) => builder.append_value(n),
2986                                    None => builder.append_null(),
2987                                }
2988                            }
2989                            _ => builder.append_null(),
2990                        }
2991                    }
2992                    Arc::new(builder.finish()) as ArrayRef
2993                }
2994                DataType::Utf8 => {
2995                    let mut builder = StringBuilder::with_capacity(vids.len(), vids.len() * 16);
2996                    for val in &values {
2997                        match val {
2998                            Some(Value::Map(obj)) => match obj.get(field_name) {
2999                                Some(Value::String(s)) => builder.append_value(s),
3000                                Some(Value::Null) | None => builder.append_null(),
3001                                Some(other) => builder.append_value(format!("{other:?}")),
3002                            },
3003                            _ => builder.append_null(),
3004                        }
3005                    }
3006                    Arc::new(builder.finish()) as ArrayRef
3007                }
3008                DataType::Int64 => {
3009                    let mut builder = Int64Builder::with_capacity(vids.len());
3010                    for val in &values {
3011                        match val {
3012                            Some(Value::Map(obj)) => {
3013                                match obj.get(field_name).and_then(|v| v.as_i64()) {
3014                                    Some(n) => builder.append_value(n),
3015                                    None => builder.append_null(),
3016                                }
3017                            }
3018                            _ => builder.append_null(),
3019                        }
3020                    }
3021                    Arc::new(builder.finish()) as ArrayRef
3022                }
3023                DataType::Timestamp(_, _) => {
3024                    let mut builder = TimestampNanosecondBuilder::with_capacity(vids.len());
3025                    for val in &values {
3026                        match val {
3027                            Some(Value::Map(obj)) => {
3028                                match obj.get(field_name).and_then(|v| v.as_i64()) {
3029                                    Some(n) => builder.append_value(n),
3030                                    None => builder.append_null(),
3031                                }
3032                            }
3033                            _ => builder.append_null(),
3034                        }
3035                    }
3036                    Arc::new(builder.finish()) as ArrayRef
3037                }
3038                DataType::Int32 => {
3039                    let mut builder = Int32Builder::with_capacity(vids.len());
3040                    for val in &values {
3041                        match val {
3042                            Some(Value::Map(obj)) => {
3043                                match obj.get(field_name).and_then(|v| v.as_i64()) {
3044                                    Some(n) => builder.append_value(n as i32),
3045                                    None => builder.append_null(),
3046                                }
3047                            }
3048                            _ => builder.append_null(),
3049                        }
3050                    }
3051                    Arc::new(builder.finish()) as ArrayRef
3052                }
3053                DataType::Time64(_) => {
3054                    let mut builder = Time64NanosecondBuilder::with_capacity(vids.len());
3055                    for val in &values {
3056                        match val {
3057                            Some(Value::Map(obj)) => {
3058                                match obj.get(field_name).and_then(|v| v.as_i64()) {
3059                                    Some(n) => builder.append_value(n),
3060                                    None => builder.append_null(),
3061                                }
3062                            }
3063                            _ => builder.append_null(),
3064                        }
3065                    }
3066                    Arc::new(builder.finish()) as ArrayRef
3067                }
3068                // Fallback: serialize as string
3069                _ => {
3070                    let mut builder = StringBuilder::with_capacity(vids.len(), vids.len() * 16);
3071                    for val in &values {
3072                        match val {
3073                            Some(Value::Map(obj)) => match obj.get(field_name) {
3074                                Some(Value::Null) | None => builder.append_null(),
3075                                Some(other) => builder.append_value(format!("{other:?}")),
3076                            },
3077                            _ => builder.append_null(),
3078                        }
3079                    }
3080                    Arc::new(builder.finish()) as ArrayRef
3081                }
3082            }
3083        })
3084        .collect();
3085
3086    // Build null bitmap — null when the value is null/missing
3087    let nulls: Vec<bool> = values
3088        .iter()
3089        .map(|v| matches!(v, Some(Value::Map(_))))
3090        .collect();
3091
3092    let struct_array = StructArray::try_new(
3093        fields.clone(),
3094        child_arrays,
3095        Some(arrow::buffer::NullBuffer::from(nulls)),
3096    )
3097    .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;
3098
3099    Ok(Arc::new(struct_array))
3100}
3101
impl Stream for GraphScanStream {
    type Item = DFResult<RecordBatch>;

    /// Drives the scan state machine: `Init` → `Executing(future)` → `Done`.
    ///
    /// The entire scan is materialized by a single async future into one
    /// `RecordBatch`, so this stream yields at most one item and then `None`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Use a temporary to avoid borrow issues
            // (take ownership of the state; `Done` is a safe terminal placeholder
            // if anything below returns early).
            let state = std::mem::replace(&mut self.state, GraphScanState::Done);

            match state {
                GraphScanState::Init => {
                    // Create the future with cloned data for ownership
                    // (the `async move` block cannot borrow from `self`).
                    let graph_ctx = self.graph_ctx.clone();
                    let label = self.label.clone();
                    let variable = self.variable.clone();
                    let properties = self.properties.clone();
                    let is_edge_scan = self.is_edge_scan;
                    let is_schemaless = self.is_schemaless;
                    let schema = self.schema.clone();

                    let fut = async move {
                        // Abort before doing any I/O if the query deadline passed.
                        graph_ctx.check_timeout().map_err(|e| {
                            datafusion::error::DataFusionError::Execution(e.to_string())
                        })?;

                        // Dispatch to the scan flavor selected at plan time:
                        // edge scan, schemaless vertex scan, or schema-backed vertex scan.
                        let batch = if is_edge_scan {
                            columnar_scan_edge_batch_static(
                                &graph_ctx,
                                &label,
                                &variable,
                                &properties,
                                &schema,
                            )
                            .await?
                        } else if is_schemaless {
                            columnar_scan_schemaless_vertex_batch_static(
                                &graph_ctx,
                                &label,
                                &variable,
                                &properties,
                                &schema,
                            )
                            .await?
                        } else {
                            columnar_scan_vertex_batch_static(
                                &graph_ctx,
                                &label,
                                &variable,
                                &properties,
                                &schema,
                            )
                            .await?
                        };
                        Ok(Some(batch))
                    };

                    self.state = GraphScanState::Executing(Box::pin(fut));
                    // Continue loop to poll the future
                }
                GraphScanState::Executing(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(batch)) => {
                        self.state = GraphScanState::Done;
                        // Record output row count for DataFusion metrics (0 when no batch).
                        self.metrics
                            .record_output(batch.as_ref().map(|b| b.num_rows()).unwrap_or(0));
                        return Poll::Ready(batch.map(Ok));
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = GraphScanState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        // Put the in-flight future back so the next poll resumes it.
                        self.state = GraphScanState::Executing(fut);
                        return Poll::Pending;
                    }
                },
                GraphScanState::Done => {
                    // Terminal state: the single batch (or error) has been emitted.
                    return Poll::Ready(None);
                }
            }
        }
    }
}
3183
3184impl RecordBatchStream for GraphScanStream {
3185    fn schema(&self) -> SchemaRef {
3186        self.schema.clone()
3187    }
3188}
3189
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_build_vertex_schema() {
        let uni_schema = UniSchema::default();
        let schema = GraphScanExec::build_vertex_schema(
            "n",
            "Person",
            &["name".to_string(), "age".to_string()],
            &uni_schema,
        );

        // System columns (_vid, _labels) precede requested properties, in order.
        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(1).name(), "n._labels");
        assert_eq!(schema.field(2).name(), "n.name");
        assert_eq!(schema.field(3).name(), "n.age");
    }

    #[test]
    fn test_build_edge_schema() {
        let uni_schema = UniSchema::default();
        let schema =
            GraphScanExec::build_edge_schema("r", "KNOWS", &["weight".to_string()], &uni_schema);

        // Edge scans expose _eid plus endpoint IDs before properties.
        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "r._eid");
        assert_eq!(schema.field(1).name(), "r._src_vid");
        assert_eq!(schema.field(2).name(), "r._dst_vid");
        assert_eq!(schema.field(3).name(), "r.weight");
    }

    #[test]
    fn test_build_schemaless_vertex_schema() {
        let schema = GraphScanExec::build_schemaless_vertex_schema(
            "n",
            &["name".to_string(), "age".to_string()],
        );

        // Schemaless properties are opaque encoded bytes (LargeBinary).
        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(0).data_type(), &DataType::UInt64);
        assert_eq!(schema.field(1).name(), "n._labels");
        assert_eq!(schema.field(2).name(), "n.name");
        assert_eq!(schema.field(2).data_type(), &DataType::LargeBinary);
        assert_eq!(schema.field(3).name(), "n.age");
        assert_eq!(schema.field(3).data_type(), &DataType::LargeBinary);
    }

    #[test]
    fn test_schemaless_all_scan_has_empty_label() {
        // This test verifies that new_schemaless_all_scan creates a scan with empty label
        // We can't fully test execution without a GraphExecutionContext, but we can verify
        // the constructor sets the empty label correctly by checking the struct internals
        // This is a structural test to ensure the "empty label signals scan all" pattern works
        let schema = GraphScanExec::build_schemaless_vertex_schema("n", &[]);

        // Verify the schema has _vid and _labels columns for a scan with no properties
        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(1).name(), "n._labels");
    }

    #[test]
    fn test_cypher_value_all_props_extraction() {
        // Simulate _all_props encoding using encode_cypher_value helper
        let json_obj = serde_json::json!({"age": 30, "name": "Alice"});
        let cv_bytes = encode_cypher_value(&json_obj).unwrap();

        // Decode and extract "age" value
        let decoded = uni_common::cypher_value_codec::decode(&cv_bytes).unwrap();
        match decoded {
            uni_common::Value::Map(map) => {
                let age_val = map.get("age").unwrap();
                assert_eq!(age_val, &uni_common::Value::Int(30));
            }
            _ => panic!("Expected Map"),
        }

        // Also test single value encoding
        let single_val = serde_json::json!(30);
        let single_bytes = encode_cypher_value(&single_val).unwrap();
        let single_decoded = uni_common::cypher_value_codec::decode(&single_bytes).unwrap();
        assert_eq!(single_decoded, uni_common::Value::Int(30));
    }

    /// Helper to build a RecordBatch with _vid, _deleted, _version columns for testing.
    fn make_mvcc_batch(vids: &[u64], versions: &[u64], deleted: &[bool]) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("_vid", DataType::UInt64, false),
            Field::new("_deleted", DataType::Boolean, false),
            Field::new("_version", DataType::UInt64, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        // Generate name values like "v{vid}_ver{version}" for tracking which row wins
        let names: Vec<String> = vids
            .iter()
            .zip(versions.iter())
            .map(|(v, ver)| format!("v{}_ver{}", v, ver))
            .collect();
        let name_arr: arrow_array::StringArray = names.iter().map(|s| Some(s.as_str())).collect();

        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(vids.to_vec())),
                Arc::new(arrow_array::BooleanArray::from(deleted.to_vec())),
                Arc::new(UInt64Array::from(versions.to_vec())),
                Arc::new(name_arr),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_mvcc_dedup_multiple_versions() {
        // VID 1 at versions 3, 1, 5 — should keep version 5
        // VID 2 at versions 2, 4 — should keep version 4
        let batch = make_mvcc_batch(
            &[1, 1, 1, 2, 2],
            &[3, 1, 5, 2, 4],
            &[false, false, false, false, false],
        );

        let result = mvcc_dedup_batch(&batch).unwrap();
        assert_eq!(result.num_rows(), 2);

        let vid_col = result
            .column_by_name("_vid")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        let ver_col = result
            .column_by_name("_version")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        let name_col = result
            .column_by_name("name")
            .unwrap()
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .unwrap();

        // VID 1 → version 5, VID 2 → version 4
        assert_eq!(vid_col.value(0), 1);
        assert_eq!(ver_col.value(0), 5);
        assert_eq!(name_col.value(0), "v1_ver5");

        assert_eq!(vid_col.value(1), 2);
        assert_eq!(ver_col.value(1), 4);
        assert_eq!(name_col.value(1), "v2_ver4");
    }

    #[test]
    fn test_mvcc_dedup_single_rows() {
        // Each VID appears once — nothing should change
        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
        let result = mvcc_dedup_batch(&batch).unwrap();
        assert_eq!(result.num_rows(), 3);
    }

    #[test]
    fn test_mvcc_dedup_empty() {
        // Zero-row input must pass through without error.
        let batch = make_mvcc_batch(&[], &[], &[]);
        let result = mvcc_dedup_batch(&batch).unwrap();
        assert_eq!(result.num_rows(), 0);
    }

    #[test]
    fn test_filter_l0_tombstones_removes_tombstoned() {
        use crate::query::df_graph::L0Context;

        // Create a batch with VIDs 1, 2, 3
        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);

        // Create L0 context with VID 2 tombstoned. The constructor cannot seed
        // tombstones, so insert directly via the public `vertex_tombstones` set.
        let l0 = uni_store::runtime::l0::L0Buffer::new(1, None);
        let l0_buf = std::sync::Arc::new(parking_lot::RwLock::new(l0));
        l0_buf.write().vertex_tombstones.insert(Vid::from(2u64));

        let l0_ctx = L0Context {
            current_l0: Some(l0_buf),
            transaction_l0: None,
            pending_flush_l0s: vec![],
        };

        let result = filter_l0_tombstones(&batch, &l0_ctx).unwrap();
        assert_eq!(result.num_rows(), 2);

        // Surviving rows keep their original order: 1 then 3.
        let vid_col = result
            .column_by_name("_vid")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(vid_col.value(0), 1);
        assert_eq!(vid_col.value(1), 3);
    }

    #[test]
    fn test_filter_l0_tombstones_none() {
        use crate::query::df_graph::L0Context;

        // An empty L0 context has no tombstones, so all rows survive.
        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
        let l0_ctx = L0Context::default();

        let result = filter_l0_tombstones(&batch, &l0_ctx).unwrap();
        assert_eq!(result.num_rows(), 3);
    }

    #[test]
    fn test_map_to_output_schema_basic() {
        use crate::query::df_graph::L0Context;

        // Input: Lance-schema batch with _vid, _deleted, _version, name columns
        let lance_schema = Arc::new(Schema::new(vec![
            Field::new("_vid", DataType::UInt64, false),
            Field::new("_deleted", DataType::Boolean, false),
            Field::new("_version", DataType::UInt64, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let name_arr: arrow_array::StringArray =
            vec![Some("Alice"), Some("Bob")].into_iter().collect();
        let batch = RecordBatch::try_new(
            lance_schema,
            vec![
                Arc::new(UInt64Array::from(vec![1u64, 2])),
                Arc::new(arrow_array::BooleanArray::from(vec![false, false])),
                Arc::new(UInt64Array::from(vec![1u64, 1])),
                Arc::new(name_arr),
            ],
        )
        .unwrap();

        // Output schema: n._vid, n._labels, n.name
        let output_schema = Arc::new(Schema::new(vec![
            Field::new("n._vid", DataType::UInt64, false),
            Field::new("n._labels", labels_data_type(), true),
            Field::new("n.name", DataType::Utf8, true),
        ]));

        let l0_ctx = L0Context::default();
        let result = map_to_output_schema(
            &batch,
            "Person",
            "n",
            &["name".to_string()],
            &output_schema,
            &l0_ctx,
        )
        .unwrap();

        // Internal columns (_deleted, _version) are dropped; variable-prefixed
        // columns appear in output-schema order.
        assert_eq!(result.num_rows(), 2);
        assert_eq!(result.schema().fields().len(), 3);
        assert_eq!(result.schema().field(0).name(), "n._vid");
        assert_eq!(result.schema().field(1).name(), "n._labels");
        assert_eq!(result.schema().field(2).name(), "n.name");

        // Check name values carried through
        let name_col = result
            .column(2)
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .unwrap();
        assert_eq!(name_col.value(0), "Alice");
        assert_eq!(name_col.value(1), "Bob");
    }
}