Skip to main content

uni_query/query/df_graph/
common.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Common helpers shared across graph execution plan implementations.
5//!
6//! This module provides shared utilities to reduce code duplication across
7//! the df_graph module's execution plan implementations.
8
9use arrow_array::{ArrayRef, RecordBatch};
10use arrow_schema::{DataType, Field, Schema, SchemaRef};
11use datafusion::arrow::array::Array;
12use datafusion::common::Result as DFResult;
13use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
14use datafusion::physical_plan::PlanProperties;
15use datafusion::prelude::SessionContext;
16use futures::TryStreamExt;
17use parking_lot::RwLock;
18use std::collections::HashMap;
19use std::sync::Arc;
20use uni_common::Value;
21use uni_common::core::schema::Schema as UniSchema;
22use uni_cypher::ast::{BinaryOp, CypherLiteral, Expr};
23use uni_store::storage::manager::StorageManager;
24
25use super::GraphExecutionContext;
26use super::procedure_call::map_yield_to_canonical;
27use super::unwind::arrow_to_json_value;
28use crate::query::df_graph::MutationContext;
29use crate::query::df_planner::HybridPhysicalPlanner;
30use crate::query::planner::LogicalPlan;
31
32/// Convert an `ArrowError` into a `DataFusionError`.
33///
34/// Wraps the Arrow-level error into DataFusion's `ArrowError` variant. Use this
35/// with `.map_err(arrow_err)` when calling raw Arrow compute kernels from
36/// DataFusion execution plans.
37pub fn arrow_err(e: arrow::error::ArrowError) -> datafusion::error::DataFusionError {
38    datafusion::error::DataFusionError::ArrowError(Box::new(e), None)
39}
40
41/// Compute standard plan properties for graph operators.
42///
43/// All graph operators use the same plan properties:
44/// - Unknown partitioning with 1 partition
45/// - Incremental emission type
46/// - Bounded execution
47pub fn compute_plan_properties(schema: SchemaRef) -> Arc<PlanProperties> {
48    Arc::new(PlanProperties::new(
49        EquivalenceProperties::new(schema),
50        Partitioning::UnknownPartitioning(1),
51        datafusion::physical_plan::execution_plan::EmissionType::Incremental,
52        datafusion::physical_plan::execution_plan::Boundedness::Bounded,
53    ))
54}
55
56/// Return the Arrow `DataType` for `_labels` columns: `List<Utf8>`.
57///
58/// This is used across scan, traverse, bind, and other modules whenever a
59/// `_labels` field needs to be declared in a schema. Centralizing the
60/// definition avoids divergence and reduces boilerplate.
61pub fn labels_data_type() -> DataType {
62    DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)))
63}
64
65/// Extract a `UInt64Array` of vertex/edge IDs from an Arrow column.
66///
67/// Accepts both `UInt64` (native VID type) and `Int64` (from parameter
68/// injection where `arrow_to_json_value` round-trips through `Value::Int`).
69/// For `Int64` columns the values are cast to `UInt64`.
70///
71/// # Errors
72///
73/// Returns a `DataFusionError::Execution` if the column is neither `UInt64`
74/// nor `Int64`.
75pub fn column_as_vid_array(
76    col: &dyn arrow_array::Array,
77) -> datafusion::error::Result<std::borrow::Cow<'_, arrow_array::UInt64Array>> {
78    use arrow_array::{Int64Array, StructArray, UInt64Array};
79    use arrow_schema::DataType;
80
81    if let Some(arr) = col.as_any().downcast_ref::<UInt64Array>() {
82        return Ok(std::borrow::Cow::Borrowed(arr));
83    }
84
85    if let Some(arr) = col.as_any().downcast_ref::<Int64Array>() {
86        let cast: UInt64Array = arr.iter().map(|v| v.map(|i| i as u64)).collect();
87        return Ok(std::borrow::Cow::Owned(cast));
88    }
89
90    // Support entity-struct aliases (e.g., WITH coalesce(b, c) AS x) where
91    // traversal inputs may provide the source as a Struct with an "_vid" field.
92    if let Some(arr) = col.as_any().downcast_ref::<StructArray>()
93        && let DataType::Struct(fields) = arr.data_type()
94        && let Some((vid_idx, _)) = fields.find("_vid")
95    {
96        return column_as_vid_array(arr.column(vid_idx).as_ref());
97    }
98
99    // Support CypherValue-encoded Node values in LargeBinary columns
100    // (e.g., from list comprehension loop variables over node collections)
101    // Also handles JSON round-tripped nodes (Value::Map with _id field)
102    if let Some(arr) = col.as_any().downcast_ref::<arrow_array::LargeBinaryArray>() {
103        let vids = vids_from_large_binary(arr);
104        return Ok(std::borrow::Cow::Owned(vids));
105    }
106
107    // OPTIONAL MATCH can produce all-null columns with Arrow Null type
108    if *col.data_type() == DataType::Null {
109        let vids: UInt64Array = (0..col.len()).map(|_| None::<u64>).collect();
110        return Ok(std::borrow::Cow::Owned(vids));
111    }
112
113    Err(datafusion::error::DataFusionError::Execution(format!(
114        "VID column has type {:?}, expected UInt64 or Int64",
115        col.data_type()
116    )))
117}
118
119/// Extract a VID from a CypherValue.
120///
121/// Handles both `Value::Node` (native node) and `Value::Map` with `_id` field
122/// (JSON round-tripped node from `cv_array_to_large_list`).
123fn extract_vid_from_value(val: &Value) -> Option<u64> {
124    match val {
125        Value::Node(node) => Some(node.vid.as_u64()),
126        Value::Map(map) => {
127            // Handle round-tripped nodes that became Maps.
128            // Path nodes use struct fields (_vid, _label, properties) which
129            // round-trip through arrow_to_json_value as { "_vid": Int(N), ... }.
130            // Value::Node → serde_json uses { "_id": "N", ... }.
131            // Check both keys to handle either path.
132
133            // Check _vid first (from path struct → arrow_to_json_value round-trip)
134            if let Some(Value::Int(vid)) = map.get("_vid") {
135                return Some(*vid as u64);
136            }
137            // Also check _id (from Value::Node → serde_json round-trip)
138            if let Some(Value::String(id_str)) = map.get("_id") {
139                return id_str
140                    .strip_prefix("Vid(")
141                    .and_then(|s| s.strip_suffix(')'))
142                    .unwrap_or(id_str)
143                    .parse::<u64>()
144                    .ok();
145            }
146            if let Some(Value::Int(id)) = map.get("_id") {
147                return Some(*id as u64);
148            }
149            None
150        }
151        _ => None,
152    }
153}
154
155/// Extract VIDs from a `LargeBinaryArray` of CypherValue-encoded values.
156///
157/// Decodes each element and delegates to [`extract_vid_from_value`].
158/// Null elements and decode failures produce null VID entries.
159fn vids_from_large_binary(arr: &arrow_array::LargeBinaryArray) -> arrow_array::UInt64Array {
160    use uni_common::cypher_value_codec;
161
162    (0..arr.len())
163        .map(|i| {
164            if arr.is_null(i) {
165                return None;
166            }
167            cypher_value_codec::decode(arr.value(i))
168                .ok()
169                .as_ref()
170                .and_then(extract_vid_from_value)
171        })
172        .collect()
173}
174
175/// Extract VIDs from a column of CypherValue-encoded Node values.
176///
177/// Takes a `LargeBinary` array where each element is a CypherValue-encoded
178/// value and extracts VIDs from Node values. Non-Node values produce nulls.
179/// Also handles JSON round-tripped node Maps from `cv_array_to_large_list`.
180pub fn extract_vids_from_cypher_value_column(col: &dyn Array) -> DFResult<arrow_array::ArrayRef> {
181    let binary_col = col
182        .as_any()
183        .downcast_ref::<arrow_array::LargeBinaryArray>()
184        .ok_or_else(|| {
185            datafusion::error::DataFusionError::Execution(
186                "extract_vids_from_cypher_value_column: expected LargeBinary column".to_string(),
187            )
188        })?;
189    Ok(Arc::new(vids_from_large_binary(binary_col)) as arrow_array::ArrayRef)
190}
191
192/// Extract a typed value from a column at a given row index.
193///
194/// Looks up `col_name` in the batch schema, downcasts to `T`, and applies
195/// `extract_fn` if the value is valid. Returns `None` if the column is missing,
196/// the downcast fails, or the value is null.
197pub(crate) fn extract_column_value<T: arrow_array::Array + 'static, R>(
198    batch: &RecordBatch,
199    col_name: &str,
200    row_idx: usize,
201    extract_fn: impl FnOnce(&T, usize) -> R,
202) -> Option<R> {
203    let (idx, _) = batch.schema().column_with_name(col_name)?;
204    let col = batch.column(idx);
205    let arr = col.as_any().downcast_ref::<T>()?;
206    if arr.is_valid(row_idx) {
207        Some(extract_fn(arr, row_idx))
208    } else {
209        None
210    }
211}
212
213/// Build the standard node struct fields for path structures.
214///
215/// Used when materializing path objects containing nodes.
216/// Fields: `_vid`, `_labels`, `properties`
217pub fn node_struct_fields() -> arrow_schema::Fields {
218    arrow_schema::Fields::from(vec![
219        Field::new("_vid", DataType::UInt64, false),
220        Field::new("_labels", labels_data_type(), true),
221        Field::new("properties", DataType::LargeBinary, true),
222    ])
223}
224
225/// Build the standard edge struct fields for path structures.
226///
227/// Used when materializing path objects containing edges.
228/// Fields: `_eid`, `_type_name`, `_src`, `_dst`, `properties`
229pub fn edge_struct_fields() -> arrow_schema::Fields {
230    arrow_schema::Fields::from(vec![
231        Field::new("_eid", DataType::UInt64, false),
232        Field::new("_type_name", DataType::Utf8, false),
233        Field::new("_src", DataType::UInt64, false),
234        Field::new("_dst", DataType::UInt64, false),
235        Field::new("properties", DataType::LargeBinary, true),
236    ])
237}
238
239/// Encode a properties HashMap to CypherValue bytes for LargeBinary columns.
240///
241/// Used when materializing path properties that need to be stored in LargeBinary
242/// columns. Converts the HashMap into a `Value::Map` and encodes it using the
243/// CypherValue codec.
244pub fn encode_props_to_cv(props: &std::collections::HashMap<String, uni_common::Value>) -> Vec<u8> {
245    let val = uni_common::Value::Map(props.clone());
246    uni_common::cypher_value_codec::encode(&val)
247}
248
249/// Build edge list field for schema with given step variable name.
250///
251/// Creates a list of edge structs for the relationship variable in VLP patterns.
252/// For example, `r` in `MATCH (a)-[r*1..3]->(b)` gets a `List<EdgeStruct>`.
253pub fn build_edge_list_field(step_var: &str) -> Field {
254    let edge_item = Field::new("item", DataType::Struct(edge_struct_fields()), true);
255    // Field must be nullable to support OPTIONAL MATCH unmatched (r = NULL)
256    Field::new(step_var, DataType::List(Arc::new(edge_item)), true)
257}
258
259/// Build path struct field for schema with given path variable name.
260///
261/// Creates a struct field with `nodes` and `relationships` lists.
262pub fn build_path_struct_field(path_var: &str) -> Field {
263    let node_item = Field::new("item", DataType::Struct(node_struct_fields()), true);
264    let nodes_field = Field::new("nodes", DataType::List(Arc::new(node_item)), true);
265
266    let edge_item = Field::new("item", DataType::Struct(edge_struct_fields()), true);
267    let relationships_field =
268        Field::new("relationships", DataType::List(Arc::new(edge_item)), true);
269
270    Field::new(
271        path_var,
272        DataType::Struct(arrow_schema::Fields::from(vec![
273            nodes_field,
274            relationships_field,
275        ])),
276        true,
277    )
278}
279
280/// Extend an input schema with a path struct field.
281///
282/// Clones the fields from `input_schema` and appends a path struct field
283/// using [`build_path_struct_field`].
284pub fn extend_schema_with_path(input_schema: SchemaRef, path_variable: &str) -> SchemaRef {
285    let mut fields: Vec<Arc<Field>> = input_schema.fields().to_vec();
286    fields.push(Arc::new(build_path_struct_field(path_variable)));
287    Arc::new(Schema::new(fields))
288}
289
290/// Build a path struct array from nodes and relationships list arrays.
291///
292/// Combines the nodes and relationships arrays into a single `StructArray` with
293/// the standard path structure (`nodes`, `relationships`), applying the given
294/// validity mask.
295pub fn build_path_struct_array(
296    nodes_array: ArrayRef,
297    rels_array: ArrayRef,
298    path_validity: Vec<bool>,
299) -> DFResult<arrow_array::StructArray> {
300    Ok(arrow_array::StructArray::try_new(
301        arrow_schema::Fields::from(vec![
302            Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true)),
303            Arc::new(Field::new(
304                "relationships",
305                rels_array.data_type().clone(),
306                true,
307            )),
308        ]),
309        vec![nodes_array, rels_array],
310        Some(arrow::buffer::NullBuffer::from(path_validity)),
311    )?)
312}
313
314/// Create a `ListBuilder<StructBuilder>` for building edge list arrays.
315///
316/// Used when materializing edge lists for step variables (`r` in `[r*1..3]`)
317/// and path relationship arrays. Returns a builder whose struct fields match
318/// `edge_struct_fields()`.
319pub fn new_edge_list_builder()
320-> arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder> {
321    use arrow_array::builder::{LargeBinaryBuilder, StringBuilder, StructBuilder, UInt64Builder};
322    arrow_array::builder::ListBuilder::new(StructBuilder::new(
323        edge_struct_fields(),
324        vec![
325            Box::new(UInt64Builder::new()),
326            Box::new(StringBuilder::new()),
327            Box::new(UInt64Builder::new()),
328            Box::new(UInt64Builder::new()),
329            Box::new(LargeBinaryBuilder::new()),
330        ],
331    ))
332}
333
334/// Create a `ListBuilder<StructBuilder>` for building node list arrays.
335///
336/// Used when materializing path node arrays. Returns a builder whose struct
337/// fields match `node_struct_fields()`.
338pub fn new_node_list_builder()
339-> arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder> {
340    use arrow_array::builder::{
341        LargeBinaryBuilder, ListBuilder, StringBuilder, StructBuilder, UInt64Builder,
342    };
343    arrow_array::builder::ListBuilder::new(StructBuilder::new(
344        node_struct_fields(),
345        vec![
346            Box::new(UInt64Builder::new()),
347            Box::new(ListBuilder::new(StringBuilder::new())),
348            Box::new(LargeBinaryBuilder::new()),
349        ],
350    ))
351}
352
353/// Append a single edge to an edge struct builder.
354///
355/// Writes `_eid`, `_type_name`, `_src`, `_dst`, and `properties` fields,
356/// then appends the struct row. The `query_ctx` is used to look up edge
357/// properties from the L0 visibility chain.
358pub fn append_edge_to_struct(
359    struct_builder: &mut arrow_array::builder::StructBuilder,
360    eid: uni_common::core::id::Eid,
361    type_name: &str,
362    src_vid: u64,
363    dst_vid: u64,
364    query_ctx: &uni_store::runtime::context::QueryContext,
365) {
366    use arrow_array::builder::{LargeBinaryBuilder, StringBuilder, UInt64Builder};
367    use uni_store::runtime::l0_visibility;
368
369    struct_builder
370        .field_builder::<UInt64Builder>(0)
371        .unwrap()
372        .append_value(eid.as_u64());
373    struct_builder
374        .field_builder::<StringBuilder>(1)
375        .unwrap()
376        .append_value(type_name);
377    struct_builder
378        .field_builder::<UInt64Builder>(2)
379        .unwrap()
380        .append_value(src_vid);
381    struct_builder
382        .field_builder::<UInt64Builder>(3)
383        .unwrap()
384        .append_value(dst_vid);
385    let props_builder = struct_builder
386        .field_builder::<LargeBinaryBuilder>(4)
387        .unwrap();
388    if let Some(props) = l0_visibility::get_edge_properties(eid, query_ctx) {
389        let cv_bytes = encode_props_to_cv(&props);
390        props_builder.append_value(&cv_bytes);
391    } else {
392        props_builder.append_null();
393    }
394    struct_builder.append(true);
395}
396
397/// Append a null edge struct row (placeholder values + null validity).
398///
399/// Arrow struct builders require all field builders to advance even for null rows.
400/// This appends default placeholder values and marks the struct row as null.
401fn append_null_edge_struct(struct_builder: &mut arrow_array::builder::StructBuilder) {
402    use arrow_array::builder::{LargeBinaryBuilder, StringBuilder, UInt64Builder};
403
404    struct_builder
405        .field_builder::<UInt64Builder>(0)
406        .unwrap()
407        .append_value(0);
408    struct_builder
409        .field_builder::<StringBuilder>(1)
410        .unwrap()
411        .append_value("");
412    struct_builder
413        .field_builder::<UInt64Builder>(2)
414        .unwrap()
415        .append_value(0);
416    struct_builder
417        .field_builder::<UInt64Builder>(3)
418        .unwrap()
419        .append_value(0);
420    struct_builder
421        .field_builder::<LargeBinaryBuilder>(4)
422        .unwrap()
423        .append_null();
424    struct_builder.append(false);
425}
426
427/// Append an edge to a struct builder, handling the `Option<Eid>` case.
428///
429/// When `eid` is `Some`, resolves the type name from `batch_type_name` (primary)
430/// or L0 visibility (fallback), then delegates to [`append_edge_to_struct`].
431/// When `eid` is `None`, appends a null struct row.
432pub fn append_edge_to_struct_optional(
433    struct_builder: &mut arrow_array::builder::StructBuilder,
434    eid: Option<uni_common::core::id::Eid>,
435    src_vid: u64,
436    dst_vid: u64,
437    batch_type_name: Option<String>,
438    query_ctx: &uni_store::runtime::context::QueryContext,
439) {
440    match eid {
441        Some(e) => {
442            use uni_store::runtime::l0_visibility;
443            let type_name = batch_type_name
444                .or_else(|| l0_visibility::get_edge_type(e, query_ctx))
445                .unwrap_or_default();
446            append_edge_to_struct(struct_builder, e, &type_name, src_vid, dst_vid, query_ctx);
447        }
448        None => append_null_edge_struct(struct_builder),
449    }
450}
451
452/// Append a single node to a node struct builder.
453///
454/// Writes `_vid`, `_labels`, and `properties` fields, then appends the struct
455/// row. The `query_ctx` is used to look up labels and properties from the L0
456/// visibility chain.
457pub fn append_node_to_struct(
458    struct_builder: &mut arrow_array::builder::StructBuilder,
459    vid: uni_common::core::id::Vid,
460    query_ctx: &uni_store::runtime::context::QueryContext,
461) {
462    use arrow_array::builder::{LargeBinaryBuilder, ListBuilder, StringBuilder, UInt64Builder};
463    use uni_store::runtime::l0_visibility;
464
465    struct_builder
466        .field_builder::<UInt64Builder>(0)
467        .unwrap()
468        .append_value(vid.as_u64());
469    let labels = l0_visibility::get_vertex_labels(vid, query_ctx);
470    let labels_builder = struct_builder
471        .field_builder::<ListBuilder<StringBuilder>>(1)
472        .unwrap();
473    let values = labels_builder.values();
474    for lbl in &labels {
475        values.append_value(lbl);
476    }
477    labels_builder.append(true);
478    let props_builder = struct_builder
479        .field_builder::<LargeBinaryBuilder>(2)
480        .unwrap();
481    if let Some(props) = l0_visibility::get_vertex_properties(vid, query_ctx) {
482        let cv_bytes = encode_props_to_cv(&props);
483        props_builder.append_value(&cv_bytes);
484    } else {
485        props_builder.append_null();
486    }
487    struct_builder.append(true);
488}
489
490/// Append a null node struct row (placeholder values + null validity).
491///
492/// Arrow struct builders require all field builders to advance even for null rows.
493/// This appends default placeholder values and marks the struct row as null.
494fn append_null_node_struct(struct_builder: &mut arrow_array::builder::StructBuilder) {
495    use arrow_array::builder::{LargeBinaryBuilder, ListBuilder, StringBuilder, UInt64Builder};
496
497    struct_builder
498        .field_builder::<UInt64Builder>(0)
499        .unwrap()
500        .append_value(0);
501    struct_builder
502        .field_builder::<ListBuilder<StringBuilder>>(1)
503        .unwrap()
504        .append(true);
505    struct_builder
506        .field_builder::<LargeBinaryBuilder>(2)
507        .unwrap()
508        .append_null();
509    struct_builder.append(false);
510}
511
512/// Append a node to a struct builder, handling the `Option<Vid>` case.
513///
514/// When `vid` is `Some`, delegates to [`append_node_to_struct`].
515/// When `vid` is `None`, appends a null struct row.
516pub fn append_node_to_struct_optional(
517    struct_builder: &mut arrow_array::builder::StructBuilder,
518    vid: Option<uni_common::core::id::Vid>,
519    query_ctx: &uni_store::runtime::context::QueryContext,
520) {
521    match vid {
522        Some(v) => append_node_to_struct(struct_builder, v, query_ctx),
523        None => append_null_node_struct(struct_builder),
524    }
525}
526
527/// Re-encode a `LargeListArray` of CypherValue elements into a `LargeBinaryArray` of CypherValue arrays.
528///
529/// Each row in the input `LargeListArray` contains zero or more `LargeBinary`
530/// elements that are individually CypherValue-encoded values. This function decodes
531/// each element, wraps them into a `serde_json::Value::Array`, and re-encodes
532/// the whole array as a single CypherValue blob in the output `LargeBinaryArray`.
533///
534/// Null rows in the input produce null entries in the output.
535///
536/// # Errors
537///
538/// Returns a `DataFusionError::Execution` if the input is not a
539/// `LargeListArray` or if CypherValue decoding fails.
540pub fn large_list_of_cv_to_cv_array(
541    list: &datafusion::arrow::array::LargeListArray,
542) -> datafusion::error::Result<Arc<dyn datafusion::arrow::array::Array>> {
543    use datafusion::arrow::array::{LargeBinaryArray, LargeBinaryBuilder};
544
545    let values = list.values();
546    let binary_values = values
547        .as_any()
548        .downcast_ref::<LargeBinaryArray>()
549        .ok_or_else(|| {
550            datafusion::error::DataFusionError::Execution(
551                "large_list_of_cv_to_cv_array: inner values must be LargeBinaryArray".to_string(),
552            )
553        })?;
554
555    let mut builder = LargeBinaryBuilder::new();
556
557    for row_idx in 0..list.len() {
558        if list.is_null(row_idx) {
559            builder.append_null();
560            continue;
561        }
562
563        let start = list.offsets()[row_idx] as usize;
564        let end = list.offsets()[row_idx + 1] as usize;
565
566        let mut json_elements = Vec::with_capacity(end - start);
567        for elem_idx in start..end {
568            if binary_values.is_null(elem_idx) {
569                json_elements.push(serde_json::Value::Null);
570            } else {
571                let blob = binary_values.value(elem_idx);
572                match uni_common::cypher_value_codec::decode(blob) {
573                    Ok(uni_val) => {
574                        let json_val: serde_json::Value = uni_val.into();
575                        json_elements.push(json_val);
576                    }
577                    Err(_) => json_elements.push(serde_json::Value::Null),
578                }
579            }
580        }
581
582        let uni_val: uni_common::Value = serde_json::Value::Array(json_elements).into();
583        let bytes = uni_common::cypher_value_codec::encode(&uni_val);
584        builder.append_value(&bytes);
585    }
586
587    Ok(Arc::new(builder.finish()))
588}
589
590/// Convert a single Arrow array element at `idx` to `serde_json::Value`.
591///
592/// Handles the common scalar types (UInt64, Int64, Float64, Utf8, Boolean, LargeBinary).
593/// Returns `serde_json::Value::Null` for null values or unsupported types.
594fn arrow_element_to_json(
595    col: &dyn datafusion::arrow::array::Array,
596    idx: usize,
597) -> serde_json::Value {
598    use datafusion::arrow::array::{
599        BooleanArray, Float64Array, Int64Array, StringArray, UInt64Array,
600    };
601
602    if col.is_null(idx) {
603        return serde_json::Value::Null;
604    }
605
606    if let Some(arr) = col.as_any().downcast_ref::<UInt64Array>() {
607        serde_json::Value::Number(serde_json::Number::from(arr.value(idx)))
608    } else if let Some(arr) = col.as_any().downcast_ref::<Int64Array>() {
609        serde_json::Value::Number(serde_json::Number::from(arr.value(idx)))
610    } else if let Some(arr) = col.as_any().downcast_ref::<Float64Array>() {
611        serde_json::Number::from_f64(arr.value(idx))
612            .map(serde_json::Value::Number)
613            .unwrap_or(serde_json::Value::Null)
614    } else if let Some(arr) = col.as_any().downcast_ref::<StringArray>() {
615        serde_json::Value::String(arr.value(idx).to_string())
616    } else if let Some(arr) = col.as_any().downcast_ref::<BooleanArray>() {
617        serde_json::Value::Bool(arr.value(idx))
618    } else if let Some(arr) = col.as_any().downcast_ref::<arrow_array::LargeBinaryArray>() {
619        uni_common::cypher_value_codec::decode(arr.value(idx))
620            .map(|v| v.into())
621            .unwrap_or(serde_json::Value::Null)
622    } else {
623        serde_json::Value::Null
624    }
625}
626
627/// Convert a typed `LargeListArray` to a `LargeBinaryArray` of CypherValue arrays.
628///
629/// Each row in the input `LargeListArray` contains zero or more elements of a
630/// specific type (Int64, Float64, Utf8, Boolean, or nested LargeBinary). This
631/// function converts each row into a JSON array and encodes it as a CypherValue blob.
632///
633/// If the inner type is already `LargeBinary` (CypherValue), delegates to
634/// `large_list_of_cv_to_cv_array()`.
635///
636/// Null rows in the input produce null entries in the output.
637///
638/// # Errors
639///
640/// Returns a `DataFusionError::Execution` if CypherValue encoding fails.
641pub fn typed_large_list_to_cv_array(
642    list: &datafusion::arrow::array::LargeListArray,
643) -> datafusion::error::Result<Arc<dyn datafusion::arrow::array::Array>> {
644    use datafusion::arrow::array::{LargeBinaryBuilder, StructArray};
645
646    let values = list.values();
647
648    // If inner type is LargeBinary, delegate to existing function
649    if values.data_type() == &DataType::LargeBinary {
650        return large_list_of_cv_to_cv_array(list);
651    }
652
653    // Build the element-to-JSON converter closure. For Struct arrays, we need
654    // to iterate over fields; for scalar arrays, use arrow_element_to_json directly.
655    let elem_to_json: Box<dyn Fn(usize) -> serde_json::Value> = match values.data_type() {
656        DataType::UInt64
657        | DataType::Int64
658        | DataType::Float64
659        | DataType::Utf8
660        | DataType::Boolean => {
661            let values = values.clone();
662            Box::new(move |idx| arrow_element_to_json(values.as_ref(), idx))
663        }
664        DataType::Struct(_) => {
665            let typed = values
666                .as_any()
667                .downcast_ref::<StructArray>()
668                .ok_or_else(|| {
669                    datafusion::error::DataFusionError::Execution(
670                        "Expected StructArray".to_string(),
671                    )
672                })?;
673            let fields: Vec<_> = typed.fields().iter().cloned().collect();
674            let columns: Vec<_> = (0..typed.num_columns())
675                .map(|i| typed.column(i).clone())
676                .collect();
677            let nulls = typed.nulls().cloned();
678            Box::new(move |idx| {
679                if nulls.as_ref().is_some_and(|n| n.is_null(idx)) {
680                    return serde_json::Value::Null;
681                }
682                let mut map = serde_json::Map::new();
683                for (field_idx, field) in fields.iter().enumerate() {
684                    let value = arrow_element_to_json(columns[field_idx].as_ref(), idx);
685                    map.insert(field.name().clone(), value);
686                }
687                serde_json::Value::Object(map)
688            })
689        }
690        other => {
691            return Err(datafusion::error::DataFusionError::Execution(format!(
692                "Unsupported element type for typed_large_list_to_cv_array: {:?}",
693                other
694            )));
695        }
696    };
697
698    let mut builder = LargeBinaryBuilder::new();
699
700    for row_idx in 0..list.len() {
701        if list.is_null(row_idx) {
702            builder.append_null();
703            continue;
704        }
705
706        let start = list.offsets()[row_idx] as usize;
707        let end = list.offsets()[row_idx + 1] as usize;
708        let json_elements: Vec<serde_json::Value> = (start..end).map(&elem_to_json).collect();
709
710        let uni_val: uni_common::Value = serde_json::Value::Array(json_elements).into();
711        let bytes = uni_common::cypher_value_codec::encode(&uni_val);
712        builder.append_value(&bytes);
713    }
714
715    Ok(Arc::new(builder.finish()))
716}
717
718/// Convert a `LargeBinaryArray` of CypherValue-encoded arrays into a `LargeListArray`.
719///
720/// Each element in the input array is a CypherValue blob encoding a JSON array (e.g. `[1,2,3]`).
721/// Elements are converted to the specified `element_type`. For example, if `element_type`
722/// is `Int64`, CypherValue numbers are parsed as i64 values.
723///
724/// Non-array CypherValue values and nulls produce empty lists.
725pub fn cv_array_to_large_list(
726    array: &dyn datafusion::arrow::array::Array,
727    element_type: &DataType,
728) -> datafusion::error::Result<Arc<dyn datafusion::arrow::array::Array>> {
729    use datafusion::arrow::array::LargeBinaryArray;
730    use datafusion::arrow::buffer::{OffsetBuffer, ScalarBuffer};
731
732    let binary_arr = array
733        .as_any()
734        .downcast_ref::<LargeBinaryArray>()
735        .ok_or_else(|| {
736            datafusion::error::DataFusionError::Execution(
737                "cv_array_to_large_list: expected LargeBinaryArray".to_string(),
738            )
739        })?;
740
741    // Collect all JSON elements across all rows
742    let num_rows = binary_arr.len();
743    let mut all_elements: Vec<Vec<serde_json::Value>> = Vec::with_capacity(num_rows);
744    let mut nulls = Vec::with_capacity(num_rows);
745
746    for i in 0..num_rows {
747        if binary_arr.is_null(i) {
748            all_elements.push(Vec::new());
749            nulls.push(false);
750            continue;
751        }
752
753        let blob = binary_arr.value(i);
754        let uni_val = match uni_common::cypher_value_codec::decode(blob) {
755            Ok(v) => v,
756            Err(_) => {
757                all_elements.push(Vec::new());
758                nulls.push(false);
759                continue;
760            }
761        };
762        let json_val_decoded: serde_json::Value = uni_val.into();
763
764        match json_val_decoded {
765            serde_json::Value::Array(elements) => {
766                all_elements.push(elements);
767                nulls.push(true);
768            }
769            _ => {
770                all_elements.push(Vec::new());
771                nulls.push(true);
772            }
773        }
774    }
775
776    // Build typed values array and offsets
777    let mut offsets: Vec<i64> = Vec::with_capacity(num_rows + 1);
778    offsets.push(0);
779
780    let values_array: Arc<dyn datafusion::arrow::array::Array> = match element_type {
781        DataType::Int64 => {
782            let mut builder = datafusion::arrow::array::builder::Int64Builder::new();
783            for elems in &all_elements {
784                for elem in elems {
785                    if let serde_json::Value::Number(n) = elem {
786                        if let Some(i) = n.as_i64() {
787                            builder.append_value(i);
788                        } else if let Some(f) = n.as_f64() {
789                            builder.append_value(f as i64);
790                        } else {
791                            builder.append_null();
792                        }
793                    } else {
794                        builder.append_null();
795                    }
796                }
797                offsets.push(offsets.last().unwrap() + elems.len() as i64);
798            }
799            Arc::new(builder.finish())
800        }
801        DataType::Float64 => {
802            let mut builder = datafusion::arrow::array::builder::Float64Builder::new();
803            for elems in &all_elements {
804                for elem in elems {
805                    if let serde_json::Value::Number(n) = elem
806                        && let Some(f) = n.as_f64()
807                    {
808                        builder.append_value(f);
809                    } else {
810                        builder.append_null();
811                    }
812                }
813                offsets.push(offsets.last().unwrap() + elems.len() as i64);
814            }
815            Arc::new(builder.finish())
816        }
817        DataType::Utf8 | DataType::LargeUtf8 => {
818            let mut builder = datafusion::arrow::array::builder::StringBuilder::new();
819            for elems in &all_elements {
820                for elem in elems {
821                    match elem {
822                        serde_json::Value::String(s) => builder.append_value(s),
823                        serde_json::Value::Null => builder.append_null(),
824                        other => builder.append_value(other.to_string()),
825                    }
826                }
827                offsets.push(offsets.last().unwrap() + elems.len() as i64);
828            }
829            Arc::new(builder.finish())
830        }
831        DataType::Boolean => {
832            let mut builder = datafusion::arrow::array::builder::BooleanBuilder::new();
833            for elems in &all_elements {
834                for elem in elems {
835                    if let serde_json::Value::Bool(b) = elem {
836                        builder.append_value(*b);
837                    } else {
838                        builder.append_null();
839                    }
840                }
841                offsets.push(offsets.last().unwrap() + elems.len() as i64);
842            }
843            Arc::new(builder.finish())
844        }
845        // Fallback: keep as CypherValue LargeBinary blobs
846        _ => {
847            let mut builder = datafusion::arrow::array::builder::LargeBinaryBuilder::new();
848            for elems in &all_elements {
849                for elem in elems {
850                    let uni_val: uni_common::Value = elem.clone().into();
851                    let bytes = uni_common::cypher_value_codec::encode(&uni_val);
852                    builder.append_value(&bytes);
853                }
854                offsets.push(offsets.last().unwrap() + elems.len() as i64);
855            }
856            Arc::new(builder.finish())
857        }
858    };
859
860    let field = Arc::new(Field::new("item", element_type.clone(), true));
861    let offset_buffer = OffsetBuffer::new(ScalarBuffer::from(offsets));
862    let null_buffer = datafusion::arrow::buffer::NullBuffer::from(nulls);
863
864    let large_list = datafusion::arrow::array::LargeListArray::new(
865        field,
866        offset_buffer,
867        values_array,
868        Some(null_buffer),
869    );
870
871    Ok(Arc::new(large_list))
872}
873
874/// Collect all record batches from all partitions of an execution plan.
875///
876/// Iterates over each partition, executes it, and collects all resulting
877/// batches into a single `Vec`. Shared by `execute_subplan` and `run_apply`.
878pub async fn collect_all_partitions(
879    plan: &Arc<dyn datafusion::physical_plan::ExecutionPlan>,
880    task_ctx: Arc<datafusion::execution::TaskContext>,
881) -> DFResult<Vec<RecordBatch>> {
882    let partition_count = plan.properties().output_partitioning().partition_count();
883
884    let mut all_batches = Vec::new();
885    for partition in 0..partition_count {
886        let stream = plan.execute(partition, task_ctx.clone())?;
887        let batches: Vec<RecordBatch> = stream.try_collect().await?;
888        all_batches.extend(batches);
889    }
890    Ok(all_batches)
891}
892
893/// Execute a logical plan using a fresh HybridPhysicalPlanner with the given params.
894///
895/// Shared by `RecursiveCTEExec`, `GraphApplyExec`, and `ExistsExecExpr`.
896/// Pass `mutation_ctx = Some(...)` to enable writes inside the sub-plan
897/// (`CALL { ... SET/CREATE/MERGE/DELETE }`). `None` is appropriate for
898/// read-only sub-plans (Locy rule evaluation, EXISTS predicate).
899#[expect(
900    clippy::too_many_arguments,
901    reason = "Threading mutation_ctx for CALL subquery writes"
902)]
903pub async fn execute_subplan(
904    plan: &LogicalPlan,
905    params: &HashMap<String, Value>,
906    outer_values: &HashMap<String, Value>,
907    graph_ctx: &Arc<GraphExecutionContext>,
908    session_ctx: &Arc<RwLock<SessionContext>>,
909    storage: &Arc<StorageManager>,
910    schema_info: &Arc<UniSchema>,
911    mutation_ctx: Option<&Arc<MutationContext>>,
912) -> DFResult<Vec<RecordBatch>> {
913    execute_subplan_with_outer_vars(
914        plan,
915        params,
916        outer_values,
917        graph_ctx,
918        session_ctx,
919        storage,
920        schema_info,
921        &std::collections::HashSet::new(),
922        mutation_ctx,
923    )
924    .await
925}
926
927#[expect(
928    clippy::too_many_arguments,
929    reason = "Threading outer_entity_vars for nested EXISTS and mutation_ctx for CALL writes"
930)]
931/// Like `execute_subplan`, but with outer entity variable names for nested EXISTS.
932///
933/// The `outer_entity_vars` set is threaded to the `HybridPhysicalPlanner` so its
934/// expression compiler can distinguish fresh pattern bindings from correlated
935/// references in nested pattern-predicate EXISTS.
936pub async fn execute_subplan_with_outer_vars(
937    plan: &LogicalPlan,
938    params: &HashMap<String, Value>,
939    outer_values: &HashMap<String, Value>,
940    graph_ctx: &Arc<GraphExecutionContext>,
941    session_ctx: &Arc<RwLock<SessionContext>>,
942    storage: &Arc<StorageManager>,
943    schema_info: &Arc<UniSchema>,
944    outer_entity_vars: &std::collections::HashSet<String>,
945    mutation_ctx: Option<&Arc<MutationContext>>,
946) -> DFResult<Vec<RecordBatch>> {
947    let mut planner = HybridPhysicalPlanner::with_l0_context(
948        session_ctx.clone(),
949        storage.clone(),
950        graph_ctx.l0_context().clone(),
951        graph_ctx.property_manager().clone(),
952        schema_info.clone(),
953        params.clone(),
954        outer_values.clone(),
955    );
956    planner.set_outer_entity_vars(outer_entity_vars.clone());
957
958    // Propagate registries from parent context so procedures remain available
959    // inside correlated subqueries (Apply operator).
960    if let Some(registry) = graph_ctx.algo_registry() {
961        planner = planner.with_algo_registry(registry.clone());
962    }
963    if let Some(registry) = graph_ctx.procedure_registry() {
964        planner = planner.with_procedure_registry(registry.clone());
965    }
966    if let Some(runtime) = graph_ctx.xervo_runtime() {
967        planner = planner.with_xervo_runtime(runtime.clone());
968    }
969
970    // Propagate the outer MutationContext so writes inside CALL { ... }
971    // subqueries (SET, CREATE, MERGE, DELETE) and RecursiveCTE bodies can
972    // route through the same transaction's L0 buffer.
973    if let Some(mc) = mutation_ctx {
974        planner = planner.with_mutation_context(mc.clone());
975    }
976
977    let execution_plan = planner.plan(plan).map_err(|e| {
978        datafusion::error::DataFusionError::Execution(format!("Sub-plan error: {e}"))
979    })?;
980
981    let task_ctx = session_ctx.read().task_ctx();
982    let all_batches = collect_all_partitions(&execution_plan, task_ctx).await?;
983
984    Ok(all_batches)
985}
986
987/// Extract a single row from a RecordBatch as a HashMap of column name → Value.
988///
989/// Used to build parameters for correlated subqueries (Apply, EXISTS).
990pub fn extract_row_params(batch: &RecordBatch, row_idx: usize) -> HashMap<String, Value> {
991    let schema = batch.schema();
992    (0..batch.num_columns())
993        .map(|col_idx| {
994            let col_name = schema.field(col_idx).name().clone();
995            let val = arrow_to_json_value(batch.column(col_idx).as_ref(), row_idx);
996            (col_name, val)
997        })
998        .collect()
999}
1000
1001/// Infer the output schema of a ProcedureCall logical plan node.
1002///
1003/// This is a simplified version of `GraphProcedureCallExec::build_schema()` that
1004/// doesn't require target_properties or graph_ctx. It covers common procedure types
1005/// with basic scalar type inference. For unknown procedures or complex node expansions,
1006/// it falls back to Utf8.
1007fn infer_procedure_call_schema(
1008    procedure_name: &str,
1009    yield_items: &[(String, Option<String>)],
1010    _schema_info: &UniSchema,
1011) -> SchemaRef {
1012    let infer_type = |name: &str| -> DataType {
1013        match procedure_name {
1014            "uni.schema.labels" => match name {
1015                "propertyCount" | "nodeCount" | "indexCount" => DataType::Int64,
1016                _ => DataType::Utf8,
1017            },
1018            "uni.schema.edgeTypes" | "uni.schema.relationshipTypes" => match name {
1019                "propertyCount" => DataType::Int64,
1020                _ => DataType::Utf8,
1021            },
1022            "uni.schema.constraints" => match name {
1023                "enabled" => DataType::Boolean,
1024                _ => DataType::Utf8,
1025            },
1026            "uni.schema.labelInfo" => match name {
1027                "nullable" | "indexed" | "unique" => DataType::Boolean,
1028                _ => DataType::Utf8,
1029            },
1030            // Node-yield procedures (search family): infer scalar yield
1031            // types via the canonical yield-name → Arrow-type table.
1032            // Node expansion happens at execution time in
1033            // `GraphProcedureCallExec`.
1034            n if super::procedure_call::is_node_yield_procedure_static(n) => {
1035                super::procedure_call::canonical_search_type(map_yield_to_canonical(name))
1036            }
1037            // uni.schema.indexes, unknown procedures, and fallback: all Utf8
1038            _ => DataType::Utf8,
1039        }
1040    };
1041
1042    let fields: Vec<Field> = yield_items
1043        .iter()
1044        .map(|(name, alias)| {
1045            let col_name = alias.as_ref().unwrap_or(name);
1046            Field::new(col_name, infer_type(name), true)
1047        })
1048        .collect();
1049
1050    Arc::new(Schema::new(fields))
1051}
1052
1053/// Infer the output schema of a logical plan using UniSchema property metadata.
1054///
1055/// This is needed because correlated subqueries reference outer variables that
1056/// don't exist as physical columns at planning time, so we can't dry-run plan
1057/// the subquery to get its schema. Instead we walk the logical plan and use
1058/// `UniSchema` property metadata to infer types.
1059pub fn infer_logical_plan_schema(plan: &LogicalPlan, schema_info: &UniSchema) -> SchemaRef {
1060    // Walk to outermost Project
1061    if let LogicalPlan::Project { projections, .. } = plan {
1062        let fields: Vec<Field> = projections
1063            .iter()
1064            .map(|(expr, alias)| {
1065                let name = alias.clone().unwrap_or_else(|| expr.to_string_repr());
1066                let dt = infer_expr_type(expr, schema_info);
1067                Field::new(name, dt, true)
1068            })
1069            .collect();
1070        return Arc::new(Schema::new(fields));
1071    }
1072
1073    // For non-Project plans, walk through wrapping nodes
1074    match plan {
1075        LogicalPlan::Sort { input, .. }
1076        | LogicalPlan::Limit { input, .. }
1077        | LogicalPlan::Filter { input, .. }
1078        | LogicalPlan::Distinct { input } => infer_logical_plan_schema(input, schema_info),
1079
1080        LogicalPlan::ProcedureCall {
1081            procedure_name,
1082            yield_items,
1083            ..
1084        } => infer_procedure_call_schema(procedure_name, yield_items, schema_info),
1085
1086        _ => {
1087            // Fallback: empty schema
1088            Arc::new(Schema::empty())
1089        }
1090    }
1091}
1092
1093/// Infer Arrow DataType for a Cypher expression using schema metadata.
1094fn infer_expr_type(expr: &Expr, schema_info: &UniSchema) -> DataType {
1095    match expr {
1096        Expr::Property(base, key) => {
1097            if let Expr::Variable(_) = base.as_ref() {
1098                // Look up key across all labels/edge types in schema
1099                for props in schema_info.properties.values() {
1100                    if let Some(meta) = props.get(key.as_str()) {
1101                        return meta.r#type.to_arrow();
1102                    }
1103                }
1104                DataType::LargeBinary
1105            } else {
1106                DataType::LargeBinary
1107            }
1108        }
1109        Expr::BinaryOp { left, op, right } => match op {
1110            BinaryOp::Add | BinaryOp::Sub | BinaryOp::Mul | BinaryOp::Div | BinaryOp::Mod => {
1111                let lt = infer_expr_type(left, schema_info);
1112                let rt = infer_expr_type(right, schema_info);
1113                numeric_promotion(&lt, &rt)
1114            }
1115            BinaryOp::Eq
1116            | BinaryOp::NotEq
1117            | BinaryOp::Lt
1118            | BinaryOp::LtEq
1119            | BinaryOp::Gt
1120            | BinaryOp::GtEq
1121            | BinaryOp::And
1122            | BinaryOp::Or => DataType::Boolean,
1123            _ => DataType::LargeBinary,
1124        },
1125        Expr::Literal(lit) => match lit {
1126            CypherLiteral::Integer(_) => DataType::Int64,
1127            CypherLiteral::Float(_) => DataType::Float64,
1128            CypherLiteral::String(_) => DataType::Utf8,
1129            CypherLiteral::Bool(_) => DataType::Boolean,
1130            CypherLiteral::Null => DataType::Null,
1131            CypherLiteral::Bytes(_) => DataType::LargeBinary,
1132        },
1133        Expr::Variable(_) => DataType::LargeBinary,
1134        Expr::FunctionCall { name, args, .. } => match name.to_lowercase().as_str() {
1135            "count" => DataType::Int64,
1136            "sum" | "avg" => {
1137                if let Some(arg) = args.first() {
1138                    let arg_type = infer_expr_type(arg, schema_info);
1139                    if matches!(arg_type, DataType::Float32 | DataType::Float64) {
1140                        DataType::Float64
1141                    } else {
1142                        DataType::Int64
1143                    }
1144                } else {
1145                    DataType::Int64
1146                }
1147            }
1148            "min" | "max" => {
1149                if let Some(arg) = args.first() {
1150                    infer_expr_type(arg, schema_info)
1151                } else {
1152                    DataType::LargeBinary
1153                }
1154            }
1155            "tostring" | "trim" | "ltrim" | "rtrim" | "tolower" | "toupper" | "left" | "right"
1156            | "substring" | "replace" | "reverse" | "type" => DataType::Utf8,
1157            "tointeger" | "toint" | "size" | "length" | "id" => DataType::Int64,
1158            "tofloat" => DataType::Float64,
1159            "toboolean" => DataType::Boolean,
1160            _ => DataType::LargeBinary,
1161        },
1162        _ => DataType::LargeBinary,
1163    }
1164}
1165
1166/// Numeric type promotion for binary arithmetic.
1167fn numeric_promotion(left: &DataType, right: &DataType) -> DataType {
1168    match (left, right) {
1169        (DataType::Float64, _) | (_, DataType::Float64) => DataType::Float64,
1170        (DataType::Float32, _) | (_, DataType::Float32) => DataType::Float64,
1171        (DataType::Int64, _) | (_, DataType::Int64) => DataType::Int64,
1172        (DataType::Int32, _) | (_, DataType::Int32) => DataType::Int64,
1173        _ => DataType::Int64,
1174    }
1175}
1176
1177/// Evaluate a simple expression to get a `uni_common::Value`.
1178///
1179/// Supports:
1180/// - Literal values
1181/// - Parameter references ($param)
1182/// - Variable references (node/edge variables from MATCH)
1183/// - Literal lists
1184/// - Literal maps ({key: value, ...})
1185pub(crate) fn evaluate_simple_expr(
1186    expr: &Expr,
1187    params: &HashMap<String, Value>,
1188    outer_values: &HashMap<String, Value>,
1189) -> DFResult<Value> {
1190    match expr {
1191        Expr::Literal(lit) => Ok(lit.to_value()),
1192
1193        Expr::Parameter(name) => params.get(name).cloned().ok_or_else(|| {
1194            datafusion::error::DataFusionError::Execution(format!("Parameter '{}' not found", name))
1195        }),
1196
1197        Expr::Variable(name) => {
1198            // Node variables are stored as "{name}._vid" in outer_values
1199            let vid_key = format!("{}._vid", name);
1200            if let Some(val) = outer_values.get(&vid_key) {
1201                return Ok(val.clone());
1202            }
1203            // Fall back to plain name (edge variables, scalar columns)
1204            outer_values.get(name).cloned().ok_or_else(|| {
1205                datafusion::error::DataFusionError::Execution(format!(
1206                    "Variable '{}' not found in scope (looked for '{}' and '{}')",
1207                    name, vid_key, name
1208                ))
1209            })
1210        }
1211
1212        Expr::List(items) => {
1213            let values: Vec<Value> = items
1214                .iter()
1215                .map(|item| evaluate_simple_expr(item, params, outer_values))
1216                .collect::<DFResult<_>>()?;
1217            Ok(Value::List(values))
1218        }
1219
1220        Expr::Map(entries) => {
1221            let map: HashMap<String, Value> = entries
1222                .iter()
1223                .map(|(k, v)| {
1224                    evaluate_simple_expr(v, params, outer_values).map(|val| (k.clone(), val))
1225                })
1226                .collect::<DFResult<_>>()?;
1227            Ok(Value::Map(map))
1228        }
1229
1230        _ => Err(datafusion::error::DataFusionError::Execution(format!(
1231            "Unsupported expression type for procedure argument: {:?}",
1232            expr
1233        ))),
1234    }
1235}
1236
1237/// Merge edge property metadata across multiple edge types.
1238///
1239/// When a traversal spans several edge types, property columns must accommodate
1240/// all of them. This function collects property metadata from each type and
1241/// resolves conflicts: if two types define the same property with different
1242/// data types, the merged type widens to `CypherValue`. Nullability is merged
1243/// with OR (if either is nullable, the result is nullable).
1244pub fn merged_edge_schema_props(
1245    uni_schema: &UniSchema,
1246    edge_type_ids: &[u32],
1247) -> HashMap<String, uni_common::core::schema::PropertyMeta> {
1248    let mut merged: HashMap<String, uni_common::core::schema::PropertyMeta> = HashMap::new();
1249    let mut sorted_ids = edge_type_ids.to_vec();
1250    sorted_ids.sort_unstable();
1251
1252    for edge_type_id in sorted_ids {
1253        if let Some(edge_type_name) = uni_schema.edge_type_name_by_id_unified(edge_type_id)
1254            && let Some(props) = uni_schema.properties.get(edge_type_name.as_str())
1255        {
1256            for (prop_name, meta) in props {
1257                match merged.get_mut(prop_name) {
1258                    Some(existing) => {
1259                        if existing.r#type != meta.r#type {
1260                            existing.r#type = uni_common::core::schema::DataType::CypherValue;
1261                        }
1262                        existing.nullable |= meta.nullable;
1263                    }
1264                    None => {
1265                        merged.insert(prop_name.clone(), meta.clone());
1266                    }
1267                }
1268            }
1269        }
1270    }
1271
1272    merged
1273}
1274
1275// ---------------------------------------------------------------------------
1276// Shared key extraction for Locy operators (Priority, Fold, BestBy, Fixpoint)
1277// ---------------------------------------------------------------------------
1278
/// One hashable component of a composite row key read from an Arrow array.
///
/// Used across Locy operators for grouping and deduplication. Two keys are
/// equal only when both the variant and the payload match.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) enum ScalarKey {
    /// The cell was null.
    Null,
    /// A boolean cell.
    Bool(bool),
    /// A 64-bit signed integer payload.
    Int64(i64),
    /// An owned UTF-8 string payload.
    Utf8(String),
    /// An owned copy of raw bytes.
    Binary(Vec<u8>),
}
1290
1291/// Extract a composite key from a row of a `RecordBatch`.
1292///
1293/// For each column index in `key_indices`, reads the scalar value at `row_idx`
1294/// and converts it to a `ScalarKey`. Float64 values are hashed by their bit
1295/// representation for exact grouping.
1296pub(crate) fn extract_scalar_key(
1297    batch: &RecordBatch,
1298    key_indices: &[usize],
1299    row_idx: usize,
1300) -> Vec<ScalarKey> {
1301    use arrow::array::Array;
1302    key_indices
1303        .iter()
1304        .map(|&col_idx| {
1305            let col = batch.column(col_idx);
1306            if col.is_null(row_idx) {
1307                return ScalarKey::Null;
1308            }
1309            match col.data_type() {
1310                arrow_schema::DataType::Boolean => {
1311                    let arr = col
1312                        .as_any()
1313                        .downcast_ref::<arrow_array::BooleanArray>()
1314                        .unwrap();
1315                    ScalarKey::Bool(arr.value(row_idx))
1316                }
1317                arrow_schema::DataType::Int64 => {
1318                    let arr = col
1319                        .as_any()
1320                        .downcast_ref::<arrow_array::Int64Array>()
1321                        .unwrap();
1322                    ScalarKey::Int64(arr.value(row_idx))
1323                }
1324                arrow_schema::DataType::Utf8 => {
1325                    let arr = col
1326                        .as_any()
1327                        .downcast_ref::<arrow_array::StringArray>()
1328                        .unwrap();
1329                    ScalarKey::Utf8(arr.value(row_idx).to_string())
1330                }
1331                arrow_schema::DataType::LargeBinary => {
1332                    let arr = col
1333                        .as_any()
1334                        .downcast_ref::<arrow_array::LargeBinaryArray>()
1335                        .unwrap();
1336                    ScalarKey::Binary(arr.value(row_idx).to_vec())
1337                }
1338                arrow_schema::DataType::Float64 => {
1339                    // Hash f64 as bits for grouping
1340                    let arr = col
1341                        .as_any()
1342                        .downcast_ref::<arrow_array::Float64Array>()
1343                        .unwrap();
1344                    ScalarKey::Int64(arr.value(row_idx).to_bits() as i64)
1345                }
1346                arrow_schema::DataType::LargeUtf8 => {
1347                    let arr = col
1348                        .as_any()
1349                        .downcast_ref::<arrow_array::LargeStringArray>()
1350                        .unwrap();
1351                    ScalarKey::Utf8(arr.value(row_idx).to_string())
1352                }
1353                _ => {
1354                    // Fallback (including Struct): use arrow display formatter
1355                    let formatter = arrow::util::display::ArrayFormatter::try_new(
1356                        col.as_ref(),
1357                        &arrow::util::display::FormatOptions::default(),
1358                    );
1359                    match formatter {
1360                        Ok(f) => ScalarKey::Utf8(f.value(row_idx).to_string()),
1361                        Err(_) => ScalarKey::Utf8(format!("opaque@{row_idx}")),
1362                    }
1363                }
1364            }
1365        })
1366        .collect()
1367}
1368
1369// `calculate_score` moved to `uni-query-functions::similar_to` (a pure
1370// leaf module). Re-export for back-compat with code that imports it via
1371// `df_graph::common::calculate_score`.
1372pub use uni_query_functions::similar_to::calculate_score;
1373
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{LargeBinaryArray, UInt64Array};
    use arrow_schema::Schema;

    /// Wrap a single nullable column into a `RecordBatch`.
    fn one_column_batch(name: &str, data_type: DataType, column: ArrayRef) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new(name, data_type, true)]));
        RecordBatch::try_new(schema, vec![column]).expect("batch should be valid")
    }

    /// A `UInt64` column value comes back as `Value::Int`.
    #[test]
    fn test_extract_row_params_loses_uint64_to_int() {
        let vids: ArrayRef = Arc::new(UInt64Array::from(vec![Some(7)]));
        let batch = one_column_batch("n._vid", DataType::UInt64, vids);

        let row = extract_row_params(&batch, 0);
        assert_eq!(row.get("n._vid"), Some(&Value::Int(7)));
    }

    /// An encoded CypherValue map in a `LargeBinary` column is decoded back
    /// into `Value::Map`.
    #[test]
    fn test_extract_row_params_decodes_largebinary_to_map() {
        let empty_map = Value::Map(HashMap::new());
        let encoded = uni_common::cypher_value_codec::encode(&empty_map);
        let blobs: ArrayRef = Arc::new(LargeBinaryArray::from(vec![Some(encoded.as_slice())]));
        let batch = one_column_batch("m._all_props", DataType::LargeBinary, blobs);

        let row = extract_row_params(&batch, 0);
        assert_eq!(row.get("m._all_props"), Some(&empty_map));
    }
}