Skip to main content

uni_query/query/df_graph/
common.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Common helpers shared across graph execution plan implementations.
5//!
6//! This module provides shared utilities to reduce code duplication across
7//! the df_graph module's execution plan implementations.
8
9use arrow_array::{ArrayRef, RecordBatch};
10use arrow_schema::{DataType, Field, Schema, SchemaRef};
11use datafusion::arrow::array::Array;
12use datafusion::common::Result as DFResult;
13use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
14use datafusion::physical_plan::PlanProperties;
15use datafusion::prelude::SessionContext;
16use futures::TryStreamExt;
17use parking_lot::RwLock;
18use std::collections::HashMap;
19use std::sync::Arc;
20use uni_common::Value;
21use uni_common::core::schema::Schema as UniSchema;
22use uni_cypher::ast::{BinaryOp, CypherLiteral, Expr};
23use uni_store::storage::manager::StorageManager;
24
25use super::GraphExecutionContext;
26use super::procedure_call::map_yield_to_canonical;
27use super::unwind::arrow_to_json_value;
28use crate::query::df_graph::MutationContext;
29use crate::query::df_planner::HybridPhysicalPlanner;
30use crate::query::executor::core::{OperatorStats, collect_plan_metrics};
31use crate::query::planner::LogicalPlan;
32
33/// Convert an `ArrowError` into a `DataFusionError`.
34///
35/// Wraps the Arrow-level error into DataFusion's `ArrowError` variant. Use this
36/// with `.map_err(arrow_err)` when calling raw Arrow compute kernels from
37/// DataFusion execution plans.
38pub fn arrow_err(e: arrow::error::ArrowError) -> datafusion::error::DataFusionError {
39    datafusion::error::DataFusionError::ArrowError(Box::new(e), None)
40}
41
42/// Compute standard plan properties for graph operators.
43///
44/// All graph operators use the same plan properties:
45/// - Unknown partitioning with 1 partition
46/// - Incremental emission type
47/// - Bounded execution
48pub fn compute_plan_properties(schema: SchemaRef) -> Arc<PlanProperties> {
49    Arc::new(PlanProperties::new(
50        EquivalenceProperties::new(schema),
51        Partitioning::UnknownPartitioning(1),
52        datafusion::physical_plan::execution_plan::EmissionType::Incremental,
53        datafusion::physical_plan::execution_plan::Boundedness::Bounded,
54    ))
55}
56
57/// Return the Arrow `DataType` for `_labels` columns: `List<Utf8>`.
58///
59/// This is used across scan, traverse, bind, and other modules whenever a
60/// `_labels` field needs to be declared in a schema. Centralizing the
61/// definition avoids divergence and reduces boilerplate.
62pub fn labels_data_type() -> DataType {
63    DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)))
64}
65
66/// Extract a `UInt64Array` of vertex/edge IDs from an Arrow column.
67///
68/// Accepts both `UInt64` (native VID type) and `Int64` (from parameter
69/// injection where `arrow_to_json_value` round-trips through `Value::Int`).
70/// For `Int64` columns the values are cast to `UInt64`.
71///
72/// # Errors
73///
74/// Returns a `DataFusionError::Execution` if the column is neither `UInt64`
75/// nor `Int64`.
76pub fn column_as_vid_array(
77    col: &dyn arrow_array::Array,
78) -> datafusion::error::Result<std::borrow::Cow<'_, arrow_array::UInt64Array>> {
79    use arrow_array::{Int64Array, StructArray, UInt64Array};
80    use arrow_schema::DataType;
81
82    if let Some(arr) = col.as_any().downcast_ref::<UInt64Array>() {
83        return Ok(std::borrow::Cow::Borrowed(arr));
84    }
85
86    if let Some(arr) = col.as_any().downcast_ref::<Int64Array>() {
87        let cast: UInt64Array = arr.iter().map(|v| v.map(|i| i as u64)).collect();
88        return Ok(std::borrow::Cow::Owned(cast));
89    }
90
91    // Support entity-struct aliases (e.g., WITH coalesce(b, c) AS x) where
92    // traversal inputs may provide the source as a Struct with an "_vid" field.
93    if let Some(arr) = col.as_any().downcast_ref::<StructArray>()
94        && let DataType::Struct(fields) = arr.data_type()
95        && let Some((vid_idx, _)) = fields.find("_vid")
96    {
97        return column_as_vid_array(arr.column(vid_idx).as_ref());
98    }
99
100    // Support CypherValue-encoded Node values in LargeBinary columns
101    // (e.g., from list comprehension loop variables over node collections)
102    // Also handles JSON round-tripped nodes (Value::Map with _id field)
103    if let Some(arr) = col.as_any().downcast_ref::<arrow_array::LargeBinaryArray>() {
104        let vids = vids_from_large_binary(arr);
105        return Ok(std::borrow::Cow::Owned(vids));
106    }
107
108    // OPTIONAL MATCH can produce all-null columns with Arrow Null type
109    if *col.data_type() == DataType::Null {
110        let vids: UInt64Array = (0..col.len()).map(|_| None::<u64>).collect();
111        return Ok(std::borrow::Cow::Owned(vids));
112    }
113
114    Err(datafusion::error::DataFusionError::Execution(format!(
115        "VID column has type {:?}, expected UInt64 or Int64",
116        col.data_type()
117    )))
118}
119
120/// Extract a VID from a CypherValue.
121///
122/// Handles both `Value::Node` (native node) and `Value::Map` with `_id` field
123/// (JSON round-tripped node from `cv_array_to_large_list`).
124fn extract_vid_from_value(val: &Value) -> Option<u64> {
125    match val {
126        Value::Node(node) => Some(node.vid.as_u64()),
127        Value::Map(map) => {
128            // Handle round-tripped nodes that became Maps.
129            // Path nodes use struct fields (_vid, _label, properties) which
130            // round-trip through arrow_to_json_value as { "_vid": Int(N), ... }.
131            // Value::Node → serde_json uses { "_id": "N", ... }.
132            // Check both keys to handle either path.
133
134            // Check _vid first (from path struct → arrow_to_json_value round-trip)
135            if let Some(Value::Int(vid)) = map.get("_vid") {
136                return Some(*vid as u64);
137            }
138            // Also check _id (from Value::Node → serde_json round-trip)
139            if let Some(Value::String(id_str)) = map.get("_id") {
140                return id_str
141                    .strip_prefix("Vid(")
142                    .and_then(|s| s.strip_suffix(')'))
143                    .unwrap_or(id_str)
144                    .parse::<u64>()
145                    .ok();
146            }
147            if let Some(Value::Int(id)) = map.get("_id") {
148                return Some(*id as u64);
149            }
150            None
151        }
152        _ => None,
153    }
154}
155
156/// Extract VIDs from a `LargeBinaryArray` of CypherValue-encoded values.
157///
158/// Decodes each element and delegates to [`extract_vid_from_value`].
159/// Null elements and decode failures produce null VID entries.
160fn vids_from_large_binary(arr: &arrow_array::LargeBinaryArray) -> arrow_array::UInt64Array {
161    use uni_common::cypher_value_codec;
162
163    (0..arr.len())
164        .map(|i| {
165            if arr.is_null(i) {
166                return None;
167            }
168            cypher_value_codec::decode(arr.value(i))
169                .ok()
170                .as_ref()
171                .and_then(extract_vid_from_value)
172        })
173        .collect()
174}
175
176/// Extract VIDs from a column of CypherValue-encoded Node values.
177///
178/// Takes a `LargeBinary` array where each element is a CypherValue-encoded
179/// value and extracts VIDs from Node values. Non-Node values produce nulls.
180/// Also handles JSON round-tripped node Maps from `cv_array_to_large_list`.
181pub fn extract_vids_from_cypher_value_column(col: &dyn Array) -> DFResult<arrow_array::ArrayRef> {
182    let binary_col = col
183        .as_any()
184        .downcast_ref::<arrow_array::LargeBinaryArray>()
185        .ok_or_else(|| {
186            datafusion::error::DataFusionError::Execution(
187                "extract_vids_from_cypher_value_column: expected LargeBinary column".to_string(),
188            )
189        })?;
190    Ok(Arc::new(vids_from_large_binary(binary_col)) as arrow_array::ArrayRef)
191}
192
193/// Extract a typed value from a column at a given row index.
194///
195/// Looks up `col_name` in the batch schema, downcasts to `T`, and applies
196/// `extract_fn` if the value is valid. Returns `None` if the column is missing,
197/// the downcast fails, or the value is null.
198pub(crate) fn extract_column_value<T: arrow_array::Array + 'static, R>(
199    batch: &RecordBatch,
200    col_name: &str,
201    row_idx: usize,
202    extract_fn: impl FnOnce(&T, usize) -> R,
203) -> Option<R> {
204    let (idx, _) = batch.schema().column_with_name(col_name)?;
205    let col = batch.column(idx);
206    let arr = col.as_any().downcast_ref::<T>()?;
207    if arr.is_valid(row_idx) {
208        Some(extract_fn(arr, row_idx))
209    } else {
210        None
211    }
212}
213
214/// Build the standard node struct fields for path structures.
215///
216/// Used when materializing path objects containing nodes.
217/// Fields: `_vid`, `_labels`, `properties`
218pub fn node_struct_fields() -> arrow_schema::Fields {
219    arrow_schema::Fields::from(vec![
220        Field::new("_vid", DataType::UInt64, false),
221        Field::new("_labels", labels_data_type(), true),
222        Field::new("properties", DataType::LargeBinary, true),
223    ])
224}
225
226/// Build the standard edge struct fields for path structures.
227///
228/// Used when materializing path objects containing edges.
229/// Fields: `_eid`, `_type_name`, `_src`, `_dst`, `properties`
230pub fn edge_struct_fields() -> arrow_schema::Fields {
231    arrow_schema::Fields::from(vec![
232        Field::new("_eid", DataType::UInt64, false),
233        Field::new("_type_name", DataType::Utf8, false),
234        Field::new("_src", DataType::UInt64, false),
235        Field::new("_dst", DataType::UInt64, false),
236        Field::new("properties", DataType::LargeBinary, true),
237    ])
238}
239
240/// Encode a properties HashMap to CypherValue bytes for LargeBinary columns.
241///
242/// Used when materializing path properties that need to be stored in LargeBinary
243/// columns. Converts the HashMap into a `Value::Map` and encodes it using the
244/// CypherValue codec.
245pub fn encode_props_to_cv(props: &std::collections::HashMap<String, uni_common::Value>) -> Vec<u8> {
246    let val = uni_common::Value::Map(props.clone());
247    uni_common::cypher_value_codec::encode(&val)
248}
249
250/// Build edge list field for schema with given step variable name.
251///
252/// Creates a list of edge structs for the relationship variable in VLP patterns.
253/// For example, `r` in `MATCH (a)-[r*1..3]->(b)` gets a `List<EdgeStruct>`.
254pub fn build_edge_list_field(step_var: &str) -> Field {
255    let edge_item = Field::new("item", DataType::Struct(edge_struct_fields()), true);
256    // Field must be nullable to support OPTIONAL MATCH unmatched (r = NULL)
257    Field::new(step_var, DataType::List(Arc::new(edge_item)), true)
258}
259
260/// Build path struct field for schema with given path variable name.
261///
262/// Creates a struct field with `nodes` and `relationships` lists.
263pub fn build_path_struct_field(path_var: &str) -> Field {
264    let node_item = Field::new("item", DataType::Struct(node_struct_fields()), true);
265    let nodes_field = Field::new("nodes", DataType::List(Arc::new(node_item)), true);
266
267    let edge_item = Field::new("item", DataType::Struct(edge_struct_fields()), true);
268    let relationships_field =
269        Field::new("relationships", DataType::List(Arc::new(edge_item)), true);
270
271    Field::new(
272        path_var,
273        DataType::Struct(arrow_schema::Fields::from(vec![
274            nodes_field,
275            relationships_field,
276        ])),
277        true,
278    )
279}
280
281/// Extend an input schema with a path struct field.
282///
283/// Clones the fields from `input_schema` and appends a path struct field
284/// using [`build_path_struct_field`].
285pub fn extend_schema_with_path(input_schema: SchemaRef, path_variable: &str) -> SchemaRef {
286    let mut fields: Vec<Arc<Field>> = input_schema.fields().to_vec();
287    fields.push(Arc::new(build_path_struct_field(path_variable)));
288    Arc::new(Schema::new(fields))
289}
290
291/// Build a path struct array from nodes and relationships list arrays.
292///
293/// Combines the nodes and relationships arrays into a single `StructArray` with
294/// the standard path structure (`nodes`, `relationships`), applying the given
295/// validity mask.
296pub fn build_path_struct_array(
297    nodes_array: ArrayRef,
298    rels_array: ArrayRef,
299    path_validity: Vec<bool>,
300) -> DFResult<arrow_array::StructArray> {
301    Ok(arrow_array::StructArray::try_new(
302        arrow_schema::Fields::from(vec![
303            Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true)),
304            Arc::new(Field::new(
305                "relationships",
306                rels_array.data_type().clone(),
307                true,
308            )),
309        ]),
310        vec![nodes_array, rels_array],
311        Some(arrow::buffer::NullBuffer::from(path_validity)),
312    )?)
313}
314
315/// Create a `ListBuilder<StructBuilder>` for building edge list arrays.
316///
317/// Used when materializing edge lists for step variables (`r` in `[r*1..3]`)
318/// and path relationship arrays. Returns a builder whose struct fields match
319/// `edge_struct_fields()`.
320pub fn new_edge_list_builder()
321-> arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder> {
322    use arrow_array::builder::{LargeBinaryBuilder, StringBuilder, StructBuilder, UInt64Builder};
323    arrow_array::builder::ListBuilder::new(StructBuilder::new(
324        edge_struct_fields(),
325        vec![
326            Box::new(UInt64Builder::new()),
327            Box::new(StringBuilder::new()),
328            Box::new(UInt64Builder::new()),
329            Box::new(UInt64Builder::new()),
330            Box::new(LargeBinaryBuilder::new()),
331        ],
332    ))
333}
334
335/// Create a `ListBuilder<StructBuilder>` for building node list arrays.
336///
337/// Used when materializing path node arrays. Returns a builder whose struct
338/// fields match `node_struct_fields()`.
339pub fn new_node_list_builder()
340-> arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder> {
341    use arrow_array::builder::{
342        LargeBinaryBuilder, ListBuilder, StringBuilder, StructBuilder, UInt64Builder,
343    };
344    arrow_array::builder::ListBuilder::new(StructBuilder::new(
345        node_struct_fields(),
346        vec![
347            Box::new(UInt64Builder::new()),
348            Box::new(ListBuilder::new(StringBuilder::new())),
349            Box::new(LargeBinaryBuilder::new()),
350        ],
351    ))
352}
353
354/// Append a single edge to an edge struct builder.
355///
356/// Writes `_eid`, `_type_name`, `_src`, `_dst`, and `properties` fields,
357/// then appends the struct row. The `query_ctx` is used to look up edge
358/// properties from the L0 visibility chain.
359pub fn append_edge_to_struct(
360    struct_builder: &mut arrow_array::builder::StructBuilder,
361    eid: uni_common::core::id::Eid,
362    type_name: &str,
363    src_vid: u64,
364    dst_vid: u64,
365    query_ctx: &uni_store::runtime::context::QueryContext,
366) {
367    use arrow_array::builder::{LargeBinaryBuilder, StringBuilder, UInt64Builder};
368    use uni_store::runtime::l0_visibility;
369
370    struct_builder
371        .field_builder::<UInt64Builder>(0)
372        .unwrap()
373        .append_value(eid.as_u64());
374    struct_builder
375        .field_builder::<StringBuilder>(1)
376        .unwrap()
377        .append_value(type_name);
378    struct_builder
379        .field_builder::<UInt64Builder>(2)
380        .unwrap()
381        .append_value(src_vid);
382    struct_builder
383        .field_builder::<UInt64Builder>(3)
384        .unwrap()
385        .append_value(dst_vid);
386    let props_builder = struct_builder
387        .field_builder::<LargeBinaryBuilder>(4)
388        .unwrap();
389    if let Some(props) = l0_visibility::get_edge_properties(eid, query_ctx) {
390        let cv_bytes = encode_props_to_cv(&props);
391        props_builder.append_value(&cv_bytes);
392    } else {
393        props_builder.append_null();
394    }
395    struct_builder.append(true);
396}
397
398/// Append a null edge struct row (placeholder values + null validity).
399///
400/// Arrow struct builders require all field builders to advance even for null rows.
401/// This appends default placeholder values and marks the struct row as null.
402fn append_null_edge_struct(struct_builder: &mut arrow_array::builder::StructBuilder) {
403    use arrow_array::builder::{LargeBinaryBuilder, StringBuilder, UInt64Builder};
404
405    struct_builder
406        .field_builder::<UInt64Builder>(0)
407        .unwrap()
408        .append_value(0);
409    struct_builder
410        .field_builder::<StringBuilder>(1)
411        .unwrap()
412        .append_value("");
413    struct_builder
414        .field_builder::<UInt64Builder>(2)
415        .unwrap()
416        .append_value(0);
417    struct_builder
418        .field_builder::<UInt64Builder>(3)
419        .unwrap()
420        .append_value(0);
421    struct_builder
422        .field_builder::<LargeBinaryBuilder>(4)
423        .unwrap()
424        .append_null();
425    struct_builder.append(false);
426}
427
428/// Append an edge to a struct builder, handling the `Option<Eid>` case.
429///
430/// When `eid` is `Some`, resolves the type name from `batch_type_name` (primary)
431/// or L0 visibility (fallback), then delegates to [`append_edge_to_struct`].
432/// When `eid` is `None`, appends a null struct row.
433pub fn append_edge_to_struct_optional(
434    struct_builder: &mut arrow_array::builder::StructBuilder,
435    eid: Option<uni_common::core::id::Eid>,
436    src_vid: u64,
437    dst_vid: u64,
438    batch_type_name: Option<String>,
439    query_ctx: &uni_store::runtime::context::QueryContext,
440) {
441    match eid {
442        Some(e) => {
443            use uni_store::runtime::l0_visibility;
444            let type_name = batch_type_name
445                .or_else(|| l0_visibility::get_edge_type(e, query_ctx))
446                .unwrap_or_default();
447            append_edge_to_struct(struct_builder, e, &type_name, src_vid, dst_vid, query_ctx);
448        }
449        None => append_null_edge_struct(struct_builder),
450    }
451}
452
453/// Append a single node to a node struct builder.
454///
455/// Writes `_vid`, `_labels`, and `properties` fields, then appends the struct
456/// row. The `query_ctx` is used to look up labels and properties from the L0
457/// visibility chain.
458pub fn append_node_to_struct(
459    struct_builder: &mut arrow_array::builder::StructBuilder,
460    vid: uni_common::core::id::Vid,
461    query_ctx: &uni_store::runtime::context::QueryContext,
462) {
463    use arrow_array::builder::{LargeBinaryBuilder, ListBuilder, StringBuilder, UInt64Builder};
464    use uni_store::runtime::l0_visibility;
465
466    struct_builder
467        .field_builder::<UInt64Builder>(0)
468        .unwrap()
469        .append_value(vid.as_u64());
470    let labels = l0_visibility::get_vertex_labels(vid, query_ctx);
471    let labels_builder = struct_builder
472        .field_builder::<ListBuilder<StringBuilder>>(1)
473        .unwrap();
474    let values = labels_builder.values();
475    for lbl in &labels {
476        values.append_value(lbl);
477    }
478    labels_builder.append(true);
479    let props_builder = struct_builder
480        .field_builder::<LargeBinaryBuilder>(2)
481        .unwrap();
482    if let Some(props) = l0_visibility::get_vertex_properties(vid, query_ctx) {
483        let cv_bytes = encode_props_to_cv(&props);
484        props_builder.append_value(&cv_bytes);
485    } else {
486        props_builder.append_null();
487    }
488    struct_builder.append(true);
489}
490
491/// Append a null node struct row (placeholder values + null validity).
492///
493/// Arrow struct builders require all field builders to advance even for null rows.
494/// This appends default placeholder values and marks the struct row as null.
495fn append_null_node_struct(struct_builder: &mut arrow_array::builder::StructBuilder) {
496    use arrow_array::builder::{LargeBinaryBuilder, ListBuilder, StringBuilder, UInt64Builder};
497
498    struct_builder
499        .field_builder::<UInt64Builder>(0)
500        .unwrap()
501        .append_value(0);
502    struct_builder
503        .field_builder::<ListBuilder<StringBuilder>>(1)
504        .unwrap()
505        .append(true);
506    struct_builder
507        .field_builder::<LargeBinaryBuilder>(2)
508        .unwrap()
509        .append_null();
510    struct_builder.append(false);
511}
512
513/// Append a node to a struct builder, handling the `Option<Vid>` case.
514///
515/// When `vid` is `Some`, delegates to [`append_node_to_struct`].
516/// When `vid` is `None`, appends a null struct row.
517pub fn append_node_to_struct_optional(
518    struct_builder: &mut arrow_array::builder::StructBuilder,
519    vid: Option<uni_common::core::id::Vid>,
520    query_ctx: &uni_store::runtime::context::QueryContext,
521) {
522    match vid {
523        Some(v) => append_node_to_struct(struct_builder, v, query_ctx),
524        None => append_null_node_struct(struct_builder),
525    }
526}
527
528/// Re-encode a `LargeListArray` of CypherValue elements into a `LargeBinaryArray` of CypherValue arrays.
529///
530/// Each row in the input `LargeListArray` contains zero or more `LargeBinary`
531/// elements that are individually CypherValue-encoded values. This function decodes
532/// each element, wraps them into a `serde_json::Value::Array`, and re-encodes
533/// the whole array as a single CypherValue blob in the output `LargeBinaryArray`.
534///
535/// Null rows in the input produce null entries in the output.
536///
537/// # Errors
538///
539/// Returns a `DataFusionError::Execution` if the input is not a
540/// `LargeListArray` or if CypherValue decoding fails.
541pub fn large_list_of_cv_to_cv_array(
542    list: &datafusion::arrow::array::LargeListArray,
543) -> datafusion::error::Result<Arc<dyn datafusion::arrow::array::Array>> {
544    use datafusion::arrow::array::{LargeBinaryArray, LargeBinaryBuilder};
545
546    let values = list.values();
547    let binary_values = values
548        .as_any()
549        .downcast_ref::<LargeBinaryArray>()
550        .ok_or_else(|| {
551            datafusion::error::DataFusionError::Execution(
552                "large_list_of_cv_to_cv_array: inner values must be LargeBinaryArray".to_string(),
553            )
554        })?;
555
556    let mut builder = LargeBinaryBuilder::new();
557
558    for row_idx in 0..list.len() {
559        if list.is_null(row_idx) {
560            builder.append_null();
561            continue;
562        }
563
564        let start = list.offsets()[row_idx] as usize;
565        let end = list.offsets()[row_idx + 1] as usize;
566
567        // Stay in Value-space when rebuilding the CypherValue list. The
568        // `Value -> serde_json::Value` bridge base64-stringifies `Value::Bytes`,
569        // silently corrupting raw-`Bytes` list elements; the codec round-trips
570        // `Bytes` and CV-encoded values identically, so this detour is lossless.
571        let mut items = Vec::with_capacity(end - start);
572        for elem_idx in start..end {
573            if binary_values.is_null(elem_idx) {
574                items.push(uni_common::Value::Null);
575            } else {
576                let blob = binary_values.value(elem_idx);
577                match uni_common::cypher_value_codec::decode(blob) {
578                    Ok(uni_val) => items.push(uni_val),
579                    Err(_) => items.push(uni_common::Value::Null),
580                }
581            }
582        }
583
584        let bytes = uni_common::cypher_value_codec::encode(&uni_common::Value::List(items));
585        builder.append_value(&bytes);
586    }
587
588    Ok(Arc::new(builder.finish()))
589}
590
591/// Convert a single Arrow array element at `idx` to `serde_json::Value`.
592///
593/// Handles the common scalar types (UInt64, Int64, Float64, Utf8, Boolean, LargeBinary).
594/// Returns `serde_json::Value::Null` for null values or unsupported types.
595fn arrow_element_to_json(
596    col: &dyn datafusion::arrow::array::Array,
597    idx: usize,
598) -> serde_json::Value {
599    use datafusion::arrow::array::{
600        BooleanArray, Float64Array, Int64Array, StringArray, UInt64Array,
601    };
602
603    if col.is_null(idx) {
604        return serde_json::Value::Null;
605    }
606
607    if let Some(arr) = col.as_any().downcast_ref::<UInt64Array>() {
608        serde_json::Value::Number(serde_json::Number::from(arr.value(idx)))
609    } else if let Some(arr) = col.as_any().downcast_ref::<Int64Array>() {
610        serde_json::Value::Number(serde_json::Number::from(arr.value(idx)))
611    } else if let Some(arr) = col.as_any().downcast_ref::<Float64Array>() {
612        serde_json::Number::from_f64(arr.value(idx))
613            .map(serde_json::Value::Number)
614            .unwrap_or(serde_json::Value::Null)
615    } else if let Some(arr) = col.as_any().downcast_ref::<StringArray>() {
616        serde_json::Value::String(arr.value(idx).to_string())
617    } else if let Some(arr) = col.as_any().downcast_ref::<BooleanArray>() {
618        serde_json::Value::Bool(arr.value(idx))
619    } else if let Some(arr) = col.as_any().downcast_ref::<arrow_array::LargeBinaryArray>() {
620        uni_common::cypher_value_codec::decode(arr.value(idx))
621            .map(|v| v.into())
622            .unwrap_or(serde_json::Value::Null)
623    } else {
624        serde_json::Value::Null
625    }
626}
627
628/// Convert a typed `LargeListArray` to a `LargeBinaryArray` of CypherValue arrays.
629///
630/// Each row in the input `LargeListArray` contains zero or more elements of a
631/// specific type (Int64, Float64, Utf8, Boolean, or nested LargeBinary). This
632/// function converts each row into a JSON array and encodes it as a CypherValue blob.
633///
634/// If the inner type is already `LargeBinary` (CypherValue), delegates to
635/// `large_list_of_cv_to_cv_array()`.
636///
637/// Null rows in the input produce null entries in the output.
638///
639/// # Errors
640///
641/// Returns a `DataFusionError::Execution` if CypherValue encoding fails.
642pub fn typed_large_list_to_cv_array(
643    list: &datafusion::arrow::array::LargeListArray,
644) -> datafusion::error::Result<Arc<dyn datafusion::arrow::array::Array>> {
645    use datafusion::arrow::array::{LargeBinaryBuilder, StructArray};
646
647    let values = list.values();
648
649    // If inner type is LargeBinary, delegate to existing function
650    if values.data_type() == &DataType::LargeBinary {
651        return large_list_of_cv_to_cv_array(list);
652    }
653
654    // Build the element-to-JSON converter closure. For Struct arrays, we need
655    // to iterate over fields; for scalar arrays, use arrow_element_to_json directly.
656    let elem_to_json: Box<dyn Fn(usize) -> serde_json::Value> = match values.data_type() {
657        DataType::UInt64
658        | DataType::Int64
659        | DataType::Float64
660        | DataType::Utf8
661        | DataType::Boolean => {
662            let values = values.clone();
663            Box::new(move |idx| arrow_element_to_json(values.as_ref(), idx))
664        }
665        DataType::Struct(_) => {
666            let typed = values
667                .as_any()
668                .downcast_ref::<StructArray>()
669                .ok_or_else(|| {
670                    datafusion::error::DataFusionError::Execution(
671                        "Expected StructArray".to_string(),
672                    )
673                })?;
674            let fields: Vec<_> = typed.fields().iter().cloned().collect();
675            let columns: Vec<_> = (0..typed.num_columns())
676                .map(|i| typed.column(i).clone())
677                .collect();
678            let nulls = typed.nulls().cloned();
679            Box::new(move |idx| {
680                if nulls.as_ref().is_some_and(|n| n.is_null(idx)) {
681                    return serde_json::Value::Null;
682                }
683                let mut map = serde_json::Map::new();
684                for (field_idx, field) in fields.iter().enumerate() {
685                    let value = arrow_element_to_json(columns[field_idx].as_ref(), idx);
686                    map.insert(field.name().clone(), value);
687                }
688                serde_json::Value::Object(map)
689            })
690        }
691        other => {
692            return Err(datafusion::error::DataFusionError::Execution(format!(
693                "Unsupported element type for typed_large_list_to_cv_array: {:?}",
694                other
695            )));
696        }
697    };
698
699    let mut builder = LargeBinaryBuilder::new();
700
701    for row_idx in 0..list.len() {
702        if list.is_null(row_idx) {
703            builder.append_null();
704            continue;
705        }
706
707        let start = list.offsets()[row_idx] as usize;
708        let end = list.offsets()[row_idx + 1] as usize;
709        let json_elements: Vec<serde_json::Value> = (start..end).map(&elem_to_json).collect();
710
711        let uni_val: uni_common::Value = serde_json::Value::Array(json_elements).into();
712        let bytes = uni_common::cypher_value_codec::encode(&uni_val);
713        builder.append_value(&bytes);
714    }
715
716    Ok(Arc::new(builder.finish()))
717}
718
719/// Convert a `LargeBinaryArray` of CypherValue-encoded arrays into a `LargeListArray`.
720///
721/// Each element in the input array is a CypherValue blob encoding a JSON array (e.g. `[1,2,3]`).
722/// Elements are converted to the specified `element_type`. For example, if `element_type`
723/// is `Int64`, CypherValue numbers are parsed as i64 values.
724///
725/// Non-array CypherValue values and nulls produce empty lists.
726pub fn cv_array_to_large_list(
727    array: &dyn datafusion::arrow::array::Array,
728    element_type: &DataType,
729) -> datafusion::error::Result<Arc<dyn datafusion::arrow::array::Array>> {
730    use datafusion::arrow::array::LargeBinaryArray;
731    use datafusion::arrow::buffer::{OffsetBuffer, ScalarBuffer};
732
733    let binary_arr = array
734        .as_any()
735        .downcast_ref::<LargeBinaryArray>()
736        .ok_or_else(|| {
737            datafusion::error::DataFusionError::Execution(
738                "cv_array_to_large_list: expected LargeBinaryArray".to_string(),
739            )
740        })?;
741
742    // Collect all elements across all rows in Value-space (not serde_json).
743    // The `Value -> serde_json::Value` bridge base64-stringifies `Value::Bytes`,
744    // which corrupts raw-`Bytes` list elements (e.g. `[x IN collect(b.data) | x]`).
745    // The codec round-trips `Bytes` and CV-encoded values identically, so staying
746    // in Value-space is lossless for every element kind.
747    let num_rows = binary_arr.len();
748    let mut all_elements: Vec<Vec<uni_common::Value>> = Vec::with_capacity(num_rows);
749    let mut nulls = Vec::with_capacity(num_rows);
750
751    for i in 0..num_rows {
752        if binary_arr.is_null(i) {
753            all_elements.push(Vec::new());
754            nulls.push(false);
755            continue;
756        }
757
758        let blob = binary_arr.value(i);
759        match uni_common::cypher_value_codec::decode(blob) {
760            Ok(uni_common::Value::List(elements)) => {
761                all_elements.push(elements);
762                nulls.push(true);
763            }
764            Ok(_) => {
765                all_elements.push(Vec::new());
766                nulls.push(true);
767            }
768            Err(_) => {
769                all_elements.push(Vec::new());
770                nulls.push(false);
771            }
772        }
773    }
774
775    // Build typed values array and offsets
776    let mut offsets: Vec<i64> = Vec::with_capacity(num_rows + 1);
777    offsets.push(0);
778
779    let values_array: Arc<dyn datafusion::arrow::array::Array> = match element_type {
780        DataType::Int64 => {
781            let mut builder = datafusion::arrow::array::builder::Int64Builder::new();
782            for elems in &all_elements {
783                for elem in elems {
784                    match elem {
785                        uni_common::Value::Int(i) => builder.append_value(*i),
786                        uni_common::Value::Float(f) => builder.append_value(*f as i64),
787                        _ => builder.append_null(),
788                    }
789                }
790                offsets.push(offsets.last().unwrap() + elems.len() as i64);
791            }
792            Arc::new(builder.finish())
793        }
794        DataType::Float64 => {
795            let mut builder = datafusion::arrow::array::builder::Float64Builder::new();
796            for elems in &all_elements {
797                for elem in elems {
798                    match elem {
799                        uni_common::Value::Float(f) => builder.append_value(*f),
800                        uni_common::Value::Int(i) => builder.append_value(*i as f64),
801                        _ => builder.append_null(),
802                    }
803                }
804                offsets.push(offsets.last().unwrap() + elems.len() as i64);
805            }
806            Arc::new(builder.finish())
807        }
808        DataType::Utf8 | DataType::LargeUtf8 => {
809            let mut builder = datafusion::arrow::array::builder::StringBuilder::new();
810            for elems in &all_elements {
811                for elem in elems {
812                    match elem {
813                        uni_common::Value::String(s) => builder.append_value(s),
814                        uni_common::Value::Null => builder.append_null(),
815                        // Non-string element in a Utf8-typed list: preserve the
816                        // prior rendering (never reached for raw `Bytes` lists).
817                        other => {
818                            let json: serde_json::Value = other.clone().into();
819                            builder.append_value(json.to_string());
820                        }
821                    }
822                }
823                offsets.push(offsets.last().unwrap() + elems.len() as i64);
824            }
825            Arc::new(builder.finish())
826        }
827        DataType::Boolean => {
828            let mut builder = datafusion::arrow::array::builder::BooleanBuilder::new();
829            for elems in &all_elements {
830                for elem in elems {
831                    match elem {
832                        uni_common::Value::Bool(b) => builder.append_value(*b),
833                        _ => builder.append_null(),
834                    }
835                }
836                offsets.push(offsets.last().unwrap() + elems.len() as i64);
837            }
838            Arc::new(builder.finish())
839        }
840        // Fallback: keep as CypherValue LargeBinary blobs. Encoding straight from
841        // the decoded `Value` preserves raw `Bytes` (and CV-encoded values).
842        _ => {
843            let mut builder = datafusion::arrow::array::builder::LargeBinaryBuilder::new();
844            for elems in &all_elements {
845                for elem in elems {
846                    let bytes = uni_common::cypher_value_codec::encode(elem);
847                    builder.append_value(&bytes);
848                }
849                offsets.push(offsets.last().unwrap() + elems.len() as i64);
850            }
851            Arc::new(builder.finish())
852        }
853    };
854
855    let field = Arc::new(Field::new("item", element_type.clone(), true));
856    let offset_buffer = OffsetBuffer::new(ScalarBuffer::from(offsets));
857    let null_buffer = datafusion::arrow::buffer::NullBuffer::from(nulls);
858
859    let large_list = datafusion::arrow::array::LargeListArray::new(
860        field,
861        offset_buffer,
862        values_array,
863        Some(null_buffer),
864    );
865
866    Ok(Arc::new(large_list))
867}
868
869/// Collect all record batches from all partitions of an execution plan.
870///
871/// Iterates over each partition, executes it, and collects all resulting
872/// batches into a single `Vec`. Shared by `execute_subplan` and `run_apply`.
873pub async fn collect_all_partitions(
874    plan: &Arc<dyn datafusion::physical_plan::ExecutionPlan>,
875    task_ctx: Arc<datafusion::execution::TaskContext>,
876) -> DFResult<Vec<RecordBatch>> {
877    let partition_count = plan.properties().output_partitioning().partition_count();
878
879    let mut all_batches = Vec::new();
880    for partition in 0..partition_count {
881        let stream = plan.execute(partition, task_ctx.clone())?;
882        let batches: Vec<RecordBatch> = stream.try_collect().await?;
883        all_batches.extend(batches);
884    }
885    Ok(all_batches)
886}
887
888/// Execute a logical plan using a fresh HybridPhysicalPlanner with the given params.
889///
890/// Shared by `RecursiveCTEExec`, `GraphApplyExec`, and `ExistsExecExpr`.
891/// Pass `mutation_ctx = Some(...)` to enable writes inside the sub-plan
892/// (`CALL { ... SET/CREATE/MERGE/DELETE }`). `None` is appropriate for
893/// read-only sub-plans (Locy rule evaluation, EXISTS predicate).
894#[expect(
895    clippy::too_many_arguments,
896    reason = "Threading mutation_ctx for CALL subquery writes"
897)]
898pub async fn execute_subplan(
899    plan: &LogicalPlan,
900    params: &HashMap<String, Value>,
901    outer_values: &HashMap<String, Value>,
902    graph_ctx: &Arc<GraphExecutionContext>,
903    session_ctx: &Arc<RwLock<SessionContext>>,
904    storage: &Arc<StorageManager>,
905    schema_info: &Arc<UniSchema>,
906    mutation_ctx: Option<&Arc<MutationContext>>,
907) -> DFResult<Vec<RecordBatch>> {
908    let (batches, _plan) = execute_subplan_with_outer_vars(
909        plan,
910        params,
911        outer_values,
912        graph_ctx,
913        session_ctx,
914        storage,
915        schema_info,
916        &std::collections::HashSet::new(),
917        mutation_ctx,
918    )
919    .await?;
920    Ok(batches)
921}
922
923/// Like [`execute_subplan`], but also returns the per-operator metrics of the
924/// clause-body physical plan for profiling.
925///
926/// Used by the Locy `profile()` path to capture a Cypher-style operator tree
927/// ([`OperatorStats`]) for each rule clause body, per fixpoint iteration. The
928/// metrics are read from the plan's `BaselineMetrics` after execution via
929/// `collect_plan_metrics`, so this MUST run the plan to completion first
930/// (which it does, like `execute_subplan`). The non-profiled path calls
931/// [`execute_subplan`] instead and pays nothing.
932#[expect(
933    clippy::too_many_arguments,
934    reason = "Mirrors execute_subplan's parameter list; profiling adds only the return value"
935)]
936pub async fn execute_subplan_collecting(
937    plan: &LogicalPlan,
938    params: &HashMap<String, Value>,
939    outer_values: &HashMap<String, Value>,
940    graph_ctx: &Arc<GraphExecutionContext>,
941    session_ctx: &Arc<RwLock<SessionContext>>,
942    storage: &Arc<StorageManager>,
943    schema_info: &Arc<UniSchema>,
944    mutation_ctx: Option<&Arc<MutationContext>>,
945) -> DFResult<(Vec<RecordBatch>, Vec<OperatorStats>)> {
946    let (batches, plan) = execute_subplan_with_outer_vars(
947        plan,
948        params,
949        outer_values,
950        graph_ctx,
951        session_ctx,
952        storage,
953        schema_info,
954        &std::collections::HashSet::new(),
955        mutation_ctx,
956    )
957    .await?;
958    let operators = collect_plan_metrics(&plan);
959    Ok((batches, operators))
960}
961
962#[expect(
963    clippy::too_many_arguments,
964    reason = "Threading outer_entity_vars for nested EXISTS and mutation_ctx for CALL writes"
965)]
966/// Like `execute_subplan`, but with outer entity variable names for nested EXISTS.
967///
968/// The `outer_entity_vars` set is threaded to the `HybridPhysicalPlanner` so its
969/// expression compiler can distinguish fresh pattern bindings from correlated
970/// references in nested pattern-predicate EXISTS.
971pub async fn execute_subplan_with_outer_vars(
972    plan: &LogicalPlan,
973    params: &HashMap<String, Value>,
974    outer_values: &HashMap<String, Value>,
975    graph_ctx: &Arc<GraphExecutionContext>,
976    session_ctx: &Arc<RwLock<SessionContext>>,
977    storage: &Arc<StorageManager>,
978    schema_info: &Arc<UniSchema>,
979    outer_entity_vars: &std::collections::HashSet<String>,
980    mutation_ctx: Option<&Arc<MutationContext>>,
981) -> DFResult<(
982    Vec<RecordBatch>,
983    Arc<dyn datafusion::physical_plan::ExecutionPlan>,
984)> {
985    let mut planner = HybridPhysicalPlanner::with_l0_context(
986        session_ctx.clone(),
987        storage.clone(),
988        graph_ctx.l0_context().clone(),
989        graph_ctx.property_manager().clone(),
990        schema_info.clone(),
991        params.clone(),
992        outer_values.clone(),
993    );
994    planner.set_outer_entity_vars(outer_entity_vars.clone());
995
996    // Propagate registries from parent context so procedures remain available
997    // inside correlated subqueries (Apply operator).
998    if let Some(registry) = graph_ctx.algo_registry() {
999        planner = planner.with_algo_registry(registry.clone());
1000    }
1001    if let Some(registry) = graph_ctx.procedure_registry() {
1002        planner = planner.with_procedure_registry(registry.clone());
1003    }
1004    if let Some(runtime) = graph_ctx.xervo_runtime() {
1005        planner = planner.with_xervo_runtime(runtime.clone());
1006    }
1007
1008    // Propagate the outer MutationContext so writes inside CALL { ... }
1009    // subqueries (SET, CREATE, MERGE, DELETE) and RecursiveCTE bodies can
1010    // route through the same transaction's L0 buffer.
1011    if let Some(mc) = mutation_ctx {
1012        planner = planner.with_mutation_context(mc.clone());
1013    }
1014
1015    let execution_plan = planner.plan(plan).map_err(|e| {
1016        datafusion::error::DataFusionError::Execution(format!("Sub-plan error: {e}"))
1017    })?;
1018
1019    let task_ctx = session_ctx.read().task_ctx();
1020    let all_batches = collect_all_partitions(&execution_plan, task_ctx).await?;
1021
1022    Ok((all_batches, execution_plan))
1023}
1024
1025/// Extract a single row from a RecordBatch as a HashMap of column name → Value.
1026///
1027/// Used to build parameters for correlated subqueries (Apply, EXISTS).
1028pub fn extract_row_params(batch: &RecordBatch, row_idx: usize) -> HashMap<String, Value> {
1029    let schema = batch.schema();
1030    (0..batch.num_columns())
1031        .map(|col_idx| {
1032            let col_name = schema.field(col_idx).name().clone();
1033            let val = arrow_to_json_value(batch.column(col_idx).as_ref(), row_idx);
1034            (col_name, val)
1035        })
1036        .collect()
1037}
1038
1039/// Infer the output schema of a ProcedureCall logical plan node.
1040///
1041/// This is a simplified version of `GraphProcedureCallExec::build_schema()` that
1042/// doesn't require target_properties or graph_ctx. It covers common procedure types
1043/// with basic scalar type inference. For unknown procedures or complex node expansions,
1044/// it falls back to Utf8.
1045fn infer_procedure_call_schema(
1046    procedure_name: &str,
1047    yield_items: &[(String, Option<String>)],
1048    _schema_info: &UniSchema,
1049) -> SchemaRef {
1050    let infer_type = |name: &str| -> DataType {
1051        match procedure_name {
1052            "uni.schema.labels" => match name {
1053                "propertyCount" | "nodeCount" | "indexCount" => DataType::Int64,
1054                _ => DataType::Utf8,
1055            },
1056            "uni.schema.edgeTypes" | "uni.schema.relationshipTypes" => match name {
1057                "propertyCount" => DataType::Int64,
1058                _ => DataType::Utf8,
1059            },
1060            "uni.schema.constraints" => match name {
1061                "enabled" => DataType::Boolean,
1062                _ => DataType::Utf8,
1063            },
1064            "uni.schema.labelInfo" => match name {
1065                "nullable" | "indexed" | "unique" => DataType::Boolean,
1066                _ => DataType::Utf8,
1067            },
1068            // Node-yield procedures (search family): infer scalar yield
1069            // types via the canonical yield-name → Arrow-type table.
1070            // Node expansion happens at execution time in
1071            // `GraphProcedureCallExec`.
1072            n if super::procedure_call::is_node_yield_procedure_static(n) => {
1073                super::procedure_call::canonical_search_type(map_yield_to_canonical(name))
1074            }
1075            // uni.schema.indexes, unknown procedures, and fallback: all Utf8
1076            _ => DataType::Utf8,
1077        }
1078    };
1079
1080    let fields: Vec<Field> = yield_items
1081        .iter()
1082        .map(|(name, alias)| {
1083            let col_name = alias.as_ref().unwrap_or(name);
1084            Field::new(col_name, infer_type(name), true)
1085        })
1086        .collect();
1087
1088    Arc::new(Schema::new(fields))
1089}
1090
1091/// Infer the output schema of a logical plan using UniSchema property metadata.
1092///
1093/// This is needed because correlated subqueries reference outer variables that
1094/// don't exist as physical columns at planning time, so we can't dry-run plan
1095/// the subquery to get its schema. Instead we walk the logical plan and use
1096/// `UniSchema` property metadata to infer types.
1097pub fn infer_logical_plan_schema(plan: &LogicalPlan, schema_info: &UniSchema) -> SchemaRef {
1098    // Walk to outermost Project
1099    if let LogicalPlan::Project { projections, .. } = plan {
1100        let fields: Vec<Field> = projections
1101            .iter()
1102            .map(|(expr, alias)| {
1103                let name = alias.clone().unwrap_or_else(|| expr.to_string_repr());
1104                let dt = infer_expr_type(expr, schema_info);
1105                Field::new(name, dt, true)
1106            })
1107            .collect();
1108        return Arc::new(Schema::new(fields));
1109    }
1110
1111    // For non-Project plans, walk through wrapping nodes
1112    match plan {
1113        LogicalPlan::Sort { input, .. }
1114        | LogicalPlan::Limit { input, .. }
1115        | LogicalPlan::Filter { input, .. }
1116        | LogicalPlan::Distinct { input } => infer_logical_plan_schema(input, schema_info),
1117
1118        LogicalPlan::ProcedureCall {
1119            procedure_name,
1120            yield_items,
1121            ..
1122        } => infer_procedure_call_schema(procedure_name, yield_items, schema_info),
1123
1124        _ => {
1125            // Fallback: empty schema
1126            Arc::new(Schema::empty())
1127        }
1128    }
1129}
1130
1131/// Infer Arrow DataType for a Cypher expression using schema metadata.
1132fn infer_expr_type(expr: &Expr, schema_info: &UniSchema) -> DataType {
1133    match expr {
1134        Expr::Property(base, key) => {
1135            if let Expr::Variable(_) = base.as_ref() {
1136                // Look up key across all labels/edge types in schema
1137                for props in schema_info.properties.values() {
1138                    if let Some(meta) = props.get(key.as_str()) {
1139                        return meta.r#type.to_arrow();
1140                    }
1141                }
1142                DataType::LargeBinary
1143            } else {
1144                DataType::LargeBinary
1145            }
1146        }
1147        Expr::BinaryOp { left, op, right } => match op {
1148            BinaryOp::Add | BinaryOp::Sub | BinaryOp::Mul | BinaryOp::Div | BinaryOp::Mod => {
1149                let lt = infer_expr_type(left, schema_info);
1150                let rt = infer_expr_type(right, schema_info);
1151                numeric_promotion(&lt, &rt)
1152            }
1153            BinaryOp::Eq
1154            | BinaryOp::NotEq
1155            | BinaryOp::Lt
1156            | BinaryOp::LtEq
1157            | BinaryOp::Gt
1158            | BinaryOp::GtEq
1159            | BinaryOp::And
1160            | BinaryOp::Or => DataType::Boolean,
1161            _ => DataType::LargeBinary,
1162        },
1163        Expr::Literal(lit) => match lit {
1164            CypherLiteral::Integer(_) => DataType::Int64,
1165            CypherLiteral::Float(_) => DataType::Float64,
1166            CypherLiteral::String(_) => DataType::Utf8,
1167            CypherLiteral::Bool(_) => DataType::Boolean,
1168            CypherLiteral::Null => DataType::Null,
1169            CypherLiteral::Bytes(_) => DataType::LargeBinary,
1170        },
1171        Expr::Variable(_) => DataType::LargeBinary,
1172        Expr::FunctionCall { name, args, .. } => match name.to_lowercase().as_str() {
1173            "count" => DataType::Int64,
1174            "sum" | "avg" => {
1175                if let Some(arg) = args.first() {
1176                    let arg_type = infer_expr_type(arg, schema_info);
1177                    if matches!(arg_type, DataType::Float32 | DataType::Float64) {
1178                        DataType::Float64
1179                    } else {
1180                        DataType::Int64
1181                    }
1182                } else {
1183                    DataType::Int64
1184                }
1185            }
1186            "min" | "max" => {
1187                if let Some(arg) = args.first() {
1188                    infer_expr_type(arg, schema_info)
1189                } else {
1190                    DataType::LargeBinary
1191                }
1192            }
1193            "tostring" | "trim" | "ltrim" | "rtrim" | "tolower" | "toupper" | "left" | "right"
1194            | "substring" | "replace" | "reverse" | "type" => DataType::Utf8,
1195            "tointeger" | "toint" | "size" | "length" | "id" => DataType::Int64,
1196            "tofloat" => DataType::Float64,
1197            // Numeric statistical aggregates always yield Float64.
1198            "stdev" | "stddev" | "stdevp" | "stddevp" | "variance" | "variancep" => {
1199                DataType::Float64
1200            }
1201            "toboolean" => DataType::Boolean,
1202            _ => DataType::LargeBinary,
1203        },
1204        _ => DataType::LargeBinary,
1205    }
1206}
1207
1208/// Numeric type promotion for binary arithmetic.
1209fn numeric_promotion(left: &DataType, right: &DataType) -> DataType {
1210    match (left, right) {
1211        (DataType::Float64, _) | (_, DataType::Float64) => DataType::Float64,
1212        (DataType::Float32, _) | (_, DataType::Float32) => DataType::Float64,
1213        (DataType::Int64, _) | (_, DataType::Int64) => DataType::Int64,
1214        (DataType::Int32, _) | (_, DataType::Int32) => DataType::Int64,
1215        _ => DataType::Int64,
1216    }
1217}
1218
1219/// Evaluate a simple expression to get a `uni_common::Value`.
1220///
1221/// Supports:
1222/// - Literal values
1223/// - Parameter references ($param)
1224/// - Variable references (node/edge variables from MATCH)
1225/// - Literal lists
1226/// - Literal maps ({key: value, ...})
1227pub(crate) fn evaluate_simple_expr(
1228    expr: &Expr,
1229    params: &HashMap<String, Value>,
1230    outer_values: &HashMap<String, Value>,
1231) -> DFResult<Value> {
1232    match expr {
1233        Expr::Literal(lit) => Ok(lit.to_value()),
1234
1235        Expr::Parameter(name) => params.get(name).cloned().ok_or_else(|| {
1236            datafusion::error::DataFusionError::Execution(format!("Parameter '{}' not found", name))
1237        }),
1238
1239        Expr::Variable(name) => {
1240            // Node variables are stored as "{name}._vid" in outer_values
1241            let vid_key = format!("{}._vid", name);
1242            if let Some(val) = outer_values.get(&vid_key) {
1243                return Ok(val.clone());
1244            }
1245            // Fall back to plain name (edge variables, scalar columns)
1246            outer_values.get(name).cloned().ok_or_else(|| {
1247                datafusion::error::DataFusionError::Execution(format!(
1248                    "Variable '{}' not found in scope (looked for '{}' and '{}')",
1249                    name, vid_key, name
1250                ))
1251            })
1252        }
1253
1254        Expr::List(items) => {
1255            let values: Vec<Value> = items
1256                .iter()
1257                .map(|item| evaluate_simple_expr(item, params, outer_values))
1258                .collect::<DFResult<_>>()?;
1259            Ok(Value::List(values))
1260        }
1261
1262        Expr::Map(entries) => {
1263            let map: HashMap<String, Value> = entries
1264                .iter()
1265                .map(|(k, v)| {
1266                    evaluate_simple_expr(v, params, outer_values).map(|val| (k.clone(), val))
1267                })
1268                .collect::<DFResult<_>>()?;
1269            Ok(Value::Map(map))
1270        }
1271
1272        _ => Err(datafusion::error::DataFusionError::Execution(format!(
1273            "Unsupported expression type for procedure argument: {:?}",
1274            expr
1275        ))),
1276    }
1277}
1278
1279/// Merge edge property metadata across multiple edge types.
1280///
1281/// When a traversal spans several edge types, property columns must accommodate
1282/// all of them. This function collects property metadata from each type and
1283/// resolves conflicts: if two types define the same property with different
1284/// data types, the merged type widens to `CypherValue`. Nullability is merged
1285/// with OR (if either is nullable, the result is nullable).
1286pub fn merged_edge_schema_props(
1287    uni_schema: &UniSchema,
1288    edge_type_ids: &[u32],
1289) -> HashMap<String, uni_common::core::schema::PropertyMeta> {
1290    let mut merged: HashMap<String, uni_common::core::schema::PropertyMeta> = HashMap::new();
1291    let mut sorted_ids = edge_type_ids.to_vec();
1292    sorted_ids.sort_unstable();
1293
1294    for edge_type_id in sorted_ids {
1295        if let Some(edge_type_name) = uni_schema.edge_type_name_by_id_unified(edge_type_id)
1296            && let Some(props) = uni_schema.properties.get(edge_type_name.as_str())
1297        {
1298            for (prop_name, meta) in props {
1299                match merged.get_mut(prop_name) {
1300                    Some(existing) => {
1301                        if existing.r#type != meta.r#type {
1302                            existing.r#type = uni_common::core::schema::DataType::CypherValue;
1303                        }
1304                        existing.nullable |= meta.nullable;
1305                    }
1306                    None => {
1307                        merged.insert(prop_name.clone(), meta.clone());
1308                    }
1309                }
1310            }
1311        }
1312    }
1313
1314    merged
1315}
1316
1317// ---------------------------------------------------------------------------
1318// Shared key extraction for Locy operators (Priority, Fold, BestBy, Fixpoint)
1319// ---------------------------------------------------------------------------
1320
/// A hashable scalar key extracted from an Arrow array row.
///
/// Used across Locy operators for grouping and deduplication. Values are
/// produced by [`extract_scalar_key`], which maps each Arrow column type onto
/// one of these variants.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub(crate) enum ScalarKey {
    /// A null cell, whatever the column's Arrow type.
    Null,
    /// A `Boolean` cell.
    Bool(bool),
    /// An `Int64` cell, or a `Float64` cell keyed by its raw bit pattern.
    /// Bit-level keying means `0.0`/`-0.0` and differently-encoded NaNs are
    /// distinct keys.
    Int64(i64),
    /// A `Utf8`/`LargeUtf8` cell, or the display-formatted text of a cell of
    /// any other, unhandled type.
    Utf8(String),
    /// A `LargeBinary` cell, compared byte-for-byte.
    Binary(Vec<u8>),
}
1332
1333/// Extract a composite key from a row of a `RecordBatch`.
1334///
1335/// For each column index in `key_indices`, reads the scalar value at `row_idx`
1336/// and converts it to a `ScalarKey`. Float64 values are hashed by their bit
1337/// representation for exact grouping.
1338pub(crate) fn extract_scalar_key(
1339    batch: &RecordBatch,
1340    key_indices: &[usize],
1341    row_idx: usize,
1342) -> Vec<ScalarKey> {
1343    use arrow::array::Array;
1344    key_indices
1345        .iter()
1346        .map(|&col_idx| {
1347            let col = batch.column(col_idx);
1348            if col.is_null(row_idx) {
1349                return ScalarKey::Null;
1350            }
1351            match col.data_type() {
1352                arrow_schema::DataType::Boolean => {
1353                    let arr = col
1354                        .as_any()
1355                        .downcast_ref::<arrow_array::BooleanArray>()
1356                        .unwrap();
1357                    ScalarKey::Bool(arr.value(row_idx))
1358                }
1359                arrow_schema::DataType::Int64 => {
1360                    let arr = col
1361                        .as_any()
1362                        .downcast_ref::<arrow_array::Int64Array>()
1363                        .unwrap();
1364                    ScalarKey::Int64(arr.value(row_idx))
1365                }
1366                arrow_schema::DataType::Utf8 => {
1367                    let arr = col
1368                        .as_any()
1369                        .downcast_ref::<arrow_array::StringArray>()
1370                        .unwrap();
1371                    ScalarKey::Utf8(arr.value(row_idx).to_string())
1372                }
1373                arrow_schema::DataType::LargeBinary => {
1374                    let arr = col
1375                        .as_any()
1376                        .downcast_ref::<arrow_array::LargeBinaryArray>()
1377                        .unwrap();
1378                    ScalarKey::Binary(arr.value(row_idx).to_vec())
1379                }
1380                arrow_schema::DataType::Float64 => {
1381                    // Hash f64 as bits for grouping
1382                    let arr = col
1383                        .as_any()
1384                        .downcast_ref::<arrow_array::Float64Array>()
1385                        .unwrap();
1386                    ScalarKey::Int64(arr.value(row_idx).to_bits() as i64)
1387                }
1388                arrow_schema::DataType::LargeUtf8 => {
1389                    let arr = col
1390                        .as_any()
1391                        .downcast_ref::<arrow_array::LargeStringArray>()
1392                        .unwrap();
1393                    ScalarKey::Utf8(arr.value(row_idx).to_string())
1394                }
1395                _ => {
1396                    // Fallback (including Struct): use arrow display formatter
1397                    let formatter = arrow::util::display::ArrayFormatter::try_new(
1398                        col.as_ref(),
1399                        &arrow::util::display::FormatOptions::default(),
1400                    );
1401                    match formatter {
1402                        Ok(f) => ScalarKey::Utf8(f.value(row_idx).to_string()),
1403                        Err(_) => ScalarKey::Utf8(format!("opaque@{row_idx}")),
1404                    }
1405                }
1406            }
1407        })
1408        .collect()
1409}
1410
1411// `calculate_score` moved to `uni-query-functions::similar_to` (a pure
1412// leaf module). Re-export for back-compat with code that imports it via
1413// `df_graph::common::calculate_score`.
1414pub use uni_query_functions::similar_to::calculate_score;
1415
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{LargeBinaryArray, UInt64Array};
    use arrow_schema::Schema;

    /// Build a batch with a single nullable column `name` holding `column`.
    fn single_column_batch(name: &str, data_type: DataType, column: ArrayRef) -> RecordBatch {
        let field = Field::new(name, data_type, true);
        RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![column])
            .expect("batch should be valid")
    }

    #[test]
    fn test_extract_row_params_loses_uint64_to_int() {
        // An unsigned `_vid` column is surfaced as a signed `Value::Int`.
        let batch = single_column_batch(
            "n._vid",
            DataType::UInt64,
            Arc::new(UInt64Array::from(vec![Some(7)])),
        );

        let row = extract_row_params(&batch, 0);
        assert_eq!(row.get("n._vid"), Some(&Value::Int(7)));
    }

    #[test]
    fn test_extract_row_params_decodes_largebinary_to_map() {
        // A CypherValue-encoded blob is decoded back into a structured `Value`.
        let empty_map = Value::Map(HashMap::new());
        let blob = uni_common::cypher_value_codec::encode(&empty_map);
        let batch = single_column_batch(
            "m._all_props",
            DataType::LargeBinary,
            Arc::new(LargeBinaryArray::from(vec![Some(blob.as_slice())])),
        );

        let row = extract_row_params(&batch, 0);
        assert_eq!(row.get("m._all_props"), Some(&empty_map));
    }
}