Skip to main content

uni_query/query/df_graph/
scan.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Graph scan execution plan for DataFusion.
5//!
6//! This module provides [`GraphScanExec`], a DataFusion `ExecutionPlan` that scans
7//! vertices or edges from storage with property materialization. It wraps the
8//! underlying Lance table scan with:
9//!
10//! - MVCC resolution via L0 buffer overlays
11//! - Property column materialization from `PropertyManager`
12//! - Filter pushdown to storage layer
13//!
14//! # Column Naming Convention
15//!
16//! Properties are materialized as columns named `{variable}.{property}`:
17//! - `n.name` - property "name" for variable "n"
18//! - `n.age` - property "age" for variable "n"
19//!
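//! System columns use an underscore prefix and, like properties, are
//! qualified with the variable in the output schema (`{variable}._vid`):
//! - `n._vid` - vertex ID
//! - `n._labels` - vertex labels (vertex scans only)
//! - `r._eid` - edge ID
//! - `r._src_vid` - source vertex ID (edges only)
//! - `r._dst_vid` - destination vertex ID (edges only)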
25
26use crate::query::datetime::parse_datetime_utc;
27use crate::query::df_graph::GraphExecutionContext;
28use crate::query::df_graph::common::{arrow_err, compute_plan_properties, labels_data_type};
29use arrow_array::builder::{
30    BinaryBuilder, BooleanBuilder, Date32Builder, FixedSizeListBuilder, Float32Builder,
31    Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
32    Time64NanosecondBuilder, TimestampNanosecondBuilder, UInt64Builder,
33};
34use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
35use arrow_schema::{DataType, Field, Fields, IntervalUnit, Schema, SchemaRef, TimeUnit};
36use chrono::{NaiveDate, NaiveTime, Timelike};
37use datafusion::common::Result as DFResult;
38use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
39use datafusion::physical_expr::PhysicalExpr;
40use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
41use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
42use futures::Stream;
43use std::any::Any;
44use std::collections::{HashMap, HashSet};
45use std::fmt;
46use std::pin::Pin;
47use std::sync::Arc;
48use std::task::{Context, Poll};
49use uni_common::Properties;
50use uni_common::Value;
51use uni_common::core::id::Vid;
52use uni_common::core::schema::Schema as UniSchema;
53
/// Graph scan execution plan.
///
/// Scans vertices or edges from storage with property materialization.
/// This wraps the underlying Lance table scan with MVCC resolution and
/// property loading.
///
/// Instances are created through the `new_*_scan` constructors and then
/// optionally refined with the `with_*` builder methods.
///
/// # Example
///
/// ```ignore
/// // Create a scan for Person vertices with name and age properties
/// let scan = GraphScanExec::new_vertex_scan(
///     graph_ctx,
///     "Person",
///     "n",
///     vec!["name".to_string(), "age".to_string()],
///     None, // No filter
/// );
///
/// let stream = scan.execute(0, task_ctx)?;
/// // Output schema columns: n._vid, n._labels, n.name, n.age
/// ```
pub struct GraphScanExec {
    /// Graph execution context with storage and L0 access.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Label name for vertex scan, or edge type for edge scan.
    ///
    /// Multi-label scans store their labels joined with `:`, and an empty
    /// string means "scan all vertices" (see the schemaless constructors).
    label: String,

    /// Variable name for column prefixing.
    variable: String,

    /// Properties to materialize as columns.
    projected_properties: Vec<String>,

    /// Filter expression to push down (used for L0 short-circuit and
    /// single-VID Lance pushdown). For multi-VID IN-list pushdown, use
    /// `vid_list_filter` — see issue #55 PR #4.
    filter: Option<Arc<dyn PhysicalExpr>>,

    /// Multi-VID IN-list filter to push to Lance as `_vid IN (v1, v2, ...)`.
    /// Set when the planner has resolved a static set of vids from an
    /// `Expr::In { Property(_, "_vid"), List }` predicate. Bypasses the
    /// PhysicalExpr roundtrip used by `filter`. See issue #55 PR #4.
    vid_list_filter: Option<Vec<u64>>,

    /// Pre-rendered Lance filter string for indexed-property pushdown
    /// (e.g. `name = 'foo'`). AND-combined with the VID filter at scan time.
    /// Populated by the planner when an indexed-property equality / IN
    /// predicate is detected — Lance turns it into a hash-index lookup.
    /// See issue #57.
    extra_lance_filter: Option<String>,

    /// Arrow-side equivalent of `extra_lance_filter`, applied to the merged
    /// (Lance + L0) batch in-process so the scan output reflects only
    /// matching rows even when data is still in L0 (Lance pushdown alone
    /// can't reach uncommitted/unflushed rows). See issue #57.
    extra_runtime_filter: Option<Arc<dyn PhysicalExpr>>,

    /// Whether this is an edge scan (vs vertex scan).
    is_edge_scan: bool,

    /// Whether this is a schemaless scan (uses main table instead of per-label table).
    is_schemaless: bool,

    /// Output schema with materialized property columns.
    ///
    /// Built once by the constructor and handed unchanged to every stream
    /// created by `execute`.
    schema: SchemaRef,

    /// Cached plan properties, derived from `schema` at construction time.
    properties: Arc<PlanProperties>,

    /// Metrics for execution tracking.
    metrics: ExecutionPlanMetricsSet,
}
127
128impl fmt::Debug for GraphScanExec {
129    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130        f.debug_struct("GraphScanExec")
131            .field("label", &self.label)
132            .field("variable", &self.variable)
133            .field("projected_properties", &self.projected_properties)
134            .field(
135                "vid_list_filter_len",
136                &self.vid_list_filter.as_ref().map(Vec::len),
137            )
138            .field("is_edge_scan", &self.is_edge_scan)
139            .finish()
140    }
141}
142
143impl GraphScanExec {
144    /// Attach a multi-VID IN-list filter to this scan, pushing
145    /// `_vid IN (v1, v2, ...)` to Lance at execute time. Use this for
146    /// pre-resolved vid sets (e.g. from `UNWIND $list AS e WHERE id(x)=e.field`).
147    /// See issue #55 PR #4.
148    pub fn with_vid_list_filter(mut self, vids: Vec<u64>) -> Self {
149        self.vid_list_filter = Some(vids);
150        self
151    }
152
153    /// Attach a pre-rendered Lance filter string for indexed-property
154    /// pushdown. AND-combined with any VID filter at scan time. See
155    /// issue #57.
156    pub fn with_extra_lance_filter(mut self, filter: String) -> Self {
157        self.extra_lance_filter = Some(filter);
158        self
159    }
160
161    /// Attach the Arrow-side counterpart of `extra_lance_filter`. Applied to
162    /// the merged (Lance + L0) batch so the scan output is index-bound even
163    /// for not-yet-flushed L0 rows. See issue #57.
164    pub fn with_extra_runtime_filter(mut self, filter: Arc<dyn PhysicalExpr>) -> Self {
165        self.extra_runtime_filter = Some(filter);
166        self
167    }
168
169    /// Whether this scan has an extra Lance filter pushed in. Used by the
170    /// EXPLAIN/IndexUsage path to confirm the planner actually pushed.
171    pub fn has_extra_lance_filter(&self) -> bool {
172        self.extra_lance_filter.is_some()
173    }
174
175    /// Execute this scan once with a runtime-supplied list of VIDs as the
176    /// pushdown filter (`_vid IN (v1, v2, ...)`). Returns a single merged
177    /// `RecordBatch`. Used by `VidLookupJoinExec` (issue #55 PR #5) for
178    /// cross-MATCH dynamic pushdown — the build side materializes its keys at
179    /// runtime, then the probe scan runs once with those keys.
180    ///
181    /// Only supported for vertex and schemaless-vertex scans; edge scans
182    /// have a different shape and aren't currently a join target for this
183    /// optimization.
184    pub(crate) async fn execute_with_vid_filter(&self, vids: &[u64]) -> DFResult<RecordBatch> {
185        if self.is_edge_scan {
186            return Err(datafusion::error::DataFusionError::Plan(
187                "execute_with_vid_filter not supported for edge scans".into(),
188            ));
189        }
190        if self.is_schemaless {
191            columnar_scan_schemaless_vertex_batch_static(
192                &self.graph_ctx,
193                &self.label,
194                &self.variable,
195                &self.projected_properties,
196                &self.schema,
197                &self.filter,
198                Some(vids),
199                self.extra_lance_filter.as_deref(),
200                self.extra_runtime_filter.as_ref(),
201            )
202            .await
203        } else {
204            columnar_scan_vertex_batch_static(
205                &self.graph_ctx,
206                &self.label,
207                &self.variable,
208                &self.projected_properties,
209                &self.schema,
210                &self.filter,
211                Some(vids),
212                self.extra_lance_filter.as_deref(),
213                self.extra_runtime_filter.as_ref(),
214            )
215            .await
216        }
217    }
218}
219
220impl GraphScanExec {
221    /// Create a new graph scan for vertices.
222    ///
223    /// Scans all vertices of the given label from storage and L0 buffers,
224    /// then materializes the requested properties.
225    pub fn new_vertex_scan(
226        graph_ctx: Arc<GraphExecutionContext>,
227        label: impl Into<String>,
228        variable: impl Into<String>,
229        projected_properties: Vec<String>,
230        filter: Option<Arc<dyn PhysicalExpr>>,
231    ) -> Self {
232        let label = label.into();
233        let variable = variable.into();
234
235        // Build output schema with proper types from Uni schema
236        let uni_schema = graph_ctx.storage().schema_manager().schema();
237        let schema =
238            Self::build_vertex_schema(&variable, &label, &projected_properties, &uni_schema);
239
240        let properties = compute_plan_properties(schema.clone());
241
242        Self {
243            graph_ctx,
244            label,
245            variable,
246            projected_properties,
247            filter,
248            vid_list_filter: None,
249            extra_lance_filter: None,
250            extra_runtime_filter: None,
251            is_edge_scan: false,
252            is_schemaless: false,
253            schema,
254            properties,
255            metrics: ExecutionPlanMetricsSet::new(),
256        }
257    }
258
259    /// Create a new schemaless vertex scan.
260    ///
261    /// Scans the main vertices table for vertices with the given label name.
262    /// Properties are extracted from props_json (all treated as Utf8/JSON).
263    /// This is used for labels that aren't in the schema.
264    pub fn new_schemaless_vertex_scan(
265        graph_ctx: Arc<GraphExecutionContext>,
266        label_name: impl Into<String>,
267        variable: impl Into<String>,
268        projected_properties: Vec<String>,
269        filter: Option<Arc<dyn PhysicalExpr>>,
270    ) -> Self {
271        let label = label_name.into();
272        let variable = variable.into();
273
274        // Filter out system columns that are already materialized as dedicated columns
275        // (_vid as UInt64, _labels as List<Utf8>). If these appear in projected_properties
276        // (e.g., from collect_properties_from_plan extracting _vid from filter expressions),
277        // they would create duplicate columns with conflicting types.
278        let projected_properties: Vec<String> = projected_properties
279            .into_iter()
280            .filter(|p| p != "_vid" && p != "_labels")
281            .collect();
282
283        let uni_schema = graph_ctx.storage().schema_manager().schema();
284        let schema =
285            Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
286        let properties = compute_plan_properties(schema.clone());
287
288        Self {
289            graph_ctx,
290            label,
291            variable,
292            projected_properties,
293            filter,
294            vid_list_filter: None,
295            extra_lance_filter: None,
296            extra_runtime_filter: None,
297            is_edge_scan: false,
298            is_schemaless: true,
299            schema,
300            properties,
301            metrics: ExecutionPlanMetricsSet::new(),
302        }
303    }
304
305    /// Create a new multi-label vertex scan using the main vertices table.
306    ///
307    /// Scans for vertices that have ALL specified labels (intersection semantics).
308    /// Properties are extracted from props_json (schemaless).
309    pub fn new_multi_label_vertex_scan(
310        graph_ctx: Arc<GraphExecutionContext>,
311        labels: Vec<String>,
312        variable: impl Into<String>,
313        projected_properties: Vec<String>,
314        filter: Option<Arc<dyn PhysicalExpr>>,
315    ) -> Self {
316        let variable = variable.into();
317        let projected_properties: Vec<String> = projected_properties
318            .into_iter()
319            .filter(|p| p != "_vid" && p != "_labels")
320            .collect();
321        let uni_schema = graph_ctx.storage().schema_manager().schema();
322        let schema =
323            Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
324        let properties = compute_plan_properties(schema.clone());
325
326        // Encode labels as colon-separated for the stream to parse
327        let encoded_labels = labels.join(":");
328
329        Self {
330            graph_ctx,
331            label: encoded_labels,
332            variable,
333            projected_properties,
334            filter,
335            vid_list_filter: None,
336            extra_lance_filter: None,
337            extra_runtime_filter: None,
338            is_edge_scan: false,
339            is_schemaless: true,
340            schema,
341            properties,
342            metrics: ExecutionPlanMetricsSet::new(),
343        }
344    }
345
346    /// Create a new schemaless scan for all vertices.
347    ///
348    /// Scans the main vertices table for all vertices regardless of label.
349    /// Properties are extracted from props_json with types resolved from the schema.
350    /// This is used for `MATCH (n)` without label filter.
351    pub fn new_schemaless_all_scan(
352        graph_ctx: Arc<GraphExecutionContext>,
353        variable: impl Into<String>,
354        projected_properties: Vec<String>,
355        filter: Option<Arc<dyn PhysicalExpr>>,
356    ) -> Self {
357        let variable = variable.into();
358        let projected_properties: Vec<String> = projected_properties
359            .into_iter()
360            .filter(|p| p != "_vid" && p != "_labels")
361            .collect();
362
363        let uni_schema = graph_ctx.storage().schema_manager().schema();
364        let schema =
365            Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
366        let properties = compute_plan_properties(schema.clone());
367
368        Self {
369            graph_ctx,
370            label: String::new(), // Empty label signals "scan all vertices"
371            variable,
372            projected_properties,
373            filter,
374            vid_list_filter: None,
375            extra_lance_filter: None,
376            extra_runtime_filter: None,
377            is_edge_scan: false,
378            is_schemaless: true,
379            schema,
380            properties,
381            metrics: ExecutionPlanMetricsSet::new(),
382        }
383    }
384
385    /// Build schema for schemaless vertex scan.
386    ///
387    /// Resolves property types from all labels in the schema. Falls back to
388    /// LargeBinary (CypherValue encoding) for properties not found in any
389    /// label's schema.
390    fn build_schemaless_vertex_schema(
391        variable: &str,
392        properties: &[String],
393        uni_schema: &uni_common::core::schema::Schema,
394    ) -> SchemaRef {
395        // Merge property metadata from all labels for type resolution.
396        let mut merged: std::collections::HashMap<&str, &uni_common::core::schema::PropertyMeta> =
397            std::collections::HashMap::new();
398        for label_props in uni_schema.properties.values() {
399            for (name, meta) in label_props {
400                merged.entry(name.as_str()).or_insert(meta);
401            }
402        }
403
404        let mut fields = vec![
405            Field::new(format!("{}._vid", variable), DataType::UInt64, false),
406            Field::new(format!("{}._labels", variable), labels_data_type(), true),
407        ];
408
409        for prop in properties {
410            let col_name = format!("{}.{}", variable, prop);
411            let arrow_type = merged
412                .get(prop.as_str())
413                .map(|meta| meta.r#type.to_arrow())
414                .unwrap_or(DataType::LargeBinary);
415            fields.push(Field::new(&col_name, arrow_type, true));
416        }
417
418        Arc::new(Schema::new(fields))
419    }
420
421    /// Create a new graph scan for edges.
422    ///
423    /// Scans all edges of the given type from storage and L0 buffers,
424    /// then materializes the requested properties.
425    pub fn new_edge_scan(
426        graph_ctx: Arc<GraphExecutionContext>,
427        edge_type: impl Into<String>,
428        variable: impl Into<String>,
429        projected_properties: Vec<String>,
430        filter: Option<Arc<dyn PhysicalExpr>>,
431    ) -> Self {
432        let label = edge_type.into();
433        let variable = variable.into();
434
435        // Build output schema with proper types from Uni schema
436        let uni_schema = graph_ctx.storage().schema_manager().schema();
437        let schema = Self::build_edge_schema(&variable, &label, &projected_properties, &uni_schema);
438
439        let properties = compute_plan_properties(schema.clone());
440
441        Self {
442            graph_ctx,
443            label,
444            variable,
445            projected_properties,
446            filter,
447            vid_list_filter: None,
448            extra_lance_filter: None,
449            extra_runtime_filter: None,
450            is_edge_scan: true,
451            is_schemaless: false,
452            schema,
453            properties,
454            metrics: ExecutionPlanMetricsSet::new(),
455        }
456    }
457
458    /// Build output schema for vertex scan with proper Arrow types.
459    fn build_vertex_schema(
460        variable: &str,
461        label: &str,
462        properties: &[String],
463        uni_schema: &UniSchema,
464    ) -> SchemaRef {
465        let mut fields = vec![
466            Field::new(format!("{}._vid", variable), DataType::UInt64, false),
467            Field::new(format!("{}._labels", variable), labels_data_type(), true),
468        ];
469        let label_props = uni_schema.properties.get(label);
470        for prop in properties {
471            let col_name = format!("{}.{}", variable, prop);
472            let arrow_type = resolve_property_type(prop, label_props);
473            fields.push(Field::new(&col_name, arrow_type, true));
474        }
475        Arc::new(Schema::new(fields))
476    }
477
478    /// Build output schema for edge scan with proper Arrow types.
479    fn build_edge_schema(
480        variable: &str,
481        edge_type: &str,
482        properties: &[String],
483        uni_schema: &UniSchema,
484    ) -> SchemaRef {
485        let mut fields = vec![
486            Field::new(format!("{}._eid", variable), DataType::UInt64, false),
487            Field::new(format!("{}._src_vid", variable), DataType::UInt64, false),
488            Field::new(format!("{}._dst_vid", variable), DataType::UInt64, false),
489        ];
490        let edge_props = uni_schema.properties.get(edge_type);
491        for prop in properties {
492            let col_name = format!("{}.{}", variable, prop);
493            let arrow_type = resolve_property_type(prop, edge_props);
494            fields.push(Field::new(&col_name, arrow_type, true));
495        }
496        Arc::new(Schema::new(fields))
497    }
498}
499
500impl DisplayAs for GraphScanExec {
501    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
502        let scan_type = if self.is_edge_scan { "Edge" } else { "Vertex" };
503        write!(
504            f,
505            "GraphScanExec: {}={}, properties={:?}",
506            scan_type, self.label, self.projected_properties
507        )?;
508        if self.filter.is_some() {
509            write!(f, ", filter=<pushed>")?;
510        }
511        Ok(())
512    }
513}
514
515impl ExecutionPlan for GraphScanExec {
516    fn name(&self) -> &str {
517        "GraphScanExec"
518    }
519
520    fn as_any(&self) -> &dyn Any {
521        self
522    }
523
524    fn schema(&self) -> SchemaRef {
525        self.schema.clone()
526    }
527
528    fn properties(&self) -> &Arc<PlanProperties> {
529        &self.properties
530    }
531
532    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
533        vec![]
534    }
535
536    fn with_new_children(
537        self: Arc<Self>,
538        children: Vec<Arc<dyn ExecutionPlan>>,
539    ) -> DFResult<Arc<dyn ExecutionPlan>> {
540        if children.is_empty() {
541            Ok(self)
542        } else {
543            Err(datafusion::error::DataFusionError::Plan(
544                "GraphScanExec does not accept children".to_string(),
545            ))
546        }
547    }
548
549    fn execute(
550        &self,
551        partition: usize,
552        _context: Arc<TaskContext>,
553    ) -> DFResult<SendableRecordBatchStream> {
554        let metrics = BaselineMetrics::new(&self.metrics, partition);
555
556        Ok(Box::pin(GraphScanStream::new(
557            self.graph_ctx.clone(),
558            self.label.clone(),
559            self.variable.clone(),
560            self.projected_properties.clone(),
561            self.is_edge_scan,
562            self.is_schemaless,
563            self.filter.clone(),
564            self.vid_list_filter.clone(),
565            self.extra_lance_filter.clone(),
566            self.extra_runtime_filter.clone(),
567            self.schema.clone(),
568            metrics,
569        )))
570    }
571
572    fn metrics(&self) -> Option<MetricsSet> {
573        Some(self.metrics.clone_inner())
574    }
575}
576
/// State machine for graph scan stream execution.
///
/// A stream is created in `Init` (see `GraphScanStream::new`).
enum GraphScanState {
    /// Initial state, ready to start scanning.
    Init,
    /// Executing the async scan.
    ///
    /// Holds the pinned, boxed, `Send` future for the scan; it resolves to
    /// `Ok(Some(batch))`, `Ok(None)`, or an error.
    Executing(Pin<Box<dyn std::future::Future<Output = DFResult<Option<RecordBatch>>> + Send>>),
    /// Stream is done.
    Done,
}
586
/// Stream that scans vertices or edges and materializes properties.
///
/// For known-label vertex scans, uses a single columnar Lance query with
/// MVCC dedup and L0 overlay. For edge and schemaless scans, falls back
/// to the two-phase VID-scan + property-materialize flow.
///
/// Every field except `state` is a copy of the corresponding
/// `GraphScanExec` setting, taken when `execute` creates the stream.
struct GraphScanStream {
    /// Graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Label (vertex) or edge type name.
    label: String,

    /// Variable name for column prefixing (e.g., "n" in `n.name`).
    variable: String,

    /// Properties to materialize.
    properties: Vec<String>,

    /// Whether this is an edge scan.
    is_edge_scan: bool,

    /// Whether this is a schemaless scan.
    is_schemaless: bool,

    /// Pushed-down filter expression (used for VID short-circuit in L0 scans).
    filter: Option<Arc<dyn PhysicalExpr>>,

    /// Multi-VID IN-list filter for Lance pushdown. See issue #55 PR #4.
    vid_list_filter: Option<Vec<u64>>,

    /// Extra Lance filter string (e.g. `name = 'foo'`) for indexed-property
    /// pushdown. See issue #57.
    extra_lance_filter: Option<String>,

    /// Arrow-side equivalent of `extra_lance_filter`. See issue #57.
    extra_runtime_filter: Option<Arc<dyn PhysicalExpr>>,

    /// Output schema.
    schema: SchemaRef,

    /// Stream state; starts as `GraphScanState::Init`.
    state: GraphScanState,

    /// Metrics.
    metrics: BaselineMetrics,
}
633
634impl GraphScanStream {
635    /// Create a new graph scan stream.
636    #[expect(clippy::too_many_arguments)]
637    fn new(
638        graph_ctx: Arc<GraphExecutionContext>,
639        label: String,
640        variable: String,
641        properties: Vec<String>,
642        is_edge_scan: bool,
643        is_schemaless: bool,
644        filter: Option<Arc<dyn PhysicalExpr>>,
645        vid_list_filter: Option<Vec<u64>>,
646        extra_lance_filter: Option<String>,
647        extra_runtime_filter: Option<Arc<dyn PhysicalExpr>>,
648        schema: SchemaRef,
649        metrics: BaselineMetrics,
650    ) -> Self {
651        Self {
652            graph_ctx,
653            label,
654            variable,
655            properties,
656            is_edge_scan,
657            is_schemaless,
658            filter,
659            vid_list_filter,
660            extra_lance_filter,
661            extra_runtime_filter,
662            schema,
663            state: GraphScanState::Init,
664            metrics,
665        }
666    }
667}
668
669/// Resolve the Arrow data type for a property, handling system columns like `overflow_json`.
670///
671/// Falls back to `LargeBinary` (CypherValue) if the property is not found in the schema,
672/// preserving original value types for overflow/unknown properties.
673pub(crate) fn resolve_property_type(
674    prop: &str,
675    schema_props: Option<
676        &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
677    >,
678) -> DataType {
679    if prop == "overflow_json" {
680        DataType::LargeBinary
681    } else if prop == "_created_at" || prop == "_updated_at" {
682        // System-managed timestamps surfaced via `created_at(n)` /
683        // `updated_at(n)`. Stored on every vertex/edge by the L0 buffer
684        // and the on-disk Arrow tables as Timestamp(Nanosecond, UTC).
685        DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
686    } else {
687        schema_props
688            .and_then(|props| props.get(prop))
689            .map(|meta| meta.r#type.to_arrow())
690            .unwrap_or(DataType::LargeBinary)
691    }
692}
693
694// ============================================================================
695// Columnar-first scan helpers
696// ============================================================================
697
/// MVCC deduplication keyed on `_vid` (test-only convenience wrapper).
///
/// Delegates to `mvcc_dedup_batch_by`, which keeps only the
/// highest-version row for each id.
#[cfg(test)]
fn mvcc_dedup_batch(batch: &RecordBatch) -> DFResult<RecordBatch> {
    const VERTEX_ID_COLUMN: &str = "_vid";
    mvcc_dedup_batch_by(batch, VERTEX_ID_COLUMN)
}
706
707/// Dedup a Lance batch and return `Some` only when rows remain.
708///
709/// Wraps the common pattern of dedup + empty-check that appears in every
710/// columnar scan path (vertex, edge, schemaless).
711fn mvcc_dedup_to_option(
712    batch: Option<RecordBatch>,
713    id_column: &str,
714) -> DFResult<Option<RecordBatch>> {
715    match batch {
716        Some(b) => {
717            let deduped = mvcc_dedup_batch_by(&b, id_column)?;
718            Ok(if deduped.num_rows() > 0 {
719                Some(deduped)
720            } else {
721                None
722            })
723        }
724        None => Ok(None),
725    }
726}
727
728/// Merge a deduped Lance batch with an L0 batch, re-deduplicating the combined
729/// result. Returns an empty batch (against `output_schema`) when both inputs
730/// are empty.
731fn merge_lance_and_l0(
732    lance_deduped: Option<RecordBatch>,
733    l0_batch: RecordBatch,
734    internal_schema: &SchemaRef,
735    id_column: &str,
736) -> DFResult<Option<RecordBatch>> {
737    let has_l0 = l0_batch.num_rows() > 0;
738    match (lance_deduped, has_l0) {
739        (Some(lance), true) => {
740            let combined = arrow::compute::concat_batches(internal_schema, &[lance, l0_batch])
741                .map_err(arrow_err)?;
742            Ok(Some(mvcc_dedup_batch_by(&combined, id_column)?))
743        }
744        (Some(lance), false) => Ok(Some(lance)),
745        (None, true) => Ok(Some(l0_batch)),
746        (None, false) => Ok(None),
747    }
748}
749
/// Append `col_name` to `columns` unless an equal entry is already there.
///
/// The membership check compares against the borrowed `&str` directly, so
/// a `String` is only allocated when the column is actually added.
fn push_column_if_absent(columns: &mut Vec<String>, col_name: &str) {
    let already_listed = columns.iter().any(|existing| existing.as_str() == col_name);
    if already_listed {
        return;
    }
    columns.push(String::from(col_name));
}
759
760/// Extract a property value from an overflow_json CypherValue blob.
761///
762/// Returns the raw CypherValue bytes for `prop` if found in the blob,
763/// or `None` if the blob is null or the key is absent.
764fn extract_from_overflow_blob(
765    overflow_arr: Option<&arrow_array::LargeBinaryArray>,
766    row: usize,
767    prop: &str,
768) -> Option<Vec<u8>> {
769    let arr = overflow_arr?;
770    if arr.is_null(row) {
771        return None;
772    }
773    uni_common::cypher_value_codec::extract_map_entry_raw(arr.value(row), prop)
774}
775
776/// Build a `LargeBinary` column by extracting a property from overflow_json
777/// blobs, with L0 buffer overlay.
778///
779/// For each row, checks L0 buffers first (later buffers take precedence).
780/// If the property is not in L0, falls back to extracting from the
781/// overflow_json CypherValue blob.
782fn build_overflow_property_column(
783    num_rows: usize,
784    vid_arr: &UInt64Array,
785    overflow_arr: Option<&arrow_array::LargeBinaryArray>,
786    prop: &str,
787    l0_ctx: &crate::query::df_graph::L0Context,
788) -> ArrayRef {
789    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
790    for i in 0..num_rows {
791        let vid = Vid::from(vid_arr.value(i));
792
793        // Check L0 buffers (later overwrites earlier)
794        let l0_val = resolve_l0_property(&vid, prop, l0_ctx);
795
796        if let Some(val_opt) = l0_val {
797            append_value_as_cypher_binary(&mut builder, val_opt.as_ref());
798        } else if let Some(bytes) = extract_from_overflow_blob(overflow_arr, i, prop) {
799            builder.append_value(&bytes);
800        } else {
801            builder.append_null();
802        }
803    }
804    Arc::new(builder.finish())
805}
806
807/// Resolve a property value from the L0 visibility chain.
808///
809/// Returns `Some(Some(val))` when the property exists with a non-null value,
810/// `Some(None)` when it exists but is null, and `None` when no L0 buffer
811/// has the property.
812fn resolve_l0_property(
813    vid: &Vid,
814    prop: &str,
815    l0_ctx: &crate::query::df_graph::L0Context,
816) -> Option<Option<Value>> {
817    let mut result = None;
818    for l0 in l0_ctx.iter_l0_buffers() {
819        let guard = l0.read();
820        if let Some(props) = guard.vertex_properties.get(vid)
821            && let Some(val) = props.get(prop)
822        {
823            result = Some(Some(val.clone()));
824        }
825    }
826    result
827}
828
829/// Append a `Value` to a `LargeBinaryBuilder` as CypherValue bytes.
830///
831/// Non-null values are JSON-encoded then CypherValue-encoded.
832/// Null values (or encoding failures) produce null entries.
833fn append_value_as_cypher_binary(
834    builder: &mut arrow_array::builder::LargeBinaryBuilder,
835    val: Option<&Value>,
836) {
837    match val {
838        Some(v) if !v.is_null() => {
839            let json_val: serde_json::Value = v.clone().into();
840            match encode_cypher_value(&json_val) {
841                Ok(bytes) => builder.append_value(bytes),
842                Err(_) => builder.append_null(),
843            }
844        }
845        _ => builder.append_null(),
846    }
847}
848
849/// Build the `_all_props` column by overlaying L0 buffer properties onto
850/// the batch's `props_json` column.
851///
852/// For each row, decodes the stored CypherValue blob, merges in any L0 buffer
853/// properties (in visibility order: pending → current → transaction), and
854/// re-encodes the result. This ensures `properties()` and `keys()` reflect
855/// uncommitted L0 mutations.
856fn build_all_props_column_with_l0_overlay(
857    num_rows: usize,
858    vid_arr: &UInt64Array,
859    props_arr: Option<&arrow_array::LargeBinaryArray>,
860    l0_ctx: &crate::query::df_graph::L0Context,
861) -> ArrayRef {
862    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
863    for i in 0..num_rows {
864        let vid = Vid::from(vid_arr.value(i));
865
866        // 1. Decode props_json blob from storage
867        let mut merged_props = serde_json::Map::new();
868        if let Some(arr) = props_arr
869            && !arr.is_null(i)
870            && let Ok(uni_common::Value::Map(map)) =
871                uni_common::cypher_value_codec::decode(arr.value(i))
872        {
873            for (k, v) in map {
874                let json_val: serde_json::Value = v.into();
875                merged_props.insert(k, json_val);
876            }
877        }
878
879        // 2. Overlay L0 properties (visibility order: pending → current → transaction)
880        for l0 in l0_ctx.iter_l0_buffers() {
881            let guard = l0.read();
882            if let Some(l0_props) = guard.vertex_properties.get(&vid) {
883                for (k, v) in l0_props {
884                    let json_val: serde_json::Value = v.clone().into();
885                    merged_props.insert(k.clone(), json_val);
886                }
887            }
888        }
889
890        // 3. Encode merged result
891        if merged_props.is_empty() {
892            builder.append_null();
893        } else {
894            let json_obj = serde_json::Value::Object(merged_props);
895            match encode_cypher_value(&json_obj) {
896                Ok(bytes) => builder.append_value(bytes),
897                Err(_) => builder.append_null(),
898            }
899        }
900    }
901    Arc::new(builder.finish())
902}
903
904/// Build `_all_props` for a schema-based scan by merging:
905/// 1. Schema-defined columns from the batch
906/// 2. Overflow_json properties
907/// 3. L0 buffer properties
908fn build_all_props_column_for_schema_scan(
909    batch: &RecordBatch,
910    vid_arr: &UInt64Array,
911    overflow_arr: Option<&arrow_array::LargeBinaryArray>,
912    projected_properties: &[String],
913    l0_ctx: &crate::query::df_graph::L0Context,
914) -> ArrayRef {
915    // Collect schema-defined property column names (non-internal, non-overflow, non-_all_props)
916    let schema_props: Vec<&str> = projected_properties
917        .iter()
918        .filter(|p| *p != "overflow_json" && *p != "_all_props" && !p.starts_with('_'))
919        .map(String::as_str)
920        .collect();
921
922    let num_rows = batch.num_rows();
923    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
924    for i in 0..num_rows {
925        let vid = Vid::from(vid_arr.value(i));
926        let mut merged_props = serde_json::Map::new();
927
928        // 1. Schema-defined columns
929        for &prop in &schema_props {
930            if let Some(col) = batch.column_by_name(prop) {
931                let val = uni_store::storage::arrow_convert::arrow_to_value(col.as_ref(), i, None);
932                if !val.is_null() {
933                    let json_val: serde_json::Value = val.into();
934                    merged_props.insert(prop.to_string(), json_val);
935                }
936            }
937        }
938
939        // 2. Overflow_json properties
940        if let Some(arr) = overflow_arr
941            && !arr.is_null(i)
942            && let Ok(uni_common::Value::Map(map)) =
943                uni_common::cypher_value_codec::decode(arr.value(i))
944        {
945            for (k, v) in map {
946                let json_val: serde_json::Value = v.into();
947                merged_props.insert(k, json_val);
948            }
949        }
950
951        // 3. L0 buffer overlay (pending → current → transaction)
952        for l0 in l0_ctx.iter_l0_buffers() {
953            let guard = l0.read();
954            if let Some(l0_props) = guard.vertex_properties.get(&vid) {
955                for (k, v) in l0_props {
956                    let json_val: serde_json::Value = v.clone().into();
957                    merged_props.insert(k.clone(), json_val);
958                }
959            }
960        }
961
962        if merged_props.is_empty() {
963            builder.append_null();
964        } else {
965            let json_obj = serde_json::Value::Object(merged_props);
966            match encode_cypher_value(&json_obj) {
967                Ok(bytes) => builder.append_value(bytes),
968                Err(_) => builder.append_null(),
969            }
970        }
971    }
972    Arc::new(builder.finish())
973}
974
975/// MVCC deduplication: keep only the highest-version row for each unique value
976/// in the given `id_column`.
977///
978/// Sorts by (id_column ASC, _version DESC), then keeps the first occurrence of
979/// each id (= the highest version). This is a pure Arrow-compute operation.
980fn mvcc_dedup_batch_by(batch: &RecordBatch, id_column: &str) -> DFResult<RecordBatch> {
981    if batch.num_rows() == 0 {
982        return Ok(batch.clone());
983    }
984
985    let id_col = batch
986        .column_by_name(id_column)
987        .ok_or_else(|| {
988            datafusion::error::DataFusionError::Internal(format!("Missing {} column", id_column))
989        })?
990        .clone();
991    let version_col = batch
992        .column_by_name("_version")
993        .ok_or_else(|| {
994            datafusion::error::DataFusionError::Internal("Missing _version column".to_string())
995        })?
996        .clone();
997
998    // Sort by (id_column ASC, _version DESC)
999    let sort_columns = vec![
1000        arrow::compute::SortColumn {
1001            values: id_col,
1002            options: Some(arrow::compute::SortOptions {
1003                descending: false,
1004                nulls_first: false,
1005            }),
1006        },
1007        arrow::compute::SortColumn {
1008            values: version_col,
1009            options: Some(arrow::compute::SortOptions {
1010                descending: true,
1011                nulls_first: false,
1012            }),
1013        },
1014    ];
1015    let indices = arrow::compute::lexsort_to_indices(&sort_columns, None).map_err(arrow_err)?;
1016
1017    // Reorder all columns by sorted indices
1018    let sorted_columns: Vec<ArrayRef> = batch
1019        .columns()
1020        .iter()
1021        .map(|col| arrow::compute::take(col.as_ref(), &indices, None))
1022        .collect::<Result<_, _>>()
1023        .map_err(arrow_err)?;
1024    let sorted = RecordBatch::try_new(batch.schema(), sorted_columns).map_err(arrow_err)?;
1025
1026    // Build dedup mask: keep first occurrence of each id
1027    let sorted_id = sorted
1028        .column_by_name(id_column)
1029        .unwrap()
1030        .as_any()
1031        .downcast_ref::<UInt64Array>()
1032        .unwrap();
1033
1034    let mut keep = vec![false; sorted.num_rows()];
1035    if !keep.is_empty() {
1036        keep[0] = true;
1037        for (i, flag) in keep.iter_mut().enumerate().skip(1) {
1038            if sorted_id.value(i) != sorted_id.value(i - 1) {
1039                *flag = true;
1040            }
1041        }
1042    }
1043
1044    let mask = arrow_array::BooleanArray::from(keep);
1045    arrow::compute::filter_record_batch(&sorted, &mask).map_err(arrow_err)
1046}
1047
1048/// Filter out edge rows where `op != 0` (non-INSERT) after MVCC dedup.
1049fn filter_deleted_edge_ops(batch: &RecordBatch) -> DFResult<RecordBatch> {
1050    if batch.num_rows() == 0 {
1051        return Ok(batch.clone());
1052    }
1053    let op_col = match batch.column_by_name("op") {
1054        Some(col) => col
1055            .as_any()
1056            .downcast_ref::<arrow_array::UInt8Array>()
1057            .unwrap(),
1058        None => return Ok(batch.clone()),
1059    };
1060    let keep: Vec<bool> = (0..op_col.len()).map(|i| op_col.value(i) == 0).collect();
1061    let mask = arrow_array::BooleanArray::from(keep);
1062    arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
1063}
1064
1065/// Filter out rows where `_deleted = true` after MVCC dedup.
1066fn filter_deleted_rows(batch: &RecordBatch) -> DFResult<RecordBatch> {
1067    if batch.num_rows() == 0 {
1068        return Ok(batch.clone());
1069    }
1070    let deleted_col = match batch.column_by_name("_deleted") {
1071        Some(col) => col
1072            .as_any()
1073            .downcast_ref::<arrow_array::BooleanArray>()
1074            .unwrap(),
1075        None => return Ok(batch.clone()),
1076    };
1077    let keep: Vec<bool> = (0..deleted_col.len())
1078        .map(|i| !deleted_col.value(i))
1079        .collect();
1080    let mask = arrow_array::BooleanArray::from(keep);
1081    arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
1082}
1083
1084/// Filter out rows whose `_vid` appears in L0 tombstones.
1085fn filter_l0_tombstones(
1086    batch: &RecordBatch,
1087    l0_ctx: &crate::query::df_graph::L0Context,
1088) -> DFResult<RecordBatch> {
1089    if batch.num_rows() == 0 {
1090        return Ok(batch.clone());
1091    }
1092
1093    let mut tombstones: HashSet<u64> = HashSet::new();
1094    for l0 in l0_ctx.iter_l0_buffers() {
1095        let guard = l0.read();
1096        for vid in guard.vertex_tombstones.iter() {
1097            tombstones.insert(vid.as_u64());
1098        }
1099    }
1100
1101    if tombstones.is_empty() {
1102        return Ok(batch.clone());
1103    }
1104
1105    let vid_col = batch
1106        .column_by_name("_vid")
1107        .ok_or_else(|| {
1108            datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
1109        })?
1110        .as_any()
1111        .downcast_ref::<UInt64Array>()
1112        .unwrap();
1113
1114    let keep: Vec<bool> = (0..vid_col.len())
1115        .map(|i| !tombstones.contains(&vid_col.value(i)))
1116        .collect();
1117    let mask = arrow_array::BooleanArray::from(keep);
1118    arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
1119}
1120
1121/// Filter out rows whose `eid` appears in L0 edge tombstones.
1122fn filter_l0_edge_tombstones(
1123    batch: &RecordBatch,
1124    l0_ctx: &crate::query::df_graph::L0Context,
1125) -> DFResult<RecordBatch> {
1126    if batch.num_rows() == 0 {
1127        return Ok(batch.clone());
1128    }
1129
1130    let mut tombstones: HashSet<u64> = HashSet::new();
1131    for l0 in l0_ctx.iter_l0_buffers() {
1132        let guard = l0.read();
1133        for eid in guard.tombstones.keys() {
1134            tombstones.insert(eid.as_u64());
1135        }
1136    }
1137
1138    if tombstones.is_empty() {
1139        return Ok(batch.clone());
1140    }
1141
1142    let eid_col = batch
1143        .column_by_name("eid")
1144        .ok_or_else(|| {
1145            datafusion::error::DataFusionError::Internal("Missing eid column".to_string())
1146        })?
1147        .as_any()
1148        .downcast_ref::<UInt64Array>()
1149        .unwrap();
1150
1151    let keep: Vec<bool> = (0..eid_col.len())
1152        .map(|i| !tombstones.contains(&eid_col.value(i)))
1153        .collect();
1154    let mask = arrow_array::BooleanArray::from(keep);
1155    arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
1156}
1157
1158/// Extract a target VID from a DataFusion physical filter expression.
1159///
1160/// Looks for patterns like `_vid = <uint64_literal>` or `<uint64_literal> = _vid`
1161/// in the top-level expression or as a conjunct of an AND chain. Returns the first
1162/// VID literal found, or `None` if the filter does not contain such a pattern.
1163///
1164/// This also handles `CAST(literal AS UInt64)` which DataFusion may insert when
1165/// the original literal is Int64.
1166fn extract_vid_from_physical_filter(filter: &Arc<dyn PhysicalExpr>) -> Option<u64> {
1167    use datafusion::logical_expr::Operator;
1168    use datafusion::physical_expr::expressions::BinaryExpr;
1169
1170    // Try to match this expression as `_vid = literal`
1171    if let Some(bin) = filter.as_any().downcast_ref::<BinaryExpr>() {
1172        if bin.op() == &Operator::Eq {
1173            // Check both directions: col = lit and lit = col
1174            if let Some(vid) = try_extract_vid_eq(bin.left(), bin.right()) {
1175                return Some(vid);
1176            }
1177            if let Some(vid) = try_extract_vid_eq(bin.right(), bin.left()) {
1178                return Some(vid);
1179            }
1180        }
1181        // Recurse into AND conjuncts
1182        if bin.op() == &Operator::And {
1183            if let Some(vid) = extract_vid_from_physical_filter(bin.left()) {
1184                return Some(vid);
1185            }
1186            return extract_vid_from_physical_filter(bin.right());
1187        }
1188    }
1189    None
1190}
1191
1192/// Try to extract a VID value from a `(column_expr, value_expr)` pair where
1193/// `column_expr` is a `Column` named `_vid` and `value_expr` is a UInt64 or
1194/// non-negative Int64 literal (possibly wrapped in a CAST to UInt64).
1195fn try_extract_vid_eq(
1196    col_side: &Arc<dyn PhysicalExpr>,
1197    val_side: &Arc<dyn PhysicalExpr>,
1198) -> Option<u64> {
1199    use datafusion::physical_expr::expressions::{CastExpr, Column, Literal};
1200
1201    // Check that col_side is Column("_vid") or Column("variable._vid")
1202    let col = col_side.as_any().downcast_ref::<Column>()?;
1203    if col.name() != "_vid" && !col.name().ends_with("._vid") {
1204        return None;
1205    }
1206
1207    // Try direct literal
1208    if let Some(lit) = val_side.as_any().downcast_ref::<Literal>() {
1209        return scalar_to_u64(lit.value());
1210    }
1211
1212    // Try CAST(literal AS UInt64)
1213    if let Some(cast) = val_side.as_any().downcast_ref::<CastExpr>()
1214        && let Some(lit) = cast.expr().as_any().downcast_ref::<Literal>()
1215    {
1216        return scalar_to_u64(lit.value());
1217    }
1218
1219    None
1220}
1221
/// Merge an optional VID filter clause and an optional indexed-property
/// filter clause into one filter string. See issues #55, #57.
///
/// - both present: `"(<vid_filter>) AND (<extra>)"`
/// - exactly one present: that clause, unchanged
/// - neither present: `None`
fn combine_lance_filters(vid_filter: Option<&str>, extra: Option<&str>) -> Option<String> {
    let Some(vid_clause) = vid_filter else {
        // No VID clause: the result is whatever `extra` holds, if anything.
        return extra.map(str::to_string);
    };

    let combined = match extra {
        Some(prop_clause) => format!("({vid_clause}) AND ({prop_clause})"),
        None => vid_clause.to_string(),
    };
    Some(combined)
}
1232
/// Render VIDs as a `_vid IN (v1,v2,...)` filter clause, with values separated
/// by a bare comma. Used by the scan-side IN-list pushdown introduced in
/// issue #55 PR #4.
///
/// Callers are expected to pass at least one VID: this is checked with
/// `debug_assert!`, so with debug assertions disabled an empty slice produces
/// `_vid IN ()`.
fn format_vid_in_list(vids: &[u64]) -> String {
    use std::fmt::Write;
    debug_assert!(!vids.is_empty());

    let mut clause = String::with_capacity(vids.len() * 8 + 12);
    clause.push_str("_vid IN (");

    // First value goes in bare; every following value carries its separator.
    let mut remaining = vids.iter();
    if let Some(first) = remaining.next() {
        let _ = write!(clause, "{first}");
    }
    for vid in remaining {
        let _ = write!(clause, ",{vid}");
    }

    clause.push(')');
    clause
}
1249
1250/// Convert a `ScalarValue` to `u64` if it is a non-negative integer type.
1251fn scalar_to_u64(sv: &datafusion::common::ScalarValue) -> Option<u64> {
1252    use datafusion::common::ScalarValue;
1253    match sv {
1254        ScalarValue::UInt64(Some(v)) => Some(*v),
1255        ScalarValue::Int64(Some(v)) if *v >= 0 => Some(*v as u64),
1256        ScalarValue::UInt32(Some(v)) => Some(*v as u64),
1257        ScalarValue::Int32(Some(v)) if *v >= 0 => Some(*v as u64),
1258        _ => None,
1259    }
1260}
1261
1262/// Build a RecordBatch from L0 buffer data for a given label, matching the
1263/// Lance query's column set.
1264///
1265/// Merges L0 buffers in visibility order (pending_flush → current → transaction),
1266/// with later buffers overwriting earlier ones for the same VID.
1267///
1268/// When `target_vids` is `Some`, only those VIDs are collected (direct HashMap
1269/// lookups instead of iterating all VIDs for the label). This must mirror the
1270/// Lance-side VID pushdown — otherwise L0-only (unflushed) rows bypass the
1271/// filter and the scan emits the full label table. See issue #72 item 1.
1272fn build_l0_vertex_batch(
1273    l0_ctx: &crate::query::df_graph::L0Context,
1274    label: &str,
1275    lance_schema: &SchemaRef,
1276    label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
1277    target_vids: Option<&[u64]>,
1278) -> DFResult<RecordBatch> {
1279    // Collect all L0 vertex data, merging in visibility order
1280    let mut vid_data: HashMap<u64, (Properties, u64)> = HashMap::new(); // vid -> (props, version)
1281    let mut tombstones: HashSet<u64> = HashSet::new();
1282    // System-managed timestamps: created_at takes the earliest seen
1283    // timestamp across L0 buffers (preserving the original creation
1284    // moment when a row has been touched in multiple buffers); updated_at
1285    // takes the latest (most recent write). Used by `created_at(n)` /
1286    // `updated_at(n)` Cypher functions.
1287    let mut vid_created_at: HashMap<u64, i64> = HashMap::new();
1288    let mut vid_updated_at: HashMap<u64, i64> = HashMap::new();
1289
1290    for l0 in l0_ctx.iter_l0_buffers() {
1291        let guard = l0.read();
1292        // Collect tombstones
1293        for vid in guard.vertex_tombstones.iter() {
1294            tombstones.insert(vid.as_u64());
1295        }
1296        // Collect vertices — restrict to target_vids (single- or multi-VID
1297        // pushdown from id(x) = ? / id(x) IN [...]) when set, else all
1298        // vertices for the label. See issue #72 item 1: without this filter,
1299        // freshly-inserted L0 rows bypass the IN-list pushdown that Lance
1300        // already honors, defeating the optimization.
1301        let candidate_vids: Vec<Vid> = if let Some(tvs) = target_vids {
1302            let mut out = Vec::with_capacity(tvs.len());
1303            for &tv in tvs {
1304                let vid = Vid::from(tv);
1305                if guard.vertex_properties.contains_key(&vid)
1306                    && (label.is_empty()
1307                        || guard
1308                            .label_to_vids
1309                            .get(label)
1310                            .is_some_and(|s| s.contains(&vid)))
1311                {
1312                    out.push(vid);
1313                }
1314            }
1315            out
1316        } else {
1317            guard.vids_for_label(label)
1318        };
1319        for vid in candidate_vids {
1320            let vid_u64 = vid.as_u64();
1321            if tombstones.contains(&vid_u64) {
1322                continue;
1323            }
1324            let version = guard.vertex_versions.get(&vid).copied().unwrap_or(0);
1325            let entry = vid_data
1326                .entry(vid_u64)
1327                .or_insert_with(|| (Properties::new(), 0));
1328            // Merge properties (later L0 overwrites)
1329            if let Some(props) = guard.vertex_properties.get(&vid) {
1330                for (k, v) in props {
1331                    entry.0.insert(k.clone(), v.clone());
1332                }
1333            }
1334            // Take the highest version
1335            if version > entry.1 {
1336                entry.1 = version;
1337            }
1338            // Merge system timestamps: earliest creation, latest update
1339            if let Some(&ts) = guard.vertex_created_at.get(&vid) {
1340                vid_created_at
1341                    .entry(vid_u64)
1342                    .and_modify(|cur| {
1343                        if ts < *cur {
1344                            *cur = ts;
1345                        }
1346                    })
1347                    .or_insert(ts);
1348            }
1349            if let Some(&ts) = guard.vertex_updated_at.get(&vid) {
1350                vid_updated_at
1351                    .entry(vid_u64)
1352                    .and_modify(|cur| {
1353                        if ts > *cur {
1354                            *cur = ts;
1355                        }
1356                    })
1357                    .or_insert(ts);
1358            }
1359        }
1360    }
1361
1362    // Remove tombstoned VIDs
1363    for t in &tombstones {
1364        vid_data.remove(t);
1365    }
1366
1367    if vid_data.is_empty() {
1368        return Ok(RecordBatch::new_empty(lance_schema.clone()));
1369    }
1370
1371    // Sort VIDs for deterministic output
1372    let mut vids: Vec<u64> = vid_data.keys().copied().collect();
1373    vids.sort_unstable();
1374
1375    let num_rows = vids.len();
1376    let mut columns: Vec<ArrayRef> = Vec::with_capacity(lance_schema.fields().len());
1377
1378    // Determine which schema property names exist
1379    let schema_prop_names: HashSet<&str> = label_props
1380        .map(|lp| lp.keys().map(|k| k.as_str()).collect())
1381        .unwrap_or_default();
1382
1383    for field in lance_schema.fields() {
1384        let col_name = field.name().as_str();
1385        match col_name {
1386            "_vid" => {
1387                columns.push(Arc::new(UInt64Array::from(vids.clone())));
1388            }
1389            "_deleted" => {
1390                // L0 vertices are always live (tombstoned ones are already excluded)
1391                let vals = vec![false; num_rows];
1392                columns.push(Arc::new(arrow_array::BooleanArray::from(vals)));
1393            }
1394            "_version" => {
1395                let vals: Vec<u64> = vids.iter().map(|v| vid_data[v].1).collect();
1396                columns.push(Arc::new(UInt64Array::from(vals)));
1397            }
1398            "_created_at" => {
1399                let mut builder =
1400                    arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
1401                for v in &vids {
1402                    match vid_created_at.get(v) {
1403                        Some(&ts) => builder.append_value(ts),
1404                        None => builder.append_null(),
1405                    }
1406                }
1407                columns.push(Arc::new(builder.finish()));
1408            }
1409            "_updated_at" => {
1410                let mut builder =
1411                    arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
1412                for v in &vids {
1413                    match vid_updated_at.get(v) {
1414                        Some(&ts) => builder.append_value(ts),
1415                        None => builder.append_null(),
1416                    }
1417                }
1418                columns.push(Arc::new(builder.finish()));
1419            }
1420            "overflow_json" => {
1421                // Collect non-schema properties as CypherValue
1422                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
1423                for vid_u64 in &vids {
1424                    let (props, _) = &vid_data[vid_u64];
1425                    let mut overflow = serde_json::Map::new();
1426                    for (k, v) in props {
1427                        if k == "ext_id" || k.starts_with('_') {
1428                            continue;
1429                        }
1430                        if !schema_prop_names.contains(k.as_str()) {
1431                            let json_val: serde_json::Value = v.clone().into();
1432                            overflow.insert(k.clone(), json_val);
1433                        }
1434                    }
1435                    if overflow.is_empty() {
1436                        builder.append_null();
1437                    } else {
1438                        let json_val = serde_json::Value::Object(overflow);
1439                        match encode_cypher_value(&json_val) {
1440                            Ok(bytes) => builder.append_value(bytes),
1441                            Err(_) => builder.append_null(),
1442                        }
1443                    }
1444                }
1445                columns.push(Arc::new(builder.finish()));
1446            }
1447            _ => {
1448                // Schema property column: convert L0 Value → Arrow typed value
1449                let col = build_l0_property_column(&vids, &vid_data, col_name, field.data_type())?;
1450                columns.push(col);
1451            }
1452        }
1453    }
1454
1455    RecordBatch::try_new(lance_schema.clone(), columns).map_err(arrow_err)
1456}
1457
1458/// Build a single Arrow column from L0 property values.
1459///
1460/// Operates on the `vid_data` map produced by `build_l0_vertex_batch`.
1461fn build_l0_property_column(
1462    vids: &[u64],
1463    vid_data: &HashMap<u64, (Properties, u64)>,
1464    prop_name: &str,
1465    data_type: &DataType,
1466) -> DFResult<ArrayRef> {
1467    // Convert to Vid keys for reuse of existing build_property_column_static
1468    let vid_keys: Vec<Vid> = vids.iter().map(|v| Vid::from(*v)).collect();
1469    let props_map: HashMap<Vid, Properties> = vid_data
1470        .iter()
1471        .map(|(k, (props, _))| (Vid::from(*k), props.clone()))
1472        .collect();
1473
1474    build_property_column_static(&vid_keys, &props_map, prop_name, data_type)
1475}
1476
1477/// Build a RecordBatch from L0 buffer data for a given edge type, matching
1478/// the DeltaDataset Lance table's column set.
1479///
1480/// Merges L0 buffers in visibility order (pending_flush → current → transaction),
1481/// with later buffers overwriting earlier ones for the same EID.
1482fn build_l0_edge_batch(
1483    l0_ctx: &crate::query::df_graph::L0Context,
1484    edge_type: &str,
1485    internal_schema: &SchemaRef,
1486    type_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
1487) -> DFResult<RecordBatch> {
1488    // Collect all L0 edge data, merging in visibility order
1489    // eid -> (src_vid, dst_vid, properties, version)
1490    let mut eid_data: HashMap<u64, (u64, u64, Properties, u64)> = HashMap::new();
1491    let mut tombstones: HashSet<u64> = HashSet::new();
1492    // System-managed timestamps: earliest creation, latest update.
1493    // See `build_l0_vertex_batch` for the rationale.
1494    let mut eid_created_at: HashMap<u64, i64> = HashMap::new();
1495    let mut eid_updated_at: HashMap<u64, i64> = HashMap::new();
1496
1497    for l0 in l0_ctx.iter_l0_buffers() {
1498        let guard = l0.read();
1499        // Collect tombstones
1500        for eid in guard.tombstones.keys() {
1501            tombstones.insert(eid.as_u64());
1502        }
1503        // Collect edges for this type
1504        for eid in guard.eids_for_type(edge_type) {
1505            let eid_u64 = eid.as_u64();
1506            if tombstones.contains(&eid_u64) {
1507                continue;
1508            }
1509            let (src_vid, dst_vid) = match guard.get_edge_endpoints(eid) {
1510                Some(endpoints) => (endpoints.0.as_u64(), endpoints.1.as_u64()),
1511                None => continue,
1512            };
1513            let version = guard.edge_versions.get(&eid).copied().unwrap_or(0);
1514            let entry = eid_data
1515                .entry(eid_u64)
1516                .or_insert_with(|| (src_vid, dst_vid, Properties::new(), 0));
1517            // Merge properties (later L0 overwrites)
1518            if let Some(props) = guard.edge_properties.get(&eid) {
1519                for (k, v) in props {
1520                    entry.2.insert(k.clone(), v.clone());
1521                }
1522            }
1523            // Update endpoints from latest L0 layer
1524            entry.0 = src_vid;
1525            entry.1 = dst_vid;
1526            // Take the highest version
1527            if version > entry.3 {
1528                entry.3 = version;
1529            }
1530            // Merge system timestamps
1531            if let Some(&ts) = guard.edge_created_at.get(&eid) {
1532                eid_created_at
1533                    .entry(eid_u64)
1534                    .and_modify(|cur| {
1535                        if ts < *cur {
1536                            *cur = ts;
1537                        }
1538                    })
1539                    .or_insert(ts);
1540            }
1541            if let Some(&ts) = guard.edge_updated_at.get(&eid) {
1542                eid_updated_at
1543                    .entry(eid_u64)
1544                    .and_modify(|cur| {
1545                        if ts > *cur {
1546                            *cur = ts;
1547                        }
1548                    })
1549                    .or_insert(ts);
1550            }
1551        }
1552    }
1553
1554    // Remove tombstoned EIDs
1555    for t in &tombstones {
1556        eid_data.remove(t);
1557    }
1558
1559    if eid_data.is_empty() {
1560        return Ok(RecordBatch::new_empty(internal_schema.clone()));
1561    }
1562
1563    // Sort EIDs for deterministic output
1564    let mut eids: Vec<u64> = eid_data.keys().copied().collect();
1565    eids.sort_unstable();
1566
1567    let num_rows = eids.len();
1568    let mut columns: Vec<ArrayRef> = Vec::with_capacity(internal_schema.fields().len());
1569
1570    // Determine which schema property names exist
1571    let schema_prop_names: HashSet<&str> = type_props
1572        .map(|tp| tp.keys().map(|k| k.as_str()).collect())
1573        .unwrap_or_default();
1574
1575    for field in internal_schema.fields() {
1576        let col_name = field.name().as_str();
1577        match col_name {
1578            "eid" => {
1579                columns.push(Arc::new(UInt64Array::from(eids.clone())));
1580            }
1581            "src_vid" => {
1582                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].0).collect();
1583                columns.push(Arc::new(UInt64Array::from(vals)));
1584            }
1585            "dst_vid" => {
1586                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].1).collect();
1587                columns.push(Arc::new(UInt64Array::from(vals)));
1588            }
1589            "op" => {
1590                // L0 edges are always live (tombstoned ones already excluded)
1591                let vals = vec![0u8; num_rows];
1592                columns.push(Arc::new(arrow_array::UInt8Array::from(vals)));
1593            }
1594            "_version" => {
1595                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].3).collect();
1596                columns.push(Arc::new(UInt64Array::from(vals)));
1597            }
1598            "_created_at" => {
1599                let mut builder =
1600                    arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
1601                for e in &eids {
1602                    match eid_created_at.get(e) {
1603                        Some(&ts) => builder.append_value(ts),
1604                        None => builder.append_null(),
1605                    }
1606                }
1607                columns.push(Arc::new(builder.finish()));
1608            }
1609            "_updated_at" => {
1610                let mut builder =
1611                    arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
1612                for e in &eids {
1613                    match eid_updated_at.get(e) {
1614                        Some(&ts) => builder.append_value(ts),
1615                        None => builder.append_null(),
1616                    }
1617                }
1618                columns.push(Arc::new(builder.finish()));
1619            }
1620            "overflow_json" => {
1621                // Collect non-schema properties as CypherValue
1622                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
1623                for eid_u64 in &eids {
1624                    let (_, _, props, _) = &eid_data[eid_u64];
1625                    let mut overflow = serde_json::Map::new();
1626                    for (k, v) in props {
1627                        if k.starts_with('_') {
1628                            continue;
1629                        }
1630                        if !schema_prop_names.contains(k.as_str()) {
1631                            let json_val: serde_json::Value = v.clone().into();
1632                            overflow.insert(k.clone(), json_val);
1633                        }
1634                    }
1635                    if overflow.is_empty() {
1636                        builder.append_null();
1637                    } else {
1638                        let json_val = serde_json::Value::Object(overflow);
1639                        match encode_cypher_value(&json_val) {
1640                            Ok(bytes) => builder.append_value(bytes),
1641                            Err(_) => builder.append_null(),
1642                        }
1643                    }
1644                }
1645                columns.push(Arc::new(builder.finish()));
1646            }
1647            _ => {
1648                // Schema property column: convert L0 Value → Arrow typed value
1649                let col =
1650                    build_l0_edge_property_column(&eids, &eid_data, col_name, field.data_type())?;
1651                columns.push(col);
1652            }
1653        }
1654    }
1655
1656    RecordBatch::try_new(internal_schema.clone(), columns).map_err(arrow_err)
1657}
1658
1659/// Build a single Arrow column from L0 edge property values.
1660///
1661/// Operates on the `eid_data` map produced by `build_l0_edge_batch`.
1662fn build_l0_edge_property_column(
1663    eids: &[u64],
1664    eid_data: &HashMap<u64, (u64, u64, Properties, u64)>,
1665    prop_name: &str,
1666    data_type: &DataType,
1667) -> DFResult<ArrayRef> {
1668    // Convert to Vid keys for reuse of existing build_property_column_static
1669    let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(*e)).collect();
1670    let props_map: HashMap<Vid, Properties> = eid_data
1671        .iter()
1672        .map(|(k, (_, _, props, _))| (Vid::from(*k), props.clone()))
1673        .collect();
1674
1675    build_property_column_static(&vid_keys, &props_map, prop_name, data_type)
1676}
1677
1678/// Build the `_labels` column for known-label vertices.
1679///
1680/// Reads `_labels` from the stored Lance batch if available. Falls back to
1681/// `[label]` when the column is absent (legacy data). Additional labels from
1682/// L0 buffers are merged in.
1683fn build_labels_column_for_known_label(
1684    vid_arr: &UInt64Array,
1685    label: &str,
1686    l0_ctx: &crate::query::df_graph::L0Context,
1687    batch_labels_col: Option<&arrow_array::ListArray>,
1688) -> DFResult<ArrayRef> {
1689    use uni_store::storage::arrow_convert::labels_from_list_array;
1690
1691    let mut labels_builder = ListBuilder::new(StringBuilder::new());
1692
1693    for i in 0..vid_arr.len() {
1694        let vid = Vid::from(vid_arr.value(i));
1695
1696        // Start with labels from the stored column, falling back to [label]
1697        let mut labels = match batch_labels_col {
1698            Some(list_arr) => {
1699                let stored = labels_from_list_array(list_arr, i);
1700                if stored.is_empty() {
1701                    vec![label.to_string()]
1702                } else {
1703                    stored
1704                }
1705            }
1706            None => vec![label.to_string()],
1707        };
1708
1709        // Ensure the scanned label is present (defensive)
1710        if !labels.iter().any(|l| l == label) {
1711            labels.push(label.to_string());
1712        }
1713
1714        // Merge additional labels from L0 buffers
1715        for l0 in l0_ctx.iter_l0_buffers() {
1716            let guard = l0.read();
1717            if let Some(l0_labels) = guard.vertex_labels.get(&vid) {
1718                for lbl in l0_labels {
1719                    if !labels.contains(lbl) {
1720                        labels.push(lbl.clone());
1721                    }
1722                }
1723            }
1724        }
1725
1726        let values = labels_builder.values();
1727        for lbl in &labels {
1728            values.append_value(lbl);
1729        }
1730        labels_builder.append(true);
1731    }
1732
1733    Ok(Arc::new(labels_builder.finish()))
1734}
1735
1736/// Map a Lance-schema batch to the DataFusion output schema.
1737///
1738/// The output schema has `{variable}.{property}` column names, while Lance
1739/// uses bare property names. This function performs the positional mapping,
1740/// adds the `_labels` column, and drops internal columns like `_deleted`/`_version`.
1741fn map_to_output_schema(
1742    batch: &RecordBatch,
1743    label: &str,
1744    _variable: &str,
1745    projected_properties: &[String],
1746    output_schema: &SchemaRef,
1747    l0_ctx: &crate::query::df_graph::L0Context,
1748) -> DFResult<RecordBatch> {
1749    if batch.num_rows() == 0 {
1750        return Ok(RecordBatch::new_empty(output_schema.clone()));
1751    }
1752
1753    let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
1754
1755    // 1. {var}._vid
1756    let vid_col = batch
1757        .column_by_name("_vid")
1758        .ok_or_else(|| {
1759            datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
1760        })?
1761        .clone();
1762    let vid_arr = vid_col
1763        .as_any()
1764        .downcast_ref::<UInt64Array>()
1765        .ok_or_else(|| {
1766            datafusion::error::DataFusionError::Internal("_vid not UInt64".to_string())
1767        })?;
1768
1769    // 2. {var}._labels — read from stored column, overlay L0 additions
1770    let batch_labels_col = batch
1771        .column_by_name("_labels")
1772        .and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());
1773    let labels_col = build_labels_column_for_known_label(vid_arr, label, l0_ctx, batch_labels_col)?;
1774    columns.push(vid_col.clone());
1775    columns.push(labels_col);
1776
1777    // 3. Projected properties
1778    // Pre-load overflow_json column for extracting non-schema properties
1779    let overflow_arr = batch
1780        .column_by_name("overflow_json")
1781        .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
1782
1783    for prop in projected_properties {
1784        if prop == "overflow_json" {
1785            match batch.column_by_name("overflow_json") {
1786                Some(col) => columns.push(col.clone()),
1787                None => {
1788                    // No overflow_json in Lance — return null column
1789                    columns.push(arrow_array::new_null_array(
1790                        &DataType::LargeBinary,
1791                        batch.num_rows(),
1792                    ));
1793                }
1794            }
1795        } else if prop == "_all_props" {
1796            // Build _all_props from overflow_json + L0 overlay.
1797            // Fast path: if no L0 buffer has vertex property mutations AND
1798            // there are no schema columns to merge, pass through overflow_json.
1799            let any_l0_has_vertex_props = l0_ctx.iter_l0_buffers().any(|l0| {
1800                let guard = l0.read();
1801                !guard.vertex_properties.is_empty()
1802            });
1803            // Check if this label has schema-defined columns (besides system columns)
1804            let has_schema_cols = projected_properties
1805                .iter()
1806                .any(|p| p != "overflow_json" && p != "_all_props" && !p.starts_with('_'));
1807
1808            if !any_l0_has_vertex_props && !has_schema_cols {
1809                // No L0 mutations, no schema cols to merge: overflow_json IS _all_props
1810                match batch.column_by_name("overflow_json") {
1811                    Some(col) => columns.push(col.clone()),
1812                    None => {
1813                        columns.push(arrow_array::new_null_array(
1814                            &DataType::LargeBinary,
1815                            batch.num_rows(),
1816                        ));
1817                    }
1818                }
1819            } else {
1820                // Need to merge: schema columns + overflow_json + L0 overlay
1821                let col = build_all_props_column_for_schema_scan(
1822                    batch,
1823                    vid_arr,
1824                    overflow_arr,
1825                    projected_properties,
1826                    l0_ctx,
1827                );
1828                columns.push(col);
1829            }
1830        } else {
1831            match batch.column_by_name(prop) {
1832                Some(col) => columns.push(col.clone()),
1833                None => {
1834                    // Column missing in Lance -- extract from overflow_json
1835                    // CypherValue blob with L0 overlay
1836                    let col = build_overflow_property_column(
1837                        batch.num_rows(),
1838                        vid_arr,
1839                        overflow_arr,
1840                        prop,
1841                        l0_ctx,
1842                    );
1843                    columns.push(col);
1844                }
1845            }
1846        }
1847    }
1848
1849    RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
1850}
1851
1852/// Map an internal DeltaDataset-schema edge batch to the DataFusion output schema.
1853///
1854/// The internal batch has `eid`, `src_vid`, `dst_vid`, `op`, `_version`, and property
1855/// columns. The output schema has `{variable}._eid`, `{variable}._src_vid`,
1856/// `{variable}._dst_vid`, and per-property columns. Internal columns `op` and
1857/// `_version` are dropped.
1858fn map_edge_to_output_schema(
1859    batch: &RecordBatch,
1860    variable: &str,
1861    projected_properties: &[String],
1862    output_schema: &SchemaRef,
1863) -> DFResult<RecordBatch> {
1864    if batch.num_rows() == 0 {
1865        return Ok(RecordBatch::new_empty(output_schema.clone()));
1866    }
1867
1868    let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
1869
1870    // 1. {var}._eid
1871    let eid_col = batch
1872        .column_by_name("eid")
1873        .ok_or_else(|| {
1874            datafusion::error::DataFusionError::Internal("Missing eid column".to_string())
1875        })?
1876        .clone();
1877    columns.push(eid_col);
1878
1879    // 2. {var}._src_vid
1880    let src_col = batch
1881        .column_by_name("src_vid")
1882        .ok_or_else(|| {
1883            datafusion::error::DataFusionError::Internal("Missing src_vid column".to_string())
1884        })?
1885        .clone();
1886    columns.push(src_col);
1887
1888    // 3. {var}._dst_vid
1889    let dst_col = batch
1890        .column_by_name("dst_vid")
1891        .ok_or_else(|| {
1892            datafusion::error::DataFusionError::Internal("Missing dst_vid column".to_string())
1893        })?
1894        .clone();
1895    columns.push(dst_col);
1896
1897    // 4. Projected properties
1898    for prop in projected_properties {
1899        if prop == "overflow_json" {
1900            match batch.column_by_name("overflow_json") {
1901                Some(col) => columns.push(col.clone()),
1902                None => {
1903                    columns.push(arrow_array::new_null_array(
1904                        &DataType::LargeBinary,
1905                        batch.num_rows(),
1906                    ));
1907                }
1908            }
1909        } else {
1910            match batch.column_by_name(prop) {
1911                Some(col) => columns.push(col.clone()),
1912                None => {
1913                    // Column missing in Lance — extract from overflow_json CypherValue blob
1914                    // (mirrors the vertex path in map_to_output_schema)
1915                    let overflow_arr = batch
1916                        .column_by_name("overflow_json")
1917                        .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
1918
1919                    if let Some(arr) = overflow_arr {
1920                        let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
1921                        for i in 0..batch.num_rows() {
1922                            if !arr.is_null(i) {
1923                                let blob = arr.value(i);
1924                                // Fast path: extract map entry without decoding entire map
1925                                if let Some(sub_bytes) =
1926                                    uni_common::cypher_value_codec::extract_map_entry_raw(
1927                                        blob, prop,
1928                                    )
1929                                {
1930                                    builder.append_value(&sub_bytes);
1931                                } else {
1932                                    builder.append_null();
1933                                }
1934                            } else {
1935                                builder.append_null();
1936                            }
1937                        }
1938                        columns.push(Arc::new(builder.finish()));
1939                    } else {
1940                        // No overflow_json column either — return null column
1941                        let target_field = output_schema
1942                            .fields()
1943                            .iter()
1944                            .find(|f| f.name() == &format!("{}.{}", variable, prop));
1945                        let dt = target_field
1946                            .map(|f| f.data_type().clone())
1947                            .unwrap_or(DataType::LargeBinary);
1948                        columns.push(arrow_array::new_null_array(&dt, batch.num_rows()));
1949                    }
1950                }
1951            }
1952        }
1953    }
1954
1955    RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
1956}
1957
1958/// Columnar-first vertex scan: single Lance query with MVCC dedup and L0 overlay.
1959///
1960/// Replaces the two-phase `scan_vertex_vids_static()` + `materialize_vertex_batch_static()`
1961/// for known-label vertex scans. Reads all needed columns in a single Lance query,
1962/// performs MVCC dedup via Arrow compute, merges L0 buffer data, filters tombstones,
1963/// and maps to the output schema.
1964#[expect(clippy::too_many_arguments)]
1965async fn columnar_scan_vertex_batch_static(
1966    graph_ctx: &GraphExecutionContext,
1967    label: &str,
1968    variable: &str,
1969    projected_properties: &[String],
1970    output_schema: &SchemaRef,
1971    filter: &Option<Arc<dyn PhysicalExpr>>,
1972    vid_list_filter: Option<&[u64]>,
1973    extra_lance_filter: Option<&str>,
1974    extra_runtime_filter: Option<&Arc<dyn PhysicalExpr>>,
1975) -> DFResult<RecordBatch> {
1976    let storage = graph_ctx.storage();
1977    let l0_ctx = graph_ctx.l0_context();
1978    let uni_schema = storage.schema_manager().schema();
1979    let label_props = uni_schema.properties.get(label);
1980
1981    // Extract target VID from filter for short-circuit lookup. Single-VID
1982    // pushdown is from the WHERE-clause `id(x) = $literal` path; multi-VID
1983    // pushdown is from the IN-list path (`UNWIND ... WHERE id(x) = e.field`,
1984    // see issue #55 PR #4).
1985    let target_vid = filter.as_ref().and_then(extract_vid_from_physical_filter);
1986
1987    // Build the list of columns to request from Lance
1988    let mut lance_columns: Vec<String> = vec![
1989        "_vid".to_string(),
1990        "_deleted".to_string(),
1991        "_version".to_string(),
1992    ];
1993    for prop in projected_properties {
1994        if prop == "overflow_json" {
1995            push_column_if_absent(&mut lance_columns, "overflow_json");
1996        } else if prop == "_created_at" || prop == "_updated_at" {
1997            // System-managed timestamps live on every vertex table regardless
1998            // of label schema. Request them directly from Lance.
1999            push_column_if_absent(&mut lance_columns, prop);
2000        } else {
2001            let exists_in_schema = label_props.is_some_and(|lp| lp.contains_key(prop));
2002            if exists_in_schema {
2003                push_column_if_absent(&mut lance_columns, prop);
2004            }
2005        }
2006    }
2007
2008    // Ensure overflow_json is present when any projected property is not in the schema
2009    // (excluding system-managed columns like `_created_at` / `_updated_at`).
2010    let needs_overflow = projected_properties.iter().any(|p| {
2011        p == "overflow_json"
2012            || (!matches!(p.as_str(), "_created_at" | "_updated_at")
2013                && !label_props.is_some_and(|lp| lp.contains_key(p)))
2014    });
2015    if needs_overflow {
2016        push_column_if_absent(&mut lance_columns, "overflow_json");
2017    }
2018
2019    // Push _vid filter to Lance for O(log N) BTree index lookup instead of full scan.
2020    // Prefer the multi-VID list (formats as `_vid IN (...)`); fall back to
2021    // single-VID `_vid = N` from the WHERE-clause path. AND-combined with
2022    // any indexed-property pushdown (issue #57).
2023    let vid_part = match (vid_list_filter, target_vid) {
2024        (Some(vs), _) if !vs.is_empty() => Some(format_vid_in_list(vs)),
2025        (_, Some(v)) => Some(format!("_vid = {v}")),
2026        _ => None,
2027    };
2028    let combined_filter = combine_lance_filters(vid_part.as_deref(), extra_lance_filter);
2029    let lance_columns_refs: Vec<&str> = lance_columns.iter().map(|s| s.as_str()).collect();
2030
2031    // M5h.2: route through plugin Storage if one is registered for
2032    // this label. v1 ships reads only — writes still go to native
2033    // backend. v1 ignores `combined_filter` when delegating (the
2034    // planner re-filters via the surrounding Filter node); per-plugin
2035    // filter pushdown is a v1.1 follow-up (`TODO(M5h.2-filter)`).
2036    let plugin_batch: Option<arrow::record_batch::RecordBatch> = match graph_ctx.plugin_registry() {
2037        Some(reg) => match reg.lookup_label_storage(label) {
2038            Some(plugin_storage) => {
2039                let mut stream = plugin_storage.read_batch(label, None).await.map_err(|e| {
2040                    datafusion::error::DataFusionError::Execution(format!(
2041                        "plugin Storage::read_batch({label}) failed: {} (code 0x{:x})",
2042                        e.message, e.code
2043                    ))
2044                })?;
2045                use futures::StreamExt;
2046                let mut batches: Vec<arrow::record_batch::RecordBatch> = Vec::new();
2047                let mut schema_ref: Option<SchemaRef> = None;
2048                while let Some(b) = stream.next().await {
2049                    let b = b.map_err(|e| {
2050                        datafusion::error::DataFusionError::Execution(format!(
2051                            "plugin Storage stream({label}) errored: {e}"
2052                        ))
2053                    })?;
2054                    if schema_ref.is_none() {
2055                        schema_ref = Some(b.schema());
2056                    }
2057                    batches.push(b);
2058                }
2059                if let Some(s) = schema_ref {
2060                    Some(arrow::compute::concat_batches(&s, &batches).map_err(|e| {
2061                        datafusion::error::DataFusionError::Execution(format!(
2062                            "plugin Storage concat({label}) failed: {e}"
2063                        ))
2064                    })?)
2065                } else {
2066                    None
2067                }
2068            }
2069            None => None,
2070        },
2071        None => None,
2072    };
2073
2074    let lance_batch = match plugin_batch {
2075        Some(b) => Some(b),
2076        None => storage
2077            .scan_vertex_table(label, &lance_columns_refs, combined_filter.as_deref())
2078            .await
2079            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?,
2080    };
2081
2082    // MVCC dedup the Lance batch
2083    let lance_deduped = mvcc_dedup_to_option(lance_batch, "_vid")?;
2084
2085    // Build the internal Lance schema for L0 batch construction.
2086    // Use the Lance batch schema if available, otherwise build from scratch.
2087    let internal_schema = match &lance_deduped {
2088        Some(batch) => batch.schema(),
2089        None => {
2090            let mut fields = vec![
2091                Field::new("_vid", DataType::UInt64, false),
2092                Field::new("_deleted", DataType::Boolean, false),
2093                Field::new("_version", DataType::UInt64, false),
2094            ];
2095            for col in &lance_columns {
2096                if matches!(col.as_str(), "_vid" | "_deleted" | "_version") {
2097                    continue;
2098                }
2099                if col == "overflow_json" {
2100                    fields.push(Field::new("overflow_json", DataType::LargeBinary, true));
2101                } else if col == "_created_at" || col == "_updated_at" {
2102                    fields.push(Field::new(
2103                        col,
2104                        DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
2105                        true,
2106                    ));
2107                } else {
2108                    let arrow_type = label_props
2109                        .and_then(|lp| lp.get(col.as_str()))
2110                        .map(|meta| meta.r#type.to_arrow())
2111                        .unwrap_or(DataType::LargeBinary);
2112                    fields.push(Field::new(col, arrow_type, true));
2113                }
2114            }
2115            Arc::new(Schema::new(fields))
2116        }
2117    };
2118
2119    // Build L0 batch. Prefer the multi-VID list when present (IN-list pushdown
2120    // from issue #55 PR #4 — must restrict L0 to the same VID set Lance was
2121    // filtered against, see issue #72 item 1). Fall back to single-VID
2122    // (`id(x) = $literal` short-circuit). One-element buffer keeps the
2123    // borrowed slice alive for the single-VID case.
2124    let single_vid_buf: [u64; 1];
2125    let l0_target_vids: Option<&[u64]> = match (vid_list_filter, target_vid) {
2126        (Some(vs), _) if !vs.is_empty() => Some(vs),
2127        (_, Some(v)) => {
2128            single_vid_buf = [v];
2129            Some(&single_vid_buf)
2130        }
2131        _ => None,
2132    };
2133    let l0_batch =
2134        build_l0_vertex_batch(l0_ctx, label, &internal_schema, label_props, l0_target_vids)?;
2135
2136    // Merge Lance + L0
2137    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "_vid")?
2138    else {
2139        return Ok(RecordBatch::new_empty(output_schema.clone()));
2140    };
2141
2142    // Filter out MVCC deletion tombstones (_deleted = true)
2143    let merged = filter_deleted_rows(&merged)?;
2144    if merged.num_rows() == 0 {
2145        return Ok(RecordBatch::new_empty(output_schema.clone()));
2146    }
2147
2148    // Filter L0 tombstones
2149    let filtered = filter_l0_tombstones(&merged, l0_ctx)?;
2150
2151    if filtered.num_rows() == 0 {
2152        return Ok(RecordBatch::new_empty(output_schema.clone()));
2153    }
2154
2155    // Map to output schema
2156    let mapped = map_to_output_schema(
2157        &filtered,
2158        label,
2159        variable,
2160        projected_properties,
2161        output_schema,
2162        l0_ctx,
2163    )?;
2164
2165    // Apply indexed-property runtime filter (issue #57). Lance has already
2166    // filtered the on-disk side via `extra_lance_filter`; this catches any
2167    // L0 rows that slipped through the merge.
2168    apply_runtime_filter(mapped, extra_runtime_filter)
2169}
2170
2171/// Apply the indexed-property runtime filter, if present, to a `RecordBatch`.
2172/// Returns the filtered batch (or the original if no filter is set). Rows
2173/// where the predicate evaluates to NULL are treated as non-matching, same
2174/// as DataFusion `FilterExec`. See issue #57.
2175fn apply_runtime_filter(
2176    batch: RecordBatch,
2177    runtime_filter: Option<&Arc<dyn PhysicalExpr>>,
2178) -> DFResult<RecordBatch> {
2179    let Some(filter) = runtime_filter else {
2180        return Ok(batch);
2181    };
2182    if batch.num_rows() == 0 {
2183        return Ok(batch);
2184    }
2185    let result = filter.evaluate(&batch)?;
2186    let array = result.into_array(batch.num_rows())?;
2187    let bools = array
2188        .as_any()
2189        .downcast_ref::<arrow_array::BooleanArray>()
2190        .ok_or_else(|| {
2191            datafusion::error::DataFusionError::Internal(
2192                "indexed-property runtime filter did not produce a BooleanArray".to_string(),
2193            )
2194        })?;
2195    arrow::compute::filter_record_batch(&batch, bools).map_err(arrow_err)
2196}
2197
2198/// Columnar-first edge scan: single Lance query with MVCC dedup and L0 overlay.
2199///
2200/// Replaces the two-phase `scan_edge_eids_static()` + `materialize_edge_batch_static()`
2201/// for edge scans. Reads all needed columns in a single DeltaDataset query, performs
2202/// MVCC dedup via Arrow compute, merges L0 buffer data, filters tombstones, and maps
2203/// to the output schema.
2204async fn columnar_scan_edge_batch_static(
2205    graph_ctx: &GraphExecutionContext,
2206    edge_type: &str,
2207    variable: &str,
2208    projected_properties: &[String],
2209    output_schema: &SchemaRef,
2210) -> DFResult<RecordBatch> {
2211    let storage = graph_ctx.storage();
2212    let l0_ctx = graph_ctx.l0_context();
2213    let uni_schema = storage.schema_manager().schema();
2214    let type_props = uni_schema.properties.get(edge_type);
2215
2216    // Build the list of columns to request from DeltaDataset Lance table
2217    let mut lance_columns: Vec<String> = vec![
2218        "eid".to_string(),
2219        "src_vid".to_string(),
2220        "dst_vid".to_string(),
2221        "op".to_string(),
2222        "_version".to_string(),
2223    ];
2224    for prop in projected_properties {
2225        if prop == "overflow_json" {
2226            push_column_if_absent(&mut lance_columns, "overflow_json");
2227        } else if prop == "_created_at" || prop == "_updated_at" {
2228            // System-managed timestamps live on every edge table.
2229            push_column_if_absent(&mut lance_columns, prop);
2230        } else {
2231            let exists_in_schema = type_props.is_some_and(|tp| tp.contains_key(prop));
2232            if exists_in_schema {
2233                push_column_if_absent(&mut lance_columns, prop);
2234            }
2235        }
2236    }
2237
2238    // Ensure overflow_json is present when any projected property is not in the schema
2239    // (excluding system-managed columns like `_created_at` / `_updated_at`).
2240    let needs_overflow = projected_properties.iter().any(|p| {
2241        p == "overflow_json"
2242            || (!matches!(p.as_str(), "_created_at" | "_updated_at")
2243                && !type_props.is_some_and(|tp| tp.contains_key(p)))
2244    });
2245    if needs_overflow {
2246        push_column_if_absent(&mut lance_columns, "overflow_json");
2247    }
2248
2249    // Try to query DeltaDataset (forward direction) via StorageManager domain method
2250    let lance_columns_refs: Vec<&str> = lance_columns.iter().map(|s| s.as_str()).collect();
2251    let lance_batch = storage
2252        .scan_delta_table(edge_type, "fwd", &lance_columns_refs, None)
2253        .await
2254        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
2255
2256    // MVCC dedup the Lance batch (by eid)
2257    let lance_deduped = mvcc_dedup_to_option(lance_batch, "eid")?;
2258
2259    // Build the internal schema for L0 batch construction.
2260    // Use the Lance batch schema if available, otherwise build from scratch.
2261    let internal_schema = match &lance_deduped {
2262        Some(batch) => batch.schema(),
2263        None => {
2264            let mut fields = vec![
2265                Field::new("eid", DataType::UInt64, false),
2266                Field::new("src_vid", DataType::UInt64, false),
2267                Field::new("dst_vid", DataType::UInt64, false),
2268                Field::new("op", DataType::UInt8, false),
2269                Field::new("_version", DataType::UInt64, false),
2270            ];
2271            for col in &lance_columns {
2272                if matches!(
2273                    col.as_str(),
2274                    "eid" | "src_vid" | "dst_vid" | "op" | "_version"
2275                ) {
2276                    continue;
2277                }
2278                if col == "overflow_json" {
2279                    fields.push(Field::new("overflow_json", DataType::LargeBinary, true));
2280                } else if col == "_created_at" || col == "_updated_at" {
2281                    fields.push(Field::new(
2282                        col,
2283                        DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
2284                        true,
2285                    ));
2286                } else {
2287                    let arrow_type = type_props
2288                        .and_then(|tp| tp.get(col.as_str()))
2289                        .map(|meta| meta.r#type.to_arrow())
2290                        .unwrap_or(DataType::LargeBinary);
2291                    fields.push(Field::new(col, arrow_type, true));
2292                }
2293            }
2294            Arc::new(Schema::new(fields))
2295        }
2296    };
2297
2298    // Build L0 batch
2299    let l0_batch = build_l0_edge_batch(l0_ctx, edge_type, &internal_schema, type_props)?;
2300
2301    // Merge Lance + L0
2302    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "eid")? else {
2303        return Ok(RecordBatch::new_empty(output_schema.clone()));
2304    };
2305
2306    // Filter out MVCC deletion ops (op != 0) after dedup
2307    let merged = filter_deleted_edge_ops(&merged)?;
2308    if merged.num_rows() == 0 {
2309        return Ok(RecordBatch::new_empty(output_schema.clone()));
2310    }
2311
2312    // Filter L0 edge tombstones
2313    let filtered = filter_l0_edge_tombstones(&merged, l0_ctx)?;
2314
2315    if filtered.num_rows() == 0 {
2316        return Ok(RecordBatch::new_empty(output_schema.clone()));
2317    }
2318
2319    // Map to output schema
2320    map_edge_to_output_schema(&filtered, variable, projected_properties, output_schema)
2321}
2322
2323/// Columnar-first schemaless vertex scan: single Lance query with MVCC dedup and L0 overlay.
2324///
2325/// Replaces the two-phase `scan_*_vids_*()` + `materialize_schemaless_vertex_batch_static()`
2326/// for schemaless vertex scans. Reads `_vid`, `labels`, `props_json`, `_version` in a single
2327/// Lance query on the main vertices table, performs MVCC dedup via Arrow compute, merges L0
2328/// buffer data, filters tombstones, and maps to the output schema.
2329#[expect(clippy::too_many_arguments)]
2330async fn columnar_scan_schemaless_vertex_batch_static(
2331    graph_ctx: &GraphExecutionContext,
2332    label: &str,
2333    variable: &str,
2334    projected_properties: &[String],
2335    output_schema: &SchemaRef,
2336    filter: &Option<Arc<dyn PhysicalExpr>>,
2337    vid_list_filter: Option<&[u64]>,
2338    extra_lance_filter: Option<&str>,
2339    extra_runtime_filter: Option<&Arc<dyn PhysicalExpr>>,
2340) -> DFResult<RecordBatch> {
2341    let storage = graph_ctx.storage();
2342    let l0_ctx = graph_ctx.l0_context();
2343
2344    // Extract target VID from filter for short-circuit lookup. See the
2345    // detailed comment on the per-label scan for the IN-list path
2346    // (issue #55 PR #4).
2347    let target_vid = filter.as_ref().and_then(extract_vid_from_physical_filter);
2348
2349    // Build the Lance filter expression — do NOT filter _deleted here;
2350    // MVCC dedup must see deletion tombstones to pick the highest version.
2351    let filter = {
2352        let mut parts = Vec::new();
2353
2354        // VID point-lookup filter — uses BTree index on _vid. Prefer the
2355        // multi-VID list (formats as `_vid IN (...)`); fall back to single-VID.
2356        match (vid_list_filter, target_vid) {
2357            (Some(vs), _) if !vs.is_empty() => parts.push(format_vid_in_list(vs)),
2358            (_, Some(vid)) => parts.push(format!("_vid = {vid}")),
2359            _ => {}
2360        }
2361
2362        // Label filter
2363        if !label.is_empty() {
2364            if label.contains(':') {
2365                // Multi-label: each label must be present
2366                for lbl in label.split(':') {
2367                    parts.push(format!("array_contains(labels, '{}')", lbl));
2368                }
2369            } else {
2370                parts.push(format!("array_contains(labels, '{}')", label));
2371            }
2372        }
2373
2374        // Indexed-property pushdown — issue #57.
2375        if let Some(extra) = extra_lance_filter {
2376            parts.push(extra.to_string());
2377        }
2378
2379        if parts.is_empty() {
2380            None
2381        } else {
2382            Some(parts.join(" AND "))
2383        }
2384    };
2385
2386    // Single Lance query via StorageManager domain method
2387    let lance_batch = storage
2388        .scan_main_vertex_table(
2389            &["_vid", "_deleted", "labels", "props_json", "_version"],
2390            filter.as_deref(),
2391        )
2392        .await
2393        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
2394
2395    // MVCC dedup the Lance batch
2396    let lance_deduped = mvcc_dedup_to_option(lance_batch, "_vid")?;
2397
2398    // Build the internal schema for L0 batch construction.
2399    // Use the Lance batch schema if available, otherwise build from scratch.
2400    let internal_schema = match &lance_deduped {
2401        Some(batch) => batch.schema(),
2402        None => Arc::new(Schema::new(vec![
2403            Field::new("_vid", DataType::UInt64, false),
2404            Field::new("_deleted", DataType::Boolean, false),
2405            Field::new("labels", labels_data_type(), false),
2406            Field::new("props_json", DataType::LargeBinary, true),
2407            Field::new("_version", DataType::UInt64, false),
2408        ])),
2409    };
2410
2411    // Build L0 batch. Prefer the multi-VID list when present (IN-list pushdown
2412    // from issue #55 PR #4 — must restrict L0 to match Lance filtering, see
2413    // issue #72 item 1). Fall back to single-VID.
2414    let single_vid_buf: [u64; 1];
2415    let l0_target_vids: Option<&[u64]> = match (vid_list_filter, target_vid) {
2416        (Some(vs), _) if !vs.is_empty() => Some(vs),
2417        (_, Some(v)) => {
2418            single_vid_buf = [v];
2419            Some(&single_vid_buf)
2420        }
2421        _ => None,
2422    };
2423    let l0_batch =
2424        build_l0_schemaless_vertex_batch(l0_ctx, label, &internal_schema, l0_target_vids)?;
2425
2426    // Merge Lance + L0
2427    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "_vid")?
2428    else {
2429        return Ok(RecordBatch::new_empty(output_schema.clone()));
2430    };
2431
2432    // Filter out MVCC deletion tombstones (_deleted = true)
2433    let merged = filter_deleted_rows(&merged)?;
2434    if merged.num_rows() == 0 {
2435        return Ok(RecordBatch::new_empty(output_schema.clone()));
2436    }
2437
2438    // Filter L0 tombstones
2439    let filtered = filter_l0_tombstones(&merged, l0_ctx)?;
2440
2441    if filtered.num_rows() == 0 {
2442        return Ok(RecordBatch::new_empty(output_schema.clone()));
2443    }
2444
2445    // Map to output schema
2446    let mapped = map_to_schemaless_output_schema(
2447        &filtered,
2448        variable,
2449        projected_properties,
2450        output_schema,
2451        l0_ctx,
2452    )?;
2453
2454    // Apply indexed-property runtime filter — issue #57.
2455    apply_runtime_filter(mapped, extra_runtime_filter)
2456}
2457
2458/// Build a RecordBatch from L0 buffer data for schemaless vertices.
2459///
2460/// Merges L0 buffers in visibility order (pending_flush → current → transaction),
2461/// with later buffers overwriting earlier ones for the same VID. Produces a batch
2462/// matching the internal schema: `_vid, labels, props_json, _version`.
2463fn build_l0_schemaless_vertex_batch(
2464    l0_ctx: &crate::query::df_graph::L0Context,
2465    label: &str,
2466    internal_schema: &SchemaRef,
2467    target_vids: Option<&[u64]>,
2468) -> DFResult<RecordBatch> {
2469    // Collect all L0 vertex data, merging in visibility order
2470    // vid -> (merged_props, highest_version, labels)
2471    let mut vid_data: HashMap<u64, (Properties, u64, Vec<String>)> = HashMap::new();
2472    let mut tombstones: HashSet<u64> = HashSet::new();
2473
2474    // Parse multi-label filter
2475    let label_filter: Vec<&str> = if label.is_empty() {
2476        vec![]
2477    } else if label.contains(':') {
2478        label.split(':').collect()
2479    } else {
2480        vec![label]
2481    };
2482
2483    for l0 in l0_ctx.iter_l0_buffers() {
2484        let guard = l0.read();
2485
2486        // Collect tombstones
2487        for vid in guard.vertex_tombstones.iter() {
2488            tombstones.insert(vid.as_u64());
2489        }
2490
2491        // Collect VIDs matching the label filter — short-circuit when target_vids is set
2492        // (see issue #72 item 1; multi-VID IN-list must filter L0 too).
2493        let vids: Vec<Vid> = if let Some(tvs) = target_vids {
2494            let mut out = Vec::with_capacity(tvs.len());
2495            for &tv in tvs {
2496                let vid = Vid::from(tv);
2497                if !guard.vertex_properties.contains_key(&vid) {
2498                    continue;
2499                }
2500                let label_ok = if label_filter.is_empty() {
2501                    true
2502                } else if let Some(labels) = guard.vertex_labels.get(&vid) {
2503                    label_filter
2504                        .iter()
2505                        .all(|lf| labels.contains(&lf.to_string()))
2506                } else {
2507                    false
2508                };
2509                if label_ok {
2510                    out.push(vid);
2511                }
2512            }
2513            out
2514        } else if label_filter.is_empty() {
2515            guard.all_vertex_vids()
2516        } else if label_filter.len() == 1 {
2517            guard.vids_for_label(label_filter[0])
2518        } else {
2519            guard.vids_with_all_labels(&label_filter)
2520        };
2521
2522        for vid in vids {
2523            let vid_u64 = vid.as_u64();
2524            if tombstones.contains(&vid_u64) {
2525                continue;
2526            }
2527            let version = guard.vertex_versions.get(&vid).copied().unwrap_or(0);
2528            let entry = vid_data
2529                .entry(vid_u64)
2530                .or_insert_with(|| (Properties::new(), 0, Vec::new()));
2531
2532            // Merge properties (later L0 overwrites)
2533            if let Some(props) = guard.vertex_properties.get(&vid) {
2534                for (k, v) in props {
2535                    entry.0.insert(k.clone(), v.clone());
2536                }
2537            }
2538            // Take the highest version
2539            if version > entry.1 {
2540                entry.1 = version;
2541            }
2542            // Update labels from latest L0 layer
2543            if let Some(labels) = guard.vertex_labels.get(&vid) {
2544                entry.2 = labels.clone();
2545            }
2546        }
2547    }
2548
2549    // Remove tombstoned VIDs
2550    for t in &tombstones {
2551        vid_data.remove(t);
2552    }
2553
2554    if vid_data.is_empty() {
2555        return Ok(RecordBatch::new_empty(internal_schema.clone()));
2556    }
2557
2558    // Sort VIDs for deterministic output
2559    let mut vids: Vec<u64> = vid_data.keys().copied().collect();
2560    vids.sort_unstable();
2561
2562    let num_rows = vids.len();
2563    let mut columns: Vec<ArrayRef> = Vec::with_capacity(internal_schema.fields().len());
2564
2565    for field in internal_schema.fields() {
2566        match field.name().as_str() {
2567            "_vid" => {
2568                columns.push(Arc::new(UInt64Array::from(vids.clone())));
2569            }
2570            "labels" => {
2571                let mut labels_builder = ListBuilder::new(StringBuilder::new());
2572                for vid_u64 in &vids {
2573                    let (_, _, labels) = &vid_data[vid_u64];
2574                    let values = labels_builder.values();
2575                    for lbl in labels {
2576                        values.append_value(lbl);
2577                    }
2578                    labels_builder.append(true);
2579                }
2580                columns.push(Arc::new(labels_builder.finish()));
2581            }
2582            "props_json" => {
2583                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2584                for vid_u64 in &vids {
2585                    let (props, _, _) = &vid_data[vid_u64];
2586                    if props.is_empty() {
2587                        builder.append_null();
2588                    } else {
2589                        // Encode properties as CypherValue blob
2590                        let json_obj: serde_json::Value = {
2591                            let mut map = serde_json::Map::new();
2592                            for (k, v) in props {
2593                                let json_val: serde_json::Value = v.clone().into();
2594                                map.insert(k.clone(), json_val);
2595                            }
2596                            serde_json::Value::Object(map)
2597                        };
2598                        match encode_cypher_value(&json_obj) {
2599                            Ok(bytes) => builder.append_value(bytes),
2600                            Err(_) => builder.append_null(),
2601                        }
2602                    }
2603                }
2604                columns.push(Arc::new(builder.finish()));
2605            }
2606            "_deleted" => {
2607                // L0 vertices are always live (tombstoned ones already excluded)
2608                columns.push(Arc::new(arrow_array::BooleanArray::from(vec![
2609                    false;
2610                    num_rows
2611                ])));
2612            }
2613            "_version" => {
2614                let vals: Vec<u64> = vids.iter().map(|v| vid_data[v].1).collect();
2615                columns.push(Arc::new(UInt64Array::from(vals)));
2616            }
2617            _ => {
2618                // Unexpected column — fill with nulls
2619                columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
2620            }
2621        }
2622    }
2623
2624    RecordBatch::try_new(internal_schema.clone(), columns).map_err(arrow_err)
2625}
2626
2627/// Map an internal-schema schemaless batch to the DataFusion output schema.
2628///
2629/// The internal batch has `_vid, labels, props_json, _version` columns. The output
2630/// schema has `{variable}._vid`, `{variable}._labels`, and per-property columns.
2631/// Individual properties are extracted from the `props_json` CypherValue blob by
2632/// decoding to a Map and extracting the sub-value.
2633fn map_to_schemaless_output_schema(
2634    batch: &RecordBatch,
2635    _variable: &str,
2636    projected_properties: &[String],
2637    output_schema: &SchemaRef,
2638    l0_ctx: &crate::query::df_graph::L0Context,
2639) -> DFResult<RecordBatch> {
2640    if batch.num_rows() == 0 {
2641        return Ok(RecordBatch::new_empty(output_schema.clone()));
2642    }
2643
2644    let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
2645
2646    // 1. {var}._vid — passthrough
2647    let vid_col = batch
2648        .column_by_name("_vid")
2649        .ok_or_else(|| {
2650            datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
2651        })?
2652        .clone();
2653    let vid_arr = vid_col
2654        .as_any()
2655        .downcast_ref::<UInt64Array>()
2656        .ok_or_else(|| {
2657            datafusion::error::DataFusionError::Internal("_vid not UInt64".to_string())
2658        })?;
2659    columns.push(vid_col.clone());
2660
2661    // 2. {var}._labels — from labels column with L0 overlay
2662    let labels_col = batch.column_by_name("labels");
2663    let labels_arr = labels_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());
2664
2665    let mut labels_builder = ListBuilder::new(StringBuilder::new());
2666    for i in 0..vid_arr.len() {
2667        let vid_u64 = vid_arr.value(i);
2668        let vid = Vid::from(vid_u64);
2669
2670        // Start with labels from the batch
2671        let mut row_labels: Vec<String> = Vec::new();
2672        if let Some(arr) = labels_arr
2673            && !arr.is_null(i)
2674        {
2675            let list_val = arr.value(i);
2676            if let Some(str_arr) = list_val.as_any().downcast_ref::<arrow_array::StringArray>() {
2677                for j in 0..str_arr.len() {
2678                    if !str_arr.is_null(j) {
2679                        row_labels.push(str_arr.value(j).to_string());
2680                    }
2681                }
2682            }
2683        }
2684
2685        // Overlay L0 labels
2686        for l0 in l0_ctx.iter_l0_buffers() {
2687            let guard = l0.read();
2688            if let Some(l0_labels) = guard.vertex_labels.get(&vid) {
2689                for lbl in l0_labels {
2690                    if !row_labels.contains(lbl) {
2691                        row_labels.push(lbl.clone());
2692                    }
2693                }
2694            }
2695        }
2696
2697        let values = labels_builder.values();
2698        for lbl in &row_labels {
2699            values.append_value(lbl);
2700        }
2701        labels_builder.append(true);
2702    }
2703    columns.push(Arc::new(labels_builder.finish()));
2704
2705    // 3. Projected properties — extract from props_json
2706    let props_col = batch.column_by_name("props_json");
2707    let props_arr =
2708        props_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
2709
2710    for prop in projected_properties {
2711        if prop == "_all_props" {
2712            // Fast path: if no L0 buffer has vertex property mutations,
2713            // the raw props_json passthrough is correct.
2714            let any_l0_has_vertex_props = l0_ctx.iter_l0_buffers().any(|l0| {
2715                let guard = l0.read();
2716                !guard.vertex_properties.is_empty()
2717            });
2718            if !any_l0_has_vertex_props {
2719                match props_col {
2720                    Some(col) => columns.push(col.clone()),
2721                    None => {
2722                        columns.push(arrow_array::new_null_array(
2723                            &DataType::LargeBinary,
2724                            batch.num_rows(),
2725                        ));
2726                    }
2727                }
2728            } else {
2729                let col = build_all_props_column_with_l0_overlay(
2730                    batch.num_rows(),
2731                    vid_arr,
2732                    props_arr,
2733                    l0_ctx,
2734                );
2735                columns.push(col);
2736            }
2737        } else {
2738            // Extract individual property from CypherValue blob with L0 overlay.
2739            // The raw column is LargeBinary (CypherValue-encoded). If the output
2740            // schema expects a typed column (e.g., Utf8 for String properties),
2741            // decode the CypherValue and build the correct Arrow type.
2742            let expected_type = output_schema
2743                .field_with_name(&format!("{_variable}.{prop}"))
2744                .map(|f| f.data_type().clone())
2745                .unwrap_or(DataType::LargeBinary);
2746
2747            if expected_type == DataType::LargeBinary {
2748                let col = build_overflow_property_column(
2749                    batch.num_rows(),
2750                    vid_arr,
2751                    props_arr,
2752                    prop,
2753                    l0_ctx,
2754                );
2755                columns.push(col);
2756            } else {
2757                // Decode CypherValue to the expected type via build_property_column_static.
2758                let mut prop_values: HashMap<Vid, Properties> = HashMap::new();
2759                for i in 0..batch.num_rows() {
2760                    let vid = Vid::from(vid_arr.value(i));
2761                    let resolved =
2762                        resolve_l0_property(&vid, prop, l0_ctx)
2763                            .flatten()
2764                            .or_else(|| {
2765                                extract_from_overflow_blob(props_arr, i, prop).and_then(|bytes| {
2766                                    uni_common::cypher_value_codec::decode(&bytes).ok()
2767                                })
2768                            });
2769                    if let Some(val) = resolved {
2770                        prop_values.insert(vid, HashMap::from([(prop.to_string(), val)]));
2771                    }
2772                }
2773                let vids: Vec<Vid> = (0..batch.num_rows())
2774                    .map(|i| Vid::from(vid_arr.value(i)))
2775                    .collect();
2776                let col = build_property_column_static(&vids, &prop_values, prop, &expected_type)
2777                    .unwrap_or_else(|_| {
2778                        arrow_array::new_null_array(&expected_type, batch.num_rows())
2779                    });
2780                columns.push(col);
2781            }
2782        }
2783    }
2784
2785    RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
2786}
2787
2788/// Get the property value for a VID, returning None if not found.
2789pub(crate) fn get_property_value(
2790    vid: &Vid,
2791    props_map: &HashMap<Vid, Properties>,
2792    prop_name: &str,
2793) -> Option<Value> {
2794    if prop_name == "_all_props" {
2795        return props_map.get(vid).map(|p| {
2796            let map: HashMap<String, Value> =
2797                p.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
2798            Value::Map(map)
2799        });
2800    }
2801    props_map
2802        .get(vid)
2803        .and_then(|props| props.get(prop_name))
2804        .cloned()
2805}
2806
2807/// Encode a `serde_json::Value` as CypherValue binary (MessagePack-tagged).
2808///
2809/// Converts from serde_json::Value -> uni_common::Value -> CypherValue bytes.
2810pub(crate) fn encode_cypher_value(val: &serde_json::Value) -> Result<Vec<u8>, String> {
2811    let uni_val: uni_common::Value = val.clone().into();
2812    Ok(uni_common::cypher_value_codec::encode(&uni_val))
2813}
2814
/// Build a numeric Arrow column, one slot per VID.
///
/// For each VID the property is fetched with `get_property_value`, passed by
/// reference to `$extractor` (which yields an `Option`), and the extracted number
/// is converted with `$cast` before being appended to a fresh `$builder_ty`.
/// A missing property, or one the extractor rejects, becomes a null slot.
/// Expands to `Ok(ArrayRef)`.
macro_rules! build_numeric_column {
    ($vids:expr, $props_map:expr, $prop_name:expr, $builder_ty:ty, $extractor:expr, $cast:expr) => {{
        let mut column = <$builder_ty>::new();
        for vid in $vids {
            // Absent property and failed extraction both collapse to `None`.
            let extracted = get_property_value(vid, $props_map, $prop_name)
                .as_ref()
                .and_then($extractor);
            match extracted {
                Some(raw) => column.append_value($cast(raw)),
                None => column.append_null(),
            }
        }
        Ok(Arc::new(column.finish()) as ArrayRef)
    }};
}
2834
2835/// Build an Arrow column from property values (static version).
2836pub(crate) fn build_property_column_static(
2837    vids: &[Vid],
2838    props_map: &HashMap<Vid, Properties>,
2839    prop_name: &str,
2840    data_type: &DataType,
2841) -> DFResult<ArrayRef> {
2842    match data_type {
2843        DataType::LargeBinary => {
2844            // Handle CypherValue binary columns (overflow_json and Json-typed properties).
2845            use arrow_array::builder::LargeBinaryBuilder;
2846            let mut builder = LargeBinaryBuilder::new();
2847
2848            for vid in vids {
2849                match get_property_value(vid, props_map, prop_name) {
2850                    Some(Value::Null) | None => builder.append_null(),
2851                    Some(Value::Bytes(bytes)) => {
2852                        builder.append_value(&bytes);
2853                    }
2854                    Some(Value::List(arr)) if arr.iter().all(|v| v.as_u64().is_some()) => {
2855                        // Potential raw CypherValue bytes stored as list<u8> from PropertyManager.
2856                        // Guard against misclassifying normal integer lists (e.g. [42, 43]) as bytes.
2857                        let bytes: Vec<u8> = arr
2858                            .iter()
2859                            .filter_map(|v| v.as_u64().map(|n| n as u8))
2860                            .collect();
2861                        if uni_common::cypher_value_codec::decode(&bytes).is_ok() {
2862                            builder.append_value(&bytes);
2863                        } else {
2864                            let json_val: serde_json::Value = Value::List(arr).into();
2865                            match encode_cypher_value(&json_val) {
2866                                Ok(encoded) => builder.append_value(encoded),
2867                                Err(_) => builder.append_null(),
2868                            }
2869                        }
2870                    }
2871                    Some(val @ Value::Temporal(_)) => {
2872                        // Temporal values (including BTIC) must be encoded directly
2873                        // via CypherValue codec — serde_json round-trip loses the type.
2874                        builder.append_value(uni_common::cypher_value_codec::encode(&val));
2875                    }
2876                    Some(val) => {
2877                        // Value from PropertyManager — convert to serde_json and re-encode to CypherValue binary
2878                        let json_val: serde_json::Value = val.into();
2879                        match encode_cypher_value(&json_val) {
2880                            Ok(bytes) => builder.append_value(bytes),
2881                            Err(_) => builder.append_null(),
2882                        }
2883                    }
2884                }
2885            }
2886            Ok(Arc::new(builder.finish()))
2887        }
2888        DataType::Binary => {
2889            // CRDT binary properties: JSON-decoded CRDTs re-encoded to MessagePack
2890            let mut builder = BinaryBuilder::new();
2891            for vid in vids {
2892                let bytes = get_property_value(vid, props_map, prop_name)
2893                    .filter(|v| !v.is_null())
2894                    .and_then(|v| {
2895                        let json_val: serde_json::Value = v.into();
2896                        serde_json::from_value::<uni_crdt::Crdt>(json_val).ok()
2897                    })
2898                    .and_then(|crdt| crdt.to_msgpack().ok());
2899                match bytes {
2900                    Some(b) => builder.append_value(&b),
2901                    None => builder.append_null(),
2902                }
2903            }
2904            Ok(Arc::new(builder.finish()))
2905        }
2906        DataType::Utf8 => {
2907            let mut builder = StringBuilder::new();
2908            for vid in vids {
2909                match get_property_value(vid, props_map, prop_name) {
2910                    Some(Value::String(s)) => builder.append_value(s),
2911                    Some(Value::Null) | None => builder.append_null(),
2912                    Some(other) => builder.append_value(other.to_string()),
2913                }
2914            }
2915            Ok(Arc::new(builder.finish()))
2916        }
2917        DataType::Int64 => {
2918            build_numeric_column!(
2919                vids,
2920                props_map,
2921                prop_name,
2922                Int64Builder,
2923                |v: &Value| v.as_i64(),
2924                |v| v
2925            )
2926        }
2927        DataType::Int32 => {
2928            build_numeric_column!(
2929                vids,
2930                props_map,
2931                prop_name,
2932                Int32Builder,
2933                |v: &Value| v.as_i64(),
2934                |v: i64| v as i32
2935            )
2936        }
2937        DataType::Float64 => {
2938            build_numeric_column!(
2939                vids,
2940                props_map,
2941                prop_name,
2942                Float64Builder,
2943                |v: &Value| v.as_f64(),
2944                |v| v
2945            )
2946        }
2947        DataType::Float32 => {
2948            build_numeric_column!(
2949                vids,
2950                props_map,
2951                prop_name,
2952                Float32Builder,
2953                |v: &Value| v.as_f64(),
2954                |v: f64| v as f32
2955            )
2956        }
2957        DataType::Boolean => {
2958            let mut builder = BooleanBuilder::new();
2959            for vid in vids {
2960                match get_property_value(vid, props_map, prop_name) {
2961                    Some(Value::Bool(b)) => builder.append_value(b),
2962                    _ => builder.append_null(),
2963                }
2964            }
2965            Ok(Arc::new(builder.finish()))
2966        }
2967        DataType::UInt64 => {
2968            build_numeric_column!(
2969                vids,
2970                props_map,
2971                prop_name,
2972                UInt64Builder,
2973                |v: &Value| v.as_u64(),
2974                |v| v
2975            )
2976        }
2977        DataType::FixedSizeList(inner, dim) if *inner.data_type() == DataType::Float32 => {
2978            // Vector properties: FixedSizeList(Float32, N)
2979            let values_builder = Float32Builder::new();
2980            let mut list_builder = FixedSizeListBuilder::new(values_builder, *dim);
2981            for vid in vids {
2982                match get_property_value(vid, props_map, prop_name) {
2983                    Some(Value::Vector(v)) => {
2984                        for val in v {
2985                            list_builder.values().append_value(val);
2986                        }
2987                        list_builder.append(true);
2988                    }
2989                    Some(Value::List(arr)) => {
2990                        for v in arr {
2991                            list_builder
2992                                .values()
2993                                .append_value(v.as_f64().unwrap_or(0.0) as f32);
2994                        }
2995                        list_builder.append(true);
2996                    }
2997                    _ => {
2998                        // Append dim nulls to inner values, then mark row as null
2999                        for _ in 0..*dim {
3000                            list_builder.values().append_null();
3001                        }
3002                        list_builder.append(false);
3003                    }
3004                }
3005            }
3006            Ok(Arc::new(list_builder.finish()))
3007        }
3008        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
3009            // Timestamp properties stored as Value::Temporal, ISO 8601 strings, or i64 nanoseconds
3010            let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
3011            for vid in vids {
3012                match get_property_value(vid, props_map, prop_name) {
3013                    Some(Value::Temporal(tv)) => match tv {
3014                        uni_common::TemporalValue::DateTime {
3015                            nanos_since_epoch, ..
3016                        }
3017                        | uni_common::TemporalValue::LocalDateTime {
3018                            nanos_since_epoch, ..
3019                        } => {
3020                            builder.append_value(nanos_since_epoch);
3021                        }
3022                        uni_common::TemporalValue::Date { days_since_epoch } => {
3023                            builder.append_value(days_since_epoch as i64 * 86_400_000_000_000);
3024                        }
3025                        _ => builder.append_null(),
3026                    },
3027                    Some(Value::String(s)) => match parse_datetime_utc(&s) {
3028                        Ok(dt) => builder.append_value(dt.timestamp_nanos_opt().unwrap_or(0)),
3029                        Err(_) => builder.append_null(),
3030                    },
3031                    Some(Value::Int(n)) => {
3032                        builder.append_value(n);
3033                    }
3034                    _ => builder.append_null(),
3035                }
3036            }
3037            Ok(Arc::new(builder.finish()))
3038        }
3039        DataType::Date32 => {
3040            let mut builder = Date32Builder::new();
3041            let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
3042            for vid in vids {
3043                match get_property_value(vid, props_map, prop_name) {
3044                    Some(Value::Temporal(uni_common::TemporalValue::Date { days_since_epoch })) => {
3045                        builder.append_value(days_since_epoch);
3046                    }
3047                    Some(Value::String(s)) => match NaiveDate::parse_from_str(&s, "%Y-%m-%d") {
3048                        Ok(d) => builder.append_value((d - epoch).num_days() as i32),
3049                        Err(_) => builder.append_null(),
3050                    },
3051                    Some(Value::Int(n)) => {
3052                        builder.append_value(n as i32);
3053                    }
3054                    _ => builder.append_null(),
3055                }
3056            }
3057            Ok(Arc::new(builder.finish()))
3058        }
3059        DataType::Time64(TimeUnit::Nanosecond) => {
3060            let mut builder = Time64NanosecondBuilder::new();
3061            for vid in vids {
3062                match get_property_value(vid, props_map, prop_name) {
3063                    Some(Value::Temporal(
3064                        uni_common::TemporalValue::LocalTime {
3065                            nanos_since_midnight,
3066                        }
3067                        | uni_common::TemporalValue::Time {
3068                            nanos_since_midnight,
3069                            ..
3070                        },
3071                    )) => {
3072                        builder.append_value(nanos_since_midnight);
3073                    }
3074                    Some(Value::Temporal(_)) => builder.append_null(),
3075                    Some(Value::String(s)) => {
3076                        match NaiveTime::parse_from_str(&s, "%H:%M:%S%.f")
3077                            .or_else(|_| NaiveTime::parse_from_str(&s, "%H:%M:%S"))
3078                        {
3079                            Ok(t) => {
3080                                let nanos = t.num_seconds_from_midnight() as i64 * 1_000_000_000
3081                                    + t.nanosecond() as i64;
3082                                builder.append_value(nanos);
3083                            }
3084                            Err(_) => builder.append_null(),
3085                        }
3086                    }
3087                    Some(Value::Int(n)) => {
3088                        builder.append_value(n);
3089                    }
3090                    _ => builder.append_null(),
3091                }
3092            }
3093            Ok(Arc::new(builder.finish()))
3094        }
3095        DataType::Interval(IntervalUnit::MonthDayNano) => {
3096            let mut values: Vec<Option<arrow::datatypes::IntervalMonthDayNano>> =
3097                Vec::with_capacity(vids.len());
3098            for vid in vids {
3099                match get_property_value(vid, props_map, prop_name) {
3100                    Some(Value::Temporal(uni_common::TemporalValue::Duration {
3101                        months,
3102                        days,
3103                        nanos,
3104                    })) => {
3105                        values.push(Some(arrow::datatypes::IntervalMonthDayNano {
3106                            months: months as i32,
3107                            days: days as i32,
3108                            nanoseconds: nanos,
3109                        }));
3110                    }
3111                    Some(Value::Int(_n)) => {
3112                        values.push(None);
3113                    }
3114                    _ => values.push(None),
3115                }
3116            }
3117            let arr: arrow_array::IntervalMonthDayNanoArray = values.into_iter().collect();
3118            Ok(Arc::new(arr))
3119        }
3120        DataType::List(inner_field) => {
3121            build_list_property_column(vids, props_map, prop_name, inner_field)
3122        }
3123        DataType::Struct(fields) => {
3124            build_struct_property_column(vids, props_map, prop_name, fields)
3125        }
3126        DataType::FixedSizeBinary(24) => {
3127            // BTIC temporal interval columns: encode as FixedSizeBinary(24)
3128            use arrow_array::builder::FixedSizeBinaryBuilder;
3129            const BTIC_LEN: i32 = 24;
3130            let mut builder = FixedSizeBinaryBuilder::with_capacity(vids.len(), BTIC_LEN);
3131            for vid in vids {
3132                match get_property_value(vid, props_map, prop_name) {
3133                    Some(Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta })) => {
3134                        match uni_btic::Btic::new(lo, hi, meta) {
3135                            Ok(b) => {
3136                                builder
3137                                    .append_value(uni_btic::encode::encode(&b))
3138                                    .map_err(arrow_err)?;
3139                            }
3140                            Err(e) => {
3141                                tracing::warn!(
3142                                    "BTIC coercion failed for property '{}': invalid value (lo={}, hi={}, meta={:#x}): {}",
3143                                    prop_name,
3144                                    lo,
3145                                    hi,
3146                                    meta,
3147                                    e
3148                                );
3149                                builder.append_null()
3150                            }
3151                        }
3152                    }
3153                    Some(Value::String(s)) => match uni_btic::parse::parse_btic_literal(&s) {
3154                        Ok(b) => {
3155                            builder
3156                                .append_value(uni_btic::encode::encode(&b))
3157                                .map_err(arrow_err)?;
3158                        }
3159                        Err(e) => {
3160                            tracing::warn!(
3161                                "BTIC coercion failed for property '{}': '{}' is not a valid BTIC literal: {}",
3162                                prop_name,
3163                                s,
3164                                e
3165                            );
3166                            builder.append_null()
3167                        }
3168                    },
3169                    _ => builder.append_null(),
3170                }
3171            }
3172            Ok(Arc::new(builder.finish()))
3173        }
3174        // Default: convert to string
3175        _ => {
3176            let mut builder = StringBuilder::new();
3177            for vid in vids {
3178                match get_property_value(vid, props_map, prop_name) {
3179                    Some(Value::Null) | None => builder.append_null(),
3180                    Some(other) => builder.append_value(other.to_string()),
3181                }
3182            }
3183            Ok(Arc::new(builder.finish()))
3184        }
3185    }
3186}
3187
3188/// Build a List-typed Arrow column from list property values.
3189fn build_list_property_column(
3190    vids: &[Vid],
3191    props_map: &HashMap<Vid, Properties>,
3192    prop_name: &str,
3193    inner_field: &Arc<Field>,
3194) -> DFResult<ArrayRef> {
3195    match inner_field.data_type() {
3196        DataType::Utf8 => {
3197            let mut builder = ListBuilder::new(StringBuilder::new());
3198            for vid in vids {
3199                match get_property_value(vid, props_map, prop_name) {
3200                    Some(Value::List(arr)) => {
3201                        for v in arr {
3202                            match v {
3203                                Value::String(s) => builder.values().append_value(s),
3204                                Value::Null => builder.values().append_null(),
3205                                other => builder.values().append_value(format!("{other:?}")),
3206                            }
3207                        }
3208                        builder.append(true);
3209                    }
3210                    _ => builder.append(false),
3211                }
3212            }
3213            Ok(Arc::new(builder.finish()))
3214        }
3215        DataType::Int64 => {
3216            let mut builder = ListBuilder::new(Int64Builder::new());
3217            for vid in vids {
3218                match get_property_value(vid, props_map, prop_name) {
3219                    Some(Value::List(arr)) => {
3220                        for v in arr {
3221                            match v.as_i64() {
3222                                Some(n) => builder.values().append_value(n),
3223                                None => builder.values().append_null(),
3224                            }
3225                        }
3226                        builder.append(true);
3227                    }
3228                    _ => builder.append(false),
3229                }
3230            }
3231            Ok(Arc::new(builder.finish()))
3232        }
3233        DataType::Float64 => {
3234            let mut builder = ListBuilder::new(Float64Builder::new());
3235            for vid in vids {
3236                match get_property_value(vid, props_map, prop_name) {
3237                    Some(Value::List(arr)) => {
3238                        for v in arr {
3239                            match v.as_f64() {
3240                                Some(n) => builder.values().append_value(n),
3241                                None => builder.values().append_null(),
3242                            }
3243                        }
3244                        builder.append(true);
3245                    }
3246                    _ => builder.append(false),
3247                }
3248            }
3249            Ok(Arc::new(builder.finish()))
3250        }
3251        DataType::Boolean => {
3252            let mut builder = ListBuilder::new(BooleanBuilder::new());
3253            for vid in vids {
3254                match get_property_value(vid, props_map, prop_name) {
3255                    Some(Value::List(arr)) => {
3256                        for v in arr {
3257                            match v.as_bool() {
3258                                Some(b) => builder.values().append_value(b),
3259                                None => builder.values().append_null(),
3260                            }
3261                        }
3262                        builder.append(true);
3263                    }
3264                    _ => builder.append(false),
3265                }
3266            }
3267            Ok(Arc::new(builder.finish()))
3268        }
3269        DataType::Struct(fields) => {
3270            // Map types are List(Struct(key, value)) — build struct inner elements
3271            build_list_of_structs_column(vids, props_map, prop_name, fields)
3272        }
3273        // Fallback: serialize inner elements as strings
3274        _ => {
3275            let mut builder = ListBuilder::new(StringBuilder::new());
3276            for vid in vids {
3277                match get_property_value(vid, props_map, prop_name) {
3278                    Some(Value::List(arr)) => {
3279                        for v in arr {
3280                            match v {
3281                                Value::Null => builder.values().append_null(),
3282                                other => builder.values().append_value(format!("{other:?}")),
3283                            }
3284                        }
3285                        builder.append(true);
3286                    }
3287                    _ => builder.append(false),
3288                }
3289            }
3290            Ok(Arc::new(builder.finish()))
3291        }
3292    }
3293}
3294
3295/// Build a List(Struct(...)) column, used for Map-type properties.
3296///
3297/// Handles two value representations:
3298/// - `Value::List([Map{key: k, value: v}, ...])` — pre-converted kv pairs
3299/// - `Value::Map({k1: v1, k2: v2})` — raw map objects (converted to kv pairs)
3300fn build_list_of_structs_column(
3301    vids: &[Vid],
3302    props_map: &HashMap<Vid, Properties>,
3303    prop_name: &str,
3304    fields: &Fields,
3305) -> DFResult<ArrayRef> {
3306    use arrow_array::StructArray;
3307
3308    let values: Vec<Option<Value>> = vids
3309        .iter()
3310        .map(|vid| get_property_value(vid, props_map, prop_name))
3311        .collect();
3312
3313    // Convert each row's value to an owned Vec of Maps (key-value pairs).
3314    // This normalizes both List-of-maps and Map representations.
3315    let rows: Vec<Option<Vec<HashMap<String, Value>>>> = values
3316        .iter()
3317        .map(|val| match val {
3318            Some(Value::List(arr)) => {
3319                let objs: Vec<HashMap<String, Value>> = arr
3320                    .iter()
3321                    .filter_map(|v| {
3322                        if let Value::Map(m) = v {
3323                            Some(m.clone())
3324                        } else {
3325                            None
3326                        }
3327                    })
3328                    .collect();
3329                if objs.is_empty() { None } else { Some(objs) }
3330            }
3331            Some(Value::Map(obj)) => {
3332                // Map property: convert {k1: v1, k2: v2} -> [{key: k1, value: v1}, ...]
3333                let kv_pairs: Vec<HashMap<String, Value>> = obj
3334                    .iter()
3335                    .map(|(k, v)| {
3336                        let mut m = HashMap::new();
3337                        m.insert("key".to_string(), Value::String(k.clone()));
3338                        m.insert("value".to_string(), v.clone());
3339                        m
3340                    })
3341                    .collect();
3342                Some(kv_pairs)
3343            }
3344            _ => None,
3345        })
3346        .collect();
3347
3348    let total_items: usize = rows
3349        .iter()
3350        .filter_map(|r| r.as_ref())
3351        .map(|v| v.len())
3352        .sum();
3353
3354    // Build child arrays for each field in the struct
3355    let child_arrays: Vec<ArrayRef> = fields
3356        .iter()
3357        .map(|field| {
3358            let field_name = field.name();
3359            match field.data_type() {
3360                DataType::Utf8 => {
3361                    let mut builder = StringBuilder::with_capacity(total_items, total_items * 16);
3362                    for obj in rows.iter().flatten().flatten() {
3363                        match obj.get(field_name) {
3364                            Some(Value::String(s)) => builder.append_value(s),
3365                            Some(Value::Null) | None => builder.append_null(),
3366                            Some(other) => builder.append_value(format!("{other:?}")),
3367                        }
3368                    }
3369                    Arc::new(builder.finish()) as ArrayRef
3370                }
3371                DataType::Int64 => {
3372                    let mut builder = Int64Builder::with_capacity(total_items);
3373                    for obj in rows.iter().flatten().flatten() {
3374                        match obj.get(field_name).and_then(|v| v.as_i64()) {
3375                            Some(n) => builder.append_value(n),
3376                            None => builder.append_null(),
3377                        }
3378                    }
3379                    Arc::new(builder.finish()) as ArrayRef
3380                }
3381                DataType::Float64 => {
3382                    let mut builder = Float64Builder::with_capacity(total_items);
3383                    for obj in rows.iter().flatten().flatten() {
3384                        match obj.get(field_name).and_then(|v| v.as_f64()) {
3385                            Some(n) => builder.append_value(n),
3386                            None => builder.append_null(),
3387                        }
3388                    }
3389                    Arc::new(builder.finish()) as ArrayRef
3390                }
3391                // Fallback: serialize as string
3392                _ => {
3393                    let mut builder = StringBuilder::with_capacity(total_items, total_items * 16);
3394                    for obj in rows.iter().flatten().flatten() {
3395                        match obj.get(field_name) {
3396                            Some(Value::Null) | None => builder.append_null(),
3397                            Some(other) => builder.append_value(format!("{other:?}")),
3398                        }
3399                    }
3400                    Arc::new(builder.finish()) as ArrayRef
3401                }
3402            }
3403        })
3404        .collect();
3405
3406    // Build struct array from children
3407    let struct_array = StructArray::try_new(fields.clone(), child_arrays, None)
3408        .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;
3409
3410    // Build list offsets
3411    let mut offsets = Vec::with_capacity(vids.len() + 1);
3412    let mut nulls = Vec::with_capacity(vids.len());
3413    let mut offset = 0i32;
3414    offsets.push(offset);
3415    for row in &rows {
3416        match row {
3417            Some(objs) => {
3418                offset += objs.len() as i32;
3419                offsets.push(offset);
3420                nulls.push(true);
3421            }
3422            None => {
3423                offsets.push(offset);
3424                nulls.push(false);
3425            }
3426        }
3427    }
3428
3429    let list_field = Arc::new(Field::new("item", DataType::Struct(fields.clone()), true));
3430    let list_array = arrow_array::ListArray::try_new(
3431        list_field,
3432        arrow::buffer::OffsetBuffer::new(arrow::buffer::ScalarBuffer::from(offsets)),
3433        Arc::new(struct_array),
3434        Some(arrow::buffer::NullBuffer::from(nulls)),
3435    )
3436    .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;
3437
3438    Ok(Arc::new(list_array))
3439}
3440
3441/// Convert a TemporalValue into a HashMap matching the Arrow struct field names,
3442/// so that `build_struct_property_column` can extract fields uniformly.
3443fn temporal_to_struct_map(tv: &uni_common::value::TemporalValue) -> HashMap<String, Value> {
3444    use uni_common::value::TemporalValue;
3445    let mut m = HashMap::new();
3446    match tv {
3447        TemporalValue::DateTime {
3448            nanos_since_epoch,
3449            offset_seconds,
3450            timezone_name,
3451        } => {
3452            m.insert("nanos_since_epoch".into(), Value::Int(*nanos_since_epoch));
3453            m.insert("offset_seconds".into(), Value::Int(*offset_seconds as i64));
3454            if let Some(tz) = timezone_name {
3455                m.insert("timezone_name".into(), Value::String(tz.clone()));
3456            }
3457        }
3458        TemporalValue::LocalDateTime { nanos_since_epoch } => {
3459            m.insert("nanos_since_epoch".into(), Value::Int(*nanos_since_epoch));
3460        }
3461        TemporalValue::Time {
3462            nanos_since_midnight,
3463            offset_seconds,
3464        } => {
3465            m.insert(
3466                "nanos_since_midnight".into(),
3467                Value::Int(*nanos_since_midnight),
3468            );
3469            m.insert("offset_seconds".into(), Value::Int(*offset_seconds as i64));
3470        }
3471        TemporalValue::LocalTime {
3472            nanos_since_midnight,
3473        } => {
3474            m.insert(
3475                "nanos_since_midnight".into(),
3476                Value::Int(*nanos_since_midnight),
3477            );
3478        }
3479        TemporalValue::Date { days_since_epoch } => {
3480            m.insert(
3481                "days_since_epoch".into(),
3482                Value::Int(*days_since_epoch as i64),
3483            );
3484        }
3485        TemporalValue::Duration {
3486            months,
3487            days,
3488            nanos,
3489        } => {
3490            m.insert("months".into(), Value::Int(*months));
3491            m.insert("days".into(), Value::Int(*days));
3492            m.insert("nanos".into(), Value::Int(*nanos));
3493        }
3494        TemporalValue::Btic { lo, hi, meta } => {
3495            m.insert("lo".into(), Value::Int(*lo));
3496            m.insert("hi".into(), Value::Int(*hi));
3497            m.insert("meta".into(), Value::Int(*meta as i64));
3498        }
3499    }
3500    m
3501}
3502
3503/// Build a Struct-typed Arrow column from Map property values (e.g. Point types).
3504fn build_struct_property_column(
3505    vids: &[Vid],
3506    props_map: &HashMap<Vid, Properties>,
3507    prop_name: &str,
3508    fields: &Fields,
3509) -> DFResult<ArrayRef> {
3510    use arrow_array::StructArray;
3511
3512    // Convert raw values, expanding Temporal values into Map representation
3513    // so the struct field extraction below works uniformly.
3514    let values: Vec<Option<Value>> = vids
3515        .iter()
3516        .map(|vid| {
3517            let val = get_property_value(vid, props_map, prop_name);
3518            match val {
3519                Some(Value::Temporal(ref tv)) => Some(Value::Map(temporal_to_struct_map(tv))),
3520                other => other,
3521            }
3522        })
3523        .collect();
3524
3525    let child_arrays: Vec<ArrayRef> = fields
3526        .iter()
3527        .map(|field| {
3528            let field_name = field.name();
3529            match field.data_type() {
3530                DataType::Float64 => {
3531                    let mut builder = Float64Builder::with_capacity(vids.len());
3532                    for val in &values {
3533                        match val {
3534                            Some(Value::Map(obj)) => {
3535                                match obj.get(field_name).and_then(|v| v.as_f64()) {
3536                                    Some(n) => builder.append_value(n),
3537                                    None => builder.append_null(),
3538                                }
3539                            }
3540                            _ => builder.append_null(),
3541                        }
3542                    }
3543                    Arc::new(builder.finish()) as ArrayRef
3544                }
3545                DataType::Utf8 => {
3546                    let mut builder = StringBuilder::with_capacity(vids.len(), vids.len() * 16);
3547                    for val in &values {
3548                        match val {
3549                            Some(Value::Map(obj)) => match obj.get(field_name) {
3550                                Some(Value::String(s)) => builder.append_value(s),
3551                                Some(Value::Null) | None => builder.append_null(),
3552                                Some(other) => builder.append_value(format!("{other:?}")),
3553                            },
3554                            _ => builder.append_null(),
3555                        }
3556                    }
3557                    Arc::new(builder.finish()) as ArrayRef
3558                }
3559                DataType::Int64 => {
3560                    let mut builder = Int64Builder::with_capacity(vids.len());
3561                    for val in &values {
3562                        match val {
3563                            Some(Value::Map(obj)) => {
3564                                match obj.get(field_name).and_then(|v| v.as_i64()) {
3565                                    Some(n) => builder.append_value(n),
3566                                    None => builder.append_null(),
3567                                }
3568                            }
3569                            _ => builder.append_null(),
3570                        }
3571                    }
3572                    Arc::new(builder.finish()) as ArrayRef
3573                }
3574                DataType::Timestamp(_, _) => {
3575                    let mut builder = TimestampNanosecondBuilder::with_capacity(vids.len());
3576                    for val in &values {
3577                        match val {
3578                            Some(Value::Map(obj)) => {
3579                                match obj.get(field_name).and_then(|v| v.as_i64()) {
3580                                    Some(n) => builder.append_value(n),
3581                                    None => builder.append_null(),
3582                                }
3583                            }
3584                            _ => builder.append_null(),
3585                        }
3586                    }
3587                    Arc::new(builder.finish()) as ArrayRef
3588                }
3589                DataType::Int32 => {
3590                    let mut builder = Int32Builder::with_capacity(vids.len());
3591                    for val in &values {
3592                        match val {
3593                            Some(Value::Map(obj)) => {
3594                                match obj.get(field_name).and_then(|v| v.as_i64()) {
3595                                    Some(n) => builder.append_value(n as i32),
3596                                    None => builder.append_null(),
3597                                }
3598                            }
3599                            _ => builder.append_null(),
3600                        }
3601                    }
3602                    Arc::new(builder.finish()) as ArrayRef
3603                }
3604                DataType::Time64(_) => {
3605                    let mut builder = Time64NanosecondBuilder::with_capacity(vids.len());
3606                    for val in &values {
3607                        match val {
3608                            Some(Value::Map(obj)) => {
3609                                match obj.get(field_name).and_then(|v| v.as_i64()) {
3610                                    Some(n) => builder.append_value(n),
3611                                    None => builder.append_null(),
3612                                }
3613                            }
3614                            _ => builder.append_null(),
3615                        }
3616                    }
3617                    Arc::new(builder.finish()) as ArrayRef
3618                }
3619                // Fallback: serialize as string
3620                _ => {
3621                    let mut builder = StringBuilder::with_capacity(vids.len(), vids.len() * 16);
3622                    for val in &values {
3623                        match val {
3624                            Some(Value::Map(obj)) => match obj.get(field_name) {
3625                                Some(Value::Null) | None => builder.append_null(),
3626                                Some(other) => builder.append_value(format!("{other:?}")),
3627                            },
3628                            _ => builder.append_null(),
3629                        }
3630                    }
3631                    Arc::new(builder.finish()) as ArrayRef
3632                }
3633            }
3634        })
3635        .collect();
3636
3637    // Build null bitmap — null when the value is null/missing
3638    let nulls: Vec<bool> = values
3639        .iter()
3640        .map(|v| matches!(v, Some(Value::Map(_))))
3641        .collect();
3642
3643    let struct_array = StructArray::try_new(
3644        fields.clone(),
3645        child_arrays,
3646        Some(arrow::buffer::NullBuffer::from(nulls)),
3647    )
3648    .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;
3649
3650    Ok(Arc::new(struct_array))
3651}
3652
3653impl Stream for GraphScanStream {
3654    type Item = DFResult<RecordBatch>;
3655
3656    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3657        let metrics = self.metrics.clone();
3658        let _timer = metrics.elapsed_compute().timer();
3659        loop {
3660            // Use a temporary to avoid borrow issues
3661            let state = std::mem::replace(&mut self.state, GraphScanState::Done);
3662
3663            match state {
3664                GraphScanState::Init => {
3665                    // Create the future with cloned data for ownership
3666                    let graph_ctx = self.graph_ctx.clone();
3667                    let label = self.label.clone();
3668                    let variable = self.variable.clone();
3669                    let properties = self.properties.clone();
3670                    let is_edge_scan = self.is_edge_scan;
3671                    let is_schemaless = self.is_schemaless;
3672                    let filter = self.filter.clone();
3673                    let vid_list_filter = self.vid_list_filter.clone();
3674                    let extra_lance_filter = self.extra_lance_filter.clone();
3675                    let extra_runtime_filter = self.extra_runtime_filter.clone();
3676                    let schema = self.schema.clone();
3677
3678                    let fut = async move {
3679                        graph_ctx.check_timeout().map_err(|e| {
3680                            datafusion::error::DataFusionError::Execution(e.to_string())
3681                        })?;
3682
3683                        let batch = if is_edge_scan {
3684                            columnar_scan_edge_batch_static(
3685                                &graph_ctx,
3686                                &label,
3687                                &variable,
3688                                &properties,
3689                                &schema,
3690                            )
3691                            .await?
3692                        } else if is_schemaless {
3693                            columnar_scan_schemaless_vertex_batch_static(
3694                                &graph_ctx,
3695                                &label,
3696                                &variable,
3697                                &properties,
3698                                &schema,
3699                                &filter,
3700                                vid_list_filter.as_deref(),
3701                                extra_lance_filter.as_deref(),
3702                                extra_runtime_filter.as_ref(),
3703                            )
3704                            .await?
3705                        } else {
3706                            columnar_scan_vertex_batch_static(
3707                                &graph_ctx,
3708                                &label,
3709                                &variable,
3710                                &properties,
3711                                &schema,
3712                                &filter,
3713                                vid_list_filter.as_deref(),
3714                                extra_lance_filter.as_deref(),
3715                                extra_runtime_filter.as_ref(),
3716                            )
3717                            .await?
3718                        };
3719                        Ok(Some(batch))
3720                    };
3721
3722                    self.state = GraphScanState::Executing(Box::pin(fut));
3723                    // Continue loop to poll the future
3724                }
3725                GraphScanState::Executing(mut fut) => match fut.as_mut().poll(cx) {
3726                    Poll::Ready(Ok(batch)) => {
3727                        self.state = GraphScanState::Done;
3728                        self.metrics
3729                            .record_output(batch.as_ref().map(|b| b.num_rows()).unwrap_or(0));
3730                        return Poll::Ready(batch.map(Ok));
3731                    }
3732                    Poll::Ready(Err(e)) => {
3733                        self.state = GraphScanState::Done;
3734                        return Poll::Ready(Some(Err(e)));
3735                    }
3736                    Poll::Pending => {
3737                        self.state = GraphScanState::Executing(fut);
3738                        return Poll::Pending;
3739                    }
3740                },
3741                GraphScanState::Done => {
3742                    return Poll::Ready(None);
3743                }
3744            }
3745        }
3746    }
3747}
3748
3749impl RecordBatchStream for GraphScanStream {
3750    fn schema(&self) -> SchemaRef {
3751        self.schema.clone()
3752    }
3753}
3754
3755#[cfg(test)]
3756mod tests {
3757    use super::*;
3758
3759    #[test]
3760    fn test_build_vertex_schema() {
3761        let uni_schema = UniSchema::default();
3762        let schema = GraphScanExec::build_vertex_schema(
3763            "n",
3764            "Person",
3765            &["name".to_string(), "age".to_string()],
3766            &uni_schema,
3767        );
3768
3769        assert_eq!(schema.fields().len(), 4);
3770        assert_eq!(schema.field(0).name(), "n._vid");
3771        assert_eq!(schema.field(1).name(), "n._labels");
3772        assert_eq!(schema.field(2).name(), "n.name");
3773        assert_eq!(schema.field(3).name(), "n.age");
3774    }
3775
3776    #[test]
3777    fn test_build_edge_schema() {
3778        let uni_schema = UniSchema::default();
3779        let schema =
3780            GraphScanExec::build_edge_schema("r", "KNOWS", &["weight".to_string()], &uni_schema);
3781
3782        assert_eq!(schema.fields().len(), 4);
3783        assert_eq!(schema.field(0).name(), "r._eid");
3784        assert_eq!(schema.field(1).name(), "r._src_vid");
3785        assert_eq!(schema.field(2).name(), "r._dst_vid");
3786        assert_eq!(schema.field(3).name(), "r.weight");
3787    }
3788
3789    #[test]
3790    fn test_build_schemaless_vertex_schema() {
3791        let empty_schema = uni_common::core::schema::Schema::default();
3792        let schema = GraphScanExec::build_schemaless_vertex_schema(
3793            "n",
3794            &["name".to_string(), "age".to_string()],
3795            &empty_schema,
3796        );
3797
3798        assert_eq!(schema.fields().len(), 4);
3799        assert_eq!(schema.field(0).name(), "n._vid");
3800        assert_eq!(schema.field(0).data_type(), &DataType::UInt64);
3801        assert_eq!(schema.field(1).name(), "n._labels");
3802        assert_eq!(schema.field(2).name(), "n.name");
3803        // With empty schema, falls back to LargeBinary
3804        assert_eq!(schema.field(2).data_type(), &DataType::LargeBinary);
3805        assert_eq!(schema.field(3).name(), "n.age");
3806        assert_eq!(schema.field(3).data_type(), &DataType::LargeBinary);
3807    }
3808
3809    #[test]
3810    fn test_schemaless_all_scan_has_empty_label() {
3811        let empty_schema = uni_common::core::schema::Schema::default();
3812        let schema = GraphScanExec::build_schemaless_vertex_schema("n", &[], &empty_schema);
3813
3814        // Verify the schema has _vid and _labels columns for a scan with no properties
3815        assert_eq!(schema.fields().len(), 2);
3816        assert_eq!(schema.field(0).name(), "n._vid");
3817        assert_eq!(schema.field(1).name(), "n._labels");
3818    }
3819
3820    #[test]
3821    fn test_cypher_value_all_props_extraction() {
3822        // Simulate _all_props encoding using encode_cypher_value helper
3823        let json_obj = serde_json::json!({"age": 30, "name": "Alice"});
3824        let cv_bytes = encode_cypher_value(&json_obj).unwrap();
3825
3826        // Decode and extract "age" value
3827        let decoded = uni_common::cypher_value_codec::decode(&cv_bytes).unwrap();
3828        match decoded {
3829            uni_common::Value::Map(map) => {
3830                let age_val = map.get("age").unwrap();
3831                assert_eq!(age_val, &uni_common::Value::Int(30));
3832            }
3833            _ => panic!("Expected Map"),
3834        }
3835
3836        // Also test single value encoding
3837        let single_val = serde_json::json!(30);
3838        let single_bytes = encode_cypher_value(&single_val).unwrap();
3839        let single_decoded = uni_common::cypher_value_codec::decode(&single_bytes).unwrap();
3840        assert_eq!(single_decoded, uni_common::Value::Int(30));
3841    }
3842
3843    /// Helper to build a RecordBatch with _vid, _deleted, _version columns for testing.
3844    fn make_mvcc_batch(vids: &[u64], versions: &[u64], deleted: &[bool]) -> RecordBatch {
3845        let schema = Arc::new(Schema::new(vec![
3846            Field::new("_vid", DataType::UInt64, false),
3847            Field::new("_deleted", DataType::Boolean, false),
3848            Field::new("_version", DataType::UInt64, false),
3849            Field::new("name", DataType::Utf8, true),
3850        ]));
3851        // Generate name values like "v{vid}_ver{version}" for tracking which row wins
3852        let names: Vec<String> = vids
3853            .iter()
3854            .zip(versions.iter())
3855            .map(|(v, ver)| format!("v{}_ver{}", v, ver))
3856            .collect();
3857        let name_arr: arrow_array::StringArray = names.iter().map(|s| Some(s.as_str())).collect();
3858
3859        RecordBatch::try_new(
3860            schema,
3861            vec![
3862                Arc::new(UInt64Array::from(vids.to_vec())),
3863                Arc::new(arrow_array::BooleanArray::from(deleted.to_vec())),
3864                Arc::new(UInt64Array::from(versions.to_vec())),
3865                Arc::new(name_arr),
3866            ],
3867        )
3868        .unwrap()
3869    }
3870
3871    #[test]
3872    fn test_mvcc_dedup_multiple_versions() {
3873        // VID 1 at versions 3, 1, 5 — should keep version 5
3874        // VID 2 at versions 2, 4 — should keep version 4
3875        let batch = make_mvcc_batch(
3876            &[1, 1, 1, 2, 2],
3877            &[3, 1, 5, 2, 4],
3878            &[false, false, false, false, false],
3879        );
3880
3881        let result = mvcc_dedup_batch(&batch).unwrap();
3882        assert_eq!(result.num_rows(), 2);
3883
3884        let vid_col = result
3885            .column_by_name("_vid")
3886            .unwrap()
3887            .as_any()
3888            .downcast_ref::<UInt64Array>()
3889            .unwrap();
3890        let ver_col = result
3891            .column_by_name("_version")
3892            .unwrap()
3893            .as_any()
3894            .downcast_ref::<UInt64Array>()
3895            .unwrap();
3896        let name_col = result
3897            .column_by_name("name")
3898            .unwrap()
3899            .as_any()
3900            .downcast_ref::<arrow_array::StringArray>()
3901            .unwrap();
3902
3903        // VID 1 → version 5, VID 2 → version 4
3904        assert_eq!(vid_col.value(0), 1);
3905        assert_eq!(ver_col.value(0), 5);
3906        assert_eq!(name_col.value(0), "v1_ver5");
3907
3908        assert_eq!(vid_col.value(1), 2);
3909        assert_eq!(ver_col.value(1), 4);
3910        assert_eq!(name_col.value(1), "v2_ver4");
3911    }
3912
3913    #[test]
3914    fn test_mvcc_dedup_single_rows() {
3915        // Each VID appears once — nothing should change
3916        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
3917        let result = mvcc_dedup_batch(&batch).unwrap();
3918        assert_eq!(result.num_rows(), 3);
3919    }
3920
3921    #[test]
3922    fn test_mvcc_dedup_empty() {
3923        let batch = make_mvcc_batch(&[], &[], &[]);
3924        let result = mvcc_dedup_batch(&batch).unwrap();
3925        assert_eq!(result.num_rows(), 0);
3926    }
3927
3928    #[test]
3929    fn test_filter_l0_tombstones_removes_tombstoned() {
3930        use crate::query::df_graph::L0Context;
3931
3932        // Create a batch with VIDs 1, 2, 3
3933        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
3934
3935        // Create L0 context with VID 2 tombstoned
3936        let l0 = uni_store::runtime::l0::L0Buffer::new(1, None);
3937        {
3938            // We need to insert a tombstone — L0Buffer has pub vertex_tombstones
3939            // But we can't easily create one with tombstones through the constructor.
3940            // Use a direct approach.
3941        }
3942        let l0_buf = std::sync::Arc::new(parking_lot::RwLock::new(l0));
3943        l0_buf.write().vertex_tombstones.insert(Vid::from(2u64));
3944
3945        let l0_ctx = L0Context {
3946            current_l0: Some(l0_buf),
3947            transaction_l0: None,
3948            pending_flush_l0s: vec![],
3949        };
3950
3951        let result = filter_l0_tombstones(&batch, &l0_ctx).unwrap();
3952        assert_eq!(result.num_rows(), 2);
3953
3954        let vid_col = result
3955            .column_by_name("_vid")
3956            .unwrap()
3957            .as_any()
3958            .downcast_ref::<UInt64Array>()
3959            .unwrap();
3960        assert_eq!(vid_col.value(0), 1);
3961        assert_eq!(vid_col.value(1), 3);
3962    }
3963
3964    #[test]
3965    fn test_filter_l0_tombstones_none() {
3966        use crate::query::df_graph::L0Context;
3967
3968        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
3969        let l0_ctx = L0Context::default();
3970
3971        let result = filter_l0_tombstones(&batch, &l0_ctx).unwrap();
3972        assert_eq!(result.num_rows(), 3);
3973    }
3974
3975    #[test]
3976    fn test_map_to_output_schema_basic() {
3977        use crate::query::df_graph::L0Context;
3978
3979        // Input: Lance-schema batch with _vid, _deleted, _version, name columns
3980        let lance_schema = Arc::new(Schema::new(vec![
3981            Field::new("_vid", DataType::UInt64, false),
3982            Field::new("_deleted", DataType::Boolean, false),
3983            Field::new("_version", DataType::UInt64, false),
3984            Field::new("name", DataType::Utf8, true),
3985        ]));
3986        let name_arr: arrow_array::StringArray =
3987            vec![Some("Alice"), Some("Bob")].into_iter().collect();
3988        let batch = RecordBatch::try_new(
3989            lance_schema,
3990            vec![
3991                Arc::new(UInt64Array::from(vec![1u64, 2])),
3992                Arc::new(arrow_array::BooleanArray::from(vec![false, false])),
3993                Arc::new(UInt64Array::from(vec![1u64, 1])),
3994                Arc::new(name_arr),
3995            ],
3996        )
3997        .unwrap();
3998
3999        // Output schema: n._vid, n._labels, n.name
4000        let output_schema = Arc::new(Schema::new(vec![
4001            Field::new("n._vid", DataType::UInt64, false),
4002            Field::new("n._labels", labels_data_type(), true),
4003            Field::new("n.name", DataType::Utf8, true),
4004        ]));
4005
4006        let l0_ctx = L0Context::default();
4007        let result = map_to_output_schema(
4008            &batch,
4009            "Person",
4010            "n",
4011            &["name".to_string()],
4012            &output_schema,
4013            &l0_ctx,
4014        )
4015        .unwrap();
4016
4017        assert_eq!(result.num_rows(), 2);
4018        assert_eq!(result.schema().fields().len(), 3);
4019        assert_eq!(result.schema().field(0).name(), "n._vid");
4020        assert_eq!(result.schema().field(1).name(), "n._labels");
4021        assert_eq!(result.schema().field(2).name(), "n.name");
4022
4023        // Check name values carried through
4024        let name_col = result
4025            .column(2)
4026            .as_any()
4027            .downcast_ref::<arrow_array::StringArray>()
4028            .unwrap();
4029        assert_eq!(name_col.value(0), "Alice");
4030        assert_eq!(name_col.value(1), "Bob");
4031    }
4032}