Skip to main content

uni_query/query/df_graph/
common.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Common helpers shared across graph execution plan implementations.
5//!
6//! This module provides shared utilities to reduce code duplication across
7//! the df_graph module's execution plan implementations.
8
9use arrow_array::{ArrayRef, RecordBatch};
10use arrow_schema::{DataType, Field, Schema, SchemaRef};
11use datafusion::arrow::array::Array;
12use datafusion::common::Result as DFResult;
13use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
14use datafusion::physical_plan::PlanProperties;
15use datafusion::prelude::SessionContext;
16use futures::TryStreamExt;
17use parking_lot::RwLock;
18use std::collections::HashMap;
19use std::sync::Arc;
20use uni_common::Value;
21use uni_common::core::schema::Schema as UniSchema;
22use uni_cypher::ast::{BinaryOp, CypherLiteral, Expr};
23use uni_store::storage::manager::StorageManager;
24
25use super::GraphExecutionContext;
26use super::procedure_call::map_yield_to_canonical;
27use super::unwind::arrow_to_json_value;
28use crate::query::df_planner::HybridPhysicalPlanner;
29use crate::query::planner::LogicalPlan;
30
31/// Compute standard plan properties for graph operators.
32///
33/// All graph operators use the same plan properties:
34/// - Unknown partitioning with 1 partition
35/// - Incremental emission type
36/// - Bounded execution
37pub fn compute_plan_properties(schema: SchemaRef) -> PlanProperties {
38    PlanProperties::new(
39        EquivalenceProperties::new(schema),
40        Partitioning::UnknownPartitioning(1),
41        datafusion::physical_plan::execution_plan::EmissionType::Incremental,
42        datafusion::physical_plan::execution_plan::Boundedness::Bounded,
43    )
44}
45
46/// Return the Arrow `DataType` for `_labels` columns: `List<Utf8>`.
47///
48/// This is used across scan, traverse, bind, and other modules whenever a
49/// `_labels` field needs to be declared in a schema. Centralizing the
50/// definition avoids divergence and reduces boilerplate.
51pub fn labels_data_type() -> DataType {
52    DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)))
53}
54
55/// Extract a `UInt64Array` of vertex/edge IDs from an Arrow column.
56///
57/// Accepts both `UInt64` (native VID type) and `Int64` (from parameter
58/// injection where `arrow_to_json_value` round-trips through `Value::Int`).
59/// For `Int64` columns the values are cast to `UInt64`.
60///
61/// # Errors
62///
63/// Returns a `DataFusionError::Execution` if the column is neither `UInt64`
64/// nor `Int64`.
65pub fn column_as_vid_array(
66    col: &dyn arrow_array::Array,
67) -> datafusion::error::Result<std::borrow::Cow<'_, arrow_array::UInt64Array>> {
68    use arrow_array::{Int64Array, StructArray, UInt64Array};
69    use arrow_schema::DataType;
70
71    if let Some(arr) = col.as_any().downcast_ref::<UInt64Array>() {
72        return Ok(std::borrow::Cow::Borrowed(arr));
73    }
74
75    if let Some(arr) = col.as_any().downcast_ref::<Int64Array>() {
76        let cast: UInt64Array = arr.iter().map(|v| v.map(|i| i as u64)).collect();
77        return Ok(std::borrow::Cow::Owned(cast));
78    }
79
80    // Support entity-struct aliases (e.g., WITH coalesce(b, c) AS x) where
81    // traversal inputs may provide the source as a Struct with an "_vid" field.
82    if let Some(arr) = col.as_any().downcast_ref::<StructArray>()
83        && let DataType::Struct(fields) = arr.data_type()
84        && let Some((vid_idx, _)) = fields.find("_vid")
85    {
86        return column_as_vid_array(arr.column(vid_idx).as_ref());
87    }
88
89    // Support CypherValue-encoded Node values in LargeBinary columns
90    // (e.g., from list comprehension loop variables over node collections)
91    // Also handles JSON round-tripped nodes (Value::Map with _id field)
92    if let Some(arr) = col.as_any().downcast_ref::<arrow_array::LargeBinaryArray>() {
93        let vids = vids_from_large_binary(arr);
94        return Ok(std::borrow::Cow::Owned(vids));
95    }
96
97    // OPTIONAL MATCH can produce all-null columns with Arrow Null type
98    if *col.data_type() == DataType::Null {
99        let vids: UInt64Array = (0..col.len()).map(|_| None::<u64>).collect();
100        return Ok(std::borrow::Cow::Owned(vids));
101    }
102
103    Err(datafusion::error::DataFusionError::Execution(format!(
104        "VID column has type {:?}, expected UInt64 or Int64",
105        col.data_type()
106    )))
107}
108
109/// Extract a VID from a CypherValue.
110///
111/// Handles both `Value::Node` (native node) and `Value::Map` with `_id` field
112/// (JSON round-tripped node from `cv_array_to_large_list`).
113fn extract_vid_from_value(val: &Value) -> Option<u64> {
114    match val {
115        Value::Node(node) => Some(node.vid.as_u64()),
116        Value::Map(map) => {
117            // Handle round-tripped nodes that became Maps.
118            // Path nodes use struct fields (_vid, _label, properties) which
119            // round-trip through arrow_to_json_value as { "_vid": Int(N), ... }.
120            // Value::Node → serde_json uses { "_id": "N", ... }.
121            // Check both keys to handle either path.
122
123            // Check _vid first (from path struct → arrow_to_json_value round-trip)
124            if let Some(Value::Int(vid)) = map.get("_vid") {
125                return Some(*vid as u64);
126            }
127            // Also check _id (from Value::Node → serde_json round-trip)
128            if let Some(Value::String(id_str)) = map.get("_id") {
129                return id_str
130                    .strip_prefix("Vid(")
131                    .and_then(|s| s.strip_suffix(')'))
132                    .unwrap_or(id_str)
133                    .parse::<u64>()
134                    .ok();
135            }
136            if let Some(Value::Int(id)) = map.get("_id") {
137                return Some(*id as u64);
138            }
139            None
140        }
141        _ => None,
142    }
143}
144
145/// Extract VIDs from a `LargeBinaryArray` of CypherValue-encoded values.
146///
147/// Decodes each element and delegates to [`extract_vid_from_value`].
148/// Null elements and decode failures produce null VID entries.
149fn vids_from_large_binary(arr: &arrow_array::LargeBinaryArray) -> arrow_array::UInt64Array {
150    use uni_common::cypher_value_codec;
151
152    (0..arr.len())
153        .map(|i| {
154            if arr.is_null(i) {
155                return None;
156            }
157            cypher_value_codec::decode(arr.value(i))
158                .ok()
159                .as_ref()
160                .and_then(extract_vid_from_value)
161        })
162        .collect()
163}
164
165/// Extract VIDs from a column of CypherValue-encoded Node values.
166///
167/// Takes a `LargeBinary` array where each element is a CypherValue-encoded
168/// value and extracts VIDs from Node values. Non-Node values produce nulls.
169/// Also handles JSON round-tripped node Maps from `cv_array_to_large_list`.
170pub fn extract_vids_from_cypher_value_column(col: &dyn Array) -> DFResult<arrow_array::ArrayRef> {
171    let binary_col = col
172        .as_any()
173        .downcast_ref::<arrow_array::LargeBinaryArray>()
174        .ok_or_else(|| {
175            datafusion::error::DataFusionError::Execution(
176                "extract_vids_from_cypher_value_column: expected LargeBinary column".to_string(),
177            )
178        })?;
179    Ok(Arc::new(vids_from_large_binary(binary_col)) as arrow_array::ArrayRef)
180}
181
182/// Extract a typed value from a column at a given row index.
183///
184/// Looks up `col_name` in the batch schema, downcasts to `T`, and applies
185/// `extract_fn` if the value is valid. Returns `None` if the column is missing,
186/// the downcast fails, or the value is null.
187pub(crate) fn extract_column_value<T: arrow_array::Array + 'static, R>(
188    batch: &RecordBatch,
189    col_name: &str,
190    row_idx: usize,
191    extract_fn: impl FnOnce(&T, usize) -> R,
192) -> Option<R> {
193    let (idx, _) = batch.schema().column_with_name(col_name)?;
194    let col = batch.column(idx);
195    let arr = col.as_any().downcast_ref::<T>()?;
196    if arr.is_valid(row_idx) {
197        Some(extract_fn(arr, row_idx))
198    } else {
199        None
200    }
201}
202
203/// Build the standard node struct fields for path structures.
204///
205/// Used when materializing path objects containing nodes.
206/// Fields: `_vid`, `_labels`, `properties`
207pub fn node_struct_fields() -> arrow_schema::Fields {
208    arrow_schema::Fields::from(vec![
209        Field::new("_vid", DataType::UInt64, false),
210        Field::new("_labels", labels_data_type(), true),
211        Field::new("properties", DataType::LargeBinary, true),
212    ])
213}
214
215/// Build the standard edge struct fields for path structures.
216///
217/// Used when materializing path objects containing edges.
218/// Fields: `_eid`, `_type_name`, `_src`, `_dst`, `properties`
219pub fn edge_struct_fields() -> arrow_schema::Fields {
220    arrow_schema::Fields::from(vec![
221        Field::new("_eid", DataType::UInt64, false),
222        Field::new("_type_name", DataType::Utf8, false),
223        Field::new("_src", DataType::UInt64, false),
224        Field::new("_dst", DataType::UInt64, false),
225        Field::new("properties", DataType::LargeBinary, true),
226    ])
227}
228
229/// Encode a properties HashMap to CypherValue bytes for LargeBinary columns.
230///
231/// Used when materializing path properties that need to be stored in LargeBinary
232/// columns. Converts the HashMap into a `Value::Map` and encodes it using the
233/// CypherValue codec.
234pub fn encode_props_to_cv(props: &std::collections::HashMap<String, uni_common::Value>) -> Vec<u8> {
235    let val = uni_common::Value::Map(props.clone());
236    uni_common::cypher_value_codec::encode(&val)
237}
238
239/// Build edge list field for schema with given step variable name.
240///
241/// Creates a list of edge structs for the relationship variable in VLP patterns.
242/// For example, `r` in `MATCH (a)-[r*1..3]->(b)` gets a `List<EdgeStruct>`.
243pub fn build_edge_list_field(step_var: &str) -> Field {
244    let edge_item = Field::new("item", DataType::Struct(edge_struct_fields()), true);
245    // Field must be nullable to support OPTIONAL MATCH unmatched (r = NULL)
246    Field::new(step_var, DataType::List(Arc::new(edge_item)), true)
247}
248
249/// Build path struct field for schema with given path variable name.
250///
251/// Creates a struct field with `nodes` and `relationships` lists.
252pub fn build_path_struct_field(path_var: &str) -> Field {
253    let node_item = Field::new("item", DataType::Struct(node_struct_fields()), true);
254    let nodes_field = Field::new("nodes", DataType::List(Arc::new(node_item)), true);
255
256    let edge_item = Field::new("item", DataType::Struct(edge_struct_fields()), true);
257    let relationships_field =
258        Field::new("relationships", DataType::List(Arc::new(edge_item)), true);
259
260    Field::new(
261        path_var,
262        DataType::Struct(arrow_schema::Fields::from(vec![
263            nodes_field,
264            relationships_field,
265        ])),
266        true,
267    )
268}
269
270/// Extend an input schema with a path struct field.
271///
272/// Clones the fields from `input_schema` and appends a path struct field
273/// using [`build_path_struct_field`].
274pub fn extend_schema_with_path(input_schema: SchemaRef, path_variable: &str) -> SchemaRef {
275    let mut fields: Vec<Arc<Field>> = input_schema.fields().to_vec();
276    fields.push(Arc::new(build_path_struct_field(path_variable)));
277    Arc::new(Schema::new(fields))
278}
279
280/// Build a path struct array from nodes and relationships list arrays.
281///
282/// Combines the nodes and relationships arrays into a single `StructArray` with
283/// the standard path structure (`nodes`, `relationships`), applying the given
284/// validity mask.
285pub fn build_path_struct_array(
286    nodes_array: ArrayRef,
287    rels_array: ArrayRef,
288    path_validity: Vec<bool>,
289) -> DFResult<arrow_array::StructArray> {
290    Ok(arrow_array::StructArray::try_new(
291        arrow_schema::Fields::from(vec![
292            Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true)),
293            Arc::new(Field::new(
294                "relationships",
295                rels_array.data_type().clone(),
296                true,
297            )),
298        ]),
299        vec![nodes_array, rels_array],
300        Some(arrow::buffer::NullBuffer::from(path_validity)),
301    )?)
302}
303
304/// Create a `ListBuilder<StructBuilder>` for building edge list arrays.
305///
306/// Used when materializing edge lists for step variables (`r` in `[r*1..3]`)
307/// and path relationship arrays. Returns a builder whose struct fields match
308/// `edge_struct_fields()`.
309pub fn new_edge_list_builder()
310-> arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder> {
311    use arrow_array::builder::{LargeBinaryBuilder, StringBuilder, StructBuilder, UInt64Builder};
312    arrow_array::builder::ListBuilder::new(StructBuilder::new(
313        edge_struct_fields(),
314        vec![
315            Box::new(UInt64Builder::new()),
316            Box::new(StringBuilder::new()),
317            Box::new(UInt64Builder::new()),
318            Box::new(UInt64Builder::new()),
319            Box::new(LargeBinaryBuilder::new()),
320        ],
321    ))
322}
323
324/// Create a `ListBuilder<StructBuilder>` for building node list arrays.
325///
326/// Used when materializing path node arrays. Returns a builder whose struct
327/// fields match `node_struct_fields()`.
328pub fn new_node_list_builder()
329-> arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder> {
330    use arrow_array::builder::{
331        LargeBinaryBuilder, ListBuilder, StringBuilder, StructBuilder, UInt64Builder,
332    };
333    arrow_array::builder::ListBuilder::new(StructBuilder::new(
334        node_struct_fields(),
335        vec![
336            Box::new(UInt64Builder::new()),
337            Box::new(ListBuilder::new(StringBuilder::new())),
338            Box::new(LargeBinaryBuilder::new()),
339        ],
340    ))
341}
342
343/// Append a single edge to an edge struct builder.
344///
345/// Writes `_eid`, `_type_name`, `_src`, `_dst`, and `properties` fields,
346/// then appends the struct row. The `query_ctx` is used to look up edge
347/// properties from the L0 visibility chain.
348pub fn append_edge_to_struct(
349    struct_builder: &mut arrow_array::builder::StructBuilder,
350    eid: uni_common::core::id::Eid,
351    type_name: &str,
352    src_vid: u64,
353    dst_vid: u64,
354    query_ctx: &uni_store::runtime::context::QueryContext,
355) {
356    use arrow_array::builder::{LargeBinaryBuilder, StringBuilder, UInt64Builder};
357    use uni_store::runtime::l0_visibility;
358
359    struct_builder
360        .field_builder::<UInt64Builder>(0)
361        .unwrap()
362        .append_value(eid.as_u64());
363    struct_builder
364        .field_builder::<StringBuilder>(1)
365        .unwrap()
366        .append_value(type_name);
367    struct_builder
368        .field_builder::<UInt64Builder>(2)
369        .unwrap()
370        .append_value(src_vid);
371    struct_builder
372        .field_builder::<UInt64Builder>(3)
373        .unwrap()
374        .append_value(dst_vid);
375    let props_builder = struct_builder
376        .field_builder::<LargeBinaryBuilder>(4)
377        .unwrap();
378    if let Some(props) = l0_visibility::get_edge_properties(eid, query_ctx) {
379        let cv_bytes = encode_props_to_cv(&props);
380        props_builder.append_value(&cv_bytes);
381    } else {
382        props_builder.append_null();
383    }
384    struct_builder.append(true);
385}
386
387/// Append a null edge struct row (placeholder values + null validity).
388///
389/// Arrow struct builders require all field builders to advance even for null rows.
390/// This appends default placeholder values and marks the struct row as null.
391fn append_null_edge_struct(struct_builder: &mut arrow_array::builder::StructBuilder) {
392    use arrow_array::builder::{LargeBinaryBuilder, StringBuilder, UInt64Builder};
393
394    struct_builder
395        .field_builder::<UInt64Builder>(0)
396        .unwrap()
397        .append_value(0);
398    struct_builder
399        .field_builder::<StringBuilder>(1)
400        .unwrap()
401        .append_value("");
402    struct_builder
403        .field_builder::<UInt64Builder>(2)
404        .unwrap()
405        .append_value(0);
406    struct_builder
407        .field_builder::<UInt64Builder>(3)
408        .unwrap()
409        .append_value(0);
410    struct_builder
411        .field_builder::<LargeBinaryBuilder>(4)
412        .unwrap()
413        .append_null();
414    struct_builder.append(false);
415}
416
417/// Append an edge to a struct builder, handling the `Option<Eid>` case.
418///
419/// When `eid` is `Some`, resolves the type name from `batch_type_name` (primary)
420/// or L0 visibility (fallback), then delegates to [`append_edge_to_struct`].
421/// When `eid` is `None`, appends a null struct row.
422pub fn append_edge_to_struct_optional(
423    struct_builder: &mut arrow_array::builder::StructBuilder,
424    eid: Option<uni_common::core::id::Eid>,
425    src_vid: u64,
426    dst_vid: u64,
427    batch_type_name: Option<String>,
428    query_ctx: &uni_store::runtime::context::QueryContext,
429) {
430    match eid {
431        Some(e) => {
432            use uni_store::runtime::l0_visibility;
433            let type_name = batch_type_name
434                .or_else(|| l0_visibility::get_edge_type(e, query_ctx))
435                .unwrap_or_default();
436            append_edge_to_struct(struct_builder, e, &type_name, src_vid, dst_vid, query_ctx);
437        }
438        None => append_null_edge_struct(struct_builder),
439    }
440}
441
442/// Append a single node to a node struct builder.
443///
444/// Writes `_vid`, `_labels`, and `properties` fields, then appends the struct
445/// row. The `query_ctx` is used to look up labels and properties from the L0
446/// visibility chain.
447pub fn append_node_to_struct(
448    struct_builder: &mut arrow_array::builder::StructBuilder,
449    vid: uni_common::core::id::Vid,
450    query_ctx: &uni_store::runtime::context::QueryContext,
451) {
452    use arrow_array::builder::{LargeBinaryBuilder, ListBuilder, StringBuilder, UInt64Builder};
453    use uni_store::runtime::l0_visibility;
454
455    struct_builder
456        .field_builder::<UInt64Builder>(0)
457        .unwrap()
458        .append_value(vid.as_u64());
459    let labels = l0_visibility::get_vertex_labels(vid, query_ctx);
460    let labels_builder = struct_builder
461        .field_builder::<ListBuilder<StringBuilder>>(1)
462        .unwrap();
463    let values = labels_builder.values();
464    for lbl in &labels {
465        values.append_value(lbl);
466    }
467    labels_builder.append(true);
468    let props_builder = struct_builder
469        .field_builder::<LargeBinaryBuilder>(2)
470        .unwrap();
471    if let Some(props) = l0_visibility::get_vertex_properties(vid, query_ctx) {
472        let cv_bytes = encode_props_to_cv(&props);
473        props_builder.append_value(&cv_bytes);
474    } else {
475        props_builder.append_null();
476    }
477    struct_builder.append(true);
478}
479
480/// Append a null node struct row (placeholder values + null validity).
481///
482/// Arrow struct builders require all field builders to advance even for null rows.
483/// This appends default placeholder values and marks the struct row as null.
484fn append_null_node_struct(struct_builder: &mut arrow_array::builder::StructBuilder) {
485    use arrow_array::builder::{LargeBinaryBuilder, ListBuilder, StringBuilder, UInt64Builder};
486
487    struct_builder
488        .field_builder::<UInt64Builder>(0)
489        .unwrap()
490        .append_value(0);
491    struct_builder
492        .field_builder::<ListBuilder<StringBuilder>>(1)
493        .unwrap()
494        .append(true);
495    struct_builder
496        .field_builder::<LargeBinaryBuilder>(2)
497        .unwrap()
498        .append_null();
499    struct_builder.append(false);
500}
501
502/// Append a node to a struct builder, handling the `Option<Vid>` case.
503///
504/// When `vid` is `Some`, delegates to [`append_node_to_struct`].
505/// When `vid` is `None`, appends a null struct row.
506pub fn append_node_to_struct_optional(
507    struct_builder: &mut arrow_array::builder::StructBuilder,
508    vid: Option<uni_common::core::id::Vid>,
509    query_ctx: &uni_store::runtime::context::QueryContext,
510) {
511    match vid {
512        Some(v) => append_node_to_struct(struct_builder, v, query_ctx),
513        None => append_null_node_struct(struct_builder),
514    }
515}
516
517/// Re-encode a `LargeListArray` of CypherValue elements into a `LargeBinaryArray` of CypherValue arrays.
518///
519/// Each row in the input `LargeListArray` contains zero or more `LargeBinary`
520/// elements that are individually CypherValue-encoded values. This function decodes
521/// each element, wraps them into a `serde_json::Value::Array`, and re-encodes
522/// the whole array as a single CypherValue blob in the output `LargeBinaryArray`.
523///
524/// Null rows in the input produce null entries in the output.
525///
526/// # Errors
527///
528/// Returns a `DataFusionError::Execution` if the input is not a
529/// `LargeListArray` or if CypherValue decoding fails.
530pub fn large_list_of_cv_to_cv_array(
531    list: &datafusion::arrow::array::LargeListArray,
532) -> datafusion::error::Result<Arc<dyn datafusion::arrow::array::Array>> {
533    use datafusion::arrow::array::{LargeBinaryArray, LargeBinaryBuilder};
534
535    let values = list.values();
536    let binary_values = values
537        .as_any()
538        .downcast_ref::<LargeBinaryArray>()
539        .ok_or_else(|| {
540            datafusion::error::DataFusionError::Execution(
541                "large_list_of_cv_to_cv_array: inner values must be LargeBinaryArray".to_string(),
542            )
543        })?;
544
545    let mut builder = LargeBinaryBuilder::new();
546
547    for row_idx in 0..list.len() {
548        if list.is_null(row_idx) {
549            builder.append_null();
550            continue;
551        }
552
553        let start = list.offsets()[row_idx] as usize;
554        let end = list.offsets()[row_idx + 1] as usize;
555
556        let mut json_elements = Vec::with_capacity(end - start);
557        for elem_idx in start..end {
558            if binary_values.is_null(elem_idx) {
559                json_elements.push(serde_json::Value::Null);
560            } else {
561                let blob = binary_values.value(elem_idx);
562                match uni_common::cypher_value_codec::decode(blob) {
563                    Ok(uni_val) => {
564                        let json_val: serde_json::Value = uni_val.into();
565                        json_elements.push(json_val);
566                    }
567                    Err(_) => json_elements.push(serde_json::Value::Null),
568                }
569            }
570        }
571
572        let uni_val: uni_common::Value = serde_json::Value::Array(json_elements).into();
573        let bytes = uni_common::cypher_value_codec::encode(&uni_val);
574        builder.append_value(&bytes);
575    }
576
577    Ok(Arc::new(builder.finish()))
578}
579
580/// Convert a single Arrow array element at `idx` to `serde_json::Value`.
581///
582/// Handles the common scalar types (UInt64, Int64, Float64, Utf8, Boolean, LargeBinary).
583/// Returns `serde_json::Value::Null` for null values or unsupported types.
584fn arrow_element_to_json(
585    col: &dyn datafusion::arrow::array::Array,
586    idx: usize,
587) -> serde_json::Value {
588    use datafusion::arrow::array::{
589        BooleanArray, Float64Array, Int64Array, StringArray, UInt64Array,
590    };
591
592    if col.is_null(idx) {
593        return serde_json::Value::Null;
594    }
595
596    if let Some(arr) = col.as_any().downcast_ref::<UInt64Array>() {
597        serde_json::Value::Number(serde_json::Number::from(arr.value(idx)))
598    } else if let Some(arr) = col.as_any().downcast_ref::<Int64Array>() {
599        serde_json::Value::Number(serde_json::Number::from(arr.value(idx)))
600    } else if let Some(arr) = col.as_any().downcast_ref::<Float64Array>() {
601        serde_json::Number::from_f64(arr.value(idx))
602            .map(serde_json::Value::Number)
603            .unwrap_or(serde_json::Value::Null)
604    } else if let Some(arr) = col.as_any().downcast_ref::<StringArray>() {
605        serde_json::Value::String(arr.value(idx).to_string())
606    } else if let Some(arr) = col.as_any().downcast_ref::<BooleanArray>() {
607        serde_json::Value::Bool(arr.value(idx))
608    } else if let Some(arr) = col.as_any().downcast_ref::<arrow_array::LargeBinaryArray>() {
609        uni_common::cypher_value_codec::decode(arr.value(idx))
610            .map(|v| v.into())
611            .unwrap_or(serde_json::Value::Null)
612    } else {
613        serde_json::Value::Null
614    }
615}
616
617/// Convert a typed `LargeListArray` to a `LargeBinaryArray` of CypherValue arrays.
618///
619/// Each row in the input `LargeListArray` contains zero or more elements of a
620/// specific type (Int64, Float64, Utf8, Boolean, or nested LargeBinary). This
621/// function converts each row into a JSON array and encodes it as a CypherValue blob.
622///
623/// If the inner type is already `LargeBinary` (CypherValue), delegates to
624/// `large_list_of_cv_to_cv_array()`.
625///
626/// Null rows in the input produce null entries in the output.
627///
628/// # Errors
629///
630/// Returns a `DataFusionError::Execution` if CypherValue encoding fails.
631pub fn typed_large_list_to_cv_array(
632    list: &datafusion::arrow::array::LargeListArray,
633) -> datafusion::error::Result<Arc<dyn datafusion::arrow::array::Array>> {
634    use datafusion::arrow::array::{LargeBinaryBuilder, StructArray};
635
636    let values = list.values();
637
638    // If inner type is LargeBinary, delegate to existing function
639    if values.data_type() == &DataType::LargeBinary {
640        return large_list_of_cv_to_cv_array(list);
641    }
642
643    // Build the element-to-JSON converter closure. For Struct arrays, we need
644    // to iterate over fields; for scalar arrays, use arrow_element_to_json directly.
645    let elem_to_json: Box<dyn Fn(usize) -> serde_json::Value> = match values.data_type() {
646        DataType::UInt64
647        | DataType::Int64
648        | DataType::Float64
649        | DataType::Utf8
650        | DataType::Boolean => {
651            let values = values.clone();
652            Box::new(move |idx| arrow_element_to_json(values.as_ref(), idx))
653        }
654        DataType::Struct(_) => {
655            let typed = values
656                .as_any()
657                .downcast_ref::<StructArray>()
658                .ok_or_else(|| {
659                    datafusion::error::DataFusionError::Execution(
660                        "Expected StructArray".to_string(),
661                    )
662                })?;
663            let fields: Vec<_> = typed.fields().iter().cloned().collect();
664            let columns: Vec<_> = (0..typed.num_columns())
665                .map(|i| typed.column(i).clone())
666                .collect();
667            let nulls = typed.nulls().cloned();
668            Box::new(move |idx| {
669                if nulls.as_ref().is_some_and(|n| n.is_null(idx)) {
670                    return serde_json::Value::Null;
671                }
672                let mut map = serde_json::Map::new();
673                for (field_idx, field) in fields.iter().enumerate() {
674                    let value = arrow_element_to_json(columns[field_idx].as_ref(), idx);
675                    map.insert(field.name().clone(), value);
676                }
677                serde_json::Value::Object(map)
678            })
679        }
680        other => {
681            return Err(datafusion::error::DataFusionError::Execution(format!(
682                "Unsupported element type for typed_large_list_to_cv_array: {:?}",
683                other
684            )));
685        }
686    };
687
688    let mut builder = LargeBinaryBuilder::new();
689
690    for row_idx in 0..list.len() {
691        if list.is_null(row_idx) {
692            builder.append_null();
693            continue;
694        }
695
696        let start = list.offsets()[row_idx] as usize;
697        let end = list.offsets()[row_idx + 1] as usize;
698        let json_elements: Vec<serde_json::Value> = (start..end).map(&elem_to_json).collect();
699
700        let uni_val: uni_common::Value = serde_json::Value::Array(json_elements).into();
701        let bytes = uni_common::cypher_value_codec::encode(&uni_val);
702        builder.append_value(&bytes);
703    }
704
705    Ok(Arc::new(builder.finish()))
706}
707
708/// Convert a `LargeBinaryArray` of CypherValue-encoded arrays into a `LargeListArray`.
709///
710/// Each element in the input array is a CypherValue blob encoding a JSON array (e.g. `[1,2,3]`).
711/// Elements are converted to the specified `element_type`. For example, if `element_type`
712/// is `Int64`, CypherValue numbers are parsed as i64 values.
713///
714/// Non-array CypherValue values and nulls produce empty lists.
715pub fn cv_array_to_large_list(
716    array: &dyn datafusion::arrow::array::Array,
717    element_type: &DataType,
718) -> datafusion::error::Result<Arc<dyn datafusion::arrow::array::Array>> {
719    use datafusion::arrow::array::LargeBinaryArray;
720    use datafusion::arrow::buffer::{OffsetBuffer, ScalarBuffer};
721
722    let binary_arr = array
723        .as_any()
724        .downcast_ref::<LargeBinaryArray>()
725        .ok_or_else(|| {
726            datafusion::error::DataFusionError::Execution(
727                "cv_array_to_large_list: expected LargeBinaryArray".to_string(),
728            )
729        })?;
730
731    // Collect all JSON elements across all rows
732    let num_rows = binary_arr.len();
733    let mut all_elements: Vec<Vec<serde_json::Value>> = Vec::with_capacity(num_rows);
734    let mut nulls = Vec::with_capacity(num_rows);
735
736    for i in 0..num_rows {
737        if binary_arr.is_null(i) {
738            all_elements.push(Vec::new());
739            nulls.push(false);
740            continue;
741        }
742
743        let blob = binary_arr.value(i);
744        let uni_val = match uni_common::cypher_value_codec::decode(blob) {
745            Ok(v) => v,
746            Err(_) => {
747                all_elements.push(Vec::new());
748                nulls.push(false);
749                continue;
750            }
751        };
752        let json_val_decoded: serde_json::Value = uni_val.into();
753
754        match json_val_decoded {
755            serde_json::Value::Array(elements) => {
756                all_elements.push(elements);
757                nulls.push(true);
758            }
759            _ => {
760                all_elements.push(Vec::new());
761                nulls.push(true);
762            }
763        }
764    }
765
766    // Build typed values array and offsets
767    let mut offsets: Vec<i64> = Vec::with_capacity(num_rows + 1);
768    offsets.push(0);
769
770    let values_array: Arc<dyn datafusion::arrow::array::Array> = match element_type {
771        DataType::Int64 => {
772            let mut builder = datafusion::arrow::array::builder::Int64Builder::new();
773            for elems in &all_elements {
774                for elem in elems {
775                    if let serde_json::Value::Number(n) = elem {
776                        if let Some(i) = n.as_i64() {
777                            builder.append_value(i);
778                        } else if let Some(f) = n.as_f64() {
779                            builder.append_value(f as i64);
780                        } else {
781                            builder.append_null();
782                        }
783                    } else {
784                        builder.append_null();
785                    }
786                }
787                offsets.push(offsets.last().unwrap() + elems.len() as i64);
788            }
789            Arc::new(builder.finish())
790        }
791        DataType::Float64 => {
792            let mut builder = datafusion::arrow::array::builder::Float64Builder::new();
793            for elems in &all_elements {
794                for elem in elems {
795                    if let serde_json::Value::Number(n) = elem
796                        && let Some(f) = n.as_f64()
797                    {
798                        builder.append_value(f);
799                    } else {
800                        builder.append_null();
801                    }
802                }
803                offsets.push(offsets.last().unwrap() + elems.len() as i64);
804            }
805            Arc::new(builder.finish())
806        }
807        DataType::Utf8 | DataType::LargeUtf8 => {
808            let mut builder = datafusion::arrow::array::builder::StringBuilder::new();
809            for elems in &all_elements {
810                for elem in elems {
811                    match elem {
812                        serde_json::Value::String(s) => builder.append_value(s),
813                        serde_json::Value::Null => builder.append_null(),
814                        other => builder.append_value(other.to_string()),
815                    }
816                }
817                offsets.push(offsets.last().unwrap() + elems.len() as i64);
818            }
819            Arc::new(builder.finish())
820        }
821        DataType::Boolean => {
822            let mut builder = datafusion::arrow::array::builder::BooleanBuilder::new();
823            for elems in &all_elements {
824                for elem in elems {
825                    if let serde_json::Value::Bool(b) = elem {
826                        builder.append_value(*b);
827                    } else {
828                        builder.append_null();
829                    }
830                }
831                offsets.push(offsets.last().unwrap() + elems.len() as i64);
832            }
833            Arc::new(builder.finish())
834        }
835        // Fallback: keep as CypherValue LargeBinary blobs
836        _ => {
837            let mut builder = datafusion::arrow::array::builder::LargeBinaryBuilder::new();
838            for elems in &all_elements {
839                for elem in elems {
840                    let uni_val: uni_common::Value = elem.clone().into();
841                    let bytes = uni_common::cypher_value_codec::encode(&uni_val);
842                    builder.append_value(&bytes);
843                }
844                offsets.push(offsets.last().unwrap() + elems.len() as i64);
845            }
846            Arc::new(builder.finish())
847        }
848    };
849
850    let field = Arc::new(Field::new("item", element_type.clone(), true));
851    let offset_buffer = OffsetBuffer::new(ScalarBuffer::from(offsets));
852    let null_buffer = datafusion::arrow::buffer::NullBuffer::from(nulls);
853
854    let large_list = datafusion::arrow::array::LargeListArray::new(
855        field,
856        offset_buffer,
857        values_array,
858        Some(null_buffer),
859    );
860
861    Ok(Arc::new(large_list))
862}
863
864/// Collect all record batches from all partitions of an execution plan.
865///
866/// Iterates over each partition, executes it, and collects all resulting
867/// batches into a single `Vec`. Shared by `execute_subplan` and `run_apply`.
868pub async fn collect_all_partitions(
869    plan: &Arc<dyn datafusion::physical_plan::ExecutionPlan>,
870    task_ctx: Arc<datafusion::execution::TaskContext>,
871) -> DFResult<Vec<RecordBatch>> {
872    let partition_count = plan.properties().output_partitioning().partition_count();
873
874    let mut all_batches = Vec::new();
875    for partition in 0..partition_count {
876        let stream = plan.execute(partition, task_ctx.clone())?;
877        let batches: Vec<RecordBatch> = stream.try_collect().await?;
878        all_batches.extend(batches);
879    }
880    Ok(all_batches)
881}
882
883/// Execute a logical plan using a fresh HybridPhysicalPlanner with the given params.
884///
885/// Shared by `RecursiveCTEExec`, `GraphApplyExec`, and `ExistsExecExpr`.
886pub async fn execute_subplan(
887    plan: &LogicalPlan,
888    params: &HashMap<String, Value>,
889    outer_values: &HashMap<String, Value>,
890    graph_ctx: &Arc<GraphExecutionContext>,
891    session_ctx: &Arc<RwLock<SessionContext>>,
892    storage: &Arc<StorageManager>,
893    schema_info: &Arc<UniSchema>,
894) -> DFResult<Vec<RecordBatch>> {
895    let planner_construction_start = std::time::Instant::now();
896    let l0_context = graph_ctx.l0_context().clone();
897    let prop_manager = graph_ctx.property_manager().clone();
898
899    let planner = HybridPhysicalPlanner::with_l0_context(
900        session_ctx.clone(),
901        storage.clone(),
902        l0_context,
903        prop_manager,
904        schema_info.clone(),
905        params.clone(),
906        outer_values.clone(),
907    );
908    let planner_construction_elapsed = planner_construction_start.elapsed();
909    tracing::debug!(
910        "execute_subplan: planner construction took {:?}",
911        planner_construction_elapsed
912    );
913
914    let planning_start = std::time::Instant::now();
915    let execution_plan = planner.plan(plan).map_err(|e| {
916        datafusion::error::DataFusionError::Execution(format!("Sub-plan error: {}", e))
917    })?;
918    let planning_elapsed = planning_start.elapsed();
919    tracing::debug!("execute_subplan: planning took {:?}", planning_elapsed);
920
921    let execution_start = std::time::Instant::now();
922    let task_ctx = session_ctx.read().task_ctx();
923    let all_batches = collect_all_partitions(&execution_plan, task_ctx).await?;
924    let execution_elapsed = execution_start.elapsed();
925    tracing::debug!("execute_subplan: execution took {:?}", execution_elapsed);
926
927    Ok(all_batches)
928}
929
930/// Extract a single row from a RecordBatch as a HashMap of column name → Value.
931///
932/// Used to build parameters for correlated subqueries (Apply, EXISTS).
933pub fn extract_row_params(batch: &RecordBatch, row_idx: usize) -> HashMap<String, Value> {
934    let schema = batch.schema();
935    let mut row = HashMap::new();
936    for col_idx in 0..batch.num_columns() {
937        let col_name = schema.field(col_idx).name().clone();
938        let val = arrow_to_json_value(batch.column(col_idx).as_ref(), row_idx);
939        row.insert(col_name, val);
940    }
941    row
942}
943
944/// Infer the output schema of a ProcedureCall logical plan node.
945///
946/// This is a simplified version of `GraphProcedureCallExec::build_schema()` that
947/// doesn't require target_properties or graph_ctx. It covers common procedure types
948/// with basic scalar type inference. For unknown procedures or complex node expansions,
949/// it falls back to Utf8.
950fn infer_procedure_call_schema(
951    procedure_name: &str,
952    yield_items: &[(String, Option<String>)],
953    _schema_info: &UniSchema,
954) -> SchemaRef {
955    let infer_type = |name: &str| -> DataType {
956        match procedure_name {
957            "uni.schema.labels" => match name {
958                "propertyCount" | "nodeCount" | "indexCount" => DataType::Int64,
959                _ => DataType::Utf8,
960            },
961            "uni.schema.edgeTypes" | "uni.schema.relationshipTypes" => match name {
962                "propertyCount" => DataType::Int64,
963                _ => DataType::Utf8,
964            },
965            "uni.schema.constraints" => match name {
966                "enabled" => DataType::Boolean,
967                _ => DataType::Utf8,
968            },
969            "uni.schema.labelInfo" => match name {
970                "nullable" | "indexed" | "unique" => DataType::Boolean,
971                _ => DataType::Utf8,
972            },
973            "uni.vector.query" | "uni.fts.query" | "uni.search" => {
974                // Search procedures: infer types via canonical yield mapping.
975                // Node expansion happens at execution time in GraphProcedureCallExec.
976                match map_yield_to_canonical(name).as_str() {
977                    "distance" => DataType::Float64,
978                    "score" | "vector_score" | "fts_score" | "raw_score" => DataType::Float32,
979                    "vid" => DataType::Int64,
980                    _ => DataType::Utf8,
981                }
982            }
983            // uni.schema.indexes, unknown procedures, and fallback: all Utf8
984            _ => DataType::Utf8,
985        }
986    };
987
988    let fields: Vec<Field> = yield_items
989        .iter()
990        .map(|(name, alias)| {
991            let col_name = alias.as_ref().unwrap_or(name);
992            Field::new(col_name, infer_type(name), true)
993        })
994        .collect();
995
996    Arc::new(Schema::new(fields))
997}
998
999/// Infer the output schema of a logical plan using UniSchema property metadata.
1000///
1001/// This is needed because correlated subqueries reference outer variables that
1002/// don't exist as physical columns at planning time, so we can't dry-run plan
1003/// the subquery to get its schema. Instead we walk the logical plan and use
1004/// `UniSchema` property metadata to infer types.
1005pub fn infer_logical_plan_schema(plan: &LogicalPlan, schema_info: &UniSchema) -> SchemaRef {
1006    // Walk to outermost Project
1007    if let LogicalPlan::Project { projections, .. } = plan {
1008        let fields: Vec<Field> = projections
1009            .iter()
1010            .map(|(expr, alias)| {
1011                let name = alias.clone().unwrap_or_else(|| expr.to_string_repr());
1012                let dt = infer_expr_type(expr, schema_info);
1013                Field::new(name, dt, true)
1014            })
1015            .collect();
1016        return Arc::new(Schema::new(fields));
1017    }
1018
1019    // For non-Project plans, walk through wrapping nodes
1020    match plan {
1021        LogicalPlan::Sort { input, .. }
1022        | LogicalPlan::Limit { input, .. }
1023        | LogicalPlan::Filter { input, .. }
1024        | LogicalPlan::Distinct { input } => infer_logical_plan_schema(input, schema_info),
1025
1026        LogicalPlan::ProcedureCall {
1027            procedure_name,
1028            yield_items,
1029            ..
1030        } => infer_procedure_call_schema(procedure_name, yield_items, schema_info),
1031
1032        _ => {
1033            // Fallback: empty schema
1034            Arc::new(Schema::empty())
1035        }
1036    }
1037}
1038
1039/// Infer Arrow DataType for a Cypher expression using schema metadata.
1040fn infer_expr_type(expr: &Expr, schema_info: &UniSchema) -> DataType {
1041    match expr {
1042        Expr::Property(base, key) => {
1043            if let Expr::Variable(_) = base.as_ref() {
1044                // Look up key across all labels/edge types in schema
1045                for props in schema_info.properties.values() {
1046                    if let Some(meta) = props.get(key.as_str()) {
1047                        return meta.r#type.to_arrow();
1048                    }
1049                }
1050                DataType::LargeBinary
1051            } else {
1052                DataType::LargeBinary
1053            }
1054        }
1055        Expr::BinaryOp { left, op, right } => match op {
1056            BinaryOp::Add | BinaryOp::Sub | BinaryOp::Mul | BinaryOp::Div | BinaryOp::Mod => {
1057                let lt = infer_expr_type(left, schema_info);
1058                let rt = infer_expr_type(right, schema_info);
1059                numeric_promotion(&lt, &rt)
1060            }
1061            BinaryOp::Eq
1062            | BinaryOp::NotEq
1063            | BinaryOp::Lt
1064            | BinaryOp::LtEq
1065            | BinaryOp::Gt
1066            | BinaryOp::GtEq
1067            | BinaryOp::And
1068            | BinaryOp::Or => DataType::Boolean,
1069            _ => DataType::LargeBinary,
1070        },
1071        Expr::Literal(lit) => match lit {
1072            CypherLiteral::Integer(_) => DataType::Int64,
1073            CypherLiteral::Float(_) => DataType::Float64,
1074            CypherLiteral::String(_) => DataType::Utf8,
1075            CypherLiteral::Bool(_) => DataType::Boolean,
1076            CypherLiteral::Null => DataType::Null,
1077            CypherLiteral::Bytes(_) => DataType::LargeBinary,
1078        },
1079        Expr::Variable(_) => DataType::LargeBinary,
1080        Expr::FunctionCall { name, args, .. } => match name.to_lowercase().as_str() {
1081            "count" => DataType::Int64,
1082            "sum" | "avg" => {
1083                if let Some(arg) = args.first() {
1084                    let arg_type = infer_expr_type(arg, schema_info);
1085                    if matches!(arg_type, DataType::Float32 | DataType::Float64) {
1086                        DataType::Float64
1087                    } else {
1088                        DataType::Int64
1089                    }
1090                } else {
1091                    DataType::Int64
1092                }
1093            }
1094            "min" | "max" => {
1095                if let Some(arg) = args.first() {
1096                    infer_expr_type(arg, schema_info)
1097                } else {
1098                    DataType::LargeBinary
1099                }
1100            }
1101            "tostring" | "trim" | "ltrim" | "rtrim" | "tolower" | "toupper" | "left" | "right"
1102            | "substring" | "replace" | "reverse" | "type" => DataType::Utf8,
1103            "tointeger" | "toint" | "size" | "length" | "id" => DataType::Int64,
1104            "tofloat" => DataType::Float64,
1105            "toboolean" => DataType::Boolean,
1106            _ => DataType::LargeBinary,
1107        },
1108        _ => DataType::LargeBinary,
1109    }
1110}
1111
1112/// Numeric type promotion for binary arithmetic.
1113fn numeric_promotion(left: &DataType, right: &DataType) -> DataType {
1114    match (left, right) {
1115        (DataType::Float64, _) | (_, DataType::Float64) => DataType::Float64,
1116        (DataType::Float32, _) | (_, DataType::Float32) => DataType::Float64,
1117        (DataType::Int64, _) | (_, DataType::Int64) => DataType::Int64,
1118        (DataType::Int32, _) | (_, DataType::Int32) => DataType::Int64,
1119        _ => DataType::Int64,
1120    }
1121}
1122
1123/// Evaluate a simple expression to get a `uni_common::Value`.
1124///
1125/// Supports:
1126/// - Literal values
1127/// - Parameter references ($param)
1128/// - Literal lists
1129pub(crate) fn evaluate_simple_expr(
1130    expr: &Expr,
1131    params: &HashMap<String, Value>,
1132) -> DFResult<Value> {
1133    match expr {
1134        Expr::Literal(lit) => Ok(lit.to_value()),
1135
1136        Expr::Parameter(name) => params.get(name).cloned().ok_or_else(|| {
1137            datafusion::error::DataFusionError::Execution(format!("Parameter '{}' not found", name))
1138        }),
1139
1140        Expr::List(items) => {
1141            let values: Vec<Value> = items
1142                .iter()
1143                .map(|item| evaluate_simple_expr(item, params))
1144                .collect::<DFResult<_>>()?;
1145            Ok(Value::List(values))
1146        }
1147
1148        _ => Err(datafusion::error::DataFusionError::Execution(format!(
1149            "Unsupported expression type for procedure argument: {:?}",
1150            expr
1151        ))),
1152    }
1153}
1154
1155/// Merge edge property metadata across multiple edge types.
1156///
1157/// When a traversal spans several edge types, property columns must accommodate
1158/// all of them. This function collects property metadata from each type and
1159/// resolves conflicts: if two types define the same property with different
1160/// data types, the merged type widens to `CypherValue`. Nullability is merged
1161/// with OR (if either is nullable, the result is nullable).
1162pub fn merged_edge_schema_props(
1163    uni_schema: &UniSchema,
1164    edge_type_ids: &[u32],
1165) -> HashMap<String, uni_common::core::schema::PropertyMeta> {
1166    let mut merged: HashMap<String, uni_common::core::schema::PropertyMeta> = HashMap::new();
1167    let mut sorted_ids = edge_type_ids.to_vec();
1168    sorted_ids.sort_unstable();
1169
1170    for edge_type_id in sorted_ids {
1171        if let Some(edge_type_name) = uni_schema.edge_type_name_by_id_unified(edge_type_id)
1172            && let Some(props) = uni_schema.properties.get(edge_type_name.as_str())
1173        {
1174            for (prop_name, meta) in props {
1175                match merged.get_mut(prop_name) {
1176                    Some(existing) => {
1177                        if existing.r#type != meta.r#type {
1178                            existing.r#type = uni_common::core::schema::DataType::CypherValue;
1179                        }
1180                        existing.nullable |= meta.nullable;
1181                    }
1182                    None => {
1183                        merged.insert(prop_name.clone(), meta.clone());
1184                    }
1185                }
1186            }
1187        }
1188    }
1189
1190    merged
1191}
1192
1193// ---------------------------------------------------------------------------
1194// Shared key extraction for Locy operators (Priority, Fold, BestBy, Fixpoint)
1195// ---------------------------------------------------------------------------
1196
/// A hashable scalar key extracted from an Arrow array row.
///
/// Used across Locy operators for grouping and deduplication. Variants
/// cover the column types handled by `extract_scalar_key`; values of other
/// types are rendered to display strings and carried as `Utf8`.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub(crate) enum ScalarKey {
    /// Null cell (any column type).
    Null,
    /// Boolean column value.
    Bool(bool),
    /// Int64 column value, or a Float64 value keyed by its bit pattern
    /// (see `extract_scalar_key`).
    Int64(i64),
    /// Utf8/LargeUtf8 value, or the display rendering of an unsupported type.
    Utf8(String),
    /// LargeBinary column value.
    Binary(Vec<u8>),
}
1208
1209/// Extract a composite key from a row of a `RecordBatch`.
1210///
1211/// For each column index in `key_indices`, reads the scalar value at `row_idx`
1212/// and converts it to a `ScalarKey`. Float64 values are hashed by their bit
1213/// representation for exact grouping.
1214pub(crate) fn extract_scalar_key(
1215    batch: &RecordBatch,
1216    key_indices: &[usize],
1217    row_idx: usize,
1218) -> Vec<ScalarKey> {
1219    use arrow::array::Array;
1220    key_indices
1221        .iter()
1222        .map(|&col_idx| {
1223            let col = batch.column(col_idx);
1224            if col.is_null(row_idx) {
1225                return ScalarKey::Null;
1226            }
1227            match col.data_type() {
1228                arrow_schema::DataType::Boolean => {
1229                    let arr = col
1230                        .as_any()
1231                        .downcast_ref::<arrow_array::BooleanArray>()
1232                        .unwrap();
1233                    ScalarKey::Bool(arr.value(row_idx))
1234                }
1235                arrow_schema::DataType::Int64 => {
1236                    let arr = col
1237                        .as_any()
1238                        .downcast_ref::<arrow_array::Int64Array>()
1239                        .unwrap();
1240                    ScalarKey::Int64(arr.value(row_idx))
1241                }
1242                arrow_schema::DataType::Utf8 => {
1243                    let arr = col
1244                        .as_any()
1245                        .downcast_ref::<arrow_array::StringArray>()
1246                        .unwrap();
1247                    ScalarKey::Utf8(arr.value(row_idx).to_string())
1248                }
1249                arrow_schema::DataType::LargeBinary => {
1250                    let arr = col
1251                        .as_any()
1252                        .downcast_ref::<arrow_array::LargeBinaryArray>()
1253                        .unwrap();
1254                    ScalarKey::Binary(arr.value(row_idx).to_vec())
1255                }
1256                arrow_schema::DataType::Float64 => {
1257                    // Hash f64 as bits for grouping
1258                    let arr = col
1259                        .as_any()
1260                        .downcast_ref::<arrow_array::Float64Array>()
1261                        .unwrap();
1262                    ScalarKey::Int64(arr.value(row_idx).to_bits() as i64)
1263                }
1264                arrow_schema::DataType::LargeUtf8 => {
1265                    let arr = col
1266                        .as_any()
1267                        .downcast_ref::<arrow_array::LargeStringArray>()
1268                        .unwrap();
1269                    ScalarKey::Utf8(arr.value(row_idx).to_string())
1270                }
1271                arrow_schema::DataType::Struct(_) => {
1272                    // Serialize struct to string via arrow display for hashing
1273                    let formatter = arrow::util::display::ArrayFormatter::try_new(
1274                        col.as_ref(),
1275                        &arrow::util::display::FormatOptions::default(),
1276                    );
1277                    match formatter {
1278                        Ok(f) => ScalarKey::Utf8(f.value(row_idx).to_string()),
1279                        Err(_) => ScalarKey::Utf8(format!("struct@{row_idx}")),
1280                    }
1281                }
1282                _ => {
1283                    // Fallback: use arrow display formatter
1284                    let formatter = arrow::util::display::ArrayFormatter::try_new(
1285                        col.as_ref(),
1286                        &arrow::util::display::FormatOptions::default(),
1287                    );
1288                    match formatter {
1289                        Ok(f) => ScalarKey::Utf8(f.value(row_idx).to_string()),
1290                        Err(_) => ScalarKey::Utf8(format!("unknown@{row_idx}")),
1291                    }
1292                }
1293            }
1294        })
1295        .collect()
1296}
1297
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{LargeBinaryArray, UInt64Array};
    use arrow_schema::Schema;

    /// `extract_row_params` converts a UInt64 column value into `Value::Int`.
    #[test]
    fn test_extract_row_params_loses_uint64_to_int() {
        let fields = vec![Field::new("n._vid", DataType::UInt64, true)];
        let columns: Vec<ArrayRef> = vec![Arc::new(UInt64Array::from(vec![Some(7)]))];
        let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
            .expect("batch should be valid");

        let row = extract_row_params(&batch, 0);
        assert_eq!(row.get("n._vid"), Some(&Value::Int(7)));
    }

    /// LargeBinary CypherValue blobs are decoded back into `Value::Map`.
    #[test]
    fn test_extract_row_params_decodes_largebinary_to_map() {
        let blob = uni_common::cypher_value_codec::encode(&Value::Map(HashMap::new()));
        let fields = vec![Field::new("m._all_props", DataType::LargeBinary, true)];
        let columns: Vec<ArrayRef> =
            vec![Arc::new(LargeBinaryArray::from(vec![Some(blob.as_slice())]))];
        let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
            .expect("batch should be valid");

        let row = extract_row_params(&batch, 0);
        assert_eq!(row.get("m._all_props"), Some(&Value::Map(HashMap::new())));
    }
}
1340}