// uni_query/query/df_graph/scan.rs
// SPDX-License-Identifier: Apache-2.0
// Copyright 2024-2026 Dragonscale Team

//! Graph scan execution plan for DataFusion.
//!
//! This module provides [`GraphScanExec`], a DataFusion `ExecutionPlan` that scans
//! vertices or edges from storage with property materialization. It wraps the
//! underlying Lance table scan with:
//!
//! - MVCC resolution via L0 buffer overlays
//! - Property column materialization from `PropertyManager`
//! - Filter pushdown to the storage layer
//!
//! # Column Naming Convention
//!
//! Properties are materialized as columns named `{variable}.{property}`:
//! - `n.name` - property "name" for variable "n"
//! - `n.age` - property "age" for variable "n"
//!
//! System columns use the same `{variable}.` prefix with an underscore-prefixed name:
//! - `{variable}._vid` - vertex ID (vertex scans)
//! - `{variable}._labels` - vertex labels (vertex scans)
//! - `{variable}._eid` - edge ID (edge scans)
//! - `{variable}._src_vid` - source vertex ID (edge scans)
//! - `{variable}._dst_vid` - destination vertex ID (edge scans)

26use crate::query::datetime::parse_datetime_utc;
27use crate::query::df_graph::GraphExecutionContext;
28use crate::query::df_graph::common::{arrow_err, compute_plan_properties, labels_data_type};
29use arrow_array::builder::{
30    BinaryBuilder, BooleanBuilder, Date32Builder, FixedSizeListBuilder, Float32Builder,
31    Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
32    Time64NanosecondBuilder, TimestampNanosecondBuilder, UInt64Builder,
33};
34use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
35use arrow_schema::{DataType, Field, Fields, IntervalUnit, Schema, SchemaRef, TimeUnit};
36use chrono::{NaiveDate, NaiveTime, Timelike};
37use datafusion::common::Result as DFResult;
38use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
39use datafusion::physical_expr::PhysicalExpr;
40use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
41use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
42use futures::Stream;
43use std::any::Any;
44use std::collections::{HashMap, HashSet};
45use std::fmt;
46use std::pin::Pin;
47use std::sync::Arc;
48use std::task::{Context, Poll};
49use uni_common::Properties;
50use uni_common::Value;
51use uni_common::core::id::Vid;
52use uni_common::core::schema::Schema as UniSchema;
53
54/// Graph scan execution plan.
55///
56/// Scans vertices or edges from storage with property materialization.
57/// This wraps the underlying Lance table scan with MVCC resolution and
58/// property loading.
59///
60/// # Example
61///
62/// ```ignore
63/// // Create a scan for Person vertices with name and age properties
64/// let scan = GraphScanExec::new(
65///     graph_ctx,
66///     "Person",
67///     "n",
68///     vec!["name".to_string(), "age".to_string()],
69///     None, // No filter
70/// );
71///
72/// let stream = scan.execute(0, task_ctx)?;
73/// // Stream yields batches with columns: _vid, n.name, n.age
74/// ```
75pub struct GraphScanExec {
76    /// Graph execution context with storage and L0 access.
77    graph_ctx: Arc<GraphExecutionContext>,
78
79    /// Label name for vertex scan, or edge type for edge scan.
80    label: String,
81
82    /// Variable name for column prefixing.
83    variable: String,
84
85    /// Properties to materialize as columns.
86    projected_properties: Vec<String>,
87
88    /// Filter expression to push down (used for L0 short-circuit and
89    /// single-VID Lance pushdown). For multi-VID IN-list pushdown, use
90    /// `vid_list_filter` — see issue #55 PR #4.
91    filter: Option<Arc<dyn PhysicalExpr>>,
92
93    /// Multi-VID IN-list filter to push to Lance as `_vid IN (v1, v2, ...)`.
94    /// Set when the planner has resolved a static set of vids from an
95    /// `Expr::In { Property(_, "_vid"), List }` predicate. Bypasses the
96    /// PhysicalExpr roundtrip used by `filter`. See issue #55 PR #4.
97    vid_list_filter: Option<Vec<u64>>,
98
99    /// Pre-rendered Lance filter string for indexed-property pushdown
100    /// (e.g. `name = 'foo'`). AND-combined with the VID filter at scan time.
101    /// Populated by the planner when an indexed-property equality / IN
102    /// predicate is detected — Lance turns it into a hash-index lookup.
103    /// See issue #57.
104    extra_lance_filter: Option<String>,
105
106    /// Arrow-side equivalent of `extra_lance_filter`, applied to the merged
107    /// (Lance + L0) batch in-process so the scan output reflects only
108    /// matching rows even when data is still in L0 (Lance pushdown alone
109    /// can't reach uncommitted/unflushed rows). See issue #57.
110    extra_runtime_filter: Option<Arc<dyn PhysicalExpr>>,
111
112    /// Whether this is an edge scan (vs vertex scan).
113    is_edge_scan: bool,
114
115    /// Whether this is a schemaless scan (uses main table instead of per-label table).
116    is_schemaless: bool,
117
118    /// Output schema with materialized property columns.
119    schema: SchemaRef,
120
121    /// Cached plan properties.
122    properties: Arc<PlanProperties>,
123
124    /// Metrics for execution tracking.
125    metrics: ExecutionPlanMetricsSet,
126}
127
128impl fmt::Debug for GraphScanExec {
129    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130        f.debug_struct("GraphScanExec")
131            .field("label", &self.label)
132            .field("variable", &self.variable)
133            .field("projected_properties", &self.projected_properties)
134            .field(
135                "vid_list_filter_len",
136                &self.vid_list_filter.as_ref().map(Vec::len),
137            )
138            .field("is_edge_scan", &self.is_edge_scan)
139            .finish()
140    }
141}
142
143impl GraphScanExec {
144    /// Attach a multi-VID IN-list filter to this scan, pushing
145    /// `_vid IN (v1, v2, ...)` to Lance at execute time. Use this for
146    /// pre-resolved vid sets (e.g. from `UNWIND $list AS e WHERE id(x)=e.field`).
147    /// See issue #55 PR #4.
148    pub fn with_vid_list_filter(mut self, vids: Vec<u64>) -> Self {
149        self.vid_list_filter = Some(vids);
150        self
151    }
152
153    /// Attach a pre-rendered Lance filter string for indexed-property
154    /// pushdown. AND-combined with any VID filter at scan time. See
155    /// issue #57.
156    pub fn with_extra_lance_filter(mut self, filter: String) -> Self {
157        self.extra_lance_filter = Some(filter);
158        self
159    }
160
161    /// Attach the Arrow-side counterpart of `extra_lance_filter`. Applied to
162    /// the merged (Lance + L0) batch so the scan output is index-bound even
163    /// for not-yet-flushed L0 rows. See issue #57.
164    pub fn with_extra_runtime_filter(mut self, filter: Arc<dyn PhysicalExpr>) -> Self {
165        self.extra_runtime_filter = Some(filter);
166        self
167    }
168
169    /// Whether this scan has an extra Lance filter pushed in. Used by the
170    /// EXPLAIN/IndexUsage path to confirm the planner actually pushed.
171    pub fn has_extra_lance_filter(&self) -> bool {
172        self.extra_lance_filter.is_some()
173    }
174
175    /// Execute this scan once with a runtime-supplied list of VIDs as the
176    /// pushdown filter (`_vid IN (v1, v2, ...)`). Returns a single merged
177    /// `RecordBatch`. Used by `VidLookupJoinExec` (issue #55 PR #5) for
178    /// cross-MATCH dynamic pushdown — the build side materializes its keys at
179    /// runtime, then the probe scan runs once with those keys.
180    ///
181    /// Only supported for vertex and schemaless-vertex scans; edge scans
182    /// have a different shape and aren't currently a join target for this
183    /// optimization.
184    pub(crate) async fn execute_with_vid_filter(&self, vids: &[u64]) -> DFResult<RecordBatch> {
185        if self.is_edge_scan {
186            return Err(datafusion::error::DataFusionError::Plan(
187                "execute_with_vid_filter not supported for edge scans".into(),
188            ));
189        }
190        if self.is_schemaless {
191            columnar_scan_schemaless_vertex_batch_static(
192                &self.graph_ctx,
193                &self.label,
194                &self.variable,
195                &self.projected_properties,
196                &self.schema,
197                &self.filter,
198                Some(vids),
199                self.extra_lance_filter.as_deref(),
200                self.extra_runtime_filter.as_ref(),
201            )
202            .await
203        } else {
204            columnar_scan_vertex_batch_static(
205                &self.graph_ctx,
206                &self.label,
207                &self.variable,
208                &self.projected_properties,
209                &self.schema,
210                &self.filter,
211                Some(vids),
212                self.extra_lance_filter.as_deref(),
213                self.extra_runtime_filter.as_ref(),
214            )
215            .await
216        }
217    }
218}
219
220impl GraphScanExec {
221    /// Create a new graph scan for vertices.
222    ///
223    /// Scans all vertices of the given label from storage and L0 buffers,
224    /// then materializes the requested properties.
225    pub fn new_vertex_scan(
226        graph_ctx: Arc<GraphExecutionContext>,
227        label: impl Into<String>,
228        variable: impl Into<String>,
229        projected_properties: Vec<String>,
230        filter: Option<Arc<dyn PhysicalExpr>>,
231    ) -> Self {
232        let label = label.into();
233        let variable = variable.into();
234
235        // Build output schema with proper types from Uni schema
236        let uni_schema = graph_ctx.storage().schema_manager().schema();
237        let schema =
238            Self::build_vertex_schema(&variable, &label, &projected_properties, &uni_schema);
239
240        let properties = compute_plan_properties(schema.clone());
241
242        Self {
243            graph_ctx,
244            label,
245            variable,
246            projected_properties,
247            filter,
248            vid_list_filter: None,
249            extra_lance_filter: None,
250            extra_runtime_filter: None,
251            is_edge_scan: false,
252            is_schemaless: false,
253            schema,
254            properties,
255            metrics: ExecutionPlanMetricsSet::new(),
256        }
257    }
258
259    /// Create a new schemaless vertex scan.
260    ///
261    /// Scans the main vertices table for vertices with the given label name.
262    /// Properties are extracted from props_json (all treated as Utf8/JSON).
263    /// This is used for labels that aren't in the schema.
264    pub fn new_schemaless_vertex_scan(
265        graph_ctx: Arc<GraphExecutionContext>,
266        label_name: impl Into<String>,
267        variable: impl Into<String>,
268        projected_properties: Vec<String>,
269        filter: Option<Arc<dyn PhysicalExpr>>,
270    ) -> Self {
271        let label = label_name.into();
272        let variable = variable.into();
273
274        // Filter out system columns that are already materialized as dedicated columns
275        // (_vid as UInt64, _labels as List<Utf8>). If these appear in projected_properties
276        // (e.g., from collect_properties_from_plan extracting _vid from filter expressions),
277        // they would create duplicate columns with conflicting types.
278        let projected_properties: Vec<String> = projected_properties
279            .into_iter()
280            .filter(|p| p != "_vid" && p != "_labels")
281            .collect();
282
283        let uni_schema = graph_ctx.storage().schema_manager().schema();
284        let schema =
285            Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
286        let properties = compute_plan_properties(schema.clone());
287
288        Self {
289            graph_ctx,
290            label,
291            variable,
292            projected_properties,
293            filter,
294            vid_list_filter: None,
295            extra_lance_filter: None,
296            extra_runtime_filter: None,
297            is_edge_scan: false,
298            is_schemaless: true,
299            schema,
300            properties,
301            metrics: ExecutionPlanMetricsSet::new(),
302        }
303    }
304
305    /// Create a new multi-label vertex scan using the main vertices table.
306    ///
307    /// Scans for vertices that have ALL specified labels (intersection semantics).
308    /// Properties are extracted from props_json (schemaless).
309    pub fn new_multi_label_vertex_scan(
310        graph_ctx: Arc<GraphExecutionContext>,
311        labels: Vec<String>,
312        variable: impl Into<String>,
313        projected_properties: Vec<String>,
314        filter: Option<Arc<dyn PhysicalExpr>>,
315    ) -> Self {
316        let variable = variable.into();
317        let projected_properties: Vec<String> = projected_properties
318            .into_iter()
319            .filter(|p| p != "_vid" && p != "_labels")
320            .collect();
321        let uni_schema = graph_ctx.storage().schema_manager().schema();
322        let schema =
323            Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
324        let properties = compute_plan_properties(schema.clone());
325
326        // Encode labels as colon-separated for the stream to parse
327        let encoded_labels = labels.join(":");
328
329        Self {
330            graph_ctx,
331            label: encoded_labels,
332            variable,
333            projected_properties,
334            filter,
335            vid_list_filter: None,
336            extra_lance_filter: None,
337            extra_runtime_filter: None,
338            is_edge_scan: false,
339            is_schemaless: true,
340            schema,
341            properties,
342            metrics: ExecutionPlanMetricsSet::new(),
343        }
344    }
345
346    /// Create a new schemaless scan for all vertices.
347    ///
348    /// Scans the main vertices table for all vertices regardless of label.
349    /// Properties are extracted from props_json with types resolved from the schema.
350    /// This is used for `MATCH (n)` without label filter.
351    pub fn new_schemaless_all_scan(
352        graph_ctx: Arc<GraphExecutionContext>,
353        variable: impl Into<String>,
354        projected_properties: Vec<String>,
355        filter: Option<Arc<dyn PhysicalExpr>>,
356    ) -> Self {
357        let variable = variable.into();
358        let projected_properties: Vec<String> = projected_properties
359            .into_iter()
360            .filter(|p| p != "_vid" && p != "_labels")
361            .collect();
362
363        let uni_schema = graph_ctx.storage().schema_manager().schema();
364        let schema =
365            Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
366        let properties = compute_plan_properties(schema.clone());
367
368        Self {
369            graph_ctx,
370            label: String::new(), // Empty label signals "scan all vertices"
371            variable,
372            projected_properties,
373            filter,
374            vid_list_filter: None,
375            extra_lance_filter: None,
376            extra_runtime_filter: None,
377            is_edge_scan: false,
378            is_schemaless: true,
379            schema,
380            properties,
381            metrics: ExecutionPlanMetricsSet::new(),
382        }
383    }
384
385    /// Build schema for schemaless vertex scan.
386    ///
387    /// Resolves property types from all labels in the schema. Falls back to
388    /// LargeBinary (CypherValue encoding) for properties not found in any
389    /// label's schema.
390    fn build_schemaless_vertex_schema(
391        variable: &str,
392        properties: &[String],
393        uni_schema: &uni_common::core::schema::Schema,
394    ) -> SchemaRef {
395        // Merge property metadata from all labels for type resolution.
396        let mut merged: std::collections::HashMap<&str, &uni_common::core::schema::PropertyMeta> =
397            std::collections::HashMap::new();
398        for label_props in uni_schema.properties.values() {
399            for (name, meta) in label_props {
400                merged.entry(name.as_str()).or_insert(meta);
401            }
402        }
403
404        let mut fields = vec![
405            Field::new(format!("{}._vid", variable), DataType::UInt64, false),
406            Field::new(format!("{}._labels", variable), labels_data_type(), true),
407        ];
408
409        for prop in properties {
410            let col_name = format!("{}.{}", variable, prop);
411            let arrow_type = merged
412                .get(prop.as_str())
413                .map(|meta| meta.r#type.to_arrow())
414                .unwrap_or(DataType::LargeBinary);
415            fields.push(Field::new(&col_name, arrow_type, true));
416        }
417
418        Arc::new(Schema::new(fields))
419    }
420
421    /// Create a new graph scan for edges.
422    ///
423    /// Scans all edges of the given type from storage and L0 buffers,
424    /// then materializes the requested properties.
425    pub fn new_edge_scan(
426        graph_ctx: Arc<GraphExecutionContext>,
427        edge_type: impl Into<String>,
428        variable: impl Into<String>,
429        projected_properties: Vec<String>,
430        filter: Option<Arc<dyn PhysicalExpr>>,
431    ) -> Self {
432        let label = edge_type.into();
433        let variable = variable.into();
434
435        // Build output schema with proper types from Uni schema
436        let uni_schema = graph_ctx.storage().schema_manager().schema();
437        let schema = Self::build_edge_schema(&variable, &label, &projected_properties, &uni_schema);
438
439        let properties = compute_plan_properties(schema.clone());
440
441        Self {
442            graph_ctx,
443            label,
444            variable,
445            projected_properties,
446            filter,
447            vid_list_filter: None,
448            extra_lance_filter: None,
449            extra_runtime_filter: None,
450            is_edge_scan: true,
451            is_schemaless: false,
452            schema,
453            properties,
454            metrics: ExecutionPlanMetricsSet::new(),
455        }
456    }
457
458    /// Build output schema for vertex scan with proper Arrow types.
459    fn build_vertex_schema(
460        variable: &str,
461        label: &str,
462        properties: &[String],
463        uni_schema: &UniSchema,
464    ) -> SchemaRef {
465        let mut fields = vec![
466            Field::new(format!("{}._vid", variable), DataType::UInt64, false),
467            Field::new(format!("{}._labels", variable), labels_data_type(), true),
468        ];
469        let label_props = uni_schema.properties.get(label);
470        for prop in properties {
471            let col_name = format!("{}.{}", variable, prop);
472            let arrow_type = resolve_property_type(prop, label_props);
473            fields.push(Field::new(&col_name, arrow_type, true));
474        }
475        Arc::new(Schema::new(fields))
476    }
477
478    /// Build output schema for edge scan with proper Arrow types.
479    fn build_edge_schema(
480        variable: &str,
481        edge_type: &str,
482        properties: &[String],
483        uni_schema: &UniSchema,
484    ) -> SchemaRef {
485        let mut fields = vec![
486            Field::new(format!("{}._eid", variable), DataType::UInt64, false),
487            Field::new(format!("{}._src_vid", variable), DataType::UInt64, false),
488            Field::new(format!("{}._dst_vid", variable), DataType::UInt64, false),
489        ];
490        let edge_props = uni_schema.properties.get(edge_type);
491        for prop in properties {
492            let col_name = format!("{}.{}", variable, prop);
493            let arrow_type = resolve_property_type(prop, edge_props);
494            fields.push(Field::new(&col_name, arrow_type, true));
495        }
496        Arc::new(Schema::new(fields))
497    }
498}
499
500impl DisplayAs for GraphScanExec {
501    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
502        let scan_type = if self.is_edge_scan { "Edge" } else { "Vertex" };
503        write!(
504            f,
505            "GraphScanExec: {}={}, properties={:?}",
506            scan_type, self.label, self.projected_properties
507        )?;
508        if self.filter.is_some() {
509            write!(f, ", filter=<pushed>")?;
510        }
511        Ok(())
512    }
513}
514
515impl ExecutionPlan for GraphScanExec {
516    fn name(&self) -> &str {
517        "GraphScanExec"
518    }
519
520    fn as_any(&self) -> &dyn Any {
521        self
522    }
523
524    fn schema(&self) -> SchemaRef {
525        self.schema.clone()
526    }
527
528    fn properties(&self) -> &Arc<PlanProperties> {
529        &self.properties
530    }
531
532    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
533        vec![]
534    }
535
536    fn with_new_children(
537        self: Arc<Self>,
538        children: Vec<Arc<dyn ExecutionPlan>>,
539    ) -> DFResult<Arc<dyn ExecutionPlan>> {
540        if children.is_empty() {
541            Ok(self)
542        } else {
543            Err(datafusion::error::DataFusionError::Plan(
544                "GraphScanExec does not accept children".to_string(),
545            ))
546        }
547    }
548
549    fn execute(
550        &self,
551        partition: usize,
552        _context: Arc<TaskContext>,
553    ) -> DFResult<SendableRecordBatchStream> {
554        let metrics = BaselineMetrics::new(&self.metrics, partition);
555
556        Ok(Box::pin(GraphScanStream::new(
557            self.graph_ctx.clone(),
558            self.label.clone(),
559            self.variable.clone(),
560            self.projected_properties.clone(),
561            self.is_edge_scan,
562            self.is_schemaless,
563            self.filter.clone(),
564            self.vid_list_filter.clone(),
565            self.extra_lance_filter.clone(),
566            self.extra_runtime_filter.clone(),
567            self.schema.clone(),
568            metrics,
569        )))
570    }
571
572    fn metrics(&self) -> Option<MetricsSet> {
573        Some(self.metrics.clone_inner())
574    }
575}
576
577/// State machine for graph scan stream execution.
578enum GraphScanState {
579    /// Initial state, ready to start scanning.
580    Init,
581    /// Executing the async scan.
582    Executing(Pin<Box<dyn std::future::Future<Output = DFResult<Option<RecordBatch>>> + Send>>),
583    /// Stream is done.
584    Done,
585}
586
587/// Stream that scans vertices or edges and materializes properties.
588///
589/// For known-label vertex scans, uses a single columnar Lance query with
590/// MVCC dedup and L0 overlay. For edge and schemaless scans, falls back
591/// to the two-phase VID-scan + property-materialize flow.
592struct GraphScanStream {
593    /// Graph execution context.
594    graph_ctx: Arc<GraphExecutionContext>,
595
596    /// Label (vertex) or edge type name.
597    label: String,
598
599    /// Variable name for column prefixing (e.g., "n" in `n.name`).
600    variable: String,
601
602    /// Properties to materialize.
603    properties: Vec<String>,
604
605    /// Whether this is an edge scan.
606    is_edge_scan: bool,
607
608    /// Whether this is a schemaless scan.
609    is_schemaless: bool,
610
611    /// Pushed-down filter expression (used for VID short-circuit in L0 scans).
612    filter: Option<Arc<dyn PhysicalExpr>>,
613
614    /// Multi-VID IN-list filter for Lance pushdown. See issue #55 PR #4.
615    vid_list_filter: Option<Vec<u64>>,
616
617    /// Extra Lance filter string (e.g. `name = 'foo'`) for indexed-property
618    /// pushdown. See issue #57.
619    extra_lance_filter: Option<String>,
620
621    /// Arrow-side equivalent of `extra_lance_filter`. See issue #57.
622    extra_runtime_filter: Option<Arc<dyn PhysicalExpr>>,
623
624    /// Output schema.
625    schema: SchemaRef,
626
627    /// Stream state.
628    state: GraphScanState,
629
630    /// Metrics.
631    metrics: BaselineMetrics,
632}
633
634impl GraphScanStream {
635    /// Create a new graph scan stream.
636    #[expect(clippy::too_many_arguments)]
637    fn new(
638        graph_ctx: Arc<GraphExecutionContext>,
639        label: String,
640        variable: String,
641        properties: Vec<String>,
642        is_edge_scan: bool,
643        is_schemaless: bool,
644        filter: Option<Arc<dyn PhysicalExpr>>,
645        vid_list_filter: Option<Vec<u64>>,
646        extra_lance_filter: Option<String>,
647        extra_runtime_filter: Option<Arc<dyn PhysicalExpr>>,
648        schema: SchemaRef,
649        metrics: BaselineMetrics,
650    ) -> Self {
651        Self {
652            graph_ctx,
653            label,
654            variable,
655            properties,
656            is_edge_scan,
657            is_schemaless,
658            filter,
659            vid_list_filter,
660            extra_lance_filter,
661            extra_runtime_filter,
662            schema,
663            state: GraphScanState::Init,
664            metrics,
665        }
666    }
667}
668
669/// Resolve the Arrow data type for a property, handling system columns like `overflow_json`.
670///
671/// Falls back to `LargeBinary` (CypherValue) if the property is not found in the schema,
672/// preserving original value types for overflow/unknown properties.
673pub(crate) fn resolve_property_type(
674    prop: &str,
675    schema_props: Option<
676        &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
677    >,
678) -> DataType {
679    if prop == "overflow_json" {
680        DataType::LargeBinary
681    } else if prop == "_created_at" || prop == "_updated_at" {
682        // System-managed timestamps surfaced via `created_at(n)` /
683        // `updated_at(n)`. Stored on every vertex/edge by the L0 buffer
684        // and the on-disk Arrow tables as Timestamp(Nanosecond, UTC).
685        DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
686    } else {
687        schema_props
688            .and_then(|props| props.get(prop))
689            .map(|meta| meta.r#type.to_arrow())
690            .unwrap_or(DataType::LargeBinary)
691    }
692}
693
694// ============================================================================
695// Columnar-first scan helpers
696// ============================================================================
697
/// Test-only shorthand for MVCC deduplication keyed on `_vid`.
///
/// Forwards to `mvcc_dedup_batch_by`, which sorts by (`_vid` ASC,
/// `_version` DESC) and keeps the first row per `_vid` — i.e. the
/// highest version — using pure Arrow compute.
#[cfg(test)]
fn mvcc_dedup_batch(batch: &RecordBatch) -> DFResult<RecordBatch> {
    const VID_COLUMN: &str = "_vid";
    mvcc_dedup_batch_by(batch, VID_COLUMN)
}
706
707/// Dedup a Lance batch and return `Some` only when rows remain.
708///
709/// Wraps the common pattern of dedup + empty-check that appears in every
710/// columnar scan path (vertex, edge, schemaless).
711fn mvcc_dedup_to_option(
712    batch: Option<RecordBatch>,
713    id_column: &str,
714) -> DFResult<Option<RecordBatch>> {
715    match batch {
716        Some(b) => {
717            let deduped = mvcc_dedup_batch_by(&b, id_column)?;
718            Ok(if deduped.num_rows() > 0 {
719                Some(deduped)
720            } else {
721                None
722            })
723        }
724        None => Ok(None),
725    }
726}
727
728/// Merge a deduped Lance batch with an L0 batch, re-deduplicating the combined
729/// result. Returns an empty batch (against `output_schema`) when both inputs
730/// are empty.
731fn merge_lance_and_l0(
732    lance_deduped: Option<RecordBatch>,
733    l0_batch: RecordBatch,
734    internal_schema: &SchemaRef,
735    id_column: &str,
736) -> DFResult<Option<RecordBatch>> {
737    let has_l0 = l0_batch.num_rows() > 0;
738    match (lance_deduped, has_l0) {
739        (Some(lance), true) => {
740            let combined = arrow::compute::concat_batches(internal_schema, &[lance, l0_batch])
741                .map_err(arrow_err)?;
742            Ok(Some(mvcc_dedup_batch_by(&combined, id_column)?))
743        }
744        (Some(lance), false) => Ok(Some(lance)),
745        (None, true) => Ok(Some(l0_batch)),
746        (None, false) => Ok(None),
747    }
748}
749
750/// Drop rows superseded by a newer persisted version that the pushed
751/// property predicate filtered out (issue #57 × MVCC-append tables).
752///
753/// Lance evaluates a pushed property predicate per ROW, before the per-vid
754/// max-`_version` pick — so when a vid's property was rewritten and
755/// re-flushed, its CURRENT row fails the predicate and never reaches the
756/// dedup, while the stale still-matching row wins it by default. Re-reads
757/// `_vid`/`_version` for the candidate vids WITHOUT the property predicate
758/// (per-label table when `label_table` is `Some`, the main vertex table
759/// otherwise) and keeps only rows carrying their vid's true maximum
760/// persisted version. Must run on the RAW filtered batch, before
761/// [`mvcc_dedup_to_option`].
762async fn drop_superseded_pushdown_rows(
763    storage: &Arc<uni_store::storage::manager::StorageManager>,
764    label_table: Option<&str>,
765    batch: RecordBatch,
766) -> DFResult<RecordBatch> {
767    if batch.num_rows() == 0 {
768        return Ok(batch);
769    }
770    let (Some(vid_col), Some(ver_col)) = (
771        batch
772            .column_by_name("_vid")
773            .and_then(|c| c.as_any().downcast_ref::<UInt64Array>()),
774        batch
775            .column_by_name("_version")
776            .and_then(|c| c.as_any().downcast_ref::<UInt64Array>()),
777    ) else {
778        return Err(datafusion::error::DataFusionError::Execution(
779            "pushdown version verification: scan batch missing _vid/_version".to_string(),
780        ));
781    };
782
783    let mut candidates: Vec<u64> = Vec::new();
784    let mut seen: HashSet<u64> = HashSet::new();
785    for i in 0..vid_col.len() {
786        let vid = vid_col.value(i);
787        if seen.insert(vid) {
788            candidates.push(vid);
789        }
790    }
791
792    // True max persisted version per candidate vid — unfiltered apart from
793    // the vid list, so rewritten-key rows and deletion tombstones are seen.
794    // Chunked to bound the `_vid IN (…)` filter-string size.
795    const VERIFY_CHUNK: usize = 1000;
796    let mut max_ver: HashMap<u64, u64> = HashMap::with_capacity(candidates.len());
797    for chunk in candidates.chunks(VERIFY_CHUNK) {
798        let filter = format_vid_in_list(chunk);
799        let scanned = match label_table {
800            Some(label) => {
801                storage
802                    .scan_vertex_table(label, &["_vid", "_version"], Some(&filter))
803                    .await
804            }
805            None => {
806                storage
807                    .scan_main_vertex_table(&["_vid", "_version"], Some(&filter))
808                    .await
809            }
810        }
811        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
812        let Some(vbatch) = scanned else { continue };
813        let (Some(v_vid), Some(v_ver)) = (
814            vbatch
815                .column_by_name("_vid")
816                .and_then(|c| c.as_any().downcast_ref::<UInt64Array>()),
817            vbatch
818                .column_by_name("_version")
819                .and_then(|c| c.as_any().downcast_ref::<UInt64Array>()),
820        ) else {
821            return Err(datafusion::error::DataFusionError::Execution(
822                "pushdown version verification: rescan missing _vid/_version".to_string(),
823            ));
824        };
825        for i in 0..v_vid.len() {
826            let entry = max_ver.entry(v_vid.value(i)).or_insert(0);
827            *entry = (*entry).max(v_ver.value(i));
828        }
829    }
830
831    let keep: arrow_array::BooleanArray = (0..batch.num_rows())
832        .map(|i| {
833            Some(
834                max_ver
835                    .get(&vid_col.value(i))
836                    .is_none_or(|&max| ver_col.value(i) >= max),
837            )
838        })
839        .collect();
840    arrow::compute::filter_record_batch(&batch, &keep).map_err(arrow_err)
841}
842
/// Append `col_name` to `columns` unless an equal entry is already there.
///
/// The membership test compares `&str` against each stored `String`, so no
/// temporary `String` is allocated just to perform the check; an owned copy
/// is made only when the name is actually appended.
fn push_column_if_absent(columns: &mut Vec<String>, col_name: &str) {
    let already_present = columns.iter().any(|existing| existing.as_str() == col_name);
    if already_present {
        return;
    }
    columns.push(col_name.to_owned());
}
852
853/// Extract a property value from an overflow_json CypherValue blob.
854///
855/// Returns the raw CypherValue bytes for `prop` if found in the blob,
856/// or `None` if the blob is null or the key is absent.
857fn extract_from_overflow_blob(
858    overflow_arr: Option<&arrow_array::LargeBinaryArray>,
859    row: usize,
860    prop: &str,
861) -> Option<Vec<u8>> {
862    let arr = overflow_arr?;
863    if arr.is_null(row) {
864        return None;
865    }
866    uni_common::cypher_value_codec::extract_map_entry_raw(arr.value(row), prop)
867}
868
869/// Build a `LargeBinary` column by extracting a property from overflow_json
870/// blobs, with L0 buffer overlay.
871///
872/// For each row, checks L0 buffers first (later buffers take precedence).
873/// If the property is not in L0, falls back to extracting from the
874/// overflow_json CypherValue blob.
875fn build_overflow_property_column(
876    num_rows: usize,
877    vid_arr: &UInt64Array,
878    overflow_arr: Option<&arrow_array::LargeBinaryArray>,
879    prop: &str,
880    l0_ctx: &crate::query::df_graph::L0Context,
881) -> ArrayRef {
882    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
883    for i in 0..num_rows {
884        let vid = Vid::from(vid_arr.value(i));
885
886        // Check L0 buffers (later overwrites earlier)
887        let l0_val = resolve_l0_property(&vid, prop, l0_ctx);
888
889        if let Some(val_opt) = l0_val {
890            append_value_as_cypher_binary(&mut builder, val_opt.as_ref());
891        } else if let Some(bytes) = extract_from_overflow_blob(overflow_arr, i, prop) {
892            builder.append_value(&bytes);
893        } else {
894            builder.append_null();
895        }
896    }
897    Arc::new(builder.finish())
898}
899
900/// Resolve a property value from the L0 visibility chain.
901///
902/// Returns `Some(Some(val))` when the property exists with a non-null value,
903/// `Some(None)` when it exists but is null, and `None` when no L0 buffer
904/// has the property.
905fn resolve_l0_property(
906    vid: &Vid,
907    prop: &str,
908    l0_ctx: &crate::query::df_graph::L0Context,
909) -> Option<Option<Value>> {
910    let mut result = None;
911    for l0 in l0_ctx.iter_l0_buffers() {
912        let guard = l0.read();
913        if let Some(props) = guard.vertex_properties.get(vid)
914            && let Some(val) = props.get(prop)
915        {
916            result = Some(Some(val.clone()));
917        }
918    }
919    result
920}
921
922/// Append a `Value` to a `LargeBinaryBuilder` as CypherValue bytes.
923///
924/// Encoded directly via the CypherValue codec so typed values (temporals,
925/// nested lists/maps) round-trip losslessly. Null values produce null entries.
926fn append_value_as_cypher_binary(
927    builder: &mut arrow_array::builder::LargeBinaryBuilder,
928    val: Option<&Value>,
929) {
930    match val {
931        Some(v) if !v.is_null() => {
932            builder.append_value(uni_common::cypher_value_codec::encode(v));
933        }
934        _ => builder.append_null(),
935    }
936}
937
938/// Build the `_all_props` column by overlaying L0 buffer properties onto
939/// the batch's `props_json` column.
940///
941/// For each row, decodes the stored CypherValue blob, merges in any L0 buffer
942/// properties (in visibility order: pending → current → transaction), and
943/// re-encodes the result. This ensures `properties()` and `keys()` reflect
944/// uncommitted L0 mutations.
945fn build_all_props_column_with_l0_overlay(
946    num_rows: usize,
947    vid_arr: &UInt64Array,
948    props_arr: Option<&arrow_array::LargeBinaryArray>,
949    l0_ctx: &crate::query::df_graph::L0Context,
950) -> ArrayRef {
951    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
952    for i in 0..num_rows {
953        let vid = Vid::from(vid_arr.value(i));
954
955        // 1. Decode props_json blob from storage (stays in `Value` space so
956        //    typed values such as temporals are preserved).
957        let mut merged_props: HashMap<String, Value> = HashMap::new();
958        if let Some(arr) = props_arr
959            && !arr.is_null(i)
960            && let Ok(uni_common::Value::Map(map)) =
961                uni_common::cypher_value_codec::decode(arr.value(i))
962        {
963            merged_props.extend(map);
964        }
965
966        // 2. Overlay L0 properties (visibility order: pending → current → transaction)
967        for l0 in l0_ctx.iter_l0_buffers() {
968            let guard = l0.read();
969            if let Some(l0_props) = guard.vertex_properties.get(&vid) {
970                for (k, v) in l0_props {
971                    merged_props.insert(k.clone(), v.clone());
972                }
973            }
974        }
975
976        // 3. Encode merged result directly via the CypherValue codec.
977        if merged_props.is_empty() {
978            builder.append_null();
979        } else {
980            builder.append_value(uni_common::cypher_value_codec::encode(&Value::Map(
981                merged_props,
982            )));
983        }
984    }
985    Arc::new(builder.finish())
986}
987
988/// Build `_all_props` for a schema-based scan by merging:
989/// 1. Schema-defined columns from the batch
990/// 2. Overflow_json properties
991/// 3. L0 buffer properties
992fn build_all_props_column_for_schema_scan(
993    batch: &RecordBatch,
994    vid_arr: &UInt64Array,
995    overflow_arr: Option<&arrow_array::LargeBinaryArray>,
996    projected_properties: &[String],
997    l0_ctx: &crate::query::df_graph::L0Context,
998) -> ArrayRef {
999    // Collect schema-defined property column names (non-internal, non-overflow, non-_all_props)
1000    let schema_props: Vec<&str> = projected_properties
1001        .iter()
1002        .filter(|p| *p != "overflow_json" && *p != "_all_props" && !p.starts_with('_'))
1003        .map(String::as_str)
1004        .collect();
1005
1006    let num_rows = batch.num_rows();
1007    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
1008    for i in 0..num_rows {
1009        let vid = Vid::from(vid_arr.value(i));
1010        // Build the merged map in `Value` space so typed values (temporals,
1011        // nested lists/maps) are preserved through to the CypherValue blob.
1012        let mut merged_props: HashMap<String, Value> = HashMap::new();
1013
1014        // 1. Schema-defined columns
1015        for &prop in &schema_props {
1016            if let Some(col) = batch.column_by_name(prop) {
1017                let val = uni_store::storage::arrow_convert::arrow_to_value(col.as_ref(), i, None);
1018                if !val.is_null() {
1019                    merged_props.insert(prop.to_string(), val);
1020                }
1021            }
1022        }
1023
1024        // 2. Overflow_json properties
1025        if let Some(arr) = overflow_arr
1026            && !arr.is_null(i)
1027            && let Ok(uni_common::Value::Map(map)) =
1028                uni_common::cypher_value_codec::decode(arr.value(i))
1029        {
1030            merged_props.extend(map);
1031        }
1032
1033        // 3. L0 buffer overlay (pending → current → transaction)
1034        for l0 in l0_ctx.iter_l0_buffers() {
1035            let guard = l0.read();
1036            if let Some(l0_props) = guard.vertex_properties.get(&vid) {
1037                for (k, v) in l0_props {
1038                    merged_props.insert(k.clone(), v.clone());
1039                }
1040            }
1041        }
1042
1043        if merged_props.is_empty() {
1044            builder.append_null();
1045        } else {
1046            builder.append_value(uni_common::cypher_value_codec::encode(&Value::Map(
1047                merged_props,
1048            )));
1049        }
1050    }
1051    Arc::new(builder.finish())
1052}
1053
1054/// MVCC deduplication: keep only the highest-version row for each unique value
1055/// in the given `id_column`.
1056///
1057/// Sorts by (id_column ASC, _version DESC), then keeps the first occurrence of
1058/// each id (= the highest version). This is a pure Arrow-compute operation.
1059fn mvcc_dedup_batch_by(batch: &RecordBatch, id_column: &str) -> DFResult<RecordBatch> {
1060    if batch.num_rows() == 0 {
1061        return Ok(batch.clone());
1062    }
1063
1064    let id_col = batch
1065        .column_by_name(id_column)
1066        .ok_or_else(|| {
1067            datafusion::error::DataFusionError::Internal(format!("Missing {} column", id_column))
1068        })?
1069        .clone();
1070    let version_col = batch
1071        .column_by_name("_version")
1072        .ok_or_else(|| {
1073            datafusion::error::DataFusionError::Internal("Missing _version column".to_string())
1074        })?
1075        .clone();
1076
1077    // Sort by (id_column ASC, _version DESC)
1078    let sort_columns = vec![
1079        arrow::compute::SortColumn {
1080            values: id_col,
1081            options: Some(arrow::compute::SortOptions {
1082                descending: false,
1083                nulls_first: false,
1084            }),
1085        },
1086        arrow::compute::SortColumn {
1087            values: version_col,
1088            options: Some(arrow::compute::SortOptions {
1089                descending: true,
1090                nulls_first: false,
1091            }),
1092        },
1093    ];
1094    let indices = arrow::compute::lexsort_to_indices(&sort_columns, None).map_err(arrow_err)?;
1095
1096    // Reorder all columns by sorted indices
1097    let sorted_columns: Vec<ArrayRef> = batch
1098        .columns()
1099        .iter()
1100        .map(|col| arrow::compute::take(col.as_ref(), &indices, None))
1101        .collect::<Result<_, _>>()
1102        .map_err(arrow_err)?;
1103    let sorted = RecordBatch::try_new(batch.schema(), sorted_columns).map_err(arrow_err)?;
1104
1105    // Build dedup mask: keep first occurrence of each id
1106    let sorted_id = sorted
1107        .column_by_name(id_column)
1108        .unwrap()
1109        .as_any()
1110        .downcast_ref::<UInt64Array>()
1111        .unwrap();
1112
1113    let mut keep = vec![false; sorted.num_rows()];
1114    if !keep.is_empty() {
1115        keep[0] = true;
1116        for (i, flag) in keep.iter_mut().enumerate().skip(1) {
1117            if sorted_id.value(i) != sorted_id.value(i - 1) {
1118                *flag = true;
1119            }
1120        }
1121    }
1122
1123    let mask = arrow_array::BooleanArray::from(keep);
1124    arrow::compute::filter_record_batch(&sorted, &mask).map_err(arrow_err)
1125}
1126
1127/// Filter out edge rows where `op != 0` (non-INSERT) after MVCC dedup.
1128fn filter_deleted_edge_ops(batch: &RecordBatch) -> DFResult<RecordBatch> {
1129    if batch.num_rows() == 0 {
1130        return Ok(batch.clone());
1131    }
1132    let op_col = match batch.column_by_name("op") {
1133        Some(col) => col
1134            .as_any()
1135            .downcast_ref::<arrow_array::UInt8Array>()
1136            .unwrap(),
1137        None => return Ok(batch.clone()),
1138    };
1139    let keep: Vec<bool> = (0..op_col.len()).map(|i| op_col.value(i) == 0).collect();
1140    let mask = arrow_array::BooleanArray::from(keep);
1141    arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
1142}
1143
1144/// Filter out rows where `_deleted = true` after MVCC dedup.
1145fn filter_deleted_rows(batch: &RecordBatch) -> DFResult<RecordBatch> {
1146    if batch.num_rows() == 0 {
1147        return Ok(batch.clone());
1148    }
1149    let deleted_col = match batch.column_by_name("_deleted") {
1150        Some(col) => col
1151            .as_any()
1152            .downcast_ref::<arrow_array::BooleanArray>()
1153            .unwrap(),
1154        None => return Ok(batch.clone()),
1155    };
1156    let keep: Vec<bool> = (0..deleted_col.len())
1157        .map(|i| !deleted_col.value(i))
1158        .collect();
1159    let mask = arrow_array::BooleanArray::from(keep);
1160    arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
1161}
1162
1163/// Filter out rows whose `_vid` appears in L0 tombstones.
1164fn filter_l0_tombstones(
1165    batch: &RecordBatch,
1166    l0_ctx: &crate::query::df_graph::L0Context,
1167) -> DFResult<RecordBatch> {
1168    if batch.num_rows() == 0 {
1169        return Ok(batch.clone());
1170    }
1171
1172    let mut tombstones: HashSet<u64> = HashSet::new();
1173    for l0 in l0_ctx.iter_l0_buffers() {
1174        let guard = l0.read();
1175        for vid in guard.vertex_tombstones.iter() {
1176            tombstones.insert(vid.as_u64());
1177        }
1178    }
1179
1180    if tombstones.is_empty() {
1181        return Ok(batch.clone());
1182    }
1183
1184    let vid_col = batch
1185        .column_by_name("_vid")
1186        .ok_or_else(|| {
1187            datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
1188        })?
1189        .as_any()
1190        .downcast_ref::<UInt64Array>()
1191        .unwrap();
1192
1193    let keep: Vec<bool> = (0..vid_col.len())
1194        .map(|i| !tombstones.contains(&vid_col.value(i)))
1195        .collect();
1196    let mask = arrow_array::BooleanArray::from(keep);
1197    arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
1198}
1199
1200/// Filter out rows whose `eid` appears in L0 edge tombstones.
1201fn filter_l0_edge_tombstones(
1202    batch: &RecordBatch,
1203    l0_ctx: &crate::query::df_graph::L0Context,
1204) -> DFResult<RecordBatch> {
1205    if batch.num_rows() == 0 {
1206        return Ok(batch.clone());
1207    }
1208
1209    let mut tombstones: HashSet<u64> = HashSet::new();
1210    for l0 in l0_ctx.iter_l0_buffers() {
1211        let guard = l0.read();
1212        for eid in guard.tombstones.keys() {
1213            tombstones.insert(eid.as_u64());
1214        }
1215    }
1216
1217    if tombstones.is_empty() {
1218        return Ok(batch.clone());
1219    }
1220
1221    let eid_col = batch
1222        .column_by_name("eid")
1223        .ok_or_else(|| {
1224            datafusion::error::DataFusionError::Internal("Missing eid column".to_string())
1225        })?
1226        .as_any()
1227        .downcast_ref::<UInt64Array>()
1228        .unwrap();
1229
1230    let keep: Vec<bool> = (0..eid_col.len())
1231        .map(|i| !tombstones.contains(&eid_col.value(i)))
1232        .collect();
1233    let mask = arrow_array::BooleanArray::from(keep);
1234    arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
1235}
1236
1237/// Extract a target VID from a DataFusion physical filter expression.
1238///
1239/// Looks for patterns like `_vid = <uint64_literal>` or `<uint64_literal> = _vid`
1240/// in the top-level expression or as a conjunct of an AND chain. Returns the first
1241/// VID literal found, or `None` if the filter does not contain such a pattern.
1242///
1243/// This also handles `CAST(literal AS UInt64)` which DataFusion may insert when
1244/// the original literal is Int64.
1245fn extract_vid_from_physical_filter(filter: &Arc<dyn PhysicalExpr>) -> Option<u64> {
1246    use datafusion::logical_expr::Operator;
1247    use datafusion::physical_expr::expressions::BinaryExpr;
1248
1249    // Try to match this expression as `_vid = literal`
1250    if let Some(bin) = filter.as_any().downcast_ref::<BinaryExpr>() {
1251        if bin.op() == &Operator::Eq {
1252            // Check both directions: col = lit and lit = col
1253            if let Some(vid) = try_extract_vid_eq(bin.left(), bin.right()) {
1254                return Some(vid);
1255            }
1256            if let Some(vid) = try_extract_vid_eq(bin.right(), bin.left()) {
1257                return Some(vid);
1258            }
1259        }
1260        // Recurse into AND conjuncts
1261        if bin.op() == &Operator::And {
1262            if let Some(vid) = extract_vid_from_physical_filter(bin.left()) {
1263                return Some(vid);
1264            }
1265            return extract_vid_from_physical_filter(bin.right());
1266        }
1267    }
1268    None
1269}
1270
1271/// Try to extract a VID value from a `(column_expr, value_expr)` pair where
1272/// `column_expr` is a `Column` named `_vid` and `value_expr` is a UInt64 or
1273/// non-negative Int64 literal (possibly wrapped in a CAST to UInt64).
1274fn try_extract_vid_eq(
1275    col_side: &Arc<dyn PhysicalExpr>,
1276    val_side: &Arc<dyn PhysicalExpr>,
1277) -> Option<u64> {
1278    use datafusion::physical_expr::expressions::{CastExpr, Column, Literal};
1279
1280    // Check that col_side is Column("_vid") or Column("variable._vid")
1281    let col = col_side.as_any().downcast_ref::<Column>()?;
1282    if col.name() != "_vid" && !col.name().ends_with("._vid") {
1283        return None;
1284    }
1285
1286    // Try direct literal
1287    if let Some(lit) = val_side.as_any().downcast_ref::<Literal>() {
1288        return scalar_to_u64(lit.value());
1289    }
1290
1291    // Try CAST(literal AS UInt64)
1292    if let Some(cast) = val_side.as_any().downcast_ref::<CastExpr>()
1293        && let Some(lit) = cast.expr().as_any().downcast_ref::<Literal>()
1294    {
1295        return scalar_to_u64(lit.value());
1296    }
1297
1298    None
1299}
1300
/// Join an optional VID filter clause and an optional indexed-property filter
/// clause with `AND`. Each side is parenthesised only when both are present.
/// If exactly one is present it is returned as-is; if neither is, `None`.
/// See issues #55, #57.
fn combine_lance_filters(vid_filter: Option<&str>, extra: Option<&str>) -> Option<String> {
    match (vid_filter, extra) {
        (Some(vid), Some(other)) => Some(format!("({vid}) AND ({other})")),
        (only, None) | (None, only) => only.map(str::to_owned),
    }
}
1311
/// Render `vids` as a Lance `_vid IN (v1,v2,...)` filter clause (comma
/// separated, no spaces). Used by the scan-side IN-list pushdown introduced
/// in issue #55 PR #4.
///
/// Callers must pass a non-empty slice; this is checked only in debug builds.
fn format_vid_in_list(vids: &[u64]) -> String {
    use std::fmt::Write;
    debug_assert!(!vids.is_empty());
    let mut clause = String::with_capacity(vids.len() * 8 + 12);
    clause.push_str("_vid IN (");
    let mut separator = "";
    for vid in vids {
        clause.push_str(separator);
        let _ = write!(clause, "{vid}");
        separator = ",";
    }
    clause.push(')');
    clause
}
1328
1329/// Convert a `ScalarValue` to `u64` if it is a non-negative integer type.
1330fn scalar_to_u64(sv: &datafusion::common::ScalarValue) -> Option<u64> {
1331    use datafusion::common::ScalarValue;
1332    match sv {
1333        ScalarValue::UInt64(Some(v)) => Some(*v),
1334        ScalarValue::Int64(Some(v)) if *v >= 0 => Some(*v as u64),
1335        ScalarValue::UInt32(Some(v)) => Some(*v as u64),
1336        ScalarValue::Int32(Some(v)) if *v >= 0 => Some(*v as u64),
1337        _ => None,
1338    }
1339}
1340
/// Build a RecordBatch from L0 buffer data for a given label, matching the
/// Lance query's column set.
///
/// Merges L0 buffers in visibility order (pending_flush → current → transaction),
/// with later buffers overwriting earlier ones for the same VID.
///
/// When `target_vids` is `Some`, only those VIDs are collected (direct HashMap
/// lookups instead of iterating all VIDs for the label). This must mirror the
/// Lance-side VID pushdown — otherwise L0-only (unflushed) rows bypass the
/// filter and the scan emits the full label table. See issue #72 item 1.
///
/// # Arguments
/// - `label`: label whose vertices are collected. In the `target_vids` path an
///   empty `label` skips the per-label membership check.
/// - `lance_schema`: output schema; exactly one column is produced per field,
///   in field order.
/// - `label_props`: schema-declared properties of the label; properties not
///   listed here (excluding `ext_id` and `_`-prefixed keys) are packed into
///   the `overflow_json` column.
/// - `target_vids`: optional VID restriction (see above).
///
/// # Returns
/// One row per live (non-tombstoned) VID, sorted by VID ascending, or an
/// empty batch with `lance_schema` when no L0 vertex qualifies.
///
/// # Errors
/// Propagates errors from `build_l0_property_column` and from
/// `RecordBatch::try_new` if the produced columns do not match the schema.
fn build_l0_vertex_batch(
    l0_ctx: &crate::query::df_graph::L0Context,
    label: &str,
    lance_schema: &SchemaRef,
    label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
    target_vids: Option<&[u64]>,
) -> DFResult<RecordBatch> {
    // Collect all L0 vertex data, merging in visibility order
    let mut vid_data: HashMap<u64, (Properties, u64)> = HashMap::new(); // vid -> (props, version)
    let mut tombstones: HashSet<u64> = HashSet::new();
    // System-managed timestamps: created_at takes the earliest seen
    // timestamp across L0 buffers (preserving the original creation
    // moment when a row has been touched in multiple buffers); updated_at
    // takes the latest (most recent write). Used by `created_at(n)` /
    // `updated_at(n)` Cypher functions.
    let mut vid_created_at: HashMap<u64, i64> = HashMap::new();
    let mut vid_updated_at: HashMap<u64, i64> = HashMap::new();

    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();
        // Collect tombstones
        for vid in guard.vertex_tombstones.iter() {
            tombstones.insert(vid.as_u64());
        }
        // Collect vertices — restrict to target_vids (single- or multi-VID
        // pushdown from id(x) = ? / id(x) IN [...]) when set, else all
        // vertices for the label. See issue #72 item 1: without this filter,
        // freshly-inserted L0 rows bypass the IN-list pushdown that Lance
        // already honors, defeating the optimization.
        let candidate_vids: Vec<Vid> = if let Some(tvs) = target_vids {
            let mut out = Vec::with_capacity(tvs.len());
            for &tv in tvs {
                let vid = Vid::from(tv);
                // A target VID qualifies only if this buffer has a property
                // entry for it and (unless `label` is empty) it is registered
                // under `label` in this buffer.
                if guard.vertex_properties.contains_key(&vid)
                    && (label.is_empty()
                        || guard
                            .label_to_vids
                            .get(label)
                            .is_some_and(|s| s.contains(&vid)))
                {
                    out.push(vid);
                }
            }
            out
        } else {
            guard.vids_for_label(label)
        };
        for vid in candidate_vids {
            let vid_u64 = vid.as_u64();
            // Only tombstones from this and earlier buffers are known here;
            // tombstones from later buffers are applied after the loop.
            if tombstones.contains(&vid_u64) {
                continue;
            }
            // A VID without a recorded version in this buffer counts as 0.
            let version = guard.vertex_versions.get(&vid).copied().unwrap_or(0);
            let entry = vid_data
                .entry(vid_u64)
                .or_insert_with(|| (Properties::new(), 0));
            // Merge properties (later L0 overwrites)
            if let Some(props) = guard.vertex_properties.get(&vid) {
                for (k, v) in props {
                    entry.0.insert(k.clone(), v.clone());
                }
            }
            // Take the highest version
            if version > entry.1 {
                entry.1 = version;
            }
            // Merge system timestamps: earliest creation, latest update
            if let Some(&ts) = guard.vertex_created_at.get(&vid) {
                vid_created_at
                    .entry(vid_u64)
                    .and_modify(|cur| {
                        if ts < *cur {
                            *cur = ts;
                        }
                    })
                    .or_insert(ts);
            }
            if let Some(&ts) = guard.vertex_updated_at.get(&vid) {
                vid_updated_at
                    .entry(vid_u64)
                    .and_modify(|cur| {
                        if ts > *cur {
                            *cur = ts;
                        }
                    })
                    .or_insert(ts);
            }
        }
    }

    // Remove tombstoned VIDs
    for t in &tombstones {
        vid_data.remove(t);
    }

    if vid_data.is_empty() {
        return Ok(RecordBatch::new_empty(lance_schema.clone()));
    }

    // Sort VIDs for deterministic output
    let mut vids: Vec<u64> = vid_data.keys().copied().collect();
    vids.sort_unstable();

    let num_rows = vids.len();
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(lance_schema.fields().len());

    // Determine which schema property names exist
    let schema_prop_names: HashSet<&str> = label_props
        .map(|lp| lp.keys().map(|k| k.as_str()).collect())
        .unwrap_or_default();

    // Emit one column per schema field, in schema order, so the result can
    // be built against `lance_schema` directly.
    for field in lance_schema.fields() {
        let col_name = field.name().as_str();
        match col_name {
            "_vid" => {
                columns.push(Arc::new(UInt64Array::from(vids.clone())));
            }
            "_deleted" => {
                // L0 vertices are always live (tombstoned ones are already excluded)
                let vals = vec![false; num_rows];
                columns.push(Arc::new(arrow_array::BooleanArray::from(vals)));
            }
            "_version" => {
                let vals: Vec<u64> = vids.iter().map(|v| vid_data[v].1).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "_created_at" => {
                // Nanosecond timestamps tagged UTC; null when no L0 buffer
                // recorded a creation time for the VID.
                let mut builder =
                    arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
                for v in &vids {
                    match vid_created_at.get(v) {
                        Some(&ts) => builder.append_value(ts),
                        None => builder.append_null(),
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            "_updated_at" => {
                // Same encoding as `_created_at`, using the latest update time.
                let mut builder =
                    arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
                for v in &vids {
                    match vid_updated_at.get(v) {
                        Some(&ts) => builder.append_value(ts),
                        None => builder.append_null(),
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            "overflow_json" => {
                // Collect non-schema properties as CypherValue
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                for vid_u64 in &vids {
                    let (props, _) = &vid_data[vid_u64];
                    let mut overflow: HashMap<String, Value> = HashMap::new();
                    for (k, v) in props {
                        // `ext_id` and `_`-prefixed keys never go into the overflow blob.
                        if k == "ext_id" || k.starts_with('_') {
                            continue;
                        }
                        if !schema_prop_names.contains(k.as_str()) {
                            overflow.insert(k.clone(), v.clone());
                        }
                    }
                    if overflow.is_empty() {
                        builder.append_null();
                    } else {
                        builder.append_value(uni_common::cypher_value_codec::encode(&Value::Map(
                            overflow,
                        )));
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            _ => {
                // Schema property column: convert L0 Value → Arrow typed value
                let col = build_l0_property_column(&vids, &vid_data, col_name, field.data_type())?;
                columns.push(col);
            }
        }
    }

    RecordBatch::try_new(lance_schema.clone(), columns).map_err(arrow_err)
}
1533
1534/// Build a single Arrow column from L0 property values.
1535///
1536/// Operates on the `vid_data` map produced by `build_l0_vertex_batch`.
1537fn build_l0_property_column(
1538    vids: &[u64],
1539    vid_data: &HashMap<u64, (Properties, u64)>,
1540    prop_name: &str,
1541    data_type: &DataType,
1542) -> DFResult<ArrayRef> {
1543    // Convert to Vid keys for reuse of existing build_property_column_static
1544    let vid_keys: Vec<Vid> = vids.iter().map(|v| Vid::from(*v)).collect();
1545    let props_map: HashMap<Vid, Properties> = vid_data
1546        .iter()
1547        .map(|(k, (props, _))| (Vid::from(*k), props.clone()))
1548        .collect();
1549
1550    build_property_column_static(&vid_keys, &props_map, prop_name, data_type)
1551}
1552
1553/// Build a RecordBatch from L0 buffer data for a given edge type, matching
1554/// the DeltaDataset Lance table's column set.
1555///
1556/// Merges L0 buffers in visibility order (pending_flush → current → transaction),
1557/// with later buffers overwriting earlier ones for the same EID.
1558fn build_l0_edge_batch(
1559    l0_ctx: &crate::query::df_graph::L0Context,
1560    edge_type: &str,
1561    internal_schema: &SchemaRef,
1562    type_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
1563) -> DFResult<RecordBatch> {
1564    // Collect all L0 edge data, merging in visibility order
1565    // eid -> (src_vid, dst_vid, properties, version)
1566    let mut eid_data: HashMap<u64, (u64, u64, Properties, u64)> = HashMap::new();
1567    let mut tombstones: HashSet<u64> = HashSet::new();
1568    // System-managed timestamps: earliest creation, latest update.
1569    // See `build_l0_vertex_batch` for the rationale.
1570    let mut eid_created_at: HashMap<u64, i64> = HashMap::new();
1571    let mut eid_updated_at: HashMap<u64, i64> = HashMap::new();
1572
1573    for l0 in l0_ctx.iter_l0_buffers() {
1574        let guard = l0.read();
1575        // Collect tombstones
1576        for eid in guard.tombstones.keys() {
1577            tombstones.insert(eid.as_u64());
1578        }
1579        // Collect edges for this type
1580        for eid in guard.eids_for_type(edge_type) {
1581            let eid_u64 = eid.as_u64();
1582            if tombstones.contains(&eid_u64) {
1583                continue;
1584            }
1585            let (src_vid, dst_vid) = match guard.get_edge_endpoints(eid) {
1586                Some(endpoints) => (endpoints.0.as_u64(), endpoints.1.as_u64()),
1587                None => continue,
1588            };
1589            let version = guard.edge_versions.get(&eid).copied().unwrap_or(0);
1590            let entry = eid_data
1591                .entry(eid_u64)
1592                .or_insert_with(|| (src_vid, dst_vid, Properties::new(), 0));
1593            // Merge properties (later L0 overwrites)
1594            if let Some(props) = guard.edge_properties.get(&eid) {
1595                for (k, v) in props {
1596                    entry.2.insert(k.clone(), v.clone());
1597                }
1598            }
1599            // Update endpoints from latest L0 layer
1600            entry.0 = src_vid;
1601            entry.1 = dst_vid;
1602            // Take the highest version
1603            if version > entry.3 {
1604                entry.3 = version;
1605            }
1606            // Merge system timestamps
1607            if let Some(&ts) = guard.edge_created_at.get(&eid) {
1608                eid_created_at
1609                    .entry(eid_u64)
1610                    .and_modify(|cur| {
1611                        if ts < *cur {
1612                            *cur = ts;
1613                        }
1614                    })
1615                    .or_insert(ts);
1616            }
1617            if let Some(&ts) = guard.edge_updated_at.get(&eid) {
1618                eid_updated_at
1619                    .entry(eid_u64)
1620                    .and_modify(|cur| {
1621                        if ts > *cur {
1622                            *cur = ts;
1623                        }
1624                    })
1625                    .or_insert(ts);
1626            }
1627        }
1628    }
1629
1630    // Remove tombstoned EIDs
1631    for t in &tombstones {
1632        eid_data.remove(t);
1633    }
1634
1635    if eid_data.is_empty() {
1636        return Ok(RecordBatch::new_empty(internal_schema.clone()));
1637    }
1638
1639    // Sort EIDs for deterministic output
1640    let mut eids: Vec<u64> = eid_data.keys().copied().collect();
1641    eids.sort_unstable();
1642
1643    let num_rows = eids.len();
1644    let mut columns: Vec<ArrayRef> = Vec::with_capacity(internal_schema.fields().len());
1645
1646    // Determine which schema property names exist
1647    let schema_prop_names: HashSet<&str> = type_props
1648        .map(|tp| tp.keys().map(|k| k.as_str()).collect())
1649        .unwrap_or_default();
1650
1651    for field in internal_schema.fields() {
1652        let col_name = field.name().as_str();
1653        match col_name {
1654            "eid" => {
1655                columns.push(Arc::new(UInt64Array::from(eids.clone())));
1656            }
1657            "src_vid" => {
1658                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].0).collect();
1659                columns.push(Arc::new(UInt64Array::from(vals)));
1660            }
1661            "dst_vid" => {
1662                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].1).collect();
1663                columns.push(Arc::new(UInt64Array::from(vals)));
1664            }
1665            "op" => {
1666                // L0 edges are always live (tombstoned ones already excluded)
1667                let vals = vec![0u8; num_rows];
1668                columns.push(Arc::new(arrow_array::UInt8Array::from(vals)));
1669            }
1670            "_version" => {
1671                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].3).collect();
1672                columns.push(Arc::new(UInt64Array::from(vals)));
1673            }
1674            "_created_at" => {
1675                let mut builder =
1676                    arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
1677                for e in &eids {
1678                    match eid_created_at.get(e) {
1679                        Some(&ts) => builder.append_value(ts),
1680                        None => builder.append_null(),
1681                    }
1682                }
1683                columns.push(Arc::new(builder.finish()));
1684            }
1685            "_updated_at" => {
1686                let mut builder =
1687                    arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
1688                for e in &eids {
1689                    match eid_updated_at.get(e) {
1690                        Some(&ts) => builder.append_value(ts),
1691                        None => builder.append_null(),
1692                    }
1693                }
1694                columns.push(Arc::new(builder.finish()));
1695            }
1696            "overflow_json" => {
1697                // Collect non-schema properties as CypherValue
1698                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
1699                for eid_u64 in &eids {
1700                    let (_, _, props, _) = &eid_data[eid_u64];
1701                    let mut overflow: HashMap<String, Value> = HashMap::new();
1702                    for (k, v) in props {
1703                        if k.starts_with('_') {
1704                            continue;
1705                        }
1706                        if !schema_prop_names.contains(k.as_str()) {
1707                            overflow.insert(k.clone(), v.clone());
1708                        }
1709                    }
1710                    if overflow.is_empty() {
1711                        builder.append_null();
1712                    } else {
1713                        builder.append_value(uni_common::cypher_value_codec::encode(&Value::Map(
1714                            overflow,
1715                        )));
1716                    }
1717                }
1718                columns.push(Arc::new(builder.finish()));
1719            }
1720            _ => {
1721                // Schema property column: convert L0 Value → Arrow typed value
1722                let col =
1723                    build_l0_edge_property_column(&eids, &eid_data, col_name, field.data_type())?;
1724                columns.push(col);
1725            }
1726        }
1727    }
1728
1729    RecordBatch::try_new(internal_schema.clone(), columns).map_err(arrow_err)
1730}
1731
1732/// Build a single Arrow column from L0 edge property values.
1733///
1734/// Operates on the `eid_data` map produced by `build_l0_edge_batch`.
1735fn build_l0_edge_property_column(
1736    eids: &[u64],
1737    eid_data: &HashMap<u64, (u64, u64, Properties, u64)>,
1738    prop_name: &str,
1739    data_type: &DataType,
1740) -> DFResult<ArrayRef> {
1741    // Convert to Vid keys for reuse of existing build_property_column_static
1742    let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(*e)).collect();
1743    let props_map: HashMap<Vid, Properties> = eid_data
1744        .iter()
1745        .map(|(k, (_, _, props, _))| (Vid::from(*k), props.clone()))
1746        .collect();
1747
1748    build_property_column_static(&vid_keys, &props_map, prop_name, data_type)
1749}
1750
1751/// Build the `_labels` column for known-label vertices.
1752///
1753/// Reads `_labels` from the stored Lance batch if available. Falls back to
1754/// `[label]` when the column is absent (legacy data). Additional labels from
1755/// L0 buffers are merged in.
1756fn build_labels_column_for_known_label(
1757    vid_arr: &UInt64Array,
1758    label: &str,
1759    l0_ctx: &crate::query::df_graph::L0Context,
1760    batch_labels_col: Option<&arrow_array::ListArray>,
1761) -> DFResult<ArrayRef> {
1762    use uni_store::storage::arrow_convert::labels_from_list_array;
1763
1764    let mut labels_builder = ListBuilder::new(StringBuilder::new());
1765
1766    for i in 0..vid_arr.len() {
1767        let vid = Vid::from(vid_arr.value(i));
1768
1769        // Start with labels from the stored column, falling back to [label]
1770        let mut labels = match batch_labels_col {
1771            Some(list_arr) => {
1772                let stored = labels_from_list_array(list_arr, i);
1773                if stored.is_empty() {
1774                    vec![label.to_string()]
1775                } else {
1776                    stored
1777                }
1778            }
1779            None => vec![label.to_string()],
1780        };
1781
1782        // Ensure the scanned label is present (defensive)
1783        if !labels.iter().any(|l| l == label) {
1784            labels.push(label.to_string());
1785        }
1786
1787        // Merge additional labels from L0 buffers
1788        for l0 in l0_ctx.iter_l0_buffers() {
1789            let guard = l0.read();
1790            if let Some(l0_labels) = guard.vertex_labels.get(&vid) {
1791                for lbl in l0_labels {
1792                    if !labels.contains(lbl) {
1793                        labels.push(lbl.clone());
1794                    }
1795                }
1796            }
1797        }
1798
1799        let values = labels_builder.values();
1800        for lbl in &labels {
1801            values.append_value(lbl);
1802        }
1803        labels_builder.append(true);
1804    }
1805
1806    Ok(Arc::new(labels_builder.finish()))
1807}
1808
1809/// Map a Lance-schema batch to the DataFusion output schema.
1810///
1811/// The output schema has `{variable}.{property}` column names, while Lance
1812/// uses bare property names. This function performs the positional mapping,
1813/// adds the `_labels` column, and drops internal columns like `_deleted`/`_version`.
1814fn map_to_output_schema(
1815    batch: &RecordBatch,
1816    label: &str,
1817    _variable: &str,
1818    projected_properties: &[String],
1819    output_schema: &SchemaRef,
1820    l0_ctx: &crate::query::df_graph::L0Context,
1821) -> DFResult<RecordBatch> {
1822    if batch.num_rows() == 0 {
1823        return Ok(RecordBatch::new_empty(output_schema.clone()));
1824    }
1825
1826    let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
1827
1828    // 1. {var}._vid
1829    let vid_col = batch
1830        .column_by_name("_vid")
1831        .ok_or_else(|| {
1832            datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
1833        })?
1834        .clone();
1835    let vid_arr = vid_col
1836        .as_any()
1837        .downcast_ref::<UInt64Array>()
1838        .ok_or_else(|| {
1839            datafusion::error::DataFusionError::Internal("_vid not UInt64".to_string())
1840        })?;
1841
1842    // 2. {var}._labels — read from stored column, overlay L0 additions
1843    let batch_labels_col = batch
1844        .column_by_name("_labels")
1845        .and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());
1846    let labels_col = build_labels_column_for_known_label(vid_arr, label, l0_ctx, batch_labels_col)?;
1847    columns.push(vid_col.clone());
1848    columns.push(labels_col);
1849
1850    // 3. Projected properties
1851    // Pre-load overflow_json column for extracting non-schema properties
1852    let overflow_arr = batch
1853        .column_by_name("overflow_json")
1854        .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
1855
1856    for prop in projected_properties {
1857        if prop == "overflow_json" {
1858            match batch.column_by_name("overflow_json") {
1859                Some(col) => columns.push(col.clone()),
1860                None => {
1861                    // No overflow_json in Lance — return null column
1862                    columns.push(arrow_array::new_null_array(
1863                        &DataType::LargeBinary,
1864                        batch.num_rows(),
1865                    ));
1866                }
1867            }
1868        } else if prop == "_all_props" {
1869            // Build _all_props from overflow_json + L0 overlay.
1870            // Fast path: if no L0 buffer has vertex property mutations AND
1871            // there are no schema columns to merge, pass through overflow_json.
1872            let any_l0_has_vertex_props = l0_ctx.iter_l0_buffers().any(|l0| {
1873                let guard = l0.read();
1874                !guard.vertex_properties.is_empty()
1875            });
1876            // Check if this label has schema-defined columns (besides system columns)
1877            let has_schema_cols = projected_properties
1878                .iter()
1879                .any(|p| p != "overflow_json" && p != "_all_props" && !p.starts_with('_'));
1880
1881            if !any_l0_has_vertex_props && !has_schema_cols {
1882                // No L0 mutations, no schema cols to merge: overflow_json IS _all_props
1883                match batch.column_by_name("overflow_json") {
1884                    Some(col) => columns.push(col.clone()),
1885                    None => {
1886                        columns.push(arrow_array::new_null_array(
1887                            &DataType::LargeBinary,
1888                            batch.num_rows(),
1889                        ));
1890                    }
1891                }
1892            } else {
1893                // Need to merge: schema columns + overflow_json + L0 overlay
1894                let col = build_all_props_column_for_schema_scan(
1895                    batch,
1896                    vid_arr,
1897                    overflow_arr,
1898                    projected_properties,
1899                    l0_ctx,
1900                );
1901                columns.push(col);
1902            }
1903        } else {
1904            match batch.column_by_name(prop) {
1905                Some(col) => columns.push(col.clone()),
1906                None => {
1907                    // Column missing in Lance -- extract from overflow_json
1908                    // CypherValue blob with L0 overlay
1909                    let col = build_overflow_property_column(
1910                        batch.num_rows(),
1911                        vid_arr,
1912                        overflow_arr,
1913                        prop,
1914                        l0_ctx,
1915                    );
1916                    columns.push(col);
1917                }
1918            }
1919        }
1920    }
1921
1922    RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
1923}
1924
1925/// Map an internal DeltaDataset-schema edge batch to the DataFusion output schema.
1926///
1927/// The internal batch has `eid`, `src_vid`, `dst_vid`, `op`, `_version`, and property
1928/// columns. The output schema has `{variable}._eid`, `{variable}._src_vid`,
1929/// `{variable}._dst_vid`, and per-property columns. Internal columns `op` and
1930/// `_version` are dropped.
1931fn map_edge_to_output_schema(
1932    batch: &RecordBatch,
1933    variable: &str,
1934    projected_properties: &[String],
1935    output_schema: &SchemaRef,
1936) -> DFResult<RecordBatch> {
1937    if batch.num_rows() == 0 {
1938        return Ok(RecordBatch::new_empty(output_schema.clone()));
1939    }
1940
1941    let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
1942
1943    // 1. {var}._eid
1944    let eid_col = batch
1945        .column_by_name("eid")
1946        .ok_or_else(|| {
1947            datafusion::error::DataFusionError::Internal("Missing eid column".to_string())
1948        })?
1949        .clone();
1950    columns.push(eid_col);
1951
1952    // 2. {var}._src_vid
1953    let src_col = batch
1954        .column_by_name("src_vid")
1955        .ok_or_else(|| {
1956            datafusion::error::DataFusionError::Internal("Missing src_vid column".to_string())
1957        })?
1958        .clone();
1959    columns.push(src_col);
1960
1961    // 3. {var}._dst_vid
1962    let dst_col = batch
1963        .column_by_name("dst_vid")
1964        .ok_or_else(|| {
1965            datafusion::error::DataFusionError::Internal("Missing dst_vid column".to_string())
1966        })?
1967        .clone();
1968    columns.push(dst_col);
1969
1970    // 4. Projected properties
1971    for prop in projected_properties {
1972        if prop == "overflow_json" {
1973            match batch.column_by_name("overflow_json") {
1974                Some(col) => columns.push(col.clone()),
1975                None => {
1976                    columns.push(arrow_array::new_null_array(
1977                        &DataType::LargeBinary,
1978                        batch.num_rows(),
1979                    ));
1980                }
1981            }
1982        } else {
1983            match batch.column_by_name(prop) {
1984                Some(col) => columns.push(col.clone()),
1985                None => {
1986                    // Column missing in Lance — extract from overflow_json CypherValue blob
1987                    // (mirrors the vertex path in map_to_output_schema)
1988                    let overflow_arr = batch
1989                        .column_by_name("overflow_json")
1990                        .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
1991
1992                    if let Some(arr) = overflow_arr {
1993                        let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
1994                        for i in 0..batch.num_rows() {
1995                            if !arr.is_null(i) {
1996                                let blob = arr.value(i);
1997                                // Fast path: extract map entry without decoding entire map
1998                                if let Some(sub_bytes) =
1999                                    uni_common::cypher_value_codec::extract_map_entry_raw(
2000                                        blob, prop,
2001                                    )
2002                                {
2003                                    builder.append_value(&sub_bytes);
2004                                } else {
2005                                    builder.append_null();
2006                                }
2007                            } else {
2008                                builder.append_null();
2009                            }
2010                        }
2011                        columns.push(Arc::new(builder.finish()));
2012                    } else {
2013                        // No overflow_json column either — return null column
2014                        let target_field = output_schema
2015                            .fields()
2016                            .iter()
2017                            .find(|f| f.name() == &format!("{}.{}", variable, prop));
2018                        let dt = target_field
2019                            .map(|f| f.data_type().clone())
2020                            .unwrap_or(DataType::LargeBinary);
2021                        columns.push(arrow_array::new_null_array(&dt, batch.num_rows()));
2022                    }
2023                }
2024            }
2025        }
2026    }
2027
2028    RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
2029}
2030
/// Columnar-first vertex scan: single Lance query with MVCC dedup and L0 overlay.
///
/// Replaces the two-phase `scan_vertex_vids_static()` + `materialize_vertex_batch_static()`
/// for known-label vertex scans. Reads all needed columns in a single Lance query,
/// performs MVCC dedup via Arrow compute, merges L0 buffer data, filters tombstones,
/// and maps to the output schema.
///
/// Steps, in order:
/// 1. Choose the Lance columns. These are `_vid`, `_deleted` and `_version`, the
///    projected schema properties, and `_created_at`/`_updated_at`. `overflow_json`
///    is added whenever a projected property (other than the system timestamps)
///    is not a schema property of `label`.
/// 2. Build the Lance predicate from `vid_list_filter` (if non-empty), or else from
///    a single VID extracted from `filter`. It is AND-combined with
///    `extra_lance_filter`. `filter` is consulted here only to extract that VID.
/// 3. Read base rows from the plugin `Storage` registered for `label`, if any
///    (unfiltered). Otherwise read them from the native vertex table (filtered).
/// 4. If `extra_lance_filter` was pushed to the native scan, drop superseded rows.
///    Then MVCC-dedup on `_vid`.
/// 5. Build an L0 batch restricted to the same VID set and merge it with the
///    deduped rows. Drop `_deleted = true` rows and L0 tombstones.
/// 6. Map to `output_schema` and apply `extra_runtime_filter`.
///
/// An empty batch with `output_schema` is returned as soon as no rows remain
/// after the merge or after either tombstone filter.
///
/// # Errors
///
/// Plugin `read_batch`, stream and concat failures, and native scan failures,
/// surface as `DataFusionError::Execution`. Errors from the helper steps are
/// propagated.
#[expect(clippy::too_many_arguments)]
async fn columnar_scan_vertex_batch_static(
    graph_ctx: &GraphExecutionContext,
    label: &str,
    variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
    filter: &Option<Arc<dyn PhysicalExpr>>,
    vid_list_filter: Option<&[u64]>,
    extra_lance_filter: Option<&str>,
    extra_runtime_filter: Option<&Arc<dyn PhysicalExpr>>,
) -> DFResult<RecordBatch> {
    let storage = graph_ctx.storage();
    let l0_ctx = graph_ctx.l0_context();
    let uni_schema = storage.schema_manager().schema();
    let label_props = uni_schema.properties.get(label);

    // Extract target VID from filter for short-circuit lookup. Single-VID
    // pushdown is from the WHERE-clause `id(x) = $literal` path; multi-VID
    // pushdown is from the IN-list path (`UNWIND ... WHERE id(x) = e.field`,
    // see issue #55 PR #4).
    let target_vid = filter.as_ref().and_then(extract_vid_from_physical_filter);

    // Build the list of columns to request from Lance
    let mut lance_columns: Vec<String> = vec![
        "_vid".to_string(),
        "_deleted".to_string(),
        "_version".to_string(),
    ];
    for prop in projected_properties {
        if prop == "overflow_json" {
            push_column_if_absent(&mut lance_columns, "overflow_json");
        } else if prop == "_created_at" || prop == "_updated_at" {
            // System-managed timestamps live on every vertex table regardless
            // of label schema. Request them directly from Lance.
            push_column_if_absent(&mut lance_columns, prop);
        } else {
            // Non-schema properties (including `_all_props`) are not real Lance
            // columns; they are served from `overflow_json` instead (see below).
            let exists_in_schema = label_props.is_some_and(|lp| lp.contains_key(prop));
            if exists_in_schema {
                push_column_if_absent(&mut lance_columns, prop);
            }
        }
    }

    // Ensure overflow_json is present when any projected property is not in the schema
    // (excluding system-managed columns like `_created_at` / `_updated_at`).
    let needs_overflow = projected_properties.iter().any(|p| {
        p == "overflow_json"
            || (!matches!(p.as_str(), "_created_at" | "_updated_at")
                && !label_props.is_some_and(|lp| lp.contains_key(p)))
    });
    if needs_overflow {
        push_column_if_absent(&mut lance_columns, "overflow_json");
    }

    // Push _vid filter to Lance for O(log N) BTree index lookup instead of full scan.
    // Prefer the multi-VID list (formats as `_vid IN (...)`); fall back to
    // single-VID `_vid = N` from the WHERE-clause path. AND-combined with
    // any indexed-property pushdown (issue #57).
    let vid_part = match (vid_list_filter, target_vid) {
        (Some(vs), _) if !vs.is_empty() => Some(format_vid_in_list(vs)),
        (_, Some(v)) => Some(format!("_vid = {v}")),
        _ => None,
    };
    let combined_filter = combine_lance_filters(vid_part.as_deref(), extra_lance_filter);
    let lance_columns_refs: Vec<&str> = lance_columns.iter().map(|s| s.as_str()).collect();

    // M5h.2: route through plugin Storage if one is registered for
    // this label. v1 ships reads only — writes still go to native
    // backend. v1 ignores `combined_filter` when delegating (the
    // planner re-filters via the surrounding Filter node); per-plugin
    // filter pushdown is a v1.1 follow-up (`TODO(M5h.2-filter)`).
    let plugin_batch: Option<arrow::record_batch::RecordBatch> = match graph_ctx.plugin_registry() {
        Some(reg) => match reg.lookup_label_storage(label) {
            Some(plugin_storage) => {
                let mut stream = plugin_storage.read_batch(label, None).await.map_err(|e| {
                    datafusion::error::DataFusionError::Execution(format!(
                        "plugin Storage::read_batch({label}) failed: {} (code 0x{:x})",
                        e.message, e.code
                    ))
                })?;
                use futures::StreamExt;
                // Drain the whole plugin stream; the schema is taken from the
                // first batch and used to concatenate everything into one batch.
                let mut batches: Vec<arrow::record_batch::RecordBatch> = Vec::new();
                let mut schema_ref: Option<SchemaRef> = None;
                while let Some(b) = stream.next().await {
                    let b = b.map_err(|e| {
                        datafusion::error::DataFusionError::Execution(format!(
                            "plugin Storage stream({label}) errored: {e}"
                        ))
                    })?;
                    if schema_ref.is_none() {
                        schema_ref = Some(b.schema());
                    }
                    batches.push(b);
                }
                // NOTE(review): when the plugin stream yields no batches,
                // `schema_ref` stays `None`, so `plugin_batch` becomes `None` and
                // the match below falls through to the native `scan_vertex_table`.
                // An empty plugin result is therefore indistinguishable from "no
                // plugin registered". Confirm this fallback is intended.
                if let Some(s) = schema_ref {
                    Some(arrow::compute::concat_batches(&s, &batches).map_err(|e| {
                        datafusion::error::DataFusionError::Execution(format!(
                            "plugin Storage concat({label}) failed: {e}"
                        ))
                    })?)
                } else {
                    None
                }
            }
            None => None,
        },
        None => None,
    };

    // Track whether the batch came through the property-filtered native scan:
    // plugin batches ignore `combined_filter` (re-filtered by the planner), so
    // they need no stale-version verification.
    let (lance_batch, pushdown_filtered) = match plugin_batch {
        Some(b) => (Some(b), false),
        None => (
            storage
                .scan_vertex_table(label, &lance_columns_refs, combined_filter.as_deref())
                .await
                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?,
            extra_lance_filter.is_some(),
        ),
    };

    // A pushed property predicate hides a vid's CURRENT row from the scan when
    // that row no longer matches (MVCC-append: the stale still-matching row
    // would win the dedup by default) — drop superseded rows first.
    let lance_batch = match (lance_batch, pushdown_filtered) {
        (Some(b), true) => Some(drop_superseded_pushdown_rows(storage, Some(label), b).await?),
        (b, _) => b,
    };

    // MVCC dedup the Lance batch
    // NOTE(review): plugin batches go through the same `_vid`-keyed dedup, so
    // presumably they must expose `_vid`/`_version` like the native table. Confirm.
    let lance_deduped = mvcc_dedup_to_option(lance_batch, "_vid")?;

    // Build the internal Lance schema for L0 batch construction.
    // Use the Lance batch schema if available, otherwise build from scratch.
    let internal_schema = match &lance_deduped {
        Some(batch) => batch.schema(),
        None => {
            // Fallback schema: system columns first, then each requested column.
            // Schema properties take their declared Arrow type; anything unknown
            // defaults to LargeBinary.
            let mut fields = vec![
                Field::new("_vid", DataType::UInt64, false),
                Field::new("_deleted", DataType::Boolean, false),
                Field::new("_version", DataType::UInt64, false),
            ];
            for col in &lance_columns {
                if matches!(col.as_str(), "_vid" | "_deleted" | "_version") {
                    continue;
                }
                if col == "overflow_json" {
                    fields.push(Field::new("overflow_json", DataType::LargeBinary, true));
                } else if col == "_created_at" || col == "_updated_at" {
                    fields.push(Field::new(
                        col,
                        DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
                        true,
                    ));
                } else {
                    let arrow_type = label_props
                        .and_then(|lp| lp.get(col.as_str()))
                        .map(|meta| meta.r#type.to_arrow())
                        .unwrap_or(DataType::LargeBinary);
                    fields.push(Field::new(col, arrow_type, true));
                }
            }
            Arc::new(Schema::new(fields))
        }
    };

    // Build L0 batch. Prefer the multi-VID list when present (IN-list pushdown
    // from issue #55 PR #4 — must restrict L0 to the same VID set Lance was
    // filtered against, see issue #72 item 1). Fall back to single-VID
    // (`id(x) = $literal` short-circuit). One-element buffer keeps the
    // borrowed slice alive for the single-VID case.
    let single_vid_buf: [u64; 1];
    let l0_target_vids: Option<&[u64]> = match (vid_list_filter, target_vid) {
        (Some(vs), _) if !vs.is_empty() => Some(vs),
        (_, Some(v)) => {
            single_vid_buf = [v];
            Some(&single_vid_buf)
        }
        _ => None,
    };
    let l0_batch =
        build_l0_vertex_batch(l0_ctx, label, &internal_schema, label_props, l0_target_vids)?;

    // Merge Lance + L0
    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "_vid")?
    else {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    };

    // Filter out MVCC deletion tombstones (_deleted = true)
    let merged = filter_deleted_rows(&merged)?;
    if merged.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    // Filter L0 tombstones
    let filtered = filter_l0_tombstones(&merged, l0_ctx)?;

    if filtered.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    // Map to output schema
    let mapped = map_to_output_schema(
        &filtered,
        label,
        variable,
        projected_properties,
        output_schema,
        l0_ctx,
    )?;

    // Apply indexed-property runtime filter (issue #57). Lance has already
    // filtered the on-disk side via `extra_lance_filter`; this catches any
    // L0 rows that slipped through the merge.
    apply_runtime_filter(mapped, extra_runtime_filter)
}
2257
2258/// Apply the indexed-property runtime filter, if present, to a `RecordBatch`.
2259/// Returns the filtered batch (or the original if no filter is set). Rows
2260/// where the predicate evaluates to NULL are treated as non-matching, same
2261/// as DataFusion `FilterExec`. See issue #57.
2262fn apply_runtime_filter(
2263    batch: RecordBatch,
2264    runtime_filter: Option<&Arc<dyn PhysicalExpr>>,
2265) -> DFResult<RecordBatch> {
2266    let Some(filter) = runtime_filter else {
2267        return Ok(batch);
2268    };
2269    if batch.num_rows() == 0 {
2270        return Ok(batch);
2271    }
2272    let result = filter.evaluate(&batch)?;
2273    let array = result.into_array(batch.num_rows())?;
2274    let bools = array
2275        .as_any()
2276        .downcast_ref::<arrow_array::BooleanArray>()
2277        .ok_or_else(|| {
2278            datafusion::error::DataFusionError::Internal(
2279                "indexed-property runtime filter did not produce a BooleanArray".to_string(),
2280            )
2281        })?;
2282    arrow::compute::filter_record_batch(&batch, bools).map_err(arrow_err)
2283}
2284
2285/// Columnar-first edge scan: single Lance query with MVCC dedup and L0 overlay.
2286///
2287/// Replaces the two-phase `scan_edge_eids_static()` + `materialize_edge_batch_static()`
2288/// for edge scans. Reads all needed columns in a single DeltaDataset query, performs
2289/// MVCC dedup via Arrow compute, merges L0 buffer data, filters tombstones, and maps
2290/// to the output schema.
2291async fn columnar_scan_edge_batch_static(
2292    graph_ctx: &GraphExecutionContext,
2293    edge_type: &str,
2294    variable: &str,
2295    projected_properties: &[String],
2296    output_schema: &SchemaRef,
2297) -> DFResult<RecordBatch> {
2298    let storage = graph_ctx.storage();
2299    let l0_ctx = graph_ctx.l0_context();
2300    let uni_schema = storage.schema_manager().schema();
2301    let type_props = uni_schema.properties.get(edge_type);
2302
2303    // Build the list of columns to request from DeltaDataset Lance table
2304    let mut lance_columns: Vec<String> = vec![
2305        "eid".to_string(),
2306        "src_vid".to_string(),
2307        "dst_vid".to_string(),
2308        "op".to_string(),
2309        "_version".to_string(),
2310    ];
2311    for prop in projected_properties {
2312        if prop == "overflow_json" {
2313            push_column_if_absent(&mut lance_columns, "overflow_json");
2314        } else if prop == "_created_at" || prop == "_updated_at" {
2315            // System-managed timestamps live on every edge table.
2316            push_column_if_absent(&mut lance_columns, prop);
2317        } else {
2318            let exists_in_schema = type_props.is_some_and(|tp| tp.contains_key(prop));
2319            if exists_in_schema {
2320                push_column_if_absent(&mut lance_columns, prop);
2321            }
2322        }
2323    }
2324
2325    // Ensure overflow_json is present when any projected property is not in the schema
2326    // (excluding system-managed columns like `_created_at` / `_updated_at`).
2327    let needs_overflow = projected_properties.iter().any(|p| {
2328        p == "overflow_json"
2329            || (!matches!(p.as_str(), "_created_at" | "_updated_at")
2330                && !type_props.is_some_and(|tp| tp.contains_key(p)))
2331    });
2332    if needs_overflow {
2333        push_column_if_absent(&mut lance_columns, "overflow_json");
2334    }
2335
2336    // Try to query DeltaDataset (forward direction) via StorageManager domain method
2337    let lance_columns_refs: Vec<&str> = lance_columns.iter().map(|s| s.as_str()).collect();
2338    let lance_batch = storage
2339        .scan_delta_table(edge_type, "fwd", &lance_columns_refs, None)
2340        .await
2341        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
2342
2343    // MVCC dedup the Lance batch (by eid)
2344    let lance_deduped = mvcc_dedup_to_option(lance_batch, "eid")?;
2345
2346    // Build the internal schema for L0 batch construction.
2347    // Use the Lance batch schema if available, otherwise build from scratch.
2348    let internal_schema = match &lance_deduped {
2349        Some(batch) => batch.schema(),
2350        None => {
2351            let mut fields = vec![
2352                Field::new("eid", DataType::UInt64, false),
2353                Field::new("src_vid", DataType::UInt64, false),
2354                Field::new("dst_vid", DataType::UInt64, false),
2355                Field::new("op", DataType::UInt8, false),
2356                Field::new("_version", DataType::UInt64, false),
2357            ];
2358            for col in &lance_columns {
2359                if matches!(
2360                    col.as_str(),
2361                    "eid" | "src_vid" | "dst_vid" | "op" | "_version"
2362                ) {
2363                    continue;
2364                }
2365                if col == "overflow_json" {
2366                    fields.push(Field::new("overflow_json", DataType::LargeBinary, true));
2367                } else if col == "_created_at" || col == "_updated_at" {
2368                    fields.push(Field::new(
2369                        col,
2370                        DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
2371                        true,
2372                    ));
2373                } else {
2374                    let arrow_type = type_props
2375                        .and_then(|tp| tp.get(col.as_str()))
2376                        .map(|meta| meta.r#type.to_arrow())
2377                        .unwrap_or(DataType::LargeBinary);
2378                    fields.push(Field::new(col, arrow_type, true));
2379                }
2380            }
2381            Arc::new(Schema::new(fields))
2382        }
2383    };
2384
2385    // Build L0 batch
2386    let l0_batch = build_l0_edge_batch(l0_ctx, edge_type, &internal_schema, type_props)?;
2387
2388    // Merge Lance + L0
2389    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "eid")? else {
2390        return Ok(RecordBatch::new_empty(output_schema.clone()));
2391    };
2392
2393    // Filter out MVCC deletion ops (op != 0) after dedup
2394    let merged = filter_deleted_edge_ops(&merged)?;
2395    if merged.num_rows() == 0 {
2396        return Ok(RecordBatch::new_empty(output_schema.clone()));
2397    }
2398
2399    // Filter L0 edge tombstones
2400    let filtered = filter_l0_edge_tombstones(&merged, l0_ctx)?;
2401
2402    if filtered.num_rows() == 0 {
2403        return Ok(RecordBatch::new_empty(output_schema.clone()));
2404    }
2405
2406    // Map to output schema
2407    map_edge_to_output_schema(&filtered, variable, projected_properties, output_schema)
2408}
2409
2410/// Columnar-first schemaless vertex scan: single Lance query with MVCC dedup and L0 overlay.
2411///
2412/// Replaces the two-phase `scan_*_vids_*()` + `materialize_schemaless_vertex_batch_static()`
2413/// for schemaless vertex scans. Reads `_vid`, `labels`, `props_json`, `_version` in a single
2414/// Lance query on the main vertices table, performs MVCC dedup via Arrow compute, merges L0
2415/// buffer data, filters tombstones, and maps to the output schema.
2416#[expect(clippy::too_many_arguments)]
2417async fn columnar_scan_schemaless_vertex_batch_static(
2418    graph_ctx: &GraphExecutionContext,
2419    label: &str,
2420    variable: &str,
2421    projected_properties: &[String],
2422    output_schema: &SchemaRef,
2423    filter: &Option<Arc<dyn PhysicalExpr>>,
2424    vid_list_filter: Option<&[u64]>,
2425    extra_lance_filter: Option<&str>,
2426    extra_runtime_filter: Option<&Arc<dyn PhysicalExpr>>,
2427) -> DFResult<RecordBatch> {
2428    let storage = graph_ctx.storage();
2429    let l0_ctx = graph_ctx.l0_context();
2430
2431    // Extract target VID from filter for short-circuit lookup. See the
2432    // detailed comment on the per-label scan for the IN-list path
2433    // (issue #55 PR #4).
2434    let target_vid = filter.as_ref().and_then(extract_vid_from_physical_filter);
2435
2436    // Build the Lance filter expression — do NOT filter _deleted here;
2437    // MVCC dedup must see deletion tombstones to pick the highest version.
2438    let filter = {
2439        let mut parts = Vec::new();
2440
2441        // VID point-lookup filter — uses BTree index on _vid. Prefer the
2442        // multi-VID list (formats as `_vid IN (...)`); fall back to single-VID.
2443        match (vid_list_filter, target_vid) {
2444            (Some(vs), _) if !vs.is_empty() => parts.push(format_vid_in_list(vs)),
2445            (_, Some(vid)) => parts.push(format!("_vid = {vid}")),
2446            _ => {}
2447        }
2448
2449        // Label filter
2450        if !label.is_empty() {
2451            if label.contains(':') {
2452                // Multi-label: each label must be present
2453                for lbl in label.split(':') {
2454                    parts.push(format!("array_contains(labels, '{}')", lbl));
2455                }
2456            } else {
2457                parts.push(format!("array_contains(labels, '{}')", label));
2458            }
2459        }
2460
2461        // Indexed-property pushdown — issue #57.
2462        if let Some(extra) = extra_lance_filter {
2463            parts.push(extra.to_string());
2464        }
2465
2466        if parts.is_empty() {
2467            None
2468        } else {
2469            Some(parts.join(" AND "))
2470        }
2471    };
2472
2473    // Single Lance query via StorageManager domain method
2474    let lance_batch = storage
2475        .scan_main_vertex_table(
2476            &["_vid", "_deleted", "labels", "props_json", "_version"],
2477            filter.as_deref(),
2478        )
2479        .await
2480        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
2481
2482    // A pushed property predicate hides a vid's CURRENT row from the scan when
2483    // that row no longer matches (MVCC-append: the stale still-matching row
2484    // would win the dedup by default) — drop superseded rows first.
2485    let lance_batch = match (lance_batch, extra_lance_filter.is_some()) {
2486        (Some(b), true) => Some(drop_superseded_pushdown_rows(storage, None, b).await?),
2487        (b, _) => b,
2488    };
2489
2490    // MVCC dedup the Lance batch
2491    let lance_deduped = mvcc_dedup_to_option(lance_batch, "_vid")?;
2492
2493    // Build the internal schema for L0 batch construction.
2494    // Use the Lance batch schema if available, otherwise build from scratch.
2495    let internal_schema = match &lance_deduped {
2496        Some(batch) => batch.schema(),
2497        None => Arc::new(Schema::new(vec![
2498            Field::new("_vid", DataType::UInt64, false),
2499            Field::new("_deleted", DataType::Boolean, false),
2500            Field::new("labels", labels_data_type(), false),
2501            Field::new("props_json", DataType::LargeBinary, true),
2502            Field::new("_version", DataType::UInt64, false),
2503        ])),
2504    };
2505
2506    // Build L0 batch. Prefer the multi-VID list when present (IN-list pushdown
2507    // from issue #55 PR #4 — must restrict L0 to match Lance filtering, see
2508    // issue #72 item 1). Fall back to single-VID.
2509    let single_vid_buf: [u64; 1];
2510    let l0_target_vids: Option<&[u64]> = match (vid_list_filter, target_vid) {
2511        (Some(vs), _) if !vs.is_empty() => Some(vs),
2512        (_, Some(v)) => {
2513            single_vid_buf = [v];
2514            Some(&single_vid_buf)
2515        }
2516        _ => None,
2517    };
2518    let l0_batch =
2519        build_l0_schemaless_vertex_batch(l0_ctx, label, &internal_schema, l0_target_vids)?;
2520
2521    // Merge Lance + L0
2522    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "_vid")?
2523    else {
2524        return Ok(RecordBatch::new_empty(output_schema.clone()));
2525    };
2526
2527    // Filter out MVCC deletion tombstones (_deleted = true)
2528    let merged = filter_deleted_rows(&merged)?;
2529    if merged.num_rows() == 0 {
2530        return Ok(RecordBatch::new_empty(output_schema.clone()));
2531    }
2532
2533    // Filter L0 tombstones
2534    let filtered = filter_l0_tombstones(&merged, l0_ctx)?;
2535
2536    if filtered.num_rows() == 0 {
2537        return Ok(RecordBatch::new_empty(output_schema.clone()));
2538    }
2539
2540    // Map to output schema
2541    let mapped = map_to_schemaless_output_schema(
2542        &filtered,
2543        variable,
2544        projected_properties,
2545        output_schema,
2546        l0_ctx,
2547    )?;
2548
2549    // Apply indexed-property runtime filter — issue #57.
2550    apply_runtime_filter(mapped, extra_runtime_filter)
2551}
2552
2553/// Build a RecordBatch from L0 buffer data for schemaless vertices.
2554///
2555/// Merges L0 buffers in visibility order (pending_flush → current → transaction),
2556/// with later buffers overwriting earlier ones for the same VID. Produces a batch
2557/// matching the internal schema: `_vid, labels, props_json, _version`.
2558fn build_l0_schemaless_vertex_batch(
2559    l0_ctx: &crate::query::df_graph::L0Context,
2560    label: &str,
2561    internal_schema: &SchemaRef,
2562    target_vids: Option<&[u64]>,
2563) -> DFResult<RecordBatch> {
2564    // Collect all L0 vertex data, merging in visibility order
2565    // vid -> (merged_props, highest_version, labels)
2566    let mut vid_data: HashMap<u64, (Properties, u64, Vec<String>)> = HashMap::new();
2567    let mut tombstones: HashSet<u64> = HashSet::new();
2568
2569    // Parse multi-label filter
2570    let label_filter: Vec<&str> = if label.is_empty() {
2571        vec![]
2572    } else if label.contains(':') {
2573        label.split(':').collect()
2574    } else {
2575        vec![label]
2576    };
2577
2578    for l0 in l0_ctx.iter_l0_buffers() {
2579        let guard = l0.read();
2580
2581        // Collect tombstones
2582        for vid in guard.vertex_tombstones.iter() {
2583            tombstones.insert(vid.as_u64());
2584        }
2585
2586        // Collect VIDs matching the label filter — short-circuit when target_vids is set
2587        // (see issue #72 item 1; multi-VID IN-list must filter L0 too).
2588        let vids: Vec<Vid> = if let Some(tvs) = target_vids {
2589            let mut out = Vec::with_capacity(tvs.len());
2590            for &tv in tvs {
2591                let vid = Vid::from(tv);
2592                if !guard.vertex_properties.contains_key(&vid) {
2593                    continue;
2594                }
2595                let label_ok = if label_filter.is_empty() {
2596                    true
2597                } else if let Some(labels) = guard.vertex_labels.get(&vid) {
2598                    label_filter
2599                        .iter()
2600                        .all(|lf| labels.contains(&lf.to_string()))
2601                } else {
2602                    false
2603                };
2604                if label_ok {
2605                    out.push(vid);
2606                }
2607            }
2608            out
2609        } else if label_filter.is_empty() {
2610            guard.all_vertex_vids()
2611        } else if label_filter.len() == 1 {
2612            guard.vids_for_label(label_filter[0])
2613        } else {
2614            guard.vids_with_all_labels(&label_filter)
2615        };
2616
2617        for vid in vids {
2618            let vid_u64 = vid.as_u64();
2619            if tombstones.contains(&vid_u64) {
2620                continue;
2621            }
2622            let version = guard.vertex_versions.get(&vid).copied().unwrap_or(0);
2623            let entry = vid_data
2624                .entry(vid_u64)
2625                .or_insert_with(|| (Properties::new(), 0, Vec::new()));
2626
2627            // Merge properties (later L0 overwrites)
2628            if let Some(props) = guard.vertex_properties.get(&vid) {
2629                for (k, v) in props {
2630                    entry.0.insert(k.clone(), v.clone());
2631                }
2632            }
2633            // Take the highest version
2634            if version > entry.1 {
2635                entry.1 = version;
2636            }
2637            // Update labels from latest L0 layer
2638            if let Some(labels) = guard.vertex_labels.get(&vid) {
2639                entry.2 = labels.clone();
2640            }
2641        }
2642    }
2643
2644    // Remove tombstoned VIDs
2645    for t in &tombstones {
2646        vid_data.remove(t);
2647    }
2648
2649    if vid_data.is_empty() {
2650        return Ok(RecordBatch::new_empty(internal_schema.clone()));
2651    }
2652
2653    // Sort VIDs for deterministic output
2654    let mut vids: Vec<u64> = vid_data.keys().copied().collect();
2655    vids.sort_unstable();
2656
2657    let num_rows = vids.len();
2658    let mut columns: Vec<ArrayRef> = Vec::with_capacity(internal_schema.fields().len());
2659
2660    for field in internal_schema.fields() {
2661        match field.name().as_str() {
2662            "_vid" => {
2663                columns.push(Arc::new(UInt64Array::from(vids.clone())));
2664            }
2665            "labels" => {
2666                let mut labels_builder = ListBuilder::new(StringBuilder::new());
2667                for vid_u64 in &vids {
2668                    let (_, _, labels) = &vid_data[vid_u64];
2669                    let values = labels_builder.values();
2670                    for lbl in labels {
2671                        values.append_value(lbl);
2672                    }
2673                    labels_builder.append(true);
2674                }
2675                columns.push(Arc::new(labels_builder.finish()));
2676            }
2677            "props_json" => {
2678                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2679                for vid_u64 in &vids {
2680                    let (props, _, _) = &vid_data[vid_u64];
2681                    if props.is_empty() {
2682                        builder.append_null();
2683                    } else {
2684                        // Encode properties as a CypherValue blob directly from
2685                        // `Value` so typed values (temporals) are preserved.
2686                        let map: HashMap<String, Value> =
2687                            props.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
2688                        builder
2689                            .append_value(uni_common::cypher_value_codec::encode(&Value::Map(map)));
2690                    }
2691                }
2692                columns.push(Arc::new(builder.finish()));
2693            }
2694            "_deleted" => {
2695                // L0 vertices are always live (tombstoned ones already excluded)
2696                columns.push(Arc::new(arrow_array::BooleanArray::from(vec![
2697                    false;
2698                    num_rows
2699                ])));
2700            }
2701            "_version" => {
2702                let vals: Vec<u64> = vids.iter().map(|v| vid_data[v].1).collect();
2703                columns.push(Arc::new(UInt64Array::from(vals)));
2704            }
2705            _ => {
2706                // Unexpected column — fill with nulls
2707                columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
2708            }
2709        }
2710    }
2711
2712    RecordBatch::try_new(internal_schema.clone(), columns).map_err(arrow_err)
2713}
2714
2715/// Map an internal-schema schemaless batch to the DataFusion output schema.
2716///
2717/// The internal batch has `_vid, labels, props_json, _version` columns. The output
2718/// schema has `{variable}._vid`, `{variable}._labels`, and per-property columns.
2719/// Individual properties are extracted from the `props_json` CypherValue blob by
2720/// decoding to a Map and extracting the sub-value.
2721fn map_to_schemaless_output_schema(
2722    batch: &RecordBatch,
2723    _variable: &str,
2724    projected_properties: &[String],
2725    output_schema: &SchemaRef,
2726    l0_ctx: &crate::query::df_graph::L0Context,
2727) -> DFResult<RecordBatch> {
2728    if batch.num_rows() == 0 {
2729        return Ok(RecordBatch::new_empty(output_schema.clone()));
2730    }
2731
2732    let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
2733
2734    // 1. {var}._vid — passthrough
2735    let vid_col = batch
2736        .column_by_name("_vid")
2737        .ok_or_else(|| {
2738            datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
2739        })?
2740        .clone();
2741    let vid_arr = vid_col
2742        .as_any()
2743        .downcast_ref::<UInt64Array>()
2744        .ok_or_else(|| {
2745            datafusion::error::DataFusionError::Internal("_vid not UInt64".to_string())
2746        })?;
2747    columns.push(vid_col.clone());
2748
2749    // 2. {var}._labels — from labels column with L0 overlay
2750    let labels_col = batch.column_by_name("labels");
2751    let labels_arr = labels_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());
2752
2753    let mut labels_builder = ListBuilder::new(StringBuilder::new());
2754    for i in 0..vid_arr.len() {
2755        let vid_u64 = vid_arr.value(i);
2756        let vid = Vid::from(vid_u64);
2757
2758        // Start with labels from the batch
2759        let mut row_labels: Vec<String> = Vec::new();
2760        if let Some(arr) = labels_arr
2761            && !arr.is_null(i)
2762        {
2763            let list_val = arr.value(i);
2764            if let Some(str_arr) = list_val.as_any().downcast_ref::<arrow_array::StringArray>() {
2765                for j in 0..str_arr.len() {
2766                    if !str_arr.is_null(j) {
2767                        row_labels.push(str_arr.value(j).to_string());
2768                    }
2769                }
2770            }
2771        }
2772
2773        // Overlay L0 labels
2774        for l0 in l0_ctx.iter_l0_buffers() {
2775            let guard = l0.read();
2776            if let Some(l0_labels) = guard.vertex_labels.get(&vid) {
2777                for lbl in l0_labels {
2778                    if !row_labels.contains(lbl) {
2779                        row_labels.push(lbl.clone());
2780                    }
2781                }
2782            }
2783        }
2784
2785        let values = labels_builder.values();
2786        for lbl in &row_labels {
2787            values.append_value(lbl);
2788        }
2789        labels_builder.append(true);
2790    }
2791    columns.push(Arc::new(labels_builder.finish()));
2792
2793    // 3. Projected properties — extract from props_json
2794    let props_col = batch.column_by_name("props_json");
2795    let props_arr =
2796        props_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
2797
2798    for prop in projected_properties {
2799        if prop == "_all_props" {
2800            // Fast path: if no L0 buffer has vertex property mutations,
2801            // the raw props_json passthrough is correct.
2802            let any_l0_has_vertex_props = l0_ctx.iter_l0_buffers().any(|l0| {
2803                let guard = l0.read();
2804                !guard.vertex_properties.is_empty()
2805            });
2806            if !any_l0_has_vertex_props {
2807                match props_col {
2808                    Some(col) => columns.push(col.clone()),
2809                    None => {
2810                        columns.push(arrow_array::new_null_array(
2811                            &DataType::LargeBinary,
2812                            batch.num_rows(),
2813                        ));
2814                    }
2815                }
2816            } else {
2817                let col = build_all_props_column_with_l0_overlay(
2818                    batch.num_rows(),
2819                    vid_arr,
2820                    props_arr,
2821                    l0_ctx,
2822                );
2823                columns.push(col);
2824            }
2825        } else {
2826            // Extract individual property from CypherValue blob with L0 overlay.
2827            // The raw column is LargeBinary (CypherValue-encoded). If the output
2828            // schema expects a typed column (e.g., Utf8 for String properties),
2829            // decode the CypherValue and build the correct Arrow type.
2830            let expected_type = output_schema
2831                .field_with_name(&format!("{_variable}.{prop}"))
2832                .map(|f| f.data_type().clone())
2833                .unwrap_or(DataType::LargeBinary);
2834
2835            if expected_type == DataType::LargeBinary {
2836                let col = build_overflow_property_column(
2837                    batch.num_rows(),
2838                    vid_arr,
2839                    props_arr,
2840                    prop,
2841                    l0_ctx,
2842                );
2843                columns.push(col);
2844            } else {
2845                // Decode CypherValue to the expected type via build_property_column_static.
2846                let mut prop_values: HashMap<Vid, Properties> = HashMap::new();
2847                for i in 0..batch.num_rows() {
2848                    let vid = Vid::from(vid_arr.value(i));
2849                    let resolved =
2850                        resolve_l0_property(&vid, prop, l0_ctx)
2851                            .flatten()
2852                            .or_else(|| {
2853                                extract_from_overflow_blob(props_arr, i, prop).and_then(|bytes| {
2854                                    uni_common::cypher_value_codec::decode(&bytes).ok()
2855                                })
2856                            });
2857                    if let Some(val) = resolved {
2858                        prop_values.insert(vid, HashMap::from([(prop.to_string(), val)]));
2859                    }
2860                }
2861                let vids: Vec<Vid> = (0..batch.num_rows())
2862                    .map(|i| Vid::from(vid_arr.value(i)))
2863                    .collect();
2864                let col = build_property_column_static(&vids, &prop_values, prop, &expected_type)
2865                    .unwrap_or_else(|_| {
2866                        arrow_array::new_null_array(&expected_type, batch.num_rows())
2867                    });
2868                columns.push(col);
2869            }
2870        }
2871    }
2872
2873    RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
2874}
2875
2876/// Get the property value for a VID, returning None if not found.
2877pub(crate) fn get_property_value(
2878    vid: &Vid,
2879    props_map: &HashMap<Vid, Properties>,
2880    prop_name: &str,
2881) -> Option<Value> {
2882    if prop_name == "_all_props" {
2883        return props_map.get(vid).map(|p| {
2884            let map: HashMap<String, Value> =
2885                p.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
2886            Value::Map(map)
2887        });
2888    }
2889    props_map
2890        .get(vid)
2891        .and_then(|props| props.get(prop_name))
2892        .cloned()
2893}
2894
/// Build a primitive Arrow column for `$prop_name` across `$vids`.
///
/// For each VID the property is looked up with [`get_property_value`].
/// `$extractor` (`&Value -> Option<T>`) pulls out a raw value, and `$cast` converts it
/// to the builder's native type. A missing property, or a value the extractor rejects,
/// becomes a null.
///
/// Expands to `Ok(ArrayRef)`, so it can serve as a `DFResult` tail expression.
macro_rules! build_numeric_column {
    ($vids:expr, $props_map:expr, $prop_name:expr, $builder_ty:ty, $extractor:expr, $cast:expr) => {{
        let mut column_builder = <$builder_ty>::new();
        for each_vid in $vids {
            let native = get_property_value(each_vid, $props_map, $prop_name)
                .as_ref()
                .and_then($extractor)
                .map($cast);
            column_builder.append_option(native);
        }
        Ok(Arc::new(column_builder.finish()) as ArrayRef)
    }};
}
2914
2915/// Build an Arrow column from property values (static version).
2916pub(crate) fn build_property_column_static(
2917    vids: &[Vid],
2918    props_map: &HashMap<Vid, Properties>,
2919    prop_name: &str,
2920    data_type: &DataType,
2921) -> DFResult<ArrayRef> {
2922    match data_type {
2923        DataType::LargeBinary => {
2924            // Handle CypherValue binary columns (overflow_json and Json-typed properties).
2925            use arrow_array::builder::LargeBinaryBuilder;
2926            let mut builder = LargeBinaryBuilder::new();
2927
2928            for vid in vids {
2929                match get_property_value(vid, props_map, prop_name) {
2930                    Some(Value::Null) | None => builder.append_null(),
2931                    Some(Value::Bytes(bytes)) => {
2932                        builder.append_value(&bytes);
2933                    }
2934                    Some(Value::List(arr)) if arr.iter().all(|v| v.as_u64().is_some()) => {
2935                        // Potential raw CypherValue bytes stored as list<u8> from PropertyManager.
2936                        // Guard against misclassifying normal integer lists (e.g. [42, 43]) as bytes.
2937                        let bytes: Vec<u8> = arr
2938                            .iter()
2939                            .filter_map(|v| v.as_u64().map(|n| n as u8))
2940                            .collect();
2941                        if uni_common::cypher_value_codec::decode(&bytes).is_ok() {
2942                            builder.append_value(&bytes);
2943                        } else {
2944                            builder.append_value(uni_common::cypher_value_codec::encode(
2945                                &Value::List(arr),
2946                            ));
2947                        }
2948                    }
2949                    Some(val) => {
2950                        // Encode any other property value directly via the
2951                        // CypherValue codec so typed values (temporals, including
2952                        // BTIC, and nested lists/maps) round-trip losslessly.
2953                        builder.append_value(uni_common::cypher_value_codec::encode(&val));
2954                    }
2955                }
2956            }
2957            Ok(Arc::new(builder.finish()))
2958        }
2959        DataType::Binary => {
2960            // CRDT binary properties: JSON-decoded CRDTs re-encoded to MessagePack
2961            let mut builder = BinaryBuilder::new();
2962            for vid in vids {
2963                let bytes = get_property_value(vid, props_map, prop_name)
2964                    .filter(|v| !v.is_null())
2965                    .and_then(|v| {
2966                        let json_val: serde_json::Value = v.into();
2967                        serde_json::from_value::<uni_crdt::Crdt>(json_val).ok()
2968                    })
2969                    .and_then(|crdt| crdt.to_msgpack().ok());
2970                match bytes {
2971                    Some(b) => builder.append_value(&b),
2972                    None => builder.append_null(),
2973                }
2974            }
2975            Ok(Arc::new(builder.finish()))
2976        }
2977        DataType::Utf8 => {
2978            let mut builder = StringBuilder::new();
2979            for vid in vids {
2980                match get_property_value(vid, props_map, prop_name) {
2981                    Some(Value::String(s)) => builder.append_value(s),
2982                    Some(Value::Null) | None => builder.append_null(),
2983                    Some(other) => builder.append_value(other.to_string()),
2984                }
2985            }
2986            Ok(Arc::new(builder.finish()))
2987        }
2988        DataType::Int64 => {
2989            build_numeric_column!(
2990                vids,
2991                props_map,
2992                prop_name,
2993                Int64Builder,
2994                |v: &Value| v.as_i64(),
2995                |v| v
2996            )
2997        }
2998        DataType::Int32 => {
2999            build_numeric_column!(
3000                vids,
3001                props_map,
3002                prop_name,
3003                Int32Builder,
3004                |v: &Value| v.as_i64(),
3005                |v: i64| v as i32
3006            )
3007        }
3008        DataType::Float64 => {
3009            build_numeric_column!(
3010                vids,
3011                props_map,
3012                prop_name,
3013                Float64Builder,
3014                |v: &Value| v.as_f64(),
3015                |v| v
3016            )
3017        }
3018        DataType::Float32 => {
3019            build_numeric_column!(
3020                vids,
3021                props_map,
3022                prop_name,
3023                Float32Builder,
3024                |v: &Value| v.as_f64(),
3025                |v: f64| v as f32
3026            )
3027        }
3028        DataType::Boolean => {
3029            let mut builder = BooleanBuilder::new();
3030            for vid in vids {
3031                match get_property_value(vid, props_map, prop_name) {
3032                    Some(Value::Bool(b)) => builder.append_value(b),
3033                    _ => builder.append_null(),
3034                }
3035            }
3036            Ok(Arc::new(builder.finish()))
3037        }
3038        DataType::UInt64 => {
3039            build_numeric_column!(
3040                vids,
3041                props_map,
3042                prop_name,
3043                UInt64Builder,
3044                |v: &Value| v.as_u64(),
3045                |v| v
3046            )
3047        }
3048        DataType::FixedSizeList(inner, dim) if *inner.data_type() == DataType::Float32 => {
3049            // Vector properties: FixedSizeList(Float32, N)
3050            let values_builder = Float32Builder::new();
3051            let mut list_builder = FixedSizeListBuilder::new(values_builder, *dim);
3052            for vid in vids {
3053                match get_property_value(vid, props_map, prop_name) {
3054                    Some(Value::Vector(v)) => {
3055                        for val in v {
3056                            list_builder.values().append_value(val);
3057                        }
3058                        list_builder.append(true);
3059                    }
3060                    Some(Value::List(arr)) => {
3061                        for v in arr {
3062                            list_builder
3063                                .values()
3064                                .append_value(v.as_f64().unwrap_or(0.0) as f32);
3065                        }
3066                        list_builder.append(true);
3067                    }
3068                    _ => {
3069                        // Append dim nulls to inner values, then mark row as null
3070                        for _ in 0..*dim {
3071                            list_builder.values().append_null();
3072                        }
3073                        list_builder.append(false);
3074                    }
3075                }
3076            }
3077            Ok(Arc::new(list_builder.finish()))
3078        }
3079        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
3080            // Timestamp properties stored as Value::Temporal, ISO 8601 strings, or i64 nanoseconds
3081            let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
3082            for vid in vids {
3083                match get_property_value(vid, props_map, prop_name) {
3084                    Some(Value::Temporal(tv)) => match tv {
3085                        uni_common::TemporalValue::DateTime {
3086                            nanos_since_epoch, ..
3087                        }
3088                        | uni_common::TemporalValue::LocalDateTime {
3089                            nanos_since_epoch, ..
3090                        } => {
3091                            builder.append_value(nanos_since_epoch);
3092                        }
3093                        uni_common::TemporalValue::Date { days_since_epoch } => {
3094                            builder.append_value(days_since_epoch as i64 * 86_400_000_000_000);
3095                        }
3096                        _ => builder.append_null(),
3097                    },
3098                    Some(Value::String(s)) => match parse_datetime_utc(&s) {
3099                        Ok(dt) => builder.append_value(dt.timestamp_nanos_opt().unwrap_or(0)),
3100                        Err(_) => builder.append_null(),
3101                    },
3102                    Some(Value::Int(n)) => {
3103                        builder.append_value(n);
3104                    }
3105                    _ => builder.append_null(),
3106                }
3107            }
3108            Ok(Arc::new(builder.finish()))
3109        }
3110        DataType::Date32 => {
3111            let mut builder = Date32Builder::new();
3112            let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
3113            for vid in vids {
3114                match get_property_value(vid, props_map, prop_name) {
3115                    Some(Value::Temporal(uni_common::TemporalValue::Date { days_since_epoch })) => {
3116                        builder.append_value(days_since_epoch);
3117                    }
3118                    Some(Value::String(s)) => match NaiveDate::parse_from_str(&s, "%Y-%m-%d") {
3119                        Ok(d) => builder.append_value((d - epoch).num_days() as i32),
3120                        Err(_) => builder.append_null(),
3121                    },
3122                    Some(Value::Int(n)) => {
3123                        builder.append_value(n as i32);
3124                    }
3125                    _ => builder.append_null(),
3126                }
3127            }
3128            Ok(Arc::new(builder.finish()))
3129        }
3130        DataType::Time64(TimeUnit::Nanosecond) => {
3131            let mut builder = Time64NanosecondBuilder::new();
3132            for vid in vids {
3133                match get_property_value(vid, props_map, prop_name) {
3134                    Some(Value::Temporal(
3135                        uni_common::TemporalValue::LocalTime {
3136                            nanos_since_midnight,
3137                        }
3138                        | uni_common::TemporalValue::Time {
3139                            nanos_since_midnight,
3140                            ..
3141                        },
3142                    )) => {
3143                        builder.append_value(nanos_since_midnight);
3144                    }
3145                    Some(Value::Temporal(_)) => builder.append_null(),
3146                    Some(Value::String(s)) => {
3147                        match NaiveTime::parse_from_str(&s, "%H:%M:%S%.f")
3148                            .or_else(|_| NaiveTime::parse_from_str(&s, "%H:%M:%S"))
3149                        {
3150                            Ok(t) => {
3151                                let nanos = t.num_seconds_from_midnight() as i64 * 1_000_000_000
3152                                    + t.nanosecond() as i64;
3153                                builder.append_value(nanos);
3154                            }
3155                            Err(_) => builder.append_null(),
3156                        }
3157                    }
3158                    Some(Value::Int(n)) => {
3159                        builder.append_value(n);
3160                    }
3161                    _ => builder.append_null(),
3162                }
3163            }
3164            Ok(Arc::new(builder.finish()))
3165        }
3166        DataType::Interval(IntervalUnit::MonthDayNano) => {
3167            let mut values: Vec<Option<arrow::datatypes::IntervalMonthDayNano>> =
3168                Vec::with_capacity(vids.len());
3169            for vid in vids {
3170                match get_property_value(vid, props_map, prop_name) {
3171                    Some(Value::Temporal(uni_common::TemporalValue::Duration {
3172                        months,
3173                        days,
3174                        nanos,
3175                    })) => {
3176                        values.push(Some(arrow::datatypes::IntervalMonthDayNano {
3177                            months: months as i32,
3178                            days: days as i32,
3179                            nanoseconds: nanos,
3180                        }));
3181                    }
3182                    Some(Value::Int(_n)) => {
3183                        values.push(None);
3184                    }
3185                    _ => values.push(None),
3186                }
3187            }
3188            let arr: arrow_array::IntervalMonthDayNanoArray = values.into_iter().collect();
3189            Ok(Arc::new(arr))
3190        }
3191        DataType::List(inner_field) => {
3192            build_list_property_column(vids, props_map, prop_name, inner_field)
3193        }
3194        DataType::Struct(fields) => {
3195            build_struct_property_column(vids, props_map, prop_name, fields)
3196        }
3197        DataType::FixedSizeBinary(24) => {
3198            // BTIC temporal interval columns: encode as FixedSizeBinary(24)
3199            use arrow_array::builder::FixedSizeBinaryBuilder;
3200            const BTIC_LEN: i32 = 24;
3201            let mut builder = FixedSizeBinaryBuilder::with_capacity(vids.len(), BTIC_LEN);
3202            for vid in vids {
3203                match get_property_value(vid, props_map, prop_name) {
3204                    Some(Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta })) => {
3205                        match uni_btic::Btic::new(lo, hi, meta) {
3206                            Ok(b) => {
3207                                builder
3208                                    .append_value(uni_btic::encode::encode(&b))
3209                                    .map_err(arrow_err)?;
3210                            }
3211                            Err(e) => {
3212                                tracing::warn!(
3213                                    "BTIC coercion failed for property '{}': invalid value (lo={}, hi={}, meta={:#x}): {}",
3214                                    prop_name,
3215                                    lo,
3216                                    hi,
3217                                    meta,
3218                                    e
3219                                );
3220                                builder.append_null()
3221                            }
3222                        }
3223                    }
3224                    Some(Value::String(s)) => match uni_btic::parse::parse_btic_literal(&s) {
3225                        Ok(b) => {
3226                            builder
3227                                .append_value(uni_btic::encode::encode(&b))
3228                                .map_err(arrow_err)?;
3229                        }
3230                        Err(e) => {
3231                            tracing::warn!(
3232                                "BTIC coercion failed for property '{}': '{}' is not a valid BTIC literal: {}",
3233                                prop_name,
3234                                s,
3235                                e
3236                            );
3237                            builder.append_null()
3238                        }
3239                    },
3240                    _ => builder.append_null(),
3241                }
3242            }
3243            Ok(Arc::new(builder.finish()))
3244        }
3245        // Default: convert to string
3246        _ => {
3247            let mut builder = StringBuilder::new();
3248            for vid in vids {
3249                match get_property_value(vid, props_map, prop_name) {
3250                    Some(Value::Null) | None => builder.append_null(),
3251                    Some(other) => builder.append_value(other.to_string()),
3252                }
3253            }
3254            Ok(Arc::new(builder.finish()))
3255        }
3256    }
3257}
3258
3259/// Build a List-typed Arrow column from list property values.
3260fn build_list_property_column(
3261    vids: &[Vid],
3262    props_map: &HashMap<Vid, Properties>,
3263    prop_name: &str,
3264    inner_field: &Arc<Field>,
3265) -> DFResult<ArrayRef> {
3266    match inner_field.data_type() {
3267        DataType::Utf8 => {
3268            let mut builder = ListBuilder::new(StringBuilder::new());
3269            for vid in vids {
3270                match get_property_value(vid, props_map, prop_name) {
3271                    Some(Value::List(arr)) => {
3272                        for v in arr {
3273                            match v {
3274                                Value::String(s) => builder.values().append_value(s),
3275                                Value::Null => builder.values().append_null(),
3276                                other => builder.values().append_value(format!("{other:?}")),
3277                            }
3278                        }
3279                        builder.append(true);
3280                    }
3281                    _ => builder.append(false),
3282                }
3283            }
3284            Ok(Arc::new(builder.finish()))
3285        }
3286        DataType::Int64 => {
3287            let mut builder = ListBuilder::new(Int64Builder::new());
3288            for vid in vids {
3289                match get_property_value(vid, props_map, prop_name) {
3290                    Some(Value::List(arr)) => {
3291                        for v in arr {
3292                            match v.as_i64() {
3293                                Some(n) => builder.values().append_value(n),
3294                                None => builder.values().append_null(),
3295                            }
3296                        }
3297                        builder.append(true);
3298                    }
3299                    _ => builder.append(false),
3300                }
3301            }
3302            Ok(Arc::new(builder.finish()))
3303        }
3304        DataType::Float64 => {
3305            let mut builder = ListBuilder::new(Float64Builder::new());
3306            for vid in vids {
3307                match get_property_value(vid, props_map, prop_name) {
3308                    Some(Value::List(arr)) => {
3309                        for v in arr {
3310                            match v.as_f64() {
3311                                Some(n) => builder.values().append_value(n),
3312                                None => builder.values().append_null(),
3313                            }
3314                        }
3315                        builder.append(true);
3316                    }
3317                    _ => builder.append(false),
3318                }
3319            }
3320            Ok(Arc::new(builder.finish()))
3321        }
3322        DataType::Boolean => {
3323            let mut builder = ListBuilder::new(BooleanBuilder::new());
3324            for vid in vids {
3325                match get_property_value(vid, props_map, prop_name) {
3326                    Some(Value::List(arr)) => {
3327                        for v in arr {
3328                            match v.as_bool() {
3329                                Some(b) => builder.values().append_value(b),
3330                                None => builder.values().append_null(),
3331                            }
3332                        }
3333                        builder.append(true);
3334                    }
3335                    _ => builder.append(false),
3336                }
3337            }
3338            Ok(Arc::new(builder.finish()))
3339        }
3340        DataType::Struct(fields) => {
3341            // Map types are List(Struct(key, value)) — build struct inner elements
3342            build_list_of_structs_column(vids, props_map, prop_name, fields)
3343        }
3344        // Fallback: serialize inner elements as strings
3345        _ => {
3346            let mut builder = ListBuilder::new(StringBuilder::new());
3347            for vid in vids {
3348                match get_property_value(vid, props_map, prop_name) {
3349                    Some(Value::List(arr)) => {
3350                        for v in arr {
3351                            match v {
3352                                Value::Null => builder.values().append_null(),
3353                                other => builder.values().append_value(format!("{other:?}")),
3354                            }
3355                        }
3356                        builder.append(true);
3357                    }
3358                    _ => builder.append(false),
3359                }
3360            }
3361            Ok(Arc::new(builder.finish()))
3362        }
3363    }
3364}
3365
3366/// Build a List(Struct(...)) column, used for Map-type properties.
3367///
3368/// Handles two value representations:
3369/// - `Value::List([Map{key: k, value: v}, ...])` — pre-converted kv pairs
3370/// - `Value::Map({k1: v1, k2: v2})` — raw map objects (converted to kv pairs)
3371fn build_list_of_structs_column(
3372    vids: &[Vid],
3373    props_map: &HashMap<Vid, Properties>,
3374    prop_name: &str,
3375    fields: &Fields,
3376) -> DFResult<ArrayRef> {
3377    use arrow_array::StructArray;
3378
3379    let values: Vec<Option<Value>> = vids
3380        .iter()
3381        .map(|vid| get_property_value(vid, props_map, prop_name))
3382        .collect();
3383
3384    // Convert each row's value to an owned Vec of Maps (key-value pairs).
3385    // This normalizes both List-of-maps and Map representations.
3386    let rows: Vec<Option<Vec<HashMap<String, Value>>>> = values
3387        .iter()
3388        .map(|val| match val {
3389            Some(Value::List(arr)) => {
3390                let objs: Vec<HashMap<String, Value>> = arr
3391                    .iter()
3392                    .filter_map(|v| {
3393                        if let Value::Map(m) = v {
3394                            Some(m.clone())
3395                        } else {
3396                            None
3397                        }
3398                    })
3399                    .collect();
3400                if objs.is_empty() { None } else { Some(objs) }
3401            }
3402            Some(Value::Map(obj)) => {
3403                // Map property: convert {k1: v1, k2: v2} -> [{key: k1, value: v1}, ...]
3404                let kv_pairs: Vec<HashMap<String, Value>> = obj
3405                    .iter()
3406                    .map(|(k, v)| {
3407                        let mut m = HashMap::new();
3408                        m.insert("key".to_string(), Value::String(k.clone()));
3409                        m.insert("value".to_string(), v.clone());
3410                        m
3411                    })
3412                    .collect();
3413                Some(kv_pairs)
3414            }
3415            _ => None,
3416        })
3417        .collect();
3418
3419    let total_items: usize = rows
3420        .iter()
3421        .filter_map(|r| r.as_ref())
3422        .map(|v| v.len())
3423        .sum();
3424
3425    // Build child arrays for each field in the struct
3426    let child_arrays: Vec<ArrayRef> = fields
3427        .iter()
3428        .map(|field| {
3429            let field_name = field.name();
3430            match field.data_type() {
3431                DataType::Utf8 => {
3432                    let mut builder = StringBuilder::with_capacity(total_items, total_items * 16);
3433                    for obj in rows.iter().flatten().flatten() {
3434                        match obj.get(field_name) {
3435                            Some(Value::String(s)) => builder.append_value(s),
3436                            Some(Value::Null) | None => builder.append_null(),
3437                            Some(other) => builder.append_value(format!("{other:?}")),
3438                        }
3439                    }
3440                    Arc::new(builder.finish()) as ArrayRef
3441                }
3442                DataType::Int64 => {
3443                    let mut builder = Int64Builder::with_capacity(total_items);
3444                    for obj in rows.iter().flatten().flatten() {
3445                        match obj.get(field_name).and_then(|v| v.as_i64()) {
3446                            Some(n) => builder.append_value(n),
3447                            None => builder.append_null(),
3448                        }
3449                    }
3450                    Arc::new(builder.finish()) as ArrayRef
3451                }
3452                DataType::Float64 => {
3453                    let mut builder = Float64Builder::with_capacity(total_items);
3454                    for obj in rows.iter().flatten().flatten() {
3455                        match obj.get(field_name).and_then(|v| v.as_f64()) {
3456                            Some(n) => builder.append_value(n),
3457                            None => builder.append_null(),
3458                        }
3459                    }
3460                    Arc::new(builder.finish()) as ArrayRef
3461                }
3462                // Fallback: serialize as string
3463                _ => {
3464                    let mut builder = StringBuilder::with_capacity(total_items, total_items * 16);
3465                    for obj in rows.iter().flatten().flatten() {
3466                        match obj.get(field_name) {
3467                            Some(Value::Null) | None => builder.append_null(),
3468                            Some(other) => builder.append_value(format!("{other:?}")),
3469                        }
3470                    }
3471                    Arc::new(builder.finish()) as ArrayRef
3472                }
3473            }
3474        })
3475        .collect();
3476
3477    // Build struct array from children
3478    let struct_array = StructArray::try_new(fields.clone(), child_arrays, None)
3479        .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;
3480
3481    // Build list offsets
3482    let mut offsets = Vec::with_capacity(vids.len() + 1);
3483    let mut nulls = Vec::with_capacity(vids.len());
3484    let mut offset = 0i32;
3485    offsets.push(offset);
3486    for row in &rows {
3487        match row {
3488            Some(objs) => {
3489                offset += objs.len() as i32;
3490                offsets.push(offset);
3491                nulls.push(true);
3492            }
3493            None => {
3494                offsets.push(offset);
3495                nulls.push(false);
3496            }
3497        }
3498    }
3499
3500    let list_field = Arc::new(Field::new("item", DataType::Struct(fields.clone()), true));
3501    let list_array = arrow_array::ListArray::try_new(
3502        list_field,
3503        arrow::buffer::OffsetBuffer::new(arrow::buffer::ScalarBuffer::from(offsets)),
3504        Arc::new(struct_array),
3505        Some(arrow::buffer::NullBuffer::from(nulls)),
3506    )
3507    .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;
3508
3509    Ok(Arc::new(list_array))
3510}
3511
3512/// Convert a TemporalValue into a HashMap matching the Arrow struct field names,
3513/// so that `build_struct_property_column` can extract fields uniformly.
3514fn temporal_to_struct_map(tv: &uni_common::value::TemporalValue) -> HashMap<String, Value> {
3515    use uni_common::value::TemporalValue;
3516    let mut m = HashMap::new();
3517    match tv {
3518        TemporalValue::DateTime {
3519            nanos_since_epoch,
3520            offset_seconds,
3521            timezone_name,
3522        } => {
3523            m.insert("nanos_since_epoch".into(), Value::Int(*nanos_since_epoch));
3524            m.insert("offset_seconds".into(), Value::Int(*offset_seconds as i64));
3525            if let Some(tz) = timezone_name {
3526                m.insert("timezone_name".into(), Value::String(tz.clone()));
3527            }
3528        }
3529        TemporalValue::LocalDateTime { nanos_since_epoch } => {
3530            m.insert("nanos_since_epoch".into(), Value::Int(*nanos_since_epoch));
3531        }
3532        TemporalValue::Time {
3533            nanos_since_midnight,
3534            offset_seconds,
3535        } => {
3536            m.insert(
3537                "nanos_since_midnight".into(),
3538                Value::Int(*nanos_since_midnight),
3539            );
3540            m.insert("offset_seconds".into(), Value::Int(*offset_seconds as i64));
3541        }
3542        TemporalValue::LocalTime {
3543            nanos_since_midnight,
3544        } => {
3545            m.insert(
3546                "nanos_since_midnight".into(),
3547                Value::Int(*nanos_since_midnight),
3548            );
3549        }
3550        TemporalValue::Date { days_since_epoch } => {
3551            m.insert(
3552                "days_since_epoch".into(),
3553                Value::Int(*days_since_epoch as i64),
3554            );
3555        }
3556        TemporalValue::Duration {
3557            months,
3558            days,
3559            nanos,
3560        } => {
3561            m.insert("months".into(), Value::Int(*months));
3562            m.insert("days".into(), Value::Int(*days));
3563            m.insert("nanos".into(), Value::Int(*nanos));
3564        }
3565        TemporalValue::Btic { lo, hi, meta } => {
3566            m.insert("lo".into(), Value::Int(*lo));
3567            m.insert("hi".into(), Value::Int(*hi));
3568            m.insert("meta".into(), Value::Int(*meta as i64));
3569        }
3570    }
3571    m
3572}
3573
3574/// Build a Struct-typed Arrow column from Map property values (e.g. Point types).
3575fn build_struct_property_column(
3576    vids: &[Vid],
3577    props_map: &HashMap<Vid, Properties>,
3578    prop_name: &str,
3579    fields: &Fields,
3580) -> DFResult<ArrayRef> {
3581    use arrow_array::StructArray;
3582
3583    // Convert raw values, expanding Temporal values into Map representation
3584    // so the struct field extraction below works uniformly.
3585    let values: Vec<Option<Value>> = vids
3586        .iter()
3587        .map(|vid| {
3588            let val = get_property_value(vid, props_map, prop_name);
3589            match val {
3590                Some(Value::Temporal(ref tv)) => Some(Value::Map(temporal_to_struct_map(tv))),
3591                other => other,
3592            }
3593        })
3594        .collect();
3595
3596    let child_arrays: Vec<ArrayRef> = fields
3597        .iter()
3598        .map(|field| {
3599            let field_name = field.name();
3600            match field.data_type() {
3601                DataType::Float64 => {
3602                    let mut builder = Float64Builder::with_capacity(vids.len());
3603                    for val in &values {
3604                        match val {
3605                            Some(Value::Map(obj)) => {
3606                                match obj.get(field_name).and_then(|v| v.as_f64()) {
3607                                    Some(n) => builder.append_value(n),
3608                                    None => builder.append_null(),
3609                                }
3610                            }
3611                            _ => builder.append_null(),
3612                        }
3613                    }
3614                    Arc::new(builder.finish()) as ArrayRef
3615                }
3616                DataType::Utf8 => {
3617                    let mut builder = StringBuilder::with_capacity(vids.len(), vids.len() * 16);
3618                    for val in &values {
3619                        match val {
3620                            Some(Value::Map(obj)) => match obj.get(field_name) {
3621                                Some(Value::String(s)) => builder.append_value(s),
3622                                Some(Value::Null) | None => builder.append_null(),
3623                                Some(other) => builder.append_value(format!("{other:?}")),
3624                            },
3625                            _ => builder.append_null(),
3626                        }
3627                    }
3628                    Arc::new(builder.finish()) as ArrayRef
3629                }
3630                DataType::Int64 => {
3631                    let mut builder = Int64Builder::with_capacity(vids.len());
3632                    for val in &values {
3633                        match val {
3634                            Some(Value::Map(obj)) => {
3635                                match obj.get(field_name).and_then(|v| v.as_i64()) {
3636                                    Some(n) => builder.append_value(n),
3637                                    None => builder.append_null(),
3638                                }
3639                            }
3640                            _ => builder.append_null(),
3641                        }
3642                    }
3643                    Arc::new(builder.finish()) as ArrayRef
3644                }
3645                DataType::Timestamp(_, _) => {
3646                    let mut builder = TimestampNanosecondBuilder::with_capacity(vids.len());
3647                    for val in &values {
3648                        match val {
3649                            Some(Value::Map(obj)) => {
3650                                match obj.get(field_name).and_then(|v| v.as_i64()) {
3651                                    Some(n) => builder.append_value(n),
3652                                    None => builder.append_null(),
3653                                }
3654                            }
3655                            _ => builder.append_null(),
3656                        }
3657                    }
3658                    Arc::new(builder.finish()) as ArrayRef
3659                }
3660                DataType::Int32 => {
3661                    let mut builder = Int32Builder::with_capacity(vids.len());
3662                    for val in &values {
3663                        match val {
3664                            Some(Value::Map(obj)) => {
3665                                match obj.get(field_name).and_then(|v| v.as_i64()) {
3666                                    Some(n) => builder.append_value(n as i32),
3667                                    None => builder.append_null(),
3668                                }
3669                            }
3670                            _ => builder.append_null(),
3671                        }
3672                    }
3673                    Arc::new(builder.finish()) as ArrayRef
3674                }
3675                DataType::Time64(_) => {
3676                    let mut builder = Time64NanosecondBuilder::with_capacity(vids.len());
3677                    for val in &values {
3678                        match val {
3679                            Some(Value::Map(obj)) => {
3680                                match obj.get(field_name).and_then(|v| v.as_i64()) {
3681                                    Some(n) => builder.append_value(n),
3682                                    None => builder.append_null(),
3683                                }
3684                            }
3685                            _ => builder.append_null(),
3686                        }
3687                    }
3688                    Arc::new(builder.finish()) as ArrayRef
3689                }
3690                // Fallback: serialize as string
3691                _ => {
3692                    let mut builder = StringBuilder::with_capacity(vids.len(), vids.len() * 16);
3693                    for val in &values {
3694                        match val {
3695                            Some(Value::Map(obj)) => match obj.get(field_name) {
3696                                Some(Value::Null) | None => builder.append_null(),
3697                                Some(other) => builder.append_value(format!("{other:?}")),
3698                            },
3699                            _ => builder.append_null(),
3700                        }
3701                    }
3702                    Arc::new(builder.finish()) as ArrayRef
3703                }
3704            }
3705        })
3706        .collect();
3707
3708    // Build null bitmap — null when the value is null/missing
3709    let nulls: Vec<bool> = values
3710        .iter()
3711        .map(|v| matches!(v, Some(Value::Map(_))))
3712        .collect();
3713
3714    let struct_array = StructArray::try_new(
3715        fields.clone(),
3716        child_arrays,
3717        Some(arrow::buffer::NullBuffer::from(nulls)),
3718    )
3719    .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;
3720
3721    Ok(Arc::new(struct_array))
3722}
3723
3724impl Stream for GraphScanStream {
3725    type Item = DFResult<RecordBatch>;
3726
3727    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3728        let metrics = self.metrics.clone();
3729        let _timer = metrics.elapsed_compute().timer();
3730        loop {
3731            // Use a temporary to avoid borrow issues
3732            let state = std::mem::replace(&mut self.state, GraphScanState::Done);
3733
3734            match state {
3735                GraphScanState::Init => {
3736                    // Create the future with cloned data for ownership
3737                    let graph_ctx = self.graph_ctx.clone();
3738                    let label = self.label.clone();
3739                    let variable = self.variable.clone();
3740                    let properties = self.properties.clone();
3741                    let is_edge_scan = self.is_edge_scan;
3742                    let is_schemaless = self.is_schemaless;
3743                    let filter = self.filter.clone();
3744                    let vid_list_filter = self.vid_list_filter.clone();
3745                    let extra_lance_filter = self.extra_lance_filter.clone();
3746                    let extra_runtime_filter = self.extra_runtime_filter.clone();
3747                    let schema = self.schema.clone();
3748
3749                    let fut = async move {
3750                        graph_ctx.check_timeout().map_err(|e| {
3751                            datafusion::error::DataFusionError::Execution(e.to_string())
3752                        })?;
3753
3754                        let batch = if is_edge_scan {
3755                            columnar_scan_edge_batch_static(
3756                                &graph_ctx,
3757                                &label,
3758                                &variable,
3759                                &properties,
3760                                &schema,
3761                            )
3762                            .await?
3763                        } else if is_schemaless {
3764                            columnar_scan_schemaless_vertex_batch_static(
3765                                &graph_ctx,
3766                                &label,
3767                                &variable,
3768                                &properties,
3769                                &schema,
3770                                &filter,
3771                                vid_list_filter.as_deref(),
3772                                extra_lance_filter.as_deref(),
3773                                extra_runtime_filter.as_ref(),
3774                            )
3775                            .await?
3776                        } else {
3777                            columnar_scan_vertex_batch_static(
3778                                &graph_ctx,
3779                                &label,
3780                                &variable,
3781                                &properties,
3782                                &schema,
3783                                &filter,
3784                                vid_list_filter.as_deref(),
3785                                extra_lance_filter.as_deref(),
3786                                extra_runtime_filter.as_ref(),
3787                            )
3788                            .await?
3789                        };
3790                        Ok(Some(batch))
3791                    };
3792
3793                    self.state = GraphScanState::Executing(Box::pin(fut));
3794                    // Continue loop to poll the future
3795                }
3796                GraphScanState::Executing(mut fut) => match fut.as_mut().poll(cx) {
3797                    Poll::Ready(Ok(batch)) => {
3798                        self.state = GraphScanState::Done;
3799                        self.metrics
3800                            .record_output(batch.as_ref().map(|b| b.num_rows()).unwrap_or(0));
3801                        return Poll::Ready(batch.map(Ok));
3802                    }
3803                    Poll::Ready(Err(e)) => {
3804                        self.state = GraphScanState::Done;
3805                        return Poll::Ready(Some(Err(e)));
3806                    }
3807                    Poll::Pending => {
3808                        self.state = GraphScanState::Executing(fut);
3809                        return Poll::Pending;
3810                    }
3811                },
3812                GraphScanState::Done => {
3813                    return Poll::Ready(None);
3814                }
3815            }
3816        }
3817    }
3818}
3819
3820impl RecordBatchStream for GraphScanStream {
3821    fn schema(&self) -> SchemaRef {
3822        self.schema.clone()
3823    }
3824}
3825
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_build_vertex_schema() {
        let uni_schema = UniSchema::default();
        let schema = GraphScanExec::build_vertex_schema(
            "n",
            "Person",
            &["name".to_string(), "age".to_string()],
            &uni_schema,
        );

        // System columns first, then requested properties in request order.
        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(1).name(), "n._labels");
        assert_eq!(schema.field(2).name(), "n.name");
        assert_eq!(schema.field(3).name(), "n.age");
    }

    #[test]
    fn test_build_edge_schema() {
        let uni_schema = UniSchema::default();
        let schema =
            GraphScanExec::build_edge_schema("r", "KNOWS", &["weight".to_string()], &uni_schema);

        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "r._eid");
        assert_eq!(schema.field(1).name(), "r._src_vid");
        assert_eq!(schema.field(2).name(), "r._dst_vid");
        assert_eq!(schema.field(3).name(), "r.weight");
    }

    #[test]
    fn test_build_schemaless_vertex_schema() {
        let empty_schema = uni_common::core::schema::Schema::default();
        let schema = GraphScanExec::build_schemaless_vertex_schema(
            "n",
            &["name".to_string(), "age".to_string()],
            &empty_schema,
        );

        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(0).data_type(), &DataType::UInt64);
        assert_eq!(schema.field(1).name(), "n._labels");
        assert_eq!(schema.field(2).name(), "n.name");
        // With an empty schema the property type is unknown, so it falls back to LargeBinary.
        assert_eq!(schema.field(2).data_type(), &DataType::LargeBinary);
        assert_eq!(schema.field(3).name(), "n.age");
        assert_eq!(schema.field(3).data_type(), &DataType::LargeBinary);
    }

    #[test]
    fn test_schemaless_all_scan_has_empty_label() {
        let empty_schema = uni_common::core::schema::Schema::default();
        let schema = GraphScanExec::build_schemaless_vertex_schema("n", &[], &empty_schema);

        // A scan with no requested properties still carries the _vid and _labels columns.
        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(1).name(), "n._labels");
    }

    #[test]
    fn test_cypher_value_all_props_extraction() {
        // Encode a property map directly via the CypherValue codec (the path the
        // `_all_props` builders use).
        let map: HashMap<String, Value> = [
            ("age".to_string(), Value::Int(30)),
            ("name".to_string(), Value::String("Alice".to_string())),
        ]
        .into_iter()
        .collect();
        let cv_bytes = uni_common::cypher_value_codec::encode(&Value::Map(map));

        // Decode and extract the "age" value.
        let decoded = uni_common::cypher_value_codec::decode(&cv_bytes).unwrap();
        match decoded {
            uni_common::Value::Map(map) => {
                let age_val = map.get("age").unwrap();
                assert_eq!(age_val, &uni_common::Value::Int(30));
            }
            _ => panic!("Expected Map"),
        }

        // Also round-trip a single (non-map) value.
        let single_bytes = uni_common::cypher_value_codec::encode(&Value::Int(30));
        let single_decoded = uni_common::cypher_value_codec::decode(&single_bytes).unwrap();
        assert_eq!(single_decoded, uni_common::Value::Int(30));
    }

    /// Builds a Lance-shaped test batch with `_vid`, `_deleted`, `_version` and `name` columns.
    ///
    /// Each `name` is `"v{vid}_ver{version}"`, so a test can tell which source row
    /// survived deduplication. All three slices are expected to have the same length.
    fn make_mvcc_batch(vids: &[u64], versions: &[u64], deleted: &[bool]) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("_vid", DataType::UInt64, false),
            Field::new("_deleted", DataType::Boolean, false),
            Field::new("_version", DataType::UInt64, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let names: Vec<String> = vids
            .iter()
            .zip(versions.iter())
            .map(|(v, ver)| format!("v{}_ver{}", v, ver))
            .collect();
        let name_arr: arrow_array::StringArray = names.iter().map(|s| Some(s.as_str())).collect();

        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(vids.to_vec())),
                Arc::new(arrow_array::BooleanArray::from(deleted.to_vec())),
                Arc::new(UInt64Array::from(versions.to_vec())),
                Arc::new(name_arr),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_mvcc_dedup_multiple_versions() {
        // VID 1 at versions 3, 1, 5 — should keep version 5
        // VID 2 at versions 2, 4 — should keep version 4
        let batch = make_mvcc_batch(
            &[1, 1, 1, 2, 2],
            &[3, 1, 5, 2, 4],
            &[false, false, false, false, false],
        );

        let result = mvcc_dedup_batch(&batch).unwrap();
        assert_eq!(result.num_rows(), 2);

        let vid_col = result
            .column_by_name("_vid")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        let ver_col = result
            .column_by_name("_version")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        let name_col = result
            .column_by_name("name")
            .unwrap()
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .unwrap();

        // VID 1 → version 5, VID 2 → version 4
        assert_eq!(vid_col.value(0), 1);
        assert_eq!(ver_col.value(0), 5);
        assert_eq!(name_col.value(0), "v1_ver5");

        assert_eq!(vid_col.value(1), 2);
        assert_eq!(ver_col.value(1), 4);
        assert_eq!(name_col.value(1), "v2_ver4");
    }

    #[test]
    fn test_mvcc_dedup_single_rows() {
        // Each VID appears once — nothing should change
        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
        let result = mvcc_dedup_batch(&batch).unwrap();
        assert_eq!(result.num_rows(), 3);
    }

    #[test]
    fn test_mvcc_dedup_empty() {
        let batch = make_mvcc_batch(&[], &[], &[]);
        let result = mvcc_dedup_batch(&batch).unwrap();
        assert_eq!(result.num_rows(), 0);
    }

    #[test]
    fn test_filter_l0_tombstones_removes_tombstoned() {
        use crate::query::df_graph::L0Context;

        // Create a batch with VIDs 1, 2, 3
        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);

        // Tombstone VID 2 in the L0 buffer. `vertex_tombstones` is a public field,
        // so the entry is inserted directly before the buffer is shared behind a lock.
        let mut l0 = uni_store::runtime::l0::L0Buffer::new(1, None);
        l0.vertex_tombstones.insert(Vid::from(2u64));
        let l0_buf = std::sync::Arc::new(parking_lot::RwLock::new(l0));

        let l0_ctx = L0Context {
            current_l0: Some(l0_buf),
            transaction_l0: None,
            pending_flush_l0s: vec![],
        };

        let result = filter_l0_tombstones(&batch, &l0_ctx).unwrap();
        assert_eq!(result.num_rows(), 2);

        let vid_col = result
            .column_by_name("_vid")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(vid_col.value(0), 1);
        assert_eq!(vid_col.value(1), 3);
    }

    #[test]
    fn test_filter_l0_tombstones_none() {
        use crate::query::df_graph::L0Context;

        // A default context carries no tombstones, so every row is kept.
        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
        let l0_ctx = L0Context::default();

        let result = filter_l0_tombstones(&batch, &l0_ctx).unwrap();
        assert_eq!(result.num_rows(), 3);
    }

    #[test]
    fn test_map_to_output_schema_basic() {
        use crate::query::df_graph::L0Context;

        // Input: Lance-schema batch with _vid, _deleted, _version, name columns
        let lance_schema = Arc::new(Schema::new(vec![
            Field::new("_vid", DataType::UInt64, false),
            Field::new("_deleted", DataType::Boolean, false),
            Field::new("_version", DataType::UInt64, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let name_arr: arrow_array::StringArray =
            vec![Some("Alice"), Some("Bob")].into_iter().collect();
        let batch = RecordBatch::try_new(
            lance_schema,
            vec![
                Arc::new(UInt64Array::from(vec![1u64, 2])),
                Arc::new(arrow_array::BooleanArray::from(vec![false, false])),
                Arc::new(UInt64Array::from(vec![1u64, 1])),
                Arc::new(name_arr),
            ],
        )
        .unwrap();

        // Output schema: n._vid, n._labels, n.name
        let output_schema = Arc::new(Schema::new(vec![
            Field::new("n._vid", DataType::UInt64, false),
            Field::new("n._labels", labels_data_type(), true),
            Field::new("n.name", DataType::Utf8, true),
        ]));

        let l0_ctx = L0Context::default();
        let result = map_to_output_schema(
            &batch,
            "Person",
            "n",
            &["name".to_string()],
            &output_schema,
            &l0_ctx,
        )
        .unwrap();

        assert_eq!(result.num_rows(), 2);
        assert_eq!(result.schema().fields().len(), 3);
        assert_eq!(result.schema().field(0).name(), "n._vid");
        assert_eq!(result.schema().field(1).name(), "n._labels");
        assert_eq!(result.schema().field(2).name(), "n.name");

        // Check that the name values are carried through.
        let name_col = result
            .column(2)
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .unwrap();
        assert_eq!(name_col.value(0), "Alice");
        assert_eq!(name_col.value(1), "Bob");
    }
}