Skip to main content

uni_query/query/df_graph/
scan.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Graph scan execution plan for DataFusion.
5//!
6//! This module provides [`GraphScanExec`], a DataFusion `ExecutionPlan` that scans
7//! vertices or edges from storage with property materialization. It wraps the
8//! underlying Lance table scan with:
9//!
10//! - MVCC resolution via L0 buffer overlays
11//! - Property column materialization from `PropertyManager`
12//! - Filter pushdown to storage layer
13//!
14//! # Column Naming Convention
15//!
16//! Properties are materialized as columns named `{variable}.{property}`:
17//! - `n.name` - property "name" for variable "n"
18//! - `n.age` - property "age" for variable "n"
19//!
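//! System columns follow the same `{variable}.{column}` scheme, with an
//! underscore-prefixed column part:
//! - `n._vid` - vertex ID
//! - `n._labels` - vertex labels (vertex scans only)
//! - `r._eid` - edge ID
//! - `r._src_vid` - source vertex ID (edges only)
//! - `r._dst_vid` - destination vertex ID (edges only)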
25
26use crate::query::datetime::parse_datetime_utc;
27use crate::query::df_graph::GraphExecutionContext;
28use crate::query::df_graph::common::{arrow_err, compute_plan_properties, labels_data_type};
29use arrow_array::builder::{
30    BinaryBuilder, BooleanBuilder, Date32Builder, FixedSizeListBuilder, Float32Builder,
31    Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
32    Time64NanosecondBuilder, TimestampNanosecondBuilder, UInt64Builder,
33};
34use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
35use arrow_schema::{DataType, Field, Fields, IntervalUnit, Schema, SchemaRef, TimeUnit};
36use chrono::{NaiveDate, NaiveTime, Timelike};
37use datafusion::common::Result as DFResult;
38use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
39use datafusion::physical_expr::PhysicalExpr;
40use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
41use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
42use futures::Stream;
43use std::any::Any;
44use std::collections::{HashMap, HashSet};
45use std::fmt;
46use std::pin::Pin;
47use std::sync::Arc;
48use std::task::{Context, Poll};
49use uni_common::Properties;
50use uni_common::Value;
51use uni_common::core::id::Vid;
52use uni_common::core::schema::Schema as UniSchema;
53
54/// Graph scan execution plan.
55///
56/// Scans vertices or edges from storage with property materialization.
57/// This wraps the underlying Lance table scan with MVCC resolution and
58/// property loading.
59///
60/// # Example
61///
62/// ```ignore
63/// // Create a scan for Person vertices with name and age properties
64/// let scan = GraphScanExec::new(
65///     graph_ctx,
66///     "Person",
67///     "n",
68///     vec!["name".to_string(), "age".to_string()],
69///     None, // No filter
70/// );
71///
72/// let stream = scan.execute(0, task_ctx)?;
73/// // Stream yields batches with columns: _vid, n.name, n.age
74/// ```
75pub struct GraphScanExec {
76    /// Graph execution context with storage and L0 access.
77    graph_ctx: Arc<GraphExecutionContext>,
78
79    /// Label name for vertex scan, or edge type for edge scan.
80    label: String,
81
82    /// Variable name for column prefixing.
83    variable: String,
84
85    /// Properties to materialize as columns.
86    projected_properties: Vec<String>,
87
88    /// Filter expression to push down (used for L0 short-circuit and
89    /// single-VID Lance pushdown). For multi-VID IN-list pushdown, use
90    /// `vid_list_filter` — see issue #55 PR #4.
91    filter: Option<Arc<dyn PhysicalExpr>>,
92
93    /// Multi-VID IN-list filter to push to Lance as `_vid IN (v1, v2, ...)`.
94    /// Set when the planner has resolved a static set of vids from an
95    /// `Expr::In { Property(_, "_vid"), List }` predicate. Bypasses the
96    /// PhysicalExpr roundtrip used by `filter`. See issue #55 PR #4.
97    vid_list_filter: Option<Vec<u64>>,
98
99    /// Pre-rendered Lance filter string for indexed-property pushdown
100    /// (e.g. `name = 'foo'`). AND-combined with the VID filter at scan time.
101    /// Populated by the planner when an indexed-property equality / IN
102    /// predicate is detected — Lance turns it into a hash-index lookup.
103    /// See issue #57.
104    extra_lance_filter: Option<String>,
105
106    /// Arrow-side equivalent of `extra_lance_filter`, applied to the merged
107    /// (Lance + L0) batch in-process so the scan output reflects only
108    /// matching rows even when data is still in L0 (Lance pushdown alone
109    /// can't reach uncommitted/unflushed rows). See issue #57.
110    extra_runtime_filter: Option<Arc<dyn PhysicalExpr>>,
111
112    /// Whether this is an edge scan (vs vertex scan).
113    is_edge_scan: bool,
114
115    /// Whether this is a schemaless scan (uses main table instead of per-label table).
116    is_schemaless: bool,
117
118    /// Output schema with materialized property columns.
119    schema: SchemaRef,
120
121    /// Cached plan properties.
122    properties: Arc<PlanProperties>,
123
124    /// Metrics for execution tracking.
125    metrics: ExecutionPlanMetricsSet,
126}
127
128impl fmt::Debug for GraphScanExec {
129    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130        f.debug_struct("GraphScanExec")
131            .field("label", &self.label)
132            .field("variable", &self.variable)
133            .field("projected_properties", &self.projected_properties)
134            .field(
135                "vid_list_filter_len",
136                &self.vid_list_filter.as_ref().map(Vec::len),
137            )
138            .field("is_edge_scan", &self.is_edge_scan)
139            .finish()
140    }
141}
142
143impl GraphScanExec {
144    /// Attach a multi-VID IN-list filter to this scan, pushing
145    /// `_vid IN (v1, v2, ...)` to Lance at execute time. Use this for
146    /// pre-resolved vid sets (e.g. from `UNWIND $list AS e WHERE id(x)=e.field`).
147    /// See issue #55 PR #4.
148    pub fn with_vid_list_filter(mut self, vids: Vec<u64>) -> Self {
149        self.vid_list_filter = Some(vids);
150        self
151    }
152
153    /// Attach a pre-rendered Lance filter string for indexed-property
154    /// pushdown. AND-combined with any VID filter at scan time. See
155    /// issue #57.
156    pub fn with_extra_lance_filter(mut self, filter: String) -> Self {
157        self.extra_lance_filter = Some(filter);
158        self
159    }
160
161    /// Attach the Arrow-side counterpart of `extra_lance_filter`. Applied to
162    /// the merged (Lance + L0) batch so the scan output is index-bound even
163    /// for not-yet-flushed L0 rows. See issue #57.
164    pub fn with_extra_runtime_filter(mut self, filter: Arc<dyn PhysicalExpr>) -> Self {
165        self.extra_runtime_filter = Some(filter);
166        self
167    }
168
169    /// Whether this scan has an extra Lance filter pushed in. Used by the
170    /// EXPLAIN/IndexUsage path to confirm the planner actually pushed.
171    pub fn has_extra_lance_filter(&self) -> bool {
172        self.extra_lance_filter.is_some()
173    }
174
175    /// Execute this scan once with a runtime-supplied list of VIDs as the
176    /// pushdown filter (`_vid IN (v1, v2, ...)`). Returns a single merged
177    /// `RecordBatch`. Used by `VidLookupJoinExec` (issue #55 PR #5) for
178    /// cross-MATCH dynamic pushdown — the build side materializes its keys at
179    /// runtime, then the probe scan runs once with those keys.
180    ///
181    /// Only supported for vertex and schemaless-vertex scans; edge scans
182    /// have a different shape and aren't currently a join target for this
183    /// optimization.
184    pub(crate) async fn execute_with_vid_filter(&self, vids: &[u64]) -> DFResult<RecordBatch> {
185        if self.is_edge_scan {
186            return Err(datafusion::error::DataFusionError::Plan(
187                "execute_with_vid_filter not supported for edge scans".into(),
188            ));
189        }
190        if self.is_schemaless {
191            columnar_scan_schemaless_vertex_batch_static(
192                &self.graph_ctx,
193                &self.label,
194                &self.variable,
195                &self.projected_properties,
196                &self.schema,
197                &self.filter,
198                Some(vids),
199                self.extra_lance_filter.as_deref(),
200                self.extra_runtime_filter.as_ref(),
201            )
202            .await
203        } else {
204            columnar_scan_vertex_batch_static(
205                &self.graph_ctx,
206                &self.label,
207                &self.variable,
208                &self.projected_properties,
209                &self.schema,
210                &self.filter,
211                Some(vids),
212                self.extra_lance_filter.as_deref(),
213                self.extra_runtime_filter.as_ref(),
214            )
215            .await
216        }
217    }
218}
219
220impl GraphScanExec {
221    /// Create a new graph scan for vertices.
222    ///
223    /// Scans all vertices of the given label from storage and L0 buffers,
224    /// then materializes the requested properties.
225    pub fn new_vertex_scan(
226        graph_ctx: Arc<GraphExecutionContext>,
227        label: impl Into<String>,
228        variable: impl Into<String>,
229        projected_properties: Vec<String>,
230        filter: Option<Arc<dyn PhysicalExpr>>,
231    ) -> Self {
232        let label = label.into();
233        let variable = variable.into();
234
235        // Build output schema with proper types from Uni schema
236        let uni_schema = graph_ctx.storage().schema_manager().schema();
237        let schema =
238            Self::build_vertex_schema(&variable, &label, &projected_properties, &uni_schema);
239
240        let properties = compute_plan_properties(schema.clone());
241
242        Self {
243            graph_ctx,
244            label,
245            variable,
246            projected_properties,
247            filter,
248            vid_list_filter: None,
249            extra_lance_filter: None,
250            extra_runtime_filter: None,
251            is_edge_scan: false,
252            is_schemaless: false,
253            schema,
254            properties,
255            metrics: ExecutionPlanMetricsSet::new(),
256        }
257    }
258
259    /// Create a new schemaless vertex scan.
260    ///
261    /// Scans the main vertices table for vertices with the given label name.
262    /// Properties are extracted from props_json (all treated as Utf8/JSON).
263    /// This is used for labels that aren't in the schema.
264    pub fn new_schemaless_vertex_scan(
265        graph_ctx: Arc<GraphExecutionContext>,
266        label_name: impl Into<String>,
267        variable: impl Into<String>,
268        projected_properties: Vec<String>,
269        filter: Option<Arc<dyn PhysicalExpr>>,
270    ) -> Self {
271        let label = label_name.into();
272        let variable = variable.into();
273
274        // Filter out system columns that are already materialized as dedicated columns
275        // (_vid as UInt64, _labels as List<Utf8>). If these appear in projected_properties
276        // (e.g., from collect_properties_from_plan extracting _vid from filter expressions),
277        // they would create duplicate columns with conflicting types.
278        let projected_properties: Vec<String> = projected_properties
279            .into_iter()
280            .filter(|p| p != "_vid" && p != "_labels")
281            .collect();
282
283        let uni_schema = graph_ctx.storage().schema_manager().schema();
284        let schema =
285            Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
286        let properties = compute_plan_properties(schema.clone());
287
288        Self {
289            graph_ctx,
290            label,
291            variable,
292            projected_properties,
293            filter,
294            vid_list_filter: None,
295            extra_lance_filter: None,
296            extra_runtime_filter: None,
297            is_edge_scan: false,
298            is_schemaless: true,
299            schema,
300            properties,
301            metrics: ExecutionPlanMetricsSet::new(),
302        }
303    }
304
305    /// Create a new multi-label vertex scan using the main vertices table.
306    ///
307    /// Scans for vertices that have ALL specified labels (intersection semantics).
308    /// Properties are extracted from props_json (schemaless).
309    pub fn new_multi_label_vertex_scan(
310        graph_ctx: Arc<GraphExecutionContext>,
311        labels: Vec<String>,
312        variable: impl Into<String>,
313        projected_properties: Vec<String>,
314        filter: Option<Arc<dyn PhysicalExpr>>,
315    ) -> Self {
316        let variable = variable.into();
317        let projected_properties: Vec<String> = projected_properties
318            .into_iter()
319            .filter(|p| p != "_vid" && p != "_labels")
320            .collect();
321        let uni_schema = graph_ctx.storage().schema_manager().schema();
322        let schema =
323            Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
324        let properties = compute_plan_properties(schema.clone());
325
326        // Encode labels as colon-separated for the stream to parse
327        let encoded_labels = labels.join(":");
328
329        Self {
330            graph_ctx,
331            label: encoded_labels,
332            variable,
333            projected_properties,
334            filter,
335            vid_list_filter: None,
336            extra_lance_filter: None,
337            extra_runtime_filter: None,
338            is_edge_scan: false,
339            is_schemaless: true,
340            schema,
341            properties,
342            metrics: ExecutionPlanMetricsSet::new(),
343        }
344    }
345
346    /// Create a new schemaless scan for all vertices.
347    ///
348    /// Scans the main vertices table for all vertices regardless of label.
349    /// Properties are extracted from props_json with types resolved from the schema.
350    /// This is used for `MATCH (n)` without label filter.
351    pub fn new_schemaless_all_scan(
352        graph_ctx: Arc<GraphExecutionContext>,
353        variable: impl Into<String>,
354        projected_properties: Vec<String>,
355        filter: Option<Arc<dyn PhysicalExpr>>,
356    ) -> Self {
357        let variable = variable.into();
358        let projected_properties: Vec<String> = projected_properties
359            .into_iter()
360            .filter(|p| p != "_vid" && p != "_labels")
361            .collect();
362
363        let uni_schema = graph_ctx.storage().schema_manager().schema();
364        let schema =
365            Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
366        let properties = compute_plan_properties(schema.clone());
367
368        Self {
369            graph_ctx,
370            label: String::new(), // Empty label signals "scan all vertices"
371            variable,
372            projected_properties,
373            filter,
374            vid_list_filter: None,
375            extra_lance_filter: None,
376            extra_runtime_filter: None,
377            is_edge_scan: false,
378            is_schemaless: true,
379            schema,
380            properties,
381            metrics: ExecutionPlanMetricsSet::new(),
382        }
383    }
384
385    /// Build schema for schemaless vertex scan.
386    ///
387    /// Resolves property types from all labels in the schema. Falls back to
388    /// LargeBinary (CypherValue encoding) for properties not found in any
389    /// label's schema.
390    fn build_schemaless_vertex_schema(
391        variable: &str,
392        properties: &[String],
393        uni_schema: &uni_common::core::schema::Schema,
394    ) -> SchemaRef {
395        // Merge property metadata from all labels for type resolution.
396        let mut merged: std::collections::HashMap<&str, &uni_common::core::schema::PropertyMeta> =
397            std::collections::HashMap::new();
398        for label_props in uni_schema.properties.values() {
399            for (name, meta) in label_props {
400                merged.entry(name.as_str()).or_insert(meta);
401            }
402        }
403
404        let mut fields = vec![
405            Field::new(format!("{}._vid", variable), DataType::UInt64, false),
406            Field::new(format!("{}._labels", variable), labels_data_type(), true),
407        ];
408
409        for prop in properties {
410            let col_name = format!("{}.{}", variable, prop);
411            let uni_type = merged.get(prop.as_str()).map(|meta| &meta.r#type);
412            let arrow_type = uni_type
413                .map(|t| t.to_arrow())
414                .unwrap_or(DataType::LargeBinary);
415            fields.push(property_field(&col_name, arrow_type, uni_type));
416        }
417
418        Arc::new(Schema::new(fields))
419    }
420
421    /// Create a new graph scan for edges.
422    ///
423    /// Scans all edges of the given type from storage and L0 buffers,
424    /// then materializes the requested properties.
425    pub fn new_edge_scan(
426        graph_ctx: Arc<GraphExecutionContext>,
427        edge_type: impl Into<String>,
428        variable: impl Into<String>,
429        projected_properties: Vec<String>,
430        filter: Option<Arc<dyn PhysicalExpr>>,
431    ) -> Self {
432        let label = edge_type.into();
433        let variable = variable.into();
434
435        // Build output schema with proper types from Uni schema
436        let uni_schema = graph_ctx.storage().schema_manager().schema();
437        let schema = Self::build_edge_schema(&variable, &label, &projected_properties, &uni_schema);
438
439        let properties = compute_plan_properties(schema.clone());
440
441        Self {
442            graph_ctx,
443            label,
444            variable,
445            projected_properties,
446            filter,
447            vid_list_filter: None,
448            extra_lance_filter: None,
449            extra_runtime_filter: None,
450            is_edge_scan: true,
451            is_schemaless: false,
452            schema,
453            properties,
454            metrics: ExecutionPlanMetricsSet::new(),
455        }
456    }
457
458    /// Build output schema for vertex scan with proper Arrow types.
459    fn build_vertex_schema(
460        variable: &str,
461        label: &str,
462        properties: &[String],
463        uni_schema: &UniSchema,
464    ) -> SchemaRef {
465        let mut fields = vec![
466            Field::new(format!("{}._vid", variable), DataType::UInt64, false),
467            Field::new(format!("{}._labels", variable), labels_data_type(), true),
468        ];
469        let label_props = uni_schema.properties.get(label);
470        for prop in properties {
471            let col_name = format!("{}.{}", variable, prop);
472            let arrow_type = resolve_property_type(prop, label_props);
473            let uni_type = label_props
474                .and_then(|props| props.get(prop))
475                .map(|m| &m.r#type);
476            fields.push(property_field(&col_name, arrow_type, uni_type));
477        }
478        Arc::new(Schema::new(fields))
479    }
480
481    /// Build output schema for edge scan with proper Arrow types.
482    fn build_edge_schema(
483        variable: &str,
484        edge_type: &str,
485        properties: &[String],
486        uni_schema: &UniSchema,
487    ) -> SchemaRef {
488        let mut fields = vec![
489            Field::new(format!("{}._eid", variable), DataType::UInt64, false),
490            Field::new(format!("{}._src_vid", variable), DataType::UInt64, false),
491            Field::new(format!("{}._dst_vid", variable), DataType::UInt64, false),
492        ];
493        let edge_props = uni_schema.properties.get(edge_type);
494        for prop in properties {
495            let col_name = format!("{}.{}", variable, prop);
496            let arrow_type = resolve_property_type(prop, edge_props);
497            let uni_type = edge_props
498                .and_then(|props| props.get(prop))
499                .map(|m| &m.r#type);
500            fields.push(property_field(&col_name, arrow_type, uni_type));
501        }
502        Arc::new(Schema::new(fields))
503    }
504}
505
506impl DisplayAs for GraphScanExec {
507    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
508        let scan_type = if self.is_edge_scan { "Edge" } else { "Vertex" };
509        write!(
510            f,
511            "GraphScanExec: {}={}, properties={:?}",
512            scan_type, self.label, self.projected_properties
513        )?;
514        if self.filter.is_some() {
515            write!(f, ", filter=<pushed>")?;
516        }
517        Ok(())
518    }
519}
520
521impl ExecutionPlan for GraphScanExec {
522    fn name(&self) -> &str {
523        "GraphScanExec"
524    }
525
526    fn as_any(&self) -> &dyn Any {
527        self
528    }
529
530    fn schema(&self) -> SchemaRef {
531        self.schema.clone()
532    }
533
534    fn properties(&self) -> &Arc<PlanProperties> {
535        &self.properties
536    }
537
538    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
539        vec![]
540    }
541
542    fn with_new_children(
543        self: Arc<Self>,
544        children: Vec<Arc<dyn ExecutionPlan>>,
545    ) -> DFResult<Arc<dyn ExecutionPlan>> {
546        if children.is_empty() {
547            Ok(self)
548        } else {
549            Err(datafusion::error::DataFusionError::Plan(
550                "GraphScanExec does not accept children".to_string(),
551            ))
552        }
553    }
554
555    fn execute(
556        &self,
557        partition: usize,
558        _context: Arc<TaskContext>,
559    ) -> DFResult<SendableRecordBatchStream> {
560        let metrics = BaselineMetrics::new(&self.metrics, partition);
561
562        Ok(Box::pin(GraphScanStream::new(
563            self.graph_ctx.clone(),
564            self.label.clone(),
565            self.variable.clone(),
566            self.projected_properties.clone(),
567            self.is_edge_scan,
568            self.is_schemaless,
569            self.filter.clone(),
570            self.vid_list_filter.clone(),
571            self.extra_lance_filter.clone(),
572            self.extra_runtime_filter.clone(),
573            self.schema.clone(),
574            metrics,
575        )))
576    }
577
578    fn metrics(&self) -> Option<MetricsSet> {
579        Some(self.metrics.clone_inner())
580    }
581}
582
583/// State machine for graph scan stream execution.
584enum GraphScanState {
585    /// Initial state, ready to start scanning.
586    Init,
587    /// Executing the async scan.
588    Executing(Pin<Box<dyn std::future::Future<Output = DFResult<Option<RecordBatch>>> + Send>>),
589    /// Stream is done.
590    Done,
591}
592
593/// Stream that scans vertices or edges and materializes properties.
594///
595/// For known-label vertex scans, uses a single columnar Lance query with
596/// MVCC dedup and L0 overlay. For edge and schemaless scans, falls back
597/// to the two-phase VID-scan + property-materialize flow.
598struct GraphScanStream {
599    /// Graph execution context.
600    graph_ctx: Arc<GraphExecutionContext>,
601
602    /// Label (vertex) or edge type name.
603    label: String,
604
605    /// Variable name for column prefixing (e.g., "n" in `n.name`).
606    variable: String,
607
608    /// Properties to materialize.
609    properties: Vec<String>,
610
611    /// Whether this is an edge scan.
612    is_edge_scan: bool,
613
614    /// Whether this is a schemaless scan.
615    is_schemaless: bool,
616
617    /// Pushed-down filter expression (used for VID short-circuit in L0 scans).
618    filter: Option<Arc<dyn PhysicalExpr>>,
619
620    /// Multi-VID IN-list filter for Lance pushdown. See issue #55 PR #4.
621    vid_list_filter: Option<Vec<u64>>,
622
623    /// Extra Lance filter string (e.g. `name = 'foo'`) for indexed-property
624    /// pushdown. See issue #57.
625    extra_lance_filter: Option<String>,
626
627    /// Arrow-side equivalent of `extra_lance_filter`. See issue #57.
628    extra_runtime_filter: Option<Arc<dyn PhysicalExpr>>,
629
630    /// Output schema.
631    schema: SchemaRef,
632
633    /// Stream state.
634    state: GraphScanState,
635
636    /// Metrics.
637    metrics: BaselineMetrics,
638}
639
640impl GraphScanStream {
641    /// Create a new graph scan stream.
642    #[expect(clippy::too_many_arguments)]
643    fn new(
644        graph_ctx: Arc<GraphExecutionContext>,
645        label: String,
646        variable: String,
647        properties: Vec<String>,
648        is_edge_scan: bool,
649        is_schemaless: bool,
650        filter: Option<Arc<dyn PhysicalExpr>>,
651        vid_list_filter: Option<Vec<u64>>,
652        extra_lance_filter: Option<String>,
653        extra_runtime_filter: Option<Arc<dyn PhysicalExpr>>,
654        schema: SchemaRef,
655        metrics: BaselineMetrics,
656    ) -> Self {
657        Self {
658            graph_ctx,
659            label,
660            variable,
661            properties,
662            is_edge_scan,
663            is_schemaless,
664            filter,
665            vid_list_filter,
666            extra_lance_filter,
667            extra_runtime_filter,
668            schema,
669            state: GraphScanState::Init,
670            metrics,
671        }
672    }
673}
674
675/// Resolve the Arrow data type for a property, handling system columns like `overflow_json`.
676///
677/// Falls back to `LargeBinary` (CypherValue) if the property is not found in the schema,
678/// preserving original value types for overflow/unknown properties.
679pub(crate) fn resolve_property_type(
680    prop: &str,
681    schema_props: Option<
682        &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
683    >,
684) -> DataType {
685    if prop == "overflow_json" {
686        DataType::LargeBinary
687    } else if prop == "_created_at" || prop == "_updated_at" {
688        // System-managed timestamps surfaced via `created_at(n)` /
689        // `updated_at(n)`. Stored on every vertex/edge by the L0 buffer
690        // and the on-disk Arrow tables as Timestamp(Nanosecond, UTC).
691        DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
692    } else {
693        schema_props
694            .and_then(|props| props.get(prop))
695            .map(|meta| meta.r#type.to_arrow())
696            .unwrap_or(DataType::LargeBinary)
697    }
698}
699
700/// Build a scan-output `Field`, tagging raw `Bytes` columns for the final read.
701///
702/// `DataType::Bytes`, `DataType::CypherValue`, and `DataType::Duration` all map to Arrow
703/// `LargeBinary`, but only `Bytes` stores raw (un-codec'd) bytes. The projection read
704/// (`record_batches_to_rows`) cannot tell them apart from the Arrow type alone, so it
705/// would decode a raw `Bytes` column with the CypherValue MessagePack codec and corrupt
706/// it. Stamping `uni_raw_bytes=true` lets the read route the column to the raw-bytes
707/// branch of `arrow_to_value` instead.
708pub(crate) fn property_field(
709    col_name: &str,
710    arrow_type: DataType,
711    uni_type: Option<&uni_common::DataType>,
712) -> Field {
713    let field = Field::new(col_name, arrow_type, true);
714    if matches!(uni_type, Some(uni_common::DataType::Bytes)) {
715        field.with_metadata(std::collections::HashMap::from([(
716            "uni_raw_bytes".to_string(),
717            "true".to_string(),
718        )]))
719    } else {
720        field
721    }
722}
723
724// ============================================================================
725// Columnar-first scan helpers
726// ============================================================================
727
/// Test-only shorthand for MVCC deduplication keyed on `_vid`.
///
/// Delegates to `mvcc_dedup_batch_by`, which (per the original notes) keeps
/// only the highest-`_version` row for each id by sorting on
/// (_vid ASC, _version DESC) and retaining the first occurrence.
#[cfg(test)]
fn mvcc_dedup_batch(batch: &RecordBatch) -> DFResult<RecordBatch> {
    let id_column = "_vid";
    mvcc_dedup_batch_by(batch, id_column)
}
736
737/// Dedup a Lance batch and return `Some` only when rows remain.
738///
739/// Wraps the common pattern of dedup + empty-check that appears in every
740/// columnar scan path (vertex, edge, schemaless).
741fn mvcc_dedup_to_option(
742    batch: Option<RecordBatch>,
743    id_column: &str,
744) -> DFResult<Option<RecordBatch>> {
745    match batch {
746        Some(b) => {
747            let deduped = mvcc_dedup_batch_by(&b, id_column)?;
748            Ok(if deduped.num_rows() > 0 {
749                Some(deduped)
750            } else {
751                None
752            })
753        }
754        None => Ok(None),
755    }
756}
757
758/// Merge a deduped Lance batch with an L0 batch, re-deduplicating the combined
759/// result. Returns an empty batch (against `output_schema`) when both inputs
760/// are empty.
761fn merge_lance_and_l0(
762    lance_deduped: Option<RecordBatch>,
763    l0_batch: RecordBatch,
764    internal_schema: &SchemaRef,
765    id_column: &str,
766) -> DFResult<Option<RecordBatch>> {
767    let has_l0 = l0_batch.num_rows() > 0;
768    match (lance_deduped, has_l0) {
769        (Some(lance), true) => {
770            let combined = arrow::compute::concat_batches(internal_schema, &[lance, l0_batch])
771                .map_err(arrow_err)?;
772            Ok(Some(mvcc_dedup_batch_by(&combined, id_column)?))
773        }
774        (Some(lance), false) => Ok(Some(lance)),
775        (None, true) => Ok(Some(l0_batch)),
776        (None, false) => Ok(None),
777    }
778}
779
780/// Drop rows superseded by a newer persisted version that the pushed
781/// property predicate filtered out (issue #57 × MVCC-append tables).
782///
783/// Lance evaluates a pushed property predicate per ROW, before the per-vid
784/// max-`_version` pick — so when a vid's property was rewritten and
785/// re-flushed, its CURRENT row fails the predicate and never reaches the
786/// dedup, while the stale still-matching row wins it by default. Re-reads
787/// `_vid`/`_version` for the candidate vids WITHOUT the property predicate
788/// (per-label table when `label_table` is `Some`, the main vertex table
789/// otherwise) and keeps only rows carrying their vid's true maximum
790/// persisted version. Must run on the RAW filtered batch, before
791/// [`mvcc_dedup_to_option`].
792async fn drop_superseded_pushdown_rows(
793    storage: &Arc<uni_store::storage::manager::StorageManager>,
794    label_table: Option<&str>,
795    batch: RecordBatch,
796) -> DFResult<RecordBatch> {
797    if batch.num_rows() == 0 {
798        return Ok(batch);
799    }
800    let (Some(vid_col), Some(ver_col)) = (
801        batch
802            .column_by_name("_vid")
803            .and_then(|c| c.as_any().downcast_ref::<UInt64Array>()),
804        batch
805            .column_by_name("_version")
806            .and_then(|c| c.as_any().downcast_ref::<UInt64Array>()),
807    ) else {
808        return Err(datafusion::error::DataFusionError::Execution(
809            "pushdown version verification: scan batch missing _vid/_version".to_string(),
810        ));
811    };
812
813    let mut candidates: Vec<u64> = Vec::new();
814    let mut seen: HashSet<u64> = HashSet::new();
815    for i in 0..vid_col.len() {
816        let vid = vid_col.value(i);
817        if seen.insert(vid) {
818            candidates.push(vid);
819        }
820    }
821
822    // True max persisted version per candidate vid — unfiltered apart from
823    // the vid list, so rewritten-key rows and deletion tombstones are seen.
824    // Chunked to bound the `_vid IN (…)` filter-string size.
825    const VERIFY_CHUNK: usize = 1000;
826    let mut max_ver: HashMap<u64, u64> = HashMap::with_capacity(candidates.len());
827    for chunk in candidates.chunks(VERIFY_CHUNK) {
828        let filter = format_vid_in_list(chunk);
829        let scanned = match label_table {
830            Some(label) => {
831                storage
832                    .scan_vertex_table(label, &["_vid", "_version"], Some(&filter))
833                    .await
834            }
835            None => {
836                storage
837                    .scan_main_vertex_table(&["_vid", "_version"], Some(&filter))
838                    .await
839            }
840        }
841        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
842        let Some(vbatch) = scanned else { continue };
843        let (Some(v_vid), Some(v_ver)) = (
844            vbatch
845                .column_by_name("_vid")
846                .and_then(|c| c.as_any().downcast_ref::<UInt64Array>()),
847            vbatch
848                .column_by_name("_version")
849                .and_then(|c| c.as_any().downcast_ref::<UInt64Array>()),
850        ) else {
851            return Err(datafusion::error::DataFusionError::Execution(
852                "pushdown version verification: rescan missing _vid/_version".to_string(),
853            ));
854        };
855        for i in 0..v_vid.len() {
856            let entry = max_ver.entry(v_vid.value(i)).or_insert(0);
857            *entry = (*entry).max(v_ver.value(i));
858        }
859    }
860
861    let keep: arrow_array::BooleanArray = (0..batch.num_rows())
862        .map(|i| {
863            Some(
864                max_ver
865                    .get(&vid_col.value(i))
866                    .is_none_or(|&max| ver_col.value(i) >= max),
867            )
868        })
869        .collect();
870    arrow::compute::filter_record_batch(&batch, &keep).map_err(arrow_err)
871}
872
/// Append `col_name` to `columns` unless an equal entry is already there.
///
/// The membership check compares borrowed strings, so no temporary `String`
/// is allocated when the column is already present.
fn push_column_if_absent(columns: &mut Vec<String>, col_name: &str) {
    let already_present = columns
        .iter()
        .map(String::as_str)
        .any(|existing| existing == col_name);
    if already_present {
        return;
    }
    columns.push(col_name.to_owned());
}
882
883/// Extract a property value from an overflow_json CypherValue blob.
884///
885/// Returns the raw CypherValue bytes for `prop` if found in the blob,
886/// or `None` if the blob is null or the key is absent.
887fn extract_from_overflow_blob(
888    overflow_arr: Option<&arrow_array::LargeBinaryArray>,
889    row: usize,
890    prop: &str,
891) -> Option<Vec<u8>> {
892    let arr = overflow_arr?;
893    if arr.is_null(row) {
894        return None;
895    }
896    uni_common::cypher_value_codec::extract_map_entry_raw(arr.value(row), prop)
897}
898
899/// Build a `LargeBinary` column by extracting a property from overflow_json
900/// blobs, with L0 buffer overlay.
901///
902/// For each row, checks L0 buffers first (later buffers take precedence).
903/// If the property is not in L0, falls back to extracting from the
904/// overflow_json CypherValue blob.
905fn build_overflow_property_column(
906    num_rows: usize,
907    vid_arr: &UInt64Array,
908    overflow_arr: Option<&arrow_array::LargeBinaryArray>,
909    prop: &str,
910    l0_ctx: &crate::query::df_graph::L0Context,
911) -> ArrayRef {
912    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
913    for i in 0..num_rows {
914        let vid = Vid::from(vid_arr.value(i));
915
916        // Check L0 buffers (later overwrites earlier)
917        let l0_val = resolve_l0_property(&vid, prop, l0_ctx);
918
919        if let Some(val_opt) = l0_val {
920            append_value_as_cypher_binary(&mut builder, val_opt.as_ref());
921        } else if let Some(bytes) = extract_from_overflow_blob(overflow_arr, i, prop) {
922            builder.append_value(&bytes);
923        } else {
924            builder.append_null();
925        }
926    }
927    Arc::new(builder.finish())
928}
929
930/// Resolve a property value from the L0 visibility chain.
931///
932/// Returns `Some(Some(val))` when the property exists with a non-null value,
933/// `Some(None)` when it exists but is null, and `None` when no L0 buffer
934/// has the property.
935fn resolve_l0_property(
936    vid: &Vid,
937    prop: &str,
938    l0_ctx: &crate::query::df_graph::L0Context,
939) -> Option<Option<Value>> {
940    let mut result = None;
941    for l0 in l0_ctx.iter_l0_buffers() {
942        let guard = l0.read();
943        if let Some(props) = guard.vertex_properties.get(vid)
944            && let Some(val) = props.get(prop)
945        {
946            result = Some(Some(val.clone()));
947        }
948    }
949    result
950}
951
952/// Append a `Value` to a `LargeBinaryBuilder` as CypherValue bytes.
953///
954/// Encoded directly via the CypherValue codec so typed values (temporals,
955/// nested lists/maps) round-trip losslessly. Null values produce null entries.
956fn append_value_as_cypher_binary(
957    builder: &mut arrow_array::builder::LargeBinaryBuilder,
958    val: Option<&Value>,
959) {
960    match val {
961        Some(v) if !v.is_null() => {
962            builder.append_value(uni_common::cypher_value_codec::encode(v));
963        }
964        _ => builder.append_null(),
965    }
966}
967
968/// Build the `_all_props` column by overlaying L0 buffer properties onto
969/// the batch's `props_json` column.
970///
971/// For each row, decodes the stored CypherValue blob, merges in any L0 buffer
972/// properties (in visibility order: pending → current → transaction), and
973/// re-encodes the result. This ensures `properties()` and `keys()` reflect
974/// uncommitted L0 mutations.
975fn build_all_props_column_with_l0_overlay(
976    num_rows: usize,
977    vid_arr: &UInt64Array,
978    props_arr: Option<&arrow_array::LargeBinaryArray>,
979    l0_ctx: &crate::query::df_graph::L0Context,
980) -> ArrayRef {
981    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
982    for i in 0..num_rows {
983        let vid = Vid::from(vid_arr.value(i));
984
985        // 1. Decode props_json blob from storage (stays in `Value` space so
986        //    typed values such as temporals are preserved).
987        let mut merged_props: HashMap<String, Value> = HashMap::new();
988        if let Some(arr) = props_arr
989            && !arr.is_null(i)
990            && let Ok(uni_common::Value::Map(map)) =
991                uni_common::cypher_value_codec::decode(arr.value(i))
992        {
993            merged_props.extend(map);
994        }
995
996        // 2. Overlay L0 properties (visibility order: pending → current → transaction)
997        for l0 in l0_ctx.iter_l0_buffers() {
998            let guard = l0.read();
999            if let Some(l0_props) = guard.vertex_properties.get(&vid) {
1000                for (k, v) in l0_props {
1001                    merged_props.insert(k.clone(), v.clone());
1002                }
1003            }
1004        }
1005
1006        // 3. Encode merged result directly via the CypherValue codec.
1007        if merged_props.is_empty() {
1008            builder.append_null();
1009        } else {
1010            builder.append_value(uni_common::cypher_value_codec::encode(&Value::Map(
1011                merged_props,
1012            )));
1013        }
1014    }
1015    Arc::new(builder.finish())
1016}
1017
1018/// Build `_all_props` for a schema-based scan by merging:
1019/// 1. Schema-defined columns from the batch
1020/// 2. Overflow_json properties
1021/// 3. L0 buffer properties
1022fn build_all_props_column_for_schema_scan(
1023    batch: &RecordBatch,
1024    vid_arr: &UInt64Array,
1025    overflow_arr: Option<&arrow_array::LargeBinaryArray>,
1026    projected_properties: &[String],
1027    l0_ctx: &crate::query::df_graph::L0Context,
1028) -> ArrayRef {
1029    // Collect schema-defined property column names (non-internal, non-overflow, non-_all_props)
1030    let schema_props: Vec<&str> = projected_properties
1031        .iter()
1032        .filter(|p| *p != "overflow_json" && *p != "_all_props" && !p.starts_with('_'))
1033        .map(String::as_str)
1034        .collect();
1035
1036    let num_rows = batch.num_rows();
1037    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
1038    for i in 0..num_rows {
1039        let vid = Vid::from(vid_arr.value(i));
1040        // Build the merged map in `Value` space so typed values (temporals,
1041        // nested lists/maps) are preserved through to the CypherValue blob.
1042        let mut merged_props: HashMap<String, Value> = HashMap::new();
1043
1044        // 1. Schema-defined columns
1045        for &prop in &schema_props {
1046            if let Some(col) = batch.column_by_name(prop) {
1047                let val = uni_store::storage::arrow_convert::arrow_to_value(col.as_ref(), i, None);
1048                if !val.is_null() {
1049                    merged_props.insert(prop.to_string(), val);
1050                }
1051            }
1052        }
1053
1054        // 2. Overflow_json properties
1055        if let Some(arr) = overflow_arr
1056            && !arr.is_null(i)
1057            && let Ok(uni_common::Value::Map(map)) =
1058                uni_common::cypher_value_codec::decode(arr.value(i))
1059        {
1060            merged_props.extend(map);
1061        }
1062
1063        // 3. L0 buffer overlay (pending → current → transaction)
1064        for l0 in l0_ctx.iter_l0_buffers() {
1065            let guard = l0.read();
1066            if let Some(l0_props) = guard.vertex_properties.get(&vid) {
1067                for (k, v) in l0_props {
1068                    merged_props.insert(k.clone(), v.clone());
1069                }
1070            }
1071        }
1072
1073        if merged_props.is_empty() {
1074            builder.append_null();
1075        } else {
1076            builder.append_value(uni_common::cypher_value_codec::encode(&Value::Map(
1077                merged_props,
1078            )));
1079        }
1080    }
1081    Arc::new(builder.finish())
1082}
1083
1084/// MVCC deduplication: keep only the highest-version row for each unique value
1085/// in the given `id_column`.
1086///
1087/// Sorts by (id_column ASC, _version DESC), then keeps the first occurrence of
1088/// each id (= the highest version). This is a pure Arrow-compute operation.
1089fn mvcc_dedup_batch_by(batch: &RecordBatch, id_column: &str) -> DFResult<RecordBatch> {
1090    if batch.num_rows() == 0 {
1091        return Ok(batch.clone());
1092    }
1093
1094    let id_col = batch
1095        .column_by_name(id_column)
1096        .ok_or_else(|| {
1097            datafusion::error::DataFusionError::Internal(format!("Missing {} column", id_column))
1098        })?
1099        .clone();
1100    let version_col = batch
1101        .column_by_name("_version")
1102        .ok_or_else(|| {
1103            datafusion::error::DataFusionError::Internal("Missing _version column".to_string())
1104        })?
1105        .clone();
1106
1107    // Sort by (id_column ASC, _version DESC)
1108    let sort_columns = vec![
1109        arrow::compute::SortColumn {
1110            values: id_col,
1111            options: Some(arrow::compute::SortOptions {
1112                descending: false,
1113                nulls_first: false,
1114            }),
1115        },
1116        arrow::compute::SortColumn {
1117            values: version_col,
1118            options: Some(arrow::compute::SortOptions {
1119                descending: true,
1120                nulls_first: false,
1121            }),
1122        },
1123    ];
1124    let indices = arrow::compute::lexsort_to_indices(&sort_columns, None).map_err(arrow_err)?;
1125
1126    // Reorder all columns by sorted indices
1127    let sorted_columns: Vec<ArrayRef> = batch
1128        .columns()
1129        .iter()
1130        .map(|col| arrow::compute::take(col.as_ref(), &indices, None))
1131        .collect::<Result<_, _>>()
1132        .map_err(arrow_err)?;
1133    let sorted = RecordBatch::try_new(batch.schema(), sorted_columns).map_err(arrow_err)?;
1134
1135    // Build dedup mask: keep first occurrence of each id
1136    let sorted_id = sorted
1137        .column_by_name(id_column)
1138        .unwrap()
1139        .as_any()
1140        .downcast_ref::<UInt64Array>()
1141        .unwrap();
1142
1143    let mut keep = vec![false; sorted.num_rows()];
1144    if !keep.is_empty() {
1145        keep[0] = true;
1146        for (i, flag) in keep.iter_mut().enumerate().skip(1) {
1147            if sorted_id.value(i) != sorted_id.value(i - 1) {
1148                *flag = true;
1149            }
1150        }
1151    }
1152
1153    let mask = arrow_array::BooleanArray::from(keep);
1154    arrow::compute::filter_record_batch(&sorted, &mask).map_err(arrow_err)
1155}
1156
1157/// Filter out edge rows where `op != 0` (non-INSERT) after MVCC dedup.
1158fn filter_deleted_edge_ops(batch: &RecordBatch) -> DFResult<RecordBatch> {
1159    if batch.num_rows() == 0 {
1160        return Ok(batch.clone());
1161    }
1162    let op_col = match batch.column_by_name("op") {
1163        Some(col) => col
1164            .as_any()
1165            .downcast_ref::<arrow_array::UInt8Array>()
1166            .unwrap(),
1167        None => return Ok(batch.clone()),
1168    };
1169    let keep: Vec<bool> = (0..op_col.len()).map(|i| op_col.value(i) == 0).collect();
1170    let mask = arrow_array::BooleanArray::from(keep);
1171    arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
1172}
1173
1174/// Filter out rows where `_deleted = true` after MVCC dedup.
1175fn filter_deleted_rows(batch: &RecordBatch) -> DFResult<RecordBatch> {
1176    if batch.num_rows() == 0 {
1177        return Ok(batch.clone());
1178    }
1179    let deleted_col = match batch.column_by_name("_deleted") {
1180        Some(col) => col
1181            .as_any()
1182            .downcast_ref::<arrow_array::BooleanArray>()
1183            .unwrap(),
1184        None => return Ok(batch.clone()),
1185    };
1186    let keep: Vec<bool> = (0..deleted_col.len())
1187        .map(|i| !deleted_col.value(i))
1188        .collect();
1189    let mask = arrow_array::BooleanArray::from(keep);
1190    arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
1191}
1192
1193/// Filter out rows whose `_vid` appears in L0 tombstones.
1194fn filter_l0_tombstones(
1195    batch: &RecordBatch,
1196    l0_ctx: &crate::query::df_graph::L0Context,
1197) -> DFResult<RecordBatch> {
1198    if batch.num_rows() == 0 {
1199        return Ok(batch.clone());
1200    }
1201
1202    let mut tombstones: HashSet<u64> = HashSet::new();
1203    for l0 in l0_ctx.iter_l0_buffers() {
1204        let guard = l0.read();
1205        for vid in guard.vertex_tombstones.iter() {
1206            tombstones.insert(vid.as_u64());
1207        }
1208    }
1209
1210    if tombstones.is_empty() {
1211        return Ok(batch.clone());
1212    }
1213
1214    let vid_col = batch
1215        .column_by_name("_vid")
1216        .ok_or_else(|| {
1217            datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
1218        })?
1219        .as_any()
1220        .downcast_ref::<UInt64Array>()
1221        .unwrap();
1222
1223    let keep: Vec<bool> = (0..vid_col.len())
1224        .map(|i| !tombstones.contains(&vid_col.value(i)))
1225        .collect();
1226    let mask = arrow_array::BooleanArray::from(keep);
1227    arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
1228}
1229
1230/// Filter out rows whose `eid` appears in L0 edge tombstones.
1231fn filter_l0_edge_tombstones(
1232    batch: &RecordBatch,
1233    l0_ctx: &crate::query::df_graph::L0Context,
1234) -> DFResult<RecordBatch> {
1235    if batch.num_rows() == 0 {
1236        return Ok(batch.clone());
1237    }
1238
1239    let mut tombstones: HashSet<u64> = HashSet::new();
1240    for l0 in l0_ctx.iter_l0_buffers() {
1241        let guard = l0.read();
1242        for eid in guard.tombstones.keys() {
1243            tombstones.insert(eid.as_u64());
1244        }
1245    }
1246
1247    if tombstones.is_empty() {
1248        return Ok(batch.clone());
1249    }
1250
1251    let eid_col = batch
1252        .column_by_name("eid")
1253        .ok_or_else(|| {
1254            datafusion::error::DataFusionError::Internal("Missing eid column".to_string())
1255        })?
1256        .as_any()
1257        .downcast_ref::<UInt64Array>()
1258        .unwrap();
1259
1260    let keep: Vec<bool> = (0..eid_col.len())
1261        .map(|i| !tombstones.contains(&eid_col.value(i)))
1262        .collect();
1263    let mask = arrow_array::BooleanArray::from(keep);
1264    arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
1265}
1266
1267/// Extract a target VID from a DataFusion physical filter expression.
1268///
1269/// Looks for patterns like `_vid = <uint64_literal>` or `<uint64_literal> = _vid`
1270/// in the top-level expression or as a conjunct of an AND chain. Returns the first
1271/// VID literal found, or `None` if the filter does not contain such a pattern.
1272///
1273/// This also handles `CAST(literal AS UInt64)` which DataFusion may insert when
1274/// the original literal is Int64.
1275fn extract_vid_from_physical_filter(filter: &Arc<dyn PhysicalExpr>) -> Option<u64> {
1276    use datafusion::logical_expr::Operator;
1277    use datafusion::physical_expr::expressions::BinaryExpr;
1278
1279    // Try to match this expression as `_vid = literal`
1280    if let Some(bin) = filter.as_any().downcast_ref::<BinaryExpr>() {
1281        if bin.op() == &Operator::Eq {
1282            // Check both directions: col = lit and lit = col
1283            if let Some(vid) = try_extract_vid_eq(bin.left(), bin.right()) {
1284                return Some(vid);
1285            }
1286            if let Some(vid) = try_extract_vid_eq(bin.right(), bin.left()) {
1287                return Some(vid);
1288            }
1289        }
1290        // Recurse into AND conjuncts
1291        if bin.op() == &Operator::And {
1292            if let Some(vid) = extract_vid_from_physical_filter(bin.left()) {
1293                return Some(vid);
1294            }
1295            return extract_vid_from_physical_filter(bin.right());
1296        }
1297    }
1298    None
1299}
1300
1301/// Try to extract a VID value from a `(column_expr, value_expr)` pair where
1302/// `column_expr` is a `Column` named `_vid` and `value_expr` is a UInt64 or
1303/// non-negative Int64 literal (possibly wrapped in a CAST to UInt64).
1304fn try_extract_vid_eq(
1305    col_side: &Arc<dyn PhysicalExpr>,
1306    val_side: &Arc<dyn PhysicalExpr>,
1307) -> Option<u64> {
1308    use datafusion::physical_expr::expressions::{CastExpr, Column, Literal};
1309
1310    // Check that col_side is Column("_vid") or Column("variable._vid")
1311    let col = col_side.as_any().downcast_ref::<Column>()?;
1312    if col.name() != "_vid" && !col.name().ends_with("._vid") {
1313        return None;
1314    }
1315
1316    // Try direct literal
1317    if let Some(lit) = val_side.as_any().downcast_ref::<Literal>() {
1318        return scalar_to_u64(lit.value());
1319    }
1320
1321    // Try CAST(literal AS UInt64)
1322    if let Some(cast) = val_side.as_any().downcast_ref::<CastExpr>()
1323        && let Some(lit) = cast.expr().as_any().downcast_ref::<Literal>()
1324    {
1325        return scalar_to_u64(lit.value());
1326    }
1327
1328    None
1329}
1330
/// Merge the optional VID clause and the optional indexed-property clause
/// into a single Lance filter string (issues #55, #57).
///
/// When both are given, each is parenthesized and the two are joined with
/// `AND`. A lone clause is passed through verbatim, and `None` results when
/// neither is given.
fn combine_lance_filters(vid_filter: Option<&str>, extra: Option<&str>) -> Option<String> {
    match (vid_filter, extra) {
        (Some(a), Some(b)) => Some(format!("({a}) AND ({b})")),
        // At most one clause is present here; copy it if there is one.
        (only, None) | (None, only) => only.map(str::to_owned),
    }
}
1341
/// Render `vids` as the Lance filter clause `_vid IN (v1,v2,...)`.
///
/// Serves the scan-side IN-list pushdown introduced in issue #55 PR #4.
/// Callers are expected to pass at least one VID (checked by a debug
/// assertion only).
fn format_vid_in_list(vids: &[u64]) -> String {
    use std::fmt::Write;
    debug_assert!(!vids.is_empty());
    let mut clause = String::with_capacity(vids.len() * 8 + 12);
    clause.push_str("_vid IN (");
    // Empty before the first element, a comma before every later one.
    let mut separator = "";
    for vid in vids {
        let _ = write!(clause, "{separator}{vid}");
        separator = ",";
    }
    clause.push(')');
    clause
}
1358
1359/// Convert a `ScalarValue` to `u64` if it is a non-negative integer type.
1360fn scalar_to_u64(sv: &datafusion::common::ScalarValue) -> Option<u64> {
1361    use datafusion::common::ScalarValue;
1362    match sv {
1363        ScalarValue::UInt64(Some(v)) => Some(*v),
1364        ScalarValue::Int64(Some(v)) if *v >= 0 => Some(*v as u64),
1365        ScalarValue::UInt32(Some(v)) => Some(*v as u64),
1366        ScalarValue::Int32(Some(v)) if *v >= 0 => Some(*v as u64),
1367        _ => None,
1368    }
1369}
1370
1371/// Build a RecordBatch from L0 buffer data for a given label, matching the
1372/// Lance query's column set.
1373///
1374/// Merges L0 buffers in visibility order (pending_flush → current → transaction),
1375/// with later buffers overwriting earlier ones for the same VID.
1376///
1377/// When `target_vids` is `Some`, only those VIDs are collected (direct HashMap
1378/// lookups instead of iterating all VIDs for the label). This must mirror the
1379/// Lance-side VID pushdown — otherwise L0-only (unflushed) rows bypass the
1380/// filter and the scan emits the full label table. See issue #72 item 1.
1381fn build_l0_vertex_batch(
1382    l0_ctx: &crate::query::df_graph::L0Context,
1383    label: &str,
1384    lance_schema: &SchemaRef,
1385    label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
1386    target_vids: Option<&[u64]>,
1387) -> DFResult<RecordBatch> {
1388    // Collect all L0 vertex data, merging in visibility order
1389    let mut vid_data: HashMap<u64, (Properties, u64)> = HashMap::new(); // vid -> (props, version)
1390    let mut tombstones: HashSet<u64> = HashSet::new();
1391    // System-managed timestamps: created_at takes the earliest seen
1392    // timestamp across L0 buffers (preserving the original creation
1393    // moment when a row has been touched in multiple buffers); updated_at
1394    // takes the latest (most recent write). Used by `created_at(n)` /
1395    // `updated_at(n)` Cypher functions.
1396    let mut vid_created_at: HashMap<u64, i64> = HashMap::new();
1397    let mut vid_updated_at: HashMap<u64, i64> = HashMap::new();
1398
1399    for l0 in l0_ctx.iter_l0_buffers() {
1400        let guard = l0.read();
1401        // Collect tombstones
1402        for vid in guard.vertex_tombstones.iter() {
1403            tombstones.insert(vid.as_u64());
1404        }
1405        // Collect vertices — restrict to target_vids (single- or multi-VID
1406        // pushdown from id(x) = ? / id(x) IN [...]) when set, else all
1407        // vertices for the label. See issue #72 item 1: without this filter,
1408        // freshly-inserted L0 rows bypass the IN-list pushdown that Lance
1409        // already honors, defeating the optimization.
1410        let candidate_vids: Vec<Vid> = if let Some(tvs) = target_vids {
1411            let mut out = Vec::with_capacity(tvs.len());
1412            for &tv in tvs {
1413                let vid = Vid::from(tv);
1414                if guard.vertex_properties.contains_key(&vid)
1415                    && (label.is_empty()
1416                        || guard
1417                            .label_to_vids
1418                            .get(label)
1419                            .is_some_and(|s| s.contains(&vid)))
1420                {
1421                    out.push(vid);
1422                }
1423            }
1424            out
1425        } else {
1426            guard.vids_for_label(label)
1427        };
1428        for vid in candidate_vids {
1429            let vid_u64 = vid.as_u64();
1430            if tombstones.contains(&vid_u64) {
1431                continue;
1432            }
1433            let version = guard.vertex_versions.get(&vid).copied().unwrap_or(0);
1434            let entry = vid_data
1435                .entry(vid_u64)
1436                .or_insert_with(|| (Properties::new(), 0));
1437            // Merge properties (later L0 overwrites)
1438            if let Some(props) = guard.vertex_properties.get(&vid) {
1439                for (k, v) in props {
1440                    entry.0.insert(k.clone(), v.clone());
1441                }
1442            }
1443            // Take the highest version
1444            if version > entry.1 {
1445                entry.1 = version;
1446            }
1447            // Merge system timestamps: earliest creation, latest update
1448            if let Some(&ts) = guard.vertex_created_at.get(&vid) {
1449                vid_created_at
1450                    .entry(vid_u64)
1451                    .and_modify(|cur| {
1452                        if ts < *cur {
1453                            *cur = ts;
1454                        }
1455                    })
1456                    .or_insert(ts);
1457            }
1458            if let Some(&ts) = guard.vertex_updated_at.get(&vid) {
1459                vid_updated_at
1460                    .entry(vid_u64)
1461                    .and_modify(|cur| {
1462                        if ts > *cur {
1463                            *cur = ts;
1464                        }
1465                    })
1466                    .or_insert(ts);
1467            }
1468        }
1469    }
1470
1471    // Remove tombstoned VIDs
1472    for t in &tombstones {
1473        vid_data.remove(t);
1474    }
1475
1476    if vid_data.is_empty() {
1477        return Ok(RecordBatch::new_empty(lance_schema.clone()));
1478    }
1479
1480    // Sort VIDs for deterministic output
1481    let mut vids: Vec<u64> = vid_data.keys().copied().collect();
1482    vids.sort_unstable();
1483
1484    let num_rows = vids.len();
1485    let mut columns: Vec<ArrayRef> = Vec::with_capacity(lance_schema.fields().len());
1486
1487    // Determine which schema property names exist
1488    let schema_prop_names: HashSet<&str> = label_props
1489        .map(|lp| lp.keys().map(|k| k.as_str()).collect())
1490        .unwrap_or_default();
1491
1492    for field in lance_schema.fields() {
1493        let col_name = field.name().as_str();
1494        match col_name {
1495            "_vid" => {
1496                columns.push(Arc::new(UInt64Array::from(vids.clone())));
1497            }
1498            "_deleted" => {
1499                // L0 vertices are always live (tombstoned ones are already excluded)
1500                let vals = vec![false; num_rows];
1501                columns.push(Arc::new(arrow_array::BooleanArray::from(vals)));
1502            }
1503            "_version" => {
1504                let vals: Vec<u64> = vids.iter().map(|v| vid_data[v].1).collect();
1505                columns.push(Arc::new(UInt64Array::from(vals)));
1506            }
1507            "_created_at" => {
1508                let mut builder =
1509                    arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
1510                for v in &vids {
1511                    match vid_created_at.get(v) {
1512                        Some(&ts) => builder.append_value(ts),
1513                        None => builder.append_null(),
1514                    }
1515                }
1516                columns.push(Arc::new(builder.finish()));
1517            }
1518            "_updated_at" => {
1519                let mut builder =
1520                    arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
1521                for v in &vids {
1522                    match vid_updated_at.get(v) {
1523                        Some(&ts) => builder.append_value(ts),
1524                        None => builder.append_null(),
1525                    }
1526                }
1527                columns.push(Arc::new(builder.finish()));
1528            }
1529            "overflow_json" => {
1530                // Collect non-schema properties as CypherValue
1531                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
1532                for vid_u64 in &vids {
1533                    let (props, _) = &vid_data[vid_u64];
1534                    let mut overflow: HashMap<String, Value> = HashMap::new();
1535                    for (k, v) in props {
1536                        if k == "ext_id" || k.starts_with('_') {
1537                            continue;
1538                        }
1539                        if !schema_prop_names.contains(k.as_str()) {
1540                            overflow.insert(k.clone(), v.clone());
1541                        }
1542                    }
1543                    if overflow.is_empty() {
1544                        builder.append_null();
1545                    } else {
1546                        builder.append_value(uni_common::cypher_value_codec::encode(&Value::Map(
1547                            overflow,
1548                        )));
1549                    }
1550                }
1551                columns.push(Arc::new(builder.finish()));
1552            }
1553            _ => {
1554                // Schema property column: convert L0 Value → Arrow typed value
1555                let col = build_l0_property_column(&vids, &vid_data, col_name, field.data_type())?;
1556                columns.push(col);
1557            }
1558        }
1559    }
1560
1561    RecordBatch::try_new(lance_schema.clone(), columns).map_err(arrow_err)
1562}
1563
1564/// Build a single Arrow column from L0 property values.
1565///
1566/// Operates on the `vid_data` map produced by `build_l0_vertex_batch`.
1567fn build_l0_property_column(
1568    vids: &[u64],
1569    vid_data: &HashMap<u64, (Properties, u64)>,
1570    prop_name: &str,
1571    data_type: &DataType,
1572) -> DFResult<ArrayRef> {
1573    // Convert to Vid keys for reuse of existing build_property_column_static
1574    let vid_keys: Vec<Vid> = vids.iter().map(|v| Vid::from(*v)).collect();
1575    let props_map: HashMap<Vid, Properties> = vid_data
1576        .iter()
1577        .map(|(k, (props, _))| (Vid::from(*k), props.clone()))
1578        .collect();
1579
1580    build_property_column_static(&vid_keys, &props_map, prop_name, data_type)
1581}
1582
1583/// Build a RecordBatch from L0 buffer data for a given edge type, matching
1584/// the DeltaDataset Lance table's column set.
1585///
1586/// Merges L0 buffers in visibility order (pending_flush → current → transaction),
1587/// with later buffers overwriting earlier ones for the same EID.
1588fn build_l0_edge_batch(
1589    l0_ctx: &crate::query::df_graph::L0Context,
1590    edge_type: &str,
1591    internal_schema: &SchemaRef,
1592    type_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
1593) -> DFResult<RecordBatch> {
1594    // Collect all L0 edge data, merging in visibility order
1595    // eid -> (src_vid, dst_vid, properties, version)
1596    let mut eid_data: HashMap<u64, (u64, u64, Properties, u64)> = HashMap::new();
1597    let mut tombstones: HashSet<u64> = HashSet::new();
1598    // System-managed timestamps: earliest creation, latest update.
1599    // See `build_l0_vertex_batch` for the rationale.
1600    let mut eid_created_at: HashMap<u64, i64> = HashMap::new();
1601    let mut eid_updated_at: HashMap<u64, i64> = HashMap::new();
1602
1603    for l0 in l0_ctx.iter_l0_buffers() {
1604        let guard = l0.read();
1605        // Collect tombstones
1606        for eid in guard.tombstones.keys() {
1607            tombstones.insert(eid.as_u64());
1608        }
1609        // Collect edges for this type
1610        for eid in guard.eids_for_type(edge_type) {
1611            let eid_u64 = eid.as_u64();
1612            if tombstones.contains(&eid_u64) {
1613                continue;
1614            }
1615            let (src_vid, dst_vid) = match guard.get_edge_endpoints(eid) {
1616                Some(endpoints) => (endpoints.0.as_u64(), endpoints.1.as_u64()),
1617                None => continue,
1618            };
1619            let version = guard.edge_versions.get(&eid).copied().unwrap_or(0);
1620            let entry = eid_data
1621                .entry(eid_u64)
1622                .or_insert_with(|| (src_vid, dst_vid, Properties::new(), 0));
1623            // Merge properties (later L0 overwrites)
1624            if let Some(props) = guard.edge_properties.get(&eid) {
1625                for (k, v) in props {
1626                    entry.2.insert(k.clone(), v.clone());
1627                }
1628            }
1629            // Update endpoints from latest L0 layer
1630            entry.0 = src_vid;
1631            entry.1 = dst_vid;
1632            // Take the highest version
1633            if version > entry.3 {
1634                entry.3 = version;
1635            }
1636            // Merge system timestamps
1637            if let Some(&ts) = guard.edge_created_at.get(&eid) {
1638                eid_created_at
1639                    .entry(eid_u64)
1640                    .and_modify(|cur| {
1641                        if ts < *cur {
1642                            *cur = ts;
1643                        }
1644                    })
1645                    .or_insert(ts);
1646            }
1647            if let Some(&ts) = guard.edge_updated_at.get(&eid) {
1648                eid_updated_at
1649                    .entry(eid_u64)
1650                    .and_modify(|cur| {
1651                        if ts > *cur {
1652                            *cur = ts;
1653                        }
1654                    })
1655                    .or_insert(ts);
1656            }
1657        }
1658    }
1659
1660    // Remove tombstoned EIDs
1661    for t in &tombstones {
1662        eid_data.remove(t);
1663    }
1664
1665    if eid_data.is_empty() {
1666        return Ok(RecordBatch::new_empty(internal_schema.clone()));
1667    }
1668
1669    // Sort EIDs for deterministic output
1670    let mut eids: Vec<u64> = eid_data.keys().copied().collect();
1671    eids.sort_unstable();
1672
1673    let num_rows = eids.len();
1674    let mut columns: Vec<ArrayRef> = Vec::with_capacity(internal_schema.fields().len());
1675
1676    // Determine which schema property names exist
1677    let schema_prop_names: HashSet<&str> = type_props
1678        .map(|tp| tp.keys().map(|k| k.as_str()).collect())
1679        .unwrap_or_default();
1680
1681    for field in internal_schema.fields() {
1682        let col_name = field.name().as_str();
1683        match col_name {
1684            "eid" => {
1685                columns.push(Arc::new(UInt64Array::from(eids.clone())));
1686            }
1687            "src_vid" => {
1688                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].0).collect();
1689                columns.push(Arc::new(UInt64Array::from(vals)));
1690            }
1691            "dst_vid" => {
1692                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].1).collect();
1693                columns.push(Arc::new(UInt64Array::from(vals)));
1694            }
1695            "op" => {
1696                // L0 edges are always live (tombstoned ones already excluded)
1697                let vals = vec![0u8; num_rows];
1698                columns.push(Arc::new(arrow_array::UInt8Array::from(vals)));
1699            }
1700            "_version" => {
1701                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].3).collect();
1702                columns.push(Arc::new(UInt64Array::from(vals)));
1703            }
1704            "_created_at" => {
1705                let mut builder =
1706                    arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
1707                for e in &eids {
1708                    match eid_created_at.get(e) {
1709                        Some(&ts) => builder.append_value(ts),
1710                        None => builder.append_null(),
1711                    }
1712                }
1713                columns.push(Arc::new(builder.finish()));
1714            }
1715            "_updated_at" => {
1716                let mut builder =
1717                    arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
1718                for e in &eids {
1719                    match eid_updated_at.get(e) {
1720                        Some(&ts) => builder.append_value(ts),
1721                        None => builder.append_null(),
1722                    }
1723                }
1724                columns.push(Arc::new(builder.finish()));
1725            }
1726            "overflow_json" => {
1727                // Collect non-schema properties as CypherValue
1728                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
1729                for eid_u64 in &eids {
1730                    let (_, _, props, _) = &eid_data[eid_u64];
1731                    let mut overflow: HashMap<String, Value> = HashMap::new();
1732                    for (k, v) in props {
1733                        if k.starts_with('_') {
1734                            continue;
1735                        }
1736                        if !schema_prop_names.contains(k.as_str()) {
1737                            overflow.insert(k.clone(), v.clone());
1738                        }
1739                    }
1740                    if overflow.is_empty() {
1741                        builder.append_null();
1742                    } else {
1743                        builder.append_value(uni_common::cypher_value_codec::encode(&Value::Map(
1744                            overflow,
1745                        )));
1746                    }
1747                }
1748                columns.push(Arc::new(builder.finish()));
1749            }
1750            _ => {
1751                // Schema property column: convert L0 Value → Arrow typed value
1752                let col =
1753                    build_l0_edge_property_column(&eids, &eid_data, col_name, field.data_type())?;
1754                columns.push(col);
1755            }
1756        }
1757    }
1758
1759    RecordBatch::try_new(internal_schema.clone(), columns).map_err(arrow_err)
1760}
1761
1762/// Build a single Arrow column from L0 edge property values.
1763///
1764/// Operates on the `eid_data` map produced by `build_l0_edge_batch`.
1765fn build_l0_edge_property_column(
1766    eids: &[u64],
1767    eid_data: &HashMap<u64, (u64, u64, Properties, u64)>,
1768    prop_name: &str,
1769    data_type: &DataType,
1770) -> DFResult<ArrayRef> {
1771    // Convert to Vid keys for reuse of existing build_property_column_static
1772    let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(*e)).collect();
1773    let props_map: HashMap<Vid, Properties> = eid_data
1774        .iter()
1775        .map(|(k, (_, _, props, _))| (Vid::from(*k), props.clone()))
1776        .collect();
1777
1778    build_property_column_static(&vid_keys, &props_map, prop_name, data_type)
1779}
1780
1781/// Build the `_labels` column for known-label vertices.
1782///
1783/// Reads `_labels` from the stored Lance batch if available. Falls back to
1784/// `[label]` when the column is absent (legacy data). Additional labels from
1785/// L0 buffers are merged in.
1786fn build_labels_column_for_known_label(
1787    vid_arr: &UInt64Array,
1788    label: &str,
1789    l0_ctx: &crate::query::df_graph::L0Context,
1790    batch_labels_col: Option<&arrow_array::ListArray>,
1791) -> DFResult<ArrayRef> {
1792    use uni_store::storage::arrow_convert::labels_from_list_array;
1793
1794    let mut labels_builder = ListBuilder::new(StringBuilder::new());
1795
1796    for i in 0..vid_arr.len() {
1797        let vid = Vid::from(vid_arr.value(i));
1798
1799        // Start with labels from the stored column, falling back to [label]
1800        let mut labels = match batch_labels_col {
1801            Some(list_arr) => {
1802                let stored = labels_from_list_array(list_arr, i);
1803                if stored.is_empty() {
1804                    vec![label.to_string()]
1805                } else {
1806                    stored
1807                }
1808            }
1809            None => vec![label.to_string()],
1810        };
1811
1812        // Ensure the scanned label is present (defensive)
1813        if !labels.iter().any(|l| l == label) {
1814            labels.push(label.to_string());
1815        }
1816
1817        // Merge additional labels from L0 buffers
1818        for l0 in l0_ctx.iter_l0_buffers() {
1819            let guard = l0.read();
1820            if let Some(l0_labels) = guard.vertex_labels.get(&vid) {
1821                for lbl in l0_labels {
1822                    if !labels.contains(lbl) {
1823                        labels.push(lbl.clone());
1824                    }
1825                }
1826            }
1827        }
1828
1829        let values = labels_builder.values();
1830        for lbl in &labels {
1831            values.append_value(lbl);
1832        }
1833        labels_builder.append(true);
1834    }
1835
1836    Ok(Arc::new(labels_builder.finish()))
1837}
1838
1839/// Map a Lance-schema batch to the DataFusion output schema.
1840///
1841/// The output schema has `{variable}.{property}` column names, while Lance
1842/// uses bare property names. This function performs the positional mapping,
1843/// adds the `_labels` column, and drops internal columns like `_deleted`/`_version`.
1844fn map_to_output_schema(
1845    batch: &RecordBatch,
1846    label: &str,
1847    _variable: &str,
1848    projected_properties: &[String],
1849    output_schema: &SchemaRef,
1850    l0_ctx: &crate::query::df_graph::L0Context,
1851) -> DFResult<RecordBatch> {
1852    if batch.num_rows() == 0 {
1853        return Ok(RecordBatch::new_empty(output_schema.clone()));
1854    }
1855
1856    let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
1857
1858    // 1. {var}._vid
1859    let vid_col = batch
1860        .column_by_name("_vid")
1861        .ok_or_else(|| {
1862            datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
1863        })?
1864        .clone();
1865    let vid_arr = vid_col
1866        .as_any()
1867        .downcast_ref::<UInt64Array>()
1868        .ok_or_else(|| {
1869            datafusion::error::DataFusionError::Internal("_vid not UInt64".to_string())
1870        })?;
1871
1872    // 2. {var}._labels — read from stored column, overlay L0 additions
1873    let batch_labels_col = batch
1874        .column_by_name("_labels")
1875        .and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());
1876    let labels_col = build_labels_column_for_known_label(vid_arr, label, l0_ctx, batch_labels_col)?;
1877    columns.push(vid_col.clone());
1878    columns.push(labels_col);
1879
1880    // 3. Projected properties
1881    // Pre-load overflow_json column for extracting non-schema properties
1882    let overflow_arr = batch
1883        .column_by_name("overflow_json")
1884        .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
1885
1886    for prop in projected_properties {
1887        if prop == "overflow_json" {
1888            match batch.column_by_name("overflow_json") {
1889                Some(col) => columns.push(col.clone()),
1890                None => {
1891                    // No overflow_json in Lance — return null column
1892                    columns.push(arrow_array::new_null_array(
1893                        &DataType::LargeBinary,
1894                        batch.num_rows(),
1895                    ));
1896                }
1897            }
1898        } else if prop == "_all_props" {
1899            // Build _all_props from overflow_json + L0 overlay.
1900            // Fast path: if no L0 buffer has vertex property mutations AND
1901            // there are no schema columns to merge, pass through overflow_json.
1902            let any_l0_has_vertex_props = l0_ctx.iter_l0_buffers().any(|l0| {
1903                let guard = l0.read();
1904                !guard.vertex_properties.is_empty()
1905            });
1906            // Check if this label has schema-defined columns (besides system columns)
1907            let has_schema_cols = projected_properties
1908                .iter()
1909                .any(|p| p != "overflow_json" && p != "_all_props" && !p.starts_with('_'));
1910
1911            if !any_l0_has_vertex_props && !has_schema_cols {
1912                // No L0 mutations, no schema cols to merge: overflow_json IS _all_props
1913                match batch.column_by_name("overflow_json") {
1914                    Some(col) => columns.push(col.clone()),
1915                    None => {
1916                        columns.push(arrow_array::new_null_array(
1917                            &DataType::LargeBinary,
1918                            batch.num_rows(),
1919                        ));
1920                    }
1921                }
1922            } else {
1923                // Need to merge: schema columns + overflow_json + L0 overlay
1924                let col = build_all_props_column_for_schema_scan(
1925                    batch,
1926                    vid_arr,
1927                    overflow_arr,
1928                    projected_properties,
1929                    l0_ctx,
1930                );
1931                columns.push(col);
1932            }
1933        } else {
1934            match batch.column_by_name(prop) {
1935                Some(col) => columns.push(col.clone()),
1936                None => {
1937                    // Column missing in Lance -- extract from overflow_json
1938                    // CypherValue blob with L0 overlay
1939                    let col = build_overflow_property_column(
1940                        batch.num_rows(),
1941                        vid_arr,
1942                        overflow_arr,
1943                        prop,
1944                        l0_ctx,
1945                    );
1946                    columns.push(col);
1947                }
1948            }
1949        }
1950    }
1951
1952    RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
1953}
1954
1955/// Map an internal DeltaDataset-schema edge batch to the DataFusion output schema.
1956///
1957/// The internal batch has `eid`, `src_vid`, `dst_vid`, `op`, `_version`, and property
1958/// columns. The output schema has `{variable}._eid`, `{variable}._src_vid`,
1959/// `{variable}._dst_vid`, and per-property columns. Internal columns `op` and
1960/// `_version` are dropped.
1961fn map_edge_to_output_schema(
1962    batch: &RecordBatch,
1963    variable: &str,
1964    projected_properties: &[String],
1965    output_schema: &SchemaRef,
1966) -> DFResult<RecordBatch> {
1967    if batch.num_rows() == 0 {
1968        return Ok(RecordBatch::new_empty(output_schema.clone()));
1969    }
1970
1971    let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
1972
1973    // 1. {var}._eid
1974    let eid_col = batch
1975        .column_by_name("eid")
1976        .ok_or_else(|| {
1977            datafusion::error::DataFusionError::Internal("Missing eid column".to_string())
1978        })?
1979        .clone();
1980    columns.push(eid_col);
1981
1982    // 2. {var}._src_vid
1983    let src_col = batch
1984        .column_by_name("src_vid")
1985        .ok_or_else(|| {
1986            datafusion::error::DataFusionError::Internal("Missing src_vid column".to_string())
1987        })?
1988        .clone();
1989    columns.push(src_col);
1990
1991    // 3. {var}._dst_vid
1992    let dst_col = batch
1993        .column_by_name("dst_vid")
1994        .ok_or_else(|| {
1995            datafusion::error::DataFusionError::Internal("Missing dst_vid column".to_string())
1996        })?
1997        .clone();
1998    columns.push(dst_col);
1999
2000    // 4. Projected properties
2001    for prop in projected_properties {
2002        if prop == "overflow_json" {
2003            match batch.column_by_name("overflow_json") {
2004                Some(col) => columns.push(col.clone()),
2005                None => {
2006                    columns.push(arrow_array::new_null_array(
2007                        &DataType::LargeBinary,
2008                        batch.num_rows(),
2009                    ));
2010                }
2011            }
2012        } else {
2013            match batch.column_by_name(prop) {
2014                Some(col) => columns.push(col.clone()),
2015                None => {
2016                    // Column missing in Lance — extract from overflow_json CypherValue blob
2017                    // (mirrors the vertex path in map_to_output_schema)
2018                    let overflow_arr = batch
2019                        .column_by_name("overflow_json")
2020                        .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
2021
2022                    if let Some(arr) = overflow_arr {
2023                        let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2024                        for i in 0..batch.num_rows() {
2025                            if !arr.is_null(i) {
2026                                let blob = arr.value(i);
2027                                // Fast path: extract map entry without decoding entire map
2028                                if let Some(sub_bytes) =
2029                                    uni_common::cypher_value_codec::extract_map_entry_raw(
2030                                        blob, prop,
2031                                    )
2032                                {
2033                                    builder.append_value(&sub_bytes);
2034                                } else {
2035                                    builder.append_null();
2036                                }
2037                            } else {
2038                                builder.append_null();
2039                            }
2040                        }
2041                        columns.push(Arc::new(builder.finish()));
2042                    } else {
2043                        // No overflow_json column either — return null column
2044                        let target_field = output_schema
2045                            .fields()
2046                            .iter()
2047                            .find(|f| f.name() == &format!("{}.{}", variable, prop));
2048                        let dt = target_field
2049                            .map(|f| f.data_type().clone())
2050                            .unwrap_or(DataType::LargeBinary);
2051                        columns.push(arrow_array::new_null_array(&dt, batch.num_rows()));
2052                    }
2053                }
2054            }
2055        }
2056    }
2057
2058    RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
2059}
2060
2061/// Columnar-first vertex scan: single Lance query with MVCC dedup and L0 overlay.
2062///
2063/// Replaces the two-phase `scan_vertex_vids_static()` + `materialize_vertex_batch_static()`
2064/// for known-label vertex scans. Reads all needed columns in a single Lance query,
2065/// performs MVCC dedup via Arrow compute, merges L0 buffer data, filters tombstones,
2066/// and maps to the output schema.
2067#[expect(clippy::too_many_arguments)]
2068async fn columnar_scan_vertex_batch_static(
2069    graph_ctx: &GraphExecutionContext,
2070    label: &str,
2071    variable: &str,
2072    projected_properties: &[String],
2073    output_schema: &SchemaRef,
2074    filter: &Option<Arc<dyn PhysicalExpr>>,
2075    vid_list_filter: Option<&[u64]>,
2076    extra_lance_filter: Option<&str>,
2077    extra_runtime_filter: Option<&Arc<dyn PhysicalExpr>>,
2078) -> DFResult<RecordBatch> {
2079    let storage = graph_ctx.storage();
2080    let l0_ctx = graph_ctx.l0_context();
2081    let uni_schema = storage.schema_manager().schema();
2082    let label_props = uni_schema.properties.get(label);
2083
2084    // Extract target VID from filter for short-circuit lookup. Single-VID
2085    // pushdown is from the WHERE-clause `id(x) = $literal` path; multi-VID
2086    // pushdown is from the IN-list path (`UNWIND ... WHERE id(x) = e.field`,
2087    // see issue #55 PR #4).
2088    let target_vid = filter.as_ref().and_then(extract_vid_from_physical_filter);
2089
2090    // Build the list of columns to request from Lance
2091    let mut lance_columns: Vec<String> = vec![
2092        "_vid".to_string(),
2093        "_deleted".to_string(),
2094        "_version".to_string(),
2095    ];
2096    for prop in projected_properties {
2097        if prop == "overflow_json" {
2098            push_column_if_absent(&mut lance_columns, "overflow_json");
2099        } else if prop == "_created_at" || prop == "_updated_at" {
2100            // System-managed timestamps live on every vertex table regardless
2101            // of label schema. Request them directly from Lance.
2102            push_column_if_absent(&mut lance_columns, prop);
2103        } else {
2104            let exists_in_schema = label_props.is_some_and(|lp| lp.contains_key(prop));
2105            if exists_in_schema {
2106                push_column_if_absent(&mut lance_columns, prop);
2107            }
2108        }
2109    }
2110
2111    // Ensure overflow_json is present when any projected property is not in the schema
2112    // (excluding system-managed columns like `_created_at` / `_updated_at`).
2113    let needs_overflow = projected_properties.iter().any(|p| {
2114        p == "overflow_json"
2115            || (!matches!(p.as_str(), "_created_at" | "_updated_at")
2116                && !label_props.is_some_and(|lp| lp.contains_key(p)))
2117    });
2118    if needs_overflow {
2119        push_column_if_absent(&mut lance_columns, "overflow_json");
2120    }
2121
2122    // Push _vid filter to Lance for O(log N) BTree index lookup instead of full scan.
2123    // Prefer the multi-VID list (formats as `_vid IN (...)`); fall back to
2124    // single-VID `_vid = N` from the WHERE-clause path. AND-combined with
2125    // any indexed-property pushdown (issue #57).
2126    let vid_part = match (vid_list_filter, target_vid) {
2127        (Some(vs), _) if !vs.is_empty() => Some(format_vid_in_list(vs)),
2128        (_, Some(v)) => Some(format!("_vid = {v}")),
2129        _ => None,
2130    };
2131    let combined_filter = combine_lance_filters(vid_part.as_deref(), extra_lance_filter);
2132    let lance_columns_refs: Vec<&str> = lance_columns.iter().map(|s| s.as_str()).collect();
2133
2134    // M5h.2: route through plugin Storage if one is registered for
2135    // this label. v1 ships reads only — writes still go to native
2136    // backend. v1 ignores `combined_filter` when delegating (the
2137    // planner re-filters via the surrounding Filter node); per-plugin
2138    // filter pushdown is a v1.1 follow-up (`TODO(M5h.2-filter)`).
2139    let plugin_batch: Option<arrow::record_batch::RecordBatch> = match graph_ctx.plugin_registry() {
2140        Some(reg) => match reg.lookup_label_storage(label) {
2141            Some(plugin_storage) => {
2142                let mut stream = plugin_storage.read_batch(label, None).await.map_err(|e| {
2143                    datafusion::error::DataFusionError::Execution(format!(
2144                        "plugin Storage::read_batch({label}) failed: {} (code 0x{:x})",
2145                        e.message, e.code
2146                    ))
2147                })?;
2148                use futures::StreamExt;
2149                let mut batches: Vec<arrow::record_batch::RecordBatch> = Vec::new();
2150                let mut schema_ref: Option<SchemaRef> = None;
2151                while let Some(b) = stream.next().await {
2152                    let b = b.map_err(|e| {
2153                        datafusion::error::DataFusionError::Execution(format!(
2154                            "plugin Storage stream({label}) errored: {e}"
2155                        ))
2156                    })?;
2157                    if schema_ref.is_none() {
2158                        schema_ref = Some(b.schema());
2159                    }
2160                    batches.push(b);
2161                }
2162                if let Some(s) = schema_ref {
2163                    Some(arrow::compute::concat_batches(&s, &batches).map_err(|e| {
2164                        datafusion::error::DataFusionError::Execution(format!(
2165                            "plugin Storage concat({label}) failed: {e}"
2166                        ))
2167                    })?)
2168                } else {
2169                    None
2170                }
2171            }
2172            None => None,
2173        },
2174        None => None,
2175    };
2176
2177    // Track whether the batch came through the property-filtered native scan:
2178    // plugin batches ignore `combined_filter` (re-filtered by the planner), so
2179    // they need no stale-version verification.
2180    let (lance_batch, pushdown_filtered) = match plugin_batch {
2181        Some(b) => (Some(b), false),
2182        None => (
2183            storage
2184                .scan_vertex_table(label, &lance_columns_refs, combined_filter.as_deref())
2185                .await
2186                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?,
2187            extra_lance_filter.is_some(),
2188        ),
2189    };
2190
2191    // A pushed property predicate hides a vid's CURRENT row from the scan when
2192    // that row no longer matches (MVCC-append: the stale still-matching row
2193    // would win the dedup by default) — drop superseded rows first.
2194    let lance_batch = match (lance_batch, pushdown_filtered) {
2195        (Some(b), true) => Some(drop_superseded_pushdown_rows(storage, Some(label), b).await?),
2196        (b, _) => b,
2197    };
2198
2199    // MVCC dedup the Lance batch
2200    let lance_deduped = mvcc_dedup_to_option(lance_batch, "_vid")?;
2201
2202    // Build the internal Lance schema for L0 batch construction.
2203    // Use the Lance batch schema if available, otherwise build from scratch.
2204    let internal_schema = match &lance_deduped {
2205        Some(batch) => batch.schema(),
2206        None => {
2207            let mut fields = vec![
2208                Field::new("_vid", DataType::UInt64, false),
2209                Field::new("_deleted", DataType::Boolean, false),
2210                Field::new("_version", DataType::UInt64, false),
2211            ];
2212            for col in &lance_columns {
2213                if matches!(col.as_str(), "_vid" | "_deleted" | "_version") {
2214                    continue;
2215                }
2216                if col == "overflow_json" {
2217                    fields.push(Field::new("overflow_json", DataType::LargeBinary, true));
2218                } else if col == "_created_at" || col == "_updated_at" {
2219                    fields.push(Field::new(
2220                        col,
2221                        DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
2222                        true,
2223                    ));
2224                } else {
2225                    let arrow_type = label_props
2226                        .and_then(|lp| lp.get(col.as_str()))
2227                        .map(|meta| meta.r#type.to_arrow())
2228                        .unwrap_or(DataType::LargeBinary);
2229                    fields.push(Field::new(col, arrow_type, true));
2230                }
2231            }
2232            Arc::new(Schema::new(fields))
2233        }
2234    };
2235
2236    // Build L0 batch. Prefer the multi-VID list when present (IN-list pushdown
2237    // from issue #55 PR #4 — must restrict L0 to the same VID set Lance was
2238    // filtered against, see issue #72 item 1). Fall back to single-VID
2239    // (`id(x) = $literal` short-circuit). One-element buffer keeps the
2240    // borrowed slice alive for the single-VID case.
2241    let single_vid_buf: [u64; 1];
2242    let l0_target_vids: Option<&[u64]> = match (vid_list_filter, target_vid) {
2243        (Some(vs), _) if !vs.is_empty() => Some(vs),
2244        (_, Some(v)) => {
2245            single_vid_buf = [v];
2246            Some(&single_vid_buf)
2247        }
2248        _ => None,
2249    };
2250    let l0_batch =
2251        build_l0_vertex_batch(l0_ctx, label, &internal_schema, label_props, l0_target_vids)?;
2252
2253    // Merge Lance + L0
2254    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "_vid")?
2255    else {
2256        return Ok(RecordBatch::new_empty(output_schema.clone()));
2257    };
2258
2259    // Filter out MVCC deletion tombstones (_deleted = true)
2260    let merged = filter_deleted_rows(&merged)?;
2261    if merged.num_rows() == 0 {
2262        return Ok(RecordBatch::new_empty(output_schema.clone()));
2263    }
2264
2265    // Filter L0 tombstones
2266    let filtered = filter_l0_tombstones(&merged, l0_ctx)?;
2267
2268    if filtered.num_rows() == 0 {
2269        return Ok(RecordBatch::new_empty(output_schema.clone()));
2270    }
2271
2272    // Map to output schema
2273    let mapped = map_to_output_schema(
2274        &filtered,
2275        label,
2276        variable,
2277        projected_properties,
2278        output_schema,
2279        l0_ctx,
2280    )?;
2281
2282    // Apply indexed-property runtime filter (issue #57). Lance has already
2283    // filtered the on-disk side via `extra_lance_filter`; this catches any
2284    // L0 rows that slipped through the merge.
2285    apply_runtime_filter(mapped, extra_runtime_filter)
2286}
2287
2288/// Apply the indexed-property runtime filter, if present, to a `RecordBatch`.
2289/// Returns the filtered batch (or the original if no filter is set). Rows
2290/// where the predicate evaluates to NULL are treated as non-matching, same
2291/// as DataFusion `FilterExec`. See issue #57.
2292fn apply_runtime_filter(
2293    batch: RecordBatch,
2294    runtime_filter: Option<&Arc<dyn PhysicalExpr>>,
2295) -> DFResult<RecordBatch> {
2296    let Some(filter) = runtime_filter else {
2297        return Ok(batch);
2298    };
2299    if batch.num_rows() == 0 {
2300        return Ok(batch);
2301    }
2302    let result = filter.evaluate(&batch)?;
2303    let array = result.into_array(batch.num_rows())?;
2304    let bools = array
2305        .as_any()
2306        .downcast_ref::<arrow_array::BooleanArray>()
2307        .ok_or_else(|| {
2308            datafusion::error::DataFusionError::Internal(
2309                "indexed-property runtime filter did not produce a BooleanArray".to_string(),
2310            )
2311        })?;
2312    arrow::compute::filter_record_batch(&batch, bools).map_err(arrow_err)
2313}
2314
2315/// Columnar-first edge scan: single Lance query with MVCC dedup and L0 overlay.
2316///
2317/// Replaces the two-phase `scan_edge_eids_static()` + `materialize_edge_batch_static()`
2318/// for edge scans. Reads all needed columns in a single DeltaDataset query, performs
2319/// MVCC dedup via Arrow compute, merges L0 buffer data, filters tombstones, and maps
2320/// to the output schema.
2321async fn columnar_scan_edge_batch_static(
2322    graph_ctx: &GraphExecutionContext,
2323    edge_type: &str,
2324    variable: &str,
2325    projected_properties: &[String],
2326    output_schema: &SchemaRef,
2327) -> DFResult<RecordBatch> {
2328    let storage = graph_ctx.storage();
2329    let l0_ctx = graph_ctx.l0_context();
2330    let uni_schema = storage.schema_manager().schema();
2331    let type_props = uni_schema.properties.get(edge_type);
2332
2333    // Build the list of columns to request from DeltaDataset Lance table
2334    let mut lance_columns: Vec<String> = vec![
2335        "eid".to_string(),
2336        "src_vid".to_string(),
2337        "dst_vid".to_string(),
2338        "op".to_string(),
2339        "_version".to_string(),
2340    ];
2341    for prop in projected_properties {
2342        if prop == "overflow_json" {
2343            push_column_if_absent(&mut lance_columns, "overflow_json");
2344        } else if prop == "_created_at" || prop == "_updated_at" {
2345            // System-managed timestamps live on every edge table.
2346            push_column_if_absent(&mut lance_columns, prop);
2347        } else {
2348            let exists_in_schema = type_props.is_some_and(|tp| tp.contains_key(prop));
2349            if exists_in_schema {
2350                push_column_if_absent(&mut lance_columns, prop);
2351            }
2352        }
2353    }
2354
2355    // Ensure overflow_json is present when any projected property is not in the schema
2356    // (excluding system-managed columns like `_created_at` / `_updated_at`).
2357    let needs_overflow = projected_properties.iter().any(|p| {
2358        p == "overflow_json"
2359            || (!matches!(p.as_str(), "_created_at" | "_updated_at")
2360                && !type_props.is_some_and(|tp| tp.contains_key(p)))
2361    });
2362    if needs_overflow {
2363        push_column_if_absent(&mut lance_columns, "overflow_json");
2364    }
2365
2366    // Try to query DeltaDataset (forward direction) via StorageManager domain method
2367    let lance_columns_refs: Vec<&str> = lance_columns.iter().map(|s| s.as_str()).collect();
2368    let lance_batch = storage
2369        .scan_delta_table(edge_type, "fwd", &lance_columns_refs, None)
2370        .await
2371        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
2372
2373    // MVCC dedup the Lance batch (by eid)
2374    let lance_deduped = mvcc_dedup_to_option(lance_batch, "eid")?;
2375
2376    // Build the internal schema for L0 batch construction.
2377    // Use the Lance batch schema if available, otherwise build from scratch.
2378    let internal_schema = match &lance_deduped {
2379        Some(batch) => batch.schema(),
2380        None => {
2381            let mut fields = vec![
2382                Field::new("eid", DataType::UInt64, false),
2383                Field::new("src_vid", DataType::UInt64, false),
2384                Field::new("dst_vid", DataType::UInt64, false),
2385                Field::new("op", DataType::UInt8, false),
2386                Field::new("_version", DataType::UInt64, false),
2387            ];
2388            for col in &lance_columns {
2389                if matches!(
2390                    col.as_str(),
2391                    "eid" | "src_vid" | "dst_vid" | "op" | "_version"
2392                ) {
2393                    continue;
2394                }
2395                if col == "overflow_json" {
2396                    fields.push(Field::new("overflow_json", DataType::LargeBinary, true));
2397                } else if col == "_created_at" || col == "_updated_at" {
2398                    fields.push(Field::new(
2399                        col,
2400                        DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
2401                        true,
2402                    ));
2403                } else {
2404                    let arrow_type = type_props
2405                        .and_then(|tp| tp.get(col.as_str()))
2406                        .map(|meta| meta.r#type.to_arrow())
2407                        .unwrap_or(DataType::LargeBinary);
2408                    fields.push(Field::new(col, arrow_type, true));
2409                }
2410            }
2411            Arc::new(Schema::new(fields))
2412        }
2413    };
2414
2415    // Build L0 batch
2416    let l0_batch = build_l0_edge_batch(l0_ctx, edge_type, &internal_schema, type_props)?;
2417
2418    // Merge Lance + L0
2419    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "eid")? else {
2420        return Ok(RecordBatch::new_empty(output_schema.clone()));
2421    };
2422
2423    // Filter out MVCC deletion ops (op != 0) after dedup
2424    let merged = filter_deleted_edge_ops(&merged)?;
2425    if merged.num_rows() == 0 {
2426        return Ok(RecordBatch::new_empty(output_schema.clone()));
2427    }
2428
2429    // Filter L0 edge tombstones
2430    let filtered = filter_l0_edge_tombstones(&merged, l0_ctx)?;
2431
2432    if filtered.num_rows() == 0 {
2433        return Ok(RecordBatch::new_empty(output_schema.clone()));
2434    }
2435
2436    // Map to output schema
2437    map_edge_to_output_schema(&filtered, variable, projected_properties, output_schema)
2438}
2439
2440/// Columnar-first schemaless vertex scan: single Lance query with MVCC dedup and L0 overlay.
2441///
2442/// Replaces the two-phase `scan_*_vids_*()` + `materialize_schemaless_vertex_batch_static()`
2443/// for schemaless vertex scans. Reads `_vid`, `labels`, `props_json`, `_version` in a single
2444/// Lance query on the main vertices table, performs MVCC dedup via Arrow compute, merges L0
2445/// buffer data, filters tombstones, and maps to the output schema.
2446#[expect(clippy::too_many_arguments)]
2447async fn columnar_scan_schemaless_vertex_batch_static(
2448    graph_ctx: &GraphExecutionContext,
2449    label: &str,
2450    variable: &str,
2451    projected_properties: &[String],
2452    output_schema: &SchemaRef,
2453    filter: &Option<Arc<dyn PhysicalExpr>>,
2454    vid_list_filter: Option<&[u64]>,
2455    extra_lance_filter: Option<&str>,
2456    extra_runtime_filter: Option<&Arc<dyn PhysicalExpr>>,
2457) -> DFResult<RecordBatch> {
2458    let storage = graph_ctx.storage();
2459    let l0_ctx = graph_ctx.l0_context();
2460
2461    // Extract target VID from filter for short-circuit lookup. See the
2462    // detailed comment on the per-label scan for the IN-list path
2463    // (issue #55 PR #4).
2464    let target_vid = filter.as_ref().and_then(extract_vid_from_physical_filter);
2465
2466    // Build the Lance filter expression — do NOT filter _deleted here;
2467    // MVCC dedup must see deletion tombstones to pick the highest version.
2468    let filter = {
2469        let mut parts = Vec::new();
2470
2471        // VID point-lookup filter — uses BTree index on _vid. Prefer the
2472        // multi-VID list (formats as `_vid IN (...)`); fall back to single-VID.
2473        match (vid_list_filter, target_vid) {
2474            (Some(vs), _) if !vs.is_empty() => parts.push(format_vid_in_list(vs)),
2475            (_, Some(vid)) => parts.push(format!("_vid = {vid}")),
2476            _ => {}
2477        }
2478
2479        // Label filter
2480        if !label.is_empty() {
2481            if label.contains(':') {
2482                // Multi-label: each label must be present
2483                for lbl in label.split(':') {
2484                    parts.push(format!("array_contains(labels, '{}')", lbl));
2485                }
2486            } else {
2487                parts.push(format!("array_contains(labels, '{}')", label));
2488            }
2489        }
2490
2491        // Indexed-property pushdown — issue #57.
2492        if let Some(extra) = extra_lance_filter {
2493            parts.push(extra.to_string());
2494        }
2495
2496        if parts.is_empty() {
2497            None
2498        } else {
2499            Some(parts.join(" AND "))
2500        }
2501    };
2502
2503    // Single Lance query via StorageManager domain method
2504    let lance_batch = storage
2505        .scan_main_vertex_table(
2506            &["_vid", "_deleted", "labels", "props_json", "_version"],
2507            filter.as_deref(),
2508        )
2509        .await
2510        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
2511
2512    // A pushed property predicate hides a vid's CURRENT row from the scan when
2513    // that row no longer matches (MVCC-append: the stale still-matching row
2514    // would win the dedup by default) — drop superseded rows first.
2515    let lance_batch = match (lance_batch, extra_lance_filter.is_some()) {
2516        (Some(b), true) => Some(drop_superseded_pushdown_rows(storage, None, b).await?),
2517        (b, _) => b,
2518    };
2519
2520    // MVCC dedup the Lance batch
2521    let lance_deduped = mvcc_dedup_to_option(lance_batch, "_vid")?;
2522
2523    // Build the internal schema for L0 batch construction.
2524    // Use the Lance batch schema if available, otherwise build from scratch.
2525    let internal_schema = match &lance_deduped {
2526        Some(batch) => batch.schema(),
2527        None => Arc::new(Schema::new(vec![
2528            Field::new("_vid", DataType::UInt64, false),
2529            Field::new("_deleted", DataType::Boolean, false),
2530            Field::new("labels", labels_data_type(), false),
2531            Field::new("props_json", DataType::LargeBinary, true),
2532            Field::new("_version", DataType::UInt64, false),
2533        ])),
2534    };
2535
2536    // Build L0 batch. Prefer the multi-VID list when present (IN-list pushdown
2537    // from issue #55 PR #4 — must restrict L0 to match Lance filtering, see
2538    // issue #72 item 1). Fall back to single-VID.
2539    let single_vid_buf: [u64; 1];
2540    let l0_target_vids: Option<&[u64]> = match (vid_list_filter, target_vid) {
2541        (Some(vs), _) if !vs.is_empty() => Some(vs),
2542        (_, Some(v)) => {
2543            single_vid_buf = [v];
2544            Some(&single_vid_buf)
2545        }
2546        _ => None,
2547    };
2548    let l0_batch =
2549        build_l0_schemaless_vertex_batch(l0_ctx, label, &internal_schema, l0_target_vids)?;
2550
2551    // Merge Lance + L0
2552    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "_vid")?
2553    else {
2554        return Ok(RecordBatch::new_empty(output_schema.clone()));
2555    };
2556
2557    // Filter out MVCC deletion tombstones (_deleted = true)
2558    let merged = filter_deleted_rows(&merged)?;
2559    if merged.num_rows() == 0 {
2560        return Ok(RecordBatch::new_empty(output_schema.clone()));
2561    }
2562
2563    // Filter L0 tombstones
2564    let filtered = filter_l0_tombstones(&merged, l0_ctx)?;
2565
2566    if filtered.num_rows() == 0 {
2567        return Ok(RecordBatch::new_empty(output_schema.clone()));
2568    }
2569
2570    // Map to output schema
2571    let mapped = map_to_schemaless_output_schema(
2572        &filtered,
2573        variable,
2574        projected_properties,
2575        output_schema,
2576        l0_ctx,
2577    )?;
2578
2579    // Apply indexed-property runtime filter — issue #57.
2580    apply_runtime_filter(mapped, extra_runtime_filter)
2581}
2582
2583/// Build a RecordBatch from L0 buffer data for schemaless vertices.
2584///
2585/// Merges L0 buffers in visibility order (pending_flush → current → transaction),
2586/// with later buffers overwriting earlier ones for the same VID. Produces a batch
2587/// matching the internal schema: `_vid, labels, props_json, _version`.
2588fn build_l0_schemaless_vertex_batch(
2589    l0_ctx: &crate::query::df_graph::L0Context,
2590    label: &str,
2591    internal_schema: &SchemaRef,
2592    target_vids: Option<&[u64]>,
2593) -> DFResult<RecordBatch> {
2594    // Collect all L0 vertex data, merging in visibility order
2595    // vid -> (merged_props, highest_version, labels)
2596    let mut vid_data: HashMap<u64, (Properties, u64, Vec<String>)> = HashMap::new();
2597    let mut tombstones: HashSet<u64> = HashSet::new();
2598
2599    // Parse multi-label filter
2600    let label_filter: Vec<&str> = if label.is_empty() {
2601        vec![]
2602    } else if label.contains(':') {
2603        label.split(':').collect()
2604    } else {
2605        vec![label]
2606    };
2607
2608    for l0 in l0_ctx.iter_l0_buffers() {
2609        let guard = l0.read();
2610
2611        // Collect tombstones
2612        for vid in guard.vertex_tombstones.iter() {
2613            tombstones.insert(vid.as_u64());
2614        }
2615
2616        // Collect VIDs matching the label filter — short-circuit when target_vids is set
2617        // (see issue #72 item 1; multi-VID IN-list must filter L0 too).
2618        let vids: Vec<Vid> = if let Some(tvs) = target_vids {
2619            let mut out = Vec::with_capacity(tvs.len());
2620            for &tv in tvs {
2621                let vid = Vid::from(tv);
2622                if !guard.vertex_properties.contains_key(&vid) {
2623                    continue;
2624                }
2625                let label_ok = if label_filter.is_empty() {
2626                    true
2627                } else if let Some(labels) = guard.vertex_labels.get(&vid) {
2628                    label_filter
2629                        .iter()
2630                        .all(|lf| labels.contains(&lf.to_string()))
2631                } else {
2632                    false
2633                };
2634                if label_ok {
2635                    out.push(vid);
2636                }
2637            }
2638            out
2639        } else if label_filter.is_empty() {
2640            guard.all_vertex_vids()
2641        } else if label_filter.len() == 1 {
2642            guard.vids_for_label(label_filter[0])
2643        } else {
2644            guard.vids_with_all_labels(&label_filter)
2645        };
2646
2647        for vid in vids {
2648            let vid_u64 = vid.as_u64();
2649            if tombstones.contains(&vid_u64) {
2650                continue;
2651            }
2652            let version = guard.vertex_versions.get(&vid).copied().unwrap_or(0);
2653            let entry = vid_data
2654                .entry(vid_u64)
2655                .or_insert_with(|| (Properties::new(), 0, Vec::new()));
2656
2657            // Merge properties (later L0 overwrites)
2658            if let Some(props) = guard.vertex_properties.get(&vid) {
2659                for (k, v) in props {
2660                    entry.0.insert(k.clone(), v.clone());
2661                }
2662            }
2663            // Take the highest version
2664            if version > entry.1 {
2665                entry.1 = version;
2666            }
2667            // Update labels from latest L0 layer
2668            if let Some(labels) = guard.vertex_labels.get(&vid) {
2669                entry.2 = labels.clone();
2670            }
2671        }
2672    }
2673
2674    // Remove tombstoned VIDs
2675    for t in &tombstones {
2676        vid_data.remove(t);
2677    }
2678
2679    if vid_data.is_empty() {
2680        return Ok(RecordBatch::new_empty(internal_schema.clone()));
2681    }
2682
2683    // Sort VIDs for deterministic output
2684    let mut vids: Vec<u64> = vid_data.keys().copied().collect();
2685    vids.sort_unstable();
2686
2687    let num_rows = vids.len();
2688    let mut columns: Vec<ArrayRef> = Vec::with_capacity(internal_schema.fields().len());
2689
2690    for field in internal_schema.fields() {
2691        match field.name().as_str() {
2692            "_vid" => {
2693                columns.push(Arc::new(UInt64Array::from(vids.clone())));
2694            }
2695            "labels" => {
2696                let mut labels_builder = ListBuilder::new(StringBuilder::new());
2697                for vid_u64 in &vids {
2698                    let (_, _, labels) = &vid_data[vid_u64];
2699                    let values = labels_builder.values();
2700                    for lbl in labels {
2701                        values.append_value(lbl);
2702                    }
2703                    labels_builder.append(true);
2704                }
2705                columns.push(Arc::new(labels_builder.finish()));
2706            }
2707            "props_json" => {
2708                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2709                for vid_u64 in &vids {
2710                    let (props, _, _) = &vid_data[vid_u64];
2711                    if props.is_empty() {
2712                        builder.append_null();
2713                    } else {
2714                        // Encode properties as a CypherValue blob directly from
2715                        // `Value` so typed values (temporals) are preserved.
2716                        let map: HashMap<String, Value> =
2717                            props.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
2718                        builder
2719                            .append_value(uni_common::cypher_value_codec::encode(&Value::Map(map)));
2720                    }
2721                }
2722                columns.push(Arc::new(builder.finish()));
2723            }
2724            "_deleted" => {
2725                // L0 vertices are always live (tombstoned ones already excluded)
2726                columns.push(Arc::new(arrow_array::BooleanArray::from(vec![
2727                    false;
2728                    num_rows
2729                ])));
2730            }
2731            "_version" => {
2732                let vals: Vec<u64> = vids.iter().map(|v| vid_data[v].1).collect();
2733                columns.push(Arc::new(UInt64Array::from(vals)));
2734            }
2735            _ => {
2736                // Unexpected column — fill with nulls
2737                columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
2738            }
2739        }
2740    }
2741
2742    RecordBatch::try_new(internal_schema.clone(), columns).map_err(arrow_err)
2743}
2744
2745/// Map an internal-schema schemaless batch to the DataFusion output schema.
2746///
2747/// The internal batch has `_vid, labels, props_json, _version` columns. The output
2748/// schema has `{variable}._vid`, `{variable}._labels`, and per-property columns.
2749/// Individual properties are extracted from the `props_json` CypherValue blob by
2750/// decoding to a Map and extracting the sub-value.
2751fn map_to_schemaless_output_schema(
2752    batch: &RecordBatch,
2753    _variable: &str,
2754    projected_properties: &[String],
2755    output_schema: &SchemaRef,
2756    l0_ctx: &crate::query::df_graph::L0Context,
2757) -> DFResult<RecordBatch> {
2758    if batch.num_rows() == 0 {
2759        return Ok(RecordBatch::new_empty(output_schema.clone()));
2760    }
2761
2762    let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
2763
2764    // 1. {var}._vid — passthrough
2765    let vid_col = batch
2766        .column_by_name("_vid")
2767        .ok_or_else(|| {
2768            datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
2769        })?
2770        .clone();
2771    let vid_arr = vid_col
2772        .as_any()
2773        .downcast_ref::<UInt64Array>()
2774        .ok_or_else(|| {
2775            datafusion::error::DataFusionError::Internal("_vid not UInt64".to_string())
2776        })?;
2777    columns.push(vid_col.clone());
2778
2779    // 2. {var}._labels — from labels column with L0 overlay
2780    let labels_col = batch.column_by_name("labels");
2781    let labels_arr = labels_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());
2782
2783    let mut labels_builder = ListBuilder::new(StringBuilder::new());
2784    for i in 0..vid_arr.len() {
2785        let vid_u64 = vid_arr.value(i);
2786        let vid = Vid::from(vid_u64);
2787
2788        // Start with labels from the batch
2789        let mut row_labels: Vec<String> = Vec::new();
2790        if let Some(arr) = labels_arr
2791            && !arr.is_null(i)
2792        {
2793            let list_val = arr.value(i);
2794            if let Some(str_arr) = list_val.as_any().downcast_ref::<arrow_array::StringArray>() {
2795                for j in 0..str_arr.len() {
2796                    if !str_arr.is_null(j) {
2797                        row_labels.push(str_arr.value(j).to_string());
2798                    }
2799                }
2800            }
2801        }
2802
2803        // Overlay L0 labels
2804        for l0 in l0_ctx.iter_l0_buffers() {
2805            let guard = l0.read();
2806            if let Some(l0_labels) = guard.vertex_labels.get(&vid) {
2807                for lbl in l0_labels {
2808                    if !row_labels.contains(lbl) {
2809                        row_labels.push(lbl.clone());
2810                    }
2811                }
2812            }
2813        }
2814
2815        let values = labels_builder.values();
2816        for lbl in &row_labels {
2817            values.append_value(lbl);
2818        }
2819        labels_builder.append(true);
2820    }
2821    columns.push(Arc::new(labels_builder.finish()));
2822
2823    // 3. Projected properties — extract from props_json
2824    let props_col = batch.column_by_name("props_json");
2825    let props_arr =
2826        props_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
2827
2828    for prop in projected_properties {
2829        if prop == "_all_props" {
2830            // Fast path: if no L0 buffer has vertex property mutations,
2831            // the raw props_json passthrough is correct.
2832            let any_l0_has_vertex_props = l0_ctx.iter_l0_buffers().any(|l0| {
2833                let guard = l0.read();
2834                !guard.vertex_properties.is_empty()
2835            });
2836            if !any_l0_has_vertex_props {
2837                match props_col {
2838                    Some(col) => columns.push(col.clone()),
2839                    None => {
2840                        columns.push(arrow_array::new_null_array(
2841                            &DataType::LargeBinary,
2842                            batch.num_rows(),
2843                        ));
2844                    }
2845                }
2846            } else {
2847                let col = build_all_props_column_with_l0_overlay(
2848                    batch.num_rows(),
2849                    vid_arr,
2850                    props_arr,
2851                    l0_ctx,
2852                );
2853                columns.push(col);
2854            }
2855        } else {
2856            // Extract individual property from CypherValue blob with L0 overlay.
2857            // The raw column is LargeBinary (CypherValue-encoded). If the output
2858            // schema expects a typed column (e.g., Utf8 for String properties),
2859            // decode the CypherValue and build the correct Arrow type.
2860            let expected_type = output_schema
2861                .field_with_name(&format!("{_variable}.{prop}"))
2862                .map(|f| f.data_type().clone())
2863                .unwrap_or(DataType::LargeBinary);
2864
2865            if expected_type == DataType::LargeBinary {
2866                let col = build_overflow_property_column(
2867                    batch.num_rows(),
2868                    vid_arr,
2869                    props_arr,
2870                    prop,
2871                    l0_ctx,
2872                );
2873                columns.push(col);
2874            } else {
2875                // Decode CypherValue to the expected type via build_property_column_static.
2876                let mut prop_values: HashMap<Vid, Properties> = HashMap::new();
2877                for i in 0..batch.num_rows() {
2878                    let vid = Vid::from(vid_arr.value(i));
2879                    let resolved =
2880                        resolve_l0_property(&vid, prop, l0_ctx)
2881                            .flatten()
2882                            .or_else(|| {
2883                                extract_from_overflow_blob(props_arr, i, prop).and_then(|bytes| {
2884                                    uni_common::cypher_value_codec::decode(&bytes).ok()
2885                                })
2886                            });
2887                    if let Some(val) = resolved {
2888                        prop_values.insert(vid, HashMap::from([(prop.to_string(), val)]));
2889                    }
2890                }
2891                let vids: Vec<Vid> = (0..batch.num_rows())
2892                    .map(|i| Vid::from(vid_arr.value(i)))
2893                    .collect();
2894                let col = build_property_column_static(&vids, &prop_values, prop, &expected_type)
2895                    .unwrap_or_else(|_| {
2896                        arrow_array::new_null_array(&expected_type, batch.num_rows())
2897                    });
2898                columns.push(col);
2899            }
2900        }
2901    }
2902
2903    RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
2904}
2905
2906/// Get the property value for a VID, returning None if not found.
2907pub(crate) fn get_property_value(
2908    vid: &Vid,
2909    props_map: &HashMap<Vid, Properties>,
2910    prop_name: &str,
2911) -> Option<Value> {
2912    if prop_name == "_all_props" {
2913        return props_map.get(vid).map(|p| {
2914            let map: HashMap<String, Value> =
2915                p.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
2916            Value::Map(map)
2917        });
2918    }
2919    props_map
2920        .get(vid)
2921        .and_then(|props| props.get(prop_name))
2922        .cloned()
2923}
2924
/// Build a nullable numeric Arrow column, one slot per VID.
///
/// For each VID the property is fetched with `get_property_value`, passed to
/// `$extractor` (which returns `Option<_>`), and the extracted number is run
/// through `$cast` before being appended to a fresh `$builder_ty`. A missing
/// property, or an extractor that returns `None`, produces a null slot.
/// Evaluates to `Ok(ArrayRef)`.
macro_rules! build_numeric_column {
    ($vids:expr, $props_map:expr, $prop_name:expr, $builder_ty:ty, $extractor:expr, $cast:expr) => {{
        let mut builder = <$builder_ty>::new();
        for vid in $vids {
            let value = get_property_value(vid, $props_map, $prop_name);
            match value.as_ref().and_then(|v| $extractor(v)) {
                Some(number) => {
                    builder.append_value($cast(number));
                }
                None => builder.append_null(),
            }
        }
        Ok(Arc::new(builder.finish()) as ArrayRef)
    }};
}
2944
2945/// Build an Arrow column from property values (static version).
2946pub(crate) fn build_property_column_static(
2947    vids: &[Vid],
2948    props_map: &HashMap<Vid, Properties>,
2949    prop_name: &str,
2950    data_type: &DataType,
2951) -> DFResult<ArrayRef> {
2952    match data_type {
2953        DataType::LargeBinary => {
2954            // Handle CypherValue binary columns (overflow_json and Json-typed properties).
2955            use arrow_array::builder::LargeBinaryBuilder;
2956            let mut builder = LargeBinaryBuilder::new();
2957
2958            for vid in vids {
2959                match get_property_value(vid, props_map, prop_name) {
2960                    Some(Value::Null) | None => builder.append_null(),
2961                    Some(Value::Bytes(bytes)) => {
2962                        builder.append_value(&bytes);
2963                    }
2964                    Some(Value::List(arr)) if arr.iter().all(|v| v.as_u64().is_some()) => {
2965                        // Potential raw CypherValue bytes stored as list<u8> from PropertyManager.
2966                        // Guard against misclassifying normal integer lists (e.g. [42, 43]) as bytes.
2967                        let bytes: Vec<u8> = arr
2968                            .iter()
2969                            .filter_map(|v| v.as_u64().map(|n| n as u8))
2970                            .collect();
2971                        if uni_common::cypher_value_codec::decode(&bytes).is_ok() {
2972                            builder.append_value(&bytes);
2973                        } else {
2974                            builder.append_value(uni_common::cypher_value_codec::encode(
2975                                &Value::List(arr),
2976                            ));
2977                        }
2978                    }
2979                    Some(val) => {
2980                        // Encode any other property value directly via the
2981                        // CypherValue codec so typed values (temporals, including
2982                        // BTIC, and nested lists/maps) round-trip losslessly.
2983                        builder.append_value(uni_common::cypher_value_codec::encode(&val));
2984                    }
2985                }
2986            }
2987            Ok(Arc::new(builder.finish()))
2988        }
2989        DataType::Binary => {
2990            // CRDT binary properties: JSON-decoded CRDTs re-encoded to MessagePack
2991            let mut builder = BinaryBuilder::new();
2992            for vid in vids {
2993                let bytes = get_property_value(vid, props_map, prop_name)
2994                    .filter(|v| !v.is_null())
2995                    .and_then(|v| {
2996                        let json_val: serde_json::Value = v.into();
2997                        serde_json::from_value::<uni_crdt::Crdt>(json_val).ok()
2998                    })
2999                    .and_then(|crdt| crdt.to_msgpack().ok());
3000                match bytes {
3001                    Some(b) => builder.append_value(&b),
3002                    None => builder.append_null(),
3003                }
3004            }
3005            Ok(Arc::new(builder.finish()))
3006        }
3007        DataType::Utf8 => {
3008            let mut builder = StringBuilder::new();
3009            for vid in vids {
3010                match get_property_value(vid, props_map, prop_name) {
3011                    Some(Value::String(s)) => builder.append_value(s),
3012                    Some(Value::Null) | None => builder.append_null(),
3013                    Some(other) => builder.append_value(other.to_string()),
3014                }
3015            }
3016            Ok(Arc::new(builder.finish()))
3017        }
3018        DataType::Int64 => {
3019            build_numeric_column!(
3020                vids,
3021                props_map,
3022                prop_name,
3023                Int64Builder,
3024                |v: &Value| v.as_i64(),
3025                |v| v
3026            )
3027        }
3028        DataType::Int32 => {
3029            build_numeric_column!(
3030                vids,
3031                props_map,
3032                prop_name,
3033                Int32Builder,
3034                |v: &Value| v.as_i64(),
3035                |v: i64| v as i32
3036            )
3037        }
3038        DataType::Float64 => {
3039            build_numeric_column!(
3040                vids,
3041                props_map,
3042                prop_name,
3043                Float64Builder,
3044                |v: &Value| v.as_f64(),
3045                |v| v
3046            )
3047        }
3048        DataType::Float32 => {
3049            build_numeric_column!(
3050                vids,
3051                props_map,
3052                prop_name,
3053                Float32Builder,
3054                |v: &Value| v.as_f64(),
3055                |v: f64| v as f32
3056            )
3057        }
3058        DataType::Boolean => {
3059            let mut builder = BooleanBuilder::new();
3060            for vid in vids {
3061                match get_property_value(vid, props_map, prop_name) {
3062                    Some(Value::Bool(b)) => builder.append_value(b),
3063                    _ => builder.append_null(),
3064                }
3065            }
3066            Ok(Arc::new(builder.finish()))
3067        }
3068        DataType::UInt64 => {
3069            build_numeric_column!(
3070                vids,
3071                props_map,
3072                prop_name,
3073                UInt64Builder,
3074                |v: &Value| v.as_u64(),
3075                |v| v
3076            )
3077        }
3078        DataType::FixedSizeList(inner, dim) if *inner.data_type() == DataType::Float32 => {
3079            // Vector properties: FixedSizeList(Float32, N)
3080            let values_builder = Float32Builder::new();
3081            let mut list_builder = FixedSizeListBuilder::new(values_builder, *dim);
3082            for vid in vids {
3083                match get_property_value(vid, props_map, prop_name) {
3084                    Some(Value::Vector(v)) => {
3085                        for val in v {
3086                            list_builder.values().append_value(val);
3087                        }
3088                        list_builder.append(true);
3089                    }
3090                    Some(Value::List(arr)) => {
3091                        for v in arr {
3092                            list_builder
3093                                .values()
3094                                .append_value(v.as_f64().unwrap_or(0.0) as f32);
3095                        }
3096                        list_builder.append(true);
3097                    }
3098                    _ => {
3099                        // Append dim nulls to inner values, then mark row as null
3100                        for _ in 0..*dim {
3101                            list_builder.values().append_null();
3102                        }
3103                        list_builder.append(false);
3104                    }
3105                }
3106            }
3107            Ok(Arc::new(list_builder.finish()))
3108        }
3109        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
3110            // Timestamp properties stored as Value::Temporal, ISO 8601 strings, or i64 nanoseconds
3111            let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
3112            for vid in vids {
3113                match get_property_value(vid, props_map, prop_name) {
3114                    Some(Value::Temporal(tv)) => match tv {
3115                        uni_common::TemporalValue::DateTime {
3116                            nanos_since_epoch, ..
3117                        }
3118                        | uni_common::TemporalValue::LocalDateTime {
3119                            nanos_since_epoch, ..
3120                        } => {
3121                            builder.append_value(nanos_since_epoch);
3122                        }
3123                        uni_common::TemporalValue::Date { days_since_epoch } => {
3124                            builder.append_value(days_since_epoch as i64 * 86_400_000_000_000);
3125                        }
3126                        _ => builder.append_null(),
3127                    },
3128                    Some(Value::String(s)) => match parse_datetime_utc(&s) {
3129                        Ok(dt) => builder.append_value(dt.timestamp_nanos_opt().unwrap_or(0)),
3130                        Err(_) => builder.append_null(),
3131                    },
3132                    Some(Value::Int(n)) => {
3133                        builder.append_value(n);
3134                    }
3135                    _ => builder.append_null(),
3136                }
3137            }
3138            Ok(Arc::new(builder.finish()))
3139        }
3140        DataType::Date32 => {
3141            let mut builder = Date32Builder::new();
3142            let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
3143            for vid in vids {
3144                match get_property_value(vid, props_map, prop_name) {
3145                    Some(Value::Temporal(uni_common::TemporalValue::Date { days_since_epoch })) => {
3146                        builder.append_value(days_since_epoch);
3147                    }
3148                    Some(Value::String(s)) => match NaiveDate::parse_from_str(&s, "%Y-%m-%d") {
3149                        Ok(d) => builder.append_value((d - epoch).num_days() as i32),
3150                        Err(_) => builder.append_null(),
3151                    },
3152                    Some(Value::Int(n)) => {
3153                        builder.append_value(n as i32);
3154                    }
3155                    _ => builder.append_null(),
3156                }
3157            }
3158            Ok(Arc::new(builder.finish()))
3159        }
3160        DataType::Time64(TimeUnit::Nanosecond) => {
3161            let mut builder = Time64NanosecondBuilder::new();
3162            for vid in vids {
3163                match get_property_value(vid, props_map, prop_name) {
3164                    Some(Value::Temporal(
3165                        uni_common::TemporalValue::LocalTime {
3166                            nanos_since_midnight,
3167                        }
3168                        | uni_common::TemporalValue::Time {
3169                            nanos_since_midnight,
3170                            ..
3171                        },
3172                    )) => {
3173                        builder.append_value(nanos_since_midnight);
3174                    }
3175                    Some(Value::Temporal(_)) => builder.append_null(),
3176                    Some(Value::String(s)) => {
3177                        match NaiveTime::parse_from_str(&s, "%H:%M:%S%.f")
3178                            .or_else(|_| NaiveTime::parse_from_str(&s, "%H:%M:%S"))
3179                        {
3180                            Ok(t) => {
3181                                let nanos = t.num_seconds_from_midnight() as i64 * 1_000_000_000
3182                                    + t.nanosecond() as i64;
3183                                builder.append_value(nanos);
3184                            }
3185                            Err(_) => builder.append_null(),
3186                        }
3187                    }
3188                    Some(Value::Int(n)) => {
3189                        builder.append_value(n);
3190                    }
3191                    _ => builder.append_null(),
3192                }
3193            }
3194            Ok(Arc::new(builder.finish()))
3195        }
3196        DataType::Interval(IntervalUnit::MonthDayNano) => {
3197            let mut values: Vec<Option<arrow::datatypes::IntervalMonthDayNano>> =
3198                Vec::with_capacity(vids.len());
3199            for vid in vids {
3200                match get_property_value(vid, props_map, prop_name) {
3201                    Some(Value::Temporal(uni_common::TemporalValue::Duration {
3202                        months,
3203                        days,
3204                        nanos,
3205                    })) => {
3206                        values.push(Some(arrow::datatypes::IntervalMonthDayNano {
3207                            months: months as i32,
3208                            days: days as i32,
3209                            nanoseconds: nanos,
3210                        }));
3211                    }
3212                    Some(Value::Int(_n)) => {
3213                        values.push(None);
3214                    }
3215                    _ => values.push(None),
3216                }
3217            }
3218            let arr: arrow_array::IntervalMonthDayNanoArray = values.into_iter().collect();
3219            Ok(Arc::new(arr))
3220        }
3221        DataType::List(inner_field) => {
3222            build_list_property_column(vids, props_map, prop_name, inner_field)
3223        }
3224        DataType::Struct(fields) => {
3225            build_struct_property_column(vids, props_map, prop_name, fields)
3226        }
3227        DataType::FixedSizeBinary(24) => {
3228            // BTIC temporal interval columns: encode as FixedSizeBinary(24)
3229            use arrow_array::builder::FixedSizeBinaryBuilder;
3230            const BTIC_LEN: i32 = 24;
3231            let mut builder = FixedSizeBinaryBuilder::with_capacity(vids.len(), BTIC_LEN);
3232            for vid in vids {
3233                match get_property_value(vid, props_map, prop_name) {
3234                    Some(Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta })) => {
3235                        match uni_btic::Btic::new(lo, hi, meta) {
3236                            Ok(b) => {
3237                                builder
3238                                    .append_value(uni_btic::encode::encode(&b))
3239                                    .map_err(arrow_err)?;
3240                            }
3241                            Err(e) => {
3242                                tracing::warn!(
3243                                    "BTIC coercion failed for property '{}': invalid value (lo={}, hi={}, meta={:#x}): {}",
3244                                    prop_name,
3245                                    lo,
3246                                    hi,
3247                                    meta,
3248                                    e
3249                                );
3250                                builder.append_null()
3251                            }
3252                        }
3253                    }
3254                    Some(Value::String(s)) => match uni_btic::parse::parse_btic_literal(&s) {
3255                        Ok(b) => {
3256                            builder
3257                                .append_value(uni_btic::encode::encode(&b))
3258                                .map_err(arrow_err)?;
3259                        }
3260                        Err(e) => {
3261                            tracing::warn!(
3262                                "BTIC coercion failed for property '{}': '{}' is not a valid BTIC literal: {}",
3263                                prop_name,
3264                                s,
3265                                e
3266                            );
3267                            builder.append_null()
3268                        }
3269                    },
3270                    _ => builder.append_null(),
3271                }
3272            }
3273            Ok(Arc::new(builder.finish()))
3274        }
3275        // Default: convert to string
3276        _ => {
3277            let mut builder = StringBuilder::new();
3278            for vid in vids {
3279                match get_property_value(vid, props_map, prop_name) {
3280                    Some(Value::Null) | None => builder.append_null(),
3281                    Some(other) => builder.append_value(other.to_string()),
3282                }
3283            }
3284            Ok(Arc::new(builder.finish()))
3285        }
3286    }
3287}
3288
3289/// Build a List-typed Arrow column from list property values.
3290fn build_list_property_column(
3291    vids: &[Vid],
3292    props_map: &HashMap<Vid, Properties>,
3293    prop_name: &str,
3294    inner_field: &Arc<Field>,
3295) -> DFResult<ArrayRef> {
3296    match inner_field.data_type() {
3297        DataType::Utf8 => {
3298            let mut builder = ListBuilder::new(StringBuilder::new());
3299            for vid in vids {
3300                match get_property_value(vid, props_map, prop_name) {
3301                    Some(Value::List(arr)) => {
3302                        for v in arr {
3303                            match v {
3304                                Value::String(s) => builder.values().append_value(s),
3305                                Value::Null => builder.values().append_null(),
3306                                other => builder.values().append_value(format!("{other:?}")),
3307                            }
3308                        }
3309                        builder.append(true);
3310                    }
3311                    _ => builder.append(false),
3312                }
3313            }
3314            Ok(Arc::new(builder.finish()))
3315        }
3316        DataType::Int64 => {
3317            let mut builder = ListBuilder::new(Int64Builder::new());
3318            for vid in vids {
3319                match get_property_value(vid, props_map, prop_name) {
3320                    Some(Value::List(arr)) => {
3321                        for v in arr {
3322                            match v.as_i64() {
3323                                Some(n) => builder.values().append_value(n),
3324                                None => builder.values().append_null(),
3325                            }
3326                        }
3327                        builder.append(true);
3328                    }
3329                    _ => builder.append(false),
3330                }
3331            }
3332            Ok(Arc::new(builder.finish()))
3333        }
3334        DataType::Float64 => {
3335            let mut builder = ListBuilder::new(Float64Builder::new());
3336            for vid in vids {
3337                match get_property_value(vid, props_map, prop_name) {
3338                    Some(Value::List(arr)) => {
3339                        for v in arr {
3340                            match v.as_f64() {
3341                                Some(n) => builder.values().append_value(n),
3342                                None => builder.values().append_null(),
3343                            }
3344                        }
3345                        builder.append(true);
3346                    }
3347                    _ => builder.append(false),
3348                }
3349            }
3350            Ok(Arc::new(builder.finish()))
3351        }
3352        DataType::Boolean => {
3353            let mut builder = ListBuilder::new(BooleanBuilder::new());
3354            for vid in vids {
3355                match get_property_value(vid, props_map, prop_name) {
3356                    Some(Value::List(arr)) => {
3357                        for v in arr {
3358                            match v.as_bool() {
3359                                Some(b) => builder.values().append_value(b),
3360                                None => builder.values().append_null(),
3361                            }
3362                        }
3363                        builder.append(true);
3364                    }
3365                    _ => builder.append(false),
3366                }
3367            }
3368            Ok(Arc::new(builder.finish()))
3369        }
3370        DataType::Struct(fields) => {
3371            // Map types are List(Struct(key, value)) — build struct inner elements
3372            build_list_of_structs_column(vids, props_map, prop_name, fields)
3373        }
3374        DataType::LargeBinary
3375            if inner_field
3376                .metadata()
3377                .get("uni_raw_bytes")
3378                .is_some_and(|v| v == "true") =>
3379        {
3380            // Typed `List(Bytes)`: store each buffer verbatim in a `LargeBinary`
3381            // child. The child field (reused from the schema) carries the
3382            // `uni_raw_bytes` marker so the read path decodes it as raw `Bytes`.
3383            // CV-encoded `LargeBinary` lists lack the marker and keep the string
3384            // fallback below — no pattern-comprehension/VLP regression.
3385            let mut builder = ListBuilder::new(arrow_array::builder::LargeBinaryBuilder::new())
3386                .with_field(inner_field.clone());
3387            for vid in vids {
3388                match get_property_value(vid, props_map, prop_name) {
3389                    Some(Value::List(arr)) => {
3390                        for v in arr {
3391                            if let Value::Bytes(b) = v {
3392                                builder.values().append_value(b);
3393                            } else {
3394                                builder.values().append_null();
3395                            }
3396                        }
3397                        builder.append(true);
3398                    }
3399                    _ => builder.append(false),
3400                }
3401            }
3402            Ok(Arc::new(builder.finish()))
3403        }
3404        // Fallback: serialize inner elements as strings
3405        _ => {
3406            let mut builder = ListBuilder::new(StringBuilder::new());
3407            for vid in vids {
3408                match get_property_value(vid, props_map, prop_name) {
3409                    Some(Value::List(arr)) => {
3410                        for v in arr {
3411                            match v {
3412                                Value::Null => builder.values().append_null(),
3413                                other => builder.values().append_value(format!("{other:?}")),
3414                            }
3415                        }
3416                        builder.append(true);
3417                    }
3418                    _ => builder.append(false),
3419                }
3420            }
3421            Ok(Arc::new(builder.finish()))
3422        }
3423    }
3424}
3425
3426/// Build a List(Struct(...)) column, used for Map-type properties.
3427///
3428/// Handles two value representations:
3429/// - `Value::List([Map{key: k, value: v}, ...])` — pre-converted kv pairs
3430/// - `Value::Map({k1: v1, k2: v2})` — raw map objects (converted to kv pairs)
3431fn build_list_of_structs_column(
3432    vids: &[Vid],
3433    props_map: &HashMap<Vid, Properties>,
3434    prop_name: &str,
3435    fields: &Fields,
3436) -> DFResult<ArrayRef> {
3437    use arrow_array::StructArray;
3438
3439    let values: Vec<Option<Value>> = vids
3440        .iter()
3441        .map(|vid| get_property_value(vid, props_map, prop_name))
3442        .collect();
3443
3444    // Convert each row's value to an owned Vec of Maps (key-value pairs).
3445    // This normalizes both List-of-maps and Map representations.
3446    let rows: Vec<Option<Vec<HashMap<String, Value>>>> = values
3447        .iter()
3448        .map(|val| match val {
3449            Some(Value::List(arr)) => {
3450                let objs: Vec<HashMap<String, Value>> = arr
3451                    .iter()
3452                    .filter_map(|v| {
3453                        if let Value::Map(m) = v {
3454                            Some(m.clone())
3455                        } else {
3456                            None
3457                        }
3458                    })
3459                    .collect();
3460                if objs.is_empty() { None } else { Some(objs) }
3461            }
3462            Some(Value::Map(obj)) => {
3463                // Map property: convert {k1: v1, k2: v2} -> [{key: k1, value: v1}, ...]
3464                let kv_pairs: Vec<HashMap<String, Value>> = obj
3465                    .iter()
3466                    .map(|(k, v)| {
3467                        let mut m = HashMap::new();
3468                        m.insert("key".to_string(), Value::String(k.clone()));
3469                        m.insert("value".to_string(), v.clone());
3470                        m
3471                    })
3472                    .collect();
3473                Some(kv_pairs)
3474            }
3475            _ => None,
3476        })
3477        .collect();
3478
3479    let total_items: usize = rows
3480        .iter()
3481        .filter_map(|r| r.as_ref())
3482        .map(|v| v.len())
3483        .sum();
3484
3485    // Build child arrays for each field in the struct
3486    let child_arrays: Vec<ArrayRef> = fields
3487        .iter()
3488        .map(|field| {
3489            let field_name = field.name();
3490            match field.data_type() {
3491                DataType::Utf8 => {
3492                    let mut builder = StringBuilder::with_capacity(total_items, total_items * 16);
3493                    for obj in rows.iter().flatten().flatten() {
3494                        match obj.get(field_name) {
3495                            Some(Value::String(s)) => builder.append_value(s),
3496                            Some(Value::Null) | None => builder.append_null(),
3497                            Some(other) => builder.append_value(format!("{other:?}")),
3498                        }
3499                    }
3500                    Arc::new(builder.finish()) as ArrayRef
3501                }
3502                DataType::Int64 => {
3503                    let mut builder = Int64Builder::with_capacity(total_items);
3504                    for obj in rows.iter().flatten().flatten() {
3505                        match obj.get(field_name).and_then(|v| v.as_i64()) {
3506                            Some(n) => builder.append_value(n),
3507                            None => builder.append_null(),
3508                        }
3509                    }
3510                    Arc::new(builder.finish()) as ArrayRef
3511                }
3512                DataType::Float64 => {
3513                    let mut builder = Float64Builder::with_capacity(total_items);
3514                    for obj in rows.iter().flatten().flatten() {
3515                        match obj.get(field_name).and_then(|v| v.as_f64()) {
3516                            Some(n) => builder.append_value(n),
3517                            None => builder.append_null(),
3518                        }
3519                    }
3520                    Arc::new(builder.finish()) as ArrayRef
3521                }
3522                // Typed `Map(_, Bytes)` value child: the schema field carries the
3523                // `uni_raw_bytes` marker; store each buffer verbatim in a
3524                // `LargeBinary` so the read path (`try_reconstruct_map`) decodes it
3525                // as raw `Bytes`. CV-encoded values lack the marker → string fallback.
3526                DataType::LargeBinary
3527                    if field
3528                        .metadata()
3529                        .get("uni_raw_bytes")
3530                        .is_some_and(|v| v == "true") =>
3531                {
3532                    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
3533                    for obj in rows.iter().flatten().flatten() {
3534                        match obj.get(field_name) {
3535                            Some(Value::Bytes(b)) => builder.append_value(b),
3536                            _ => builder.append_null(),
3537                        }
3538                    }
3539                    Arc::new(builder.finish()) as ArrayRef
3540                }
3541                // Fallback: serialize as string
3542                _ => {
3543                    let mut builder = StringBuilder::with_capacity(total_items, total_items * 16);
3544                    for obj in rows.iter().flatten().flatten() {
3545                        match obj.get(field_name) {
3546                            Some(Value::Null) | None => builder.append_null(),
3547                            Some(other) => builder.append_value(format!("{other:?}")),
3548                        }
3549                    }
3550                    Arc::new(builder.finish()) as ArrayRef
3551                }
3552            }
3553        })
3554        .collect();
3555
3556    // Build struct array from children
3557    let struct_array = StructArray::try_new(fields.clone(), child_arrays, None)
3558        .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;
3559
3560    // Build list offsets
3561    let mut offsets = Vec::with_capacity(vids.len() + 1);
3562    let mut nulls = Vec::with_capacity(vids.len());
3563    let mut offset = 0i32;
3564    offsets.push(offset);
3565    for row in &rows {
3566        match row {
3567            Some(objs) => {
3568                offset += objs.len() as i32;
3569                offsets.push(offset);
3570                nulls.push(true);
3571            }
3572            None => {
3573                offsets.push(offset);
3574                nulls.push(false);
3575            }
3576        }
3577    }
3578
3579    let list_field = Arc::new(Field::new("item", DataType::Struct(fields.clone()), true));
3580    let list_array = arrow_array::ListArray::try_new(
3581        list_field,
3582        arrow::buffer::OffsetBuffer::new(arrow::buffer::ScalarBuffer::from(offsets)),
3583        Arc::new(struct_array),
3584        Some(arrow::buffer::NullBuffer::from(nulls)),
3585    )
3586    .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;
3587
3588    Ok(Arc::new(list_array))
3589}
3590
3591/// Convert a TemporalValue into a HashMap matching the Arrow struct field names,
3592/// so that `build_struct_property_column` can extract fields uniformly.
3593fn temporal_to_struct_map(tv: &uni_common::value::TemporalValue) -> HashMap<String, Value> {
3594    use uni_common::value::TemporalValue;
3595    let mut m = HashMap::new();
3596    match tv {
3597        TemporalValue::DateTime {
3598            nanos_since_epoch,
3599            offset_seconds,
3600            timezone_name,
3601        } => {
3602            m.insert("nanos_since_epoch".into(), Value::Int(*nanos_since_epoch));
3603            m.insert("offset_seconds".into(), Value::Int(*offset_seconds as i64));
3604            if let Some(tz) = timezone_name {
3605                m.insert("timezone_name".into(), Value::String(tz.clone()));
3606            }
3607        }
3608        TemporalValue::LocalDateTime { nanos_since_epoch } => {
3609            m.insert("nanos_since_epoch".into(), Value::Int(*nanos_since_epoch));
3610        }
3611        TemporalValue::Time {
3612            nanos_since_midnight,
3613            offset_seconds,
3614        } => {
3615            m.insert(
3616                "nanos_since_midnight".into(),
3617                Value::Int(*nanos_since_midnight),
3618            );
3619            m.insert("offset_seconds".into(), Value::Int(*offset_seconds as i64));
3620        }
3621        TemporalValue::LocalTime {
3622            nanos_since_midnight,
3623        } => {
3624            m.insert(
3625                "nanos_since_midnight".into(),
3626                Value::Int(*nanos_since_midnight),
3627            );
3628        }
3629        TemporalValue::Date { days_since_epoch } => {
3630            m.insert(
3631                "days_since_epoch".into(),
3632                Value::Int(*days_since_epoch as i64),
3633            );
3634        }
3635        TemporalValue::Duration {
3636            months,
3637            days,
3638            nanos,
3639        } => {
3640            m.insert("months".into(), Value::Int(*months));
3641            m.insert("days".into(), Value::Int(*days));
3642            m.insert("nanos".into(), Value::Int(*nanos));
3643        }
3644        TemporalValue::Btic { lo, hi, meta } => {
3645            m.insert("lo".into(), Value::Int(*lo));
3646            m.insert("hi".into(), Value::Int(*hi));
3647            m.insert("meta".into(), Value::Int(*meta as i64));
3648        }
3649    }
3650    m
3651}
3652
3653/// Build a Struct-typed Arrow column from Map property values (e.g. Point types).
3654fn build_struct_property_column(
3655    vids: &[Vid],
3656    props_map: &HashMap<Vid, Properties>,
3657    prop_name: &str,
3658    fields: &Fields,
3659) -> DFResult<ArrayRef> {
3660    use arrow_array::StructArray;
3661
3662    // Convert raw values, expanding Temporal values into Map representation
3663    // so the struct field extraction below works uniformly.
3664    let values: Vec<Option<Value>> = vids
3665        .iter()
3666        .map(|vid| {
3667            let val = get_property_value(vid, props_map, prop_name);
3668            match val {
3669                Some(Value::Temporal(ref tv)) => Some(Value::Map(temporal_to_struct_map(tv))),
3670                other => other,
3671            }
3672        })
3673        .collect();
3674
3675    let child_arrays: Vec<ArrayRef> = fields
3676        .iter()
3677        .map(|field| {
3678            let field_name = field.name();
3679            match field.data_type() {
3680                DataType::Float64 => {
3681                    let mut builder = Float64Builder::with_capacity(vids.len());
3682                    for val in &values {
3683                        match val {
3684                            Some(Value::Map(obj)) => {
3685                                match obj.get(field_name).and_then(|v| v.as_f64()) {
3686                                    Some(n) => builder.append_value(n),
3687                                    None => builder.append_null(),
3688                                }
3689                            }
3690                            _ => builder.append_null(),
3691                        }
3692                    }
3693                    Arc::new(builder.finish()) as ArrayRef
3694                }
3695                DataType::Utf8 => {
3696                    let mut builder = StringBuilder::with_capacity(vids.len(), vids.len() * 16);
3697                    for val in &values {
3698                        match val {
3699                            Some(Value::Map(obj)) => match obj.get(field_name) {
3700                                Some(Value::String(s)) => builder.append_value(s),
3701                                Some(Value::Null) | None => builder.append_null(),
3702                                Some(other) => builder.append_value(format!("{other:?}")),
3703                            },
3704                            _ => builder.append_null(),
3705                        }
3706                    }
3707                    Arc::new(builder.finish()) as ArrayRef
3708                }
3709                DataType::Int64 => {
3710                    let mut builder = Int64Builder::with_capacity(vids.len());
3711                    for val in &values {
3712                        match val {
3713                            Some(Value::Map(obj)) => {
3714                                match obj.get(field_name).and_then(|v| v.as_i64()) {
3715                                    Some(n) => builder.append_value(n),
3716                                    None => builder.append_null(),
3717                                }
3718                            }
3719                            _ => builder.append_null(),
3720                        }
3721                    }
3722                    Arc::new(builder.finish()) as ArrayRef
3723                }
3724                DataType::Timestamp(_, _) => {
3725                    let mut builder = TimestampNanosecondBuilder::with_capacity(vids.len());
3726                    for val in &values {
3727                        match val {
3728                            Some(Value::Map(obj)) => {
3729                                match obj.get(field_name).and_then(|v| v.as_i64()) {
3730                                    Some(n) => builder.append_value(n),
3731                                    None => builder.append_null(),
3732                                }
3733                            }
3734                            _ => builder.append_null(),
3735                        }
3736                    }
3737                    Arc::new(builder.finish()) as ArrayRef
3738                }
3739                DataType::Int32 => {
3740                    let mut builder = Int32Builder::with_capacity(vids.len());
3741                    for val in &values {
3742                        match val {
3743                            Some(Value::Map(obj)) => {
3744                                match obj.get(field_name).and_then(|v| v.as_i64()) {
3745                                    Some(n) => builder.append_value(n as i32),
3746                                    None => builder.append_null(),
3747                                }
3748                            }
3749                            _ => builder.append_null(),
3750                        }
3751                    }
3752                    Arc::new(builder.finish()) as ArrayRef
3753                }
3754                DataType::Time64(_) => {
3755                    let mut builder = Time64NanosecondBuilder::with_capacity(vids.len());
3756                    for val in &values {
3757                        match val {
3758                            Some(Value::Map(obj)) => {
3759                                match obj.get(field_name).and_then(|v| v.as_i64()) {
3760                                    Some(n) => builder.append_value(n),
3761                                    None => builder.append_null(),
3762                                }
3763                            }
3764                            _ => builder.append_null(),
3765                        }
3766                    }
3767                    Arc::new(builder.finish()) as ArrayRef
3768                }
3769                // Fallback: serialize as string
3770                _ => {
3771                    let mut builder = StringBuilder::with_capacity(vids.len(), vids.len() * 16);
3772                    for val in &values {
3773                        match val {
3774                            Some(Value::Map(obj)) => match obj.get(field_name) {
3775                                Some(Value::Null) | None => builder.append_null(),
3776                                Some(other) => builder.append_value(format!("{other:?}")),
3777                            },
3778                            _ => builder.append_null(),
3779                        }
3780                    }
3781                    Arc::new(builder.finish()) as ArrayRef
3782                }
3783            }
3784        })
3785        .collect();
3786
3787    // Build null bitmap — null when the value is null/missing
3788    let nulls: Vec<bool> = values
3789        .iter()
3790        .map(|v| matches!(v, Some(Value::Map(_))))
3791        .collect();
3792
3793    let struct_array = StructArray::try_new(
3794        fields.clone(),
3795        child_arrays,
3796        Some(arrow::buffer::NullBuffer::from(nulls)),
3797    )
3798    .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;
3799
3800    Ok(Arc::new(struct_array))
3801}
3802
3803impl Stream for GraphScanStream {
3804    type Item = DFResult<RecordBatch>;
3805
3806    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3807        let metrics = self.metrics.clone();
3808        let _timer = metrics.elapsed_compute().timer();
3809        loop {
3810            // Use a temporary to avoid borrow issues
3811            let state = std::mem::replace(&mut self.state, GraphScanState::Done);
3812
3813            match state {
3814                GraphScanState::Init => {
3815                    // Create the future with cloned data for ownership
3816                    let graph_ctx = self.graph_ctx.clone();
3817                    let label = self.label.clone();
3818                    let variable = self.variable.clone();
3819                    let properties = self.properties.clone();
3820                    let is_edge_scan = self.is_edge_scan;
3821                    let is_schemaless = self.is_schemaless;
3822                    let filter = self.filter.clone();
3823                    let vid_list_filter = self.vid_list_filter.clone();
3824                    let extra_lance_filter = self.extra_lance_filter.clone();
3825                    let extra_runtime_filter = self.extra_runtime_filter.clone();
3826                    let schema = self.schema.clone();
3827
3828                    let fut = async move {
3829                        graph_ctx.check_timeout().map_err(|e| {
3830                            datafusion::error::DataFusionError::Execution(e.to_string())
3831                        })?;
3832
3833                        let batch = if is_edge_scan {
3834                            columnar_scan_edge_batch_static(
3835                                &graph_ctx,
3836                                &label,
3837                                &variable,
3838                                &properties,
3839                                &schema,
3840                            )
3841                            .await?
3842                        } else if is_schemaless {
3843                            columnar_scan_schemaless_vertex_batch_static(
3844                                &graph_ctx,
3845                                &label,
3846                                &variable,
3847                                &properties,
3848                                &schema,
3849                                &filter,
3850                                vid_list_filter.as_deref(),
3851                                extra_lance_filter.as_deref(),
3852                                extra_runtime_filter.as_ref(),
3853                            )
3854                            .await?
3855                        } else {
3856                            columnar_scan_vertex_batch_static(
3857                                &graph_ctx,
3858                                &label,
3859                                &variable,
3860                                &properties,
3861                                &schema,
3862                                &filter,
3863                                vid_list_filter.as_deref(),
3864                                extra_lance_filter.as_deref(),
3865                                extra_runtime_filter.as_ref(),
3866                            )
3867                            .await?
3868                        };
3869                        Ok(Some(batch))
3870                    };
3871
3872                    self.state = GraphScanState::Executing(Box::pin(fut));
3873                    // Continue loop to poll the future
3874                }
3875                GraphScanState::Executing(mut fut) => match fut.as_mut().poll(cx) {
3876                    Poll::Ready(Ok(batch)) => {
3877                        self.state = GraphScanState::Done;
3878                        self.metrics
3879                            .record_output(batch.as_ref().map(|b| b.num_rows()).unwrap_or(0));
3880                        return Poll::Ready(batch.map(Ok));
3881                    }
3882                    Poll::Ready(Err(e)) => {
3883                        self.state = GraphScanState::Done;
3884                        return Poll::Ready(Some(Err(e)));
3885                    }
3886                    Poll::Pending => {
3887                        self.state = GraphScanState::Executing(fut);
3888                        return Poll::Pending;
3889                    }
3890                },
3891                GraphScanState::Done => {
3892                    return Poll::Ready(None);
3893                }
3894            }
3895        }
3896    }
3897}
3898
3899impl RecordBatchStream for GraphScanStream {
3900    fn schema(&self) -> SchemaRef {
3901        self.schema.clone()
3902    }
3903}
3904
3905#[cfg(test)]
3906mod tests {
3907    use super::*;
3908
3909    #[test]
3910    fn test_build_vertex_schema() {
3911        let uni_schema = UniSchema::default();
3912        let schema = GraphScanExec::build_vertex_schema(
3913            "n",
3914            "Person",
3915            &["name".to_string(), "age".to_string()],
3916            &uni_schema,
3917        );
3918
3919        assert_eq!(schema.fields().len(), 4);
3920        assert_eq!(schema.field(0).name(), "n._vid");
3921        assert_eq!(schema.field(1).name(), "n._labels");
3922        assert_eq!(schema.field(2).name(), "n.name");
3923        assert_eq!(schema.field(3).name(), "n.age");
3924    }
3925
3926    #[test]
3927    fn test_build_edge_schema() {
3928        let uni_schema = UniSchema::default();
3929        let schema =
3930            GraphScanExec::build_edge_schema("r", "KNOWS", &["weight".to_string()], &uni_schema);
3931
3932        assert_eq!(schema.fields().len(), 4);
3933        assert_eq!(schema.field(0).name(), "r._eid");
3934        assert_eq!(schema.field(1).name(), "r._src_vid");
3935        assert_eq!(schema.field(2).name(), "r._dst_vid");
3936        assert_eq!(schema.field(3).name(), "r.weight");
3937    }
3938
3939    #[test]
3940    fn test_build_schemaless_vertex_schema() {
3941        let empty_schema = uni_common::core::schema::Schema::default();
3942        let schema = GraphScanExec::build_schemaless_vertex_schema(
3943            "n",
3944            &["name".to_string(), "age".to_string()],
3945            &empty_schema,
3946        );
3947
3948        assert_eq!(schema.fields().len(), 4);
3949        assert_eq!(schema.field(0).name(), "n._vid");
3950        assert_eq!(schema.field(0).data_type(), &DataType::UInt64);
3951        assert_eq!(schema.field(1).name(), "n._labels");
3952        assert_eq!(schema.field(2).name(), "n.name");
3953        // With empty schema, falls back to LargeBinary
3954        assert_eq!(schema.field(2).data_type(), &DataType::LargeBinary);
3955        assert_eq!(schema.field(3).name(), "n.age");
3956        assert_eq!(schema.field(3).data_type(), &DataType::LargeBinary);
3957    }
3958
3959    #[test]
3960    fn test_schemaless_all_scan_has_empty_label() {
3961        let empty_schema = uni_common::core::schema::Schema::default();
3962        let schema = GraphScanExec::build_schemaless_vertex_schema("n", &[], &empty_schema);
3963
3964        // Verify the schema has _vid and _labels columns for a scan with no properties
3965        assert_eq!(schema.fields().len(), 2);
3966        assert_eq!(schema.field(0).name(), "n._vid");
3967        assert_eq!(schema.field(1).name(), "n._labels");
3968    }
3969
3970    #[test]
3971    fn test_cypher_value_all_props_extraction() {
3972        // Encode a property map directly via the CypherValue codec (the path the
3973        // `_all_props` builders use).
3974        let map: HashMap<String, Value> = [
3975            ("age".to_string(), Value::Int(30)),
3976            ("name".to_string(), Value::String("Alice".to_string())),
3977        ]
3978        .into_iter()
3979        .collect();
3980        let cv_bytes = uni_common::cypher_value_codec::encode(&Value::Map(map));
3981
3982        // Decode and extract "age" value
3983        let decoded = uni_common::cypher_value_codec::decode(&cv_bytes).unwrap();
3984        match decoded {
3985            uni_common::Value::Map(map) => {
3986                let age_val = map.get("age").unwrap();
3987                assert_eq!(age_val, &uni_common::Value::Int(30));
3988            }
3989            _ => panic!("Expected Map"),
3990        }
3991
3992        // Also test single value encoding
3993        let single_bytes = uni_common::cypher_value_codec::encode(&Value::Int(30));
3994        let single_decoded = uni_common::cypher_value_codec::decode(&single_bytes).unwrap();
3995        assert_eq!(single_decoded, uni_common::Value::Int(30));
3996    }
3997
3998    /// Helper to build a RecordBatch with _vid, _deleted, _version columns for testing.
3999    fn make_mvcc_batch(vids: &[u64], versions: &[u64], deleted: &[bool]) -> RecordBatch {
4000        let schema = Arc::new(Schema::new(vec![
4001            Field::new("_vid", DataType::UInt64, false),
4002            Field::new("_deleted", DataType::Boolean, false),
4003            Field::new("_version", DataType::UInt64, false),
4004            Field::new("name", DataType::Utf8, true),
4005        ]));
4006        // Generate name values like "v{vid}_ver{version}" for tracking which row wins
4007        let names: Vec<String> = vids
4008            .iter()
4009            .zip(versions.iter())
4010            .map(|(v, ver)| format!("v{}_ver{}", v, ver))
4011            .collect();
4012        let name_arr: arrow_array::StringArray = names.iter().map(|s| Some(s.as_str())).collect();
4013
4014        RecordBatch::try_new(
4015            schema,
4016            vec![
4017                Arc::new(UInt64Array::from(vids.to_vec())),
4018                Arc::new(arrow_array::BooleanArray::from(deleted.to_vec())),
4019                Arc::new(UInt64Array::from(versions.to_vec())),
4020                Arc::new(name_arr),
4021            ],
4022        )
4023        .unwrap()
4024    }
4025
4026    #[test]
4027    fn test_mvcc_dedup_multiple_versions() {
4028        // VID 1 at versions 3, 1, 5 — should keep version 5
4029        // VID 2 at versions 2, 4 — should keep version 4
4030        let batch = make_mvcc_batch(
4031            &[1, 1, 1, 2, 2],
4032            &[3, 1, 5, 2, 4],
4033            &[false, false, false, false, false],
4034        );
4035
4036        let result = mvcc_dedup_batch(&batch).unwrap();
4037        assert_eq!(result.num_rows(), 2);
4038
4039        let vid_col = result
4040            .column_by_name("_vid")
4041            .unwrap()
4042            .as_any()
4043            .downcast_ref::<UInt64Array>()
4044            .unwrap();
4045        let ver_col = result
4046            .column_by_name("_version")
4047            .unwrap()
4048            .as_any()
4049            .downcast_ref::<UInt64Array>()
4050            .unwrap();
4051        let name_col = result
4052            .column_by_name("name")
4053            .unwrap()
4054            .as_any()
4055            .downcast_ref::<arrow_array::StringArray>()
4056            .unwrap();
4057
4058        // VID 1 → version 5, VID 2 → version 4
4059        assert_eq!(vid_col.value(0), 1);
4060        assert_eq!(ver_col.value(0), 5);
4061        assert_eq!(name_col.value(0), "v1_ver5");
4062
4063        assert_eq!(vid_col.value(1), 2);
4064        assert_eq!(ver_col.value(1), 4);
4065        assert_eq!(name_col.value(1), "v2_ver4");
4066    }
4067
4068    #[test]
4069    fn test_mvcc_dedup_single_rows() {
4070        // Each VID appears once — nothing should change
4071        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
4072        let result = mvcc_dedup_batch(&batch).unwrap();
4073        assert_eq!(result.num_rows(), 3);
4074    }
4075
4076    #[test]
4077    fn test_mvcc_dedup_empty() {
4078        let batch = make_mvcc_batch(&[], &[], &[]);
4079        let result = mvcc_dedup_batch(&batch).unwrap();
4080        assert_eq!(result.num_rows(), 0);
4081    }
4082
4083    #[test]
4084    fn test_filter_l0_tombstones_removes_tombstoned() {
4085        use crate::query::df_graph::L0Context;
4086
4087        // Create a batch with VIDs 1, 2, 3
4088        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
4089
4090        // Create L0 context with VID 2 tombstoned
4091        let l0 = uni_store::runtime::l0::L0Buffer::new(1, None);
4092        {
4093            // We need to insert a tombstone — L0Buffer has pub vertex_tombstones
4094            // But we can't easily create one with tombstones through the constructor.
4095            // Use a direct approach.
4096        }
4097        let l0_buf = std::sync::Arc::new(parking_lot::RwLock::new(l0));
4098        l0_buf.write().vertex_tombstones.insert(Vid::from(2u64));
4099
4100        let l0_ctx = L0Context {
4101            current_l0: Some(l0_buf),
4102            transaction_l0: None,
4103            pending_flush_l0s: vec![],
4104        };
4105
4106        let result = filter_l0_tombstones(&batch, &l0_ctx).unwrap();
4107        assert_eq!(result.num_rows(), 2);
4108
4109        let vid_col = result
4110            .column_by_name("_vid")
4111            .unwrap()
4112            .as_any()
4113            .downcast_ref::<UInt64Array>()
4114            .unwrap();
4115        assert_eq!(vid_col.value(0), 1);
4116        assert_eq!(vid_col.value(1), 3);
4117    }
4118
4119    #[test]
4120    fn test_filter_l0_tombstones_none() {
4121        use crate::query::df_graph::L0Context;
4122
4123        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
4124        let l0_ctx = L0Context::default();
4125
4126        let result = filter_l0_tombstones(&batch, &l0_ctx).unwrap();
4127        assert_eq!(result.num_rows(), 3);
4128    }
4129
4130    #[test]
4131    fn test_map_to_output_schema_basic() {
4132        use crate::query::df_graph::L0Context;
4133
4134        // Input: Lance-schema batch with _vid, _deleted, _version, name columns
4135        let lance_schema = Arc::new(Schema::new(vec![
4136            Field::new("_vid", DataType::UInt64, false),
4137            Field::new("_deleted", DataType::Boolean, false),
4138            Field::new("_version", DataType::UInt64, false),
4139            Field::new("name", DataType::Utf8, true),
4140        ]));
4141        let name_arr: arrow_array::StringArray =
4142            vec![Some("Alice"), Some("Bob")].into_iter().collect();
4143        let batch = RecordBatch::try_new(
4144            lance_schema,
4145            vec![
4146                Arc::new(UInt64Array::from(vec![1u64, 2])),
4147                Arc::new(arrow_array::BooleanArray::from(vec![false, false])),
4148                Arc::new(UInt64Array::from(vec![1u64, 1])),
4149                Arc::new(name_arr),
4150            ],
4151        )
4152        .unwrap();
4153
4154        // Output schema: n._vid, n._labels, n.name
4155        let output_schema = Arc::new(Schema::new(vec![
4156            Field::new("n._vid", DataType::UInt64, false),
4157            Field::new("n._labels", labels_data_type(), true),
4158            Field::new("n.name", DataType::Utf8, true),
4159        ]));
4160
4161        let l0_ctx = L0Context::default();
4162        let result = map_to_output_schema(
4163            &batch,
4164            "Person",
4165            "n",
4166            &["name".to_string()],
4167            &output_schema,
4168            &l0_ctx,
4169        )
4170        .unwrap();
4171
4172        assert_eq!(result.num_rows(), 2);
4173        assert_eq!(result.schema().fields().len(), 3);
4174        assert_eq!(result.schema().field(0).name(), "n._vid");
4175        assert_eq!(result.schema().field(1).name(), "n._labels");
4176        assert_eq!(result.schema().field(2).name(), "n.name");
4177
4178        // Check name values carried through
4179        let name_col = result
4180            .column(2)
4181            .as_any()
4182            .downcast_ref::<arrow_array::StringArray>()
4183            .unwrap();
4184        assert_eq!(name_col.value(0), "Alice");
4185        assert_eq!(name_col.value(1), "Bob");
4186    }
4187}